package com.urbanairship.connect.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.AbstractIterator;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.urbanairship.connect.client.StreamConsumeTask;
import com.urbanairship.connect.client.model.StreamQueryDescriptor;
import com.urbanairship.connect.client.model.request.StartPosition;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/urbanairship/connect/client/Stream.class */
public final class Stream extends AbstractIterator<String> implements ConnectStreamApi {
    private static final Logger log = LoggerFactory.getLogger(Stream.class);
    private final AtomicReference<SourceExit> sourceExit;
    private final ExecutorService threads;
    private final BlockingQueue<String> eventQueue;
    private final StreamConsumeTask consumeTask;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/urbanairship/connect/client/Stream$SourceExit.class */
    public static final class SourceExit {
        private final Optional<Throwable> error;

        public SourceExit(Optional<Throwable> optional) {
            this.error = optional;
        }
    }

    /* loaded from: input_file:com/urbanairship/connect/client/Stream$SourceWatcher.class */
    private final class SourceWatcher implements Runnable {
        private final Future<?> handle;

        public SourceWatcher(Future<?> future) {
            this.handle = future;
        }

        @Override // java.lang.Runnable
        public void run() {
            Throwable th = null;
            try {
                try {
                    this.handle.get();
                    Stream.this.sourceExit.set(new SourceExit(Optional.fromNullable((Object) null)));
                } catch (InterruptedException e) {
                    Stream.log.info("Source watcher interrupted!", e);
                    Thread.currentThread().interrupt();
                    Stream.this.sourceExit.set(new SourceExit(Optional.fromNullable((Object) null)));
                } catch (ExecutionException e2) {
                    th = e2.getCause();
                    Stream.this.sourceExit.set(new SourceExit(Optional.fromNullable(th)));
                }
            } catch (Throwable th2) {
                Stream.this.sourceExit.set(new SourceExit(Optional.fromNullable(th)));
                throw th2;
            }
        }
    }

    public Stream(StreamQueryDescriptor streamQueryDescriptor, Optional<StartPosition> optional) {
        this(streamQueryDescriptor, optional, Optional.absent());
        log.debug("Stream Filters: " + streamQueryDescriptor.getFilters() + " Subset: " + streamQueryDescriptor.getSubset() + " Starting Position " + optional);
    }

    @VisibleForTesting
    public Stream(StreamQueryDescriptor streamQueryDescriptor, Optional<StartPosition> optional, Optional<StreamConnectionSupplier> optional2) {
        this.sourceExit = new AtomicReference<>(null);
        this.eventQueue = new LinkedBlockingQueue(100);
        this.threads = Executors.newFixedThreadPool(2, new ThreadFactoryBuilder().setDaemon(false).setNameFormat("Stream iteration thread %d").build());
        StreamConsumeTask.Builder streamQueryDescriptor2 = StreamConsumeTask.newBuilder().setTargetQueue(this.eventQueue).setStreamQueryDescriptor(streamQueryDescriptor);
        if (optional.isPresent()) {
            streamQueryDescriptor2.setStartingPosition((StartPosition) optional.get());
        }
        if (optional2.isPresent()) {
            streamQueryDescriptor2.setStreamConnectionSupplier((StreamConnectionSupplier) optional2.get());
        }
        this.consumeTask = streamQueryDescriptor2.build();
        this.threads.submit(new SourceWatcher(this.threads.submit(this.consumeTask)));
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        try {
            this.consumeTask.stop();
        } finally {
            this.threads.shutdown();
        }
    }

    /* renamed from: computeNext, reason: merged with bridge method [inline-methods] */
    public String m2computeNext() {
        String str = null;
        while (true) {
            if (str != null) {
                break;
            }
            SourceExit sourceExit = this.sourceExit.get();
            if (sourceExit == null) {
                try {
                    str = this.eventQueue.poll(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else if (sourceExit.error.isPresent()) {
                throw Throwables.propagate((Throwable) sourceExit.error.get());
            }
        }
        return str == null ? (String) endOfData() : str;
    }
}
