/*
 * Decompiled with CFR 0.152.
 */
package com.aerospike.client.query;

import com.aerospike.client.AerospikeException;
import com.aerospike.client.Value;
import com.aerospike.client.cluster.Cluster;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.lua.LuaCache;
import com.aerospike.client.lua.LuaInputStream;
import com.aerospike.client.lua.LuaInstance;
import com.aerospike.client.lua.LuaOutputStream;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.QueryAggregateCommand;
import com.aerospike.client.query.QueryCommand;
import com.aerospike.client.query.QueryExecutor;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.luaj.vm2.LuaInteger;
import org.luaj.vm2.LuaValue;

/**
 * Query executor that feeds records received from cluster nodes into a Lua
 * aggregation function and exposes the function's output as a {@link ResultSet}.
 *
 * <p>Node query commands created by {@link #createCommand(Node)} share
 * {@code inputQueue} with a {@link LuaInputStream}; the Lua function's output
 * goes to a {@link LuaOutputStream} wrapping the result set. The Lua call itself
 * runs in a task submitted to the inherited thread pool by {@link #execute()}.
 */
public final class QueryAggregateExecutor
extends QueryExecutor
implements Runnable {
    /** Capacity of the bounded queue between the node commands and the Lua input stream. */
    private static final int INPUT_QUEUE_CAPACITY = 500;

    /** Maximum time {@link #sendCompleted()} waits for the aggregation task to finish. */
    private static final long AGGREGATE_WAIT_MILLIS = 1000L;

    private final BlockingQueue<LuaValue> inputQueue = new ArrayBlockingQueue<LuaValue>(INPUT_QUEUE_CAPACITY);
    private final ResultSet resultSet;
    private LuaInstance lua;

    // Written by the thread calling execute() after submit() returns and read by
    // whichever thread runs sendCompleted(); volatile makes the write visible there.
    private volatile Future<?> future;

    /**
     * Creates the executor and prepares the statement for aggregation.
     *
     * @param cluster      cluster the query runs against
     * @param policy       query policy; {@code policy.recordQueueSize} sizes the result set
     * @param statement    query statement; its aggregate function is set here and it is prepared
     * @param packageName  Lua package containing the aggregation function
     * @param functionName name of the aggregation function
     * @param functionArgs arguments passed to the aggregation function
     * @throws AerospikeException declared by the original signature
     */
    public QueryAggregateExecutor(Cluster cluster, QueryPolicy policy, Statement statement, String packageName, String functionName, Value[] functionArgs) throws AerospikeException {
        super(cluster, policy, statement, null);
        this.resultSet = new ResultSet(this, policy.recordQueueSize);
        statement.setAggregateFunction(packageName, functionName, functionArgs, true);
        statement.prepare();
        // The result is intentionally discarded.
        // NOTE(review): presumably this forces luaj's LuaValue/LuaInteger static
        // initialization before any other luaj call - confirm before removing.
        LuaValue.valueOf(0);
    }

    /**
     * Submits this executor to the inherited thread pool and remembers the
     * resulting future so {@link #sendCompleted()} can wait on it.
     */
    public void execute() {
        this.future = this.threadPool.submit(this);
    }

    /**
     * Thread-pool entry point: runs the aggregation and, on any exception,
     * stops the query threads with that exception.
     */
    @Override
    public void run() {
        try {
            this.runThreads();
        }
        catch (Exception e) {
            super.stopThreads(e);
        }
    }

    /**
     * Obtains a Lua instance, starts the node query threads, then invokes the
     * Lua {@code apply_stream} function on the calling thread. The Lua instance
     * is handed back to {@link LuaCache} once the Lua phase finishes, whether or
     * not it threw.
     *
     * @throws AerospikeException declared by the original signature
     */
    public void runThreads() throws AerospikeException {
        // lua must be assigned before startThreads(): createCommand() passes it
        // to every node command.
        this.lua = LuaCache.getInstance();
        this.startThreads();
        try {
            this.lua.load(this.statement.getPackageName(), false);
            Value[] functionArgs = this.statement.getFunctionArgs();

            // Argument layout: function, scope constant, input stream, output
            // stream, followed by the user-supplied function arguments.
            LuaValue[] args = new LuaValue[4 + functionArgs.length];
            args[0] = this.lua.getFunction(this.statement.getFunctionName());
            // NOTE(review): the meaning of the constant 2 is defined by the Lua
            // side of apply_stream - confirm against the Lua sources.
            args[1] = LuaInteger.valueOf(2);
            args[2] = new LuaInputStream(this.inputQueue);
            args[3] = new LuaOutputStream(this.resultSet);
            int count = 4;
            for (Value value : functionArgs) {
                args[count++] = value.getLuaValue(this.lua);
            }
            this.lua.call("apply_stream", args);
        }
        finally {
            LuaCache.putInstance(this.lua);
        }
    }

    /**
     * Creates the per-node command, wiring it to the shared Lua instance and
     * input queue.
     */
    @Override
    protected QueryCommand createCommand(Node node) {
        return new QueryAggregateCommand(node, this.policy, this.statement, this.lua, this.inputQueue);
    }

    /**
     * Finishes the query: enqueues {@code LuaValue.NIL} on the input queue
     * (presumably the end-of-input marker for the Lua stream), waits up to
     * {@value #AGGREGATE_WAIT_MILLIS} ms for the aggregation task when no
     * exception has been recorded, and finally puts {@link ResultSet#END} on
     * the result set.
     *
     * <p>If the calling thread is interrupted while blocked here, the remaining
     * steps still run and the interrupt status is restored before returning
     * rather than being silently discarded.
     */
    @Override
    protected void sendCompleted() {
        boolean interrupted = false;
        try {
            try {
                this.inputQueue.put(LuaValue.NIL);
            }
            catch (InterruptedException ie) {
                interrupted = true;
            }
            if (this.exception == null) {
                // execute() may not have stored the future yet; in that case
                // there is nothing to wait on.
                Future<?> aggregation = this.future;
                if (aggregation != null) {
                    try {
                        aggregation.get(AGGREGATE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
                    }
                    catch (InterruptedException ie) {
                        interrupted = true;
                    }
                    catch (Exception ignored) {
                        // Best-effort wait: a timeout or a failed aggregation
                        // task must not prevent END from being published below.
                    }
                }
            }
            this.resultSet.put(ResultSet.END);
        }
        finally {
            // Re-assert the interrupt only after END has been published so the
            // put above is not itself cut short by the restored flag.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the result set that receives the aggregation output.
     */
    public ResultSet getResultSet() {
        return this.resultSet;
    }
}

