/*
 * Decompiled with CFR 0.152.
 */
package com.baidu.hugegraph.task;

import com.baidu.hugegraph.HugeException;
import com.baidu.hugegraph.HugeGraph;
import com.baidu.hugegraph.backend.id.Id;
import com.baidu.hugegraph.backend.query.Condition;
import com.baidu.hugegraph.backend.query.ConditionQuery;
import com.baidu.hugegraph.backend.store.BackendStore;
import com.baidu.hugegraph.backend.tx.GraphTransaction;
import com.baidu.hugegraph.event.Event;
import com.baidu.hugegraph.event.EventListener;
import com.baidu.hugegraph.exception.NotFoundException;
import com.baidu.hugegraph.iterator.ExtendableIterator;
import com.baidu.hugegraph.iterator.MapperIterator;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
import com.baidu.hugegraph.schema.SchemaManager;
import com.baidu.hugegraph.schema.VertexLabel;
import com.baidu.hugegraph.structure.HugeVertex;
import com.baidu.hugegraph.task.HugeTask;
import com.baidu.hugegraph.task.TaskStatus;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.Cardinality;
import com.baidu.hugegraph.type.define.DataType;
import com.baidu.hugegraph.type.define.HugeKeys;
import com.baidu.hugegraph.util.E;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;

public class TaskScheduler {
    private final HugeGraph graph;
    private final ExecutorService taskExecutor;
    private final ExecutorService dbExecutor;
    private final EventListener eventListener;
    private final Map<Id, HugeTask<?>> tasks;
    private volatile TaskTransaction taskTx;
    private static final long NO_LIMIT = -1L;
    private static final long QUERY_INTERVAL = 100L;

    public TaskScheduler(HugeGraph graph, ExecutorService taskExecutor, ExecutorService dbExecutor) {
        E.checkNotNull((Object)graph, (String)"graph");
        E.checkNotNull((Object)taskExecutor, (String)"taskExecutor");
        E.checkNotNull((Object)dbExecutor, (String)"dbExecutor");
        this.graph = graph;
        this.taskExecutor = taskExecutor;
        this.dbExecutor = dbExecutor;
        this.tasks = new ConcurrentHashMap();
        this.taskTx = null;
        this.eventListener = this.listenChanges();
    }

    public HugeGraph graph() {
        return this.graph;
    }

    public int pendingTasks() {
        return this.tasks.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TaskTransaction tx() {
        if (this.taskTx == null) {
            TaskScheduler taskScheduler = this;
            synchronized (taskScheduler) {
                if (this.taskTx == null) {
                    BackendStore store = this.graph.loadSystemStore();
                    TaskTransaction tx = new TaskTransaction(this.graph, store);
                    assert (this.taskTx == null);
                    this.taskTx = tx;
                }
            }
        }
        assert (this.taskTx != null);
        return this.taskTx;
    }

    private EventListener listenChanges() {
        ImmutableSet storeEvents = ImmutableSet.of((Object)"store.init", (Object)"store.truncate");
        EventListener eventListener = arg_0 -> this.lambda$listenChanges$1((Set)storeEvents, arg_0);
        this.graph.loadSystemStore().provider().listen(eventListener);
        return eventListener;
    }

    private void unlistenChanges() {
        this.graph.loadSystemStore().provider().unlisten(this.eventListener);
    }

    private void relistenChanges() {
        this.graph.loadSystemStore().provider().unlisten(this.eventListener);
        this.graph.loadSystemStore().provider().listen(this.eventListener);
    }

    public <V> void restoreTasks() {
        for (TaskStatus status : TaskStatus.PENDING_STATUSES) {
            Iterator<HugeTask<V>> iter = this.findTask(status, -1L);
            while (iter.hasNext()) {
                HugeTask<V> task = iter.next();
                this.restore(task);
            }
        }
    }

    public <V> Future<?> restore(HugeTask<V> task) {
        E.checkArgumentNotNull(task, (String)"Task can't be null", (Object[])new Object[0]);
        E.checkState((!task.isDone() ? 1 : 0) != 0, (String)"No need to restore task '%s', it has been completed", (Object[])new Object[]{task.id()});
        task.status(TaskStatus.RESTORING);
        return this.submitTask(task);
    }

    public <V> Future<?> schedule(HugeTask<V> task) {
        E.checkArgumentNotNull(task, (String)"Task can't be null", (Object[])new Object[0]);
        task.status(TaskStatus.QUEUED);
        return this.submitTask(task);
    }

    private <V> Future<?> submitTask(HugeTask<V> task) {
        this.tasks.put(task.id(), task);
        task.callable().scheduler(this);
        task.callable().task(task);
        return this.taskExecutor.submit(task);
    }

    public <V> void cancel(HugeTask<V> task) {
        E.checkArgumentNotNull(task, (String)"Task can't be null", (Object[])new Object[0]);
        if (!task.completed()) {
            task.cancel(true);
            this.remove(task.id());
        }
    }

    protected void remove(Id id) {
        HugeTask<?> task = this.tasks.remove(id);
        assert (task == null || task.completed());
    }

    public <V> void save(HugeTask<V> task) {
        E.checkArgumentNotNull(task, (String)"Task can't be null", (Object[])new Object[0]);
        this.call(() -> {
            HugeVertex vertex = this.tx().constructVertex(task);
            return this.tx().addVertex(vertex);
        });
    }

    public boolean close() {
        this.unlistenChanges();
        if (!this.dbExecutor.isShutdown()) {
            this.call(() -> {
                this.tx().close();
                this.graph.closeTx();
            });
        }
        return true;
    }

    public <V> HugeTask<V> task(Id id) {
        E.checkArgumentNotNull((Object)id, (String)"Parameter task id can't be null", (Object[])new Object[0]);
        HugeTask<?> task = this.tasks.get(id);
        if (task != null) {
            return task;
        }
        return this.findTask(id);
    }

    public <V> Iterator<HugeTask<V>> tasks(List<Id> ids) {
        ArrayList<Id> taskIdsNotInMem = new ArrayList<Id>();
        ArrayList taskInMem = new ArrayList();
        for (Id id : ids) {
            HugeTask<?> task = this.tasks.get(id);
            if (task != null) {
                taskInMem.add(task);
                continue;
            }
            taskIdsNotInMem.add(id);
        }
        ExtendableIterator iterator = taskInMem.isEmpty() ? new ExtendableIterator() : new ExtendableIterator(taskInMem.iterator());
        iterator.extend(this.findTasks(taskIdsNotInMem));
        return iterator;
    }

    public <V> HugeTask<V> findTask(Id id) {
        HugeTask result = this.call(() -> {
            HugeTask task = null;
            Iterator<Vertex> vertices = this.tx().queryVertices(id);
            if (vertices.hasNext()) {
                task = HugeTask.fromVertex(vertices.next());
                assert (!vertices.hasNext());
            }
            return task;
        });
        if (result == null) {
            throw new NotFoundException("Can't find task with id '%s'", id);
        }
        return result;
    }

    public <V> Iterator<HugeTask<V>> findTasks(List<Id> ids) {
        Object[] idArray = ids.toArray(new Id[ids.size()]);
        return (Iterator)this.call(() -> {
            Iterator<Vertex> vertices = this.tx().queryVertices(idArray);
            return new MapperIterator(vertices, HugeTask::fromVertex);
        });
    }

    public <V> Iterator<HugeTask<V>> findAllTask(long limit) {
        return this.queryTask((Map<String, Object>)ImmutableMap.of(), limit);
    }

    public <V> Iterator<HugeTask<V>> findTask(TaskStatus status, long limit) {
        return this.queryTask("~task_status", status.code(), limit);
    }

    public <V> HugeTask<V> deleteTask(Id id) {
        HugeTask<?> task = this.tasks.get(id);
        if (task != null) {
            E.checkState((boolean)task.completed(), (String)"Can't delete incomplete task '%s' in status '%s'. Please try to cancel the task first", (Object[])new Object[]{id, task.status()});
            this.remove(id);
        }
        return this.call(() -> {
            HugeTask result = null;
            Iterator<Vertex> vertices = this.tx().queryVertices(id);
            if (vertices.hasNext()) {
                HugeVertex vertex = (HugeVertex)vertices.next();
                result = HugeTask.fromVertex(vertex);
                E.checkState((boolean)result.completed(), (String)"Can't delete incomplete task '%s' in status '%s'", (Object[])new Object[]{id, result.status()});
                this.tx().removeVertex(vertex);
                assert (!vertices.hasNext());
            }
            return result;
        });
    }

    public <V> HugeTask<V> waitUntilTaskCompleted(Id id, long seconds) throws TimeoutException {
        long passes = seconds * 1000L / 100L;
        long pass = 0L;
        while (true) {
            HugeTask<V> task;
            if ((task = this.task(id)).completed()) {
                return task;
            }
            if (pass >= passes) break;
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            ++pass;
        }
        throw new TimeoutException(String.format("Task '%s' was not completed in %s seconds", id, seconds));
    }

    public void waitUntilAllTasksCompleted(long seconds) throws TimeoutException {
        long passes = seconds * 1000L / 100L;
        int taskSize = 0;
        long pass = 0L;
        while (true) {
            if ((taskSize = this.pendingTasks()) == 0) {
                return;
            }
            if (pass >= passes) break;
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            ++pass;
        }
        throw new TimeoutException(String.format("There are still %d incomplete tasks after %s seconds", taskSize, seconds));
    }

    private <V> Iterator<HugeTask<V>> queryTask(String key, Object value, long limit) {
        return this.queryTask((Map<String, Object>)ImmutableMap.of((Object)key, (Object)value), limit);
    }

    private <V> Iterator<HugeTask<V>> queryTask(Map<String, Object> conditions, long limit) {
        return (Iterator)this.call(() -> {
            ConditionQuery query = new ConditionQuery(HugeType.VERTEX);
            VertexLabel vl = this.graph.vertexLabel(TaskTransaction.TASK);
            query.eq(HugeKeys.LABEL, vl.id());
            for (Map.Entry entry : conditions.entrySet()) {
                PropertyKey pk = this.graph.propertyKey((String)entry.getKey());
                query.query(Condition.eq(pk.id(), entry.getValue()));
            }
            query.showHidden(true);
            if (limit != -1L) {
                query.limit(limit);
            }
            Iterator<Vertex> vertices = this.tx().queryVertices(query);
            return new MapperIterator(vertices, HugeTask::fromVertex);
        });
    }

    private <V> V call(Runnable runnable) {
        return this.call(Executors.callable(runnable, null));
    }

    private <V> V call(Callable<V> callable) {
        try {
            return this.dbExecutor.submit(callable).get();
        }
        catch (Exception e) {
            throw new HugeException("Failed to update/query TaskStore", e);
        }
    }

    private /* synthetic */ Object lambda$listenChanges$1(Set storeEvents, Event event) {
        if (storeEvents.contains(event.name())) {
            this.call(() -> this.tx().initSchema());
            this.relistenChanges();
            return true;
        }
        return false;
    }

    private static class TaskTransaction
    extends GraphTransaction {
        public static final String TASK = HugeTask.P.TASK;

        public TaskTransaction(HugeGraph graph, BackendStore store) {
            super(graph, store);
            this.autoCommit(true);
        }

        public HugeVertex constructVertex(HugeTask<?> task) {
            if (this.graph().schemaTransaction().getVertexLabel(TASK) == null) {
                throw new HugeException("Schema is missing for task(%s) '%s'", task.id(), task.name());
            }
            return this.constructVertex(false, task.asArray());
        }

        public boolean indexValueChanged(Vertex oldV, HugeVertex newV) {
            return !oldV.value("~task_status").equals(newV.value("~task_status"));
        }

        private void deleteIndex(HugeVertex vertex) {
            Iterator<Vertex> old = this.queryVertices(vertex.id());
            if (old.hasNext()) {
                HugeVertex oldV = (HugeVertex)old.next();
                assert (!old.hasNext());
                if (this.indexValueChanged(oldV, vertex)) {
                    this.removeVertex(oldV);
                }
            }
        }

        protected void initSchema() {
            HugeGraph graph = this.graph();
            VertexLabel label = graph.schemaTransaction().getVertexLabel(TASK);
            if (label != null) {
                return;
            }
            String[] properties = this.initProperties();
            label = (VertexLabel)graph.schema().vertexLabel(TASK).properties(properties).useCustomizeNumberId().nullableKeys("~task_description", "~task_update", "~task_input", "~task_result", "~task_dependencies").enableLabelIndex(true).build();
            graph.schemaTransaction().addVertexLabel(label);
            this.createIndex(label, "~task_status");
        }

        private String[] initProperties() {
            ArrayList<String> props = new ArrayList<String>();
            props.add(this.createPropertyKey("~task_type"));
            props.add(this.createPropertyKey("~task_name"));
            props.add(this.createPropertyKey("~task_callable"));
            props.add(this.createPropertyKey("~task_description"));
            props.add(this.createPropertyKey("~task_status", DataType.BYTE));
            props.add(this.createPropertyKey("~task_progress", DataType.INT));
            props.add(this.createPropertyKey("~task_create", DataType.DATE));
            props.add(this.createPropertyKey("~task_update", DataType.DATE));
            props.add(this.createPropertyKey("~task_retries", DataType.INT));
            props.add(this.createPropertyKey("~task_input"));
            props.add(this.createPropertyKey("~task_result"));
            props.add(this.createPropertyKey("~task_dependencies", DataType.LONG, Cardinality.SET));
            return props.toArray(new String[0]);
        }

        private String createPropertyKey(String name) {
            return this.createPropertyKey(name, DataType.TEXT);
        }

        private String createPropertyKey(String name, DataType dataType) {
            return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
        }

        private String createPropertyKey(String name, DataType dataType, Cardinality cardinality) {
            HugeGraph graph = this.graph();
            SchemaManager schema = graph.schema();
            PropertyKey propertyKey = (PropertyKey)schema.propertyKey(name).dataType(dataType).cardinality(cardinality).build();
            graph.schemaTransaction().addPropertyKey(propertyKey);
            return name;
        }

        private IndexLabel createIndex(VertexLabel label, String field) {
            HugeGraph graph = this.graph();
            SchemaManager schema = graph.schema();
            String name = Graph.Hidden.hide((String)("task-index-by-" + field));
            IndexLabel indexLabel = (IndexLabel)schema.indexLabel(name).on(HugeType.VERTEX_LABEL, TASK).by(field).build();
            graph.schemaTransaction().addIndexLabel(label, indexLabel);
            return indexLabel;
        }
    }
}

