/*
 * Decompiled with CFR 0.152.
 */
package com.github.kristofa.brave.zipkin;

import com.github.kristofa.brave.internal.Util;
import com.github.kristofa.brave.zipkin.Base64;
import com.github.kristofa.brave.zipkin.ZipkinCollectorClientProvider;
import com.twitter.zipkin.gen.LogEntry;
import com.twitter.zipkin.gen.Span;
import com.twitter.zipkin.gen.scribe;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;

class SpanProcessingThread
implements Callable<Integer> {
    private static final Logger LOGGER = Logger.getLogger(SpanProcessingThread.class.getName());
    private static final int MAX_SUBSEQUENT_EMPTY_BATCHES = 2;
    private final BlockingQueue<Span> queue;
    private final ZipkinCollectorClientProvider clientProvider;
    private final TProtocolFactory protocolFactory;
    private volatile boolean stop = false;
    private int processedSpans = 0;
    private final List<LogEntry> logEntries;
    private final int maxBatchSize;

    public SpanProcessingThread(BlockingQueue<Span> queue, ZipkinCollectorClientProvider clientProvider, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.queue = (BlockingQueue)Util.checkNotNull(queue, (String)"Null queue", (Object[])new Object[0]);
        this.clientProvider = (ZipkinCollectorClientProvider)Util.checkNotNull((Object)clientProvider, (String)"Null clientProvider", (Object[])new Object[0]);
        this.protocolFactory = new TBinaryProtocol.Factory();
        this.maxBatchSize = maxBatchSize;
        this.logEntries = new ArrayList<LogEntry>(maxBatchSize);
    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public Integer call() {
        int subsequentEmptyBatches = 0;
        do {
            try {
                Span span = this.queue.poll(5L, TimeUnit.SECONDS);
                if (span == null) {
                    ++subsequentEmptyBatches;
                } else {
                    this.logEntries.add(this.create(span));
                }
                if ((subsequentEmptyBatches < 2 || this.logEntries.isEmpty()) && this.logEntries.size() < this.maxBatchSize && (this.logEntries.isEmpty() || !this.stop)) continue;
                this.log(this.logEntries);
                this.logEntries.clear();
                subsequentEmptyBatches = 0;
            }
            catch (Exception e) {
                LOGGER.log(Level.WARNING, "Unexpected exception flushing spans", e);
            }
        } while (!this.stop);
        return this.processedSpans;
    }

    private void log(List<LogEntry> logEntries) {
        long start = System.currentTimeMillis();
        boolean success = this.log(this.clientProvider.getClient(), logEntries);
        this.processedSpans += logEntries.size();
        if (success && LOGGER.isLoggable(Level.FINE)) {
            long end = System.currentTimeMillis();
            LOGGER.fine("Submitting " + logEntries.size() + " spans to service took " + (end - start) + "ms.");
        }
    }

    private boolean log(scribe.Client client, List<LogEntry> logEntries) {
        try {
            this.clientProvider.getClient().Log(logEntries);
            return true;
        }
        catch (TException e) {
            LOGGER.fine(String.format("Exception when trying to log Span.  Will retry: %s", e.getMessage()));
            scribe.Client newClient = this.clientProvider.exception(e);
            if (newClient != null) {
                LOGGER.fine("Got new client with new connection. Logging with new client.");
                try {
                    newClient.Log(logEntries);
                    return true;
                }
                catch (TException e2) {
                    LOGGER.log(Level.WARNING, "Logging spans failed. " + logEntries.size() + " spans are lost!", e2);
                }
            } else {
                LOGGER.warning("Logging spans failed (couldn't establish connection). " + logEntries.size() + " spans are lost!");
            }
            return false;
        }
    }

    private LogEntry create(Span span) throws TException {
        String spanAsString = Base64.encode(this.spanToBytes(span));
        return new LogEntry("zipkin", spanAsString);
    }

    private byte[] spanToBytes(Span thriftSpan) throws TException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        TProtocol proto = this.protocolFactory.getProtocol((TTransport)new TIOStreamTransport((OutputStream)buf));
        thriftSpan.write(proto);
        return buf.toByteArray();
    }
}

