package org.wso2.carbon.event.stream.manager.core.internal;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.databridge.core.exception.StreamDefinitionStoreException;
import org.wso2.carbon.event.stream.manager.core.EventProducer;
import org.wso2.carbon.event.stream.manager.core.EventStreamListener;
import org.wso2.carbon.event.stream.manager.core.EventStreamService;
import org.wso2.carbon.event.stream.manager.core.RawEventConsumer;
import org.wso2.carbon.event.stream.manager.core.SiddhiEventConsumer;
import org.wso2.carbon.event.stream.manager.core.WSO2EventConsumer;
import org.wso2.carbon.event.stream.manager.core.WSO2EventListConsumer;
import org.wso2.carbon.event.stream.manager.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.event.stream.manager.core.internal.ds.EventStreamServiceValueHolder;
import org.wso2.carbon.event.stream.manager.core.internal.stream.EventJunction;
import org.wso2.carbon.event.stream.manager.core.internal.util.EventStreamConstants;
import org.wso2.carbon.event.stream.manager.core.internal.util.SampleEventGenerator;

/* loaded from: input_file:org/wso2/carbon/event/stream/manager/core/internal/CarbonEventStreamService.class */
/**
 * {@link EventStreamService} implementation that delegates stream-definition persistence to the
 * store obtained from {@link EventStreamServiceValueHolder#getStreamDefinitionStore()} and keeps
 * one {@link EventJunction} per loaded stream, grouped by tenant id.
 *
 * <p>Producers and consumers are attached to / detached from the junction of the stream they
 * report through {@code getStreamId()}.
 */
public class CarbonEventStreamService implements EventStreamService {
    private static final Log log = LogFactory.getLog(CarbonEventStreamService.class);

    /**
     * Listeners notified from {@link #loadEventStream} and {@link #unloadEventStream}.
     * NOTE(review): this is a plain ArrayList and is not synchronized - confirm that listeners
     * are never registered while a stream is being loaded or unloaded on another thread.
     */
    private final List<EventStreamListener> eventStreamListenerList = new ArrayList<EventStreamListener>();

    /**
     * Tenant id -> (stream id -> junction). Both levels are ConcurrentHashMaps so that the outer
     * map offers the same concurrent-access guarantees as the per-tenant maps it holds.
     */
    private final ConcurrentHashMap<Integer, Map<String, EventJunction>> tenantSpecificEventJunctions =
            new ConcurrentHashMap<Integer, Map<String, EventJunction>>();

    /**
     * Looks up a stream definition by name and version.
     *
     * @param str  stream name
     * @param str2 stream version
     * @param i    tenant id
     * @return whatever the definition store returns for the generated stream id
     * @throws EventStreamConfigurationException if the definition store fails
     */
    @Override
    public StreamDefinition getStreamDefinition(String str, String str2, int i) throws EventStreamConfigurationException {
        String streamId = DataBridgeCommonsUtils.generateStreamId(str, str2);
        return getStreamDefinition(streamId, i);
    }

    /**
     * Looks up a stream definition by stream id.
     *
     * @param str stream id
     * @param i   tenant id
     * @return whatever the definition store returns for the given id and tenant
     * @throws EventStreamConfigurationException wrapping any {@link StreamDefinitionStoreException}
     */
    @Override
    public StreamDefinition getStreamDefinition(String str, int i) throws EventStreamConfigurationException {
        try {
            return EventStreamServiceValueHolder.getStreamDefinitionStore().getStreamDefinition(str, i);
        } catch (StreamDefinitionStoreException e) {
            String message = "Error in getting Stream Definition " + str;
            log.error(message, e);
            throw new EventStreamConfigurationException(message, e);
        }
    }

    /**
     * Returns every stream definition the store holds for the tenant.
     *
     * @param i tenant id
     * @return the collection returned by the definition store
     */
    @Override
    public Collection<StreamDefinition> getAllStreamDefinitions(int i) throws EventStreamConfigurationException {
        return EventStreamServiceValueHolder.getStreamDefinitionStore().getAllStreamDefinitions(i);
    }

    /**
     * Saves the stream definition to the definition store and logs the success.
     *
     * @param streamDefinition definition to save
     * @param i                tenant id
     * @throws EventStreamConfigurationException if the store rejects or fails to save the definition
     */
    @Override
    public void addEventStreamDefinition(StreamDefinition streamDefinition, int i) throws EventStreamConfigurationException {
        saveStreamDefinitionToStore(streamDefinition, i);
        log.info("Stream definition - " + streamDefinition.getStreamId() + " added to registry successfully");
    }

    /**
     * Creates a fresh junction for the given stream, if its definition exists in the store, and
     * notifies all registered listeners through {@code addedEventStream}. Any junction previously
     * registered under the same stream id is replaced. Does nothing when no definition is found.
     *
     * @param str stream id
     * @param i   tenant id
     * @throws EventStreamConfigurationException if the definition lookup fails
     */
    public void loadEventStream(String str, int i) throws EventStreamConfigurationException {
        StreamDefinition streamDefinition = getStreamDefinition(
                DataBridgeCommonsUtils.getStreamNameFromStreamId(str),
                DataBridgeCommonsUtils.getStreamVersionFromStreamId(str),
                i);
        if (streamDefinition == null) {
            return;
        }
        getOrCreateTenantJunctions(i).put(streamDefinition.getStreamId(), new EventJunction(streamDefinition));
        for (EventStreamListener listener : this.eventStreamListenerList) {
            listener.addedEventStream(i, streamDefinition.getName(), streamDefinition.getVersion());
        }
    }

    /**
     * Deletes the stream definition from the store and logs when the store reports a deletion.
     *
     * @param str  stream name
     * @param str2 stream version
     * @param i    tenant id
     */
    @Override
    public void removeEventStreamDefinition(String str, String str2, int i) throws EventStreamConfigurationException {
        if (removeStreamDefinitionFromStore(str, str2, i)) {
            log.info("Stream definition - " + str + EventStreamConstants.EVENT_ATTRIBUTE_VALUE_SEPARATOR + str2 + " removed from registry successfully");
        }
    }

    /**
     * Drops the junction of the given stream and notifies all registered listeners through
     * {@code removedEventStream}, but only when the definition store no longer has a definition
     * for that stream. While a definition still exists this method does nothing.
     *
     * @param str stream id
     * @param i   tenant id
     * @throws EventStreamConfigurationException if the definition lookup fails
     */
    public void unloadEventStream(String str, int i) throws EventStreamConfigurationException {
        // Parsed once and reused for both the lookup and the listener notifications.
        String streamName = DataBridgeCommonsUtils.getStreamNameFromStreamId(str);
        String streamVersion = DataBridgeCommonsUtils.getStreamVersionFromStreamId(str);
        if (getStreamDefinition(streamName, streamVersion, i) != null) {
            return;
        }
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(i);
        if (junctions != null) {
            junctions.remove(str);
        }
        for (EventStreamListener listener : this.eventStreamListenerList) {
            listener.removedEventStream(i, streamName, streamVersion);
        }
    }

    /**
     * Adds a listener to be notified when streams are loaded or unloaded. A null listener is ignored.
     *
     * @param eventStreamListener listener to register, may be null
     */
    @Override
    public void registerEventStreamListener(EventStreamListener eventStreamListener) {
        if (eventStreamListener != null) {
            this.eventStreamListenerList.add(eventStreamListener);
        }
    }

    /**
     * Collects the stream ids of all definitions stored for the tenant.
     *
     * @param i tenant id
     * @return a new list with one id per stored definition, in the store's iteration order
     */
    @Override
    public List<String> getStreamIds(int i) throws EventStreamConfigurationException {
        Collection<StreamDefinition> definitions = getAllStreamDefinitions(i);
        List<String> streamIds = new ArrayList<String>(definitions.size());
        for (StreamDefinition definition : definitions) {
            streamIds.add(definition.getStreamId());
        }
        return streamIds;
    }

    /**
     * Generates a sample event for the stream in the requested format.
     *
     * @param str  stream id
     * @param str2 event type; compared against the XML, JSON and TEXT constants of
     *             {@link EventStreamConstants}
     * @param i    tenant id
     * @return the generated sample, or {@code null} when the event type is null or not one of
     *         the three known types
     * @throws EventStreamConfigurationException if the definition lookup fails
     */
    @Override
    public String generateSampleEvent(String str, String str2, int i) throws EventStreamConfigurationException {
        StreamDefinition streamDefinition = getStreamDefinition(str, i);
        // Constant-first comparisons so that a null event type falls through to "unknown"
        // instead of raising a NullPointerException.
        if (EventStreamConstants.XML_EVENT.equals(str2)) {
            return SampleEventGenerator.generateXMLEvent(streamDefinition);
        }
        if (EventStreamConstants.JSON_EVENT.equals(str2)) {
            return SampleEventGenerator.generateJSONEvent(streamDefinition);
        }
        if (EventStreamConstants.TEXT_EVENT.equals(str2)) {
            return SampleEventGenerator.generateTextEvent(streamDefinition);
        }
        return null;
    }

    /**
     * Attaches the consumer to the junction of the stream it reports, creating the junction if needed.
     *
     * @param siddhiEventConsumer consumer to attach
     * @param i                   tenant id
     * @throws EventStreamConfigurationException if the stream is unknown or cannot be retrieved
     */
    @Override
    public void subscribe(SiddhiEventConsumer siddhiEventConsumer, int i) throws EventStreamConfigurationException {
        getOrConstructEventJunction(i, siddhiEventConsumer.getStreamId()).addConsumer(siddhiEventConsumer);
    }

    /**
     * Attaches the consumer to the junction of the stream it reports, creating the junction if needed.
     *
     * @param rawEventConsumer consumer to attach
     * @param i                tenant id
     * @throws EventStreamConfigurationException if the stream is unknown or cannot be retrieved
     */
    @Override
    public void subscribe(RawEventConsumer rawEventConsumer, int i) throws EventStreamConfigurationException {
        getOrConstructEventJunction(i, rawEventConsumer.getStreamId()).addConsumer(rawEventConsumer);
    }

    /**
     * Attaches the producer to the junction of the stream it reports, creating the junction if needed.
     *
     * @param eventProducer producer to attach
     * @param i             tenant id
     * @throws EventStreamConfigurationException if the stream is unknown or cannot be retrieved
     */
    @Override
    public void subscribe(EventProducer eventProducer, int i) throws EventStreamConfigurationException {
        getOrConstructEventJunction(i, eventProducer.getStreamId()).addProducer(eventProducer);
    }

    /**
     * Attaches the consumer to the junction of the stream it reports, creating the junction if needed.
     *
     * @param wSO2EventConsumer consumer to attach
     * @param i                 tenant id
     * @throws EventStreamConfigurationException if the stream is unknown or cannot be retrieved
     */
    @Override
    public void subscribe(WSO2EventConsumer wSO2EventConsumer, int i) throws EventStreamConfigurationException {
        getOrConstructEventJunction(i, wSO2EventConsumer.getStreamId()).addConsumer(wSO2EventConsumer);
    }

    /**
     * Attaches the consumer to the junction of the stream it reports, creating the junction if needed.
     *
     * @param wSO2EventListConsumer consumer to attach
     * @param i                     tenant id
     * @throws EventStreamConfigurationException if the stream is unknown or cannot be retrieved
     */
    @Override
    public void subscribe(WSO2EventListConsumer wSO2EventListConsumer, int i) throws EventStreamConfigurationException {
        getOrConstructEventJunction(i, wSO2EventListConsumer.getStreamId()).addConsumer(wSO2EventListConsumer);
    }

    /**
     * Detaches the consumer from its stream's junction; a no-op when the tenant or the stream
     * has no junction.
     *
     * @param siddhiEventConsumer consumer to detach
     * @param i                   tenant id
     */
    @Override
    public void unsubscribe(SiddhiEventConsumer siddhiEventConsumer, int i) {
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(i);
        if (junctions == null) {
            return;
        }
        EventJunction junction = junctions.get(siddhiEventConsumer.getStreamId());
        if (junction != null) {
            junction.removeConsumer(siddhiEventConsumer);
        }
    }

    /**
     * Detaches the consumer from its stream's junction; a no-op when the tenant or the stream
     * has no junction.
     *
     * @param rawEventConsumer consumer to detach
     * @param i                tenant id
     */
    @Override
    public void unsubscribe(RawEventConsumer rawEventConsumer, int i) {
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(i);
        if (junctions == null) {
            return;
        }
        EventJunction junction = junctions.get(rawEventConsumer.getStreamId());
        if (junction != null) {
            junction.removeConsumer(rawEventConsumer);
        }
    }

    /**
     * Detaches the producer from its stream's junction; a no-op when the tenant or the stream
     * has no junction.
     *
     * @param eventProducer producer to detach
     * @param i             tenant id
     */
    @Override
    public void unsubscribe(EventProducer eventProducer, int i) {
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(i);
        if (junctions == null) {
            return;
        }
        EventJunction junction = junctions.get(eventProducer.getStreamId());
        if (junction != null) {
            junction.removeProducer(eventProducer);
        }
    }

    /**
     * Detaches the consumer from its stream's junction; a no-op when the tenant or the stream
     * has no junction.
     *
     * @param wSO2EventConsumer consumer to detach
     * @param i                 tenant id
     */
    @Override
    public void unsubscribe(WSO2EventConsumer wSO2EventConsumer, int i) {
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(i);
        if (junctions == null) {
            return;
        }
        EventJunction junction = junctions.get(wSO2EventConsumer.getStreamId());
        if (junction != null) {
            junction.removeConsumer(wSO2EventConsumer);
        }
    }

    /**
     * Detaches the consumer from its stream's junction; a no-op when the tenant or the stream
     * has no junction.
     *
     * @param wSO2EventListConsumer consumer to detach
     * @param i                     tenant id
     */
    @Override
    public void unsubscribe(WSO2EventListConsumer wSO2EventListConsumer, int i) {
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(i);
        if (junctions == null) {
            return;
        }
        EventJunction junction = junctions.get(wSO2EventListConsumer.getStreamId());
        if (junction != null) {
            junction.removeConsumer(wSO2EventListConsumer);
        }
    }

    /**
     * Returns the stream-id -> junction map of the tenant, creating and registering an empty one
     * on first use.
     *
     * @param tenantId tenant id
     * @return the map registered for the tenant, never null
     */
    private Map<String, EventJunction> getOrCreateTenantJunctions(int tenantId) {
        Map<String, EventJunction> junctions = this.tenantSpecificEventJunctions.get(tenantId);
        if (junctions == null) {
            Map<String, EventJunction> created = new ConcurrentHashMap<String, EventJunction>();
            // putIfAbsent returns the map another caller registered first, or null if ours won.
            junctions = this.tenantSpecificEventJunctions.putIfAbsent(tenantId, created);
            if (junctions == null) {
                junctions = created;
            }
        }
        return junctions;
    }

    /**
     * Returns the junction registered for the stream, building one from the stored definition
     * when none is registered yet.
     *
     * @param i   tenant id
     * @param str stream id
     * @return the existing or newly created junction
     * @throws EventStreamConfigurationException if the store has no definition for the stream or
     *                                           the lookup fails (the store exception is kept as cause)
     */
    private EventJunction getOrConstructEventJunction(int i, String str) throws EventStreamConfigurationException {
        Map<String, EventJunction> junctions = getOrCreateTenantJunctions(i);
        EventJunction eventJunction = junctions.get(str);
        if (eventJunction != null) {
            return eventJunction;
        }
        StreamDefinition streamDefinition;
        try {
            streamDefinition = EventStreamServiceValueHolder.getStreamDefinitionStore().getStreamDefinition(str, i);
        } catch (StreamDefinitionStoreException e) {
            // Keep the store exception as the cause so the root failure is not lost.
            throw new EventStreamConfigurationException("Cannot retrieve Stream " + str + " for tenant " + i, e);
        }
        if (streamDefinition == null) {
            throw new EventStreamConfigurationException("Stream " + str + " is not configured to tenant " + i);
        }
        eventJunction = new EventJunction(streamDefinition);
        junctions.put(streamDefinition.getStreamId(), eventJunction);
        return eventJunction;
    }

    /**
     * Saves the definition through the definition store, logging and translating store failures.
     *
     * @param streamDefinition definition to save
     * @param i                tenant id
     * @throws EventStreamConfigurationException wrapping the store exception
     */
    private void saveStreamDefinitionToStore(StreamDefinition streamDefinition, int i) throws EventStreamConfigurationException {
        try {
            EventStreamServiceValueHolder.getStreamDefinitionStore().saveStreamDefinition(streamDefinition, i);
        } catch (DifferentStreamDefinitionAlreadyDefinedException e) {
            log.error(e.getMessage(), e);
            throw new EventStreamConfigurationException(e.getMessage(), e);
        } catch (StreamDefinitionStoreException e) {
            String message = "Error in saving Stream Definition " + streamDefinition;
            log.error(message, e);
            throw new EventStreamConfigurationException(message, e);
        }
    }

    /**
     * Deletes the definition through the definition store.
     *
     * @param str  stream name
     * @param str2 stream version
     * @param i    tenant id
     * @return the boolean reported by the store's {@code deleteStreamDefinition}
     */
    private boolean removeStreamDefinitionFromStore(String str, String str2, int i) throws EventStreamConfigurationException {
        return EventStreamServiceValueHolder.getStreamDefinitionStore().deleteStreamDefinition(str, str2, i);
    }
}
