package io.siddhi.core.util.event.handler;

import com.lmax.disruptor.EventHandler;
import io.siddhi.core.event.Event;
import io.siddhi.core.stream.StreamJunction;
import java.beans.ExceptionListener;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* JADX WARN: Classes with same name are omitted:
  input_file:dependencies/siddhi-core-5.1.26.jar:io/siddhi/core/util/event/handler/StreamHandler.class
 */
/* loaded from: input_file:io/siddhi/core/util/event/handler/StreamHandler.class */
public class StreamHandler implements EventHandler<EventExchangeHolder> {
    private static final Logger log = LogManager.getLogger((Class<?>) StreamHandler.class);
    private final String streamName;
    private final String siddhiAppName;
    private final StreamJunction faultStreamJunction;
    private final StreamJunction.OnErrorAction onErrorAction;
    private final ExceptionListener exceptionListener;
    private List<StreamJunction.Receiver> receivers;
    private int batchSize;
    private List<Event> eventBuffer = new LinkedList();

    public StreamHandler(List<StreamJunction.Receiver> list, int i, String str, String str2, StreamJunction streamJunction, StreamJunction.OnErrorAction onErrorAction, ExceptionListener exceptionListener) {
        this.receivers = list;
        this.batchSize = i;
        this.streamName = str;
        this.siddhiAppName = str2;
        this.faultStreamJunction = streamJunction;
        this.onErrorAction = onErrorAction;
        this.exceptionListener = exceptionListener;
    }

    @Override // com.lmax.disruptor.EventHandler
    public void onEvent(EventExchangeHolder eventExchangeHolder, long j, boolean z) {
        if (eventExchangeHolder.getAndSetIsProcessed(true)) {
            if (!z || this.eventBuffer.size() == 0) {
                return;
            }
            Iterator<StreamJunction.Receiver> it = this.receivers.iterator();
            while (it.hasNext()) {
                try {
                    it.next().receive(this.eventBuffer);
                } catch (Exception e) {
                    onError(this.eventBuffer, e);
                }
            }
            this.eventBuffer.clear();
            return;
        }
        this.eventBuffer.add(eventExchangeHolder.getEvent());
        if (this.eventBuffer.size() == this.batchSize || z) {
            Iterator<StreamJunction.Receiver> it2 = this.receivers.iterator();
            while (it2.hasNext()) {
                try {
                    it2.next().receive(this.eventBuffer);
                } catch (Exception e2) {
                    onError(this.eventBuffer, e2);
                }
            }
            this.eventBuffer.clear();
        }
    }

    private void onError(List<Event> list, Exception exc) {
        if (this.exceptionListener != null) {
            this.exceptionListener.exceptionThrown(exc);
        }
        switch (this.onErrorAction) {
            case LOG:
                Iterator<Event> it = list.iterator();
                while (it.hasNext()) {
                    log.error("Error in SiddhiApp '" + this.siddhiAppName + "' after consuming events from Stream '" + this.streamName + "', " + exc.getMessage() + ". Hence, dropping event '" + it.next().toString() + "'", (Throwable) exc);
                }
                return;
            case STREAM:
                for (Event event : list) {
                    if (this.faultStreamJunction != null) {
                        this.faultStreamJunction.sendEvent(event);
                    } else {
                        log.error("Error in SiddhiApp '" + this.siddhiAppName + "' after consuming events from Stream '" + this.streamName + "', " + exc.getMessage() + ". Siddhi Fault Stream for '" + this.streamName + "' is not defined. Hence dropping the event '" + event.toString() + "'", (Throwable) exc);
                    }
                }
                return;
            default:
                return;
        }
    }
}
