package com.urbanairship.connect.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.ning.http.client.AsyncHttpClient;
import com.urbanairship.connect.client.consume.BackoffConnectionRetryStrategy;
import com.urbanairship.connect.client.consume.ConnectionRetryStrategy;
import com.urbanairship.connect.client.model.GsonUtil;
import com.urbanairship.connect.client.model.StreamQueryDescriptor;
import com.urbanairship.connect.client.model.request.StartPosition;
import com.urbanairship.connect.java8.Consumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/urbanairship/connect/client/StreamConsumeTask.class */
/**
 * {@link Runnable} that repeatedly opens a {@link StreamConnection} for a
 * {@link StreamQueryDescriptor} and pushes every received event string onto a
 * caller-supplied {@link BlockingQueue}.
 *
 * <p>Each received event is parsed as JSON and its {@code "offset"} member is
 * remembered once the event has been enqueued. When a connection fails with
 * anything other than a {@link ConnectionException} or an interrupt, a new
 * connection is opened starting from the last remembered offset (or from the
 * initial position if nothing has been enqueued yet).
 *
 * <p>Call {@link #stop()} to end the loop; it may be called from another thread.
 */
public final class StreamConsumeTask implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(StreamConsumeTask.class);

    /** Retry policy handed to every connection created by the default supplier. */
    private static final ConnectionRetryStrategy CONNECTION_RETRY_STRATEGY = BackoffConnectionRetryStrategy.newBuilder()
            .setMaxAttempts(10)
            .setInterval(500)
            .setMaxWaitSeconds(30)
            .build();

    private final AsyncHttpClient http;
    private final StreamQueryDescriptor streamQueryDescriptor;
    private final Optional<StartPosition> initialPosition;
    private final StreamConnectionSupplier supplier;
    /** True when this task created {@link #http} itself and therefore closes it when {@link #run()} ends. */
    private final boolean manageHttpLifecycle;
    private final EnqueuingConsumer consumer;
    /** Flipped to false exactly once by {@link #stop()}. */
    private final AtomicBoolean active = new AtomicBoolean(true);
    /** Counted down when {@link #run()} finishes. */
    private final CountDownLatch done = new CountDownLatch(1);
    /** Guards publication and closing of {@link #streamConnection}. */
    private final Object streamLock = new Object();
    private volatile StreamConnection streamConnection;

    /**
     * Fluent builder for {@link StreamConsumeTask}. A stream query descriptor and a
     * target queue are mandatory; everything else is optional.
     */
    public static final class Builder {
        private StreamConnectionSupplier supplier = new MobileEventStreamConnectionSupplier();
        private StreamQueryDescriptor streamQueryDescriptor = null;
        private Optional<StartPosition> initialPosition = Optional.absent();
        private BlockingQueue<String> targetQueue = null;
        private AsyncHttpClient http = null;

        private Builder() {
        }

        /**
         * @param blockingQueue queue that receives each event string; required
         * @return this builder
         */
        public Builder setTargetQueue(BlockingQueue<String> blockingQueue) {
            this.targetQueue = blockingQueue;
            return this;
        }

        /**
         * @param streamQueryDescriptor descriptor passed to every connection; required
         * @return this builder
         */
        public Builder setStreamQueryDescriptor(StreamQueryDescriptor streamQueryDescriptor) {
            this.streamQueryDescriptor = streamQueryDescriptor;
            return this;
        }

        /**
         * @param startPosition position used until an event offset has been recorded; must not be null
         * @return this builder
         * @throws NullPointerException if {@code startPosition} is null
         */
        public Builder setStartingPosition(StartPosition startPosition) {
            this.initialPosition = Optional.of(startPosition);
            return this;
        }

        /**
         * @param asyncHttpClient client to use; when set, the built task never closes it
         * @return this builder
         */
        public Builder setHttpClient(AsyncHttpClient asyncHttpClient) {
            this.http = asyncHttpClient;
            return this;
        }

        /**
         * Replaces the factory used to create stream connections.
         *
         * @param streamConnectionSupplier connection factory
         * @return this builder
         */
        @VisibleForTesting
        public Builder setStreamConnectionSupplier(StreamConnectionSupplier streamConnectionSupplier) {
            this.supplier = streamConnectionSupplier;
            return this;
        }

        /**
         * Creates the task.
         *
         * <p>When no HTTP client was supplied, a default client is created for this
         * call only and the resulting task closes it at the end of its run. The
         * default client is deliberately not stored on the builder, so building
         * twice never hands one task a client that another task will close.
         *
         * @return a new task
         * @throws NullPointerException if the descriptor or the target queue is missing
         */
        public StreamConsumeTask build() {
            Preconditions.checkNotNull(this.streamQueryDescriptor, "Stream query descriptor must be provided");
            Preconditions.checkNotNull(this.targetQueue, "Target queue must be provided");

            boolean ownsHttpClient = (this.http == null);
            AsyncHttpClient client = ownsHttpClient ? HttpClientUtil.defaultHttpClient() : this.http;

            return new StreamConsumeTask(
                    client,
                    this.streamQueryDescriptor,
                    this.targetQueue,
                    this.initialPosition,
                    this.supplier,
                    ownsHttpClient);
        }
    }

    /**
     * Receives raw event strings, enqueues them on the target queue and tracks the
     * offset of the last event that was enqueued. As a {@link Supplier} it reports
     * that offset.
     */
    public final class EnqueuingConsumer implements Consumer<String>, Supplier<Optional<String>> {
        private final AtomicReference<String> lastOffset = new AtomicReference<>(null);
        private final Gson gson;
        private final BlockingQueue<String> targetQueue;

        public EnqueuingConsumer(Gson gson, BlockingQueue<String> blockingQueue) {
            this.gson = gson;
            this.targetQueue = blockingQueue;
        }

        /**
         * Enqueues {@code event} unless its offset equals the last enqueued offset.
         *
         * <p>The enqueue is attempted with a one-second timeout in a loop so the
         * task's active flag is re-checked between attempts; once the task has been
         * stopped the event is dropped without being recorded. If the thread is
         * interrupted while waiting, the interrupt flag is restored and the method
         * returns.
         *
         * @param event raw event JSON containing an {@code "offset"} member
         */
        @Override
        public void accept(String event) {
            String offset = getOffset(event);
            if (offset.equals(this.lastOffset.get())) {
                // Same offset as the previously enqueued event: skip the duplicate.
                return;
            }

            try {
                while (StreamConsumeTask.this.active.get()) {
                    if (this.targetQueue.offer(event, 1L, TimeUnit.SECONDS)) {
                        this.lastOffset.set(offset);
                        return;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /** Parses {@code str} as a JSON object and returns its {@code "offset"} member as a string. */
        private String getOffset(String str) {
            JsonObject parsed = this.gson.fromJson(str, JsonObject.class);
            return parsed.get("offset").getAsString();
        }

        /**
         * @return the offset of the last enqueued event, or absent if none has been enqueued
         */
        @Override
        public Optional<String> get() {
            return Optional.fromNullable(this.lastOffset.get());
        }
    }

    /** Default connection factory: targets {@code Constants.API_URL} with the shared retry strategy. */
    private static class MobileEventStreamConnectionSupplier implements StreamConnectionSupplier {
        private MobileEventStreamConnectionSupplier() {
        }

        @Override
        public StreamConnection get(StreamQueryDescriptor streamQueryDescriptor, AsyncHttpClient asyncHttpClient, Consumer<String> consumer) {
            return new StreamConnection(streamQueryDescriptor, asyncHttpClient, StreamConsumeTask.CONNECTION_RETRY_STRATEGY, consumer, Constants.API_URL);
        }
    }

    /**
     * @return a fresh {@link Builder}
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    private StreamConsumeTask(AsyncHttpClient asyncHttpClient, StreamQueryDescriptor streamQueryDescriptor, BlockingQueue<String> blockingQueue, Optional<StartPosition> optional, StreamConnectionSupplier streamConnectionSupplier, boolean z) {
        this.http = asyncHttpClient;
        this.streamQueryDescriptor = streamQueryDescriptor;
        this.initialPosition = optional;
        this.supplier = streamConnectionSupplier;
        this.manageHttpLifecycle = z;
        this.consumer = new EnqueuingConsumer(GsonUtil.getGson(), blockingQueue);
    }

    /**
     * Streams until stopped, interrupted, or a {@link ConnectionException} escapes.
     * On the way out, closes the HTTP client if this task created it and counts
     * down the completion latch.
     */
    @Override
    public void run() {
        try {
            log.debug("Starting run");
            stream();
        } finally {
            if (this.manageHttpLifecycle) {
                this.http.close();
            }
            log.debug("Stopping run");
            this.done.countDown();
        }
    }

    /**
     * Connection loop: while the task is active, opens a connection at the current
     * position, reads from it, and closes it again.
     *
     * @throws ConnectionException rethrown unchanged from the connection
     */
    private void stream() throws ConnectionException {
        while (this.active.get()) {
            Optional<StartPosition> position = getPosition();
            log.debug("Opening new stream connection at position {}", position);

            try (StreamConnection connection = this.supplier.get(this.streamQueryDescriptor, this.http, this.consumer)) {
                transitionToReading(position, connection);
            } catch (ConnectionException e) {
                throw e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Throwable t) {
                // Any other failure: loop around and reconnect from the last recorded offset.
                log.warn("caught exception consuming from connect stream, will resume reading from last successfully consumed event", t);
            }
        }
    }

    /**
     * @return the last enqueued event's offset as a start position when one exists,
     *         otherwise the initial position supplied at build time
     */
    private Optional<StartPosition> getPosition() {
        Optional<String> lastOffset = this.consumer.get();
        log.debug("Consumer last offset: {}, InitialPosition: {}", this.consumer.lastOffset, this.initialPosition);
        if (lastOffset.isPresent()) {
            return Optional.of(StartPosition.offset(lastOffset.get()));
        }
        return this.initialPosition;
    }

    /**
     * Publishes {@code streamConnection} as the current connection and reads from
     * it, unless the task was stopped in the meantime.
     */
    private void transitionToReading(Optional<StartPosition> optional, StreamConnection streamConnection) throws InterruptedException, ConnectionException {
        // NOTE(review): read() runs while streamLock is held, and stop() needs the same
        // lock before it can close the connection. Confirm StreamConnection.read() does
        // not block for the lifetime of the stream, otherwise stop() would wait on it.
        synchronized (this.streamLock) {
            if (this.active.get()) {
                this.streamConnection = streamConnection;
                this.streamConnection.read(optional);
            }
        }
    }

    /**
     * Deactivates the task and closes the current connection, if any. Only the
     * first call has an effect; later calls are logged and ignored.
     *
     * @throws RuntimeException if closing the connection fails
     */
    public void stop() {
        if (!this.active.compareAndSet(true, false)) {
            log.debug("Ignoring call to stop as initial call has already occurred");
            return;
        }
        log.info("Shutting down stream handler for app {}", this.streamQueryDescriptor.getCreds().getAppKey());
        synchronized (this.streamLock) {
            if (this.streamConnection != null) {
                try {
                    this.streamConnection.close();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to shutdown stream and stop gracefully", e);
                }
            }
        }
    }
}
