package org.wso2.carbon.event.stream.core.internal;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.event.stream.core.EventProducer;
import org.wso2.carbon.event.stream.core.EventStreamListener;
import org.wso2.carbon.event.stream.core.RawEventConsumer;
import org.wso2.carbon.event.stream.core.SiddhiEventConsumer;
import org.wso2.carbon.event.stream.core.WSO2EventConsumer;
import org.wso2.carbon.event.stream.core.WSO2EventListConsumer;
import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.event.stream.core.internal.ds.EventStreamServiceValueHolder;

/* loaded from: input_file:org/wso2/carbon/event/stream/core/internal/EventStreamRuntime.class */
/**
 * Holds the {@link EventJunction}s of every tenant, keyed by stream id, and routes
 * publish / subscribe / unsubscribe calls to the junction of the referenced stream.
 *
 * <p>The tenant is always taken from the calling thread's
 * {@link PrivilegedCarbonContext}. The registry itself is built from concurrent maps so
 * that registrations and lookups from different threads cannot corrupt it or lose
 * entries; whether an individual {@link EventJunction} is thread-safe is not visible
 * from this class.
 */
public class EventStreamRuntime {
    private static final Log log = LogFactory.getLog(EventStreamRuntime.class);

    /** tenant id -> (stream id -> junction). */
    private final ConcurrentHashMap<Integer, ConcurrentHashMap<String, EventJunction>> tenantSpecificEventJunctions =
            new ConcurrentHashMap<Integer, ConcurrentHashMap<String, EventJunction>>();

    /**
     * Registers a fresh junction for the given stream of the current tenant and notifies
     * every registered {@link EventStreamListener}. Does nothing when the stream service
     * has no definition for the stream.
     *
     * @param str stream id; name and version are extracted from it to look up the definition
     * @throws EventStreamConfigurationException declared for callers; propagated from the
     *                                           stream service lookup
     */
    public void loadEventStream(String str) throws EventStreamConfigurationException {
        StreamDefinition streamDefinition = EventStreamServiceValueHolder.getCarbonEventStreamService().getStreamDefinition(
                DataBridgeCommonsUtils.getStreamNameFromStreamId(str),
                DataBridgeCommonsUtils.getStreamVersionFromStreamId(str));
        int tenantId = currentTenantId();
        if (streamDefinition == null) {
            return;
        }
        // A (re)load always installs a new junction, replacing any junction already registered.
        getOrCreateTenantJunctions(tenantId).put(streamDefinition.getStreamId(), new EventJunction(streamDefinition));
        for (EventStreamListener listener : EventStreamServiceValueHolder.getEventStreamListenerList()) {
            listener.addedEventStream(tenantId, streamDefinition.getName(), streamDefinition.getVersion());
        }
    }

    /**
     * Removes the junction of the given stream for the current tenant and notifies every
     * registered {@link EventStreamListener}, but only when the stream service no longer
     * has a definition for the stream.
     *
     * @param str stream id of the stream to unload
     * @throws EventStreamConfigurationException declared for callers; propagated from the
     *                                           stream service lookup
     */
    public void unloadEventStream(String str) throws EventStreamConfigurationException {
        int tenantId = currentTenantId();
        StreamDefinition stillDefined = EventStreamServiceValueHolder.getCarbonEventStreamService().getStreamDefinition(
                DataBridgeCommonsUtils.getStreamNameFromStreamId(str),
                DataBridgeCommonsUtils.getStreamVersionFromStreamId(str));
        if (stillDefined != null) {
            return;
        }
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(Integer.valueOf(tenantId));
        if (junctions != null) {
            junctions.remove(str);
        }
        for (EventStreamListener listener : EventStreamServiceValueHolder.getEventStreamListenerList()) {
            listener.removedEventStream(tenantId,
                    DataBridgeCommonsUtils.getStreamNameFromStreamId(str),
                    DataBridgeCommonsUtils.getStreamVersionFromStreamId(str));
        }
    }

    /**
     * Returns the current tenant's junction for the stream, creating and registering it
     * from the stream definition when none exists yet.
     *
     * @param str stream id
     * @return the registered junction, never {@code null}
     * @throws EventStreamConfigurationException when the stream has no definition, or when
     *                                           looking it up / building the junction fails
     *                                           (the failure is attached as the cause)
     */
    private EventJunction getOrConstructEventJunction(String str) throws EventStreamConfigurationException {
        int tenantId = currentTenantId();
        ConcurrentHashMap<String, EventJunction> junctions = getOrCreateTenantJunctions(tenantId);
        EventJunction eventJunction = junctions.get(str);
        if (eventJunction != null) {
            return eventJunction;
        }

        StreamDefinition streamDefinition;
        try {
            streamDefinition = EventStreamServiceValueHolder.getCarbonEventStreamService().getStreamDefinition(str);
            if (streamDefinition != null) {
                eventJunction = new EventJunction(streamDefinition);
                // Another thread may have registered a junction meanwhile; keep that one so
                // subscribers already attached to it are not silently dropped.
                EventJunction existing = junctions.putIfAbsent(streamDefinition.getStreamId(), eventJunction);
                if (existing != null) {
                    eventJunction = existing;
                }
            }
        } catch (Exception e) {
            EventStreamConfigurationException failure =
                    new EventStreamConfigurationException("Cannot retrieve Stream " + str + " for tenant " + tenantId);
            // Only the message-only constructor is known to exist here, so attach the cause explicitly.
            failure.initCause(e);
            throw failure;
        }

        // Checked outside the try block so this specific message is not replaced by the generic one above.
        if (streamDefinition == null) {
            throw new EventStreamConfigurationException("Stream " + str + " is not configured to tenant " + tenantId);
        }
        return eventJunction;
    }

    /**
     * Sends the event to the current tenant's junction for the stream. The event is
     * dropped (with a debug log entry) when no such junction is registered.
     *
     * @param str   stream id
     * @param event event to deliver
     */
    public void publish(String str, Event event) {
        Map<String, EventJunction> junctions = currentTenantJunctions();
        // Single lookup: a separate containsKey/get pair could see the junction removed in between.
        EventJunction eventJunction = junctions == null ? null : junctions.get(str);
        if (eventJunction == null) {
            if (log.isDebugEnabled()) {
                log.debug("Event " + event + " dropped since no junction found for the streamId " + str);
            }
            return;
        }
        eventJunction.sendEvent(event);
    }

    /**
     * Adds the consumer to the junction of its stream, creating the junction if needed.
     *
     * @throws EventStreamConfigurationException when the junction cannot be obtained
     */
    public void subscribe(SiddhiEventConsumer siddhiEventConsumer) throws EventStreamConfigurationException {
        getOrConstructEventJunction(siddhiEventConsumer.getStreamId()).addConsumer(siddhiEventConsumer);
    }

    /**
     * Adds the consumer to the junction of its stream, creating the junction if needed.
     *
     * @throws EventStreamConfigurationException when the junction cannot be obtained
     */
    public void subscribe(RawEventConsumer rawEventConsumer) throws EventStreamConfigurationException {
        getOrConstructEventJunction(rawEventConsumer.getStreamId()).addConsumer(rawEventConsumer);
    }

    /**
     * Adds the producer to the junction of its stream, creating the junction if needed.
     *
     * @throws EventStreamConfigurationException when the junction cannot be obtained
     */
    public void subscribe(EventProducer eventProducer) throws EventStreamConfigurationException {
        getOrConstructEventJunction(eventProducer.getStreamId()).addProducer(eventProducer);
    }

    /**
     * Adds the consumer to the junction of its stream, creating the junction if needed.
     *
     * @throws EventStreamConfigurationException when the junction cannot be obtained
     */
    public void subscribe(WSO2EventConsumer wSO2EventConsumer) throws EventStreamConfigurationException {
        getOrConstructEventJunction(wSO2EventConsumer.getStreamId()).addConsumer(wSO2EventConsumer);
    }

    /**
     * Adds the consumer to the junction of its stream, creating the junction if needed.
     *
     * @throws EventStreamConfigurationException when the junction cannot be obtained
     */
    public void subscribe(WSO2EventListConsumer wSO2EventListConsumer) throws EventStreamConfigurationException {
        getOrConstructEventJunction(wSO2EventListConsumer.getStreamId()).addConsumer(wSO2EventListConsumer);
    }

    /** Removes the consumer from its stream's junction; a no-op when no junction is registered. */
    public void unsubscribe(SiddhiEventConsumer siddhiEventConsumer) {
        Map<String, EventJunction> junctions = currentTenantJunctions();
        if (junctions == null) {
            return;
        }
        EventJunction eventJunction = junctions.get(siddhiEventConsumer.getStreamId());
        if (eventJunction != null) {
            eventJunction.removeConsumer(siddhiEventConsumer);
        }
    }

    /** Removes the consumer from its stream's junction; a no-op when no junction is registered. */
    public void unsubscribe(RawEventConsumer rawEventConsumer) {
        Map<String, EventJunction> junctions = currentTenantJunctions();
        if (junctions == null) {
            return;
        }
        EventJunction eventJunction = junctions.get(rawEventConsumer.getStreamId());
        if (eventJunction != null) {
            eventJunction.removeConsumer(rawEventConsumer);
        }
    }

    /** Removes the producer from its stream's junction; a no-op when no junction is registered. */
    public void unsubscribe(EventProducer eventProducer) {
        Map<String, EventJunction> junctions = currentTenantJunctions();
        if (junctions == null) {
            return;
        }
        EventJunction eventJunction = junctions.get(eventProducer.getStreamId());
        if (eventJunction != null) {
            eventJunction.removeProducer(eventProducer);
        }
    }

    /** Removes the consumer from its stream's junction; a no-op when no junction is registered. */
    public void unsubscribe(WSO2EventConsumer wSO2EventConsumer) {
        Map<String, EventJunction> junctions = currentTenantJunctions();
        if (junctions == null) {
            return;
        }
        EventJunction eventJunction = junctions.get(wSO2EventConsumer.getStreamId());
        if (eventJunction != null) {
            eventJunction.removeConsumer(wSO2EventConsumer);
        }
    }

    /** Removes the consumer from its stream's junction; a no-op when no junction is registered. */
    public void unsubscribe(WSO2EventListConsumer wSO2EventListConsumer) {
        Map<String, EventJunction> junctions = currentTenantJunctions();
        if (junctions == null) {
            return;
        }
        EventJunction eventJunction = junctions.get(wSO2EventListConsumer.getStreamId());
        if (eventJunction != null) {
            eventJunction.removeConsumer(wSO2EventListConsumer);
        }
    }

    /** Returns the tenant id of the calling thread's Carbon context. */
    private static int currentTenantId() {
        return PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
    }

    /** Returns the current tenant's junction map, or {@code null} when the tenant has none yet. */
    private Map<String, EventJunction> currentTenantJunctions() {
        return this.tenantSpecificEventJunctions.get(Integer.valueOf(currentTenantId()));
    }

    /** Returns the junction map of the tenant, atomically creating it on first use. */
    private ConcurrentHashMap<String, EventJunction> getOrCreateTenantJunctions(int tenantId) {
        Integer key = Integer.valueOf(tenantId);
        ConcurrentHashMap<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(key);
        if (junctions == null) {
            ConcurrentHashMap<String, EventJunction> created = new ConcurrentHashMap<String, EventJunction>();
            junctions = this.tenantSpecificEventJunctions.putIfAbsent(key, created);
            if (junctions == null) {
                // No other thread registered a map first; ours is the one now in the registry.
                junctions = created;
            }
        }
        return junctions;
    }
}
