package org.wso2.carbon.event.processor.common.util;

import com.lmax.disruptor.EventHandler;
import java.io.IOException;
import java.net.SocketException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.log4j.Logger;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.event.processor.common.util.AsynchronousEventBuffer;
import org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher;
import org.wso2.carbon.event.processor.manager.commons.utils.HostAndPort;
import org.wso2.carbon.event.processor.manager.commons.utils.Utils;
import org.wso2.carbon.event.processor.manager.core.config.DistributedConfiguration;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/common/util/AsyncEventPublisher.class */
/**
 * Sends events over TCP to an endpoint (a Storm receiver or a CEP publisher) whose address is
 * obtained from one of the configured Storm manager services.
 *
 * <p>Events passed to {@link #sendEvent(Object[], String)} are placed in an
 * {@link AsynchronousEventBuffer} that was given this object as its handler; the buffer hands them
 * back through {@link #onEvent(AsynchronousEventBuffer.DataHolder, long, boolean)}, which performs
 * the actual send and reconnects when sending fails.
 *
 * <p>The {@code shutdown} flag and the current publisher are {@code volatile} so that the connection
 * thread, the event handling thread and the thread calling {@link #shutdown()} see each other's
 * updates without holding a lock while sleeping or doing network I/O.
 */
public class AsyncEventPublisher implements EventHandler<AsynchronousEventBuffer.DataHolder> {
    private static final Logger log = Logger.getLogger(AsyncEventPublisher.class);

    /** Size passed to the event send buffer. */
    private static final int EVENT_BUFFER_SIZE = 1024;

    /** Reconnect attempts made against the known endpoint before a full re-initialization. */
    private static final int RECONNECT_RETRY_LIMIT = 3;

    private final String logPrefix;
    private final String destinationTypeString;
    private final String publisherTypeString;
    private final DestinationType destinationType;
    private final Set<StreamDefinition> streams;
    private final String executionPlanName;
    private final int tenantId;
    private String thisHostIp;
    private final List<HostAndPort> managerServiceEndpoints;
    private final DistributedConfiguration stormDeploymentConfig;
    private volatile TCPEventPublisher tcpEventPublisher = null;
    private final AsynchronousEventBuffer eventSendBuffer = new AsynchronousEventBuffer(EVENT_BUFFER_SIZE, this);
    private volatile boolean shutdown = false;
    private final EndpointConnectionCreator endpointConnectionCreator = new EndpointConnectionCreator();

    /** Kind of endpoint this publisher asks the Storm manager service for. */
    public enum DestinationType {
        STORM_RECEIVER,
        CEP_PUBLISHER
    }

    /**
     * Looks up the destination endpoint through the Storm manager services and opens the TCP
     * connection to it. Can be run on its own thread (see {@link #run()}).
     */
    public class EndpointConnectionCreator implements Runnable {
        EndpointConnectionCreator() {
        }

        /**
         * Asks the configured Storm manager services for the endpoint to publish to, retrying after
         * the management reconnect interval until one of them answers.
         *
         * @return the endpoint as returned by the manager service, or {@code null} when the
         *         publisher has been shut down or the calling thread was interrupted while waiting
         */
        public String getEndpointFromManagerService() {
            while (true) {
                String endpoint = requestEndpointFromManagers();
                if (shutdown) {
                    log.info(logPrefix + "Stopping attempting to connect to Storm manager service.Async event publisher is shutdown");
                    return null;
                }
                if (endpoint != null) {
                    return endpoint;
                }
                long retryInterval = stormDeploymentConfig.getManagementReconnectInterval();
                log.info(logPrefix + "Failed to retrieve " + destinationType.name() + " from given set of Storm Managers. Retrying to retrieve endpoint from manager service in " + retryInterval + "ms to get a " + destinationTypeString);
                // The sleep is deliberately done without holding any lock so shutdown() is never blocked.
                if (!sleepUninterrupted(retryInterval)) {
                    log.warn(logPrefix + "Interrupted while waiting to retry the Storm manager service. Giving up.");
                    return null;
                }
            }
        }

        /**
         * Makes one pass over the manager service endpoints and returns the first endpoint any of
         * them hands out. A manager that fails is logged and the next one is tried.
         *
         * @return the endpoint, or {@code null} if no manager returned one in this pass
         */
        private String requestEndpointFromManagers() {
            for (HostAndPort manager : managerServiceEndpoints) {
                TTransport transport = null;
                try {
                    transport = new TSocket(manager.getHostName(), manager.getPort());
                    TBinaryProtocol protocol = new TBinaryProtocol(transport);
                    transport.open();
                    StormManagerService.Client client = new StormManagerService.Client(protocol);
                    String endpoint = destinationType == DestinationType.CEP_PUBLISHER
                            ? client.getCEPPublisher(tenantId, executionPlanName, thisHostIp)
                            : client.getStormReceiver(tenantId, executionPlanName, thisHostIp);
                    log.info(logPrefix + "Retrieved " + destinationTypeString + " at " + endpoint + " from storm manager service at" + manager.getHostName() + ":" + manager.getPort());
                    if (endpoint != null) {
                        // First successful answer wins; do not request further endpoints from other managers.
                        return endpoint;
                    }
                } catch (Exception e) {
                    log.error(logPrefix + "Error while trying retrieve " + destinationType.name() + "endpoint information from storm manager service at " + manager.getHostName() + ":" + manager.getPort() + "Trying next Storm manager.", e);
                } finally {
                    if (transport != null) {
                        transport.close();
                    }
                }
            }
            return null;
        }

        /**
         * Opens a TCP event publisher to the given endpoint, retrying after the transport reconnect
         * interval. After every failed attempt the endpoint is looked up again from the manager
         * service before the next attempt.
         *
         * @param str the endpoint to connect to first
         * @param i   maximum number of retries; a value of 0 or less retries without limit
         * @return the connected publisher, or {@code null} when the retry limit was exceeded, the
         *         publisher has been shut down, the thread was interrupted, or no endpoint could be
         *         obtained for a retry
         */
        public TCPEventPublisher connectToEndpoint(String str, int i) {
            String endpoint = str;
            int retries = 0;
            while (true) {
                TCPEventPublisher publisher = openPublisher(endpoint);
                if (shutdown) {
                    log.info(logPrefix + "Stopping attempting to connect to endpoint " + endpoint + ". Async event publisher is shutdown");
                    if (publisher != null) {
                        // Nobody will ever use this connection; release it instead of leaking it.
                        publisher.terminate();
                    }
                    return null;
                }
                if (publisher != null) {
                    return publisher;
                }
                retries++;
                if (i > 0 && retries > i) {
                    return null;
                }
                long retryInterval = stormDeploymentConfig.getTransportReconnectInterval();
                log.info(logPrefix + "Retrying(" + retries + ") to connect to " + destinationTypeString + " at " + endpoint + " in " + retryInterval + "ms");
                if (!sleepUninterrupted(retryInterval)) {
                    log.warn(logPrefix + "Interrupted while waiting to reconnect to " + destinationTypeString + ". Giving up.");
                    return null;
                }
                endpoint = getEndpointFromManagerService();
                if (endpoint == null) {
                    // Lookup was aborted (shutdown or interrupt); there is nothing to connect to.
                    return null;
                }
            }
        }

        /**
         * Makes a single connection attempt and registers all stream definitions on the new
         * publisher.
         *
         * @return the ready publisher, or {@code null} if an {@link IOException} occurred
         */
        private TCPEventPublisher openPublisher(String endpoint) {
            TCPEventPublisher publisher = null;
            try {
                publisher = new TCPEventPublisher(endpoint, true);
                StringBuilder streamIds = new StringBuilder();
                for (StreamDefinition stream : streams) {
                    publisher.addStreamDefinition(stream);
                    streamIds.append(stream.getId()).append(",");
                }
                log.info(logPrefix + "Connected to " + destinationTypeString + " at " + endpoint + " for the Stream(s) " + streamIds.toString());
                return publisher;
            } catch (IOException e) {
                log.error(logPrefix + "Error while trying to connect to " + destinationTypeString + " at " + endpoint, e);
                if (publisher != null) {
                    publisher.terminate();
                }
                return null;
            }
        }

        /**
         * Sleeps for the given time.
         *
         * @return {@code false} if the thread was interrupted; the interrupt flag is restored so
         *         callers further up can see it
         */
        private boolean sleepUninterrupted(long millis) {
            try {
                Thread.sleep(millis);
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Looks up the endpoint and connects to it without a retry limit, storing the resulting
         * publisher (possibly {@code null}) on the enclosing {@link AsyncEventPublisher}.
         */
        public void establishConnection() {
            log.info(logPrefix + "Requesting a " + destinationTypeString + " for " + thisHostIp);
            String endpoint = getEndpointFromManagerService();
            if (endpoint != null) {
                tcpEventPublisher = connectToEndpoint(endpoint, 0);
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            establishConnection();
        }
    }

    /**
     * @param destinationType     kind of endpoint to request from the manager service
     * @param set                 stream definitions registered on every new TCP publisher
     * @param list                Storm manager service endpoints to query
     * @param str                 execution plan name, sent to the manager service and used in logs
     * @param i                   tenant id, sent to the manager service and used in logs
     * @param distributedConfiguration source of the reconnect intervals
     */
    public AsyncEventPublisher(DestinationType destinationType, Set<StreamDefinition> set, List<HostAndPort> list, String str, int i, DistributedConfiguration distributedConfiguration) {
        this.destinationType = destinationType;
        this.streams = set;
        this.executionPlanName = str;
        this.tenantId = i;
        this.managerServiceEndpoints = list;
        this.stormDeploymentConfig = distributedConfiguration;
        boolean toStormReceiver = destinationType == DestinationType.STORM_RECEIVER;
        this.destinationTypeString = toStormReceiver ? "Storm Receiver" : "CEP Publisher";
        this.publisherTypeString = toStormReceiver ? "CEP Receiver" : "Publisher Bolt";
        this.logPrefix = "[" + i + ":" + str + ":" + this.publisherTypeString + "]";
    }

    /**
     * Resolves this host's address and establishes the connection to the destination endpoint.
     *
     * @param z {@code true} to connect on the calling thread (blocks until connected or aborted),
     *          {@code false} to connect on a newly started thread
     */
    public void initializeConnection(boolean z) {
        try {
            this.thisHostIp = Utils.findAddress("localhost");
            if (z) {
                this.endpointConnectionCreator.establishConnection();
            } else {
                new Thread(this.endpointConnectionCreator).start();
            }
        } catch (SocketException e) {
            log.error(this.logPrefix + "Error while trying to obtain this host IP address", e);
        }
    }

    /**
     * Queues an event for sending.
     *
     * @param objArr the event data
     * @param str    the id of the stream the event belongs to
     */
    public void sendEvent(Object[] objArr, String str) {
        this.eventSendBuffer.addEvent(objArr, str);
    }

    /**
     * Sends one buffered event. Waits until a publisher is available, and on an {@link IOException}
     * reconnects and tries the same event again. The retry is a loop rather than recursion so a
     * long outage cannot overflow the stack. The event is dropped if the publisher is shut down or
     * the thread is interrupted while waiting.
     *
     * @param dataHolder the buffered event
     * @param j          sequence number supplied by the buffer (unused)
     * @param z          end-of-batch flag, forwarded to the TCP publisher
     */
    public void onEvent(AsynchronousEventBuffer.DataHolder dataHolder, long j, boolean z) {
        while (true) {
            TCPEventPublisher publisher = awaitPublisher();
            if (publisher == null) {
                return;
            }
            try {
                publisher.sendEvent(dataHolder.getStreamId(), (Object[]) dataHolder.getData(), z);
                return;
            } catch (IOException e) {
                log.error(this.logPrefix + "Error while trying to send event to " + this.destinationTypeString + " at " + publisher.getHostUrl(), e);
                reconnect();
            }
        }
    }

    /**
     * Blocks until a TCP publisher is available.
     *
     * @return the publisher, or {@code null} if waiting was aborted by shutdown or an interrupt
     */
    private TCPEventPublisher awaitPublisher() {
        TCPEventPublisher publisher;
        while ((publisher = this.tcpEventPublisher) == null) {
            long waitInterval = this.stormDeploymentConfig.getTransportReconnectInterval();
            log.info(this.logPrefix + "Can't send event.TCP event publisher not initialized. Waiting " + waitInterval + "ms");
            if (this.shutdown) {
                log.info(this.logPrefix + "Aborting retry to send events. AsyncEventPublisher shut down.");
                return null;
            }
            try {
                Thread.sleep(waitInterval);
            } catch (InterruptedException e) {
                // Restore the flag and stop waiting; continuing would make every later sleep throw immediately.
                Thread.currentThread().interrupt();
                log.warn(this.logPrefix + "Interrupted while waiting for the TCP event publisher. Dropping event.");
                return null;
            }
        }
        return publisher;
    }

    /**
     * Drops the current publisher and tries to reconnect, first to the same endpoint (with a
     * refreshed address after each failure) and, if that fails, through a full re-initialization.
     */
    private void reconnect() {
        String hostUrl = this.tcpEventPublisher.getHostUrl();
        resetTCPEventPublisher();
        this.tcpEventPublisher = this.endpointConnectionCreator.connectToEndpoint(hostUrl, RECONNECT_RETRY_LIMIT);
        if (this.tcpEventPublisher == null) {
            if (this.shutdown) {
                // connectToEndpoint already logged the shutdown; re-initializing would only hit the managers again.
                return;
            }
            log.error(this.logPrefix + "Failed to connect to existing " + this.destinationTypeString + " at " + hostUrl + ". Reinitializing");
            initializeConnection(true);
        }
    }

    /** Terminates the current publisher and clears the reference. */
    private void resetTCPEventPublisher() {
        this.tcpEventPublisher.terminate();
        this.tcpEventPublisher = null;
    }

    /** Last-resort cleanup of the TCP publisher when this object is garbage collected. */
    @Override
    protected void finalize() {
        TCPEventPublisher publisher = this.tcpEventPublisher;
        if (publisher != null) {
            publisher.terminate();
        }
    }

    /**
     * Marks this publisher as shut down, which makes all retry loops stop at their next check, and
     * terminates the event send buffer.
     */
    public void shutdown() {
        this.shutdown = true;
        this.eventSendBuffer.terminate();
    }
}
