/*
 * Decompiled with CFR 0.152.
 */
package com.aerospike.client.query;

import com.aerospike.client.Log;
import com.aerospike.client.Value;
import com.aerospike.client.cluster.Cluster;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.command.MultiCommand;
import com.aerospike.client.lua.LuaCache;
import com.aerospike.client.lua.LuaInputStream;
import com.aerospike.client.lua.LuaInstance;
import com.aerospike.client.lua.LuaOutputStream;
import com.aerospike.client.policy.QueryPolicy;
import com.aerospike.client.query.QueryAggregateCommand;
import com.aerospike.client.query.QueryExecutor;
import com.aerospike.client.query.ResultSet;
import com.aerospike.client.query.Statement;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.luaj.vm2.LuaInteger;
import org.luaj.vm2.LuaValue;

public final class QueryAggregateExecutor
extends QueryExecutor
implements Runnable {
    private final BlockingQueue<LuaValue> inputQueue = new ArrayBlockingQueue<LuaValue>(500);
    private final ResultSet resultSet;
    private LuaInstance lua;

    public QueryAggregateExecutor(Cluster cluster, QueryPolicy policy, Statement statement, Node[] nodes) {
        super(cluster, policy, statement, nodes);
        this.resultSet = new ResultSet(this, policy.recordQueueSize);
        LuaValue.valueOf((int)0);
        this.lua = LuaCache.getInstance();
        try {
            this.initializeThreads();
            this.threadPool.execute(this);
        }
        catch (Throwable e) {
            LuaCache.putInstance(this.lua);
            throw e;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        try {
            this.lua.loadPackage(this.statement);
            this.startThreads();
            LuaValue[] args = new LuaValue[4 + this.statement.getFunctionArgs().length];
            args[0] = this.lua.getFunction(this.statement.getFunctionName());
            args[1] = LuaInteger.valueOf((int)2);
            args[2] = new LuaInputStream(this.inputQueue);
            args[3] = new LuaOutputStream(this.resultSet);
            int count = 4;
            for (Value value : this.statement.getFunctionArgs()) {
                args[count++] = value.getLuaValue(this.lua);
            }
            this.lua.call("apply_stream", args);
        }
        catch (Throwable e) {
            super.stopThreads(e);
        }
        finally {
            this.resultSet.put(ResultSet.END);
            LuaCache.putInstance(this.lua);
        }
    }

    @Override
    protected MultiCommand createCommand(Node node, long clusterKey, boolean first) {
        return new QueryAggregateCommand(this.cluster, node, this.policy, this.statement, this.taskId, this.lua, this.inputQueue, clusterKey, first);
    }

    @Override
    protected void sendCancel() {
        this.inputQueue.clear();
        this.resultSet.abort();
        while (!this.inputQueue.offer(LuaValue.NIL)) {
            if (this.inputQueue.poll() != null) continue;
            if (!Log.debugEnabled()) break;
            Log.debug("Lua input queue " + this.statement.taskId + " both offer and poll failed on abort");
            break;
        }
    }

    @Override
    protected void sendCompleted() {
        while (true) {
            try {
                this.inputQueue.put(LuaValue.NIL);
            }
            catch (InterruptedException ie) {
                if (!Log.debugEnabled()) continue;
                Log.debug("Lua input queue " + this.statement.taskId + " put interrupted");
                continue;
            }
            break;
        }
    }

    public ResultSet getResultSet() {
        return this.resultSet;
    }
}

