package org.wso2.carbon.event.processor.core.internal.storm;

import java.util.HashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.event.processor.common.util.AsyncEventPublisher;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorUtil;
import org.wso2.carbon.event.processor.manager.commons.transport.server.ConnectionCallback;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/SiddhiStormInputEventDispatcher.class */
/**
 * Input event dispatcher that forwards Siddhi events to a Storm receiver through an
 * {@link AsyncEventPublisher} created with {@code DestinationType.STORM_RECEIVER}.
 *
 * <p>Initialisation failures are logged rather than propagated from the constructor, so an
 * instance may exist without a usable publisher. In that state {@link #sendEvent(Event)} fails
 * with an {@link IllegalStateException} and {@link #shutdown()} still releases the executor.
 */
public class SiddhiStormInputEventDispatcher extends AbstractSiddhiInputEventDispatcher {
    private static final Log log = LogFactory.getLog(SiddhiStormInputEventDispatcher.class);
    private final DistributedConfiguration stormDeploymentConfig;
    private StreamDefinition siddhiStreamDefinition;
    private String logPrefix;
    // Created in the constructor and released in shutdown(); no task is submitted to it within this class.
    private final ExecutorService executorService;
    // Remains null when init() fails before the publisher could be constructed.
    private AsyncEventPublisher asyncEventPublisher;
    private final ConnectionCallback connectionCallback;

    /**
     * Creates the dispatcher and immediately attempts to set up the publisher connection.
     *
     * @param streamDefinition           data-bridge stream definition; its stream id is handed to the
     *                                   superclass and it is converted to a Siddhi stream definition
     * @param str                        name passed to the superclass, used as the "Stream" part of the
     *                                   log prefix and as the target name for the Siddhi conversion
     * @param executionPlanConfiguration execution plan whose name is used for logging and for the publisher
     * @param i                          forwarded unchanged to the superclass constructor
     *                                   (NOTE(review): presumably the tenant id - confirm against the superclass)
     * @param distributedConfiguration   deployment configuration supplying the manager endpoints
     * @param connectionCallback         callback handed to the publisher
     */
    public SiddhiStormInputEventDispatcher(org.wso2.carbon.databridge.commons.StreamDefinition streamDefinition, String str, ExecutionPlanConfiguration executionPlanConfiguration, int i, DistributedConfiguration distributedConfiguration, ConnectionCallback connectionCallback) {
        super(streamDefinition.getStreamId(), str, executionPlanConfiguration, i);
        this.executorService = Executors.newSingleThreadExecutor();
        this.stormDeploymentConfig = distributedConfiguration;
        this.connectionCallback = connectionCallback;
        init(streamDefinition, str, executionPlanConfiguration);
    }

    /**
     * Builds the log prefix, converts the stream definition and creates the publisher.
     * Any exception raised while doing so is logged and swallowed, which can leave
     * {@code asyncEventPublisher} unset.
     */
    private void init(org.wso2.carbon.databridge.commons.StreamDefinition streamDefinition, String str, ExecutionPlanConfiguration executionPlanConfiguration) {
        // Assigned before the try block so that it is available to the error log below and to later calls.
        this.logPrefix = "[CEP Receiver|ExecPlan:" + executionPlanConfiguration.getName() + ", Tenant:" + this.tenantId + ", Stream:" + str + "] ";
        try {
            this.siddhiStreamDefinition = EventProcessorUtil.convertToSiddhiStreamDefinition(streamDefinition, str);
            HashSet<StreamDefinition> streamDefinitions = new HashSet<StreamDefinition>();
            streamDefinitions.add(this.siddhiStreamDefinition);
            this.asyncEventPublisher = new AsyncEventPublisher(AsyncEventPublisher.DestinationType.STORM_RECEIVER, streamDefinitions, this.stormDeploymentConfig.getManagers(), executionPlanConfiguration.getName(), this.tenantId, this.stormDeploymentConfig, this.connectionCallback);
            this.asyncEventPublisher.initializeConnection(false);
        } catch (Exception e) {
            log.error(this.logPrefix + "Failed to start event listener", e);
        }
    }

    /**
     * Publishes the event's data and timestamp under the id of the converted Siddhi stream.
     *
     * @param event the event to forward
     * @throws InterruptedException  declared by the overridden method; may be raised by the publisher
     * @throws IllegalStateException if the publisher could not be created during initialisation
     */
    @Override // org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher
    public void sendEvent(Event event) throws InterruptedException {
        AsyncEventPublisher publisher = this.asyncEventPublisher;
        if (publisher == null) {
            // Fail with context instead of an anonymous NullPointerException.
            throw new IllegalStateException(this.logPrefix + "Event publisher is not initialized, cannot send event");
        }
        publisher.sendEvent(event.getData(), event.getTimestamp(), this.siddhiStreamDefinition.getId());
    }

    /**
     * Shuts down the publisher, if one was created, and then the executor.
     * The executor is shut down even when the publisher is missing or its shutdown throws.
     */
    @Override // org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher
    public void shutdown() {
        try {
            if (this.asyncEventPublisher != null) {
                this.asyncEventPublisher.shutdown();
            }
        } finally {
            this.executorService.shutdown();
        }
    }
}
