package org.apache.flink.api.common.io;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.FutureCallback;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for output formats that hand each record to an asynchronous {@link #send(Object)}
 * call while bounding the number of in-flight requests with a {@link Semaphore}.
 *
 * <p>One permit is taken per record in {@link #writeRecord(Object)} and returned when the
 * {@link CompletionStage} produced by {@link #send(Object)} completes. The first asynchronous
 * failure is remembered and rethrown as an {@link IOException} by the next call to {@link
 * #writeRecord(Object)} or {@link #close()}.
 *
 * @param <OUT> type of the records written by this format
 * @param <V> type of the value the asynchronous write completes with
 */
@Experimental
public abstract class OutputFormatBase<OUT, V> extends RichOutputFormat<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(OutputFormatBase.class);

    // NOTE(review): field modifiers are intentionally left untouched; the transient callback
    // suggests instances are Java-serialized, and modifier changes would alter the default
    // serialVersionUID -- confirm before tightening them.

    /** Limits the number of in-flight requests; created in {@link #open(int, int)}. */
    private Semaphore semaphore;

    /** Timeout passed to {@code SinkUtils.tryAcquire} whenever permits are requested. */
    private Duration maxConcurrentRequestsTimeout;

    /** Total number of permits, i.e. the upper bound on concurrent requests. */
    private int maxConcurrentRequests;

    /** Completion handler shared by all requests; created in {@link #open(int, int)}. */
    private transient FutureCallback<V> callback;

    /** Holds the first asynchronous failure that has not been reported yet. */
    private AtomicReference<Throwable> throwable;

    /**
     * Creates the format after validating its concurrency settings.
     *
     * @param i maximum number of concurrent requests; must be greater than zero
     * @param duration timeout used when acquiring permits; must be non-null and not negative
     *     (a zero duration passes the check even though the message says "positive")
     */
    protected OutputFormatBase(int i, Duration duration) {
        // Validate everything first so that no field is assigned from a rejected argument.
        Preconditions.checkArgument(i > 0, "Max concurrent requests is expected to be positive");
        Preconditions.checkNotNull(duration, "Max concurrent requests timeout cannot be null");
        Preconditions.checkArgument(!duration.isNegative(), "Max concurrent requests timeout is expected to be positive");
        this.maxConcurrentRequests = i;
        this.maxConcurrentRequestsTimeout = duration;
    }

    /**
     * Initializes the error holder, the semaphore and the completion callback, then invokes
     * {@link #postOpen()}.
     *
     * @param i not used by this base class
     * @param i2 not used by this base class
     */
    @Override // org.apache.flink.api.common.io.OutputFormat
    public final void open(int i, int i2) {
        this.throwable = new AtomicReference<>();
        this.semaphore = new Semaphore(this.maxConcurrentRequests);
        this.callback = new FutureCallback<V>() {
            @Override
            public void onSuccess(V v) {
                semaphore.release();
            }

            @Override
            public void onFailure(Throwable th) {
                // Only the first failure is kept; later ones are logged but not stored.
                throwable.compareAndSet(null, th);
                LOG.error("Error while writing value.", th);
                semaphore.release();
            }
        };
        postOpen();
    }

    /** Hook invoked at the end of {@link #open(int, int)}; does nothing by default. */
    protected void postOpen() {
    }

    /**
     * Waits for all in-flight requests by acquiring every permit, then hands them all back.
     *
     * @throws IOException if the permits cannot be acquired
     */
    private void flush() throws IOException {
        tryAcquire(this.maxConcurrentRequests);
        this.semaphore.release(this.maxConcurrentRequests);
    }

    /**
     * Requests {@code i} permits through {@code SinkUtils.tryAcquire} using the configured
     * timeout.
     *
     * @param i number of permits to acquire
     * @throws IOException wrapping any exception raised while acquiring
     */
    private void tryAcquire(int i) throws IOException {
        try {
            SinkUtils.tryAcquire(i, this.maxConcurrentRequests, this.maxConcurrentRequestsTimeout, this.semaphore);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Wrapping would otherwise hide the interruption from code further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new IOException(e);
        }
    }

    /**
     * Reports any pending asynchronous failure, takes one permit and sends the record.
     *
     * @param out the record to write
     * @throws IOException if an earlier asynchronous write failed or no permit could be acquired
     */
    @Override // org.apache.flink.api.common.io.OutputFormat
    public final void writeRecord(OUT out) throws IOException {
        checkAsyncErrors();
        tryAcquire(1);
        final CompletionStage<V> pending;
        try {
            pending = send(out);
            Preconditions.checkNotNull(pending, "send() must not return a null CompletionStage");
        } catch (Throwable sendFailure) {
            // No completion callback was registered, so the permit has to be returned here.
            this.semaphore.release();
            throw sendFailure;
        }
        // Registered outside the try block: once the callback is attached it owns the permit,
        // and releasing it here as well could hand the same permit back twice.
        pending.whenComplete((result, error) -> {
            if (error == null) {
                this.callback.onSuccess(result);
            } else {
                this.callback.onFailure(error);
            }
        });
    }

    /**
     * Starts the asynchronous write of a single record.
     *
     * @param out the record to write
     * @return a stage that completes when the write has finished, successfully or not
     */
    protected abstract CompletionStage<V> send(OUT out);

    /**
     * Throws the stored asynchronous failure, if any, and clears it so that it is reported once.
     *
     * @throws IOException wrapping the stored failure
     */
    private void checkAsyncErrors() throws IOException {
        Throwable pendingFailure = this.throwable.getAndSet(null);
        if (pendingFailure != null) {
            throw new IOException("Write record failed", pendingFailure);
        }
    }

    /**
     * Reports pending failures, waits for in-flight requests, reports failures again and finally
     * invokes {@link #postClose()}.
     *
     * <p>{@link #postClose()} is also invoked when one of the earlier steps fails, so that
     * subclass cleanup is not skipped; an error raised by that cleanup is attached to the
     * original failure as a suppressed exception.
     *
     * @throws IOException if an asynchronous write failed or flushing did not succeed
     */
    @Override // org.apache.flink.api.common.io.OutputFormat
    public final void close() throws IOException {
        try {
            checkAsyncErrors();
            flush();
            checkAsyncErrors();
        } catch (Throwable primary) {
            try {
                postClose();
            } catch (Throwable secondary) {
                if (secondary != primary) {
                    primary.addSuppressed(secondary);
                }
            }
            throw primary;
        }
        postClose();
    }

    /** Hook invoked at the end of {@link #close()}; does nothing by default. */
    protected void postClose() {
    }

    /** Returns the number of permits currently available in the semaphore. */
    @VisibleForTesting
    int getAvailablePermits() {
        return this.semaphore.availablePermits();
    }

    /** Returns the configured maximum minus the permits currently available. */
    @VisibleForTesting
    int getAcquiredPermits() {
        return this.maxConcurrentRequests - this.semaphore.availablePermits();
    }
}
