/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.core.stream;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.siddhi.core.config.SiddhiAppContext;
import io.siddhi.core.event.ComplexEvent;
import io.siddhi.core.event.Event;
import io.siddhi.core.event.stream.StreamEvent;
import io.siddhi.core.event.stream.StreamEventFactory;
import io.siddhi.core.event.stream.converter.FaultStreamEventConverter;
import io.siddhi.core.exception.SiddhiAppCreationException;
import io.siddhi.core.stream.input.InputProcessor;
import io.siddhi.core.stream.output.StreamCallback;
import io.siddhi.core.util.event.handler.EventExchangeHolder;
import io.siddhi.core.util.event.handler.EventExchangeHolderFactory;
import io.siddhi.core.util.event.handler.StreamHandler;
import io.siddhi.core.util.parser.helper.QueryParserHelper;
import io.siddhi.core.util.statistics.EventBufferHolder;
import io.siddhi.core.util.statistics.ThroughputTracker;
import io.siddhi.query.api.annotation.Annotation;
import io.siddhi.query.api.definition.StreamDefinition;
import io.siddhi.query.api.exception.DuplicateAnnotationException;
import io.siddhi.query.api.util.AnnotationHelper;
import java.beans.ExceptionListener;
import java.lang.reflect.Constructor;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.Logger;

public class StreamJunction
implements EventBufferHolder {
    private static final Logger log = Logger.getLogger(StreamJunction.class);
    private final SiddhiAppContext siddhiAppContext;
    private final StreamDefinition streamDefinition;
    private int batchSize;
    private int workers = -1;
    private int bufferSize;
    private List<Receiver> receivers = new LinkedList<Receiver>();
    private List<Publisher> publishers = new LinkedList<Publisher>();
    private ExecutorService executorService;
    private boolean async = false;
    private Disruptor<EventExchangeHolder> disruptor;
    private RingBuffer<EventExchangeHolder> ringBuffer;
    private ThroughputTracker throughputTracker = null;
    private boolean isTraceEnabled;
    private StreamJunction faultStreamJunction = null;
    private FaultStreamEventConverter faultStreamEventConverter = null;
    private OnErrorAction onErrorAction = OnErrorAction.LOG;
    private ExceptionListener exceptionListener;

    public StreamJunction(StreamDefinition streamDefinition, ExecutorService executorService, int bufferSize, StreamJunction faultStreamJunction, SiddhiAppContext siddhiAppContext) {
        this.streamDefinition = streamDefinition;
        this.bufferSize = bufferSize;
        this.batchSize = bufferSize;
        this.executorService = executorService;
        this.siddhiAppContext = siddhiAppContext;
        if (siddhiAppContext.getStatisticsManager() != null) {
            this.throughputTracker = QueryParserHelper.createThroughputTracker(siddhiAppContext, streamDefinition.getId(), "Streams", null);
        }
        this.exceptionListener = siddhiAppContext.getRuntimeExceptionListener();
        this.faultStreamJunction = faultStreamJunction;
        if (faultStreamJunction != null) {
            StreamDefinition faultStreamDefinition = faultStreamJunction.getStreamDefinition();
            StreamEventFactory faultStreamEventFactory = new StreamEventFactory(0, 0, faultStreamDefinition.getAttributeList().size());
            this.faultStreamEventConverter = new FaultStreamEventConverter(faultStreamEventFactory);
        }
        try {
            Annotation onErrorAnnotation;
            Annotation asyncAnnotation = AnnotationHelper.getAnnotation((String)"Async", (List)streamDefinition.getAnnotations());
            if (asyncAnnotation != null) {
                String batchSizeString;
                String workersString;
                this.async = true;
                String bufferSizeString = asyncAnnotation.getElement("buffer.size");
                if (bufferSizeString != null) {
                    this.bufferSize = Integer.parseInt(bufferSizeString);
                }
                if ((workersString = asyncAnnotation.getElement("workers")) != null) {
                    this.workers = Integer.parseInt(workersString);
                    if (this.workers <= 0) {
                        throw new SiddhiAppCreationException("Annotation element 'workers' cannot be negative or zero, but found, '" + this.workers + "'.", asyncAnnotation.getQueryContextStartIndex(), asyncAnnotation.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
                    }
                }
                if ((batchSizeString = asyncAnnotation.getElement("batch.size.max")) != null) {
                    this.batchSize = Integer.parseInt(batchSizeString);
                    if (this.batchSize <= 0) {
                        throw new SiddhiAppCreationException("Annotation element 'batch.size.max' cannot be negative or zero, but found, '" + this.batchSize + "'.", asyncAnnotation.getQueryContextStartIndex(), asyncAnnotation.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
                    }
                }
            }
            if ((onErrorAnnotation = AnnotationHelper.getAnnotation((String)"OnError", (List)streamDefinition.getAnnotations())) != null) {
                this.onErrorAction = OnErrorAction.valueOf(onErrorAnnotation.getElement("action").toUpperCase());
            }
        }
        catch (DuplicateAnnotationException e) {
            throw new DuplicateAnnotationException(e.getMessageWithOutContext() + " for the same Stream " + streamDefinition.getId(), (Throwable)e, e.getQueryContextStartIndex(), e.getQueryContextEndIndex(), siddhiAppContext.getName(), siddhiAppContext.getSiddhiAppString());
        }
        this.isTraceEnabled = log.isTraceEnabled();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendEvent(ComplexEvent complexEvent) {
        ComplexEvent complexEventList;
        if (this.isTraceEnabled) {
            log.trace((Object)("Event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            for (complexEventList = complexEvent; complexEventList != null; complexEventList = complexEventList.getNext()) {
                if (this.throughputTracker != null && this.siddhiAppContext.isStatsEnabled()) {
                    this.throughputTracker.eventIn();
                }
                long sequenceNo = this.ringBuffer.next();
                try {
                    EventExchangeHolder eventExchangeHolder = (EventExchangeHolder)this.ringBuffer.get(sequenceNo);
                    eventExchangeHolder.getEvent().copyFrom(complexEventList);
                    eventExchangeHolder.getAndSetIsProcessed(false);
                    continue;
                }
                finally {
                    this.ringBuffer.publish(sequenceNo);
                }
            }
        } else {
            if (this.throughputTracker != null && this.siddhiAppContext.isStatsEnabled()) {
                int messageCount = 0;
                while (complexEventList != null) {
                    ++messageCount;
                    complexEventList = complexEventList.getNext();
                }
                this.throughputTracker.eventsIn(messageCount);
            }
            for (Receiver receiver : this.receivers) {
                receiver.receive(complexEvent);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void sendEvent(Event event) {
        if (this.throughputTracker != null && this.siddhiAppContext.isStatsEnabled()) {
            this.throughputTracker.eventIn();
        }
        if (this.isTraceEnabled) {
            log.trace((Object)(event + " event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            long sequenceNo = this.ringBuffer.next();
            try {
                EventExchangeHolder eventExchangeHolder = (EventExchangeHolder)this.ringBuffer.get(sequenceNo);
                eventExchangeHolder.getEvent().copyFrom(event);
                eventExchangeHolder.getAndSetIsProcessed(false);
            }
            finally {
                this.ringBuffer.publish(sequenceNo);
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(event);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendEvent(Event[] events) {
        if (this.throughputTracker != null && this.siddhiAppContext.isStatsEnabled()) {
            this.throughputTracker.eventsIn(events.length);
        }
        if (this.isTraceEnabled) {
            log.trace((Object)("Event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            for (Event event : events) {
                long sequenceNo = this.ringBuffer.next();
                try {
                    EventExchangeHolder eventExchangeHolder = (EventExchangeHolder)this.ringBuffer.get(sequenceNo);
                    eventExchangeHolder.getEvent().copyFrom(event);
                    eventExchangeHolder.getAndSetIsProcessed(false);
                }
                finally {
                    this.ringBuffer.publish(sequenceNo);
                }
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(events);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendEvent(List<Event> events) {
        if (this.isTraceEnabled) {
            log.trace((Object)("Event is received by streamJunction " + this));
        }
        if (this.disruptor != null) {
            for (Event event : events) {
                long sequenceNo = this.ringBuffer.next();
                try {
                    EventExchangeHolder eventExchangeHolder = (EventExchangeHolder)this.ringBuffer.get(sequenceNo);
                    eventExchangeHolder.getEvent().copyFrom(event);
                    eventExchangeHolder.getAndSetIsProcessed(false);
                }
                finally {
                    this.ringBuffer.publish(sequenceNo);
                }
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(events.toArray(new Event[events.size()]));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendData(long timeStamp, Object[] data) {
        if (this.throughputTracker != null && this.siddhiAppContext.isStatsEnabled()) {
            this.throughputTracker.eventIn();
        }
        if (this.disruptor != null) {
            long sequenceNo = this.ringBuffer.next();
            try {
                EventExchangeHolder eventExchangeHolder = (EventExchangeHolder)this.ringBuffer.get(sequenceNo);
                eventExchangeHolder.getAndSetIsProcessed(false);
                eventExchangeHolder.getEvent().setTimestamp(timeStamp);
                eventExchangeHolder.getEvent().setIsExpired(false);
                System.arraycopy(data, 0, eventExchangeHolder.getEvent().getData(), 0, data.length);
            }
            finally {
                this.ringBuffer.publish(sequenceNo);
            }
        } else {
            for (Receiver receiver : this.receivers) {
                receiver.receive(timeStamp, data);
            }
        }
    }

    public void startProcessing() {
        if (!this.receivers.isEmpty() && this.async) {
            for (Constructor<?> constructor : Disruptor.class.getConstructors()) {
                if (constructor.getParameterTypes().length != 5) continue;
                ProducerType producerType = ProducerType.MULTI;
                this.disruptor = new Disruptor((EventFactory)new EventExchangeHolderFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, (Executor)this.executorService, producerType, (WaitStrategy)new BlockingWaitStrategy());
                this.disruptor.handleExceptionsWith(this.siddhiAppContext.getDisruptorExceptionHandler());
                break;
            }
            if (this.disruptor == null) {
                this.disruptor = new Disruptor((EventFactory)new EventExchangeHolderFactory(this.streamDefinition.getAttributeList().size()), this.bufferSize, (Executor)this.executorService);
                this.disruptor.handleExceptionsWith(this.siddhiAppContext.getDisruptorExceptionHandler());
            }
            if (this.workers > 0) {
                for (int i = 0; i < this.workers; ++i) {
                    this.disruptor.handleEventsWith(new EventHandler[]{new StreamHandler(this.receivers, this.batchSize, this.streamDefinition.getId(), this.siddhiAppContext.getName(), this.faultStreamJunction, this.onErrorAction, this.exceptionListener)});
                }
            } else {
                this.disruptor.handleEventsWith(new EventHandler[]{new StreamHandler(this.receivers, this.batchSize, this.streamDefinition.getId(), this.siddhiAppContext.getName(), this.faultStreamJunction, this.onErrorAction, this.exceptionListener)});
            }
            this.ringBuffer = this.disruptor.start();
        } else {
            for (Receiver receiver : this.receivers) {
                if (!(receiver instanceof StreamCallback)) continue;
                ((StreamCallback)receiver).startProcessing();
            }
        }
    }

    public void stopProcessing() {
        if (this.disruptor != null) {
            this.disruptor.shutdown();
        } else {
            for (Receiver receiver : this.receivers) {
                if (!(receiver instanceof StreamCallback)) continue;
                ((StreamCallback)receiver).stopProcessing();
            }
        }
    }

    public Publisher constructPublisher() {
        Publisher publisher = new Publisher();
        publisher.setStreamJunction(this);
        this.publishers.add(publisher);
        return publisher;
    }

    public void subscribe(Receiver receiver) {
        if (!this.receivers.contains(receiver)) {
            this.receivers.add(receiver);
        }
    }

    public String getStreamId() {
        return this.streamDefinition.getId();
    }

    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    @Override
    public long getBufferedEvents() {
        if (this.disruptor != null) {
            return this.disruptor.getBufferSize() - this.disruptor.getRingBuffer().remainingCapacity();
        }
        return 0L;
    }

    @Override
    public boolean containsBufferedEvents() {
        return !this.receivers.isEmpty() && this.async;
    }

    public class Publisher
    implements InputProcessor {
        private StreamJunction streamJunction;

        public void setStreamJunction(StreamJunction streamJunction) {
            this.streamJunction = streamJunction;
        }

        public void send(ComplexEvent complexEvent) {
            try {
                this.streamJunction.sendEvent(complexEvent);
            }
            catch (Exception e) {
                this.handleError(complexEvent, e);
            }
        }

        @Override
        public void send(Event event, int streamIndex) {
            try {
                this.streamJunction.sendEvent(event);
            }
            catch (Exception e) {
                this.handleError(event, e);
            }
        }

        @Override
        public void send(Event[] events, int streamIndex) {
            try {
                this.streamJunction.sendEvent(events);
            }
            catch (Exception e) {
                this.handleError(events, e);
            }
        }

        @Override
        public void send(List<Event> events, int streamIndex) {
            try {
                this.streamJunction.sendEvent(events);
            }
            catch (Exception e) {
                this.handleError(events, e);
            }
        }

        @Override
        public void send(long timeStamp, Object[] data, int streamIndex) {
            try {
                this.streamJunction.sendData(timeStamp, data);
            }
            catch (Exception e) {
                this.handleError(timeStamp, data, e);
            }
        }

        public String getStreamId() {
            return this.streamJunction.getStreamId();
        }

        private void handleError(Object event, Exception e) {
            if (StreamJunction.this.exceptionListener != null) {
                StreamJunction.this.exceptionListener.exceptionThrown(e);
            }
            switch (StreamJunction.this.onErrorAction) {
                case LOG: {
                    log.error((Object)("Error in '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "', " + e.getMessage() + ". Hence, dropping event '" + event.toString() + "'"), (Throwable)e);
                    break;
                }
                case STREAM: {
                    if (StreamJunction.this.faultStreamJunction != null) {
                        StreamEvent streamEvent = null;
                        if (event instanceof ComplexEvent) {
                            streamEvent = StreamJunction.this.faultStreamEventConverter.convert((ComplexEvent)event, e);
                            StreamJunction.this.faultStreamJunction.sendEvent(streamEvent);
                            break;
                        }
                        if (event instanceof Event) {
                            streamEvent = StreamJunction.this.faultStreamEventConverter.convert((Event)event, e);
                            StreamJunction.this.faultStreamJunction.sendEvent(streamEvent);
                            break;
                        }
                        if (event instanceof Event[]) {
                            streamEvent = StreamJunction.this.faultStreamEventConverter.convert((Event[])event, e);
                            StreamJunction.this.faultStreamJunction.sendEvent(streamEvent);
                            break;
                        }
                        if (!(event instanceof List)) break;
                        streamEvent = StreamJunction.this.faultStreamEventConverter.convert((List)event, e);
                        StreamJunction.this.faultStreamJunction.sendEvent(streamEvent);
                        break;
                    }
                    log.error((Object)("Error in SiddhiApp '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "', " + e.getMessage() + ". Siddhi Fault Stream for '" + StreamJunction.this.streamDefinition.getId() + "' is not defined. Hence, dropping event '" + event.toString() + "'"), (Throwable)e);
                    break;
                }
            }
        }

        private void handleError(long timeStamp, Object[] data, Exception e) {
            if (StreamJunction.this.exceptionListener != null) {
                StreamJunction.this.exceptionListener.exceptionThrown(e);
            }
            switch (StreamJunction.this.onErrorAction) {
                case LOG: {
                    log.error((Object)("Error in '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "' , " + e.getMessage() + ". Hence, dropping event '" + Arrays.toString(data) + "'"), (Throwable)e);
                    break;
                }
                case STREAM: {
                    if (StreamJunction.this.faultStreamJunction != null) {
                        StreamEvent streamEvent = StreamJunction.this.faultStreamEventConverter.convert(timeStamp, data, e);
                        StreamJunction.this.faultStreamJunction.sendEvent(streamEvent);
                        break;
                    }
                    log.error((Object)("Error in SiddhiApp '" + StreamJunction.this.siddhiAppContext.getName() + "' after consuming events from Stream '" + StreamJunction.this.streamDefinition.getId() + "', " + e.getMessage() + ". Siddhi Fault Stream for '" + StreamJunction.this.streamDefinition.getId() + "' is not defined. Hence, dropping data '" + Arrays.toString(data) + "'"), (Throwable)e);
                    break;
                }
            }
        }
    }

    public static interface Receiver {
        public String getStreamId();

        public void receive(ComplexEvent var1);

        public void receive(Event var1);

        public void receive(List<Event> var1);

        public void receive(long var1, Object[] var3);

        public void receive(Event[] var1);
    }

    public static enum OnErrorAction {
        LOG,
        STREAM;

    }
}

