/*
 * Decompiled with CFR 0.152.
 */
package org.janusgraph.graphdb.olap.computer;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.GraphFilter;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
import org.janusgraph.core.JanusGraphComputer;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphTransaction;
import org.janusgraph.core.JanusGraphVertex;
import org.janusgraph.core.Transaction;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.configuration.Configuration;
import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanMetrics;
import org.janusgraph.diskstorage.keycolumnvalue.scan.StandardScanner;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.database.StandardJanusGraph;
import org.janusgraph.graphdb.olap.computer.FulgoraMapEmitter;
import org.janusgraph.graphdb.olap.computer.FulgoraMemory;
import org.janusgraph.graphdb.olap.computer.FulgoraReduceEmitter;
import org.janusgraph.graphdb.olap.computer.FulgoraVertexMemory;
import org.janusgraph.graphdb.olap.computer.PartitionedVertexProgramExecutor;
import org.janusgraph.graphdb.olap.computer.VertexMapJob;
import org.janusgraph.graphdb.olap.computer.VertexProgramScanJob;
import org.janusgraph.graphdb.util.WorkerPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JanusGraphComputer} implementation that runs a vertex program and/or a set of
 * map-reduce jobs against a {@link StandardJanusGraph} by driving backend edge scans.
 *
 * <p>An instance is configured through the fluent setters ({@code program}, {@code mapReduce},
 * {@code workers}, {@code result}, {@code persist}, ...) and then started with {@code submit()}.
 * {@code submit()} may be called only once per instance.
 */
public class FulgoraGraphComputer
implements JanusGraphComputer {
    private static final Logger log = LoggerFactory.getLogger(FulgoraGraphComputer.class);
    /** Vertex program to execute; stays {@code null} when only map-reduce jobs are run. */
    private VertexProgram<?> vertexProgram;
    /** Map-reduce jobs registered via {@code mapReduce()} plus those contributed by the vertex program at submit time. */
    private final Set<MapReduce> mapReduces = new HashSet<MapReduce>();
    /** Graph the computation reads from and, depending on the persist mode, writes back to. */
    private final StandardJanusGraph graph;
    /** Value (10000) handed to the {@link FulgoraVertexMemory} constructor in {@code submit()}. */
    private final int expectedNumVertices = 10000;
    /** Computation memory; created in {@code submit()}. */
    private FulgoraMemory memory;
    /** Per-vertex program state; only created in {@code submit()} when a vertex program is set. */
    private FulgoraVertexMemory vertexMemory;
    /** Set on the first {@code submit()} call so that a second call is rejected. */
    private boolean executed = false;
    /** Number of processing threads used for scans and worker pools; changed via {@code workers()}. */
    private int numThreads = 1;
    /** Work block size passed to the scan builder; ten times {@link #writeBatchSize}. */
    private final int readBatchSize;
    /** Batch size threshold for property write-back; read from {@code GraphDatabaseConfiguration.BUFFER_SIZE}. */
    private final int writeBatchSize;
    /** Requested result graph mode; {@code null} until set explicitly or resolved in {@code submit()}. */
    private GraphComputer.ResultGraph resultGraphMode = null;
    /** Requested persist mode; {@code null} until set explicitly or resolved in {@code submit()}. */
    private GraphComputer.Persist persistMode = null;
    /** Shared counter used to give every computer instance a distinct {@link #name}. */
    private static final AtomicInteger computerCounter = new AtomicInteger(0);
    /** Instance name ({@code "compute" + counter}); prefix of every scan job id. */
    private final String name;
    /** Id of the most recently configured scan job. */
    private String jobId;
    /** Holder for the vertex/edge filters supplied through {@code vertices()} and {@code edges()}. */
    private final GraphFilter graphFilter = new GraphFilter();

    public FulgoraGraphComputer(StandardJanusGraph graph, Configuration configuration) {
        this.graph = graph;
        this.writeBatchSize = configuration.get(GraphDatabaseConfiguration.BUFFER_SIZE, new String[0]);
        this.readBatchSize = this.writeBatchSize * 10;
        this.name = "compute" + computerCounter.incrementAndGet();
    }

    /**
     * Stores the given traversal as the vertex filter of this computer's graph filter.
     *
     * @param vertexFilter traversal selecting vertices
     * @return this computer, for chaining
     */
    @Override
    public GraphComputer vertices(Traversal<Vertex, Vertex> vertexFilter) {
        graphFilter.setVertexFilter(vertexFilter);
        return this;
    }

    /**
     * Stores the given traversal as the edge filter of this computer's graph filter.
     *
     * @param edgeFilter traversal selecting the edges of a vertex
     * @return this computer, for chaining
     */
    @Override
    public GraphComputer edges(Traversal<Vertex, Edge> edgeFilter) {
        graphFilter.setEdgeFilter(edgeFilter);
        return this;
    }

    /**
     * Sets the result graph mode to use when the computation is submitted.
     *
     * @param resultGraph desired result graph mode; must not be {@code null}
     * @return this computer, for chaining
     * @throws IllegalArgumentException if {@code resultGraph} is {@code null}
     */
    @Override
    public GraphComputer result(GraphComputer.ResultGraph resultGraph) {
        final boolean modeGiven = resultGraph != null;
        Preconditions.checkArgument(modeGiven, "Need to specify mode");
        resultGraphMode = resultGraph;
        return this;
    }

    /**
     * Sets the persist mode to use when the computation is submitted.
     *
     * @param persist desired persist mode; must not be {@code null}
     * @return this computer, for chaining
     * @throws IllegalArgumentException if {@code persist} is {@code null}
     */
    @Override
    public GraphComputer persist(GraphComputer.Persist persist) {
        final boolean modeGiven = persist != null;
        Preconditions.checkArgument(modeGiven, "Need to specify mode");
        persistMode = persist;
        return this;
    }

    /**
     * Sets the number of threads used for scans and worker pools.
     *
     * @param threads number of threads; must be positive
     * @return this computer, for chaining
     * @throws IllegalArgumentException if {@code threads} is zero or negative
     */
    @Override
    public JanusGraphComputer workers(int threads) {
        Preconditions.checkArgument(threads > 0, "Invalid number of threads: %s", threads);
        numThreads = threads;
        return this;
    }

    /**
     * Registers the vertex program to execute. Only one program may be set per computer.
     *
     * @param vertexProgram program to run on submit
     * @return this computer, for chaining
     * @throws IllegalStateException if a vertex program has already been set
     */
    @Override
    public GraphComputer program(VertexProgram vertexProgram) {
        final boolean noProgramYet = this.vertexProgram == null;
        Preconditions.checkState(noProgramYet, "A vertex program has already been set");
        this.vertexProgram = vertexProgram;
        return this;
    }

    /**
     * Adds a map-reduce job to the set of jobs executed on submit.
     *
     * @param mapReduce job to register
     * @return this computer, for chaining
     */
    @Override
    public GraphComputer mapReduce(MapReduce mapReduce) {
        mapReduces.add(mapReduce);
        return this;
    }

    /**
     * Validates the configuration and starts the computation asynchronously.
     *
     * <p>Synchronously, this method rejects a second submission, requires at least a vertex
     * program or one map-reduce job, validates the program, resolves the persist / result-graph
     * modes, and checks the worker count against {@link #features()}. The returned future then
     * runs, in order: the vertex program (if any), the map and reduce stages of every registered
     * {@link MapReduce}, and finally the write-back of computed vertex properties.
     *
     * @return a future that completes with the result graph and the computation memory
     * @throws IllegalStateException    presumably, via {@code GraphComputer.Exceptions}, when
     *                                  already submitted or nothing is registered — confirm
     *                                  against the TinkerPop version in use
     * @throws IllegalArgumentException presumably, via {@code GraphComputer.Exceptions}, for an
     *                                  unsupported mode combination or too many workers — confirm
     */
    @Override
    public Future<ComputerResult> submit() {
        if (this.executed) {
            throw GraphComputer.Exceptions.computerHasAlreadyBeenSubmittedAVertexProgram();
        }
        this.executed = true;

        // A computer needs a vertex program, map-reducers, or both.
        if (null == this.vertexProgram && this.mapReduces.isEmpty()) {
            throw GraphComputer.Exceptions.computerHasNoVertexProgramNorMapReducers();
        }
        if (null != this.vertexProgram) {
            GraphComputerHelper.validateProgramOnComputer(this, this.vertexProgram);
            this.mapReduces.addAll(this.vertexProgram.getMapReducers());
        }

        // Modes not set explicitly are resolved by the helper from the vertex program.
        this.persistMode = GraphComputerHelper.getPersistState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.persistMode));
        this.resultGraphMode = GraphComputerHelper.getResultGraphState(Optional.ofNullable(this.vertexProgram), Optional.ofNullable(this.resultGraphMode));
        if (!this.features().supportsResultGraphPersistCombination(this.resultGraphMode, this.persistMode)) {
            throw GraphComputer.Exceptions.resultGraphPersistCombinationNotSupported(this.resultGraphMode, this.persistMode);
        }
        if (this.numThreads > this.features().getMaxWorkers()) {
            throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.numThreads, this.features().getMaxWorkers());
        }

        this.memory = new FulgoraMemory(this.vertexProgram, this.mapReduces);

        return CompletableFuture.supplyAsync(() -> {
            final long startTime = System.currentTimeMillis();
            if (null != this.vertexProgram) {
                runVertexProgram();
            }
            runMapReduceJobs();
            this.memory.attachReferenceElements(this.graph);
            final Graph resultGraph = persistResults();
            this.memory.setRuntime(System.currentTimeMillis() - startTime);
            this.memory.complete();
            return new DefaultComputerResult(resultGraph, this.memory);
        });
    }

    /** Runs the vertex program until it reports termination, issuing one backend edge scan per iteration. */
    private void runVertexProgram() {
        this.vertexMemory = new FulgoraVertexMemory(this.expectedNumVertices, this.graph.getIDManager(), this.vertexProgram);
        this.vertexProgram.setup(this.memory);

        try (VertexProgramScanJob.Executor job = VertexProgramScanJob.getVertexProgramScanJob(this.graph, this.memory, this.vertexMemory, this.vertexProgram)) {
            for (int iteration = 1; ; iteration++) {
                this.memory.completeSubRound();
                this.vertexMemory.nextIteration(this.vertexProgram.getMessageScopes(this.memory));

                this.jobId = this.name + "#" + iteration;
                final StandardScanner.Builder scanBuilder = this.graph.getBackend().buildEdgeScanJob();
                scanBuilder.setJobId(this.jobId);
                scanBuilder.setNumProcessingThreads(this.numThreads);
                scanBuilder.setWorkBlockSize(this.readBatchSize);
                scanBuilder.setJob(job);
                final PartitionedVertexProgramExecutor programExecutor = new PartitionedVertexProgramExecutor(this.graph, this.memory, this.vertexMemory, this.vertexProgram);
                try {
                    final ScanMetrics jobResult = scanBuilder.execute().get();
                    long failures = jobResult.get(ScanMetrics.Metric.FAILURE);
                    if (failures > 0L) {
                        throw new JanusGraphException("Failed to process [" + failures + "] vertices in vertex program iteration [" + iteration + "]. Computer is aborting.");
                    }
                    // Second pass handled by the partitioned executor; it reports into the same metrics.
                    programExecutor.run(this.numThreads, jobResult);
                    // NOTE(review): "partition-fail" is presumably a constant owned by
                    // PartitionedVertexProgramExecutor that the compiler inlined — confirm.
                    failures = jobResult.getCustom("partition-fail");
                    if (failures > 0L) {
                        throw new JanusGraphException("Failed to process [" + failures + "] partitioned vertices in vertex program iteration [" + iteration + "]. Computer is aborting.");
                    }
                } catch (Exception e) {
                    throw new JanusGraphException(e);
                }

                this.vertexMemory.completeIteration();
                this.memory.completeSubRound();
                try {
                    if (this.vertexProgram.terminate(this.memory)) {
                        break;
                    }
                } finally {
                    // The iteration counter advances whether or not the program terminated.
                    this.memory.incrIteration();
                }
            }
        }
    }

    /** Runs the map stage of all registered map-reduce jobs in one scan, then each job's reduce stage, adding results to memory. */
    private void runMapReduceJobs() {
        // Only jobs that declare a MAP stage take part.
        final Map<MapReduce, FulgoraMapEmitter> mapJobs = new HashMap<>(this.mapReduces.size());
        for (MapReduce mapReduce : this.mapReduces) {
            if (mapReduce.doStage(MapReduce.Stage.MAP)) {
                final FulgoraMapEmitter mapEmitter = new FulgoraMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
                mapJobs.put(mapReduce, mapEmitter);
            }
        }

        this.jobId = this.name + "#map";
        try (VertexMapJob.Executor job = VertexMapJob.getVertexMapJob(this.graph, this.vertexMemory, mapJobs)) {
            final StandardScanner.Builder scanBuilder = this.graph.getBackend().buildEdgeScanJob();
            scanBuilder.setJobId(this.jobId);
            scanBuilder.setNumProcessingThreads(this.numThreads);
            scanBuilder.setWorkBlockSize(this.readBatchSize);
            scanBuilder.setJob(job);
            try {
                final ScanMetrics jobResult = scanBuilder.execute().get();
                long failures = jobResult.get(ScanMetrics.Metric.FAILURE);
                if (failures > 0L) {
                    throw new JanusGraphException("Failed to process [" + failures + "] vertices in map phase. Computer is aborting.");
                }
                // NOTE(review): "map-fail" is presumably a constant owned by VertexMapJob
                // that the compiler inlined — confirm.
                failures = jobResult.getCustom("map-fail");
                if (failures > 0L) {
                    throw new JanusGraphException("Failed to process [" + failures + "] individual map jobs. Computer is aborting.");
                }
            } catch (Exception e) {
                throw new JanusGraphException(e);
            }

            for (Map.Entry<MapReduce, FulgoraMapEmitter> mapJob : mapJobs.entrySet()) {
                final FulgoraMapEmitter<?, ?> mapEmitter = mapJob.getValue();
                final MapReduce mapReduce = mapJob.getKey();
                mapEmitter.complete(mapReduce);
                if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
                    final FulgoraReduceEmitter<?, ?> reduceEmitter = new FulgoraReduceEmitter<>();
                    // Closing the pool ends the reduce phase before results are read below.
                    try (WorkerPool workers = new WorkerPool(this.numThreads)) {
                        workers.submit(() -> mapReduce.workerStart(MapReduce.Stage.REDUCE));
                        for (final Map.Entry<?, ?> queueEntry : mapEmitter.reduceMap.entrySet()) {
                            if (null == queueEntry) {
                                break;
                            }
                            workers.submit(() -> mapReduce.reduce(queueEntry.getKey(), ((Iterable<?>) queueEntry.getValue()).iterator(), reduceEmitter));
                        }
                        workers.submit(() -> mapReduce.workerEnd(MapReduce.Stage.REDUCE));
                    } catch (Exception e) {
                        throw new JanusGraphException("Exception while executing reduce phase", e);
                    }
                    reduceEmitter.complete(mapReduce);
                    mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
                } else {
                    mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
                }
            }
        }
    }

    /** Writes non-transient computed vertex properties according to the persist / result-graph modes and returns the graph to expose in the result. */
    private Graph persistResults() {
        if (this.persistMode == GraphComputer.Persist.NOTHING && this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
            return EmptyGraph.instance();
        }
        if (this.persistMode == GraphComputer.Persist.NOTHING || this.vertexProgram == null || this.vertexProgram.getVertexComputeKeys().isEmpty()) {
            // Nothing to write back; the original graph is the result.
            return this.graph;
        }

        // Ensure every compute key exists as a property key before any value is written.
        final JanusGraphManagement management = this.graph.openManagement();
        try {
            for (VertexComputeKey key : this.vertexProgram.getVertexComputeKeys()) {
                if (!management.containsPropertyKey(key.getKey())) {
                    log.warn("Property key [{}] is not part of the schema and will be created. It is advised to initialize all keys.", (Object) key.getKey());
                }
                management.getOrCreatePropertyKey(key.getKey());
            }
            management.commit();
        } finally {
            if (management != null && management.isOpen()) {
                management.rollback();
            }
        }

        // Lazy view over the vertex memory with transient compute keys filtered out.
        final Map<Long, Map<String, Object>> mutatedProperties = Maps.transformValues(this.vertexMemory.getMutableVertexProperties(), new Function<Map<String, Object>, Map<String, Object>>() {

            @Nullable
            @Override
            public Map<String, Object> apply(Map<String, Object> o) {
                return Maps.filterKeys(o, s -> !VertexProgramHelper.isTransientVertexComputeKey(s, FulgoraGraphComputer.this.vertexProgram.getVertexComputeKeys()));
            }
        });

        if (this.resultGraphMode == GraphComputer.ResultGraph.ORIGINAL) {
            final AtomicInteger failures = new AtomicInteger(0);
            try (WorkerPool workers = new WorkerPool(this.numThreads)) {
                List<Map.Entry<Long, Map<String, Object>>> subset = new ArrayList<>(this.writeBatchSize / this.vertexProgram.getVertexComputeKeys().size());
                int currentSize = 0;
                for (Map.Entry<Long, Map<String, Object>> entry : mutatedProperties.entrySet()) {
                    subset.add(entry);
                    currentSize += entry.getValue().size();
                    if (currentSize >= this.writeBatchSize) {
                        // Batch is full: hand it to a writer and start a fresh one.
                        workers.submit(new VertexPropertyWriter(subset, failures));
                        subset = new ArrayList<>(subset.size());
                        currentSize = 0;
                    }
                }
                if (!subset.isEmpty()) {
                    workers.submit(new VertexPropertyWriter(subset, failures));
                }
            } catch (Exception e) {
                throw new JanusGraphException("Exception while attempting to persist result into graph", e);
            }
            if (failures.get() > 0) {
                throw new JanusGraphException("Could not persist program results to graph. Check log for details.");
            }
            return this.graph;
        }

        if (this.resultGraphMode == GraphComputer.ResultGraph.NEW) {
            // Properties are written into a fresh transaction, which is returned without being committed here.
            final Graph newGraph = this.graph.newTransaction();
            for (Map.Entry<Long, Map<String, Object>> vertexProperty : mutatedProperties.entrySet()) {
                final Vertex v = newGraph.vertices(vertexProperty.getKey()).next();
                for (Map.Entry<String, Object> prop : vertexProperty.getValue().entrySet()) {
                    if (prop.getValue() instanceof List) {
                        ((List<?>) prop.getValue()).forEach(value -> v.property(VertexProperty.Cardinality.list, prop.getKey(), value));
                    } else {
                        v.property(VertexProperty.Cardinality.single, prop.getKey(), prop.getValue());
                    }
                }
            }
            return newGraph;
        }
        return this.graph;
    }

    /**
     * Returns the string form produced by {@code StringFactory.graphComputerString} for this computer.
     */
    @Override
    public String toString() {
        final GraphComputer self = this;
        return StringFactory.graphComputerString(self);
    }

    /**
     * Describes what this computer supports: of the capabilities overridden here, only adding
     * vertex properties is reported as supported. Every call returns a new features object.
     *
     * @return the feature set of this computer
     */
    @Override
    public GraphComputer.Features features() {
        return new GraphComputer.Features() {

            // --- vertices ---

            @Override
            public boolean supportsVertexAddition() {
                return false;
            }

            @Override
            public boolean supportsVertexRemoval() {
                return false;
            }

            // --- vertex properties: addition is the one supported mutation ---

            @Override
            public boolean supportsVertexPropertyAddition() {
                return true;
            }

            @Override
            public boolean supportsVertexPropertyRemoval() {
                return false;
            }

            // --- edges ---

            @Override
            public boolean supportsEdgeAddition() {
                return false;
            }

            @Override
            public boolean supportsEdgeRemoval() {
                return false;
            }

            // --- edge properties ---

            @Override
            public boolean supportsEdgePropertyAddition() {
                return false;
            }

            @Override
            public boolean supportsEdgePropertyRemoval() {
                return false;
            }

            // --- graph filter ---

            @Override
            public boolean supportsGraphFilter() {
                return false;
            }
        };
    }

    /**
     * Task that writes one batch of computed vertex properties into the enclosing computer's
     * graph inside its own batch-loading transaction.
     *
     * <p>Failures do not propagate out of {@link #run()}: they are logged and counted in the
     * shared {@code failures} counter supplied by the creator.
     */
    private class VertexPropertyWriter
    implements Runnable {
        /** Batch to write: vertex id mapped to property key/value pairs. */
        private final List<Map.Entry<Long, Map<String, Object>>> properties;
        /** Counter incremented once for every batch that fails. */
        private final AtomicInteger failures;

        private VertexPropertyWriter(List<Map.Entry<Long, Map<String, Object>>> properties, AtomicInteger failures) {
            assert (failures != null && properties != null && !properties.isEmpty());
            this.failures = failures;
            this.properties = properties;
        }

        /**
         * Writes every property of the batch with single cardinality and commits. Vertices that
         * the transaction cannot find are skipped. On any throwable the failure counter is
         * incremented, the error is logged, and a still-open transaction is rolled back.
         */
        @Override
        public void run() {
            final JanusGraphTransaction tx = FulgoraGraphComputer.this.graph.buildTransaction().enableBatchLoading().start();
            try {
                for (Map.Entry<Long, Map<String, Object>> batchEntry : properties) {
                    final JanusGraphVertex vertex = tx.getVertex(batchEntry.getKey());
                    if (vertex != null) {
                        final Map<String, Object> values = batchEntry.getValue();
                        for (Map.Entry<String, Object> keyValue : values.entrySet()) {
                            vertex.property(VertexProperty.Cardinality.single, keyValue.getKey(), keyValue.getValue());
                        }
                    }
                }
                tx.commit();
            }
            catch (Throwable e) {
                failures.incrementAndGet();
                log.error("Encountered exception while trying to write properties: ", e);
            }
            finally {
                // After a successful commit the transaction is closed and this is a no-op.
                if (tx != null && tx.isOpen()) {
                    tx.rollback();
                }
            }
        }
    }
}

