package org.wso2.carbon.event.processor.core.internal.storm;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.wso2.carbon.event.processor.common.storm.config.StormDeploymentConfig;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.event.processor.common.transport.client.TCPEventPublisher;
import org.wso2.carbon.event.processor.common.util.Utils;
import org.wso2.carbon.event.processor.core.ExecutionPlanConfiguration;
import org.wso2.carbon.event.processor.core.StreamConfiguration;
import org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher;
import org.wso2.carbon.event.processor.core.internal.util.EventProcessorUtil;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/SiddhiStormInputEventDispatcher.class */
public class SiddhiStormInputEventDispatcher extends AbstractSiddhiInputEventDispatcher {
    private static final Log log = LogFactory.getLog(SiddhiStormInputEventDispatcher.class);
    private final StormDeploymentConfig stormDeploymentConfig;
    private final ExecutionPlanConfiguration executionPlanConfiguration;
    private TCPEventPublisher tcpEventPublisher;
    private StreamDefinition siddhiStreamDefinition;
    private String logPrefix;
    private String thisHostIp;
    private ExecutorService executorService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/SiddhiStormInputEventDispatcher$Registrar.class */
    public class Registrar implements Runnable {
        Registrar() {
        }

        @Override // java.lang.Runnable
        public void run() {
            SiddhiStormInputEventDispatcher.log.info(SiddhiStormInputEventDispatcher.this.logPrefix + "Getting Storm receiver for " + SiddhiStormInputEventDispatcher.this.thisHostIp);
            while (!configureStormReceiverFromStormMangerService()) {
                SiddhiStormInputEventDispatcher.log.info(SiddhiStormInputEventDispatcher.this.logPrefix + "Retry getting Storm receiver in 30 sec");
                try {
                    Thread.sleep(30000L);
                } catch (InterruptedException e) {
                }
            }
        }

        private boolean configureStormReceiverFromStormMangerService() {
            TTransport tTransport = null;
            try {
                try {
                    tTransport = new TSocket(((StormDeploymentConfig.HostAndPort) SiddhiStormInputEventDispatcher.this.stormDeploymentConfig.getManagers().get(0)).getHostName(), ((StormDeploymentConfig.HostAndPort) SiddhiStormInputEventDispatcher.this.stormDeploymentConfig.getManagers().get(0)).getPort());
                    TBinaryProtocol tBinaryProtocol = new TBinaryProtocol(tTransport);
                    tTransport.open();
                    String stormReceiver = new StormManagerService.Client(tBinaryProtocol).getStormReceiver(SiddhiStormInputEventDispatcher.this.tenantId, SiddhiStormInputEventDispatcher.this.executionPlanConfiguration.getName(), SiddhiStormInputEventDispatcher.this.thisHostIp);
                    SiddhiStormInputEventDispatcher.log.info(SiddhiStormInputEventDispatcher.this.logPrefix + "Successfully got Storm receiver with " + stormReceiver);
                    SiddhiStormInputEventDispatcher.this.tcpEventPublisher = new TCPEventPublisher(stormReceiver);
                    SiddhiStormInputEventDispatcher.this.tcpEventPublisher.addStreamDefinition(SiddhiStormInputEventDispatcher.this.siddhiStreamDefinition);
                    SiddhiStormInputEventDispatcher.log.info(SiddhiStormInputEventDispatcher.this.logPrefix + "connected to Storm event receiver at " + stormReceiver + " for the Stream '" + SiddhiStormInputEventDispatcher.this.siddhiStreamDefinition.getStreamId());
                    if (tTransport != null) {
                        tTransport.close();
                    }
                    return true;
                } catch (Exception e) {
                    SiddhiStormInputEventDispatcher.log.error(SiddhiStormInputEventDispatcher.this.logPrefix + "Error in getting Storm receiver :" + e.getMessage());
                    if (tTransport != null) {
                        tTransport.close();
                    }
                    return false;
                }
            } catch (Throwable th) {
                if (tTransport != null) {
                    tTransport.close();
                }
                throw th;
            }
        }
    }

    public SiddhiStormInputEventDispatcher(org.wso2.carbon.databridge.commons.StreamDefinition streamDefinition, String str, String str2, ExecutionPlanConfiguration executionPlanConfiguration, int i, StormDeploymentConfig stormDeploymentConfig) {
        super(streamDefinition.getStreamId(), str, executionPlanConfiguration, i);
        this.tcpEventPublisher = null;
        this.executorService = Executors.newSingleThreadExecutor();
        this.executionPlanConfiguration = executionPlanConfiguration;
        this.stormDeploymentConfig = stormDeploymentConfig;
        init(streamDefinition, str2, executionPlanConfiguration);
    }

    private void init(org.wso2.carbon.databridge.commons.StreamDefinition streamDefinition, String str, ExecutionPlanConfiguration executionPlanConfiguration) {
        this.logPrefix = "[CEP Receiver|ExecPlan:" + executionPlanConfiguration.getName() + ", Tenant:" + this.tenantId + ", Stream:" + str + "]";
        try {
            this.thisHostIp = Utils.findAddress("localhost");
            this.siddhiStreamDefinition = EventProcessorUtil.convertToSiddhiStreamDefinition(streamDefinition, new StreamConfiguration(str, streamDefinition.getVersion()));
            new Thread(new Registrar()).start();
        } catch (Exception e) {
            log.error(this.logPrefix + "Failed to start event listener", e);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher
    public void sendEvent(Event event) throws InterruptedException {
        sendEvent(event.getData());
    }

    @Override // org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher
    public void sendEvent(Object[] objArr) throws InterruptedException {
        try {
            if (this.tcpEventPublisher != null) {
                this.tcpEventPublisher.sendEvent(this.siddhiStreamDefinition.getStreamId(), objArr);
            } else {
                log.warn(this.logPrefix + "Dropping the event since the data publisher is not yet initialized");
            }
        } catch (IOException e) {
            log.error(this.logPrefix + "Error while publishing data", e);
        }
    }

    @Override // org.wso2.carbon.event.processor.core.internal.listener.AbstractSiddhiInputEventDispatcher
    public void shutdown() {
        this.executorService.shutdown();
        this.tcpEventPublisher.shutdown();
    }
}
