package reactor.spring.messaging;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/* loaded from: input_file:reactor/spring/messaging/RingBufferBatchingMessageHandler.class */
public class RingBufferBatchingMessageHandler implements MessageHandler, InitializingBean {
    private static final AtomicLongFieldUpdater<RingBufferBatchingMessageHandler> SEQ_START = AtomicLongFieldUpdater.newUpdater(RingBufferBatchingMessageHandler.class, "sequenceStart");
    private static final AtomicReferenceFieldUpdater<RingBufferBatchingMessageHandler, MessageHeaders> MSG_HDRS = AtomicReferenceFieldUpdater.newUpdater(RingBufferBatchingMessageHandler.class, MessageHeaders.class, "messageHeaders");
    private final List messagePayloads;
    private final MessageHandler delegate;
    private final int batchSize;
    private final RingBuffer ringBuffer;
    private volatile MessageHeaders messageHeaders;
    private volatile long sequenceStart;

    public RingBufferBatchingMessageHandler(MessageHandler messageHandler, int i) {
        this(messageHandler, i, ProducerType.MULTI, new BlockingWaitStrategy());
    }

    public RingBufferBatchingMessageHandler(MessageHandler messageHandler, int i, ProducerType producerType, WaitStrategy waitStrategy) {
        this.messagePayloads = new ArrayList();
        this.sequenceStart = -1L;
        this.delegate = messageHandler;
        this.batchSize = i;
        this.ringBuffer = RingBuffer.create(producerType, new EventFactory() { // from class: reactor.spring.messaging.RingBufferBatchingMessageHandler.1
            public Object newInstance() {
                return new Object();
            }
        }, i, waitStrategy);
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.delegate, "");
    }

    public void handleMessage(Message<?> message) throws MessagingException {
        long next = this.ringBuffer.next();
        if (null == MSG_HDRS.get(this) && SEQ_START.compareAndSet(this, -1L, this.sequenceStart)) {
            MSG_HDRS.compareAndSet(this, null, message.getHeaders());
        }
        this.messagePayloads.add(message.getPayload());
        if (next % this.batchSize == 0) {
            this.delegate.handleMessage(new GenericMessage(new ArrayList(this.messagePayloads), this.messageHeaders));
            this.messagePayloads.clear();
            MSG_HDRS.compareAndSet(this, this.messageHeaders, null);
            this.ringBuffer.publish(this.sequenceStart, next);
            SEQ_START.compareAndSet(this, this.sequenceStart, -1L);
        }
    }
}
