package datadog.trace.agent.common.writer.ddagent;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import datadog.common.exec.CommonTaskExecutor;
import datadog.common.exec.DaemonThreadFactory;
import datadog.slf4j.Logger;
import datadog.slf4j.LoggerFactory;
import datadog.trace.agent.common.writer.ddagent.DisruptorEvent;
import datadog.trace.agent.core.DDSpan;
import datadog.trace.agent.core.monitor.Monitor;
import datadog.trace.agent.core.processor.TraceProcessor;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:inst/datadog/trace/agent/common/writer/ddagent/TraceProcessingDisruptor.classdata */
/**
 * Hands completed traces (lists of {@link DDSpan}) to a single {@link TraceSerializingHandler}
 * through an LMAX {@link Disruptor} ring buffer.
 *
 * <p>Three kinds of events are published into the ring buffer: data events ({@link #publish}),
 * flush events ({@link #flush}) and, when enabled, periodic heartbeat events ({@link #heartbeat()}).
 * The handler forwards traces to a {@link PayloadDispatcher} and flushes it either on request or
 * once the configured flush interval has elapsed.
 */
public class TraceProcessingDisruptor implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(TraceProcessingDisruptor.class);

    /** Initial delay and period, in milliseconds, of the heartbeat task scheduled by {@link #start()}. */
    private static final long HEARTBEAT_PERIOD_MILLIS = 1000L;

    private final Disruptor<DisruptorEvent<List<DDSpan>>> disruptor;
    private final DisruptorEvent.DataTranslator<List<DDSpan>> dataTranslator;
    private final DisruptorEvent.FlushTranslator<List<DDSpan>> flushTranslator;
    private final DisruptorEvent.HeartbeatTranslator<List<DDSpan>> heartbeatTranslator = new DisruptorEvent.HeartbeatTranslator<>();
    private final boolean doHeartbeat;

    /** Handle of the scheduled heartbeat task; written by {@link #start()}, read by {@link #close()}. */
    private volatile ScheduledFuture<?> heartbeat;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:inst/datadog/trace/agent/common/writer/ddagent/TraceProcessingDisruptor$HeartbeatTask.classdata */
    /**
     * Scheduled task that publishes a heartbeat event on the {@link TraceProcessingDisruptor} it is
     * given. The target is passed as an argument to {@link #run} rather than stored in the task.
     */
    public static final class HeartbeatTask implements CommonTaskExecutor.Task<TraceProcessingDisruptor> {
        private HeartbeatTask() {
        }

        /**
         * Publishes one heartbeat event.
         *
         * @param traceProcessingDisruptor the disruptor to send the heartbeat to
         */
        @Override // datadog.common.exec.CommonTaskExecutor.Task
        public void run(TraceProcessingDisruptor traceProcessingDisruptor) {
            traceProcessingDisruptor.heartbeat();
        }
    }

    /* loaded from: input_file:inst/datadog/trace/agent/common/writer/ddagent/TraceProcessingDisruptor$TraceSerializingHandler.classdata */
    /**
     * Ring-buffer consumer: runs each trace through a {@link TraceProcessor}, adds the result to the
     * {@link PayloadDispatcher}, and flushes the dispatcher on explicit flush events or when the
     * time-based flush deadline has passed.
     */
    public static class TraceSerializingHandler implements EventHandler<DisruptorEvent<List<DDSpan>>> {
        private final TraceProcessor processor = new TraceProcessor();
        private final Monitor monitor;
        private final long flushIntervalMillis;
        private final boolean doTimeFlush;
        private final PayloadDispatcher payloadDispatcher;

        /** Deadline for the next time-based flush, on the {@link #millisecondTime()} clock. */
        private long nextFlushMillis;

        /**
         * Creates the handler.
         *
         * @param monitor notified when handling an event fails
         * @param j flush interval, expressed in {@code timeUnit}; a value {@code <= 0} disables
         *     time-based flushing
         * @param timeUnit unit of {@code j}
         * @param payloadDispatcher receives processed traces and flush requests
         */
        public TraceSerializingHandler(Monitor monitor, long j, TimeUnit timeUnit, PayloadDispatcher payloadDispatcher) {
            this.monitor = monitor;
            this.doTimeFlush = j > 0;
            this.payloadDispatcher = payloadDispatcher;
            if (this.doTimeFlush) {
                this.flushIntervalMillis = timeUnit.toMillis(j);
                scheduleNextTimeFlush();
            } else {
                // Never consulted when time-based flushing is off; set to the largest possible interval.
                this.flushIntervalMillis = Long.MAX_VALUE;
            }
        }

        /**
         * Handles one ring-buffer event and always resets it afterwards so the slot can be reused.
         *
         * <p>Any {@link Throwable} raised while handling is logged at debug level and reported to
         * the {@link Monitor}; it is not rethrown.
         *
         * @param disruptorEvent the event to handle
         * @param j sequence number of the event (unused)
         * @param z end-of-batch flag supplied by the disruptor (unused)
         */
        @Override // com.lmax.disruptor.EventHandler
        public void onEvent(DisruptorEvent<List<DDSpan>> disruptorEvent, long j, boolean z) {
            try {
                // Time-based flush is only evaluated on events that carry no trace data.
                if (disruptorEvent.data == null && this.doTimeFlush && millisecondTime() > this.nextFlushMillis) {
                    this.payloadDispatcher.flush();
                    scheduleNextTimeFlush();
                }
                if (disruptorEvent.data != null) {
                    this.payloadDispatcher.addTrace(this.processor.onTraceComplete(disruptorEvent.data));
                }
                if (disruptorEvent.flushLatch != null) {
                    // The latch is released only after a successful flush. If flush() throws, the
                    // waiter in TraceProcessingDisruptor.flush is not signalled and times out.
                    this.payloadDispatcher.flush();
                    disruptorEvent.flushLatch.countDown();
                }
            } catch (Throwable th) {
                log.debug("Error while serializing trace", th);
                // data may be null here when the failure happened during a flush rather than a trace.
                this.monitor.onFailedSerialize(disruptorEvent.data, th);
            } finally {
                // Single reset point: runs on success and on failure alike.
                disruptorEvent.reset();
            }
        }

        /** Moves the time-based flush deadline one interval past the current time, if enabled. */
        private void scheduleNextTimeFlush() {
            if (this.doTimeFlush) {
                this.nextFlushMillis = millisecondTime() + this.flushIntervalMillis;
            }
        }

        /** Returns {@link System#nanoTime()} converted to milliseconds (not wall-clock time). */
        private long millisecondTime() {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        }
    }

    /**
     * Builds the disruptor and wires a {@link TraceSerializingHandler} to it. Nothing runs until
     * {@link #start()} is called.
     *
     * @param i passed to {@code DisruptorUtils.create} as its size argument (presumably the ring
     *     buffer size; confirm against {@code DisruptorUtils})
     * @param monitor passed to both the handler and the {@link PayloadDispatcher}
     * @param dDAgentApi agent API the {@link PayloadDispatcher} is created with
     * @param j flush interval in {@code timeUnit}; {@code <= 0} disables time-based flushing
     * @param timeUnit unit of {@code j}
     * @param z whether {@link #start()} should schedule the periodic heartbeat
     */
    public TraceProcessingDisruptor(int i, Monitor monitor, DDAgentApi dDAgentApi, long j, TimeUnit timeUnit, boolean z) {
        this.disruptor = DisruptorUtils.create(new DisruptorEvent.Factory(), i, DaemonThreadFactory.TRACE_PROCESSOR, ProducerType.MULTI, new BlockingWaitStrategy());
        this.disruptor.handleEventsWith(new TraceSerializingHandler(monitor, j, timeUnit, new PayloadDispatcher(dDAgentApi, monitor)));
        this.dataTranslator = new DisruptorEvent.DataTranslator<>();
        this.flushTranslator = new DisruptorEvent.FlushTranslator<>();
        this.doHeartbeat = z;
    }

    /**
     * Schedules the heartbeat task (when enabled) and then starts the disruptor.
     */
    public void start() {
        if (this.doHeartbeat) {
            this.heartbeat = CommonTaskExecutor.INSTANCE.scheduleAtFixedRate(new HeartbeatTask(), this, HEARTBEAT_PERIOD_MILLIS, HEARTBEAT_PERIOD_MILLIS, TimeUnit.MILLISECONDS, "disruptor heartbeat");
        }
        this.disruptor.start();
    }

    /**
     * Publishes a flush event and waits for the handler to acknowledge it.
     *
     * @param j maximum time to wait for the acknowledgement
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the handler released the latch within the timeout; {@code false} if
     *     the wait timed out or the calling thread was interrupted (the interrupt flag is restored)
     */
    public boolean flush(long j, TimeUnit timeUnit) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Uses publishEvent rather than tryPublishEvent, unlike publish(...).
        this.disruptor.publishEvent(this.flushTranslator, 0, countDownLatch);
        try {
            return countDownLatch.await(j, timeUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Cancels the heartbeat task, if one was scheduled, and halts the disruptor.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        // Read the volatile field once so the null check and the cancel see the same value.
        ScheduledFuture<?> scheduledHeartbeat = this.heartbeat;
        if (scheduledHeartbeat != null) {
            scheduledHeartbeat.cancel(true);
        }
        this.disruptor.halt();
    }

    /**
     * Attempts to publish a trace without blocking.
     *
     * @param list the spans of the trace
     * @param i integer handed to the data translator together with the trace (its meaning is
     *     defined by {@code DisruptorEvent.DataTranslator})
     * @return the result of {@code RingBuffer.tryPublishEvent}: {@code true} if the event was
     *     published, {@code false} otherwise
     */
    public boolean publish(List<DDSpan> list, int i) {
        return this.disruptor.getRingBuffer().tryPublishEvent(this.dataTranslator, list, i);
    }

    /** Publishes one heartbeat event; invoked by {@link HeartbeatTask}. */
    void heartbeat() {
        this.disruptor.getRingBuffer().publishEvent(this.heartbeatTranslator);
    }

    /**
     * @return the ring buffer's size
     */
    public int getDisruptorCapacity() {
        return this.disruptor.getRingBuffer().getBufferSize();
    }

    /**
     * @return the ring buffer's remaining capacity as reported by {@code RingBuffer.remainingCapacity()}
     */
    public long getDisruptorRemainingCapacity() {
        return this.disruptor.getRingBuffer().remainingCapacity();
    }
}
