/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.executor;

import java.util.ArrayList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
    private final WorkerState workerData;
    private final KryoTupleSerializer serializer;
    private final boolean isDebug;
    private int indexingBase = 0;
    private ArrayList<JCQueue> localReceiveQueues;
    private AtomicReferenceArray<JCQueue> queuesToFlush;

    public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
        this.workerData = workerData;
        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
        this.isDebug = ObjectReader.getBoolean(topoConf.get("topology.debug"), false);
    }

    public void initLocalRecvQueues() {
        Integer minTaskId = (Integer)this.workerData.getLocalReceiveQueues().keySet().stream().min(Integer::compareTo).get();
        this.localReceiveQueues = Utils.convertToArray(this.workerData.getLocalReceiveQueues(), minTaskId);
        this.indexingBase = minTaskId;
        this.queuesToFlush = new AtomicReferenceArray(this.localReceiveQueues.size());
    }

    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
        JCQueue localQueue;
        if (this.isDebug) {
            LOG.info("TRANSFERRING tuple {}", (Object)addressedTuple);
        }
        if ((localQueue = this.getLocalQueue(addressedTuple)) != null) {
            return this.tryTransferLocal(addressedTuple, localQueue, pendingEmits);
        }
        return this.workerData.tryTransferRemote(addressedTuple, pendingEmits, this.serializer);
    }

    public void flush() throws InterruptedException {
        this.flushLocal();
        this.workerData.flushRemotes();
    }

    private void flushLocal() throws InterruptedException {
        for (int i = 0; i < this.queuesToFlush.length(); ++i) {
            JCQueue q = this.queuesToFlush.get(i);
            if (q == null) continue;
            q.flush();
            this.queuesToFlush.set(i, null);
        }
    }

    public JCQueue getLocalQueue(AddressedTuple tuple) {
        if (tuple.dest - this.indexingBase >= this.localReceiveQueues.size()) {
            return null;
        }
        return this.localReceiveQueues.get(tuple.dest - this.indexingBase);
    }

    public boolean tryTransferLocal(AddressedTuple tuple, JCQueue localQueue, Queue<AddressedTuple> pendingEmits) {
        this.workerData.checkSerialize(this.serializer, tuple);
        if (pendingEmits != null) {
            if (pendingEmits.isEmpty() && localQueue.tryPublish(tuple)) {
                this.queuesToFlush.set(tuple.dest - this.indexingBase, localQueue);
                return true;
            }
            pendingEmits.add(tuple);
            return false;
        }
        return localQueue.tryPublish(tuple);
    }
}

