package datadog.trace.agent.common.writer;

import datadog.communication.ddagent.DroppingPolicy;
import datadog.jctools.queues.MessagePassingQueue;
import datadog.jctools.queues.MpscBlockingConsumerArrayQueue;
import datadog.slf4j.Logger;
import datadog.slf4j.LoggerFactory;
import datadog.trace.agent.common.sampling.SingleSpanSampler;
import datadog.trace.agent.common.writer.ddagent.FlushEvent;
import datadog.trace.agent.common.writer.ddagent.Prioritization;
import datadog.trace.agent.common.writer.ddagent.PrioritizationStrategy;
import datadog.trace.agent.core.CoreSpan;
import datadog.trace.agent.core.DDSpan;
import datadog.trace.agent.core.monitor.HealthMetrics;
import datadog.trace.agent.core.postprocessor.SpanPostProcessor;
import datadog.trace.util.AgentThreadFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:trace/datadog/trace/agent/common/writer/TraceProcessingWorker.classdata */
public class TraceProcessingWorker implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TraceProcessingWorker.class);
    private final PrioritizationStrategy prioritizationStrategy;
    private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
    private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
    private final TraceSerializingHandler serializingHandler;
    private final Thread serializerThread;
    private final int capacity;
    private final SpanSamplingWorker spanSamplingWorker;

    /* loaded from: input_file:trace/datadog/trace/agent/common/writer/TraceProcessingWorker$TraceSerializingHandler.classdata */
    public static class TraceSerializingHandler implements Runnable {
        private final MpscBlockingConsumerArrayQueue<Object> primaryQueue;
        private final MpscBlockingConsumerArrayQueue<Object> secondaryQueue;
        private final HealthMetrics healthMetrics;
        private final long ticksRequiredToFlush;
        private final boolean doTimeFlush;
        private final PayloadDispatcher payloadDispatcher;
        private long lastTicks;
        private final SpanPostProcessor spanPostProcessor;

        public TraceSerializingHandler(MpscBlockingConsumerArrayQueue<Object> mpscBlockingConsumerArrayQueue, MpscBlockingConsumerArrayQueue<Object> mpscBlockingConsumerArrayQueue2, HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, long j, TimeUnit timeUnit, SpanPostProcessor spanPostProcessor) {
            this.primaryQueue = mpscBlockingConsumerArrayQueue;
            this.secondaryQueue = mpscBlockingConsumerArrayQueue2;
            this.healthMetrics = healthMetrics;
            this.doTimeFlush = j > 0;
            this.payloadDispatcher = payloadDispatcher;
            if (this.doTimeFlush) {
                this.lastTicks = System.nanoTime();
                this.ticksRequiredToFlush = timeUnit.toNanos(j);
            } else {
                this.ticksRequiredToFlush = Long.MAX_VALUE;
            }
            this.spanPostProcessor = spanPostProcessor;
        }

        /**
         * Runs the duty cycle until the current thread is interrupted, then logs whether any events
         * were still queued at exit.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                runDutyCycle();
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            final boolean unpublishedLeft = !queuesAreEmpty();
            TraceProcessingWorker.log.debug("Datadog trace processor exited. Publishing traces stopped. Unpublished traces left: " + unpublishedLeft);
        }

        /**
         * Repeats "drain primary, drain secondary, maybe flush" until the current thread's interrupt
         * flag is set.
         *
         * @throws InterruptedException if the thread is interrupted while waiting on the primary queue
         */
        private void runDutyCycle() throws InterruptedException {
            final Thread worker = Thread.currentThread();
            for (;;) {
                if (worker.isInterrupted()) {
                    return;
                }
                consumeFromPrimaryQueue();
                consumeFromSecondaryQueue();
                flushIfNecessary();
            }
        }

        /**
         * Handles one dequeued event. A {@link List} is treated as a trace: it is post-processed and
         * handed to the dispatcher. A {@link FlushEvent} flushes the dispatcher and is then synced.
         * Any other object is ignored.
         *
         * <p>Every {@link Throwable} raised while handling is caught here, logged at debug level and
         * reported to the health metrics (with the trace when the event was a list, otherwise
         * {@code null}).
         *
         * @param obj the event taken from one of the queues
         */
        public void onEvent(Object obj) {
            final boolean isTrace = obj instanceof List;
            try {
                if (isTrace) {
                    List<DDSpan> trace = (List) obj;
                    maybeTracePostProcessing(trace);
                    this.payloadDispatcher.addTrace(trace);
                    return;
                }
                if (obj instanceof FlushEvent) {
                    FlushEvent flushEvent = (FlushEvent) obj;
                    this.payloadDispatcher.flush();
                    flushEvent.sync();
                }
            } catch (Throwable failure) {
                final Logger logger = TraceProcessingWorker.log;
                if (logger.isDebugEnabled()) {
                    logger.debug("Error while serializing trace", failure);
                }
                List failedTrace = isTrace ? (List) obj : null;
                this.healthMetrics.onFailedSerialize(failedTrace, failure);
            }
        }

        /**
         * Polls the primary queue with a 100 ms timeout; when an event arrives it is handled and the
         * rest of the queue is then drained as a batch.
         *
         * @throws InterruptedException if interrupted while waiting for an event
         */
        protected void consumeFromPrimaryQueue() throws InterruptedException {
            final Object first = this.primaryQueue.poll(100L, TimeUnit.MILLISECONDS);
            if (first == null) {
                return;
            }
            onEvent(first);
            consumeBatch(this.primaryQueue);
        }

        /**
         * Polls the secondary queue without a timeout; when an event is present it is handled and
         * the rest of the queue is then drained as a batch.
         */
        protected void consumeFromSecondaryQueue() {
            final Object first = this.secondaryQueue.poll();
            if (first == null) {
                return;
            }
            onEvent(first);
            consumeBatch(this.secondaryQueue);
        }

        /** Flushes the payload dispatcher when the time-based flush check says a flush is due. */
        protected void flushIfNecessary() {
            final boolean flushDue = shouldFlush();
            if (!flushDue) {
                return;
            }
            this.payloadDispatcher.flush();
        }

        /**
         * Decides whether a time-based flush is due, and restarts the flush clock when it is.
         *
         * @return {@code true} only if time-based flushing is enabled and more than
         *     {@code ticksRequiredToFlush} nanoseconds have passed since {@code lastTicks}
         */
        private boolean shouldFlush() {
            if (this.doTimeFlush) {
                final long now = System.nanoTime();
                final long elapsed = now - this.lastTicks;
                if (elapsed > this.ticksRequiredToFlush) {
                    this.lastTicks = now;
                    return true;
                }
            }
            return false;
        }

        /**
         * Drains the given queue into {@link #onEvent(Object)}, limited to the size the queue
         * reports when this method is called.
         *
         * @param messagePassingQueue the queue to drain
         */
        private void consumeBatch(MessagePassingQueue<Object> messagePassingQueue) {
            final int pending = messagePassingQueue.size();
            messagePassingQueue.drain(event -> onEvent(event), pending);
        }

        /**
         * Reports whether both queues are empty.
         *
         * @return {@code true} if neither the primary nor the secondary queue holds an event
         */
        protected boolean queuesAreEmpty() {
            if (!this.primaryQueue.isEmpty()) {
                return false;
            }
            return this.secondaryQueue.isEmpty();
        }

        /**
         * Runs the span post-processor over those spans of a trace whose context requests it.
         *
         * <p>Reconstructed from the decompiler's instruction dump; the previous body was a stub that
         * unconditionally threw {@link UnsupportedOperationException}, which made every trace fail in
         * {@link #onEvent(Object)} before it reached the dispatcher.
         *
         * <p>Processing stops at the first span for which the post-processor returns {@code false}.
         * Any {@link Throwable} raised while post-processing is logged at debug level and swallowed.
         *
         * @param trace the spans of one trace; {@code null} or empty is a no-op
         */
        private void maybeTracePostProcessing(List<DDSpan> trace) {
            if (trace == null || trace.isEmpty()) {
                return;
            }

            // Allocate the work list lazily so traces without any flagged span cost nothing.
            List<DDSpan> spansToPostProcess = null;
            for (DDSpan span : trace) {
                datadog.trace.agent.core.DDSpanContext context = span.context();
                if (context != null && context.isRequiresPostProcessing()) {
                    if (spansToPostProcess == null) {
                        spansToPostProcess = new java.util.ArrayList<>();
                    }
                    spansToPostProcess.add(span);
                }
            }
            if (spansToPostProcess == null) {
                return;
            }

            try {
                final long timeout = datadog.trace.api.Config.get().getTracePostProcessingTimeout();
                final long deadline = System.currentTimeMillis() + timeout;
                // NOTE(review): the lambda body is not part of the instruction dump; only its single
                // captured value (the deadline) is visible. Reconstructed as "deadline has passed" -
                // confirm against the original bytecode of lambda$maybeTracePostProcessing$0.
                final java.util.function.BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline;

                for (DDSpan span : spansToPostProcess) {
                    if (!this.spanPostProcessor.process(span, timeoutCheck)) {
                        TraceProcessingWorker.log.debug("Span post-processing interrupted due to timeout.");
                        break;
                    }
                }
            } catch (Throwable th) {
                if (TraceProcessingWorker.log.isDebugEnabled()) {
                    TraceProcessingWorker.log.debug("Error while trace post-processing", th);
                }
            }
        }
    }

    public TraceProcessingWorker(int i, HealthMetrics healthMetrics, PayloadDispatcher payloadDispatcher, DroppingPolicy droppingPolicy, Prioritization prioritization, long j, TimeUnit timeUnit, SingleSpanSampler singleSpanSampler, SpanPostProcessor spanPostProcessor) {
        this.capacity = i;
        this.primaryQueue = createQueue(i);
        this.secondaryQueue = createQueue(i);
        this.spanSamplingWorker = SpanSamplingWorker.build(i, this.primaryQueue, this.secondaryQueue, singleSpanSampler, healthMetrics, droppingPolicy);
        this.prioritizationStrategy = prioritization.create(this.primaryQueue, this.secondaryQueue, this.spanSamplingWorker.getSpanSamplingQueue(), droppingPolicy);
        this.serializingHandler = new TraceSerializingHandler(this.primaryQueue, this.secondaryQueue, healthMetrics, payloadDispatcher, j, timeUnit, spanPostProcessor);
        this.serializerThread = AgentThreadFactory.newAgentThread(AgentThreadFactory.AgentThread.TRACE_PROCESSOR, this.serializingHandler);
    }

    /** Starts the serializer thread first, then the span sampling worker. */
    public void start() {
        serializerThread.start();
        spanSamplingWorker.start();
    }

    /**
     * Enqueues a flush event on the primary queue and waits for its latch to be released.
     *
     * <p>The event is offered repeatedly until it is accepted or the serializer thread is no
     * longer alive. If it was never accepted, the latch is reachable only from this method and
     * could never be released, so the method returns {@code false} straight away instead of
     * blocking the caller for the whole timeout.
     *
     * @param j maximum time to wait for the latch once the event is enqueued
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the latch was released within the timeout; {@code false} if the
     *     event could not be enqueued, the wait timed out, or the calling thread was interrupted
     *     (its interrupt flag is restored in that case)
     */
    public boolean flush(long j, TimeUnit timeUnit) {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        FlushEvent flushEvent = new FlushEvent(countDownLatch);

        boolean enqueued;
        do {
            enqueued = this.primaryQueue.offer(flushEvent);
        } while (!enqueued && this.serializerThread.isAlive());

        if (!enqueued) {
            // Serializer thread is gone and the queue never accepted the event: waiting is futile.
            return false;
        }

        try {
            return countDownLatch.await(j, timeUnit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.spanSamplingWorker.close();
        this.serializerThread.interrupt();
        try {
            this.serializerThread.join(800L);
        } catch (InterruptedException e) {
        }
    }

    /**
     * Hands a trace to the prioritization strategy.
     *
     * @param t forwarded unchanged as the first argument of the strategy's {@code publish}
     * @param i forwarded unchanged as the second argument
     * @param list forwarded unchanged as the third argument
     * @param <T> the span type
     * @return the result reported by the prioritization strategy
     */
    public <T extends CoreSpan<T>> PrioritizationStrategy.PublishResult publish(T t, int i, List<T> list) {
        final PrioritizationStrategy.PublishResult result = prioritizationStrategy.publish(t, i, list);
        return result;
    }

    /**
     * Returns the capacity this worker was constructed with.
     *
     * @return the capacity value passed to the constructor
     */
    public int getCapacity() {
        return capacity;
    }

    /**
     * Returns the remaining capacity of the primary queue only; the secondary queue is not
     * taken into account.
     *
     * @return the value reported by the primary queue's {@code remainingCapacity()}
     */
    public long getRemainingCapacity() {
        final long remaining = primaryQueue.remainingCapacity();
        return remaining;
    }

    /**
     * Creates a new queue of the given capacity.
     *
     * @param i capacity passed to the queue constructor
     * @return a new, empty {@link MpscBlockingConsumerArrayQueue}
     */
    private static MpscBlockingConsumerArrayQueue<Object> createQueue(int i) {
        final MpscBlockingConsumerArrayQueue<Object> queue = new MpscBlockingConsumerArrayQueue<>(i);
        return queue;
    }
}
