package org.wso2.carbon.bam.activity.mediation.data.publisher.publish;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.transport.THttpClient;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.wso2.carbon.bam.activity.mediation.data.publisher.conf.ActivityConfigData;
import org.wso2.carbon.bam.activity.mediation.data.publisher.pool.TFramedTransportPool;
import org.wso2.carbon.bam.activity.mediation.data.publisher.pool.TFramedTransportPoolFactory;
import org.wso2.carbon.bam.activity.mediation.data.publisher.util.ActivityPublisherUtils;
import org.wso2.carbon.bam.activity.mediation.data.publisher.util.TenantActivityConfigData;
import org.wso2.carbon.bam.data.publisher.util.PublisherConfiguration;
import org.wso2.carbon.bam.data.publisher.util.stats.AtomicIntSingleton;
import org.wso2.carbon.bam.service.Event;
import org.wso2.carbon.bam.service.ReceiverService;
import org.wso2.carbon.bam.service.SessionTimeOutException;

/* loaded from: input_file:org/wso2/carbon/bam/activity/mediation/data/publisher/publish/DataPublisher.class */
public class DataPublisher implements ActivityProcessor {
    private static Log log = LogFactory.getLog(DataPublisher.class);

    @Override // org.wso2.carbon.bam.activity.mediation.data.publisher.publish.ActivityProcessor
    public void destroy() {
    }

    @Override // org.wso2.carbon.bam.activity.mediation.data.publisher.publish.ActivityProcessor
    public void process(ArrayList<Event> arrayList, int i) {
        ActivityConfigData activityConfigData = TenantActivityConfigData.getTenantSpecificEventingConfigData().get(Integer.valueOf(i));
        if (activityConfigData.isSocketTransportEnable()) {
            publishUsingTSocketTransport(arrayList, activityConfigData);
        } else {
            publishUsingHttp(arrayList, activityConfigData);
        }
    }

    private void publishUsingTSocketTransport(ArrayList<Event> arrayList, ActivityConfigData activityConfigData) {
        int i = 0;
        TTransport tTransport = null;
        String sessionId = ThriftUtil.getSessionId(activityConfigData);
        PublisherConfiguration publisherConfiguration = ActivityPublisherUtils.getPublisherConfiguration();
        GenericKeyedObjectPool clientPool = TFramedTransportPool.getClientPool(new TFramedTransportPoolFactory(), publisherConfiguration.getMaxPoolSize(), publisherConfiguration.getMaxIdleConnections(), true, publisherConfiguration.getEvictionTimePeriod(), publisherConfiguration.getMinIdleTimeInPool());
        String str = null;
        try {
            try {
                try {
                    try {
                        try {
                            str = new URL(activityConfigData.getUrl()).getHost() + ":" + activityConfigData.getPort();
                            tTransport = (TTransport) clientPool.borrowObject(str);
                            ReceiverService.Client client = new ReceiverService.Client(new TCompactProtocol(tTransport));
                            Iterator<Event> it = arrayList.iterator();
                            while (it.hasNext()) {
                                client.publish(it.next(), sessionId);
                                if (log.isDebugEnabled()) {
                                    AtomicIntSingleton.getAtomicInteger().incrementAndGet();
                                }
                                i++;
                            }
                            try {
                                clientPool.returnObject(str, tTransport);
                            } catch (Exception e) {
                                log.warn("Error occurred while returning object to connection pool");
                            }
                        } catch (Throwable th) {
                            try {
                                clientPool.returnObject(str, tTransport);
                            } catch (Exception e2) {
                                log.warn("Error occurred while returning object to connection pool");
                            }
                            throw th;
                        }
                    } catch (MalformedURLException e3) {
                        log.error("BAM url is not correct", e3);
                        try {
                            clientPool.returnObject(str, tTransport);
                        } catch (Exception e4) {
                            log.warn("Error occurred while returning object to connection pool");
                        }
                    }
                } catch (TException e5) {
                    log.error("Unable to publish event to BAM", e5);
                    try {
                        clientPool.returnObject(str, tTransport);
                    } catch (Exception e6) {
                        log.warn("Error occurred while returning object to connection pool");
                    }
                }
            } catch (TTransportException e7) {
                log.warn("TransportException, retrying to publish again..", e7);
                clientPool.clear(str);
                publishRetryUsingTSocket(arrayList, i, activityConfigData, clientPool);
                try {
                    clientPool.returnObject(str, tTransport);
                } catch (Exception e8) {
                    log.warn("Error occurred while returning object to connection pool");
                }
            }
        } catch (SessionTimeOutException e9) {
            log.warn("Session Timeout, retrying .........");
            publishRetryUsingTSocket(arrayList, i, activityConfigData, clientPool);
            try {
                clientPool.returnObject(str, tTransport);
            } catch (Exception e10) {
                log.warn("Error occurred while returning object to connection pool");
            }
        } catch (Exception e11) {
            log.error("Unable to publish event to BAM", e11);
            try {
                clientPool.returnObject(str, tTransport);
            } catch (Exception e12) {
                log.warn("Error occurred while returning object to connection pool");
            }
        }
    }

    private void publishRetryUsingTSocket(ArrayList<Event> arrayList, int i, ActivityConfigData activityConfigData, GenericKeyedObjectPool genericKeyedObjectPool) {
        ArrayList arrayList2 = new ArrayList();
        TTransport tTransport = null;
        for (int i2 = i; i2 < arrayList.size(); i2++) {
            arrayList2.add(arrayList.get(i2));
        }
        ThriftUtil.setSessionId(null);
        for (int i3 = 0; i3 < 30; i3++) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            String str = null;
            try {
                try {
                    try {
                        String sessionId = ThriftUtil.getSessionId(activityConfigData);
                        str = new URL(activityConfigData.getUrl()).getHost() + ":" + activityConfigData.getPort();
                        tTransport = (TTransport) genericKeyedObjectPool.borrowObject(str);
                        ReceiverService.Client client = new ReceiverService.Client(new TCompactProtocol(tTransport));
                        Iterator<Event> it = arrayList.iterator();
                        while (it.hasNext()) {
                            client.publish(it.next(), sessionId);
                            i++;
                        }
                        try {
                            genericKeyedObjectPool.returnObject(str, tTransport);
                            return;
                        } catch (Exception e2) {
                            log.warn("Error occurred while returning object to connection pool");
                            return;
                        }
                    } catch (TTransportException e3) {
                        try {
                            log.error("Unable to publish event to BAM", e3);
                            try {
                                genericKeyedObjectPool.returnObject(str, tTransport);
                            } catch (Exception e4) {
                                log.warn("Error occurred while returning object to connection pool");
                            }
                        } catch (Throwable th) {
                            try {
                                genericKeyedObjectPool.returnObject(str, tTransport);
                            } catch (Exception e5) {
                                log.warn("Error occurred while returning object to connection pool");
                            }
                            throw th;
                        }
                    }
                } catch (TException e6) {
                    log.error("Unable to publish event to BAM", e6);
                    try {
                        genericKeyedObjectPool.returnObject(str, tTransport);
                    } catch (Exception e7) {
                        log.warn("Error occurred while returning object to connection pool");
                    }
                } catch (Exception e8) {
                    log.error("Unable to publish event to BAM", e8);
                    try {
                        genericKeyedObjectPool.returnObject(str, tTransport);
                    } catch (Exception e9) {
                        log.warn("Error occurred while returning object to connection pool");
                    }
                }
            } catch (MalformedURLException e10) {
                log.error("BAM url is not correct", e10);
                try {
                    genericKeyedObjectPool.returnObject(str, tTransport);
                } catch (Exception e11) {
                    log.warn("Error occurred while returning object to connection pool");
                }
            } catch (SessionTimeOutException e12) {
                log.warn("Session Timeout, retrying .........");
                try {
                    genericKeyedObjectPool.returnObject(str, tTransport);
                } catch (Exception e13) {
                    log.warn("Error occurred while returning object to connection pool");
                }
            }
        }
    }

    private void publishUsingHttp(ArrayList<Event> arrayList, ActivityConfigData activityConfigData) {
        THttpClient tHttpClient = null;
        int i = 0;
        try {
            try {
                try {
                    try {
                        String sessionId = ThriftUtil.getSessionId(activityConfigData);
                        tHttpClient = new THttpClient(activityConfigData.getUrl() + "thriftReceiver");
                        ReceiverService.Client client = new ReceiverService.Client(new TCompactProtocol(tHttpClient));
                        tHttpClient.open();
                        Iterator<Event> it = arrayList.iterator();
                        while (it.hasNext()) {
                            client.publish(it.next(), sessionId);
                            if (log.isDebugEnabled()) {
                                AtomicIntSingleton.getAtomicInteger().incrementAndGet();
                            }
                            i++;
                        }
                        tHttpClient.close();
                    } catch (TException e) {
                        log.error("Unable to publish event to BAM", e);
                        tHttpClient.close();
                    }
                } catch (TTransportException e2) {
                    log.error("Unable to publish event to BAM", e2);
                    tHttpClient.close();
                }
            } catch (SessionTimeOutException e3) {
                log.warn("Session Timeout, retrying .........");
                publishRetryUsingHttp(arrayList, i, activityConfigData);
                tHttpClient.close();
            }
        } catch (Throwable th) {
            tHttpClient.close();
            throw th;
        }
    }

    private void publishRetryUsingHttp(ArrayList<Event> arrayList, int i, ActivityConfigData activityConfigData) {
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = i; i2 < arrayList.size(); i2++) {
            arrayList2.add(arrayList.get(i2));
        }
        ThriftUtil.setSessionId(null);
        for (int i3 = 0; i3 < 30; i3++) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            TTransport tTransport = null;
            TCompactProtocol tCompactProtocol = null;
            String sessionId = ThriftUtil.getSessionId(activityConfigData);
            try {
                tTransport = new THttpClient(activityConfigData.getUrl() + "thriftReceiver");
                tCompactProtocol = new TCompactProtocol(tTransport);
            } catch (TTransportException e2) {
                e2.printStackTrace();
            }
            ReceiverService.Client client = new ReceiverService.Client(tCompactProtocol);
            try {
                try {
                    try {
                        tTransport.open();
                        Iterator<Event> it = arrayList.iterator();
                        while (it.hasNext()) {
                            client.publish(it.next(), sessionId);
                        }
                        tTransport.close();
                        return;
                    } catch (SessionTimeOutException e3) {
                        log.warn("Session Timeout, retrying .........");
                        tTransport.close();
                    }
                } catch (TTransportException e4) {
                    try {
                        log.error("Unable to publish event to BAM", e4);
                        tTransport.close();
                    } catch (Throwable th) {
                        tTransport.close();
                        throw th;
                    }
                }
            } catch (TException e5) {
                log.error("Unable to publish event to BAM", e5);
                tTransport.close();
            }
        }
    }
}
