/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.jstorm.task;

import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.task.Task;
import com.alibaba.jstorm.task.TaskStatus;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.utils.EventSampler;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TaskTransfer} variant whose transfer entry points accept
 * {@link BatchTuple}s and whose serialize thread encodes them with
 * {@code serializer.serializeBatch(...)}.
 *
 * <p>NOTE(review): several private fields ({@code batchMap}, {@code batchSize},
 * {@code lock}, {@code eventSampler}) and the {@code BATCH_*} constants are not
 * referenced anywhere in this class as decompiled; they are kept so the class
 * shape stays unchanged.
 */
public class TaskBatchTransfer
extends TaskTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(TaskBatchTransfer.class);

    // Tuning constants. None of them is read inside this class; their exact
    // meaning is presumably what the names suggest -- TODO confirm against
    // subclasses / the original (non-decompiled) source.
    protected static final double BATCH_SIZE_THRESHOLD = 2.0;
    protected static final int BATCH_FLUSH_INTERVAL_MS = 5;
    protected static final int BATCH_CHECK_INTERVAL_S = 3600;
    protected static final int BATCH_EVENT_SAMPLER_INTERVAL_S = 960;

    /** Created empty in the constructor; keyed by Integer, never read in this class. */
    private final Map<Integer, BatchTuple> batchMap;
    /** Value of {@code ConfigExtension.getTaskMsgBatchSize} for the worker's storm conf. */
    private final int maxBatchSize;
    /** Unused in this class. */
    private int batchSize;
    /** Unused in this class; final so it can never be swapped out if used as a monitor. */
    private final Object lock = new Object();
    /** Unused in this class; always {@code null} here. */
    private EventSampler eventSampler = null;

    /**
     * Forwards all arguments to the {@link TaskTransfer} constructor, then
     * creates the (empty) batch map and reads the maximum batch size from the
     * worker's storm configuration.
     *
     * @param task       passed through to the superclass
     * @param taskName   passed through to the superclass
     * @param serializer passed through to the superclass
     * @param taskStatus passed through to the superclass
     * @param workerData passed through to the superclass; also the source of
     *                   the storm configuration used for the batch size
     */
    public TaskBatchTransfer(Task task, String taskName, KryoTupleSerializer serializer, TaskStatus taskStatus, WorkerData workerData) {
        super(task, taskName, serializer, taskStatus, workerData);
        this.batchMap = new HashMap<Integer, BatchTuple>();
        this.maxBatchSize = ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf());
    }

    /**
     * Builds the serialize thread around a {@link TransferBatchRunnable} so
     * that tuples are serialized as batches.
     *
     * @return a new {@link AsyncLoopThread} wrapping a {@link TransferBatchRunnable}
     */
    @Override
    protected AsyncLoopThread setupSerializeThread() {
        return new AsyncLoopThread(new TransferBatchRunnable());
    }

    /**
     * Publishes a batch to the queue registered for its target task in
     * {@code innerTaskTransfer}, or to {@code serializeQueue} when no such
     * queue is registered. Afterwards, if the backpressure controller reports
     * backpressure mode, {@code flowControl()} is invoked.
     *
     * @param batch the batch to publish; its target task id selects the queue
     */
    @Override
    public void transfer(BatchTuple batch) {
        int taskId = batch.getTargetTaskId();
        DisruptorQueue exeQueue = (DisruptorQueue)this.innerTaskTransfer.get(taskId);
        if (exeQueue != null) {
            exeQueue.publish(batch);
        } else {
            // No queue registered under this task id: hand the batch to the serialize queue.
            this.serializeQueue.publish(batch);
        }
        // Runs on both paths above. Presumably throttles the caller -- TODO confirm
        // what flowControl() does in the backpressure controller.
        if (this.backpressureController.isBackpressureMode()) {
            this.backpressureController.flowControl();
        }
    }

    /**
     * Publishes a control batch to the queue registered for its target task
     * in {@code controlQueues}, or to {@code transferControlQueue} when no
     * such queue is registered. Unlike {@link #transfer(BatchTuple)}, no
     * backpressure check is made here.
     *
     * @param tuple the control batch to publish; its target task id selects the queue
     */
    @Override
    public void transferControl(BatchTuple tuple) {
        int taskId = tuple.getTargetTaskId();
        DisruptorQueue controlQueue = (DisruptorQueue)this.controlQueues.get(taskId);
        if (controlQueue != null) {
            controlQueue.publish(tuple);
        } else {
            this.transferControlQueue.publish(tuple);
        }
    }

    /**
     * Serialize-thread runnable that treats every incoming tuple as a
     * {@link BatchTuple} and serializes it with {@code serializeBatch}.
     */
    protected class TransferBatchRunnable
    extends TaskTransfer.TransferRunnable {
        protected TransferBatchRunnable() {
            // NOTE(review): passing the outer instance explicitly looks like a
            // decompiler artifact (the file header says CFR) -- confirm that
            // TransferRunnable really declares a constructor taking a TaskTransfer.
            super(TaskBatchTransfer.this);
        }

        /**
         * Serializes the given tuple as a batch.
         *
         * @param tuple the tuple to serialize; cast to {@link BatchTuple}
         *              unconditionally, so any other non-null type fails with
         *              {@link ClassCastException}
         * @return the bytes returned by {@code serializer.serializeBatch}
         */
        @Override
        public byte[] serialize(ITupleExt tuple) {
            BatchTuple batchTuple = (BatchTuple)tuple;
            return TaskBatchTransfer.this.serializer.serializeBatch(batchTuple);
        }
    }
}

