package org.wso2.carbon.event.receiver.core.internal;

import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Logger;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;
import org.wso2.carbon.event.input.adapter.core.InputEventAdapterSubscription;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterException;
import org.wso2.carbon.event.input.adapter.core.exception.InputEventAdapterRuntimeException;
import org.wso2.carbon.event.processor.manager.core.EventManagementUtil;
import org.wso2.carbon.event.processor.manager.core.EventSync;
import org.wso2.carbon.event.processor.manager.core.Manager;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.carbon.event.processor.manager.core.config.HAConfiguration;
import org.wso2.carbon.event.processor.manager.core.config.Mode;
import org.wso2.carbon.event.receiver.core.InputMapper;
import org.wso2.carbon.event.receiver.core.config.EventReceiverConfiguration;
import org.wso2.carbon.event.receiver.core.config.EventReceiverConstants;
import org.wso2.carbon.event.receiver.core.exception.EventReceiverConfigurationException;
import org.wso2.carbon.event.receiver.core.exception.EventReceiverProcessingException;
import org.wso2.carbon.event.receiver.core.internal.ds.EventReceiverServiceValueHolder;
import org.wso2.carbon.event.receiver.core.internal.management.AbstractInputEventDispatcher;
import org.wso2.carbon.event.receiver.core.internal.management.InputEventDispatcher;
import org.wso2.carbon.event.receiver.core.internal.management.QueueInputEventDispatcher;
import org.wso2.carbon.event.receiver.core.internal.util.EventReceiverUtil;
import org.wso2.carbon.event.receiver.core.internal.util.helper.EventReceiverConfigurationHelper;
import org.wso2.carbon.event.statistics.EventStatisticsMonitor;
import org.wso2.carbon.event.stream.core.EventProducer;
import org.wso2.carbon.event.stream.core.EventProducerCallback;

/* loaded from: input_file:org/wso2/carbon/event/receiver/core/internal/EventReceiver.class */
public class EventReceiver implements EventProducer {
    private static final Log log = LogFactory.getLog(EventReceiver.class);
    private boolean isEventDuplicatedInCluster;
    private boolean traceEnabled;
    private boolean statisticsEnabled;
    private boolean customMappingEnabled;
    private boolean isWorkerNode;
    private boolean sufficientToSend;
    private Logger trace = Logger.getLogger(EventReceiverConstants.EVENT_TRACE_LOGGER);
    private EventReceiverConfiguration eventReceiverConfiguration;
    private StreamDefinition exportedStreamDefinition;
    private InputMapper inputMapper;
    private EventStatisticsMonitor statisticsMonitor;
    private String beforeTracerPrefix;
    private String afterTracerPrefix;
    private AbstractInputEventDispatcher inputEventDispatcher;
    private Mode mode;

    /* loaded from: input_file:org/wso2/carbon/event/receiver/core/internal/EventReceiver$MappedEventSubscription.class */
    private class MappedEventSubscription implements InputEventAdapterSubscription {
        private MappedEventSubscription() {
        }

        public void onEvent(Object obj) {
            EventReceiver.this.processMappedEvent(obj);
        }
    }

    /* loaded from: input_file:org/wso2/carbon/event/receiver/core/internal/EventReceiver$TypedEventSubscription.class */
    private class TypedEventSubscription implements InputEventAdapterSubscription {
        private TypedEventSubscription() {
        }

        public void onEvent(Object obj) {
            EventReceiver.this.processTypedEvent(obj);
        }
    }

    public EventReceiver(EventReceiverConfiguration eventReceiverConfiguration, StreamDefinition streamDefinition, Mode mode) throws EventReceiverConfigurationException {
        this.traceEnabled = false;
        this.statisticsEnabled = false;
        this.customMappingEnabled = false;
        this.isWorkerNode = false;
        this.sufficientToSend = false;
        this.eventReceiverConfiguration = null;
        this.inputMapper = null;
        this.eventReceiverConfiguration = eventReceiverConfiguration;
        int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
        if (this.eventReceiverConfiguration != null) {
            this.traceEnabled = eventReceiverConfiguration.isTraceEnabled();
            this.statisticsEnabled = eventReceiverConfiguration.isStatisticsEnabled();
            this.customMappingEnabled = eventReceiverConfiguration.getInputMapping().isCustomMappingEnabled();
            String mappingType = this.eventReceiverConfiguration.getInputMapping().getMappingType();
            this.inputMapper = EventReceiverServiceValueHolder.getMappingFactoryMap().get(mappingType).constructInputMapper(this.eventReceiverConfiguration, streamDefinition);
            if (this.inputMapper == null) {
                throw new EventReceiverConfigurationException("Could not create input mapper for mapping type " + mappingType + " for event receiver :" + eventReceiverConfiguration.getEventReceiverName());
            }
            if (this.customMappingEnabled) {
                EventReceiverConfigurationHelper.validateExportedStream(eventReceiverConfiguration, streamDefinition, this.inputMapper);
            }
            this.exportedStreamDefinition = streamDefinition;
            if (this.statisticsEnabled) {
                this.statisticsMonitor = EventReceiverServiceValueHolder.getEventStatisticsService().getEventStatisticMonitor(tenantId, EventReceiverConstants.EVENT_RECEIVER, eventReceiverConfiguration.getEventReceiverName(), (String) null);
            }
            if (this.traceEnabled) {
                this.beforeTracerPrefix = "TenantId : " + tenantId + ", " + EventReceiverConstants.EVENT_RECEIVER + " : " + eventReceiverConfiguration.getEventReceiverName() + ", before processing " + System.getProperty("line.separator");
                this.afterTracerPrefix = "TenantId : " + tenantId + ", " + EventReceiverConstants.EVENT_RECEIVER + " : " + eventReceiverConfiguration.getEventReceiverName() + ", " + EventReceiverConstants.EVENT_STREAM + " : " + EventReceiverUtil.getExportedStreamIdFrom(eventReceiverConfiguration) + ", after processing " + System.getProperty("line.separator");
            }
            String name = eventReceiverConfiguration.getFromAdapterConfiguration().getName();
            try {
                EventReceiverServiceValueHolder.getInputEventAdapterService().create(eventReceiverConfiguration.getFromAdapterConfiguration(), this.customMappingEnabled ? new MappedEventSubscription() : new TypedEventSubscription());
                this.isEventDuplicatedInCluster = EventReceiverServiceValueHolder.getInputEventAdapterService().isEventDuplicatedInCluster(eventReceiverConfiguration.getFromAdapterConfiguration().getName());
                DistributedConfiguration distributedConfiguration = EventReceiverServiceValueHolder.getEventManagementService().getManagementModeInfo().getDistributedConfiguration();
                if (distributedConfiguration != null) {
                    this.isWorkerNode = distributedConfiguration.isWorkerNode();
                }
                this.sufficientToSend = mode != Mode.Distributed || (this.isWorkerNode && !this.isEventDuplicatedInCluster);
                this.mode = mode;
                if (mode == Mode.HA) {
                    HAConfiguration haConfiguration = EventReceiverServiceValueHolder.getEventManagementService().getManagementModeInfo().getHaConfiguration();
                    this.inputEventDispatcher = new QueueInputEventDispatcher(tenantId, EventManagementUtil.constructEventSyncId(tenantId, eventReceiverConfiguration.getEventReceiverName(), Manager.ManagerType.Receiver), EventReceiverServiceValueHolder.getCarbonEventReceiverManagementService().getReadLock(), streamDefinition, haConfiguration.getEventSyncReceiverMaxQueueSizeInMb(), haConfiguration.getEventSyncReceiverQueueSize());
                    this.inputEventDispatcher.setSendToOther(!this.isEventDuplicatedInCluster);
                    EventReceiverServiceValueHolder.getEventManagementService().registerEventSync(this.inputEventDispatcher, Manager.ManagerType.Receiver);
                } else {
                    this.inputEventDispatcher = new InputEventDispatcher();
                }
                if (mode == Mode.HA && this.isEventDuplicatedInCluster) {
                    EventReceiverServiceValueHolder.getInputEventAdapterService().start(name);
                }
            } catch (InputEventAdapterException e) {
                throw new EventReceiverConfigurationException("Cannot subscribe to input event adapter :" + name + ", error in configuration. " + e.getMessage(), e);
            } catch (InputEventAdapterRuntimeException e2) {
                throw new EventReceiverProcessingException("Cannot subscribe to input event adapter :" + name + ", error while connecting by adapter. " + e2.getMessage(), e2);
            }
        }
    }

    public StreamDefinition getExportedStreamDefinition() {
        return this.exportedStreamDefinition;
    }

    public EventReceiverConfiguration getEventReceiverConfiguration() {
        return this.eventReceiverConfiguration;
    }

    protected void processMappedEvent(Object obj) {
        if (this.traceEnabled) {
            this.trace.info(this.beforeTracerPrefix + obj.toString());
        }
        if (obj instanceof List) {
            Iterator it = ((List) obj).iterator();
            while (it.hasNext()) {
                try {
                    processMappedEvent(it.next());
                } catch (EventReceiverProcessingException e) {
                    log.error("Dropping event. Error processing event : ", e);
                }
            }
            return;
        }
        try {
            Object convertToMappedInputEvent = this.inputMapper.convertToMappedInputEvent(obj);
            if (convertToMappedInputEvent == null) {
                log.warn("Dropping the empty/null event, Event does not match with mapping");
            } else if (convertToMappedInputEvent instanceof Event[]) {
                for (Event event : (Event[]) convertToMappedInputEvent) {
                    if (event != null) {
                        sendEvent(event);
                    }
                }
            } else {
                sendEvent((Event) convertToMappedInputEvent);
            }
        } catch (EventReceiverProcessingException e2) {
            log.error("Dropping event. Error processing event : ", e2);
        } catch (RuntimeException e3) {
            log.error("Dropping event. Unexpected error while processing event : " + e3.getMessage(), e3);
        }
    }

    protected void processTypedEvent(Object obj) {
        if (this.traceEnabled) {
            this.trace.info(this.beforeTracerPrefix + obj.toString());
        }
        if (obj instanceof List) {
            Iterator it = ((List) obj).iterator();
            while (it.hasNext()) {
                try {
                    processTypedEvent(it.next());
                } catch (EventReceiverProcessingException e) {
                    log.error("Dropping event. Error processing event: " + e.getMessage(), e);
                }
            }
            return;
        }
        try {
            Object convertToTypedInputEvent = this.inputMapper.convertToTypedInputEvent(obj);
            if (convertToTypedInputEvent != null) {
                if (convertToTypedInputEvent instanceof Event[]) {
                    for (Event event : (Event[]) convertToTypedInputEvent) {
                        if (event != null) {
                            sendEvent(event);
                        }
                    }
                } else {
                    sendEvent((Event) convertToTypedInputEvent);
                }
            }
        } catch (EventReceiverProcessingException e2) {
            log.error("Dropping event. Error processing event: " + e2.getMessage(), e2);
        }
    }

    protected void sendEvent(Event event) {
        if (this.traceEnabled) {
            this.trace.info(this.afterTracerPrefix + event);
        }
        if (this.statisticsEnabled) {
            this.statisticsMonitor.incrementRequest();
        }
        if (this.sufficientToSend || EventReceiverServiceValueHolder.getCarbonEventReceiverManagementService().isReceiverCoordinator()) {
            this.inputEventDispatcher.onEvent(event);
        }
    }

    public AbstractInputEventDispatcher getInputEventDispatcher() {
        return this.inputEventDispatcher;
    }

    public String getStreamId() {
        return this.exportedStreamDefinition.getStreamId();
    }

    public void setCallBack(EventProducerCallback eventProducerCallback) {
        this.inputEventDispatcher.setCallBack(eventProducerCallback);
    }

    public void destroy() {
        EventReceiverServiceValueHolder.getInputEventAdapterService().destroy(this.eventReceiverConfiguration.getFromAdapterConfiguration().getName());
        if (this.mode == Mode.HA && (this.inputEventDispatcher instanceof EventSync)) {
            EventReceiverServiceValueHolder.getEventManagementService().unregisterEventSync(this.inputEventDispatcher.getStreamDefinition().getStreamId(), Manager.ManagerType.Receiver);
        }
    }

    public boolean isEventDuplicatedInCluster() {
        return this.isEventDuplicatedInCluster;
    }
}
