package com.azure.core.amqp.implementation;

import com.azure.core.amqp.implementation.handler.DispatchHandler;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionException;
import org.apache.qpid.proton.reactor.Reactor;
import org.apache.qpid.proton.reactor.Selectable;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorDispatcher.class */
public final class ReactorDispatcher {
    private final Reactor reactor;
    private final ClientLogger logger = new ClientLogger(ReactorDispatcher.class);
    private final Pipe ioSignal = Pipe.open();
    private final ConcurrentLinkedQueue<Work> workQueue = new ConcurrentLinkedQueue<>();
    private final CloseHandler onClose = new CloseHandler();
    private final WorkScheduler workScheduler = new WorkScheduler();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorDispatcher$CloseHandler.class */
    public final class CloseHandler implements Selectable.Callback {
        private CloseHandler() {
        }

        public void run(Selectable selectable) {
            try {
                if (ReactorDispatcher.this.ioSignal.sink().isOpen()) {
                    ReactorDispatcher.this.ioSignal.sink().close();
                }
            } catch (IOException e) {
                ReactorDispatcher.this.logger.error("CloseHandler.run() sink().close() failed with an error. %s", new Object[]{e});
            }
            ReactorDispatcher.this.workScheduler.run(null);
            try {
                if (ReactorDispatcher.this.ioSignal.source().isOpen()) {
                    ReactorDispatcher.this.ioSignal.source().close();
                }
            } catch (IOException e2) {
                ReactorDispatcher.this.logger.error("CloseHandler.run() source().close() failed with an error %s", new Object[]{e2});
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorDispatcher$Work.class */
    public static final class Work {
        private final DispatchHandler dispatchHandler;
        private final Duration delay;

        Work(Runnable runnable) {
            this(runnable, null);
        }

        Work(Runnable runnable, Duration duration) {
            this.dispatchHandler = new DispatchHandler(runnable);
            this.delay = duration;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorDispatcher$WorkScheduler.class */
    public final class WorkScheduler implements Selectable.Callback {
        private WorkScheduler() {
        }

        public void run(Selectable selectable) {
            try {
                for (ByteBuffer allocate = ByteBuffer.allocate(1024); ReactorDispatcher.this.ioSignal.source().read(allocate) > 0; allocate = ByteBuffer.allocate(1024)) {
                }
            } catch (ClosedChannelException e) {
                ReactorDispatcher.this.logger.info("WorkScheduler.run() failed with an error: %s", new Object[]{e});
            } catch (IOException e2) {
                ReactorDispatcher.this.logger.error("WorkScheduler.run() failed with an error: %s", new Object[]{e2});
                throw ReactorDispatcher.this.logger.logExceptionAsError(new RuntimeException(e2));
            }
            while (true) {
                Work work = (Work) ReactorDispatcher.this.workQueue.poll();
                if (work == null) {
                    return;
                }
                if (work.delay != null) {
                    ReactorDispatcher.this.reactor.schedule((int) work.delay.toMillis(), work.dispatchHandler);
                } else {
                    work.dispatchHandler.onTimerTask(null);
                }
            }
        }
    }

    public ReactorDispatcher(Reactor reactor) throws IOException {
        this.reactor = reactor;
        initializeSelectable();
    }

    private void initializeSelectable() {
        Selectable selectable = this.reactor.selectable();
        selectable.setChannel(this.ioSignal.source());
        selectable.onReadable(this.workScheduler);
        selectable.onFree(this.onClose);
        selectable.setReading(true);
        this.reactor.update(selectable);
    }

    public void invoke(Runnable runnable) throws IOException {
        throwIfSchedulerError();
        this.workQueue.offer(new Work(runnable));
        signalWorkQueue();
    }

    public void invoke(Runnable runnable, Duration duration) throws IOException {
        throwIfSchedulerError();
        this.workQueue.offer(new Work(runnable, duration));
        signalWorkQueue();
    }

    private void throwIfSchedulerError() {
        RejectedExecutionException rejectedExecutionException = (RejectedExecutionException) this.reactor.attachments().get(RejectedExecutionException.class, RejectedExecutionException.class);
        if (rejectedExecutionException != null) {
            throw this.logger.logExceptionAsError(new RejectedExecutionException(rejectedExecutionException.getMessage(), rejectedExecutionException));
        }
        if (!this.ioSignal.sink().isOpen()) {
            throw this.logger.logExceptionAsError(new RejectedExecutionException("ReactorDispatcher instance is closed."));
        }
    }

    private void signalWorkQueue() throws IOException {
        try {
            for (ByteBuffer allocate = ByteBuffer.allocate(1); this.ioSignal.sink().write(allocate) == 0; allocate = ByteBuffer.allocate(1)) {
            }
        } catch (ClosedChannelException e) {
            this.logger.info("signalWorkQueue failed with an error: {}", new Object[]{e});
        }
    }
}
