package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.class */
public class SynchronousEventSubscriber extends BaseSubscriber<PartitionEvent> {
    private final Timer timer = new Timer();
    private final ClientLogger logger = new ClientLogger((Class<?>) SynchronousEventSubscriber.class);
    private final SynchronousReceiveWork work;
    private volatile Subscription subscription;

    /* loaded from: input_file:com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber$ReceiveTimeoutTask.class */
    private static class ReceiveTimeoutTask extends TimerTask {
        private final ClientLogger logger = new ClientLogger((Class<?>) ReceiveTimeoutTask.class);
        private final long workId;
        private final Runnable onDispose;

        ReceiveTimeoutTask(long j, Runnable runnable) {
            this.workId = j;
            this.onDispose = runnable;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            this.logger.info("Work: {}. Timeout encountered, disposing of subscriber.", Long.valueOf(this.workId));
            this.onDispose.run();
        }
    }

    public SynchronousEventSubscriber(SynchronousReceiveWork synchronousReceiveWork) {
        this.work = (SynchronousReceiveWork) Objects.requireNonNull(synchronousReceiveWork, "'work' cannot be null.");
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnSubscribe(Subscription subscription) {
        if (this.subscription == null) {
            this.subscription = subscription;
        }
        this.logger.info("Work: {}, Pending: {}, Scheduling receive timeout task.", Long.valueOf(this.work.getId()), Integer.valueOf(this.work.getNumberOfEvents()));
        subscription.request(this.work.getNumberOfEvents());
        this.timer.schedule(new ReceiveTimeoutTask(this.work.getId(), this::dispose), this.work.getTimeout().toMillis());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.core.publisher.BaseSubscriber
    public void hookOnNext(PartitionEvent partitionEvent) {
        this.work.next(partitionEvent);
        if (this.work.isTerminal()) {
            this.logger.info("Work: {}. Completed. Closing Flux and cancelling subscription.", Long.valueOf(this.work.getId()));
            dispose();
        }
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnComplete() {
        this.logger.info("Completed. No events to listen to.");
        dispose();
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnError(Throwable th) {
        this.logger.error(Messages.ERROR_OCCURRED_IN_SUBSCRIBER_ERROR, th);
        this.work.error(th);
        dispose();
    }

    @Override // reactor.core.publisher.BaseSubscriber, reactor.core.Disposable
    public void dispose() {
        this.work.complete();
        this.subscription.cancel();
        this.timer.cancel();
        super.dispose();
    }
}
