package reactor.core.processor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.LifecycleAware;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nonnull;
import reactor.event.registry.Registration;
import reactor.event.registry.Registry;
import reactor.function.Consumer;
import reactor.function.Supplier;
import reactor.function.batch.BatchConsumer;
import reactor.support.NamedDaemonThreadFactory;
import reactor.util.Assert;

/* loaded from: input_file:reactor/core/processor/Processor.class */
public class Processor<T> implements Supplier<Operation<T>> {
    // Ring buffer capacity handed to the Disruptor; batch() also flushes after this many claimed slots.
    private final int opsBufferSize;
    // Thread pool passed to the Disruptor constructor; shut down by shutdown().
    private final ExecutorService executor;
    // Disruptor instance on which the event handler and exception handler are registered.
    private final Disruptor<Operation<T>> disruptor;
    // Ring buffer returned by disruptor.start(); operations are claimed from and published to it.
    private final RingBuffer<Operation<T>> ringBuffer;

    /* loaded from: input_file:reactor/core/processor/Processor$ConsumerEventHandler.class */
    private static class ConsumerEventHandler<T> implements EventHandler<Operation<T>>, LifecycleAware {
        final Consumer<T> consumer;
        final boolean isBatchConsumer;

        private ConsumerEventHandler(Consumer<T> consumer) {
            this.consumer = consumer;
            this.isBatchConsumer = consumer instanceof BatchConsumer;
        }

        public void onStart() {
            if (this.isBatchConsumer) {
                ((BatchConsumer) this.consumer).start();
            }
        }

        public void onShutdown() {
            if (this.isBatchConsumer) {
                ((BatchConsumer) this.consumer).end();
            }
        }

        public void onEvent(Operation<T> operation, long j, boolean z) throws Exception {
            this.consumer.accept(operation.get());
        }
    }

    /* loaded from: input_file:reactor/core/processor/Processor$ConsumerExceptionHandler.class */
    private static class ConsumerExceptionHandler implements ExceptionHandler {
        final Registry<Consumer<Throwable>> errorConsumers;

        private ConsumerExceptionHandler(Registry<Consumer<Throwable>> registry) {
            this.errorConsumers = registry;
        }

        public void handleEventException(Throwable th, long j, Object obj) {
            Iterator<Registration<? extends Consumer<Throwable>>> it = this.errorConsumers.select(th.getClass()).iterator();
            while (it.hasNext()) {
                it.next().getObject().accept(th);
            }
        }

        public void handleOnStartException(Throwable th) {
            handleEventException(th, -1L, null);
        }

        public void handleOnShutdownException(Throwable th) {
            handleEventException(th, -1L, null);
        }
    }

    /**
     * Creates a processor and starts its underlying Disruptor.
     *
     * <p>Every ring buffer slot is pre-filled with an {@link Operation} whose payload comes from
     * {@code supplier} and whose {@code commit()} publishes the operation's own sequence.
     *
     * @param supplier creates the payload for each ring buffer slot; must not be null
     * @param consumer receives the payload of every published operation; must not be null
     * @param registry error consumers, selected by exception class, that are notified of handler
     *                 failures; must not be null
     * @param z        {@code true} to configure the ring buffer for multiple producers,
     *                 {@code false} for a single producer
     * @param i        ring buffer size; values below 1 select a default of
     *                 {@code 256 * availableProcessors}, rounded up to a power of two. An explicit
     *                 size is passed to the Disruptor unchanged, which requires a power of two.
     */
    @SuppressWarnings("unchecked") // generic varargs array created by handleEventsWith(...)
    public Processor(@Nonnull final Supplier<T> supplier, @Nonnull Consumer<T> consumer, @Nonnull Registry<Consumer<Throwable>> registry, boolean z, int i) {
        Assert.notNull(supplier, "Data Supplier cannot be null.");
        Assert.notNull(consumer, "Consumer cannot be null.");
        Assert.notNull(registry, "Error Consumers Registry cannot be null.");
        this.executor = Executors.newCachedThreadPool(new NamedDaemonThreadFactory("processor"));
        if (i < 1) {
            // The Disruptor rejects sizes that are not a power of two, and the processor count is
            // not guaranteed to be one (e.g. 6 or 12 cores), so round the default up.
            int defaultSize = 256 * Runtime.getRuntime().availableProcessors();
            this.opsBufferSize = Integer.bitCount(defaultSize) == 1
                    ? defaultSize
                    : Integer.highestOneBit(defaultSize) << 1;
        } else {
            this.opsBufferSize = i;
        }
        this.disruptor = new Disruptor<Operation<T>>(new EventFactory<Operation<T>>() {
            @Override
            public Operation<T> newInstance() {
                return new Operation<T>(supplier.get()) {
                    @Override
                    public void commit() {
                        // ringBuffer is assigned at the end of the enclosing constructor, before
                        // any operation can be handed out through prepare().
                        Processor.this.ringBuffer.publish(this.id.longValue());
                    }
                };
            }
        }, this.opsBufferSize, this.executor, z ? ProducerType.MULTI : ProducerType.SINGLE, new BlockingWaitStrategy());
        this.disruptor.handleExceptionsWith(new ConsumerExceptionHandler(registry));
        this.disruptor.handleEventsWith(new ConsumerEventHandler<T>(consumer));
        this.ringBuffer = this.disruptor.start();
    }

    /**
     * Stops this processor: first requests shutdown of the backing executor, then shuts down
     * the Disruptor.
     */
    public void shutdown() {
        final ExecutorService pool = this.executor;
        pool.shutdown();
        this.disruptor.shutdown();
    }

    /**
     * Claims the next ring buffer sequence and returns the operation stored in that slot, with
     * its id set to the claimed sequence. The slot is not published here.
     *
     * @return the operation occupying the newly claimed slot
     */
    public Operation<T> prepare() {
        final long sequence = this.ringBuffer.next();
        final Operation<T> slot = this.ringBuffer.get(sequence);
        slot.setId(Long.valueOf(sequence));
        return slot;
    }

    /**
     * Publishes the operations in {@code list} using the ids of its first and last elements.
     *
     * <p>When the last id is greater than the first, the whole range between them is published
     * in one call; otherwise only the first id is published. A {@code null} or empty list is a
     * no-op.
     *
     * @param list operations to publish; the first and last elements bound the published range
     * @return this processor, for chaining
     */
    public Processor<T> commit(List<Operation<T>> list) {
        if (list != null && !list.isEmpty()) {
            final Operation<T> head = list.get(0);
            final Operation<T> tail = list.get(list.size() - 1);
            final long firstSeq = head.id.longValue();
            final long lastSeq = tail.id.longValue();
            if (lastSeq <= firstSeq) {
                this.ringBuffer.publish(firstSeq);
            } else {
                this.ringBuffer.publish(firstSeq, lastSeq);
            }
        }
        return this;
    }

    /**
     * Claims {@code i} slots one at a time, lets {@code consumer} mutate each slot's payload,
     * and publishes them as ranges.
     *
     * <p>A range is published each time {@code opsBufferSize} slots have been claimed since the
     * last publish, and once more at the end. If the mutator throws, the slots claimed so far
     * are still published before the exception propagates. Leaving them unpublished would stall
     * every later sequence.
     *
     * @param i        number of slots to fill; values below 1 publish nothing
     * @param consumer mutator applied to each claimed slot's payload; checked with
     *                 {@code Assert.notNull} before any slot is claimed
     * @return this processor, for chaining
     */
    public Processor<T> batch(int i, Consumer<T> consumer) {
        Assert.notNull(consumer, "Consumer cannot be null.");
        if (i < 1) {
            // Nothing was claimed, so there is nothing that may legally be published.
            return this;
        }
        long first = -1L;
        long last = -1L;
        try {
            for (int n = 0; n < i; n++) {
                if (n > 0 && n % this.opsBufferSize == 0) {
                    // Flush a full buffer's worth so the next claim can make progress.
                    final long flushFrom = first;
                    first = -1L;
                    this.ringBuffer.publish(flushFrom, last);
                }
                final long sequence = this.ringBuffer.next();
                if (first < 0) {
                    first = sequence;
                }
                last = sequence;
                consumer.accept(this.ringBuffer.get(sequence).get());
            }
        } finally {
            // Publish whatever is still claimed, including after a mutator failure.
            if (first >= 0) {
                this.ringBuffer.publish(first, last);
            }
        }
        return this;
    }

    /**
     * {@link Supplier} view of this processor; equivalent to calling {@link #prepare()}.
     *
     * @return the operation returned by {@code prepare()}
     */
    @Override // reactor.function.Supplier
    public Operation<T> get() {
        final Operation<T> claimed = prepare();
        return claimed;
    }
}
