package org.wso2.carbon.event.stream.core.internal;

import java.io.File;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.core.multitenancy.utils.TenantAxisUtils;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.databridge.commons.utils.DataBridgeCommonsUtils;
import org.wso2.carbon.event.stream.core.EventProducer;
import org.wso2.carbon.event.stream.core.EventStreamConfiguration;
import org.wso2.carbon.event.stream.core.EventStreamListener;
import org.wso2.carbon.event.stream.core.EventStreamService;
import org.wso2.carbon.event.stream.core.SiddhiEventConsumer;
import org.wso2.carbon.event.stream.core.WSO2EventConsumer;
import org.wso2.carbon.event.stream.core.WSO2EventListConsumer;
import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.event.stream.core.exception.StreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.event.stream.core.internal.ds.EventStreamServiceValueHolder;
import org.wso2.carbon.event.stream.core.internal.util.EventStreamConstants;
import org.wso2.carbon.event.stream.core.internal.util.SampleEventGenerator;
import org.wso2.carbon.event.stream.core.internal.util.helper.EventStreamConfigurationFileSystemInvoker;

/* loaded from: input_file:org/wso2/carbon/event/stream/core/internal/CarbonEventStreamService.class */
public class CarbonEventStreamService implements EventStreamService {
    private static final Log log = LogFactory.getLog(CarbonEventStreamService.class);
    private final List<StreamDefinition> pendingStreams = new ArrayList();
    private Map<Integer, ConcurrentHashMap<String, EventStreamConfiguration>> tenantSpecificEventStreamConfigs = new ConcurrentHashMap();

    public void removeEventStreamConfigurationFromMap(String str) {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(tenantId));
        String str2 = null;
        if (concurrentHashMap != null) {
            Iterator<EventStreamConfiguration> it = concurrentHashMap.values().iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                EventStreamConfiguration next = it.next();
                if (next.getFileName().equals(str)) {
                    str2 = next.getStreamDefinition().getStreamId();
                    break;
                }
            }
        }
        if (str2 != null) {
            concurrentHashMap.remove(str2);
            Iterator<EventStreamListener> it2 = EventStreamServiceValueHolder.getEventStreamListenerList().iterator();
            while (it2.hasNext()) {
                it2.next().removedEventStream(tenantId, DataBridgeCommonsUtils.getStreamNameFromStreamId(str2), DataBridgeCommonsUtils.getStreamVersionFromStreamId(str2));
            }
        }
    }

    public void addPendingStreams() {
        synchronized (this.pendingStreams) {
            for (StreamDefinition streamDefinition : this.pendingStreams) {
                try {
                    addEventStreamDefinition(streamDefinition);
                } catch (EventStreamConfigurationException e) {
                    log.error("Error occurred when adding stream " + streamDefinition.getName(), e);
                }
            }
            this.pendingStreams.clear();
        }
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public StreamDefinition getStreamDefinition(String str, String str2) throws EventStreamConfigurationException {
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap == null || !concurrentHashMap.containsKey(str + ":" + str2)) {
            return null;
        }
        return concurrentHashMap.get(str + ":" + str2).getStreamDefinition();
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public StreamDefinition getStreamDefinition(String str) throws EventStreamConfigurationException {
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap == null || !concurrentHashMap.containsKey(str)) {
            return null;
        }
        return concurrentHashMap.get(str).getStreamDefinition();
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public EventStreamConfiguration getEventStreamConfiguration(String str) {
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap == null || !concurrentHashMap.containsKey(str)) {
            return null;
        }
        return concurrentHashMap.get(str);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public List<StreamDefinition> getAllStreamDefinitions() throws EventStreamConfigurationException {
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        ArrayList arrayList = new ArrayList();
        if (concurrentHashMap == null) {
            return arrayList;
        }
        Iterator<EventStreamConfiguration> it = concurrentHashMap.values().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getStreamDefinition());
        }
        return arrayList;
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public List<EventStreamConfiguration> getAllEventStreamConfigurations() throws EventStreamConfigurationException {
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        return concurrentHashMap == null ? new ArrayList() : new ArrayList(concurrentHashMap.values());
    }

    public void addEventStreamConfig(EventStreamConfiguration eventStreamConfiguration) throws EventStreamConfigurationException {
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(tenantId));
        if (concurrentHashMap == null) {
            concurrentHashMap = new ConcurrentHashMap<>();
            this.tenantSpecificEventStreamConfigs.put(Integer.valueOf(tenantId), concurrentHashMap);
        }
        concurrentHashMap.put(eventStreamConfiguration.getStreamDefinition().getStreamId(), eventStreamConfiguration);
        Iterator<EventStreamListener> it = EventStreamServiceValueHolder.getEventStreamListenerList().iterator();
        while (it.hasNext()) {
            it.next().addedEventStream(tenantId, eventStreamConfiguration.getStreamDefinition().getName(), eventStreamConfiguration.getStreamDefinition().getVersion());
        }
    }

    public void validateEventStreamDefinition(StreamDefinition streamDefinition) throws EventStreamConfigurationException {
        if (streamDefinition.getMetaData() != null && streamDefinition.getMetaData().size() != 0) {
            for (int i = 0; i < streamDefinition.getMetaData().size(); i++) {
                String name = ((Attribute) streamDefinition.getMetaData().get(i)).getName();
                for (int i2 = i + 1; i2 < streamDefinition.getMetaData().size(); i2++) {
                    if (name.equals(((Attribute) streamDefinition.getMetaData().get(i2)).getName())) {
                        throw new EventStreamConfigurationException("Cannot have same name '" + name + "' for multiple meta data attributes, give different names");
                    }
                }
            }
        }
        if (streamDefinition.getCorrelationData() != null && streamDefinition.getCorrelationData().size() != 0) {
            for (int i3 = 0; i3 < streamDefinition.getCorrelationData().size(); i3++) {
                String name2 = ((Attribute) streamDefinition.getCorrelationData().get(i3)).getName();
                for (int i4 = i3 + 1; i4 < streamDefinition.getCorrelationData().size(); i4++) {
                    if (name2.equals(((Attribute) streamDefinition.getCorrelationData().get(i4)).getName())) {
                        throw new EventStreamConfigurationException("Cannot have same name '" + name2 + "' for multiple correlation data attributes, give different names");
                    }
                }
            }
        }
        if (streamDefinition.getPayloadData() == null || streamDefinition.getPayloadData().size() == 0) {
            return;
        }
        for (int i5 = 0; i5 < streamDefinition.getPayloadData().size(); i5++) {
            String name3 = ((Attribute) streamDefinition.getPayloadData().get(i5)).getName();
            for (int i6 = i5 + 1; i6 < streamDefinition.getPayloadData().size(); i6++) {
                if (name3.equals(((Attribute) streamDefinition.getPayloadData().get(i6)).getName())) {
                    throw new EventStreamConfigurationException("Cannot have same name '" + name3 + "' for multiple payload data attributes, give different names");
                }
            }
        }
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void addEventStreamDefinition(StreamDefinition streamDefinition) throws EventStreamConfigurationException {
        if (EventStreamServiceValueHolder.getConfigurationContextService() == null) {
            synchronized (this.pendingStreams) {
                if (EventStreamServiceValueHolder.getConfigurationContextService() != null) {
                    addEventStreamDefinition(streamDefinition);
                } else {
                    this.pendingStreams.add(streamDefinition);
                }
            }
            return;
        }
        AxisConfiguration axisConfiguration = getAxisConfiguration();
        String str = new File(axisConfiguration.getRepository().getPath()).getAbsolutePath() + File.separator + EventStreamConstants.EVENT_STREAMS;
        File file = new File(str);
        if (!file.exists() && !file.mkdir()) {
            throw new EventStreamConfigurationException("Cannot create directory to add tenant specific Event Stream : " + streamDefinition.getStreamId());
        }
        String str2 = str + File.separator + streamDefinition.getName() + EventStreamConstants.STREAM_DEFINITION_FILE_DELIMITER + streamDefinition.getVersion() + EventStreamConstants.STREAM_DEFINITION_FILE_EXTENSION;
        StreamDefinition streamDefinition2 = getStreamDefinition(streamDefinition.getStreamId());
        if (streamDefinition2 != null) {
            if (!streamDefinition2.equals(streamDefinition)) {
                throw new StreamDefinitionAlreadyDefinedException("Different stream definition with same stream id " + streamDefinition.getStreamId() + " already exist " + streamDefinition2.toString() + ", cannot add stream definition " + streamDefinition.toString());
            }
        } else {
            validateEventStreamDefinition(streamDefinition);
            EventStreamConfigurationFileSystemInvoker.save(streamDefinition, str2, axisConfiguration);
        }
    }

    private AxisConfiguration getAxisConfiguration() {
        return PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId() == -1234 ? EventStreamServiceValueHolder.getConfigurationContextService().getServerConfigContext().getAxisConfiguration() : TenantAxisUtils.getTenantAxisConfiguration(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(), EventStreamServiceValueHolder.getConfigurationContextService().getServerConfigContext());
    }

    public void removeEventStreamDefinition(String str) throws EventStreamConfigurationException {
        String str2 = null;
        String str3 = null;
        if (str != null && str.contains(":")) {
            str2 = str.split(":")[0];
            str3 = str.split(":")[1];
        }
        removeEventStreamDefinition(str2, str3);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void removeEventStreamDefinition(String str, String str2) throws EventStreamConfigurationException {
        EventStreamConfigurationFileSystemInvoker.delete(str + EventStreamConstants.STREAM_DEFINITION_FILE_DELIMITER + str2 + EventStreamConstants.STREAM_DEFINITION_FILE_EXTENSION, getAxisConfiguration());
        log.info("Stream definition - " + str + ":" + str2 + " removed successfully");
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public List<String> getStreamIds() throws EventStreamConfigurationException {
        List<StreamDefinition> allStreamDefinitions = getAllStreamDefinitions();
        ArrayList arrayList = new ArrayList(allStreamDefinitions.size());
        Iterator<StreamDefinition> it = allStreamDefinitions.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getStreamId());
        }
        return arrayList;
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public String generateSampleEvent(String str, String str2) throws EventStreamConfigurationException {
        StreamDefinition streamDefinition = getStreamDefinition(str);
        if (str2.equals(EventStreamConstants.XML_EVENT)) {
            return SampleEventGenerator.generateXMLEvent(streamDefinition);
        }
        if (str2.equals("json")) {
            return SampleEventGenerator.generateJSONEvent(streamDefinition);
        }
        if (str2.equals(EventStreamConstants.TEXT_EVENT)) {
            return SampleEventGenerator.generateTextEvent(streamDefinition);
        }
        return null;
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void subscribe(SiddhiEventConsumer siddhiEventConsumer) throws EventStreamConfigurationException {
        EventStreamServiceValueHolder.getEventStreamRuntime().subscribe(siddhiEventConsumer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void subscribe(EventProducer eventProducer) throws EventStreamConfigurationException {
        EventStreamServiceValueHolder.getEventStreamRuntime().subscribe(eventProducer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void subscribe(WSO2EventConsumer wSO2EventConsumer) throws EventStreamConfigurationException {
        EventStreamServiceValueHolder.getEventStreamRuntime().subscribe(wSO2EventConsumer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void subscribe(WSO2EventListConsumer wSO2EventListConsumer) throws EventStreamConfigurationException {
        EventStreamServiceValueHolder.getEventStreamRuntime().subscribe(wSO2EventListConsumer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void unsubscribe(SiddhiEventConsumer siddhiEventConsumer) {
        EventStreamServiceValueHolder.getEventStreamRuntime().unsubscribe(siddhiEventConsumer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void unsubscribe(EventProducer eventProducer) {
        EventStreamServiceValueHolder.getEventStreamRuntime().unsubscribe(eventProducer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void unsubscribe(WSO2EventConsumer wSO2EventConsumer) {
        EventStreamServiceValueHolder.getEventStreamRuntime().unsubscribe(wSO2EventConsumer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void unsubscribe(WSO2EventListConsumer wSO2EventListConsumer) {
        EventStreamServiceValueHolder.getEventStreamRuntime().unsubscribe(wSO2EventListConsumer);
    }

    @Override // org.wso2.carbon.event.stream.core.EventStreamService
    public void publish(Event event) {
        EventStreamServiceValueHolder.getEventStreamRuntime().publish(event.getStreamId(), event);
    }

    public boolean isEventStreamFileExists(String str) {
        ConcurrentHashMap<String, EventStreamConfiguration> concurrentHashMap = this.tenantSpecificEventStreamConfigs.get(Integer.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
        if (concurrentHashMap == null) {
            return false;
        }
        Iterator<EventStreamConfiguration> it = concurrentHashMap.values().iterator();
        while (it.hasNext()) {
            if (it.next().getFileName().equals(str)) {
                return true;
            }
        }
        return false;
    }

    public boolean isMatchForStreamDefinition(StreamDefinition streamDefinition, StreamDefinition streamDefinition2) {
        if (streamDefinition == null || streamDefinition2 == null) {
            throw new IllegalArgumentException("Stream definitions passed in cannot be null!");
        }
        List metaData = streamDefinition2.getMetaData();
        List metaData2 = streamDefinition.getMetaData();
        if (metaData != null && metaData2 != null) {
            if (metaData2.size() != metaData.size()) {
                return false;
            }
            for (int i = 0; i < metaData.size(); i++) {
                Attribute attribute = (Attribute) metaData.get(i);
                if (metaData2.get(i) == null || !((Attribute) metaData2.get(i)).equals(attribute)) {
                    return false;
                }
            }
        }
        List correlationData = streamDefinition2.getCorrelationData();
        List correlationData2 = streamDefinition.getCorrelationData();
        if (correlationData != null && correlationData2 != null) {
            if (correlationData2.size() != correlationData.size()) {
                return false;
            }
            for (int i2 = 0; i2 < correlationData.size(); i2++) {
                Attribute attribute2 = (Attribute) correlationData.get(i2);
                if (correlationData2.get(i2) == null || !((Attribute) correlationData2.get(i2)).equals(attribute2)) {
                    return false;
                }
            }
        }
        List payloadData = streamDefinition2.getPayloadData();
        List payloadData2 = streamDefinition.getPayloadData();
        if (payloadData == null || payloadData2 == null) {
            return true;
        }
        if (payloadData2.size() != payloadData.size()) {
            return false;
        }
        for (int i3 = 0; i3 < payloadData.size(); i3++) {
            Attribute attribute3 = (Attribute) payloadData.get(i3);
            if (payloadData2.get(i3) == null || !((Attribute) payloadData2.get(i3)).equals(attribute3)) {
                return false;
            }
        }
        return true;
    }
}
