package org.wso2.carbon.event.publisher.core.internal;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Logger;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Attribute;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.event.output.adapter.core.OutputEventAdapterService;
import org.wso2.carbon.event.output.adapter.core.exception.OutputEventAdapterException;
import org.wso2.carbon.event.processor.manager.core.EventManagementUtil;
import org.wso2.carbon.event.processor.manager.core.EventSync;
import org.wso2.carbon.event.processor.manager.core.Manager;
import org.wso2.carbon.event.processor.manager.core.config.HAConfiguration;
import org.wso2.carbon.event.processor.manager.core.config.ManagementModeInfo;
import org.wso2.carbon.event.processor.manager.core.config.Mode;
import org.wso2.carbon.event.publisher.core.config.EventPublisherConfiguration;
import org.wso2.carbon.event.publisher.core.config.EventPublisherConstants;
import org.wso2.carbon.event.publisher.core.exception.EventPublisherConfigurationException;
import org.wso2.carbon.event.publisher.core.exception.EventPublisherStreamValidationException;
import org.wso2.carbon.event.publisher.core.internal.ds.EventPublisherServiceValueHolder;
import org.wso2.carbon.event.publisher.core.internal.util.EventPublisherUtil;
import org.wso2.carbon.event.stream.core.WSO2EventConsumer;
import org.wso2.carbon.event.stream.core.exception.EventStreamConfigurationException;
import org.wso2.carbon.metrics.manager.Counter;
import org.wso2.carbon.metrics.manager.Level;
import org.wso2.carbon.metrics.manager.MetricManager;

/* loaded from: input_file:org/wso2/carbon/event/publisher/core/internal/EventPublisher.class */
/**
 * Consumes events of one stream, maps them with the configured {@link OutputMapper} and hands the
 * result to the output event adapter service.
 *
 * <p>The constructor resolves the source stream definition, subscribes this instance to the event
 * stream service, creates the output adapter and, in {@code Distributed} or {@code HA} mode,
 * registers this instance as an {@link EventSync}. In {@code HA} mode events received while the
 * publisher management service reports "drop" are buffered in a {@code BlockingEventQueue} and
 * replayed once publishing resumes.
 */
public class EventPublisher implements WSO2EventConsumer, EventSync {
    private static final Log log = LogFactory.getLog(EventPublisher.class);
    private final boolean traceEnabled;
    private final boolean statisticsEnabled;
    private Counter eventCounter;
    private EventPublisherConfiguration eventPublisherConfiguration;
    private OutputMapper outputMapper;
    private String streamId;
    private String beforeTracerPrefix;
    private String afterTracerPrefix;
    private boolean dynamicMessagePropertyEnabled;
    private boolean customMappingEnabled;
    private boolean isPolled;
    private Mode mode = Mode.SingleNode;
    private String syncId;
    private boolean sendToOther;
    private StreamDefinition streamDefinition;
    // Only created for HA worker nodes; stays null in every other configuration.
    private BlockingEventQueue eventQueue;
    // Attribute names referenced by {{...}} templates in the adapter's dynamic properties.
    private final List<String> dynamicMessagePropertyList = new ArrayList<String>();
    private final Logger trace = Logger.getLogger(EventPublisherConstants.EVENT_TRACE_LOGGER);
    // Attribute name (with meta/correlation prefix where applicable) -> index in the flattened event data.
    private final Map<String, Integer> propertyPositionMap = new TreeMap<String, Integer>();
    private boolean isContinue = false;
    private final int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();

    /**
     * An event paired with the timestamp at which it was buffered and a size value that is set by
     * whoever stores the wrapper.
     */
    public class EventWrapper {
        private final Event event;
        private final long timestampInMillis;
        private int size;

        /**
         * @param event the wrapped event
         * @param j     timestamp, in milliseconds, associated with the event
         */
        public EventWrapper(Event event, long j) {
            this.event = event;
            this.timestampInMillis = j;
        }

        /** @return the wrapped event */
        public Event getEvent() {
            return this.event;
        }

        /** @return the size previously stored with {@link #setSize(int)}, or 0 if never set */
        public int getSize() {
            return this.size;
        }

        /** @param i size value to remember for this wrapper */
        public void setSize(int i) {
            this.size = i;
        }

        /** @return the timestamp, in milliseconds, given at construction */
        public long getTimestampInMillis() {
            return this.timestampInMillis;
        }
    }

    /**
     * Builds a publisher for the given configuration.
     *
     * @param eventPublisherConfiguration configuration naming the source stream, the output mapping
     *                                    and the target adapter
     * @throws EventPublisherConfigurationException if the stream definition is missing or cannot be
     *                                              retrieved, the stream subscription fails, or the
     *                                              output adapter cannot be created or queried
     */
    public EventPublisher(EventPublisherConfiguration eventPublisherConfiguration) throws EventPublisherConfigurationException {
        this.eventPublisherConfiguration = eventPublisherConfiguration;
        this.customMappingEnabled = eventPublisherConfiguration.getOutputMapping().isCustomMappingEnabled();
        String metricId = "WSO2_CEP.EventPublishers[+]." + eventPublisherConfiguration.getEventPublisherName() + EventPublisherConstants.METRIC_DELIMITER + EventPublisherConstants.METRICS_PUBLISHED_EVENTS;
        String fromStreamName = eventPublisherConfiguration.getFromStreamName();
        String fromStreamVersion = eventPublisherConfiguration.getFromStreamVersion();
        // The try blocks stay nested on purpose: the two final flags are assigned inside them and
        // every catch path throws, which is what keeps the fields definitely assigned.
        try {
            StreamDefinition inputStreamDefinition = EventPublisherServiceValueHolder.getEventStreamService().getStreamDefinition(fromStreamName, fromStreamVersion);
            if (inputStreamDefinition == null) {
                throw new EventPublisherConfigurationException("No event stream exists for the corresponding stream name and version : " + fromStreamName + "-" + fromStreamVersion);
            }
            this.streamId = inputStreamDefinition.getStreamId();
            createPropertyPositionMap(inputStreamDefinition);
            this.outputMapper = EventPublisherServiceValueHolder.getMappingFactoryMap().get(eventPublisherConfiguration.getOutputMapping().getMappingType()).constructOutputMapper(eventPublisherConfiguration, this.propertyPositionMap, this.tenantId, inputStreamDefinition);
            for (String propertyValue : eventPublisherConfiguration.getToAdapterDynamicProperties().values()) {
                getDynamicOutputMessageProperties(propertyValue != null ? propertyValue : "");
            }
            this.dynamicMessagePropertyEnabled = !this.dynamicMessagePropertyList.isEmpty();
            try {
                EventPublisherServiceValueHolder.getEventStreamService().subscribe(this);
                this.traceEnabled = eventPublisherConfiguration.isTracingEnabled();
                this.statisticsEnabled = eventPublisherConfiguration.isStatisticsEnabled() && EventPublisherServiceValueHolder.isGlobalStatisticsEnabled();
                if (this.statisticsEnabled) {
                    this.eventCounter = MetricManager.counter(metricId, new Level[]{Level.INFO, Level.INFO});
                }
                if (this.traceEnabled) {
                    String lineSeparator = System.getProperty("line.separator");
                    String publisherLabel = "TenantId : " + this.tenantId + ", " + EventPublisherConstants.EVENT_PUBLISHER + " : " + eventPublisherConfiguration.getEventPublisherName();
                    this.beforeTracerPrefix = publisherLabel + ", " + EventPublisherConstants.EVENT_STREAM + " : " + EventPublisherUtil.getImportedStreamIdFrom(eventPublisherConfiguration) + ", before processing " + lineSeparator;
                    this.afterTracerPrefix = publisherLabel + ", after processing " + lineSeparator;
                }
                OutputEventAdapterService outputEventAdapterService = EventPublisherServiceValueHolder.getOutputEventAdapterService();
                try {
                    outputEventAdapterService.create(eventPublisherConfiguration.getToAdapterConfiguration());
                    try {
                        this.isPolled = outputEventAdapterService.isPolled(eventPublisherConfiguration.getToAdapterConfiguration().getName());
                        ManagementModeInfo managementModeInfo = EventPublisherServiceValueHolder.getEventManagementService().getManagementModeInfo();
                        this.mode = managementModeInfo.getMode();
                        if (this.mode == Mode.Distributed || this.mode == Mode.HA) {
                            this.syncId = EventManagementUtil.constructEventSyncId(this.tenantId, eventPublisherConfiguration.getToAdapterConfiguration().getName(), Manager.ManagerType.Publisher);
                            this.streamDefinition = EventManagementUtil.constructDatabridgeStreamDefinition(this.syncId, inputStreamDefinition);
                            if (this.mode == Mode.Distributed && managementModeInfo.getDistributedConfiguration().isWorkerNode()) {
                                this.sendToOther = true;
                            } else if (this.mode == Mode.HA && managementModeInfo.getHaConfiguration().isWorkerNode()) {
                                this.sendToOther = true;
                                HAConfiguration haConfiguration = managementModeInfo.getHaConfiguration();
                                this.eventQueue = new BlockingEventQueue(haConfiguration.getEventSyncPublisherMaxQueueSizeInMb(), haConfiguration.getEventSyncPublisherQueueSize());
                            }
                            EventPublisherServiceValueHolder.getEventManagementService().registerEventSync(this, Manager.ManagerType.Publisher);
                        }
                    } catch (OutputEventAdapterException e) {
                        throw new EventPublisherConfigurationException("Error in creating Event Publisher :" + eventPublisherConfiguration.getEventPublisherName() + ", " + e.getMessage(), e);
                    }
                } catch (OutputEventAdapterException e2) {
                    throw new EventPublisherConfigurationException("Error in creating the output Adapter for Event Publisher :" + eventPublisherConfiguration.getEventPublisherName() + ", " + e2.getMessage(), e2);
                }
            } catch (EventStreamConfigurationException e3) {
                throw new EventPublisherStreamValidationException("Stream " + this.streamId + " does not exist", this.streamId);
            }
        } catch (EventStreamConfigurationException e4) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new EventPublisherConfigurationException("Cannot retrieve the stream definition from stream store : " + e4.getMessage(), e4);
        }
    }

    /** @return the configuration this publisher was created with */
    public EventPublisherConfiguration getEventPublisherConfiguration() {
        return this.eventPublisherConfiguration;
    }

    /**
     * Routes one incoming event.
     *
     * <ul>
     *   <li>Polled adapter: optionally syncs the event to the other node, then publishes it.</li>
     *   <li>Otherwise, when the management service says "drop": in HA mode the event is buffered,
     *       in other modes it is discarded.</li>
     *   <li>Otherwise: in HA mode buffered events are replayed first, then the event is published.</li>
     * </ul>
     *
     * @param event the event to route
     */
    public void sendEvent(Event event) {
        if (this.isPolled) {
            if (this.sendToOther) {
                EventPublisherServiceValueHolder.getEventManagementService().syncEvent(this.syncId, Manager.ManagerType.Publisher, event);
            }
            process(event);
            return;
        }
        if (EventPublisherServiceValueHolder.getCarbonEventPublisherManagementService().isDrop()) {
            if (this.mode == Mode.HA) {
                bufferDroppedEvent(event);
            }
            return;
        }
        if (this.mode == Mode.HA) {
            replayBufferedEvents();
        }
        process(event);
    }

    /** Publishes buffered events newer than the last recorded sent time, then records a new sent time. */
    private void replayBufferedEvents() {
        long clusterTimeInMillis = EventPublisherServiceValueHolder.getEventManagementService().getClusterTimeInMillis();
        // The queue exists only on HA worker nodes, so guard against it being absent.
        if (this.eventQueue != null && !this.eventQueue.isEmpty()) {
            long latestEventSentTime = EventPublisherServiceValueHolder.getEventManagementService().getLatestEventSentTime(this.eventPublisherConfiguration.getEventPublisherName(), this.tenantId);
            while (!this.eventQueue.isEmpty()) {
                EventWrapper buffered = this.eventQueue.poll();
                if (buffered == null) {
                    // NOTE(review): assumes poll() yields null once the queue is drained - confirm.
                    break;
                }
                if (buffered.getTimestampInMillis() > latestEventSentTime) {
                    process(buffered.getEvent());
                }
            }
        }
        EventPublisherServiceValueHolder.getEventManagementService().updateLatestEventSentTime(this.eventPublisherConfiguration.getEventPublisherName(), this.tenantId, clusterTimeInMillis);
    }

    /** Buffers an event that may not be published now, evicting the oldest entries when the queue is full. */
    private void bufferDroppedEvent(Event event) {
        if (this.eventQueue == null) {
            // No buffer was configured for this node; the event is discarded like in non-HA modes.
            return;
        }
        EventWrapper eventWrapper = new EventWrapper(event, EventPublisherServiceValueHolder.getEventManagementService().getClusterTimeInMillis());
        while (!this.eventQueue.offer(eventWrapper)) {
            EventWrapper evicted = this.eventQueue.poll();
            if (evicted == null) {
                // Nothing left to evict yet the event still does not fit: give up instead of spinning.
                if (log.isDebugEnabled()) {
                    log.debug("Dropping event due to insufficient capacity at Event Publisher Queue, dropped event: " + event);
                }
                break;
            }
            if (log.isDebugEnabled()) {
                log.debug("Dropping event arrived at " + evicted.getTimestampInMillis() + " due to insufficient capacity at Event Publisher Queue, dropped event: " + evicted.getEvent());
            }
        }
        // Trim entries that are not newer than the last recorded sent time.
        long latestEventSentTime = EventPublisherServiceValueHolder.getEventManagementService().getLatestEventSentTime(this.eventPublisherConfiguration.getEventPublisherName(), this.tenantId);
        while (!this.eventQueue.isEmpty() && this.eventQueue.peek().getTimestampInMillis() <= latestEventSentTime) {
            this.eventQueue.remove();
        }
    }

    /**
     * Fills {@link #propertyPositionMap} with one entry per attribute: meta attributes first, then
     * correlation attributes, then payload attributes, numbered consecutively from 0.
     */
    private void createPropertyPositionMap(StreamDefinition streamDefinition) {
        int nextPosition = 0;
        nextPosition = registerAttributePositions(streamDefinition.getMetaData(), EventPublisherConstants.PROPERTY_META_PREFIX, nextPosition);
        nextPosition = registerAttributePositions(streamDefinition.getCorrelationData(), EventPublisherConstants.PROPERTY_CORRELATION_PREFIX, nextPosition);
        registerAttributePositions(streamDefinition.getPayloadData(), "", nextPosition);
    }

    /** Maps {@code prefix + attribute name} to consecutive positions; returns the next free position. */
    private int registerAttributePositions(List<Attribute> attributes, String prefix, int firstPosition) {
        int position = firstPosition;
        if (attributes != null) {
            for (Attribute attribute : attributes) {
                this.propertyPositionMap.put(prefix + attribute.getName(), Integer.valueOf(position));
                position++;
            }
        }
        return position;
    }

    /** @return the id of the stream definition resolved in the constructor */
    public String getStreamId() {
        return this.streamId;
    }

    /** Forwards the event to {@link #sendEvent(Event)}. */
    public void onEvent(Event event) {
        sendEvent(event);
    }

    /** No-op. */
    public void onAddDefinition(StreamDefinition streamDefinition) {
    }

    /** No-op. */
    public void onRemoveDefinition(StreamDefinition streamDefinition) {
    }

    /**
     * Collects every attribute name enclosed between the template prefix and postfix in
     * {@code str} and appends it to {@link #dynamicMessagePropertyList}.
     *
     * @param str a dynamic property value, possibly containing templates
     * @return the accumulated list of template attribute names
     */
    private List<String> getDynamicOutputMessageProperties(String str) {
        String prefix = EventPublisherConstants.TEMPLATE_EVENT_ATTRIBUTE_PREFIX;
        String postfix = EventPublisherConstants.TEMPLATE_EVENT_ATTRIBUTE_POSTFIX;
        String remaining = str;
        while (true) {
            int prefixIndex = remaining.indexOf(prefix);
            if (prefixIndex < 0) {
                break;
            }
            int nameStart = prefixIndex + prefix.length();
            // Look for the postfix only after the prefix, so a stray earlier postfix cannot
            // produce an inverted substring range.
            int postfixIndex = remaining.indexOf(postfix, nameStart);
            if (postfixIndex < 0) {
                break;
            }
            this.dynamicMessagePropertyList.add(remaining.substring(nameStart, postfixIndex));
            remaining = remaining.substring(postfixIndex + postfix.length());
        }
        return this.dynamicMessagePropertyList;
    }

    /**
     * Substitutes every known template attribute in the given dynamic properties with the
     * corresponding value from the event data. Templates naming an attribute that has no position
     * in the stream, or whose position lies outside the data array, are left untouched.
     */
    private void changeDynamicEventAdapterMessageProperties(Object[] objArr, Map<String, String> map) {
        if (objArr == null || objArr.length == 0) {
            return;
        }
        for (String attributeName : this.dynamicMessagePropertyList) {
            if (attributeName == null) {
                continue;
            }
            Integer position = this.propertyPositionMap.get(attributeName);
            if (position == null || position.intValue() < 0 || position.intValue() >= objArr.length) {
                if (log.isDebugEnabled()) {
                    log.debug("Dynamic property attribute '" + attributeName + "' cannot be resolved against the event data of stream " + this.streamId);
                }
                continue;
            }
            changePropertyValue(position.intValue(), attributeName, objArr, map);
        }
    }

    /**
     * Replaces the template for attribute {@code str} in every property value with
     * {@code objArr[i]} (or the empty string when that element is null).
     */
    private void changePropertyValue(int i, String str, Object[] objArr, Map<String, String> map) {
        String placeholder = EventPublisherConstants.TEMPLATE_EVENT_ATTRIBUTE_PREFIX + str + EventPublisherConstants.TEMPLATE_EVENT_ATTRIBUTE_POSTFIX;
        // Quote the attribute name so regex metacharacters in it are matched literally.
        String placeholderRegex = "\\{\\{" + Pattern.quote(str) + "\\}\\}";
        for (Map.Entry<String, String> entry : map.entrySet()) {
            String value = entry.getValue();
            if (value != null && value.contains(placeholder)) {
                String attributeValue = objArr[i] != null ? objArr[i].toString() : "";
                // Quote the replacement: '$' and '\' in event data must not be read as group references.
                entry.setValue(value.replaceAll(placeholderRegex, Matcher.quoteReplacement(attributeValue)));
            }
        }
    }

    /**
     * Unregisters the event sync (in Distributed/HA mode) and destroys the output adapter
     * registered under this publisher's name.
     */
    public void destroy() {
        if (this.mode == Mode.Distributed || this.mode == Mode.HA) {
            EventPublisherServiceValueHolder.getEventManagementService().unregisterEventSync(this.syncId, Manager.ManagerType.Publisher);
        }
        EventPublisherServiceValueHolder.getOutputEventAdapterService().destroy(this.eventPublisherConfiguration.getEventPublisherName());
    }

    /**
     * Maps the event with the configured output mapper and publishes it through the output event
     * adapter service, resolving dynamic property templates against the event data first.
     * Mapping failures reported as {@link EventPublisherConfigurationException} are logged and the
     * event is skipped.
     *
     * @param event the event to publish
     */
    public void process(Event event) {
        // Work on a copy: template substitution mutates the map and must not leak into the configuration.
        Map<String, String> dynamicProperties = new HashMap<String, String>(this.eventPublisherConfiguration.getToAdapterDynamicProperties());
        if (this.traceEnabled) {
            this.trace.info(this.beforeTracerPrefix + event);
        }
        if (this.statisticsEnabled) {
            this.eventCounter.inc();
        }
        try {
            Object outObject;
            if (this.customMappingEnabled) {
                outObject = this.outputMapper.convertToMappedInputEvent(EventPublisherUtil.convertToSiddhiEvent(event));
            } else {
                outObject = this.outputMapper.convertToTypedInputEvent(EventPublisherUtil.convertToSiddhiEvent(event));
            }
            if (this.traceEnabled) {
                this.trace.info(this.afterTracerPrefix + outObject);
            }
            if (this.dynamicMessagePropertyEnabled) {
                changeDynamicEventAdapterMessageProperties(EventPublisherUtil.convertToSiddhiEvent(event).getData(), dynamicProperties);
            }
            EventPublisherServiceValueHolder.getOutputEventAdapterService().publish(this.eventPublisherConfiguration.getEventPublisherName(), dynamicProperties, outObject);
        } catch (EventPublisherConfigurationException e) {
            log.error("Cannot send " + event + " from " + this.eventPublisherConfiguration.getEventPublisherName(), e);
        }
    }

    /** @return the sync stream definition built in Distributed/HA mode, otherwise {@code null} */
    public StreamDefinition getStreamDefinition() {
        return this.streamDefinition;
    }

    /** @return the flag last stored with {@link #setContinueProcess(boolean)} (initially false) */
    public boolean isContinueProcess() {
        return this.isContinue;
    }

    /** @param z new value of the continue-process flag */
    public void setContinueProcess(boolean z) {
        this.isContinue = z;
    }

    /**
     * On an HA worker node, records the current cluster time as this publisher's latest
     * event-sent time; does nothing otherwise.
     */
    public void prepareDestroy() {
        if (EventPublisherServiceValueHolder.getEventManagementService().getManagementModeInfo().getMode() == Mode.HA && EventPublisherServiceValueHolder.getEventManagementService().getManagementModeInfo().getHaConfiguration().isWorkerNode()) {
            EventPublisherServiceValueHolder.getEventManagementService().updateLatestEventSentTime(this.eventPublisherConfiguration.getEventPublisherName(), this.tenantId, EventPublisherServiceValueHolder.getEventManagementService().getClusterTimeInMillis());
        }
    }
}
