package org.wso2.siddhi.core.stream;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.reflect.Constructor;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.EventFactory;
import org.wso2.siddhi.core.stream.input.InputProcessor;
import org.wso2.siddhi.core.stream.output.StreamCallback;
import org.wso2.siddhi.core.util.SiddhiConstants;
import org.wso2.siddhi.core.util.statistics.ThroughputTracker;
import org.wso2.siddhi.query.api.definition.StreamDefinition;
import org.wso2.siddhi.query.api.exception.DuplicateAnnotationException;
import org.wso2.siddhi.query.api.util.AnnotationHelper;

/* loaded from: input_file:org/wso2/siddhi/core/stream/StreamJunction.class */
/**
 * Connects the publishers of a single stream to the receivers subscribed to it.
 *
 * <p>Events are delivered in one of two ways:
 * <ul>
 *   <li><b>Direct</b> (no disruptor has been started): every send call invokes each subscribed
 *       {@link Receiver} on the calling thread.</li>
 *   <li><b>Disruptor-backed</b> (after {@link #startProcessing()} ran in parallel mode): every
 *       event is copied into a slot of an LMAX ring buffer and handed to the receivers through
 *       one {@link StreamHandler} per receiver.</li>
 * </ul>
 *
 * <p>Lifecycle methods ({@link #startProcessing()}, {@link #stopProcessing()},
 * {@link #constructPublisher()}, {@link #subscribe(Receiver)}) are {@code synchronized}; the send
 * methods are not.
 */
public class StreamJunction {
    private static final Logger log = Logger.getLogger(StreamJunction.class);
    private final ExecutionPlanContext executionPlanContext;
    private final StreamDefinition streamDefinition;
    private final int bufferSize;
    private final List<Receiver> receivers = new CopyOnWriteArrayList<>();
    private final List<Publisher> publishers = new CopyOnWriteArrayList<>();
    private ExecutorService executorService;
    /** {@code null} until decided by the stream annotation or, lazily, by the execution plan. */
    private Boolean parallel;
    /** Non-null only once {@link #startProcessing()} has set up parallel delivery. */
    private Disruptor<Event> disruptor;
    private RingBuffer<Event> ringBuffer;
    /** {@code null} when statistics are disabled or no statistics manager is available. */
    private ThroughputTracker throughputTracker;
    private boolean isTraceEnabled;

    /**
     * Input handle through which events are pushed into the enclosing junction. Instances are
     * normally obtained from {@link StreamJunction#constructPublisher()}.
     */
    public class Publisher implements InputProcessor {
        private StreamJunction streamJunction;

        public Publisher() {
        }

        /**
         * Sets the junction every {@code send} call forwards to.
         *
         * @param streamJunction target junction
         */
        public void setStreamJunction(StreamJunction streamJunction) {
            this.streamJunction = streamJunction;
        }

        /**
         * Forwards a complex event (and any events chained after it) to the junction.
         *
         * @param complexEvent head of the event chain
         */
        public void send(ComplexEvent complexEvent) {
            this.streamJunction.sendEvent(complexEvent);
        }

        /** Forwards a single event; the index argument is not used by this implementation. */
        @Override
        public void send(Event event, int i) {
            this.streamJunction.sendEvent(event);
        }

        /** Forwards an array of events; the index argument is not used by this implementation. */
        @Override
        public void send(Event[] eventArr, int i) {
            this.streamJunction.sendEvent(eventArr);
        }

        /** Forwards a list of events; the index argument is not used by this implementation. */
        @Override
        public void send(List<Event> list, int i) {
            this.streamJunction.sendEvent(list);
        }

        /** Forwards raw event data; the index argument is not used by this implementation. */
        @Override
        public void send(long j, Object[] objArr, int i) {
            this.streamJunction.sendData(j, objArr);
        }

        /**
         * @return the id of the stream served by the target junction
         */
        public String getStreamId() {
            return this.streamJunction.getStreamId();
        }
    }

    /**
     * Consumer side of the junction; one {@code receive} overload exists for each form in which
     * the junction delivers events.
     */
    public interface Receiver {
        String getStreamId();

        void receive(ComplexEvent complexEvent);

        void receive(Event event);

        /** Called from the disruptor path; {@code z} is the disruptor's end-of-batch flag. */
        void receive(Event event, boolean z);

        void receive(long j, Object[] objArr);

        void receive(Event[] eventArr);
    }

    /** Disruptor event handler that relays every ring-buffer event to a single receiver. */
    public class StreamHandler implements EventHandler<Event> {
        private Receiver receiver;

        public StreamHandler(Receiver receiver) {
            this.receiver = receiver;
        }

        /**
         * Passes the event and the end-of-batch flag on to the receiver; the sequence number is
         * ignored.
         */
        public void onEvent(Event event, long j, boolean z) {
            this.receiver.receive(event, z);
        }
    }

    /**
     * Creates a junction for the given stream.
     *
     * <p>A throughput tracker is created only when statistics are enabled on the execution plan
     * and a statistics manager is present. Parallel mode is forced on when the stream definition
     * carries the parallel annotation; otherwise the decision is deferred to
     * {@link #startProcessing()}.
     *
     * @param streamDefinition     definition of the stream this junction serves
     * @param executorService      executor handed to the disruptor in parallel mode
     * @param i                    ring buffer size used in parallel mode
     * @param executionPlanContext context of the owning execution plan
     * @throws DuplicateAnnotationException if the annotation lookup reports a duplicate; the
     *                                      message is extended with the stream id
     */
    public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int i, ExecutionPlanContext executionPlanContext) {
        this.parallel = null;
        this.throughputTracker = null;
        this.streamDefinition = streamDefinition;
        this.bufferSize = i;
        this.executorService = executorService;
        this.executionPlanContext = executionPlanContext;
        if (executionPlanContext.isStatsEnabled() && executionPlanContext.getStatisticsManager() != null) {
            String metricName = executionPlanContext.getSiddhiContext().getStatisticsConfiguration().getMatricPrefix()
                    + SiddhiConstants.METRIC_DELIMITER + SiddhiConstants.METRIC_INFIX_EXECUTION_PLANS
                    + SiddhiConstants.METRIC_DELIMITER + executionPlanContext.getName()
                    + SiddhiConstants.METRIC_DELIMITER + SiddhiConstants.METRIC_INFIX_SIDDHI
                    + SiddhiConstants.METRIC_DELIMITER + SiddhiConstants.METRIC_INFIX_STREAMS
                    + SiddhiConstants.METRIC_DELIMITER + streamDefinition.getId();
            this.throughputTracker = executionPlanContext.getSiddhiContext().getStatisticsConfiguration().getFactory()
                    .createThroughputTracker(metricName, executionPlanContext.getStatisticsManager());
        }
        try {
            if (AnnotationHelper.getAnnotation(SiddhiConstants.ANNOTATION_PARALLEL, streamDefinition.getAnnotations()) != null) {
                this.parallel = true;
            }
            // Cached once so the hot send paths only read a field.
            this.isTraceEnabled = log.isTraceEnabled();
        } catch (DuplicateAnnotationException e) {
            // NOTE(review): the original exception is not chained as a cause; confirm whether
            // DuplicateAnnotationException offers a (String, Throwable) constructor before changing.
            throw new DuplicateAnnotationException(e.getMessage() + " for the same Stream " + streamDefinition.getId());
        }
    }

    /**
     * Sends a chain of complex events.
     *
     * <p>In direct mode the chain length is counted for the throughput tracker and the head of
     * the chain is handed to every receiver. In disruptor mode each event of the chain is counted
     * and published to the ring buffer individually.
     *
     * @param complexEvent head of the chain; a {@code null} head publishes nothing
     */
    public void sendEvent(ComplexEvent complexEvent) {
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            if (this.throughputTracker != null) {
                int chainLength = 0;
                for (ComplexEvent current = complexEvent; current != null; current = current.getNext()) {
                    chainLength++;
                }
                this.throughputTracker.eventsIn(chainLength);
            }
            for (Receiver receiver : this.receivers) {
                receiver.receive(complexEvent);
            }
            return;
        }
        for (ComplexEvent current = complexEvent; current != null; current = current.getNext()) {
            if (this.throughputTracker != null) {
                this.throughputTracker.eventIn();
            }
            publishToRingBuffer(current);
        }
    }

    /**
     * Sends a single event, either directly to every receiver or through the ring buffer.
     *
     * @param event event to deliver
     */
    public void sendEvent(Event event) {
        if (this.throughputTracker != null) {
            this.throughputTracker.eventIn();
        }
        if (this.isTraceEnabled) {
            log.trace(event + " event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            for (Receiver receiver : this.receivers) {
                receiver.receive(event);
            }
        } else {
            publishToRingBuffer(event);
        }
    }

    /**
     * Sends an array of events. In direct mode the array itself is handed to every receiver; in
     * disruptor mode each element is published separately.
     *
     * @param eventArr events to deliver
     */
    public void sendEvent(Event[] eventArr) {
        if (this.throughputTracker != null) {
            this.throughputTracker.eventsIn(eventArr.length);
        }
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            for (Receiver receiver : this.receivers) {
                receiver.receive(eventArr);
            }
            return;
        }
        for (Event event : eventArr) {
            publishToRingBuffer(event);
        }
    }

    /**
     * Sends a list of events. In direct mode the list is converted to an array once and that
     * array is handed to every receiver; in disruptor mode each element is published separately.
     *
     * @param list events to deliver
     */
    public void sendEvent(List<Event> list) {
        // Count the batch like every other send path does, so list sends show up in the metrics.
        if (this.throughputTracker != null) {
            this.throughputTracker.eventsIn(list.size());
        }
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            // One conversion for all receivers, mirroring how sendEvent(Event[]) shares its array.
            Event[] events = list.toArray(new Event[list.size()]);
            for (Receiver receiver : this.receivers) {
                receiver.receive(events);
            }
            return;
        }
        for (Event event : list) {
            publishToRingBuffer(event);
        }
    }

    /**
     * Sends raw event data. In direct mode the timestamp and data are handed to every receiver;
     * in disruptor mode they are written into the next ring-buffer slot, which is marked as not
     * expired.
     *
     * @param j      event timestamp
     * @param objArr attribute values; {@code objArr.length} elements are copied into the slot
     */
    public void sendData(long j, Object[] objArr) {
        if (this.throughputTracker != null) {
            this.throughputTracker.eventIn();
        }
        if (this.disruptor == null) {
            for (Receiver receiver : this.receivers) {
                receiver.receive(j, objArr);
            }
            return;
        }
        long sequence = this.ringBuffer.next();
        try {
            Event slot = this.ringBuffer.get(sequence);
            slot.setTimestamp(j);
            slot.setIsExpired(false);
            System.arraycopy(objArr, 0, slot.getData(), 0, objArr.length);
        } finally {
            // A claimed sequence must always be published, even on failure.
            this.ringBuffer.publish(sequence);
        }
    }

    /** Claims the next ring-buffer slot, copies {@code source} into it and publishes the slot. */
    private void publishToRingBuffer(Event source) {
        long sequence = this.ringBuffer.next();
        try {
            this.ringBuffer.get(sequence).copyFrom(source);
        } finally {
            // A claimed sequence must always be published, even on failure.
            this.ringBuffer.publish(sequence);
        }
    }

    /** Claims the next ring-buffer slot, copies {@code source} into it and publishes the slot. */
    private void publishToRingBuffer(ComplexEvent source) {
        long sequence = this.ringBuffer.next();
        try {
            this.ringBuffer.get(sequence).copyFrom(source);
        } finally {
            // A claimed sequence must always be published, even on failure.
            this.ringBuffer.publish(sequence);
        }
    }

    /**
     * Starts event delivery. Does nothing when there are no receivers.
     *
     * <p>If parallel mode was not decided by the stream annotation it is taken from the execution
     * plan. In non-parallel mode every receiver that is a {@link StreamCallback} is started. In
     * parallel mode a disruptor is created, one {@link StreamHandler} per receiver is registered
     * and the disruptor is started.
     */
    public synchronized void startProcessing() {
        if (this.receivers.isEmpty()) {
            return;
        }
        if (this.parallel == null) {
            this.parallel = Boolean.valueOf(this.executionPlanContext.isParallel());
        }
        if (!this.parallel.booleanValue()) {
            for (Receiver receiver : this.receivers) {
                if (receiver instanceof StreamCallback) {
                    ((StreamCallback) receiver).startProcessing();
                }
            }
            return;
        }
        // Presumably a five-parameter constructor marks a Disruptor release that accepts a
        // producer type and wait strategy - TODO confirm against the bundled Disruptor version.
        for (Constructor<?> constructor : Disruptor.class.getConstructors()) {
            if (constructor.getParameterTypes().length == 5) {
                ProducerType producerType = ProducerType.SINGLE;
                if (this.publishers.size() > 1) {
                    producerType = ProducerType.MULTI;
                }
                this.disruptor = new Disruptor<>(new EventFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, this.executorService, producerType, PhasedBackoffWaitStrategy.withLiteLock(1L, 4L, TimeUnit.SECONDS));
                this.disruptor.handleExceptionsWith(this.executionPlanContext.getSiddhiContext().getExceptionHandler());
                // Stop at the first match; without this the scan would never terminate.
                break;
            }
        }
        if (this.disruptor == null) {
            this.disruptor = new Disruptor<>(new EventFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, this.executorService);
            this.disruptor.handleExceptionsWith(this.executionPlanContext.getSiddhiContext().getExceptionHandler());
        }
        for (Receiver receiver : this.receivers) {
            this.disruptor.handleEventsWith(new EventHandler[]{new StreamHandler(receiver)});
        }
        this.ringBuffer = this.disruptor.start();
    }

    /**
     * Stops event delivery: shuts the disruptor down when one exists, otherwise stops every
     * receiver that is a {@link StreamCallback}.
     */
    public synchronized void stopProcessing() {
        if (this.disruptor != null) {
            this.disruptor.shutdown();
            return;
        }
        for (Receiver receiver : this.receivers) {
            if (receiver instanceof StreamCallback) {
                ((StreamCallback) receiver).stopProcessing();
            }
        }
    }

    /**
     * Creates a publisher bound to this junction and records it; the number of recorded
     * publishers selects the producer type in {@link #startProcessing()}.
     *
     * @return the new publisher
     */
    public synchronized Publisher constructPublisher() {
        Publisher publisher = new Publisher();
        publisher.setStreamJunction(this);
        this.publishers.add(publisher);
        return publisher;
    }

    /**
     * Registers a receiver unless it is already registered.
     *
     * @param receiver receiver to add
     */
    public synchronized void subscribe(Receiver receiver) {
        if (!this.receivers.contains(receiver)) {
            this.receivers.add(receiver);
        }
    }

    /**
     * @return the id of the stream definition this junction serves
     */
    public String getStreamId() {
        return this.streamDefinition.getId();
    }

    /**
     * @return the stream definition this junction serves
     */
    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }
}
