package org.apache.flink.streaming.runtime.operators;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.class */
/**
 * Sink operator that buffers incoming elements in a checkpoint state output view and only hands
 * them to {@link #sendValues(Iterable, long)} once the checkpoint they belong to has been
 * reported as completed.
 *
 * <p>Life cycle as implemented here:
 * <ol>
 *   <li>{@link #processElement(StreamRecord)} serializes every element into the current output
 *       view (created lazily).</li>
 *   <li>{@link #snapshotOperatorState(long, long)} closes that view and stores the resulting
 *       handle under the checkpoint id in {@link ExactlyOnceState#pendingHandles}.</li>
 *   <li>{@link #notifyOfCompletedCheckpoint(long)} replays every pending handle whose id is not
 *       greater than the completed one, records the commit through the
 *       {@link CheckpointCommitter}, and discards the handle.</li>
 * </ol>
 *
 * @param <IN> type of the elements written by this sink
 */
public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
    private static final long serialVersionUID = 1;
    protected static final Logger LOG = LoggerFactory.getLogger(GenericWriteAheadSink.class);

    /** Records and answers which checkpoints have already been committed. */
    private final CheckpointCommitter committer;

    /** Output view receiving the elements of the checkpoint in progress; {@code null} until the first element. */
    private transient AbstractStateBackend.CheckpointStateOutputView out;

    /** Serializer used both to buffer elements and to read them back. */
    protected final TypeSerializer<IN> serializer;

    /** Handles of buffered-but-not-yet-committed checkpoints; replaced on restore. */
    private ExactlyOnceState state = new ExactlyOnceState();

    /** Random identifier passed to the committer as operator id in {@link #open()}. */
    private final String id = UUID.randomUUID().toString();

    /**
     * Function state of the sink: for every pending checkpoint id, the second argument that was
     * passed to {@code snapshotOperatorState} together with the handle to the buffered elements.
     */
    public static class ExactlyOnceState implements StateHandle<Serializable> {
        private static final long serialVersionUID = -3571063495273460743L;

        /** checkpoint id -&gt; (snapshot timestamp argument, handle to the buffered elements). */
        protected TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> pendingHandles = new TreeMap<>();

        /**
         * Returns the live map of pending handles (no copy is made).
         *
         * <p>NOTE(review): the decompiler renamed this method from {@code getState} because it
         * was merged with a bridge method; the name is kept as found.
         *
         * @param classLoader unused
         * @return the pending handles keyed by checkpoint id
         */
        public TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> m257getState(ClassLoader classLoader) throws Exception {
            return this.pendingHandles;
        }

        /** Does nothing; the individual handles are discarded by the sink once committed. */
        public void discardState() throws Exception {
        }

        /**
         * Sums the sizes reported by all pending handles.
         *
         * <p>The sum is accumulated as a {@code long}; the previous {@code int} accumulator
         * truncated totals that did not fit into 32 bits.
         *
         * @return total size of all pending handles
         */
        public long getStateSize() throws Exception {
            long totalSize = 0L;
            for (Tuple2<Long, StateHandle<DataInputView>> pending : this.pendingHandles.values()) {
                StateHandle<DataInputView> handle = pending.f1;
                // close() already tolerates a missing handle, so do the same here.
                if (handle != null) {
                    totalSize += handle.getStateSize();
                }
            }
            return totalSize;
        }

        /**
         * Closes every pending handle, even if some of them fail.
         *
         * <p>The first failure is rethrown after all handles were attempted; later failures are
         * attached to it as suppressed exceptions. (The previous implementation never recorded
         * a failure because its null check was inverted, so errors were silently dropped.)
         *
         * @throws IOException if closing at least one handle failed
         */
        public void close() throws IOException {
            Throwable firstFailure = null;
            for (Tuple2<Long, StateHandle<DataInputView>> pending : this.pendingHandles.values()) {
                StateHandle<DataInputView> handle = pending.f1;
                if (handle == null) {
                    continue;
                }
                try {
                    handle.close();
                } catch (Throwable t) {
                    if (firstFailure == null) {
                        firstFailure = t;
                    } else if (t != firstFailure) {
                        // addSuppressed rejects self-suppression, hence the identity check.
                        firstFailure.addSuppressed(t);
                    }
                }
            }
            if (firstFailure != null) {
                ExceptionUtils.rethrowIOException(firstFailure);
            }
        }

        @Override
        public String toString() {
            return this.pendingHandles.toString();
        }
    }

    /**
     * Creates the sink and prepares the committer: sets the job id and calls
     * {@code createResource()} on it.
     *
     * @param checkpointCommitter committer used to record committed checkpoints
     * @param typeSerializer serializer for the buffered elements
     * @param str job id handed to the committer
     * @throws Exception if the committer fails to create its resource
     */
    public GenericWriteAheadSink(CheckpointCommitter checkpointCommitter, TypeSerializer<IN> typeSerializer, String str) throws Exception {
        this.committer = checkpointCommitter;
        this.serializer = typeSerializer;
        this.committer.setJobId(str);
        this.committer.createResource();
    }

    /**
     * Configures the committer with this operator's id and subtask index, opens it, and drops
     * pending handles whose checkpoints the committer already reports as committed.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        this.committer.setOperatorId(this.id);
        this.committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
        this.committer.open();
        cleanState();
    }

    /** Closes the committer. */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        this.committer.close();
    }

    /**
     * Closes the current output view (if any) and registers its handle under the given
     * checkpoint id. If a handle is already stored for that id, the new one is discarded
     * instead and the stored one is kept.
     *
     * @param checkpointId id the buffered elements are stored under
     * @param timestamp value stored next to the handle and later passed to {@code sendValues}
     * @throws Exception if the output view cannot be closed
     */
    private void saveHandleInState(long checkpointId, long timestamp) throws Exception {
        if (this.out == null) {
            // Nothing was buffered since the last snapshot.
            return;
        }
        try {
            StateHandle<DataInputView> handle = this.out.closeAndGetHandle();
            if (this.state.pendingHandles.containsKey(checkpointId)) {
                try {
                    handle.discardState();
                } catch (Exception e) {
                    // Best effort: failing to discard the duplicate must not fail the snapshot.
                    LOG.warn("Could not discard state handle for checkpoint {} of {}, which already has been stored.", new Object[]{Long.valueOf(checkpointId), getOperatorName(), e});
                }
            } else {
                this.state.pendingHandles.put(checkpointId, new Tuple2<>(timestamp, handle));
            }
            this.out = null;
        } catch (Exception e) {
            throw new Exception("Could not close and get state handle from checkpoint state output view belonging to " + getOperatorName() + '.', e);
        }
    }

    /**
     * Snapshots the parent operator state, then stores the buffered elements as a pending
     * handle and attaches {@link #state} as function state. If storing the handle fails, the
     * parent snapshot is discarded (best effort) and the failure is rethrown.
     *
     * @param j checkpoint id
     * @param j2 second snapshot argument (the checkpoint timestamp in Flink's operator API)
     * @return the task state including this sink's pending handles
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        StreamTaskState taskState = super.snapshotOperatorState(j, j2);
        try {
            saveHandleInState(j, j2);
            taskState.setFunctionState(this.state);
            return taskState;
        } catch (Exception e) {
            try {
                taskState.discardState();
            } catch (Exception discardFailure) {
                LOG.warn("Could not discard stream task state of {}.", getOperatorName(), discardFailure);
            }
            throw new Exception("Could not save handle in state of " + getOperatorName() + '.', e);
        }
    }

    /**
     * Restores the parent state and replaces {@link #state} with the restored function state;
     * any output view in progress is dropped.
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void restoreState(StreamTaskState streamTaskState) throws Exception {
        super.restoreState(streamTaskState);
        this.state = (ExactlyOnceState) streamTaskState.getFunctionState();
        this.out = null;
    }

    /**
     * Removes every pending handle whose checkpoint the committer reports as committed.
     * The handles are only removed from the map here; they are not discarded.
     */
    private void cleanState() throws Exception {
        synchronized (this.state.pendingHandles) {
            // Collect first, remove afterwards: the key set is a live view of the map, and a
            // failing committer lookup must leave the map untouched.
            Set<Long> alreadyCommitted = new HashSet<>();
            for (Long pendingId : this.state.pendingHandles.keySet()) {
                if (this.committer.isCheckpointCommitted(pendingId.longValue())) {
                    alreadyCommitted.add(pendingId);
                }
            }
            for (Long committedId : alreadyCommitted) {
                this.state.pendingHandles.remove(committedId);
            }
        }
    }

    /**
     * Commits all pending checkpoints with an id not greater than {@code j}.
     *
     * <p>For each such checkpoint that the committer does not yet report as committed, the
     * buffered elements are replayed through {@link #sendValues(Iterable, long)}; only if that
     * returns {@code true} is the checkpoint committed and its handle removed and discarded.
     * Checkpoints that fail stay pending and are attempted again on a later notification.
     *
     * <p>NOTE(review): a failure for one checkpoint is logged and the loop continues with the
     * next id, so a later checkpoint can be committed before an earlier one - confirm this is
     * acceptable for the concrete sink.
     *
     * @param j id of the completed checkpoint
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void notifyOfCompletedCheckpoint(long j) throws Exception {
        super.notifyOfCompletedCheckpoint(j);
        synchronized (this.state.pendingHandles) {
            Set<Long> finished = new HashSet<>();
            for (Long pendingId : this.state.pendingHandles.keySet()) {
                if (pendingId.longValue() > j) {
                    continue;
                }
                try {
                    if (this.committer.isCheckpointCommitted(pendingId.longValue())) {
                        finished.add(pendingId);
                    } else {
                        Tuple2<Long, StateHandle<DataInputView>> pending = this.state.pendingHandles.get(pendingId);
                        DataInputView input = pending.f1.getState(getUserCodeClassloader());
                        Iterable<IN> values = new ReusingMutableToRegularIteratorWrapper<IN>(
                                new InputViewIterator<IN>(input, this.serializer), this.serializer);
                        if (sendValues(values, pending.f0.longValue())) {
                            this.committer.commitCheckpoint(pendingId.longValue());
                            finished.add(pendingId);
                        }
                    }
                } catch (Exception e) {
                    LOG.error("Could not commit checkpoint.", e);
                }
            }
            // Removal happens after the iteration because the key set is a live view of the map.
            for (Long finishedId : finished) {
                Tuple2<Long, StateHandle<DataInputView>> removed = this.state.pendingHandles.remove(finishedId);
                removed.f1.discardState();
            }
        }
    }

    /**
     * Writes the buffered elements of one checkpoint to the external system.
     *
     * @param iterable elements buffered for the checkpoint
     * @param j value that was stored next to the handle when it was snapshotted
     * @return {@code true} if the checkpoint may be committed; {@code false} to keep it pending
     */
    protected abstract boolean sendValues(Iterable<IN> iterable, long j) throws Exception;

    /**
     * Serializes the element into the current checkpoint output view, creating the view on the
     * first element after a snapshot or restore.
     */
    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        if (this.out == null) {
            this.out = getStateBackend().createCheckpointStateOutputView(0L, 0L);
        }
        this.serializer.serialize(streamRecord.getValue(), this.out);
    }

    /** Watermarks are ignored by this sink. */
    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
    }
}
