package com.alibaba.jstorm.task;

import backtype.storm.serialization.KryoTupleSerializer;
import backtype.storm.tuple.BatchTuple;
import backtype.storm.tuple.ITupleExt;
import backtype.storm.utils.DisruptorQueue;
import com.alibaba.jstorm.callback.AsyncLoopThread;
import com.alibaba.jstorm.client.ConfigExtension;
import com.alibaba.jstorm.daemon.worker.WorkerData;
import com.alibaba.jstorm.task.TaskTransfer;
import com.alibaba.jstorm.utils.EventSampler;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/jstorm/task/TaskBatchTransfer.class */
/**
 * A {@link TaskTransfer} that moves {@link BatchTuple} instances instead of single tuples.
 *
 * <p>Compared with the parent class, this subclass:
 * <ul>
 *   <li>installs a {@link TransferBatchRunnable} as the serialize-thread body, so outgoing
 *       items are serialized with {@code serializer.serializeBatch(...)};</li>
 *   <li>overrides {@link #transfer(BatchTuple)} and {@link #transferControl(BatchTuple)} to
 *       route a batch either to a queue looked up by its target task id or to a fallback
 *       queue inherited from {@link TaskTransfer}.</li>
 * </ul>
 *
 * <p>NOTE(review): the {@code BATCH_*} constants and the private fields {@code batchMap},
 * {@code batchSize}, {@code lock} and {@code eventSampler} are not read anywhere in this class
 * as shown; they look like leftovers of (or hooks for) a local batching/flush mechanism.
 * Confirm before removing them.
 */
public class TaskBatchTransfer extends TaskTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(TaskBatchTransfer.class);

    // NOTE(review): none of the constants below is referenced in this class; the units implied
    // by the _MS / _S suffixes are presumably milliseconds / seconds — confirm with the users.
    protected static final double BATCH_SIZE_THRESHOLD = 2.0d;
    protected static final int BATCH_FLUSH_INTERVAL_MS = 5;
    protected static final int BATCH_CHECK_INTERVAL_S = 3600;
    protected static final int BATCH_EVENT_SAMPLER_INTERVAL_S = 960;

    /** Batches keyed by an Integer id (presumably the target task id — TODO confirm); unused here. */
    private final Map<Integer, BatchTuple> batchMap;

    /** Value of the task message batch size read from the storm configuration at construction. */
    private final int maxBatchSize;

    /** Never assigned in this class, so it keeps the default value 0. */
    private int batchSize;

    /** Monitor object; final so that the lock identity can never change. Unused in this class. */
    private final Object lock = new Object();

    /** Always {@code null} in this class; nothing here assigns a sampler. */
    private EventSampler eventSampler;

    /**
     * Serialize-thread body that serializes every dequeued item as a {@link BatchTuple}.
     */
    protected class TransferBatchRunnable extends TaskTransfer.TransferRunnable {
        protected TransferBatchRunnable() {
            super();
        }

        /**
         * Serializes the given item via the enclosing transfer's {@code serializer.serializeBatch}.
         *
         * @param iTupleExt the item to serialize; must be a {@link BatchTuple}
         * @return the serialized bytes produced by {@code serializeBatch}
         * @throws ClassCastException if {@code iTupleExt} is not a {@link BatchTuple}
         */
        @Override
        public byte[] serialize(ITupleExt iTupleExt) {
            return TaskBatchTransfer.this.serializer.serializeBatch((BatchTuple) iTupleExt);
        }
    }

    /**
     * Creates the transfer by delegating to the {@link TaskTransfer} constructor and then reading
     * the task message batch size from the worker's storm configuration.
     *
     * @param task                the owning task, passed through to the parent constructor
     * @param str                 passed through unchanged to the parent constructor
     * @param kryoTupleSerializer serializer passed through to the parent constructor
     * @param taskStatus          task status passed through to the parent constructor
     * @param workerData          worker data; its storm configuration supplies the batch size
     */
    public TaskBatchTransfer(Task task, String str, KryoTupleSerializer kryoTupleSerializer, TaskStatus taskStatus, WorkerData workerData) {
        super(task, str, kryoTupleSerializer, taskStatus, workerData);
        this.eventSampler = null;
        // Explicit type arguments instead of a raw HashMap to avoid an unchecked conversion.
        this.batchMap = new HashMap<Integer, BatchTuple>();
        this.maxBatchSize = ConfigExtension.getTaskMsgBatchSize(workerData.getStormConf()).intValue();
    }

    /**
     * Builds the serialize thread around a {@link TransferBatchRunnable}.
     *
     * @return a new {@link AsyncLoopThread} wrapping a new {@link TransferBatchRunnable}
     */
    @Override
    protected AsyncLoopThread setupSerializeThread() {
        return new AsyncLoopThread(new TransferBatchRunnable());
    }

    /**
     * Publishes a batch towards its target task.
     *
     * <p>If {@code innerTaskTransfer} holds a queue for the batch's target task id, the batch is
     * published to that queue; otherwise it is published to {@code serializeQueue}. Afterwards,
     * when the backpressure controller reports backpressure mode, its flow control is invoked.
     *
     * @param batchTuple the batch to publish
     */
    @Override
    public void transfer(BatchTuple batchTuple) {
        DisruptorQueue disruptorQueue = this.innerTaskTransfer.get(Integer.valueOf(batchTuple.getTargetTaskId()));
        if (disruptorQueue != null) {
            disruptorQueue.publish(batchTuple);
        } else {
            this.serializeQueue.publish(batchTuple);
        }
        // Flow control runs after publishing, regardless of which queue received the batch.
        if (this.backpressureController.isBackpressureMode()) {
            this.backpressureController.flowControl();
        }
    }

    /**
     * Publishes a control batch towards its target task.
     *
     * <p>If {@code controlQueues} holds a queue for the batch's target task id, the batch is
     * published to that queue; otherwise it is published to {@code transferControlQueue}.
     * Unlike {@link #transfer(BatchTuple)}, no backpressure check is performed here.
     *
     * @param batchTuple the control batch to publish
     */
    @Override
    public void transferControl(BatchTuple batchTuple) {
        DisruptorQueue disruptorQueue = this.controlQueues.get(Integer.valueOf(batchTuple.getTargetTaskId()));
        if (disruptorQueue != null) {
            disruptorQueue.publish(batchTuple);
        } else {
            this.transferControlQueue.publish(batchTuple);
        }
    }
}
