package _ss_com.streamsets.datacollector.execution.runner.cluster;

import _ss_com.com.google.common.annotations.VisibleForTesting;
import _ss_com.com.google.common.base.Optional;
import _ss_com.com.google.common.collect.ImmutableMap;
import _ss_com.com.google.common.collect.ImmutableSet;
import _ss_com.com.google.common.io.Files;
import _ss_com.fasterxml.jackson.core.JsonProcessingException;
import _ss_com.fasterxml.jackson.databind.ObjectMapper;
import _ss_com.streamsets.datacollector.callback.CallbackInfo;
import _ss_com.streamsets.datacollector.cluster.ApplicationState;
import _ss_com.streamsets.datacollector.cluster.ClusterModeConstants;
import _ss_com.streamsets.datacollector.cluster.ClusterPipelineStatus;
import _ss_com.streamsets.datacollector.config.PipelineConfiguration;
import _ss_com.streamsets.datacollector.config.RuleDefinitions;
import _ss_com.streamsets.datacollector.config.StageConfiguration;
import _ss_com.streamsets.datacollector.creation.PipelineBeanCreator;
import _ss_com.streamsets.datacollector.creation.PipelineConfigBean;
import _ss_com.streamsets.datacollector.execution.AbstractRunner;
import _ss_com.streamsets.datacollector.execution.EventListenerManager;
import _ss_com.streamsets.datacollector.execution.PipelineState;
import _ss_com.streamsets.datacollector.execution.PipelineStateStore;
import _ss_com.streamsets.datacollector.execution.PipelineStatus;
import _ss_com.streamsets.datacollector.execution.Snapshot;
import _ss_com.streamsets.datacollector.execution.SnapshotInfo;
import _ss_com.streamsets.datacollector.execution.alerts.AlertInfo;
import _ss_com.streamsets.datacollector.execution.cluster.ClusterHelper;
import _ss_com.streamsets.datacollector.execution.metrics.MetricsEventRunnable;
import _ss_com.streamsets.datacollector.execution.runner.RetryUtils;
import _ss_com.streamsets.datacollector.execution.runner.common.PipelineRunnerException;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipeline;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipelineBuilder;
import _ss_com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunner;
import _ss_com.streamsets.datacollector.execution.runner.common.SampledRecord;
import _ss_com.streamsets.datacollector.json.ObjectMapperFactory;
import _ss_com.streamsets.datacollector.main.RuntimeInfo;
import _ss_com.streamsets.datacollector.restapi.bean.IssuesJson;
import _ss_com.streamsets.datacollector.runner.Pipeline;
import _ss_com.streamsets.datacollector.runner.PipelineRuntimeException;
import _ss_com.streamsets.datacollector.security.SecurityConfiguration;
import _ss_com.streamsets.datacollector.stagelibrary.StageLibraryTask;
import _ss_com.streamsets.datacollector.store.PipelineStoreException;
import _ss_com.streamsets.datacollector.store.PipelineStoreTask;
import _ss_com.streamsets.datacollector.updatechecker.UpdateChecker;
import _ss_com.streamsets.datacollector.util.Configuration;
import _ss_com.streamsets.datacollector.util.ContainerError;
import _ss_com.streamsets.datacollector.util.PipelineException;
import _ss_com.streamsets.datacollector.validation.Issue;
import _ss_com.streamsets.datacollector.validation.Issues;
import _ss_com.streamsets.datacollector.validation.ValidationError;
import _ss_com.streamsets.dc.execution.manager.standalone.ResourceManager;
import _ss_com.streamsets.dc.execution.manager.standalone.ThreadUsage;
import _ss_com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService;
import _ss_org.apache.commons.io.FileUtils;
import com.codahale.metrics.MetricRegistry;
import com.streamsets.pipeline.api.ExecutionMode;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.impl.ClusterSource;
import com.streamsets.pipeline.api.impl.ErrorMessage;
import com.streamsets.pipeline.api.impl.PipelineUtils;
import com.streamsets.pipeline.api.impl.Utils;
import dagger.ObjectGraph;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/cluster/ClusterRunner.class */
public class ClusterRunner extends AbstractRunner {
    static final String APPLICATION_STATE = "cluster.application.state";
    private static final String APPLICATION_STATE_START_TIME = "cluster.application.startTime";

    @Inject
    PipelineStateStore pipelineStateStore;

    @Inject
    @Named("runnerExecutor")
    SafeScheduledExecutorService runnerExecutor;

    @Inject
    ResourceManager resourceManager;

    @Inject
    SlaveCallbackManager slaveCallbackManager;
    private final String name;
    private final String rev;
    private final String user;
    private ObjectGraph objectGraph;
    private ClusterHelper clusterHelper;
    private final File tempDir;
    private static final long SUBMIT_TIMEOUT_SECS = 120;
    private ScheduledFuture<?> managerRunnableFuture;
    private ScheduledFuture<?> metricRunnableFuture;
    private volatile boolean isClosed;
    private ScheduledFuture<?> updateCheckerFuture;
    private UpdateChecker updateChecker;
    private MetricsEventRunnable metricsEventRunnable;
    private PipelineConfiguration pipelineConf;
    private int maxRetries;
    private boolean shouldRetry;
    private ScheduledFuture<Void> retryFuture;
    private long rateLimit = -1;
    private static final Logger LOG = LoggerFactory.getLogger(ClusterRunner.class);
    private static final Map<PipelineStatus, Set<PipelineStatus>> VALID_TRANSITIONS = new ImmutableMap.Builder().put(PipelineStatus.EDITED, ImmutableSet.of(PipelineStatus.STARTING)).put(PipelineStatus.STARTING, ImmutableSet.of(PipelineStatus.START_ERROR, PipelineStatus.RUNNING, PipelineStatus.STOPPING, PipelineStatus.DISCONNECTED)).put(PipelineStatus.START_ERROR, ImmutableSet.of(PipelineStatus.STARTING)).put(PipelineStatus.RUNNING, ImmutableSet.of(PipelineStatus.CONNECT_ERROR, PipelineStatus.STOPPING, PipelineStatus.DISCONNECTED, PipelineStatus.FINISHED, PipelineStatus.KILLED, PipelineStatus.RUN_ERROR, PipelineStatus.RETRY)).put(PipelineStatus.RUN_ERROR, ImmutableSet.of(PipelineStatus.STARTING)).put(PipelineStatus.RETRY, ImmutableSet.of(PipelineStatus.STARTING, PipelineStatus.STOPPING, PipelineStatus.DISCONNECTED, PipelineStatus.RUN_ERROR)).put(PipelineStatus.STOPPING, ImmutableSet.of(PipelineStatus.STOPPED, PipelineStatus.CONNECT_ERROR, PipelineStatus.DISCONNECTED)).put(PipelineStatus.FINISHED, ImmutableSet.of(PipelineStatus.STARTING)).put(PipelineStatus.STOPPED, ImmutableSet.of(PipelineStatus.STARTING)).put(PipelineStatus.KILLED, ImmutableSet.of(PipelineStatus.STARTING)).put(PipelineStatus.CONNECT_ERROR, ImmutableSet.of(PipelineStatus.RUNNING, PipelineStatus.STOPPING, PipelineStatus.DISCONNECTED, PipelineStatus.KILLED, PipelineStatus.FINISHED, PipelineStatus.RUN_ERROR, PipelineStatus.RETRY)).put(PipelineStatus.DISCONNECTED, ImmutableSet.of(PipelineStatus.CONNECTING)).put(PipelineStatus.CONNECTING, ImmutableSet.of(PipelineStatus.STARTING, PipelineStatus.RUNNING, PipelineStatus.CONNECT_ERROR, PipelineStatus.RETRY, PipelineStatus.FINISHED, PipelineStatus.KILLED, PipelineStatus.RUN_ERROR, PipelineStatus.DISCONNECTED)).build();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/cluster/ClusterRunner$ClusterSourceInfo.class */
    public static class ClusterSourceInfo {
        private final int parallelism;
        private final Map<String, String> configsToShip;

        ClusterSourceInfo(int i, Map<String, String> map) {
            this.parallelism = i;
            this.configsToShip = map;
        }

        int getParallelism() {
            return this.parallelism;
        }

        Map<String, String> getConfigsToShip() {
            return this.configsToShip;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:_ss_com/streamsets/datacollector/execution/runner/cluster/ClusterRunner$ManagerRunnable.class */
    public static class ManagerRunnable implements Runnable {
        private final ClusterRunner clusterRunner;
        private final PipelineConfiguration pipelineConf;

        public ManagerRunnable(ClusterRunner clusterRunner, PipelineConfiguration pipelineConfiguration) {
            this.clusterRunner = clusterRunner;
            this.pipelineConf = pipelineConfiguration;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                checkStatus();
            } catch (Throwable th) {
                ClusterRunner.LOG.error("Unexpected error: " + th, th);
            }
        }

        private void checkStatus() throws PipelineStoreException, PipelineRunnerException {
            if (this.clusterRunner.getState().getStatus().isActive()) {
                this.clusterRunner.connect(new ApplicationState((Map) this.clusterRunner.getState().getAttributes().get(ClusterRunner.APPLICATION_STATE)), this.pipelineConf);
            }
            if (!this.clusterRunner.getState().getStatus().isActive() || this.clusterRunner.getState().getStatus() == PipelineStatus.RETRY) {
                ClusterRunner.LOG.debug(Utils.format("Cancelling the task as the runner is in a non-active state '{}'", new Object[]{this.clusterRunner.getState()}));
                this.clusterRunner.cancelRunnable();
            }
        }
    }

    @VisibleForTesting
    ClusterRunner(String str, String str2, String str3, RuntimeInfo runtimeInfo, Configuration configuration, PipelineStoreTask pipelineStoreTask, PipelineStateStore pipelineStateStore, StageLibraryTask stageLibraryTask, SafeScheduledExecutorService safeScheduledExecutorService, ClusterHelper clusterHelper, ResourceManager resourceManager, EventListenerManager eventListenerManager, String str4) {
        this.runtimeInfo = runtimeInfo;
        this.configuration = configuration;
        this.pipelineStateStore = pipelineStateStore;
        this.pipelineStore = pipelineStoreTask;
        this.stageLibrary = stageLibraryTask;
        this.runnerExecutor = safeScheduledExecutorService;
        this.name = str;
        this.rev = str2;
        this.user = str3;
        this.tempDir = Files.createTempDir();
        if (clusterHelper == null) {
            this.clusterHelper = new ClusterHelper(runtimeInfo, null, this.tempDir);
        } else {
            this.clusterHelper = clusterHelper;
        }
        this.resourceManager = resourceManager;
        this.eventListenerManager = eventListenerManager;
        this.slaveCallbackManager = new SlaveCallbackManager();
        this.slaveCallbackManager.setClusterToken(str4);
    }

    public ClusterRunner(String str, String str2, String str3, ObjectGraph objectGraph) {
        String str4;
        ExecutionMode executionMode;
        this.name = str2;
        this.rev = str3;
        this.user = str;
        this.objectGraph = objectGraph;
        this.objectGraph.inject(this);
        this.tempDir = new File(new File(this.runtimeInfo.getDataDir(), "temp"), PipelineUtils.escapedPipelineName(Utils.format("pipeline-{}-{}-{}", new Object[]{str, str2, str3})));
        if (!this.tempDir.mkdirs() && !this.tempDir.isDirectory()) {
            throw new IllegalStateException(Utils.format("Could not create temp directory: {}", new Object[]{this.tempDir}));
        }
        this.clusterHelper = new ClusterHelper(this.runtimeInfo, new SecurityConfiguration(this.runtimeInfo, this.configuration), this.tempDir);
        if (this.configuration.get("ui.refresh.interval.ms", 2000) > 0) {
            this.metricsEventRunnable = (MetricsEventRunnable) this.objectGraph.get(MetricsEventRunnable.class);
        }
        try {
            if (getState().getExecutionMode() == ExecutionMode.CLUSTER) {
                String str5 = null;
                Iterator<StageConfiguration> it = getPipelineConf(str2, str3).getStages().iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    StageConfiguration next = it.next();
                    if (next.getInputLanes().isEmpty()) {
                        str5 = next.getStageName();
                        break;
                    }
                }
                Utils.checkNotNull(str5, "Source name should not be null");
                if (str5.contains("ClusterHdfsDSource")) {
                    str4 = "Upgrading execution mode to " + ExecutionMode.CLUSTER_BATCH + " from " + ExecutionMode.CLUSTER;
                    executionMode = ExecutionMode.CLUSTER_BATCH;
                } else {
                    str4 = "Upgrading execution mode to " + ExecutionMode.CLUSTER_YARN_STREAMING + " from " + ExecutionMode.CLUSTER;
                    executionMode = ExecutionMode.CLUSTER_YARN_STREAMING;
                }
                PipelineState state = getState();
                this.pipelineStateStore.saveState(str, str2, str3, state.getStatus(), str4, state.getAttributes(), executionMode, state.getMetrics(), state.getRetryAttempt(), state.getNextRetryTimeStamp());
            }
        } catch (PipelineException e) {
            throw new RuntimeException("Error while accessing Pipeline State: " + e, e);
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:2:0x0035. Please report as an issue. */
    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void prepareForDataCollectorStart() throws PipelineStoreException, PipelineRunnerException {
        PipelineStatus status = getState().getStatus();
        LOG.info("Pipeline '{}::{}' has status: '{}'", new Object[]{this.name, this.rev, status});
        String str = null;
        switch (status) {
            case STARTING:
                str = "Pipeline was in STARTING state, forcing it to DISCONNECTED";
            case RETRY:
                str = str == null ? "Pipeline was in RETRY state, forcing it to DISCONNECTING" : str;
            case CONNECTING:
                str = str == null ? "Pipeline was in CONNECTING state, forcing it to DISCONNECTED" : str;
            case RUNNING:
                str = str == null ? "Pipeline was in RUNNING state, forcing it to DISCONNECTED" : str;
            case CONNECT_ERROR:
                str = str == null ? "Pipeline was in CONNECT_ERROR state, forcing it to DISCONNECTED" : str;
            case STOPPING:
                String str2 = str == null ? "Pipeline was in STOPPING state, forcing it to DISCONNECTED" : str;
                LOG.debug(str2);
                validateAndSetStateTransition(PipelineStatus.DISCONNECTED, str2);
                return;
            case DISCONNECTED:
            case RUN_ERROR:
            case EDITED:
            case FINISHED:
            case KILLED:
            case START_ERROR:
            case STOPPED:
                return;
            default:
                throw new IllegalStateException(Utils.format("Pipeline in undefined state: '{}'", new Object[]{status}));
        }
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void onDataCollectorStart() throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException, StageException {
        PipelineStatus status = getState().getStatus();
        LOG.info("Pipeline '{}::{}' has status: '{}'", new Object[]{this.name, this.rev, status});
        switch (status) {
            case DISCONNECTED:
                LOG.debug("Pipeline was in DISCONNECTED state, changing it to CONNECTING");
                validateAndSetStateTransition(PipelineStatus.CONNECTING, "Pipeline was in DISCONNECTED state, changing it to CONNECTING");
                connectOrStart();
                return;
            default:
                LOG.error(Utils.format("Pipeline has unexpected status: '{}' on data collector start", new Object[]{status}));
                return;
        }
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String getName() {
        return this.name;
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String getRev() {
        return this.rev;
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String getUser() {
        return this.user;
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void resetOffset() {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void onDataCollectorStop() throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException {
        stopPipeline(true);
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public synchronized void stop() throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException {
        stopPipeline(false);
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public synchronized void forceQuit() throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException {
        throw new UnsupportedOperationException("ForceQuit is not supported in Cluster mode");
    }

    private synchronized void stopPipeline(boolean z) throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException {
        try {
            if (z) {
                if (getState().getStatus() == PipelineStatus.RETRY) {
                    this.retryFuture.cancel(true);
                }
                validateAndSetStateTransition(PipelineStatus.DISCONNECTED, "Node is shutting down, disconnecting from the pipeline in " + getState().getExecutionMode() + " mode");
            } else {
                ApplicationState applicationState = new ApplicationState((Map) getState().getAttributes().get(APPLICATION_STATE));
                if (applicationState.getId() == null && getState().getStatus() != PipelineStatus.STOPPED) {
                    throw new PipelineRunnerException(ContainerError.CONTAINER_0101, "for cluster application");
                }
                stop(applicationState, this.pipelineConf);
            }
        } finally {
            cancelRunnable();
        }
    }

    private Map<String, Object> getAttributes() throws PipelineStoreException {
        return this.pipelineStateStore.getState(this.name, this.rev).getAttributes();
    }

    private void connectOrStart() throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException, StageException {
        HashMap hashMap = new HashMap();
        hashMap.putAll(getAttributes());
        ApplicationState applicationState = new ApplicationState((Map) hashMap.get(APPLICATION_STATE));
        if (applicationState.getId() == null) {
            retryOrStart();
            return;
        }
        try {
            this.slaveCallbackManager.setClusterToken(applicationState.getSdcToken());
            this.pipelineConf = getPipelineConf(this.name, this.rev);
            connect(applicationState, this.pipelineConf);
            if (getState().getStatus().isActive()) {
                scheduleRunnable(this.pipelineConf);
            }
        } catch (PipelineRunnerException | PipelineStoreException e) {
            validateAndSetStateTransition(PipelineStatus.CONNECT_ERROR, e.toString(), hashMap);
            throw e;
        }
    }

    private void retryOrStart() throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException, StageException {
        PipelineState state = getState();
        if (state.getRetryAttempt() == 0) {
            prepareForStart();
            start();
            return;
        }
        validateAndSetStateTransition(PipelineStatus.RETRY, "Changing the state to RETRY on startup");
        long nextRetryTimeStamp = state.getNextRetryTimeStamp();
        long j = 0;
        long currentTimeMillis = System.currentTimeMillis();
        if (nextRetryTimeStamp > currentTimeMillis) {
            j = nextRetryTimeStamp - currentTimeMillis;
        }
        this.retryFuture = scheduleForRetries(this.runnerExecutor, j);
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void prepareForStart() throws PipelineStoreException, PipelineRunnerException {
        PipelineState state = getState();
        checkState(VALID_TRANSITIONS.get(state.getStatus()).contains(PipelineStatus.STARTING), ContainerError.CONTAINER_0102, state.getStatus(), PipelineStatus.STARTING);
        if (!this.resourceManager.requestRunnerResources(ThreadUsage.CLUSTER)) {
            throw new PipelineRunnerException(ContainerError.CONTAINER_0166, this.name);
        }
        LOG.info("Preparing to start pipeline '{}::{}'", this.name, this.rev);
        validateAndSetStateTransition(PipelineStatus.STARTING, "Starting pipeline in " + getState().getExecutionMode() + " mode");
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void prepareForStop() throws PipelineStoreException, PipelineRunnerException {
        LOG.info("Preparing to stop pipeline '{}::{}'", this.name, this.rev);
        if (getState().getStatus() != PipelineStatus.RETRY) {
            validateAndSetStateTransition(PipelineStatus.STOPPING, "Stopping pipeline in " + getState().getExecutionMode() + " mode");
            return;
        }
        this.retryFuture.cancel(true);
        validateAndSetStateTransition(PipelineStatus.STOPPING, null);
        validateAndSetStateTransition(PipelineStatus.STOPPED, "Stopped while the pipeline was in RETRY state");
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public synchronized void start() throws PipelineStoreException, PipelineRunnerException, PipelineRuntimeException, StageException {
        try {
            Utils.checkState(!this.isClosed, Utils.formatL("Cannot start the pipeline '{}::{}' as the runner is already closed", new Object[]{this.name, this.rev}));
            ExecutionMode executionMode = this.pipelineStateStore.getState(this.name, this.rev).getExecutionMode();
            if (executionMode != ExecutionMode.CLUSTER_BATCH && executionMode != ExecutionMode.CLUSTER_YARN_STREAMING && executionMode != ExecutionMode.CLUSTER_MESOS_STREAMING) {
                throw new PipelineRunnerException(ValidationError.VALIDATION_0073, new Object[0]);
            }
            LOG.debug("State of pipeline for '{}::{}' is '{}' ", new Object[]{this.name, this.rev, getState()});
            this.pipelineConf = getPipelineConf(this.name, this.rev);
            doStart(this.pipelineConf, getClusterSourceInfo(this.name, this.rev, this.pipelineConf));
        } catch (Exception e) {
            validateAndSetStateTransition(PipelineStatus.START_ERROR, e.toString(), getAttributes());
            throw e;
        }
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public PipelineState getState() throws PipelineStoreException {
        return this.pipelineStateStore.getState(this.name, this.rev);
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String captureSnapshot(String str, String str2, int i, int i2) {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String updateSnapshotLabel(String str, String str2) throws PipelineException {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Snapshot getSnapshot(String str) {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<SnapshotInfo> getSnapshotsInfo() {
        return Collections.EMPTY_LIST;
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void deleteSnapshot(String str) {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<PipelineState> getHistory() throws PipelineStoreException {
        return this.pipelineStateStore.getHistory(this.name, this.rev, false);
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void deleteHistory() {
        this.pipelineStateStore.deleteHistory(this.name, this.rev);
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Object getMetrics() {
        if (this.metricsEventRunnable != null) {
            return this.metricsEventRunnable.getAggregatedMetrics();
        }
        return null;
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<Record> getErrorRecords(String str, int i) {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<ErrorMessage> getErrorMessages(String str, int i) {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<SampledRecord> getSampledRecords(String str, int i) {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Collection<CallbackInfo> getSlaveCallbackList() {
        return this.slaveCallbackManager.getSlaveCallbackList();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public boolean deleteAlert(String str) throws PipelineRunnerException, PipelineStoreException {
        throw new UnsupportedOperationException();
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public List<AlertInfo> getAlerts() throws PipelineStoreException {
        return Collections.EMPTY_LIST;
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void close() {
        this.isClosed = true;
    }

    private void validateAndSetStateTransition(PipelineStatus pipelineStatus, String str) throws PipelineStoreException, PipelineRunnerException {
        HashMap hashMap = new HashMap();
        hashMap.putAll(getAttributes());
        validateAndSetStateTransition(pipelineStatus, str, hashMap);
    }

    @VisibleForTesting
    void validateAndSetStateTransition(PipelineStatus pipelineStatus, String str, Map<String, Object> map) throws PipelineStoreException, PipelineRunnerException {
        PipelineState state;
        PipelineState saveState;
        Utils.checkState(map != null, "Attributes cannot be set to null");
        PipelineState state2 = getState();
        if (state2.getStatus() == pipelineStatus && pipelineStatus != PipelineStatus.STARTING) {
            LOG.debug(Utils.format("Ignoring status '{}' as this is same as current status", new Object[]{state2.getStatus()}));
            return;
        }
        synchronized (this) {
            state = getState();
            checkState(VALID_TRANSITIONS.get(state.getStatus()).contains(pipelineStatus), ContainerError.CONTAINER_0102, state.getStatus(), pipelineStatus);
            long nextRetryTimeStamp = state.getNextRetryTimeStamp();
            int retryAttempt = state.getRetryAttempt();
            if (pipelineStatus == PipelineStatus.RUN_ERROR && this.shouldRetry) {
                pipelineStatus = PipelineStatus.RETRY;
                checkState(VALID_TRANSITIONS.get(state.getStatus()).contains(pipelineStatus), ContainerError.CONTAINER_0102, state.getStatus(), pipelineStatus);
            }
            if (pipelineStatus == PipelineStatus.RETRY && state.getStatus() != PipelineStatus.CONNECTING) {
                retryAttempt = state.getRetryAttempt() + 1;
                if (retryAttempt <= this.maxRetries || this.maxRetries == -1) {
                    nextRetryTimeStamp = RetryUtils.getNextRetryTimeStamp(retryAttempt, getState().getTimeStamp());
                    long j = 0;
                    long currentTimeMillis = System.currentTimeMillis();
                    if (nextRetryTimeStamp > currentTimeMillis) {
                        j = nextRetryTimeStamp - currentTimeMillis;
                    }
                    this.retryFuture = scheduleForRetries(this.runnerExecutor, j);
                } else {
                    LOG.info("Retry attempt '{}' is greater than max no of retries '{}'", Integer.valueOf(retryAttempt), Integer.valueOf(this.maxRetries));
                    pipelineStatus = PipelineStatus.RUN_ERROR;
                    retryAttempt = 0;
                    nextRetryTimeStamp = 0;
                }
            } else if (!pipelineStatus.isActive()) {
                retryAttempt = 0;
                nextRetryTimeStamp = 0;
            }
            ObjectMapper objectMapper = ObjectMapperFactory.get();
            String str2 = null;
            if (!pipelineStatus.isActive() || pipelineStatus == PipelineStatus.DISCONNECTED) {
                Object metrics = getMetrics();
                if (metrics != null) {
                    try {
                        str2 = objectMapper.writer().writeValueAsString(metrics);
                    } catch (JsonProcessingException e) {
                        throw new PipelineStoreException(ContainerError.CONTAINER_0210, e.toString(), e);
                    }
                }
                if (str2 == null) {
                    str2 = getState().getMetrics();
                }
            }
            saveState = this.pipelineStateStore.saveState(this.user, this.name, this.rev, pipelineStatus, str, map, getState().getExecutionMode(), str2, retryAttempt, nextRetryTimeStamp);
        }
        if (this.eventListenerManager != null) {
            this.eventListenerManager.broadcastStateChange(state, saveState, ThreadUsage.CLUSTER);
        }
    }

    private void checkState(boolean z, ContainerError containerError, Object... objArr) throws PipelineRunnerException {
        if (!z) {
            throw new PipelineRunnerException(containerError, objArr);
        }
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public void updateSlaveCallbackInfo(CallbackInfo callbackInfo) {
        this.slaveCallbackManager.updateSlaveCallbackInfo(callbackInfo);
    }

    @VisibleForTesting
    ClusterSourceInfo getClusterSourceInfo(String str, String str2, PipelineConfiguration pipelineConfiguration) throws PipelineRuntimeException, StageException, PipelineStoreException, PipelineRunnerException {
        ProductionPipeline createProductionPipeline = createProductionPipeline(str, str2, this.configuration, pipelineConfiguration);
        Pipeline pipeline = createProductionPipeline.getPipeline();
        try {
            List<Issue> init = pipeline.init();
            if (!init.isEmpty()) {
                PipelineRuntimeException pipelineRuntimeException = new PipelineRuntimeException(ContainerError.CONTAINER_0800, str, init.get(0).getMessage());
                HashMap hashMap = new HashMap();
                hashMap.putAll(getAttributes());
                hashMap.put("issues", new IssuesJson(new Issues(init)));
                validateAndSetStateTransition(PipelineStatus.START_ERROR, init.get(0).getMessage(), hashMap);
                throw pipelineRuntimeException;
            }
            ClusterSource source = createProductionPipeline.getPipeline().getSource();
            if (!(source instanceof ClusterSource)) {
                throw new RuntimeException(Utils.format("Stage '{}' does not implement '{}'", new Object[]{source.getClass().getName(), ClusterSource.class.getName()}));
            }
            ClusterSource clusterSource = source;
            try {
                int parallelism = clusterSource.getParallelism();
                if (parallelism < 1) {
                    throw new PipelineRuntimeException(ContainerError.CONTAINER_0112, new Object[0]);
                }
                return new ClusterSourceInfo(parallelism, clusterSource.getConfigsToShip());
            } catch (IOException | StageException e) {
                throw new PipelineRuntimeException(ContainerError.CONTAINER_0117, e.toString(), e);
            }
        } finally {
            pipeline.destroy();
        }
    }

    private ProductionPipeline createProductionPipeline(String str, String str2, Configuration configuration, PipelineConfiguration pipelineConfiguration) throws PipelineStoreException, PipelineRuntimeException, StageException {
        ProductionPipelineRunner productionPipelineRunner = new ProductionPipelineRunner(str, str2, configuration, this.runtimeInfo, new MetricRegistry(), null, null);
        if (this.rateLimit > 0) {
            productionPipelineRunner.setRateLimit(Long.valueOf(this.rateLimit));
        }
        return new ProductionPipelineBuilder(str, str2, configuration, this.runtimeInfo, this.stageLibrary, productionPipelineRunner, null).build(pipelineConfiguration);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connect(ApplicationState applicationState, PipelineConfiguration pipelineConfiguration) throws PipelineStoreException, PipelineRunnerException {
        ClusterPipelineStatus clusterPipelineStatus = null;
        boolean z = false;
        try {
            clusterPipelineStatus = this.clusterHelper.getStatus(applicationState, pipelineConfiguration);
            z = true;
        } catch (IOException e) {
            String str = "IO Error while trying to check the status of pipeline: " + e;
            LOG.error(str, e);
            validateAndSetStateTransition(PipelineStatus.CONNECT_ERROR, str);
        } catch (TimeoutException e2) {
            String str2 = "Timedout while trying to check the status of pipeline: " + e2;
            LOG.error(str2, e2);
            validateAndSetStateTransition(PipelineStatus.CONNECT_ERROR, str2);
        } catch (Exception e3) {
            String str3 = "Error getting status of pipeline: " + e3;
            LOG.error(str3, e3);
            validateAndSetStateTransition(PipelineStatus.CONNECT_ERROR, str3);
        }
        if (z) {
            if (clusterPipelineStatus == ClusterPipelineStatus.RUNNING) {
                validateAndSetStateTransition(PipelineStatus.RUNNING, "Connected to pipeline in cluster mode");
                return;
            }
            if (clusterPipelineStatus == ClusterPipelineStatus.FAILED) {
                LOG.debug("Pipeline failed in cluster");
                postTerminate(applicationState, PipelineStatus.RUN_ERROR, "Pipeline failed in cluster");
            } else if (clusterPipelineStatus == ClusterPipelineStatus.KILLED) {
                LOG.debug("Pipeline killed in cluster");
                postTerminate(applicationState, PipelineStatus.KILLED, "Pipeline killed in cluster");
            } else if (clusterPipelineStatus == ClusterPipelineStatus.SUCCEEDED) {
                LOG.debug("Pipeline succeeded in cluster");
                postTerminate(applicationState, PipelineStatus.FINISHED, "Pipeline succeeded in cluster");
            }
        }
    }

    private void postTerminate(ApplicationState applicationState, PipelineStatus pipelineStatus, String str) throws PipelineStoreException, PipelineRunnerException {
        Optional<String> dirId = applicationState.getDirId();
        if (dirId.isPresent()) {
            deleteDir(dirId.get());
        }
        HashMap hashMap = new HashMap();
        hashMap.putAll(getAttributes());
        hashMap.remove(APPLICATION_STATE);
        hashMap.remove(APPLICATION_STATE_START_TIME);
        validateAndSetStateTransition(pipelineStatus, str, hashMap);
    }

    private void deleteDir(String str) {
        FileUtils.deleteQuietly(new File(this.runtimeInfo.getDataDir(), str));
    }

    private synchronized void doStart(PipelineConfiguration pipelineConfiguration, ClusterSourceInfo clusterSourceInfo) throws PipelineStoreException, PipelineRunnerException {
        try {
            Utils.checkNotNull(pipelineConfiguration, "PipelineConfiguration cannot be null");
            Utils.checkState(clusterSourceInfo.getParallelism() != 0, "Parallelism cannot be zero");
            if (this.metricsEventRunnable != null) {
                this.metricsEventRunnable.clearSlaveMetrics();
            }
            ArrayList arrayList = new ArrayList();
            PipelineConfigBean create = PipelineBeanCreator.get().create(pipelineConfiguration, arrayList);
            if (create == null) {
                throw new PipelineRunnerException(ContainerError.CONTAINER_0116, arrayList);
            }
            this.maxRetries = create.retryAttempts;
            this.shouldRetry = create.shouldRetry;
            this.rateLimit = create.rateLimit;
            registerEmailNotifierIfRequired(create, this.name, this.rev);
            HashMap hashMap = new HashMap(create.clusterLauncherEnv);
            HashMap hashMap2 = new HashMap();
            File file = new File(this.runtimeInfo.getLibexecDir(), "bootstrap-libs");
            hashMap2.put(ClusterModeConstants.NUM_EXECUTORS_KEY, String.valueOf(clusterSourceInfo.getParallelism()));
            hashMap2.put(ClusterModeConstants.CLUSTER_PIPELINE_NAME, this.name);
            hashMap2.put(ClusterModeConstants.CLUSTER_PIPELINE_REV, this.rev);
            hashMap2.put(ClusterModeConstants.CLUSTER_PIPELINE_USER, this.user);
            hashMap2.put(ClusterModeConstants.CLUSTER_PIPELINE_REMOTE, String.valueOf(isRemotePipeline()));
            for (Map.Entry<String, String> entry : clusterSourceInfo.getConfigsToShip().entrySet()) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Config to ship " + entry.getKey() + " = " + entry.getValue());
                }
                hashMap2.put(entry.getKey(), entry.getValue());
            }
            this.runtimeInfo.setAttribute(ClusterModeConstants.NUM_EXECUTORS_KEY, Integer.valueOf(clusterSourceInfo.getParallelism()));
            this.slaveCallbackManager.clearSlaveList();
            ApplicationState submit = this.clusterHelper.submit(pipelineConfiguration, this.stageLibrary, new File(this.runtimeInfo.getConfigDir()), new File(this.runtimeInfo.getResourcesDir()), new File(this.runtimeInfo.getStaticWebDir()), file, hashMap, hashMap2, SUBMIT_TIMEOUT_SECS, getRules());
            HashMap hashMap3 = new HashMap();
            hashMap3.putAll(getAttributes());
            hashMap3.put(APPLICATION_STATE, submit.getMap());
            hashMap3.put(APPLICATION_STATE_START_TIME, Long.valueOf(System.currentTimeMillis()));
            this.slaveCallbackManager.setClusterToken(submit.getSdcToken());
            validateAndSetStateTransition(PipelineStatus.RUNNING, "Pipeline in cluster is running", hashMap3);
            scheduleRunnable(pipelineConfiguration);
        } catch (IOException e) {
            String str = "IO Error while trying to start the pipeline: " + e;
            LOG.error(str, e);
            validateAndSetStateTransition(PipelineStatus.START_ERROR, str);
        } catch (TimeoutException e2) {
            String str2 = "Timedout while trying to start the pipeline: " + e2;
            LOG.error(str2, e2);
            validateAndSetStateTransition(PipelineStatus.START_ERROR, str2);
        } catch (Exception e3) {
            String str3 = "Unexpected error starting pipeline: " + e3;
            LOG.error(str3, e3);
            validateAndSetStateTransition(PipelineStatus.START_ERROR, str3);
        }
    }

    private void scheduleRunnable(PipelineConfiguration pipelineConfiguration) {
        this.updateChecker = new UpdateChecker(this.runtimeInfo, this.configuration, pipelineConfiguration, this);
        this.updateCheckerFuture = this.runnerExecutor.scheduleAtFixedRate(this.updateChecker, 1L, 1440L, TimeUnit.MINUTES);
        if (this.metricsEventRunnable != null) {
            this.metricRunnableFuture = this.runnerExecutor.scheduleAtFixedRate(this.metricsEventRunnable, 0L, this.metricsEventRunnable.getScheduledDelay(), TimeUnit.MILLISECONDS);
        }
        this.managerRunnableFuture = this.runnerExecutor.scheduleAtFixedRate(new ManagerRunnable(this, pipelineConfiguration), 0L, 30L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cancelRunnable() {
        if (this.metricRunnableFuture != null) {
            this.metricRunnableFuture.cancel(true);
            this.metricsEventRunnable.clearSlaveMetrics();
        }
        if (this.managerRunnableFuture != null) {
            this.managerRunnableFuture.cancel(false);
        }
        if (this.updateCheckerFuture != null) {
            this.updateCheckerFuture.cancel(true);
        }
    }

    private synchronized void stop(ApplicationState applicationState, PipelineConfiguration pipelineConfiguration) throws PipelineStoreException, PipelineRunnerException {
        Utils.checkState(applicationState != null, "Application state cannot be null");
        boolean z = false;
        try {
            this.clusterHelper.kill(applicationState, pipelineConfiguration);
            z = true;
        } catch (IOException e) {
            String str = "IO Error while trying to stop the pipeline: " + e;
            LOG.error(str, e);
            validateAndSetStateTransition(PipelineStatus.CONNECT_ERROR, str);
        } catch (TimeoutException e2) {
            String str2 = "Timedout while trying to stop the pipeline: " + e2;
            LOG.error(str2, e2);
            validateAndSetStateTransition(PipelineStatus.CONNECT_ERROR, str2);
        } catch (Exception e3) {
            String str3 = "Unexpected error stopping pipeline: " + e3;
            LOG.error(str3, e3);
            validateAndSetStateTransition(PipelineStatus.CONNECT_ERROR, str3);
        }
        HashMap hashMap = new HashMap();
        if (z) {
            Optional<String> dirId = applicationState.getDirId();
            if (dirId.isPresent()) {
                deleteDir(dirId.get());
            }
            hashMap.putAll(getAttributes());
            hashMap.remove(APPLICATION_STATE);
            hashMap.remove(APPLICATION_STATE_START_TIME);
            validateAndSetStateTransition(PipelineStatus.STOPPED, "Stopped cluster pipeline", hashMap);
        }
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public Map getUpdateInfo() {
        return this.updateChecker.getUpdateInfo();
    }

    RuleDefinitions getRules() throws PipelineStoreException {
        return this.pipelineStore.retrieveRules(this.name, this.rev);
    }

    @Override // _ss_com.streamsets.datacollector.execution.Runner
    public String getToken() {
        return this.slaveCallbackManager.getClusterToken();
    }
}
