package org.apache.flink.runtime.source.coordinator;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link OperatorCoordinator} that hosts the {@link SplitEnumerator} of a {@link Source}.
 *
 * <p>The enumerator is created from the source in the constructor and may be replaced by a
 * restored one via {@link #resetToCheckpoint(byte[])} before {@link #start()} is called. After
 * the coordinator has started, operator events, subtask failures and checkpoint requests are
 * handed to the coordinator executor; failures inside those tasks are reported through
 * {@code SourceCoordinatorContext#failJob}.
 *
 * @param <SplitT> the type of the splits handled by the source
 * @param <EnumChkT> the type of the enumerator checkpoint
 */
@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements OperatorCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);

    /** Upper bound, in seconds, on how long {@link #close()} waits for the executor to terminate. */
    private static final long CLOSE_TIMEOUT_SECONDS = 10L;

    /** Name of the source operator; only used in log and error messages. */
    private final String operatorName;
    /** Executor that runs event handling, failure handling and checkpointing tasks. */
    private final ExecutorService coordinatorExecutor;
    /** The source that creates and restores the split enumerator. */
    private final Source<?, SplitT, EnumChkT> source;
    /** Serializer for the enumerator checkpoint, obtained from the source. */
    private final SimpleVersionedSerializer<EnumChkT> enumCheckpointSerializer;
    /** Serializer for the splits, obtained from the source. */
    private final SimpleVersionedSerializer<SplitT> splitSerializer;
    /** Context handed to the enumerator; also tracks readers and split assignments. */
    private final SourceCoordinatorContext<SplitT> context;
    /** The current enumerator; replaced when the coordinator is reset to a checkpoint. */
    private SplitEnumerator<SplitT, EnumChkT> enumerator;
    /** Set to {@code true} once {@link #start()} has started the enumerator. */
    private boolean started = false;

    /**
     * Creates the coordinator and a fresh enumerator for the given source.
     *
     * @param operatorName name of the source operator, used in log messages
     * @param coordinatorExecutor executor on which the coordinator's tasks are run
     * @param source the source providing the enumerator and the serializers
     * @param context the context passed to the enumerator
     */
    public SourceCoordinator(
            String operatorName,
            ExecutorService coordinatorExecutor,
            Source<?, SplitT, EnumChkT> source,
            SourceCoordinatorContext<SplitT> context) {
        this.operatorName = operatorName;
        this.coordinatorExecutor = coordinatorExecutor;
        this.source = source;
        this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
        this.splitSerializer = source.getSplitSerializer();
        this.context = context;
        this.enumerator = source.createEnumerator(context);
    }

    /**
     * Starts the split enumerator and marks the coordinator as started.
     *
     * @throws Exception if the enumerator fails to start
     */
    @Override
    public void start() throws Exception {
        LOG.info("Starting split enumerator for source {}.", this.operatorName);
        this.enumerator.start();
        this.started = true;
    }

    /**
     * Closes the context and the enumerator (if the coordinator was started), then shuts down
     * the coordinator executor and waits up to {@value #CLOSE_TIMEOUT_SECONDS} seconds for it
     * to terminate.
     *
     * @throws TimeoutException if the executor does not terminate within the timeout and no
     *     earlier failure occurred; otherwise the timeout is attached to the earlier failure as
     *     a suppressed exception
     * @throws Exception if closing the context or the enumerator fails
     */
    @Override
    public void close() throws Exception {
        LOG.info("Closing SourceCoordinator for source {}.", this.operatorName);
        Throwable primaryFailure = null;
        try {
            if (this.started) {
                this.context.close();
                this.enumerator.close();
            }
        } catch (Throwable t) {
            // Remember the failure so that a shutdown timeout below does not mask it.
            primaryFailure = t;
            throw t;
        } finally {
            // Always stop the executor, even when closing the context/enumerator failed.
            this.coordinatorExecutor.shutdownNow();
            boolean terminated =
                    this.coordinatorExecutor.awaitTermination(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            if (!terminated) {
                TimeoutException timeout =
                        new TimeoutException("The source coordinator failed to close before timeout.");
                if (primaryFailure == null) {
                    throw timeout;
                }
                primaryFailure.addSuppressed(timeout);
            }
        }
        LOG.info("Source coordinator for source {} closed.", this.operatorName);
    }

    /**
     * Hands an operator event to the coordinator executor. Source events are forwarded to the
     * enumerator and reader registrations are recorded in the context; any exception raised
     * while handling the event fails the job.
     *
     * @param subtaskId index of the subtask that sent the event
     * @param event the event to handle
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void handleEventFromOperator(int subtaskId, OperatorEvent event) throws Exception {
        ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.debug("Handling event from subtask {} of source {}: {}", subtaskId, this.operatorName, event);
                if (event instanceof SourceEventWrapper) {
                    this.enumerator.handleSourceEvent(subtaskId, ((SourceEventWrapper) event).getSourceEvent());
                } else if (event instanceof ReaderRegistrationEvent) {
                    handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
                } else {
                    // Previously dropped without a trace; make the drop visible without failing.
                    LOG.warn("Ignoring unrecognized operator event {} from subtask {} of source {}.",
                            event, subtaskId, this.operatorName);
                }
            } catch (Exception e) {
                LOG.error("Failing the job due to exception when handling operator event {} from subtask {} of source {}.",
                        event, subtaskId, this.operatorName, e);
                this.context.failJob(e);
            }
        });
    }

    /**
     * Hands the handling of a subtask failure to the coordinator executor: the splits returned
     * by {@code getAndRemoveUncheckpointedAssignment} are given back to the enumerator and the
     * reader of the subtask is unregistered. Any exception raised while doing so fails the job.
     *
     * @param subtaskId index of the failed subtask
     * @param reason the failure cause, may be {@code null}; not used by this implementation
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void subtaskFailed(int subtaskId, @Nullable Throwable reason) {
        ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.info("Handling subtask {} failure of source {}.", subtaskId, this.operatorName);
                List<SplitT> splitsToAddBack = this.context.getAndRemoveUncheckpointedAssignment(subtaskId);
                this.context.unregisterSourceReader(subtaskId);
                LOG.debug("Adding {} back to the split enumerator of source {}.", splitsToAddBack, this.operatorName);
                this.enumerator.addSplitsBack(splitsToAddBack, subtaskId);
            } catch (Exception e) {
                LOG.error("Failing the job due to exception when handling subtask {} failure in source {}.",
                        subtaskId, this.operatorName, e);
                this.context.failJob(e);
            }
        });
    }

    /**
     * Hands a state snapshot request to the coordinator executor and completes the given future
     * with the serialized state, or completes it exceptionally if the snapshot fails.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param result future that receives the serialized coordinator state
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception {
        ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.debug("Taking a state snapshot on operator {} for checkpoint {}", this.operatorName, checkpointId);
                result.complete(toBytes(checkpointId));
            } catch (Exception e) {
                result.completeExceptionally(new CompletionException(
                        String.format("Failed to checkpoint coordinator for source %s due to ", this.operatorName), e));
            }
        });
    }

    /**
     * Hands the checkpoint-completed notification to the coordinator executor, which forwards
     * it to the context. Any exception raised while doing so fails the job.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws IllegalStateException if the coordinator has not been started
     */
    @Override
    public void checkpointComplete(long checkpointId) {
        ensureStarted();
        this.coordinatorExecutor.execute(() -> {
            try {
                LOG.info("Marking checkpoint {} as completed for source {}.", checkpointId, this.operatorName);
                this.context.onCheckpointComplete(checkpointId);
            } catch (Exception e) {
                LOG.error("Failing the job due to exception when completing the checkpoint {} for source {}.",
                        checkpointId, this.operatorName, e);
                this.context.failJob(e);
            }
        });
    }

    /**
     * Restores the context state and the enumerator from serialized checkpoint data. Only
     * allowed before the coordinator has been started.
     *
     * @param checkpointData state previously produced by a checkpoint of this coordinator
     * @throws IllegalStateException if the coordinator has already been started
     * @throws Exception if the checkpoint data cannot be deserialized
     */
    @Override
    public void resetToCheckpoint(byte[] checkpointData) throws Exception {
        if (this.started) {
            throw new IllegalStateException(String.format("The coordinator for source %s has started. The source coordinator state can only be reset to a checkpoint before it starts.", this.operatorName));
        }
        LOG.info("Resetting coordinator of source {} from checkpoint.", this.operatorName);
        fromBytes(checkpointData);
    }

    /** Returns the current split enumerator. */
    @VisibleForTesting
    SplitEnumerator<SplitT, EnumChkT> getEnumerator() {
        return this.enumerator;
    }

    /** Returns the coordinator context. */
    @VisibleForTesting
    SourceCoordinatorContext<SplitT> getContext() {
        return this.context;
    }

    /**
     * Serializes the coordinator state. The layout written is: coordinator serde version,
     * enumerator checkpoint serializer version, length of the serialized enumerator checkpoint,
     * the serialized enumerator checkpoint, and finally the state written by the context.
     *
     * @param checkpointId id of the checkpoint the snapshot belongs to
     * @return the serialized coordinator state
     * @throws Exception if snapshotting or serialization fails
     */
    private byte[] toBytes(long checkpointId) throws Exception {
        EnumChkT enumeratorCheckpoint = this.enumerator.snapshotState();
        try (ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(byteStream)) {
            SourceCoordinatorSerdeUtils.writeCoordinatorSerdeVersion(out);
            out.writeInt(this.enumCheckpointSerializer.getVersion());
            byte[] serializedCheckpoint = this.enumCheckpointSerializer.serialize(enumeratorCheckpoint);
            out.writeInt(serializedCheckpoint.length);
            out.write(serializedCheckpoint);
            this.context.snapshotState(checkpointId, this.splitSerializer, out);
            // Flush the wrapper before reading the backing buffer.
            out.flush();
            return byteStream.toByteArray();
        }
    }

    /**
     * Restores the context state and the enumerator from bytes written by
     * {@link #toBytes(long)}, reading the fields in the same order they were written.
     *
     * @param bytes the serialized coordinator state
     * @throws Exception if the data cannot be read or deserialized
     */
    private void fromBytes(byte[] bytes) throws Exception {
        try (ByteArrayInputStream byteStream = new ByteArrayInputStream(bytes);
                DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(byteStream)) {
            SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion(in);
            // The serializer version precedes the length-prefixed checkpoint payload.
            int serializerVersion = in.readInt();
            int serializedLength = in.readInt();
            byte[] serializedCheckpoint = SourceCoordinatorSerdeUtils.readBytes(in, serializedLength);
            EnumChkT enumeratorCheckpoint =
                    this.enumCheckpointSerializer.deserialize(serializerVersion, serializedCheckpoint);
            this.context.restoreState(this.splitSerializer, in);
            this.enumerator = this.source.restoreEnumerator(this.context, enumeratorCheckpoint);
        }
    }

    /**
     * Registers the reader described by the event in the context and announces it to the
     * enumerator.
     *
     * @param event the registration event sent by a source reader
     */
    private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
        this.context.registerSourceReader(new ReaderInfo(event.subtaskId(), event.location()));
        this.enumerator.addReader(event.subtaskId());
    }

    /**
     * Verifies that {@link #start()} has completed.
     *
     * @throws IllegalStateException if the coordinator has not been started
     */
    private void ensureStarted() {
        if (!this.started) {
            throw new IllegalStateException("The coordinator has not started yet.");
        }
    }
}
