/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamRecordWriter;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class OperatorChain<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorChain.class);
    private final StreamOperator<?>[] allOperators;
    private final RecordWriterOutput<?>[] streamOutputs;
    private final Output<StreamRecord<OUT>> chainEntryPoint;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public OperatorChain(StreamTask<OUT, ?> containingTask, StreamOperator<OUT> headOperator, AccumulatorRegistry.Reporter reporter) {
        ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
        StreamConfig configuration = containingTask.getConfiguration();
        boolean enableTimestamps = containingTask.isSerializingTimestamps();
        Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
        chainedConfigs.put(configuration.getVertexID(), configuration);
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(userCodeClassloader);
        HashMap streamOutputMap = new HashMap(outEdgesInOrder.size());
        this.streamOutputs = new RecordWriterOutput[outEdgesInOrder.size()];
        boolean success = false;
        try {
            for (int i = 0; i < outEdgesInOrder.size(); ++i) {
                StreamEdge outEdge = outEdgesInOrder.get(i);
                RecordWriterOutput streamOutput = OperatorChain.createStreamOutput(outEdge, chainedConfigs.get(outEdge.getSourceId()), i, containingTask.getEnvironment(), enableTimestamps, reporter, containingTask.getName());
                this.streamOutputs[i] = streamOutput;
                streamOutputMap.put(outEdge, streamOutput);
            }
            ArrayList allOps = new ArrayList(chainedConfigs.size());
            this.chainEntryPoint = OperatorChain.createOutputCollector(containingTask, configuration, chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
            this.allOperators = allOps.toArray(new StreamOperator[allOps.size() + 1]);
            this.allOperators[this.allOperators.length - 1] = headOperator;
            success = true;
        }
        finally {
            if (!success) {
                for (RecordWriterOutput<?> output : this.streamOutputs) {
                    if (output == null) continue;
                    output.close();
                    output.clearBuffers();
                }
            }
        }
    }

    public void broadcastCheckpointBarrier(long id, long timestamp) throws IOException, InterruptedException {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
        for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
            streamOutput.broadcastEvent((AbstractEvent)barrier);
        }
    }

    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.streamOutputs;
    }

    public StreamOperator<?>[] getAllOperators() {
        return this.allOperators;
    }

    public Output<StreamRecord<OUT>> getChainEntryPoint() {
        return this.chainEntryPoint;
    }

    public void flushOutputs() throws IOException {
        for (RecordWriterOutput<?> streamOutput : this.getStreamOutputs()) {
            streamOutput.flush();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseOutputs() {
        try {
            for (RecordWriterOutput<?> streamOutput : this.streamOutputs) {
                streamOutput.close();
            }
        }
        finally {
            for (RecordWriterOutput<?> output : this.streamOutputs) {
                output.clearBuffers();
            }
        }
    }

    private static <T> Output<StreamRecord<T>> createOutputCollector(StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, List<StreamOperator<?>> allOperators) {
        ArrayList allOutputs = new ArrayList(4);
        for (StreamEdge outputEdge : operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
            RecordWriterOutput<?> output = streamOutputs.get(outputEdge);
            allOutputs.add(new Tuple2(output, (Object)outputEdge));
        }
        for (StreamEdge outputEdge : operatorConfig.getChainedOutputs(userCodeClassloader)) {
            int outputId = outputEdge.getTargetId();
            StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
            Output output = OperatorChain.createChainedOperator(containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
            allOutputs.add(new Tuple2(output, (Object)outputEdge));
        }
        List selectors = operatorConfig.getOutputSelectors(userCodeClassloader);
        if (selectors == null || selectors.isEmpty()) {
            if (allOutputs.size() == 1) {
                return (Output)((Tuple2)allOutputs.get((int)0)).f0;
            }
            Output[] asArray = new Output[allOutputs.size()];
            for (int i = 0; i < allOutputs.size(); ++i) {
                asArray[i] = (Output)((Tuple2)allOutputs.get((int)i)).f0;
            }
            return new BroadcastingOutputCollector(asArray);
        }
        return new DirectedOutput(selectors, allOutputs);
    }

    private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(StreamTask<?, ?> containingTask, StreamConfig operatorConfig, Map<Integer, StreamConfig> chainedConfigs, ClassLoader userCodeClassloader, Map<StreamEdge, RecordWriterOutput<?>> streamOutputs, List<StreamOperator<?>> allOperators) {
        Output output = OperatorChain.createOutputCollector(containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
        OneInputStreamOperator chainedOperator = (OneInputStreamOperator)operatorConfig.getStreamOperator(userCodeClassloader);
        chainedOperator.setup(containingTask, operatorConfig, output);
        allOperators.add(chainedOperator);
        if (containingTask.getExecutionConfig().isObjectReuseEnabled() || chainedOperator.isInputCopyingDisabled()) {
            return new ChainingOutput(chainedOperator);
        }
        TypeSerializer inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
        return new CopyingChainingOutput(chainedOperator, inSerializer);
    }

    private static <T> RecordWriterOutput<T> createStreamOutput(StreamEdge edge, StreamConfig upStreamConfig, int outputIndex, Environment taskEnvironment, boolean withTimestamps, AccumulatorRegistry.Reporter reporter, String taskName) {
        TypeSerializer outSerializer = upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
        StreamPartitioner<?> outputPartitioner = edge.getPartitioner();
        LOG.debug("Using partitioner {} for output {} of task ", new Object[]{outputPartitioner, outputIndex, taskName});
        ResultPartitionWriter bufferWriter = taskEnvironment.getWriter(outputIndex);
        StreamRecordWriter output = new StreamRecordWriter(bufferWriter, outputPartitioner, upStreamConfig.getBufferTimeout());
        output.setReporter(reporter);
        return new RecordWriterOutput(output, outSerializer, withTimestamps);
    }

    private static final class BroadcastingOutputCollector<T>
    implements Output<StreamRecord<T>> {
        private final Output<StreamRecord<T>>[] outputs;

        public BroadcastingOutputCollector(Output<StreamRecord<T>>[] outputs) {
            this.outputs = outputs;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            for (Output<StreamRecord<T>> output : this.outputs) {
                output.emitWatermark(mark);
            }
        }

        public void collect(StreamRecord<T> record) {
            for (Output<StreamRecord<T>> output : this.outputs) {
                output.collect(record);
            }
        }

        public void close() {
            for (Output<StreamRecord<T>> output : this.outputs) {
                output.close();
            }
        }
    }

    private static class CopyingChainingOutput<T>
    extends ChainingOutput<T> {
        private final TypeSerializer<T> serializer;

        public CopyingChainingOutput(OneInputStreamOperator<T, ?> operator, TypeSerializer<T> serializer) {
            super(operator);
            this.serializer = serializer;
        }

        @Override
        public void collect(StreamRecord<T> record) {
            try {
                StreamRecord<Object> copy = record.copy(this.serializer.copy(record.getValue()));
                this.operator.setKeyContextElement1(copy);
                this.operator.processElement(copy);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not forward element to next operator", e);
            }
        }
    }

    private static class ChainingOutput<T>
    implements Output<StreamRecord<T>> {
        protected final OneInputStreamOperator<T, ?> operator;

        public ChainingOutput(OneInputStreamOperator<T, ?> operator) {
            this.operator = operator;
        }

        public void collect(StreamRecord<T> record) {
            try {
                this.operator.setKeyContextElement1(record);
                this.operator.processElement(record);
            }
            catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        @Override
        public void emitWatermark(Watermark mark) {
            try {
                this.operator.processWatermark(mark);
            }
            catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }

        public void close() {
            try {
                this.operator.close();
            }
            catch (Exception e) {
                throw new ExceptionInChainedOperatorException(e);
            }
        }
    }
}

