package org.wso2.carbon.event.stream.manager.core.internal.stream;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.event.stream.manager.core.EventProducer;
import org.wso2.carbon.event.stream.manager.core.EventProducerCallback;
import org.wso2.carbon.event.stream.manager.core.RawEventConsumer;
import org.wso2.carbon.event.stream.manager.core.SiddhiEventConsumer;
import org.wso2.carbon.event.stream.manager.core.WSO2EventConsumer;
import org.wso2.carbon.event.stream.manager.core.WSO2EventListConsumer;
import org.wso2.carbon.event.stream.manager.core.internal.util.EventConverter;

/* loaded from: input_file:org/wso2/carbon/event/stream/manager/core/internal/stream/EventJunction.class */
public class EventJunction implements EventProducerCallback {
    private static final Log log = LogFactory.getLog(EventJunction.class);
    private StreamDefinition streamDefinition;
    private int attributesCount;
    private boolean metaFlag = false;
    private boolean correlationFlag = false;
    private boolean payloadFlag = false;
    private CopyOnWriteArrayList<EventProducer> producers = new CopyOnWriteArrayList<>();
    private CopyOnWriteArrayList<RawEventConsumer> rawEventConsumers = new CopyOnWriteArrayList<>();
    private CopyOnWriteArrayList<SiddhiEventConsumer> siddhiEventConsumers = new CopyOnWriteArrayList<>();
    private CopyOnWriteArrayList<WSO2EventConsumer> wso2EventConsumers = new CopyOnWriteArrayList<>();
    private CopyOnWriteArrayList<WSO2EventListConsumer> wso2EventListConsumers = new CopyOnWriteArrayList<>();

    public EventJunction(StreamDefinition streamDefinition) {
        this.streamDefinition = streamDefinition;
        populateEventTemplate(streamDefinition);
    }

    public void addConsumer(SiddhiEventConsumer siddhiEventConsumer) {
        if (this.siddhiEventConsumers.contains(siddhiEventConsumer)) {
            log.error("Consumer already exist in the junction: " + this.streamDefinition.getStreamId());
        } else {
            log.info("Consumer added to the junction. Stream:" + getStreamDefinition().getStreamId());
            this.siddhiEventConsumers.add(siddhiEventConsumer);
        }
    }

    public boolean removeConsumer(SiddhiEventConsumer siddhiEventConsumer) {
        return this.siddhiEventConsumers.remove(siddhiEventConsumer);
    }

    public void addConsumer(RawEventConsumer rawEventConsumer) {
        if (this.rawEventConsumers.contains(rawEventConsumer)) {
            log.error("Consumer already exist in the junction: " + this.streamDefinition.getStreamId());
        } else {
            log.info("Consumer added to the junction. Stream:" + getStreamDefinition().getStreamId());
            this.rawEventConsumers.add(rawEventConsumer);
        }
    }

    public boolean removeConsumer(RawEventConsumer rawEventConsumer) {
        return this.rawEventConsumers.remove(rawEventConsumer);
    }

    public void addConsumer(WSO2EventConsumer wSO2EventConsumer) {
        if (this.wso2EventConsumers.contains(wSO2EventConsumer)) {
            log.error("WSO2EventConsumer already exist in the junction: " + this.streamDefinition.getStreamId());
            return;
        }
        log.info("WSO2EventConsumer added to the junction. Stream:" + getStreamDefinition().getStreamId());
        wSO2EventConsumer.onAddDefinition(this.streamDefinition);
        this.wso2EventConsumers.add(wSO2EventConsumer);
    }

    public void addConsumer(WSO2EventListConsumer wSO2EventListConsumer) {
        if (this.wso2EventListConsumers.contains(wSO2EventListConsumer)) {
            log.error("WSO2EventConsumer already exist in the junction: " + this.streamDefinition.getStreamId());
            return;
        }
        log.info("WSO2EventConsumer added to the junction. Stream:" + getStreamDefinition().getStreamId());
        wSO2EventListConsumer.onAddDefinition(this.streamDefinition);
        this.wso2EventListConsumers.add(wSO2EventListConsumer);
    }

    public boolean removeConsumer(WSO2EventConsumer wSO2EventConsumer) {
        boolean remove = this.wso2EventConsumers.remove(wSO2EventConsumer);
        wSO2EventConsumer.onRemoveDefinition(this.streamDefinition);
        return remove;
    }

    public boolean removeConsumer(WSO2EventListConsumer wSO2EventListConsumer) {
        boolean remove = this.wso2EventListConsumers.remove(wSO2EventListConsumer);
        wSO2EventListConsumer.onRemoveDefinition(this.streamDefinition);
        return remove;
    }

    public void addProducer(EventProducer eventProducer) {
        if (this.producers.contains(eventProducer)) {
            log.error("Producer already exist in the junction: " + this.streamDefinition.getStreamId());
            return;
        }
        log.info("Producer added to the junction. Stream:" + getStreamDefinition().getStreamId());
        eventProducer.setCallBack(this);
        this.producers.add(eventProducer);
    }

    public boolean removeProducer(EventProducer eventProducer) {
        boolean remove = this.producers.remove(eventProducer);
        if (remove) {
            eventProducer.setCallBack(null);
        }
        return remove;
    }

    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventProducerCallback
    public void sendEventData(Object[] objArr) {
        if (!this.siddhiEventConsumers.isEmpty()) {
            Iterator<SiddhiEventConsumer> it = this.siddhiEventConsumers.iterator();
            while (it.hasNext()) {
                try {
                    it.next().consumeEventData(objArr);
                } catch (Exception e) {
                    log.error("Error while dispatching events: " + e.getMessage(), e);
                }
            }
        }
        if (!this.rawEventConsumers.isEmpty()) {
            Iterator<RawEventConsumer> it2 = this.rawEventConsumers.iterator();
            while (it2.hasNext()) {
                try {
                    it2.next().consumeEventData(objArr);
                } catch (Exception e2) {
                    log.error("Error while dispatching events: " + e2.getMessage(), e2);
                }
            }
        }
        if (this.wso2EventConsumers.isEmpty() && this.wso2EventListConsumers.isEmpty()) {
            return;
        }
        Event convertToWso2Event = EventConverter.convertToWso2Event(objArr, this.streamDefinition);
        if (!this.wso2EventConsumers.isEmpty()) {
            Iterator<WSO2EventConsumer> it3 = this.wso2EventConsumers.iterator();
            while (it3.hasNext()) {
                try {
                    it3.next().onEvent(convertToWso2Event);
                } catch (Exception e3) {
                    log.error("Error while dispatching events: " + e3.getMessage(), e3);
                }
            }
        }
        if (this.wso2EventListConsumers.isEmpty()) {
            return;
        }
        Iterator<WSO2EventListConsumer> it4 = this.wso2EventListConsumers.iterator();
        while (it4.hasNext()) {
            try {
                it4.next().onEvent(convertToWso2Event);
            } catch (Exception e4) {
                log.error("Error while dispatching events: " + e4.getMessage(), e4);
            }
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventProducerCallback
    public void sendEvent(Event event) {
        if (!this.siddhiEventConsumers.isEmpty() || !this.rawEventConsumers.isEmpty()) {
            Object[] convertToEventData = EventConverter.convertToEventData(event, this.metaFlag, this.correlationFlag, this.payloadFlag, this.attributesCount);
            if (!this.siddhiEventConsumers.isEmpty()) {
                Iterator<SiddhiEventConsumer> it = this.siddhiEventConsumers.iterator();
                while (it.hasNext()) {
                    try {
                        it.next().consumeEventData(convertToEventData);
                    } catch (Exception e) {
                        log.error("Error while dispatching events: " + e.getMessage(), e);
                    }
                }
            }
            if (!this.rawEventConsumers.isEmpty()) {
                Iterator<RawEventConsumer> it2 = this.rawEventConsumers.iterator();
                while (it2.hasNext()) {
                    try {
                        it2.next().consumeEventData(convertToEventData);
                    } catch (Exception e2) {
                        log.error("Error while dispatching events: " + e2.getMessage(), e2);
                    }
                }
            }
        }
        if (!this.wso2EventConsumers.isEmpty()) {
            Iterator<WSO2EventConsumer> it3 = this.wso2EventConsumers.iterator();
            while (it3.hasNext()) {
                try {
                    it3.next().onEvent(event);
                } catch (Exception e3) {
                    log.error("Error while dispatching events: " + e3.getMessage(), e3);
                }
            }
        }
        if (this.wso2EventListConsumers.isEmpty()) {
            return;
        }
        Iterator<WSO2EventListConsumer> it4 = this.wso2EventListConsumers.iterator();
        while (it4.hasNext()) {
            try {
                it4.next().onEvent(event);
            } catch (Exception e4) {
                log.error("Error while dispatching events: " + e4.getMessage(), e4);
            }
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventProducerCallback
    public void sendEvents(List<Event> list) {
        for (Event event : list) {
            if (!this.siddhiEventConsumers.isEmpty() || !this.rawEventConsumers.isEmpty()) {
                Object[] convertToEventData = EventConverter.convertToEventData(event, this.metaFlag, this.correlationFlag, this.payloadFlag, this.attributesCount);
                if (!this.siddhiEventConsumers.isEmpty()) {
                    Iterator<SiddhiEventConsumer> it = this.siddhiEventConsumers.iterator();
                    while (it.hasNext()) {
                        try {
                            it.next().consumeEventData(convertToEventData);
                        } catch (Exception e) {
                            log.error("Error while dispatching events: " + e.getMessage(), e);
                        }
                    }
                }
                if (!this.rawEventConsumers.isEmpty()) {
                    Iterator<RawEventConsumer> it2 = this.rawEventConsumers.iterator();
                    while (it2.hasNext()) {
                        try {
                            it2.next().consumeEventData(convertToEventData);
                        } catch (Exception e2) {
                            log.error("Error while dispatching events: " + e2.getMessage(), e2);
                        }
                    }
                }
            }
            if (!this.wso2EventConsumers.isEmpty()) {
                Iterator<WSO2EventConsumer> it3 = this.wso2EventConsumers.iterator();
                while (it3.hasNext()) {
                    try {
                        it3.next().onEvent(event);
                    } catch (Exception e3) {
                        log.error("Error while dispatching events: " + e3.getMessage(), e3);
                    }
                }
            }
        }
        if (this.wso2EventListConsumers.isEmpty()) {
            return;
        }
        Iterator<WSO2EventListConsumer> it4 = this.wso2EventListConsumers.iterator();
        while (it4.hasNext()) {
            try {
                it4.next().onEventList(list);
            } catch (Exception e4) {
                log.error("Error while dispatching events: " + e4.getMessage(), e4);
            }
        }
    }

    @Override // org.wso2.carbon.event.stream.manager.core.EventProducerCallback
    public void sendEvents(org.wso2.siddhi.core.event.Event[] eventArr) {
        if (!this.siddhiEventConsumers.isEmpty()) {
            Iterator<SiddhiEventConsumer> it = this.siddhiEventConsumers.iterator();
            while (it.hasNext()) {
                try {
                    it.next().consumeEvents(eventArr);
                } catch (Exception e) {
                    log.error("Error while dispatching events: " + e.getMessage(), e);
                }
            }
        }
        ArrayList arrayList = new ArrayList();
        for (org.wso2.siddhi.core.event.Event event : eventArr) {
            if (!this.rawEventConsumers.isEmpty()) {
                Iterator<RawEventConsumer> it2 = this.rawEventConsumers.iterator();
                while (it2.hasNext()) {
                    try {
                        it2.next().consumeEventData(event.getData());
                    } catch (Exception e2) {
                        log.error("Error while dispatching events: " + e2.getMessage(), e2);
                    }
                }
            }
            if (!this.wso2EventConsumers.isEmpty() || !this.wso2EventListConsumers.isEmpty()) {
                Event convertToWso2Event = EventConverter.convertToWso2Event(event.getData(), this.streamDefinition);
                Iterator<WSO2EventConsumer> it3 = this.wso2EventConsumers.iterator();
                while (it3.hasNext()) {
                    try {
                        it3.next().onEvent(convertToWso2Event);
                    } catch (Exception e3) {
                        log.error("Error while dispatching events: " + e3.getMessage(), e3);
                    }
                }
                if (!this.wso2EventListConsumers.isEmpty()) {
                    arrayList.add(convertToWso2Event);
                }
            }
        }
        if (this.wso2EventListConsumers.isEmpty()) {
            return;
        }
        Iterator<WSO2EventListConsumer> it4 = this.wso2EventListConsumers.iterator();
        while (it4.hasNext()) {
            try {
                it4.next().onEventList(arrayList);
            } catch (Exception e4) {
                log.error("Error while dispatching events: " + e4.getMessage(), e4);
            }
        }
    }

    private void populateEventTemplate(StreamDefinition streamDefinition) {
        int i = 0;
        if (streamDefinition.getMetaData() != null) {
            i = 0 + streamDefinition.getMetaData().size();
            this.metaFlag = true;
        }
        if (streamDefinition.getCorrelationData() != null) {
            i += streamDefinition.getCorrelationData().size();
            this.correlationFlag = true;
        }
        if (streamDefinition.getPayloadData() != null) {
            i += streamDefinition.getPayloadData().size();
            this.payloadFlag = true;
        }
        this.attributesCount = i;
    }
}
