package org.wso2.siddhi.core.stream.output;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.PhasedBackoffWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.ExecutionPlanContext;
import org.wso2.siddhi.core.event.ComplexEvent;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.StreamJunction;
import org.wso2.siddhi.query.api.definition.AbstractDefinition;

/* loaded from: input_file:org/wso2/siddhi/core/stream/output/StreamCallback.class */
public abstract class StreamCallback implements StreamJunction.Receiver {
    private static final Logger log = Logger.getLogger(StreamCallback.class);
    private String streamId;
    private AbstractDefinition streamDefinition;
    private List<Event> eventBuffer = new ArrayList();
    private ExecutionPlanContext executionPlanContext;
    private AsyncEventHandler asyncEventHandler;
    private Disruptor<EventHolder> disruptor;
    private RingBuffer<EventHolder> ringBuffer;

    /* loaded from: input_file:org/wso2/siddhi/core/stream/output/StreamCallback$AsyncEventHandler.class */
    public class AsyncEventHandler implements EventHandler<EventHolder> {
        private StreamCallback streamCallback;

        public AsyncEventHandler(StreamCallback streamCallback) {
            this.streamCallback = streamCallback;
        }

        public void onEvent(EventHolder eventHolder, long j, boolean z) throws Exception {
            if (this.streamCallback != null) {
                this.streamCallback.receive(eventHolder.events);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/wso2/siddhi/core/stream/output/StreamCallback$EventHolder.class */
    public class EventHolder {
        public Event[] events;

        private EventHolder() {
        }
    }

    @Override // org.wso2.siddhi.core.stream.StreamJunction.Receiver
    public String getStreamId() {
        return this.streamId;
    }

    public void setStreamId(String str) {
        this.streamId = str;
    }

    public void setStreamDefinition(AbstractDefinition abstractDefinition) {
        this.streamDefinition = abstractDefinition;
    }

    public void setContext(ExecutionPlanContext executionPlanContext) {
        this.executionPlanContext = executionPlanContext;
    }

    @Override // org.wso2.siddhi.core.stream.StreamJunction.Receiver
    public void receive(ComplexEvent complexEvent) {
        while (complexEvent != null) {
            this.eventBuffer.add(new Event(complexEvent.getOutputData().length).copyFrom(complexEvent));
            complexEvent = complexEvent.getNext();
        }
        if (this.disruptor == null) {
            receiveSync((Event[]) this.eventBuffer.toArray(new Event[this.eventBuffer.size()]));
        } else {
            receiveAsync((Event[]) this.eventBuffer.toArray(new Event[this.eventBuffer.size()]));
        }
        this.eventBuffer.clear();
    }

    @Override // org.wso2.siddhi.core.stream.StreamJunction.Receiver
    public void receive(Event event) {
        if (this.disruptor == null) {
            receiveSync(new Event[]{event});
        } else {
            receiveAsync(new Event[]{event});
        }
    }

    @Override // org.wso2.siddhi.core.stream.StreamJunction.Receiver
    public void receive(Event event, boolean z) {
        this.eventBuffer.add(event);
        if (z) {
            receiveSync((Event[]) this.eventBuffer.toArray(new Event[this.eventBuffer.size()]));
            this.eventBuffer.clear();
        }
    }

    @Override // org.wso2.siddhi.core.stream.StreamJunction.Receiver
    public void receive(long j, Object[] objArr) {
        if (this.disruptor == null) {
            receiveSync(new Event[]{new Event(j, objArr)});
        } else {
            receiveAsync(new Event[]{new Event(j, objArr)});
        }
    }

    public void receiveSync(Event[] eventArr) {
        try {
            receive(eventArr);
        } catch (RuntimeException e) {
            log.error("Error on sending events" + Arrays.deepToString(eventArr), e);
        }
    }

    @Override // org.wso2.siddhi.core.stream.StreamJunction.Receiver
    public abstract void receive(Event[] eventArr);

    private void receiveAsync(Event[] eventArr) {
        long next = this.ringBuffer.next();
        try {
            ((EventHolder) this.ringBuffer.get(next)).events = eventArr;
            this.ringBuffer.publish(next);
        } catch (Throwable th) {
            this.ringBuffer.publish(next);
            throw th;
        }
    }

    public synchronized void startProcessing() {
        Boolean bool = null;
        if ((0 == 0 || !bool.booleanValue()) && 0 != 0) {
            return;
        }
        Constructor<?>[] constructors = Disruptor.class.getConstructors();
        int length = constructors.length;
        int i = 0;
        while (true) {
            if (i >= length) {
                break;
            }
            if (constructors[i].getParameterTypes().length == 5) {
                this.disruptor = new Disruptor<>(new EventFactory<EventHolder>() { // from class: org.wso2.siddhi.core.stream.output.StreamCallback.1
                    /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
                    public EventHolder m37newInstance() {
                        return new EventHolder();
                    }
                }, this.executionPlanContext.getSiddhiContext().getEventBufferSize(), this.executionPlanContext.getExecutorService(), ProducerType.SINGLE, PhasedBackoffWaitStrategy.withLiteLock(1L, 4L, TimeUnit.SECONDS));
                break;
            }
            i++;
        }
        if (this.disruptor == null) {
            this.disruptor = new Disruptor<>(new EventFactory<EventHolder>() { // from class: org.wso2.siddhi.core.stream.output.StreamCallback.2
                /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
                public EventHolder m38newInstance() {
                    return new EventHolder();
                }
            }, this.executionPlanContext.getSiddhiContext().getEventBufferSize(), this.executionPlanContext.getExecutorService());
        }
        this.asyncEventHandler = new AsyncEventHandler(this);
        this.disruptor.handleEventsWith(new EventHandler[]{this.asyncEventHandler});
        this.ringBuffer = this.disruptor.start();
    }

    public synchronized void stopProcessing() {
        if (this.disruptor != null) {
            this.asyncEventHandler.streamCallback = null;
            this.disruptor.shutdown();
        }
    }
}
