package io.siddhi.core.stream;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.Event;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.event.stream.converter.FaultStreamEventConverter;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.input.InputProcessor;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.core.util.SiddhiConstants;
import io.siddhi.core.util.event.handler.EventExchangeHolder;
import io.siddhi.core.util.event.handler.EventExchangeHolderFactory;
import io.siddhi.core.util.event.handler.StreamHandler;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.statistics.EventBufferHolder;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.core.util.statistics.metrics.Level;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.exception.DuplicateAnnotationException;
import io.siddhi.query.api.util.AnnotationHelper;
import java.beans.ExceptionListener;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;

/* loaded from: input_file:io/siddhi/core/stream/StreamJunction.class */
public class StreamJunction implements EventBufferHolder {
    private static final Logger log = Logger.getLogger(StreamJunction.class);
    private final SiddhiAppContext siddhiAppContext;
    private final StreamDefinition streamDefinition;
    private int batchSize;
    private int workers;
    private int bufferSize;
    private List<Receiver> receivers = new LinkedList();
    private List<Publisher> publishers = new LinkedList();
    private ExecutorService executorService;
    private boolean async;
    private Disruptor<EventExchangeHolder> disruptor;
    private RingBuffer<EventExchangeHolder> ringBuffer;
    private ThroughputTracker throughputTracker;
    private boolean isTraceEnabled;
    private StreamJunction faultStreamJunction;
    private FaultStreamEventConverter faultStreamEventConverter;
    private OnErrorAction onErrorAction;
    private ExceptionListener exceptionListener;

    /* loaded from: input_file:io/siddhi/core/stream/StreamJunction$OnErrorAction.class */
    public enum OnErrorAction {
        LOG,
        STREAM
    }

    /* loaded from: input_file:io/siddhi/core/stream/StreamJunction$Publisher.class */
    public class Publisher implements InputProcessor {
        private StreamJunction streamJunction;

        public Publisher() {
        }

        public void setStreamJunction(StreamJunction streamJunction) {
            this.streamJunction = streamJunction;
        }

        public void send(ComplexEvent complexEvent) {
            try {
                this.streamJunction.sendEvent(complexEvent);
            } catch (Exception e) {
                handleError(complexEvent, e);
            }
        }

        @Override // io.siddhi.core.stream.input.InputProcessor
        public void send(Event event, int i) {
            try {
                this.streamJunction.sendEvent(event);
            } catch (Exception e) {
                handleError(event, e);
            }
        }

        @Override // io.siddhi.core.stream.input.InputProcessor
        public void send(Event[] eventArr, int i) {
            try {
                this.streamJunction.sendEvent(eventArr);
            } catch (Exception e) {
                handleError(eventArr, e);
            }
        }

        @Override // io.siddhi.core.stream.input.InputProcessor
        public void send(List<Event> list, int i) {
            try {
                this.streamJunction.sendEvent(list);
            } catch (Exception e) {
                handleError(list, e);
            }
        }

        @Override // io.siddhi.core.stream.input.InputProcessor
        public void send(long j, Object[] objArr, int i) {
            try {
                this.streamJunction.sendData(j, objArr);
            } catch (Exception e) {
                handleError(j, objArr, e);
            }
        }

        public String getStreamId() {
            return this.streamJunction.getStreamId();
        }

        private void handleError(Object obj, Exception exc) {
            if (StreamJunction.this.exceptionListener != null) {
                StreamJunction.this.exceptionListener.exceptionThrown(exc);
            }
            switch (StreamJunction.this.onErrorAction) {
                case LOG:
                    StreamJunction.log.error("Error in '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "', " + exc.getMessage() + ". Hence, dropping event '" + obj.toString() + "'", exc);
                    return;
                case STREAM:
                    if (StreamJunction.this.faultStreamJunction == null) {
                        StreamJunction.log.error("Error in SiddhiApp '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "', " + exc.getMessage() + ". Siddhi Fault Stream for '" + StreamJunction.this.streamDefinition.getId() + "' is not defined. Hence, dropping event '" + obj.toString() + "'", exc);
                        return;
                    }
                    if (obj instanceof ComplexEvent) {
                        StreamJunction.this.faultStreamJunction.sendEvent(StreamJunction.this.faultStreamEventConverter.convert((ComplexEvent) obj, exc));
                        return;
                    }
                    if (obj instanceof Event) {
                        StreamJunction.this.faultStreamJunction.sendEvent(StreamJunction.this.faultStreamEventConverter.convert((Event) obj, exc));
                        return;
                    } else if (obj instanceof Event[]) {
                        StreamJunction.this.faultStreamJunction.sendEvent(StreamJunction.this.faultStreamEventConverter.convert((Event[]) obj, exc));
                        return;
                    } else {
                        if (obj instanceof List) {
                            StreamJunction.this.faultStreamJunction.sendEvent(StreamJunction.this.faultStreamEventConverter.convert((List<Event>) obj, exc));
                            return;
                        }
                        return;
                    }
                default:
                    return;
            }
        }

        private void handleError(long j, Object[] objArr, Exception exc) {
            if (StreamJunction.this.exceptionListener != null) {
                StreamJunction.this.exceptionListener.exceptionThrown(exc);
            }
            switch (StreamJunction.this.onErrorAction) {
                case LOG:
                    StreamJunction.log.error("Error in '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "' , " + exc.getMessage() + ". Hence, dropping event '" + Arrays.toString(objArr) + "'", exc);
                    return;
                case STREAM:
                    if (StreamJunction.this.faultStreamJunction == null) {
                        StreamJunction.log.error("Error in SiddhiApp '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "', " + exc.getMessage() + ". Siddhi Fault Stream for '" + StreamJunction.this.streamDefinition.getId() + "' is not defined. Hence, dropping data '" + Arrays.toString(objArr) + "'", exc);
                        return;
                    } else {
                        StreamJunction.this.faultStreamJunction.sendEvent(StreamJunction.this.faultStreamEventConverter.convert(j, objArr, exc));
                        return;
                    }
                default:
                    return;
            }
        }
    }

    /* loaded from: input_file:io/siddhi/core/stream/StreamJunction$Receiver.class */
    public interface Receiver {
        String getStreamId();

        void receive(ComplexEvent complexEvent);

        void receive(Event event);

        void receive(List<Event> list);

        void receive(long j, Object[] objArr);

        void receive(Event[] eventArr);
    }

    public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int i, StreamJunction streamJunction, SiddhiAppContext siddhiAppContext) {
        this.workers = -1;
        this.async = false;
        this.throughputTracker = null;
        this.faultStreamJunction = null;
        this.faultStreamEventConverter = null;
        this.onErrorAction = OnErrorAction.LOG;
        this.streamDefinition = streamDefinition;
        this.bufferSize = i;
        this.batchSize = i;
        this.executorService = executorService;
        this.siddhiAppContext = siddhiAppContext;
        if (siddhiAppContext.getStatisticsManager() != null) {
            this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext, streamDefinition.getId(), SiddhiConstants.METRIC_INFIX_STREAMS, null);
        }
        this.faultStreamJunction = streamJunction;
        if (streamJunction != null) {
            this.faultStreamEventConverter = new FaultStreamEventConverter(new StreamEventFactory(0, 0, streamJunction.getStreamDefinition().getAttributeList().size()));
        }
        try {
            Annotation annotation = AnnotationHelper.getAnnotation(SiddhiConstants.ANNOTATION_ASYNC, streamDefinition.getAnnotations());
            if (annotation != null) {
                this.async = true;
                String element = annotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_BUFFER_SIZE);
                if (element != null) {
                    this.bufferSize = Integer.parseInt(element);
                }
                String element2 = annotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_WORKERS);
                if (element2 != null) {
                    this.workers = Integer.parseInt(element2);
                    if (this.workers <= 0) {
                        throw new SiddhiAppCreationException("Annotation element 'workers' cannot be negative or zero, but found, '" + this.workers + "'.", annotation.getQueryContextStartIndex(), annotation.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
                    }
                }
                String element3 = annotation.getElement(SiddhiConstants.ANNOTATION_ELEMENT_MAX_BATCH_SIZE);
                if (element3 != null) {
                    this.batchSize = Integer.parseInt(element3);
                    if (this.batchSize <= 0) {
                        throw new SiddhiAppCreationException("Annotation element 'batch.size.max' cannot be negative or zero, but found, '" + this.batchSize + "'.", annotation.getQueryContextStartIndex(), annotation.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
                    }
                }
            }
            Annotation annotation2 = AnnotationHelper.getAnnotation(SiddhiConstants.ANNOTATION_ON_ERROR, streamDefinition.getAnnotations());
            if (annotation2 != null) {
                this.onErrorAction = OnErrorAction.valueOf(annotation2.getElement(SiddhiConstants.ANNOTATION_ELEMENT_ACTION).toUpperCase());
            }
            this.isTraceEnabled = log.isTraceEnabled();
        } catch (DuplicateAnnotationException e) {
            throw new DuplicateAnnotationException(e.getMessageWithOutContext() + " for the same Stream " + streamDefinition.getId(), e, e.getQueryContextStartIndex(), e.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
        }
    }

    public void sendEvent(ComplexEvent complexEvent) {
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        ComplexEvent complexEvent2 = complexEvent;
        if (this.disruptor == null) {
            if (this.throughputTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                int i = 0;
                while (complexEvent2 != null) {
                    i++;
                    complexEvent2 = complexEvent2.getNext();
                }
                this.throughputTracker.eventsIn(i);
            }
            Iterator<Receiver> it = this.receivers.iterator();
            while (it.hasNext()) {
                it.next().receive(complexEvent);
            }
            return;
        }
        while (complexEvent2 != null) {
            if (this.throughputTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
                this.throughputTracker.eventIn();
            }
            long next = this.ringBuffer.next();
            try {
                EventExchangeHolder eventExchangeHolder = (EventExchangeHolder) this.ringBuffer.get(next);
                eventExchangeHolder.getEvent().copyFrom(complexEvent2);
                eventExchangeHolder.getAndSetIsProcessed(false);
                this.ringBuffer.publish(next);
                complexEvent2 = complexEvent2.getNext();
            } catch (Throwable th) {
                this.ringBuffer.publish(next);
                throw th;
            }
        }
    }

    public void sendEvent(Event event) {
        if (this.throughputTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
            this.throughputTracker.eventIn();
        }
        if (this.isTraceEnabled) {
            log.trace(event + " event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            Iterator<Receiver> it = this.receivers.iterator();
            while (it.hasNext()) {
                it.next().receive(event);
            }
            return;
        }
        long next = this.ringBuffer.next();
        try {
            EventExchangeHolder eventExchangeHolder = (EventExchangeHolder) this.ringBuffer.get(next);
            eventExchangeHolder.getEvent().copyFrom(event);
            eventExchangeHolder.getAndSetIsProcessed(false);
            this.ringBuffer.publish(next);
        } catch (Throwable th) {
            this.ringBuffer.publish(next);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendEvent(Event[] eventArr) {
        if (this.throughputTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
            this.throughputTracker.eventsIn(eventArr.length);
        }
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            Iterator<Receiver> it = this.receivers.iterator();
            while (it.hasNext()) {
                it.next().receive(eventArr);
            }
            return;
        }
        for (Event event : eventArr) {
            long next = this.ringBuffer.next();
            try {
                EventExchangeHolder eventExchangeHolder = (EventExchangeHolder) this.ringBuffer.get(next);
                eventExchangeHolder.getEvent().copyFrom(event);
                eventExchangeHolder.getAndSetIsProcessed(false);
                this.ringBuffer.publish(next);
            } catch (Throwable th) {
                this.ringBuffer.publish(next);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendEvent(List<Event> list) {
        if (this.isTraceEnabled) {
            log.trace("Event is received by streamJunction " + this);
        }
        if (this.disruptor == null) {
            Iterator<Receiver> it = this.receivers.iterator();
            while (it.hasNext()) {
                it.next().receive((Event[]) list.toArray(new Event[list.size()]));
            }
            return;
        }
        for (Event event : list) {
            long next = this.ringBuffer.next();
            try {
                EventExchangeHolder eventExchangeHolder = (EventExchangeHolder) this.ringBuffer.get(next);
                eventExchangeHolder.getEvent().copyFrom(event);
                eventExchangeHolder.getAndSetIsProcessed(false);
                this.ringBuffer.publish(next);
            } catch (Throwable th) {
                this.ringBuffer.publish(next);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void sendData(long j, Object[] objArr) {
        if (this.throughputTracker != null && Level.BASIC.compareTo(this.siddhiAppContext.getRootMetricsLevel()) <= 0) {
            this.throughputTracker.eventIn();
        }
        if (this.disruptor == null) {
            Iterator<Receiver> it = this.receivers.iterator();
            while (it.hasNext()) {
                it.next().receive(j, objArr);
            }
            return;
        }
        long next = this.ringBuffer.next();
        try {
            EventExchangeHolder eventExchangeHolder = (EventExchangeHolder) this.ringBuffer.get(next);
            eventExchangeHolder.getAndSetIsProcessed(false);
            eventExchangeHolder.getEvent().setTimestamp(j);
            eventExchangeHolder.getEvent().setIsExpired(false);
            System.arraycopy(objArr, 0, eventExchangeHolder.getEvent().getData(), 0, objArr.length);
            this.ringBuffer.publish(next);
        } catch (Throwable th) {
            this.ringBuffer.publish(next);
            throw th;
        }
    }

    public void startProcessing() {
        this.exceptionListener = this.siddhiAppContext.getRuntimeExceptionListener();
        if (this.receivers.isEmpty() || !this.async) {
            for (Receiver receiver : this.receivers) {
                if (receiver instanceof StreamCallback) {
                    ((StreamCallback) receiver).startProcessing();
                }
            }
            return;
        }
        Constructor<?>[] constructors = Disruptor.class.getConstructors();
        int length = constructors.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            if (constructors[i].getParameterTypes().length == 5) {
                this.disruptor = new Disruptor<>(new EventExchangeHolderFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, this.executorService, ProducerType.MULTI, new BlockingWaitStrategy());
                this.disruptor.handleExceptionsWith(this.siddhiAppContext.getDisruptorExceptionHandler());
                break;
            }
            i++;
        }
        if (this.disruptor == null) {
            this.disruptor = new Disruptor<>(new EventExchangeHolderFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, this.executorService);
            this.disruptor.handleExceptionsWith(this.siddhiAppContext.getDisruptorExceptionHandler());
        }
        if (this.workers > 0) {
            for (int i2 = 0; i2 < this.workers; i2++) {
                this.disruptor.handleEventsWith(new EventHandler[]{new StreamHandler(this.receivers, this.batchSize, this.streamDefinition.getId(), this.siddhiAppContext.getName(), this.faultStreamJunction, this.onErrorAction, this.exceptionListener)});
            }
        } else {
            this.disruptor.handleEventsWith(new EventHandler[]{new StreamHandler(this.receivers, this.batchSize, this.streamDefinition.getId(), this.siddhiAppContext.getName(), this.faultStreamJunction, this.onErrorAction, this.exceptionListener)});
        }
        this.ringBuffer = this.disruptor.start();
    }

    public void stopProcessing() {
        if (this.disruptor != null) {
            this.disruptor.shutdown();
            return;
        }
        for (Receiver receiver : this.receivers) {
            if (receiver instanceof StreamCallback) {
                ((StreamCallback) receiver).stopProcessing();
            }
        }
    }

    public Publisher constructPublisher() {
        Publisher publisher = new Publisher();
        publisher.setStreamJunction(this);
        this.publishers.add(publisher);
        return publisher;
    }

    public void subscribe(Receiver receiver) {
        if (this.receivers.contains(receiver)) {
            return;
        }
        this.receivers.add(receiver);
    }

    public String getStreamId() {
        return this.streamDefinition.getId();
    }

    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    @Override // io.siddhi.core.util.statistics.EventBufferHolder
    public long getBufferedEvents() {
        if (this.disruptor != null) {
            return this.disruptor.getBufferSize() - this.disruptor.getRingBuffer().remainingCapacity();
        }
        return 0L;
    }

    @Override // io.siddhi.core.util.statistics.EventBufferHolder
    public boolean containsBufferedEvents() {
        return !this.receivers.isEmpty() && this.async;
    }
}
