package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>> extends AbstractInvokable implements StatefulTask<StreamTaskStateList> {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    protected Operator headOperator;
    private OperatorChain<OUT> operatorChain;
    private StreamConfig configuration;
    private ClassLoader userClassLoader;
    private ScheduledExecutorService timerService;
    private Map<String, Accumulator<?, ?>> accumulatorMap;
    private StreamTaskStateList lazyRestoreState;
    private volatile AsynchronousException asyncException;
    private volatile boolean isRunning;
    private volatile boolean canceled;
    private long recoveryTimestamp;
    private final Object lock = new Object();
    private final Set<Thread> asyncCheckpointThreads = Collections.synchronizedSet(new HashSet());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$TriggerTask.class */
    public static final class TriggerTask implements Runnable {
        private final Object lock;
        private final Triggerable target;
        private final long timestamp;
        private final StreamTask<?, ?> task;

        TriggerTask(StreamTask<?, ?> streamTask, Object obj, Triggerable triggerable, long j) {
            this.task = streamTask;
            this.lock = obj;
            this.target = triggerable;
            this.timestamp = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            synchronized (this.lock) {
                try {
                    this.target.trigger(this.timestamp);
                } catch (Throwable th) {
                    if (((StreamTask) this.task).isRunning) {
                        StreamTask.LOG.error("Caught exception while processing timer.", th);
                    }
                    if (((StreamTask) this.task).asyncException == null) {
                        ((StreamTask) this.task).asyncException = new TimerException(th);
                    }
                }
            }
        }
    }

    protected abstract void init() throws Exception;

    protected abstract void run() throws Exception;

    protected abstract void cleanup() throws Exception;

    protected abstract void cancelTask() throws Exception;

    public final void invoke() throws Exception {
        try {
            LOG.debug("Initializing {}", getName());
            this.userClassLoader = getUserCodeClassLoader();
            this.configuration = new StreamConfig(getTaskConfiguration());
            this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
            this.headOperator = (Operator) this.configuration.getStreamOperator(this.userClassLoader);
            this.operatorChain = new OperatorChain<>(this, this.headOperator, getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
            if (this.headOperator != null) {
                this.headOperator.setup(this, this.configuration, this.operatorChain.getChainEntryPoint());
            }
            this.timerService = Executors.newSingleThreadScheduledExecutor(new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
            init();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Invoking {}", getName());
            restoreState();
            this.lazyRestoreState = null;
            synchronized (this.lock) {
                openAllOperators();
            }
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.isRunning = true;
            run();
            LOG.debug("Finished task {}", getName());
            synchronized (this.lock) {
                this.isRunning = false;
                closeAllOperators();
            }
            LOG.debug("Closed operators for task {}", getName());
            this.operatorChain.flushOutputs();
            tryDisposeAllOperators();
            this.isRunning = false;
            if (this.timerService != null) {
                try {
                    this.timerService.shutdownNow();
                } catch (Throwable th) {
                    LOG.error("Could not shut down timer service", th);
                }
            }
            try {
                Iterator<Thread> it = this.asyncCheckpointThreads.iterator();
                while (it.hasNext()) {
                    it.next().interrupt();
                }
                this.asyncCheckpointThreads.clear();
            } catch (Throwable th2) {
                LOG.error("Could not shut down async checkpoint threads", th2);
            }
            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
            try {
                cleanup();
            } catch (Throwable th3) {
                LOG.error("Error during cleanup of stream task", th3);
            }
            if (1 == 0) {
                disposeAllOperators();
            }
        } catch (Throwable th4) {
            this.isRunning = false;
            if (this.timerService != null) {
                try {
                    this.timerService.shutdownNow();
                } catch (Throwable th5) {
                    LOG.error("Could not shut down timer service", th5);
                }
            }
            try {
                Iterator<Thread> it2 = this.asyncCheckpointThreads.iterator();
                while (it2.hasNext()) {
                    it2.next().interrupt();
                }
                this.asyncCheckpointThreads.clear();
            } catch (Throwable th6) {
                LOG.error("Could not shut down async checkpoint threads", th6);
            }
            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
            try {
                cleanup();
            } catch (Throwable th7) {
                LOG.error("Error during cleanup of stream task", th7);
            }
            if (0 == 0) {
                disposeAllOperators();
            }
            throw th4;
        }
    }

    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        cancelTask();
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    private void openAllOperators() throws Exception {
        for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
            if (streamOperator != null) {
                streamOperator.open();
            }
        }
    }

    private void closeAllOperators() throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int length = allOperators.length - 1; length >= 0; length--) {
            StreamOperator<?> streamOperator = allOperators[length];
            if (streamOperator != null) {
                streamOperator.close();
            }
        }
    }

    private void tryDisposeAllOperators() throws Exception {
        for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
            if (streamOperator != null) {
                streamOperator.dispose();
            }
        }
    }

    private void disposeAllOperators() {
        if (this.operatorChain != null) {
            for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
                if (streamOperator != null) {
                    try {
                        streamOperator.dispose();
                    } catch (Throwable th) {
                        LOG.error("Error during disposal of stream operator.", th);
                    }
                }
            }
        }
    }

    protected void finalize() throws Throwable {
        super/*java.lang.Object*/.finalize();
        if (this.timerService != null) {
            if (!this.timerService.isTerminated()) {
                LOG.warn("Timer service was not shut down. Shutting down in finalize().");
            }
            this.timerService.shutdownNow();
        }
        Iterator<Thread> it = this.asyncCheckpointThreads.iterator();
        while (it.hasNext()) {
            it.next().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isSerializingTimestamps() {
        TimeCharacteristic timeCharacteristic = this.configuration.getTimeCharacteristic();
        return (timeCharacteristic == TimeCharacteristic.EventTime) | (timeCharacteristic == TimeCharacteristic.IngestionTime);
    }

    public String getName() {
        return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    public Object getCheckpointLock() {
        return this.lock;
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    public Output<StreamRecord<OUT>> getHeadOutput() {
        return this.operatorChain.getChainEntryPoint();
    }

    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public void setInitialState(StreamTaskStateList streamTaskStateList, long j) {
        this.lazyRestoreState = streamTaskStateList;
        this.recoveryTimestamp = j;
    }

    private void restoreState() throws Exception {
        if (this.lazyRestoreState != null) {
            LOG.info("Restoring checkpointed state to task {}", getName());
            try {
                StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
                StreamTaskState[] m262getState = this.lazyRestoreState.m262getState(this.userClassLoader);
                this.lazyRestoreState = null;
                for (int i = 0; i < m262getState.length; i++) {
                    StreamTaskState streamTaskState = m262getState[i];
                    StreamOperator<?> streamOperator = allOperators[i];
                    if (streamTaskState != null && streamOperator != null) {
                        LOG.debug("Task {} in chain ({}) has checkpointed state", Integer.valueOf(i), getName());
                        streamOperator.restoreState(streamTaskState, this.recoveryTimestamp);
                    } else if (streamOperator != null) {
                        LOG.debug("Task {} in chain ({}) does not have checkpointed state", Integer.valueOf(i), getName());
                    }
                }
            } catch (Exception e) {
                throw new Exception("Could not restore checkpointed state to operators and functions", e);
            }
        }
    }

    public boolean triggerCheckpoint(long j, long j2) throws Exception {
        try {
            return performCheckpoint(j, j2);
        } catch (Exception e) {
            if (this.isRunning) {
                throw e;
            }
            return false;
        }
    }

    protected boolean performCheckpoint(final long j, long j2) throws Exception {
        LOG.debug("Starting checkpoint {} on task {}", Long.valueOf(j), getName());
        synchronized (this.lock) {
            if (!this.isRunning) {
                return false;
            }
            this.operatorChain.broadcastCheckpointBarrier(j, j2);
            StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
            final StreamTaskState[] streamTaskStateArr = new StreamTaskState[allOperators.length];
            boolean z = false;
            for (int i = 0; i < streamTaskStateArr.length; i++) {
                StreamOperator<?> streamOperator = allOperators[i];
                if (streamOperator != null) {
                    StreamTaskState snapshotOperatorState = streamOperator.snapshotOperatorState(j, j2);
                    if (snapshotOperatorState.getOperatorState() instanceof AsynchronousStateHandle) {
                        z = true;
                    }
                    if (snapshotOperatorState.getFunctionState() instanceof AsynchronousStateHandle) {
                        z = true;
                    }
                    if (snapshotOperatorState.getKvStates() != null) {
                        Iterator<KvStateSnapshot<?, ?, ?, ?, ?>> it = snapshotOperatorState.getKvStates().values().iterator();
                        while (it.hasNext()) {
                            if (it.next() instanceof AsynchronousKvStateSnapshot) {
                                z = true;
                            }
                        }
                    }
                    streamTaskStateArr[i] = snapshotOperatorState.isEmpty() ? null : snapshotOperatorState;
                }
            }
            if (!this.isRunning) {
                throw new CancelTaskException();
            }
            StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStateArr);
            if (streamTaskStateList.isEmpty()) {
                getEnvironment().acknowledgeCheckpoint(j);
            } else if (z) {
                Thread thread = new Thread("Materialize checkpoint state " + j + " - " + getName()) { // from class: org.apache.flink.streaming.runtime.tasks.StreamTask.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            for (StreamTaskState streamTaskState : streamTaskStateArr) {
                                if (streamTaskState != null) {
                                    if (streamTaskState.getFunctionState() instanceof AsynchronousStateHandle) {
                                        streamTaskState.setFunctionState(streamTaskState.getFunctionState().materialize());
                                    }
                                    if (streamTaskState.getOperatorState() instanceof AsynchronousStateHandle) {
                                        streamTaskState.setOperatorState(streamTaskState.getOperatorState().materialize());
                                    }
                                    if (streamTaskState.getKvStates() != null) {
                                        Set<String> keySet = streamTaskState.getKvStates().keySet();
                                        HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = streamTaskState.getKvStates();
                                        for (String str : keySet) {
                                            if (kvStates.get(str) instanceof AsynchronousKvStateSnapshot) {
                                                kvStates.put(str, kvStates.get(str).materialize());
                                            }
                                        }
                                    }
                                }
                            }
                            StreamTask.this.getEnvironment().acknowledgeCheckpoint(j, new StreamTaskStateList(streamTaskStateArr));
                            StreamTask.LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", Long.valueOf(j), getName());
                        } catch (Exception e) {
                            if (StreamTask.this.isRunning()) {
                                StreamTask.LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
                            }
                            if (StreamTask.this.asyncException == null) {
                                StreamTask.this.asyncException = new AsynchronousException(e);
                            }
                        }
                        StreamTask.this.asyncCheckpointThreads.remove(this);
                    }
                };
                this.asyncCheckpointThreads.add(thread);
                thread.setDaemon(true);
                thread.start();
            } else {
                getEnvironment().acknowledgeCheckpoint(j, streamTaskStateList);
            }
            return true;
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        synchronized (this.lock) {
            if (this.isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());
                for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
                    if (streamOperator != null) {
                        streamOperator.notifyOfCompletedCheckpoint(j);
                    }
                }
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
            }
        }
    }

    public AbstractStateBackend createStateBackend(String str, TypeSerializer<?> typeSerializer) throws Exception {
        AbstractStateBackend stateBackend = this.configuration.getStateBackend(this.userClassLoader);
        if (stateBackend != null) {
            LOG.info("Using user-defined state backend: " + stateBackend);
        } else {
            Configuration configuration = getEnvironment().getTaskManagerInfo().getConfiguration();
            String string = configuration.getString("state.backend", (String) null);
            if (string == null) {
                LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
                string = "jobmanager";
            }
            String lowerCase = string.toLowerCase();
            boolean z = -1;
            switch (lowerCase.hashCode()) {
                case -1572513109:
                    if (lowerCase.equals("filesystem")) {
                        z = true;
                        break;
                    }
                    break;
                case 1712403792:
                    if (lowerCase.equals("jobmanager")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
                    stateBackend = MemoryStateBackend.create();
                    break;
                case true:
                    AbstractStateBackend createFromConfig = new FsStateBackendFactory().createFromConfig(configuration);
                    LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" + createFromConfig.getBasePath() + "\")");
                    stateBackend = createFromConfig;
                    break;
                default:
                    try {
                        stateBackend = ((StateBackendFactory) Class.forName(lowerCase, false, this.userClassLoader).asSubclass(StateBackendFactory.class).newInstance()).createFromConfig(configuration);
                        break;
                    } catch (ClassCastException e) {
                        throw new IllegalConfigurationException("The class configured under 'state.backend' is not a valid state backend factory (" + lowerCase + ')');
                    } catch (ClassNotFoundException e2) {
                        throw new IllegalConfigurationException("Cannot find configured state backend: " + lowerCase);
                    } catch (Throwable th) {
                        throw new IllegalConfigurationException("Cannot create configured state backend", th);
                    }
            }
        }
        stateBackend.initializeForJob(getEnvironment(), str, typeSerializer);
        return stateBackend;
    }

    public void registerTimer(long j, Triggerable triggerable) {
        this.timerService.schedule(new TriggerTask(this, this.lock, triggerable, j), Math.max(j - System.currentTimeMillis(), 0L), TimeUnit.MILLISECONDS);
    }

    public void checkTimerException() throws TimerException {
        if (this.asyncException != null) {
            throw this.asyncException;
        }
    }

    public String toString() {
        return getName();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
        return new EventListener<CheckpointBarrier>() { // from class: org.apache.flink.streaming.runtime.tasks.StreamTask.2
            public void onEvent(CheckpointBarrier checkpointBarrier) {
                try {
                    StreamTask.this.performCheckpoint(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
                } catch (Exception e) {
                    throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
                } catch (CancelTaskException e2) {
                    throw e2;
                }
            }
        };
    }
}
