package com.github.kristofa.brave.zipkin;

import com.github.kristofa.brave.internal.Util;
import com.twitter.zipkin.gen.LogEntry;
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.scribe;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;

/* loaded from: input_file:com/github/kristofa/brave/zipkin/SpanProcessingThread.class */
class SpanProcessingThread implements Callable<Integer> {
    private static final Logger LOGGER = Logger.getLogger(SpanProcessingThread.class.getName());
    private static final int MAX_SUBSEQUENT_EMPTY_BATCHES = 2;
    private final BlockingQueue<Span> queue;
    private final ZipkinCollectorClientProvider clientProvider;
    private final TProtocolFactory protocolFactory;
    private volatile boolean stop = false;
    private int processedSpans = 0;
    private final List<LogEntry> logEntries;
    private final int maxBatchSize;

    public SpanProcessingThread(BlockingQueue<Span> blockingQueue, ZipkinCollectorClientProvider zipkinCollectorClientProvider, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.queue = (BlockingQueue) Util.checkNotNull(blockingQueue, "Null queue", new Object[0]);
        this.clientProvider = (ZipkinCollectorClientProvider) Util.checkNotNull(zipkinCollectorClientProvider, "Null clientProvider", new Object[0]);
        this.protocolFactory = new TBinaryProtocol.Factory();
        this.maxBatchSize = i;
        this.logEntries = new ArrayList(i);
    }

    public void stop() {
        this.stop = true;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public Integer call() {
        int i = 0;
        do {
            try {
                Span poll = this.queue.poll(5L, TimeUnit.SECONDS);
                if (poll == null) {
                    i++;
                } else {
                    this.logEntries.add(create(poll));
                }
                if ((i >= MAX_SUBSEQUENT_EMPTY_BATCHES && !this.logEntries.isEmpty()) || this.logEntries.size() >= this.maxBatchSize || (!this.logEntries.isEmpty() && this.stop)) {
                    log(this.logEntries);
                    this.logEntries.clear();
                    i = 0;
                }
            } catch (Exception e) {
                LOGGER.log(Level.WARNING, "Unexpected exception flushing spans", (Throwable) e);
            }
        } while (!this.stop);
        return Integer.valueOf(this.processedSpans);
    }

    private void log(List<LogEntry> list) {
        long currentTimeMillis = System.currentTimeMillis();
        boolean log = log(this.clientProvider.getClient(), list);
        this.processedSpans += list.size();
        if (log && LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Submitting " + list.size() + " spans to service took " + (System.currentTimeMillis() - currentTimeMillis) + "ms.");
        }
    }

    private boolean log(scribe.Client client, List<LogEntry> list) {
        try {
            this.clientProvider.getClient().Log(list);
            return true;
        } catch (TException e) {
            LOGGER.fine(String.format("Exception when trying to log Span.  Will retry: %s", e.getMessage()));
            scribe.Client exception = this.clientProvider.exception(e);
            if (exception == null) {
                LOGGER.warning("Logging spans failed (couldn't establish connection). " + list.size() + " spans are lost!");
                return false;
            }
            LOGGER.fine("Got new client with new connection. Logging with new client.");
            try {
                exception.Log(list);
                return true;
            } catch (TException e2) {
                LOGGER.log(Level.WARNING, "Logging spans failed. " + list.size() + " spans are lost!", e2);
                return false;
            }
        }
    }

    private LogEntry create(Span span) throws TException {
        return new LogEntry("zipkin", Base64.encode(spanToBytes(span)));
    }

    private byte[] spanToBytes(Span span) throws TException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        span.write(this.protocolFactory.getProtocol(new TIOStreamTransport(byteArrayOutputStream)));
        return byteArrayOutputStream.toByteArray();
    }
}
