package org.apache.druid.java.util.http.client.response;

import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Enumeration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;

/**
 * An {@link HttpResponseHandler} that exposes an HTTP response body as a single
 * {@link InputStream} while the body is still arriving.
 *
 * <p>Each piece of content (the initial response content and every non-empty chunk) is wrapped
 * in a {@link ChannelBufferInputStream} and appended to an unbounded queue. The stream returned
 * from {@link #handleResponse} is a {@link SequenceInputStream} that drains that queue, blocking
 * in {@link java.util.concurrent.BlockingQueue#take()} until the next piece is available.
 *
 * <p>Completion is signalled by the {@code done} flag together with a terminal element placed in
 * the queue: an empty stream on normal completion, or a stream whose {@code read()} throws an
 * {@link IOException} wrapping the failure when {@link #exceptionCaught} is invoked. Updates to
 * the flag and the matching terminal enqueue happen while holding the monitor of {@code done},
 * and the consumer's {@code hasMoreElements()} check takes the same monitor.
 */
public class SequenceInputStreamResponseHandler implements HttpResponseHandler<InputStream, InputStream> {
    private static final Logger log = new Logger(SequenceInputStreamResponseHandler.class);

    /** Total number of readable bytes that have been enqueued so far. */
    private final AtomicLong byteCount = new AtomicLong(0);

    /** Pending pieces of the response body, in arrival order. */
    private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();

    /** Set once no further content will be enqueued; also used as the monitor guarding completion. */
    private final AtomicBoolean done = new AtomicBoolean(false);

    /**
     * Enqueues the content of the initial response and returns the stream that consumers read from.
     *
     * @param httpResponse the initial HTTP response whose content becomes the first queued stream
     * @param trafficCop   not used by this handler
     * @return a finished {@link ClientResponse} holding a {@link SequenceInputStream} over the queue
     * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while enqueueing;
     *                          the thread's interrupt status is restored first
     */
    @Override
    public ClientResponse<InputStream> handleResponse(HttpResponse httpResponse, HttpResponseHandler.TrafficCop trafficCop) {
        final ChannelBuffer content = httpResponse.getContent();
        // Snapshot the size before publishing the buffer-backed stream, mirroring handleChunk.
        final int readableBytes = content.readableBytes();
        try {
            queue.put(new ChannelBufferInputStream(content));
        } catch (InterruptedException e) {
            log.error(e, "Queue appending interrupted");
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        byteCount.addAndGet(readableBytes);

        return ClientResponse.finished(new SequenceInputStream(new Enumeration<InputStream>() {
            @Override
            public boolean hasMoreElements() {
                // Exhausted only once the handler is done AND everything queued has been taken.
                // The monitor is the same one held by done() and exceptionCaught().
                synchronized (done) {
                    return !done.get() || !queue.isEmpty();
                }
            }

            @Override
            public InputStream nextElement() {
                try {
                    // Blocks until the next piece (or a terminal stream) has been enqueued.
                    return queue.take();
                } catch (InterruptedException e) {
                    log.warn(e, "Thread interrupted while taking from queue");
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }));
    }

    /**
     * Enqueues the content of one chunk; zero-length chunks are skipped.
     *
     * @param clientResponse the response object being accumulated; returned unchanged
     * @param httpChunk      the chunk whose content is appended to the queue
     * @param j              not used by this handler
     * @return {@code clientResponse}
     * @throws RuntimeException wrapping {@link InterruptedException} if interrupted while enqueueing;
     *                          the thread's interrupt status is restored first
     */
    @Override
    public ClientResponse<InputStream> handleChunk(ClientResponse<InputStream> clientResponse, HttpChunk httpChunk, long j) {
        final ChannelBuffer content = httpChunk.getContent();
        final int readableBytes = content.readableBytes();
        if (readableBytes <= 0) {
            log.debug("Skipping zero length chunk");
            return clientResponse;
        }
        try {
            queue.put(new ChannelBufferInputStream(content));
            log.debug("Added stream. Queue length %d", queue.size());
            byteCount.addAndGet(readableBytes);
        } catch (InterruptedException e) {
            log.warn(e, "Thread interrupted while adding to queue");
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return clientResponse;
    }

    /**
     * Marks the response as complete and enqueues a terminal empty stream so that a consumer
     * blocked in {@code nextElement()} is released.
     *
     * @param clientResponse the response object accumulated so far
     * @return a finished {@link ClientResponse} wrapping the same object as {@code clientResponse}
     * @throws RuntimeException if opening the empty stream fails or the thread is interrupted while
     *                          enqueueing; the {@code done} flag is set in either case
     */
    @Override
    public ClientResponse<InputStream> done(ClientResponse<InputStream> clientResponse) {
        synchronized (done) {
            try {
                queue.put(ByteSource.empty().openStream());
                log.debug("Added terminal empty stream");
            } catch (IOException e) {
                log.wtf(e, "The empty stream threw an IOException");
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                log.warn(e, "Thread interrupted while adding to queue");
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                // Runs on success and on failure alike, so the flag is always set.
                log.debug("Done after adding %d bytes of streams", byteCount.get());
                done.set(true);
            }
        }
        return ClientResponse.finished(clientResponse.getObj());
    }

    /**
     * Marks the response as complete and enqueues a stream whose {@code read()} throws an
     * {@link IOException} wrapping {@code th}, so the failure surfaces when that stream is read.
     *
     * @param clientResponse the response object accumulated so far; not used by this handler
     * @param th             the failure to propagate to the reader
     */
    @Override
    public void exceptionCaught(ClientResponse<InputStream> clientResponse, final Throwable th) {
        synchronized (done) {
            done.set(true);
            final InputStream failingStream = new InputStream() {
                @Override
                public int read() throws IOException {
                    throw new IOException(th);
                }
            };
            // offer() rather than put(): this path does not block and only logs if the offer is refused.
            if (queue.offer(failingStream)) {
                log.debug("Placed IOException in queue");
            } else {
                log.warn("Unable to place final IOException offer in queue");
            }
            log.debug(th, "Exception with queue length of %d and %d bytes available", queue.size(), byteCount.get());
        }
    }

    /**
     * Returns the total number of readable bytes enqueued so far.
     *
     * @return the running byte count
     */
    public final long getByteCount() {
        return byteCount.get();
    }
}
