package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CancellationException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImpl.class */
class ChannelStateWriteRequestExecutorImpl implements ChannelStateWriteRequestExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequestExecutorImpl.class);
    private static final int DEFAULT_HANDOVER_CAPACITY = 10000;
    private final ChannelStateWriteRequestDispatcher dispatcher;
    private final BlockingDeque<ChannelStateWriteRequest> deque;
    private final Thread thread;
    private volatile Exception thrown;
    private volatile boolean wasClosed;
    private final String taskName;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStateWriteRequestExecutorImpl(String str, ChannelStateWriteRequestDispatcher channelStateWriteRequestDispatcher) {
        this(str, channelStateWriteRequestDispatcher, new LinkedBlockingDeque(DEFAULT_HANDOVER_CAPACITY));
    }

    ChannelStateWriteRequestExecutorImpl(String str, ChannelStateWriteRequestDispatcher channelStateWriteRequestDispatcher, BlockingDeque<ChannelStateWriteRequest> blockingDeque) {
        this.thrown = null;
        this.wasClosed = false;
        this.taskName = str;
        this.dispatcher = channelStateWriteRequestDispatcher;
        this.deque = blockingDeque;
        this.thread = new Thread(this::run, "Channel state writer " + str);
        this.thread.setDaemon(true);
    }

    @VisibleForTesting
    void run() {
        try {
            try {
                loop();
                try {
                    IOUtils.closeAll(new AutoCloseable[]{this::cleanupRequests, () -> {
                        this.dispatcher.fail(this.thrown == null ? new CancellationException() : this.thrown);
                    }});
                } catch (Exception e) {
                    this.thrown = (Exception) ExceptionUtils.firstOrSuppressed(e, this.thrown);
                }
            } catch (Throwable th) {
                try {
                    IOUtils.closeAll(new AutoCloseable[]{this::cleanupRequests, () -> {
                        this.dispatcher.fail(this.thrown == null ? new CancellationException() : this.thrown);
                    }});
                } catch (Exception e2) {
                    this.thrown = (Exception) ExceptionUtils.firstOrSuppressed(e2, this.thrown);
                }
                throw th;
            }
        } catch (Exception e3) {
            this.thrown = e3;
            try {
                IOUtils.closeAll(new AutoCloseable[]{this::cleanupRequests, () -> {
                    this.dispatcher.fail(this.thrown == null ? new CancellationException() : this.thrown);
                }});
            } catch (Exception e4) {
                this.thrown = (Exception) ExceptionUtils.firstOrSuppressed(e4, this.thrown);
            }
        }
        LOG.debug("{} loop terminated", this.taskName);
    }

    private void loop() throws Exception {
        while (!this.wasClosed) {
            try {
                this.dispatcher.dispatch(this.deque.take());
            } catch (InterruptedException e) {
                if (this.wasClosed) {
                    Thread.currentThread().interrupt();
                } else {
                    LOG.debug(this.taskName + " interrupted while waiting for a request (continue waiting)", e);
                }
            }
        }
    }

    private void cleanupRequests() throws Exception {
        Throwable cancellationException = this.thrown == null ? new CancellationException() : this.thrown;
        ArrayList arrayList = new ArrayList();
        this.deque.drainTo(arrayList);
        LOG.info("{} discarding {} drained requests", this.taskName, Integer.valueOf(arrayList.size()));
        IOUtils.closeAll((Iterable) arrayList.stream().map(channelStateWriteRequest -> {
            return () -> {
                channelStateWriteRequest.cancel(cancellationException);
            };
        }).collect(Collectors.toList()));
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor
    public void start() throws IllegalStateException {
        this.thread.start();
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor
    public void submit(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        submitInternal(channelStateWriteRequest, () -> {
            this.deque.add(channelStateWriteRequest);
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor
    public void submitPriority(ChannelStateWriteRequest channelStateWriteRequest) throws Exception {
        submitInternal(channelStateWriteRequest, () -> {
            this.deque.addFirst(channelStateWriteRequest);
        });
    }

    private void submitInternal(ChannelStateWriteRequest channelStateWriteRequest, RunnableWithException runnableWithException) throws Exception {
        try {
            runnableWithException.run();
            ensureRunning();
        } catch (Exception e) {
            channelStateWriteRequest.cancel(e);
            throw e;
        }
    }

    private void ensureRunning() throws Exception {
        if (this.wasClosed || !this.thread.isAlive()) {
            cleanupRequests();
            throw ((Exception) ExceptionUtils.firstOrSuppressed(new IllegalStateException("not running"), this.thrown));
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.wasClosed = true;
        while (this.thread.isAlive()) {
            this.thread.interrupt();
            try {
                this.thread.join();
            } catch (InterruptedException e) {
                if (!this.thread.isAlive()) {
                    Thread.currentThread().interrupt();
                }
                LOG.debug(this.taskName + " interrupted while waiting for the writer thread to die", e);
            }
        }
        if (this.thrown != null) {
            throw new IOException(this.thrown);
        }
    }

    @VisibleForTesting
    Thread getThread() {
        return this.thread;
    }
}
