package org.wso2.siddhi.core.stream;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiAppContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.exception.SiddhiAppCreationException;
import org.wso2.siddhi.core.stream.input.InputProcessor;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.SiddhiConstants;
import org.wso2.siddhi.core.util.event.handler.EventExchangeHolder;
import org.wso2.siddhi.core.util.event.handler.EventExchangeHolderFactory;
import org.wso2.siddhi.core.util.event.handler.StreamHandler;
import org.wso2.siddhi.core.util.parser.helper.QueryParserHelper;
import org.wso2.siddhi.core.util.statistics.EventBufferHolder;
import org.wso2.siddhi.core.util.statistics.ThroughputTracker;
import org.wso2.siddhi.query.api.annotation.Annotation;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.exception.DuplicateAnnotationException;
import org.wso2.siddhi.query.api.util.AnnotationHelper;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-4.3.7.jar:org/wso2/siddhi/core/stream/StreamJunction.class
 */
/* loaded from: input_file:org/wso2/siddhi/core/stream/StreamJunction.class */
/**
 * Connects the publishers of a single stream to the receivers subscribed to that stream.
 *
 * <p>Events are delivered in one of two ways:
 * <ul>
 *   <li><b>Direct</b> - while no disruptor has been created, every {@code send*} call invokes
 *       each subscribed {@link Receiver} directly from the calling thread.</li>
 *   <li><b>Buffered</b> - when the stream definition carries the
 *       {@link SiddhiConstants#ANNOTATION_ASYNC} annotation and at least one receiver is
 *       subscribed at the time {@link #startProcessing()} runs, a {@link Disruptor} is created
 *       and every event is copied into a ring-buffer slot that is consumed by
 *       {@link StreamHandler} instances.</li>
 * </ul>
 *
 * <p>When a statistics manager is configured on the {@link SiddhiAppContext}, incoming events
 * are reported to a {@link ThroughputTracker} while statistics are enabled.
 */
public class StreamJunction implements EventBufferHolder {
    private static final Logger log = Logger.getLogger(StreamJunction.class);
    private final SiddhiAppContext siddhiAppContext;
    private final StreamDefinition streamDefinition;
    /** Batch size handed to every {@link StreamHandler}; defaults to the constructor's buffer size. */
    private int batchSize;
    /** Number of stream handlers to register; any value {@code <= 0} results in a single handler. */
    private int workers = -1;
    /** Ring-buffer size passed to the {@link Disruptor} constructor. */
    private int bufferSize;
    private final List<Receiver> receivers = new CopyOnWriteArrayList<>();
    private final List<Publisher> publishers = Collections.synchronizedList(new LinkedList<Publisher>());
    private ExecutorService executorService;
    /** {@code true} when the stream definition carries the async annotation. */
    private boolean async = false;
    private Disruptor<EventExchangeHolder> disruptor;
    private RingBuffer<EventExchangeHolder> ringBuffer;
    /** Stays {@code null} when the app context has no statistics manager. */
    private ThroughputTracker throughputTracker = null;
    /** Trace-level flag, captured once in the constructor. */
    private boolean isTraceEnabled;

    /**
     * Input handle through which events are pushed into a {@link StreamJunction}.
     *
     * <p>The target junction must be assigned with {@link #setStreamJunction(StreamJunction)}
     * before any other method is called; {@link StreamJunction#constructPublisher()} does this.
     * The trailing {@code int} argument of the {@link InputProcessor} methods is not used here.
     */
    public class Publisher implements InputProcessor {
        private StreamJunction streamJunction;

        public Publisher() {
        }

        /**
         * Sets the junction that all subsequent calls are forwarded to.
         *
         * @param streamJunction the target junction
         */
        public void setStreamJunction(StreamJunction streamJunction) {
            this.streamJunction = streamJunction;
        }

        /**
         * Forwards a complex event (and the events chained to it) to the junction.
         *
         * @param complexEvent head of the event chain
         */
        public void send(ComplexEvent complexEvent) {
            this.streamJunction.sendEvent(complexEvent);
        }

        @Override // org.wso2.siddhi.core.stream.input.InputProcessor
        public void send(Event event, int i) {
            this.streamJunction.sendEvent(event);
        }

        @Override // org.wso2.siddhi.core.stream.input.InputProcessor
        public void send(Event[] eventArr, int i) {
            this.streamJunction.sendEvent(eventArr);
        }

        @Override // org.wso2.siddhi.core.stream.input.InputProcessor
        public void send(List<Event> list, int i) {
            this.streamJunction.sendEvent(list);
        }

        @Override // org.wso2.siddhi.core.stream.input.InputProcessor
        public void send(long j, Object[] objArr, int i) {
            this.streamJunction.sendData(j, objArr);
        }

        /**
         * @return the id of the stream served by the target junction
         */
        public String getStreamId() {
            return this.streamJunction.getStreamId();
        }
    }

    /**
     * Consumer of the events that pass through a {@link StreamJunction}. One {@code receive}
     * overload exists for each shape in which the junction accepts events.
     */
    public interface Receiver {
        String getStreamId();

        void receive(ComplexEvent complexEvent);

        void receive(Event event);

        void receive(List<Event> list);

        void receive(long j, Object[] objArr);

        void receive(Event[] eventArr);
    }

    /**
     * Creates a junction for the given stream and reads the optional async annotation.
     *
     * <p>When the annotation is present the junction is marked async and the annotation's
     * buffer-size, workers and max-batch-size elements override the defaults, if specified.
     *
     * @param streamDefinition definition of the stream served by this junction
     * @param executorService  executor handed to the {@link Disruptor} in buffered mode
     * @param i                default ring-buffer size; also the default batch size
     * @param siddhiAppContext context of the owning Siddhi app
     * @throws SiddhiAppCreationException   if the workers or max-batch-size element is zero
     *                                      or negative
     * @throws DuplicateAnnotationException if the annotation lookup reports a duplicate; it is
     *                                      rethrown with the stream id added to the message
     */
    public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int i, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.bufferSize = i;
        this.batchSize = i;
        this.executorService = executorService;
        this.siddhiAppContext = siddhiAppContext;
        if (siddhiAppContext.getStatisticsManager() != null) {
            this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext, streamDefinition.getId(), SiddhiConstants.METRIC_INFIX_STREAMS, null);
        }
        try {
            Annotation asyncAnnotation = AnnotationHelper.getAnnotation(SiddhiConstants.ANNOTATION_ASYNC, streamDefinition.getAnnotations());
            if (asyncAnnotation != null) {
                this.async = true;
                String bufferSizeValue = asyncAnnotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_BUFFER_SIZE);
                if (bufferSizeValue != null) {
                    this.bufferSize = Integer.parseInt(bufferSizeValue);
                }
                String workersValue = asyncAnnotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_WORKERS);
                if (workersValue != null) {
                    this.workers = Integer.parseInt(workersValue);
                    if (this.workers <= 0) {
                        throw new SiddhiAppCreationException("Annotation element 'workers' cannot be negative or zero, but found, '" + this.workers + "'.", asyncAnnotation.getQueryContextStartIndex(), asyncAnnotation.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
                    }
                }
                String maxBatchSizeValue = asyncAnnotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_MAX_BATCH_SIZE);
                if (maxBatchSizeValue != null) {
                    this.batchSize = Integer.parseInt(maxBatchSizeValue);
                    if (this.batchSize <= 0) {
                        throw new SiddhiAppCreationException("Annotation element 'batch.size.max' cannot be negative or zero, but found, '" + this.batchSize + "'.", asyncAnnotation.getQueryContextStartIndex(), asyncAnnotation.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
                    }
                }
            }
            this.isTraceEnabled = log.isTraceEnabled();
        } catch (DuplicateAnnotationException e) {
            throw new DuplicateAnnotationException(e.getMessageWithOutContext() + " for the same Stream " + streamDefinition.getId(), e, e.getQueryContextStartIndex(), e.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
        }
    }

    /**
     * Sends a chain of complex events, linked through {@link ComplexEvent#getNext()}.
     *
     * <p>In direct mode the head of the chain is passed to every receiver as-is; in buffered
     * mode each event of the chain is copied into its own ring-buffer slot.
     *
     * @param complexEvent head of the event chain; may be {@code null} (nothing is buffered or
     *                     counted, but direct-mode receivers are still invoked)
     */
    public void sendEvent(ComplexEvent complexEvent) {
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            if (isThroughputTracked()) {
                // The whole chain is delivered in one call, so report its length in one go.
                int chainLength = 0;
                for (ComplexEvent current = complexEvent; current != null; current = current.getNext()) {
                    chainLength++;
                }
                this.throughputTracker.eventsIn(chainLength);
            }
            for (Receiver receiver : this.receivers) {
                receiver.receive(complexEvent);
            }
            return;
        }
        for (ComplexEvent current = complexEvent; current != null; current = current.getNext()) {
            if (isThroughputTracked()) {
                this.throughputTracker.eventIn();
            }
            long sequence = this.ringBuffer.next();
            try {
                EventExchangeHolder holder = this.ringBuffer.get(sequence);
                holder.getEvent().copyFrom(current);
                holder.getAndSetIsProcessed(false);
            } finally {
                // A claimed sequence is always published, even when copying fails.
                this.ringBuffer.publish(sequence);
            }
        }
    }

    /**
     * Sends a single event.
     *
     * @param event the event to deliver
     */
    public void sendEvent(Event event) {
        if (isThroughputTracked()) {
            this.throughputTracker.eventIn();
        }
        if (this.isTraceEnabled) {
            log.trace(event + " event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            for (Receiver receiver : this.receivers) {
                receiver.receive(event);
            }
            return;
        }
        publishToRingBuffer(event);
    }

    /**
     * Sends an array of events. In direct mode the same array instance is passed to every
     * receiver; in buffered mode each element is copied into its own ring-buffer slot.
     *
     * @param eventArr the events to deliver
     */
    public void sendEvent(Event[] eventArr) {
        if (isThroughputTracked()) {
            this.throughputTracker.eventsIn(eventArr.length);
        }
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            for (Receiver receiver : this.receivers) {
                receiver.receive(eventArr);
            }
            return;
        }
        for (Event event : eventArr) {
            publishToRingBuffer(event);
        }
    }

    /**
     * Sends a list of events. In direct mode each receiver is handed its own freshly created
     * array copy of the list; in buffered mode each element is copied into its own
     * ring-buffer slot.
     *
     * @param list the events to deliver
     */
    public void sendEvent(List<Event> list) {
        // Count list-based sends as well, consistently with the array-based variant.
        if (isThroughputTracked()) {
            this.throughputTracker.eventsIn(list.size());
        }
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            for (Receiver receiver : this.receivers) {
                receiver.receive(list.toArray(new Event[list.size()]));
            }
            return;
        }
        for (Event event : list) {
            publishToRingBuffer(event);
        }
    }

    /**
     * Sends one event given as a raw timestamp and attribute array.
     *
     * <p>In buffered mode the values of {@code objArr} are copied into the data array of the
     * slot's pre-allocated event, and the slot's event is marked as not expired.
     *
     * @param j      event timestamp
     * @param objArr attribute values of the event
     */
    public void sendData(long j, Object[] objArr) {
        if (isThroughputTracked()) {
            this.throughputTracker.eventIn();
        }
        if (this.disruptor == null) {
            for (Receiver receiver : this.receivers) {
                receiver.receive(j, objArr);
            }
            return;
        }
        long sequence = this.ringBuffer.next();
        try {
            EventExchangeHolder holder = this.ringBuffer.get(sequence);
            holder.getAndSetIsProcessed(false);
            Event slotEvent = holder.getEvent();
            slotEvent.setTimestamp(j);
            slotEvent.setIsExpired(false);
            System.arraycopy(objArr, 0, slotEvent.getData(), 0, objArr.length);
        } finally {
            // A claimed sequence is always published, even when copying fails.
            this.ringBuffer.publish(sequence);
        }
    }

    /**
     * Starts event processing.
     *
     * <p>If the junction is not async, or has no receivers, only the subscribed
     * {@link StreamCallback}s are started. Otherwise a {@link Disruptor} is created, the stream
     * handlers are registered and the disruptor is started.
     */
    public synchronized void startProcessing() {
        if (this.receivers.isEmpty() || !this.async) {
            for (Receiver receiver : this.receivers) {
                if (receiver instanceof StreamCallback) {
                    ((StreamCallback) receiver).startProcessing();
                }
            }
            return;
        }
        // Prefer the five-argument Disruptor constructor when the Disruptor class on the
        // classpath exposes a public constructor with five parameters.
        for (Constructor<?> constructor : Disruptor.class.getConstructors()) {
            if (constructor.getParameterTypes().length == 5) {
                this.disruptor = new Disruptor<>(new EventExchangeHolderFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, this.executorService, ProducerType.MULTI, new BlockingWaitStrategy());
                this.disruptor.handleExceptionsWith(this.siddhiAppContext.getDisruptorExceptionHandler());
                break;
            }
        }
        if (this.disruptor == null) {
            this.disruptor = new Disruptor<>(new EventExchangeHolderFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, this.executorService);
            this.disruptor.handleExceptionsWith(this.siddhiAppContext.getDisruptorExceptionHandler());
        }
        // One handler per configured worker; a single handler when 'workers' was not configured.
        int handlerCount = this.workers > 0 ? this.workers : 1;
        for (int handlerIndex = 0; handlerIndex < handlerCount; handlerIndex++) {
            this.disruptor.handleEventsWith(new StreamHandler(this.receivers, this.batchSize, this.streamDefinition.getId(), this.siddhiAppContext.getName()));
        }
        this.ringBuffer = this.disruptor.start();
    }

    /**
     * Stops event processing: shuts the disruptor down when one exists, otherwise stops every
     * subscribed {@link StreamCallback}.
     */
    public synchronized void stopProcessing() {
        if (this.disruptor != null) {
            // NOTE(review): the disruptor reference is kept after shutdown, so later sends
            // still take the ring-buffer path - confirm this is intended.
            this.disruptor.shutdown();
            return;
        }
        for (Receiver receiver : this.receivers) {
            if (receiver instanceof StreamCallback) {
                ((StreamCallback) receiver).stopProcessing();
            }
        }
    }

    /**
     * Creates a publisher bound to this junction and records it in the publisher list.
     *
     * @return the new publisher
     */
    public synchronized Publisher constructPublisher() {
        Publisher publisher = new Publisher();
        publisher.setStreamJunction(this);
        this.publishers.add(publisher);
        return publisher;
    }

    /**
     * Subscribes a receiver; a receiver that is already subscribed is not added again.
     *
     * @param receiver the receiver to add
     */
    public synchronized void subscribe(Receiver receiver) {
        if (!this.receivers.contains(receiver)) {
            this.receivers.add(receiver);
        }
    }

    /**
     * @return the id of the stream served by this junction
     */
    public String getStreamId() {
        return this.streamDefinition.getId();
    }

    /**
     * @return the definition of the stream served by this junction
     */
    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    /**
     * @return the disruptor's buffer size minus the ring buffer's remaining capacity, or
     *         {@code 0} when no disruptor has been created
     */
    @Override // org.wso2.siddhi.core.util.statistics.EventBufferHolder
    public long getBufferedEvents() {
        if (this.disruptor != null) {
            return this.disruptor.getBufferSize() - this.disruptor.getRingBuffer().remainingCapacity();
        }
        return 0L;
    }

    /**
     * @return {@code true} when the junction is async and has at least one receiver
     */
    @Override // org.wso2.siddhi.core.util.statistics.EventBufferHolder
    public boolean containsBufferedEvents() {
        return !this.receivers.isEmpty() && this.async;
    }

    /** Tells whether incoming events should currently be reported to the throughput tracker. */
    private boolean isThroughputTracked() {
        return this.throughputTracker != null && this.siddhiAppContext.isStatsEnabled();
    }

    /** Claims the next ring-buffer slot, copies {@code event} into it and publishes the slot. */
    private void publishToRingBuffer(Event event) {
        long sequence = this.ringBuffer.next();
        try {
            EventExchangeHolder holder = this.ringBuffer.get(sequence);
            holder.getEvent().copyFrom(event);
            holder.getAndSetIsProcessed(false);
        } finally {
            // A claimed sequence is always published, even when copying fails.
            this.ringBuffer.publish(sequence);
        }
    }
}
