package org.wso2.siddhi.core.stream.output;

import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.query.SchedulerElement;
import org.wso2.siddhi.core.stream.StreamReceiver;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerQueue;

/* loaded from: input_file:org/wso2/siddhi/core/stream/output/StreamCallback.class */
/**
 * Base class for callbacks that deliver the events of one stream to user code.
 *
 * <p>Subclasses implement {@link #receive(Event[])}. Incoming {@link StreamEvent}s are either
 * handed to that method directly on the calling thread (when the context reports that async
 * processing is disabled) or placed on an internal queue and delivered later by {@link #run()}
 * on the context's thread pool.
 *
 * <p>{@link #setSiddhiContext(SiddhiContext)} must be called before any event is received;
 * until then the context, executor and queue fields are {@code null}.
 */
public abstract class StreamCallback implements Runnable, StreamReceiver, SchedulerElement {
    /** Buffer for events awaiting asynchronous delivery; created in {@link #setSiddhiContext}. */
    private SchedulerQueue<StreamEvent> inputQueue;
    /** Executor taken from the Siddhi context; runs this callback's drain task. */
    private ThreadPoolExecutor threadPoolExecutor;
    /** Identifier of the stream this callback is attached to. */
    private String streamId;
    /** Context supplying the async-processing flag, the batch size and the executor. */
    private SiddhiContext siddhiContext;
    static final Logger log = Logger.getLogger(StreamCallback.class);

    /**
     * Binds this callback to a Siddhi context, picking up the context's thread pool and
     * creating a fresh input queue that is constructed with this callback as its argument.
     *
     * @param siddhiContext the context to read configuration and the executor from
     */
    public void setSiddhiContext(SiddhiContext siddhiContext) {
        this.siddhiContext = siddhiContext;
        this.threadPoolExecutor = siddhiContext.getThreadPoolExecutor();
        this.inputQueue = new SchedulerQueue<>(this);
    }

    /**
     * Accepts an event from the stream.
     *
     * <p>When async processing is disabled the event is delivered immediately on the caller's
     * thread; otherwise it is queued for delivery by {@link #run()}.
     *
     * @param streamEvent the event (or event bundle) to deliver
     */
    @Override // org.wso2.siddhi.core.stream.StreamReceiver
    public void receive(StreamEvent streamEvent) {
        if (!this.siddhiContext.isAsyncProcessing()) {
            send(streamEvent);
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Adding to streamId callback " + streamEvent);
        }
        // NOTE(review): presumably SchedulerQueue.put triggers schedule() on this callback
        // when no drain task is pending - confirm against SchedulerQueue.
        this.inputQueue.put(streamEvent);
    }

    /**
     * Drains queued events and delivers each one through {@link #receive(Event[])}.
     *
     * <p>If the context's event batch size is positive, at most that many events are delivered
     * per invocation; the task then re-submits itself to the executor so that remaining events
     * are handled by a later run and other tasks on the pool get a turn. A non-positive batch
     * size means "no limit": the queue is drained until it is empty.
     */
    @Override // java.lang.Runnable
    public void run() {
        final int batchSize = this.siddhiContext.getEventBatchSize();
        int delivered = 0;
        while (true) {
            // The budget is checked BEFORE polling so that no event is ever removed from the
            // queue without being delivered.
            if (batchSize > 0 && delivered >= batchSize) {
                this.threadPoolExecutor.execute(this);
                return;
            }
            StreamEvent next = this.inputQueue.poll();
            if (next == null) {
                // Queue is empty; nothing left for this run.
                return;
            }
            send(next);
            delivered++;
        }
    }

    /**
     * Converts the stream event to an {@code Event[]} and passes it to the user callback.
     *
     * @param streamEvent the event to deliver
     */
    private void send(StreamEvent streamEvent) {
        receive(streamEvent.toArray());
    }

    /**
     * User hook invoked with the events produced by the stream.
     *
     * @param eventArr the events obtained from {@link StreamEvent#toArray()}
     */
    public abstract void receive(Event[] eventArr);

    /**
     * Sets the identifier of the stream this callback listens to.
     *
     * @param str the stream id
     */
    public void setStreamId(String str) {
        this.streamId = str;
    }

    /**
     * Returns the stream id previously set via {@link #setStreamId(String)}.
     *
     * @return the stream id, or {@code null} if none has been set
     */
    @Override // org.wso2.siddhi.core.stream.StreamReceiver
    public String getStreamId() {
        return this.streamId;
    }

    /** Submits this callback's drain task ({@link #run()}) to the context's thread pool. */
    @Override // org.wso2.siddhi.core.query.SchedulerElement
    public void schedule() {
        this.threadPoolExecutor.execute(this);
    }

    /**
     * Submits this callback's drain task to the context's thread pool; in this class it is
     * identical to {@link #schedule()}.
     */
    @Override // org.wso2.siddhi.core.query.SchedulerElement
    public void scheduleNow() {
        this.threadPoolExecutor.execute(this);
    }
}
