/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.iterative.task;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.InputViewIterator;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
import org.apache.flink.runtime.iterative.concurrent.Broker;
import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
import org.apache.flink.runtime.iterative.event.TerminationEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
import org.apache.flink.runtime.iterative.task.AbstractIterativePactTask;
import org.apache.flink.runtime.iterative.task.RuntimeAggregatorRegistry;
import org.apache.flink.runtime.operators.RegularPactTask;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IterationHeadPactTask<X, Y, S extends Function, OT>
extends AbstractIterativePactTask<S, OT> {
    private static final Logger log = LoggerFactory.getLogger(IterationHeadPactTask.class);
    /** Collector that the final result is emitted to after the iteration loop in {@code run()} ends. */
    private Collector<X> finalOutputCollector;
    /** Serializer factory of the feedback input; assigned in {@code run()} from the input at {@code feedbackDataInput}. */
    private TypeSerializerFactory<Y> feedbackTypeSerializer;
    /** Serializer factory used for solution set / final result records; assigned in {@code run()}. */
    private TypeSerializerFactory<X> solutionTypeSerializer;
    /** Writer at the configured sync output index; used to send events to, and subscribe to events from, the sync side. */
    private ResultPartitionWriter toSync;
    /** Index of the input that is replaced by the back channel data from the second superstep on. */
    private int feedbackDataInput;

    /**
     * Reports the number of inputs of this head task: the number of inputs of the driver,
     * plus one additional input if the task is configured as a workset iteration.
     *
     * @return the driver's input count, incremented by one for workset iterations
     */
    @Override
    protected int getNumTaskInputs() {
        final boolean worksetIteration = this.config.getIsWorksetIteration();
        int numInputs = this.driver.getNumberOfInputs();
        if (worksetIteration) {
            numInputs++;
        }
        return numInputs;
    }

    /**
     * Initializes the regular outputs through the superclass, creates the collector for the
     * final result, and looks up the writer at the configured sync output index.
     *
     * @throws Exception if the number of writers into the step function plus the number of
     *                   writers into the final result differs from the configured index of
     *                   the sync output, or if any of the called setup routines fails
     */
    @Override
    protected void initOutputs() throws Exception {
        super.initOutputs();

        // the writers created for the final output are gathered here by getOutputCollector(...)
        final ArrayList writersForFinalResult = new ArrayList();
        final TaskConfig finalOutputConfig = this.config.getIterationHeadFinalOutputConfig();
        final ClassLoader classLoader = this.getUserCodeClassLoader();
        this.finalOutputCollector = RegularPactTask.getOutputCollector(
                this,
                finalOutputConfig,
                classLoader,
                writersForFinalResult,
                this.config.getNumOutputs(),
                finalOutputConfig.getNumOutputs());

        // sanity check: the sync gate has to come right after all other output gates
        final int numStepFunctionWriters = this.eventualOutputs.size();
        final int numFinalResultWriters = writersForFinalResult.size();
        final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
        final int expectedSyncGateIndex = numStepFunctionWriters + numFinalResultWriters;
        if (expectedSyncGateIndex != syncGateIndex) {
            throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
        }

        this.toSync = this.getEnvironment().getWriter(syncGateIndex);
    }

    /**
     * Allocates the memory pages configured for the back channel, wraps them in a
     * {@link SerializedUpdateBuffer}-backed {@link BlockingBackChannel}, and hands the channel
     * in to the {@link BlockingBackChannelBroker} under this task's broker key.
     *
     * @return the newly created back channel
     * @throws Exception if allocating the pages or creating the channel fails
     */
    private BlockingBackChannel initBackChannel() throws Exception {
        final int numPages = this.getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
        final ArrayList<MemorySegment> backChannelMemory = new ArrayList<MemorySegment>();
        final int pageSize = this.getMemoryManager().getPageSize();
        this.getMemoryManager().allocatePages(this, backChannelMemory, numPages);

        final SerializedUpdateBuffer updateBuffer = new SerializedUpdateBuffer(backChannelMemory, pageSize, this.getIOManager());
        final BlockingBackChannel channel = new BlockingBackChannel(updateBuffer);

        // register the channel so that the other side can pick it up via the broker
        BlockingBackChannelBroker.instance().handIn(this.brokerKey(), channel);
        return channel;
    }

    /**
     * Creates the hash table that holds the solution set, backed by memory pages obtained
     * from the memory manager.
     *
     * <p>If anything fails before the table is fully created, the table (if it already exists)
     * is closed and the allocated pages (if any) are released. Errors during that cleanup are
     * only logged, so that the exception that caused the failure is the one that propagates.
     *
     * @param <BT> the type of the records in the solution set
     * @return the newly created hash table
     * @throws Exception if page allocation or table construction fails
     */
    private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
        final double relativeMemory = this.config.getRelativeSolutionSetMemory();
        final ClassLoader classLoader = this.getUserCodeClassLoader();

        final TypeSerializerFactory<BT> serializerFactory = this.config.getSolutionSetSerializer(classLoader);
        final TypeComparatorFactory<BT> comparatorFactory = this.config.getSolutionSetComparator(classLoader);
        final TypeSerializer<BT> serializer = serializerFactory.getSerializer();
        final TypeComparator<BT> comparator = comparatorFactory.createComparator();

        CompactingHashTable<BT> table = null;
        List<MemorySegment> pages = null;
        boolean created = false;
        try {
            final int pageCount = this.getMemoryManager().computeNumberOfPages(relativeMemory);
            pages = this.getMemoryManager().allocatePages(this.getOwningNepheleTask(), pageCount);
            table = new CompactingHashTable<BT>(serializer, comparator, pages);
            created = true;
            return table;
        }
        finally {
            if (!created) {
                // roll back whatever was set up before the failure
                if (table != null) {
                    try {
                        table.close();
                    }
                    catch (Throwable t) {
                        log.error("Error closing the solution set hash table after unsuccessful creation.", t);
                    }
                }
                if (pages != null) {
                    try {
                        this.getMemoryManager().release(pages);
                    }
                    catch (Throwable t) {
                        log.error("Error freeing memory after error during solution set hash table creation.", t);
                    }
                }
            }
        }
    }

    /**
     * Creates the object-based solution set map, using the solution set serializer and
     * comparator from the task configuration.
     *
     * @param <BT> the type of the records in the solution set
     * @return a new, empty {@link JoinHashMap}
     */
    private <BT> JoinHashMap<BT> initJoinHashMap() {
        final TypeSerializerFactory<BT> serializerFactory = this.config.getSolutionSetSerializer(this.getUserCodeClassLoader());
        final TypeComparatorFactory<BT> comparatorFactory = this.config.getSolutionSetComparator(this.getUserCodeClassLoader());

        final TypeSerializer<BT> serializer = serializerFactory.getSerializer();
        final TypeComparator<BT> comparator = comparatorFactory.createComparator();
        return new JoinHashMap<BT>(serializer, comparator);
    }

    /**
     * Opens the given hash table and builds it from the records of the initial solution set
     * input via {@code buildTableWithUniqueKey}.
     *
     * @param solutionSet the hash table to open and fill
     * @param solutionSetInput iterator over the initial solution set records
     * @throws IOException if reading the input or building the table fails
     */
    private void readInitialSolutionSet(final CompactingHashTable<X> solutionSet, final MutableObjectIterator<X> solutionSetInput) throws IOException {
        // the table has to be opened before records can be added
        solutionSet.open();
        solutionSet.buildTableWithUniqueKey(solutionSetInput);
    }

    /**
     * Reads all records of the initial solution set input into the given object-based map.
     *
     * @param solutionSet the map that receives the records (via {@code insertOrReplace})
     * @param solutionSetInput iterator over the initial solution set records
     * @throws IOException if reading from the input fails
     */
    private void readInitialSolutionSet(JoinHashMap<X> solutionSet, MutableObjectIterator<X> solutionSetInput) throws IOException {
        final TypeSerializer<X> serializer = this.solutionTypeSerializer.getSerializer();

        X next;
        // a fresh instance is passed for every record instead of a single reused one;
        // presumably because the map keeps references to the inserted records - TODO confirm
        while ((next = solutionSetInput.next(serializer.createInstance())) != null) {
            solutionSet.insertOrReplace(next);
        }
    }

    /**
     * Creates the superstep barrier and subscribes it on the sync writer to
     * {@link AllWorkersDoneEvent} and {@link TerminationEvent}.
     *
     * @return the newly created and subscribed barrier
     */
    private SuperstepBarrier initSuperstepBarrier() {
        final SuperstepBarrier superstepBarrier = new SuperstepBarrier(this.getUserCodeClassLoader());

        // the barrier listens for both event types arriving via the sync writer
        this.toSync.subscribeToEvent(superstepBarrier, AllWorkersDoneEvent.class);
        this.toSync.subscribeToEvent(superstepBarrier, TerminationEvent.class);

        return superstepBarrier;
    }

    /**
     * Runs the iteration head.
     *
     * <p>The method proceeds in these phases:
     * <ol>
     *   <li>Setup: hands in the superstep kickoff latch, the back channel, the solution set
     *       (workset iterations only), the optional solution set update barrier and the
     *       aggregator registry to their brokers under this task's broker key.</li>
     *   <li>Superstep loop, executed while the task is running and no termination was requested:
     *       sets up the barriers, feeds the previous superstep's result back as input (except in
     *       the first iteration), runs the superclass logic, sends end-of-superstep to all
     *       iteration outputs, optionally waits for the solution set update, obtains the read end
     *       of the back channel, sends a {@link WorkerDoneEvent} to the sync and waits for the
     *       other workers. On a termination signal, termination is requested and signaled on the
     *       kickoff latch; otherwise the iteration counter is incremented, the global aggregates
     *       are updated and the next superstep is triggered.</li>
     *   <li>Final output: streams out the solution set (workset iteration) or the last superstep
     *       result (otherwise) and closes the final output collector.</li>
     *   <li>Cleanup (always): removes this task's entries from all brokers and closes the
     *       solution set hash table, if one was created.</li>
     * </ol>
     *
     * @throws Exception if setup, any superstep, or streaming out the final result fails
     */
    @Override
    public void run() throws Exception {
        final String brokerKey = this.brokerKey();
        final int workerIndex = this.getEnvironment().getIndexInSubtaskGroup();
        final boolean objectSolutionSet = this.config.isSolutionSetUnmanaged();
        final boolean waitForSolutionSetUpdate = this.config.getWaitForSolutionSetUpdate();
        final boolean isWorksetIteration = this.config.getIsWorksetIteration();

        // declared outside the try block because the finally block and the final output need them
        CompactingHashTable<X> solutionSet = null;
        JoinHashMap<X> solutionSetObjectMap = null;

        try {
            SuperstepKickoffLatch nextStepKickoff = new SuperstepKickoffLatch();
            SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);

            BlockingBackChannel backChannel = this.initBackChannel();
            SuperstepBarrier barrier = this.initSuperstepBarrier();
            SolutionSetUpdateBarrier solutionSetUpdateBarrier = null;

            this.feedbackDataInput = this.config.getIterationHeadPartialSolutionOrWorksetInputIndex();
            this.feedbackTypeSerializer = this.getInputSerializer(this.feedbackDataInput);
            this.excludeFromReset(this.feedbackDataInput);

            if (isWorksetIteration) {
                int initialSolutionSetInput = this.config.getIterationHeadSolutionSetInputIndex();
                this.solutionTypeSerializer = this.config.getSolutionSetSerializer(this.getUserCodeClassLoader());

                // the input is expected to carry solution set records of type X
                @SuppressWarnings("unchecked")
                MutableObjectIterator<X> solutionSetInput = (MutableObjectIterator<X>) this.createInputIterator(
                        this.inputReaders[initialSolutionSetInput], this.solutionTypeSerializer);

                // read the initial solution set into the configured data structure and publish it
                if (objectSolutionSet) {
                    solutionSetObjectMap = this.initJoinHashMap();
                    this.readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
                    SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
                } else {
                    solutionSet = this.initCompactingHashTable();
                    this.readInitialSolutionSet(solutionSet, solutionSetInput);
                    SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
                }
            } else {
                // without a workset, the feedback records are also what is streamed out at the end,
                // so the feedback serializer doubles as the solution serializer (X and Y coincide here)
                @SuppressWarnings("unchecked")
                TypeSerializerFactory<X> solSer = (TypeSerializerFactory<X>) this.feedbackTypeSerializer;
                this.solutionTypeSerializer = solSer;
            }

            // identical for both iteration kinds: publish the update barrier if one is required
            if (waitForSolutionSetUpdate) {
                solutionSetUpdateBarrier = new SolutionSetUpdateBarrier();
                SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, solutionSetUpdateBarrier);
            }

            RuntimeAggregatorRegistry aggregatorRegistry = new RuntimeAggregatorRegistry(this.config.getIterationAggregators(this.getUserCodeClassLoader()));
            IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);

            DataInputView superstepResult = null;

            while (this.running && !this.terminationRequested()) {
                if (log.isInfoEnabled()) {
                    log.info(this.formatLogString("starting iteration [" + this.currentIteration() + "]"));
                }

                barrier.setup();
                if (waitForSolutionSetUpdate) {
                    solutionSetUpdateBarrier.setup();
                }

                // from the second superstep on, the input is what came back through the back channel
                if (!this.inFirstIteration()) {
                    this.feedBackSuperstepResult(superstepResult);
                }

                super.run();

                this.sendEndOfSuperstepToAllIterationOutputs();

                if (waitForSolutionSetUpdate) {
                    solutionSetUpdateBarrier.waitForSolutionSetUpdate();
                }

                superstepResult = backChannel.getReadEndAfterSuperstepEnded();

                if (log.isInfoEnabled()) {
                    log.info(this.formatLogString("finishing iteration [" + this.currentIteration() + "]"));
                }

                this.sendEventToSync(new WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators()));

                if (log.isInfoEnabled()) {
                    log.info(this.formatLogString("waiting for other workers in iteration [" + this.currentIteration() + "]"));
                }

                barrier.waitForOtherWorkers();

                if (barrier.terminationSignaled()) {
                    if (log.isInfoEnabled()) {
                        log.info(this.formatLogString("head received termination request in iteration [" + this.currentIteration() + "]"));
                    }
                    this.requestTermination();
                    nextStepKickoff.signalTermination();
                } else {
                    this.incrementIterationCounter();

                    String[] globalAggregateNames = barrier.getAggregatorNames();
                    Value[] globalAggregates = barrier.getAggregates();
                    aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, globalAggregates);

                    nextStepKickoff.triggerNextSuperstep();
                }
            }

            if (log.isInfoEnabled()) {
                log.info(this.formatLogString("streaming out final result after [" + this.currentIteration() + "] iterations"));
            }

            if (isWorksetIteration) {
                if (objectSolutionSet) {
                    this.streamSolutionSetToFinalOutput(solutionSetObjectMap);
                } else {
                    this.streamSolutionSetToFinalOutput(solutionSet);
                }
            } else {
                this.streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer()));
            }

            this.finalOutputCollector.close();
        }
        finally {
            // unregister everything that may have been handed in under this task's key
            IterationAggregatorBroker.instance().remove(brokerKey);
            BlockingBackChannelBroker.instance().remove(brokerKey);
            SuperstepKickoffLatchBroker.instance().remove(brokerKey);
            SolutionSetBroker.instance().remove(brokerKey);
            SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);

            if (solutionSet != null) {
                solutionSet.close();
            }
        }
    }

    /**
     * Emits every record of the given iterator to the final output collector.
     *
     * @param results iterator over the records of the final result
     * @throws IOException if reading from the iterator fails
     */
    private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException {
        final Collector<X> out = this.finalOutputCollector;

        // the instance is handed to next(...) as the reuse object; whatever next(...) returns
        // is emitted and passed in again on the following call
        X record = this.solutionTypeSerializer.getSerializer().createInstance();
        while ((record = results.next(record)) != null) {
            out.collect(record);
        }
    }

    /**
     * Emits every entry of the given solution set hash table to the final output collector.
     *
     * @param hashTable the hash table whose entries are streamed out
     * @throws IOException if iterating over the table entries fails
     */
    private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
        final MutableObjectIterator<X> results = hashTable.getEntryIterator();
        final Collector<X> output = this.finalOutputCollector;

        // the instance is handed to next(...) as the reuse object; whatever next(...) returns
        // is emitted and passed in again on the following call
        X record = this.solutionTypeSerializer.getSerializer().createInstance();
        while ((record = results.next(record)) != null) {
            output.collect(record);
        }
    }

    /**
     * Emits every value of the given object-based solution set map to the final output collector.
     *
     * @param solutionSet the map whose values are streamed out
     * @throws IOException declared for symmetry with the hash table variant of this method
     */
    private void streamSolutionSetToFinalOutput(JoinHashMap<X> solutionSet) throws IOException {
        final Collector<X> output = this.finalOutputCollector;
        for (Object value : solutionSet.values()) {
            // values are iterated as Object; the map is declared to hold records of type X
            @SuppressWarnings("unchecked")
            final X record = (X) value;
            output.collect(record);
        }
    }

    /**
     * Replaces the feedback input with an iterator that deserializes records from the given
     * view, using the feedback serializer.
     *
     * @param superstepResult the view to read the records of the previous superstep from
     */
    private void feedBackSuperstepResult(DataInputView superstepResult) {
        final TypeSerializer<Y> feedbackSerializer = this.feedbackTypeSerializer.getSerializer();
        final InputViewIterator<Y> feedbackIterator = new InputViewIterator<Y>(superstepResult, feedbackSerializer);
        this.inputs[this.feedbackDataInput] = feedbackIterator;
    }

    /**
     * Calls {@code sendEndOfSuperstep()} on every writer in {@code eventualOutputs}.
     *
     * @throws IOException if a writer fails to send
     * @throws InterruptedException if sending is interrupted
     */
    private void sendEndOfSuperstepToAllIterationOutputs() throws IOException, InterruptedException {
        if (log.isDebugEnabled()) {
            final String message = this.formatLogString("Sending end-of-superstep to all iteration outputs.");
            log.debug(message);
        }

        for (RecordWriter<?> writer : this.eventualOutputs) {
            writer.sendEndOfSuperstep();
        }
    }

    /**
     * Writes the given event to all channels of the sync writer.
     *
     * @param event the event to send
     * @throws IOException if writing the event fails
     * @throws InterruptedException if writing is interrupted
     */
    private void sendEventToSync(WorkerDoneEvent event) throws IOException, InterruptedException {
        if (log.isInfoEnabled()) {
            final String eventName = WorkerDoneEvent.class.getSimpleName();
            log.info(this.formatLogString("sending " + eventName + " to sync"));
        }

        this.toSync.writeEventToAllChannels(event);
    }
}

