package org.wso2.carbon.databridge.agent.thrift.lb;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.agent.thrift.Agent;
import org.wso2.carbon.databridge.agent.thrift.AgentHolder;
import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.databridge.agent.thrift.internal.utils.AgentServerURL;
import org.wso2.carbon.databridge.agent.thrift.util.PublishData;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/databridge/agent/thrift/lb/ReceiverGroup.class */
/**
 * A group of receivers that events are published to, either in round-robin
 * order or, in fail-over mode, always preferring the first connected receiver.
 *
 * <p>Events that cannot be handed to any connected publisher are kept in a
 * bounded buffer and resent when a receiver reports a successful connection.
 * A periodic {@link ReconnectionTask} probes connected receivers and asks
 * disconnected ones to reconnect.
 */
public class ReceiverGroup implements ReceiverStateObserver {
    private static final Log log = LogFactory.getLog(ReceiverGroup.class);

    /** Index of the first publisher in {@link #dataPublisherCache}. */
    private static final int START_INDEX = 0;

    private final ArrayList<DataPublisherHolder> dataPublisherCache;
    private final AtomicInteger currentDataPublisherIndex;
    /** Index of the last publisher; {@code -1} when the group is empty. */
    private final int maximumDataPublisherIndex;
    private final ScheduledExecutorService scheduledExecutorService;
    /** Bounded buffer for events that could not be given to any connected publisher. */
    private final LinkedBlockingQueue<PublishData> receiverGroupUnsentEventQueue;
    private final boolean isFailOver;

    /**
     * Periodic task that verifies connected receivers are still reachable and
     * triggers a reconnect on the ones that are marked as disconnected.
     */
    private class ReconnectionTask implements Runnable {
        private ReconnectionTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean anyConnected = false;
            for (DataPublisherHolder holder : ReceiverGroup.this.dataPublisherCache) {
                try {
                    refreshConnection(holder);
                } catch (RuntimeException e) {
                    // An exception escaping a scheduleAtFixedRate task suppresses all
                    // subsequent executions, so it must be contained here.
                    ReceiverGroup.log.error("Error while checking the connection of receiver "
                            + holder.getReceiverUrl(), e);
                }
                if (holder.getConnected().get()) {
                    anyConnected = true;
                }
            }
            if (!anyConnected) {
                ReceiverGroup.log.error("No receiver is reachable at reconnection, can't publish the events");
            }
        }

        /**
         * Marks a connected receiver as disconnected when its host/port no longer
         * accepts connections, or asks a disconnected receiver to reconnect.
         */
        private void refreshConnection(DataPublisherHolder holder) {
            if (!holder.getConnected().get()) {
                holder.getDataPublisher().reconnect();
                return;
            }
            AgentServerURL serverUrl;
            try {
                serverUrl = new AgentServerURL(holder.getReceiverUrl());
            } catch (MalformedURLException e) {
                // The receiver cannot be probed; its connection state is left untouched.
                ReceiverGroup.log.warn("Cannot check receiver availability, malformed receiver URL: "
                        + holder.getReceiverUrl(), e);
                return;
            }
            if (!isServerExists(serverUrl.getHost(), serverUrl.getPort())) {
                holder.setConnected(false);
            }
        }

        /**
         * Checks whether a TCP connection to the given host and port can be opened.
         * The probing socket is always closed before returning.
         *
         * @param str host to connect to
         * @param i   port to connect to
         * @return {@code true} if the connection succeeded, {@code false} on any failure
         */
        private boolean isServerExists(String str, int i) {
            try (Socket ignored = new Socket(str, i)) {
                return true;
            } catch (Exception e) {
                // Unknown host, refused connection, invalid port, etc. all mean "not reachable".
                return false;
            }
        }
    }

    /**
     * Creates a load-balancing (non fail-over) receiver group.
     *
     * @param arrayList holders of the receivers in this group; copied, must not be {@code null}
     */
    public ReceiverGroup(ArrayList<DataPublisherHolder> arrayList) {
        this(arrayList, false);
    }

    /**
     * Creates a receiver group.
     *
     * @param arrayList holders of the receivers in this group; copied, must not be {@code null}
     * @param z         {@code true} for fail-over mode (publisher lookup always starts at the
     *                  first receiver), {@code false} for round-robin
     */
    public ReceiverGroup(ArrayList<DataPublisherHolder> arrayList, boolean z) {
        this.dataPublisherCache = new ArrayList<>(arrayList);
        this.maximumDataPublisherIndex = this.dataPublisherCache.size() - 1;
        this.isFailOver = z;
        this.currentDataPublisherIndex = new AtomicInteger(START_INDEX);
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        this.receiverGroupUnsentEventQueue = new LinkedBlockingQueue<>(AgentHolder.getOrCreateAgent().getAgentConfiguration().getLoadBalancingDataPublisherBufferedEventSize());
    }

    /**
     * Generates the data publisher of every holder, registers this group as their
     * receiver observer and schedules the periodic reconnection task using the
     * agent's reconnection interval (in seconds).
     *
     * @param agent             agent assigned to every holder
     * @param concurrentHashMap passed through to {@code generateDataPublisher} of each holder
     */
    public void createDataPublishers(Agent agent, ConcurrentHashMap<String, String> concurrentHashMap) {
        for (DataPublisherHolder holder : this.dataPublisherCache) {
            holder.setAgent(agent);
            holder.generateDataPublisher(concurrentHashMap);
            holder.getDataPublisher().registerReceiverObserver(this);
        }
        long reconnectionInterval = agent.getAgentConfiguration().getReconnectionInterval();
        this.scheduledExecutorService.scheduleAtFixedRate(new ReconnectionTask(), reconnectionInterval, reconnectionInterval, TimeUnit.SECONDS);
    }

    /**
     * Publishes through a connected publisher, or buffers the data when none is connected.
     * All arguments are forwarded unchanged to the selected publisher.
     *
     * @param str     presumably the stream name - confirm against AsyncDataPublisher
     * @param str2    presumably the stream version - confirm against AsyncDataPublisher
     * @param j       event timestamp
     * @param objArr  first data array forwarded to the publisher
     * @param objArr2 second data array forwarded to the publisher
     * @param objArr3 third data array forwarded to the publisher
     * @param map     map forwarded to the publisher
     * @throws AgentException if the selected publisher fails to publish
     */
    public void publish(String str, String str2, long j, Object[] objArr, Object[] objArr2, Object[] objArr3, Map<String, String> map) throws AgentException {
        AsyncDataPublisher dataPublisher = getDataPublisher();
        if (null != dataPublisher) {
            dataPublisher.publish(str, str2, j, objArr, objArr2, objArr3, map);
        } else {
            bufferUnsentEvent(new PublishData(str, str2, j, objArr, objArr2, objArr3, map));
        }
    }

    /**
     * Same as the timestamped overload, except that buffered data is stamped with the
     * current time when no publisher is connected.
     *
     * @throws AgentException if the selected publisher fails to publish
     */
    public void publish(String str, String str2, Object[] objArr, Object[] objArr2, Object[] objArr3, Map<String, String> map) throws AgentException {
        AsyncDataPublisher dataPublisher = getDataPublisher();
        if (null != dataPublisher) {
            dataPublisher.publish(str, str2, objArr, objArr2, objArr3, map);
        } else {
            bufferUnsentEvent(new PublishData(str, str2, System.currentTimeMillis(), objArr, objArr2, objArr3, map));
        }
    }

    /**
     * Publishes an event through a connected publisher, or buffers it when none is
     * connected. A publisher failure is logged instead of being propagated.
     *
     * @param event event to publish
     */
    public void publish(Event event) {
        AsyncDataPublisher dataPublisher = getDataPublisher();
        if (null == dataPublisher) {
            bufferUnsentEvent(new PublishData(null, null, event));
            return;
        }
        try {
            dataPublisher.publish(event);
        } catch (AgentException e) {
            log.error("No receiver is reachable, can't publish the event.", e);
        }
    }

    /**
     * Publishes an event for the given stream through a connected publisher, or
     * buffers it when none is connected.
     *
     * @param str   stream name
     * @param str2  stream version
     * @param event event to publish
     * @throws AgentException if the selected publisher fails to publish
     */
    public void publish(String str, String str2, Event event) throws AgentException {
        AsyncDataPublisher dataPublisher = getDataPublisher();
        if (null != dataPublisher) {
            dataPublisher.publish(str, str2, event);
        } else {
            // offer() rather than add(): a full buffer drops the event, as in the other
            // overloads, instead of throwing IllegalStateException at the caller.
            bufferUnsentEvent(new PublishData(str, str2, event));
        }
    }

    /** Adds the data to the unsent-event buffer; the data is dropped when the buffer is full. */
    private void bufferUnsentEvent(PublishData publishData) {
        boolean accepted = this.receiverGroupUnsentEventQueue.offer(publishData);
        if (!accepted && log.isDebugEnabled()) {
            log.debug("Unsent event buffer is full, dropping the event.");
        }
    }

    /**
     * Selects a connected publisher, starting at the round-robin position (or at the
     * first receiver in fail-over mode) and wrapping around the group once.
     *
     * @return a connected publisher, or {@code null} if none is connected or the group is empty
     */
    private AsyncDataPublisher getDataPublisher() {
        if (this.dataPublisherCache.isEmpty()) {
            return null;
        }
        int firstIndex = this.isFailOver ? START_INDEX : getDataPublisherIndex();
        int index = firstIndex;
        do {
            DataPublisherHolder holder = this.dataPublisherCache.get(index);
            if (holder.getConnected().get()) {
                return holder.getDataPublisher();
            }
            index++;
            if (index > this.maximumDataPublisherIndex) {
                index = START_INDEX;
            }
        } while (index != firstIndex);
        return null;
    }

    /** Returns the next round-robin index and wraps the counter after the last publisher. */
    private synchronized int getDataPublisherIndex() {
        int index = this.currentDataPublisherIndex.getAndIncrement();
        if (index == this.maximumDataPublisherIndex) {
            this.currentDataPublisherIndex.set(START_INDEX);
        }
        return index;
    }

    /** Forwards the stream definition to the publisher of every receiver in the group. */
    public void addStreamDefinition(String str, String str2, String str3) {
        for (DataPublisherHolder holder : this.dataPublisherCache) {
            holder.getDataPublisher().addStreamDefinition(str, str2, str3);
        }
    }

    /** Forwards the stream definition to the publisher of every receiver in the group. */
    public void addStreamDefinition(StreamDefinition streamDefinition) {
        for (DataPublisherHolder holder : this.dataPublisherCache) {
            holder.getDataPublisher().addStreamDefinition(streamDefinition);
        }
    }

    /**
     * Sets the connection flag of the first holder matching the given receiver URL,
     * user name and password (all compared case-insensitively).
     *
     * @return the matching holder's publisher, or {@code null} if no holder matches
     */
    private AsyncDataPublisher setConnectionStatus(String str, String str2, String str3, boolean z) {
        for (DataPublisherHolder holder : this.dataPublisherCache) {
            // NOTE(review): the password is matched ignoring case; kept as-is to preserve
            // the existing matching behaviour - confirm whether this is intended.
            if (holder.getReceiverUrl().equalsIgnoreCase(str) && holder.getUsername().equalsIgnoreCase(str2) && holder.getPassword().equalsIgnoreCase(str3)) {
                holder.setConnected(z);
                return holder.getDataPublisher();
            }
        }
        return null;
    }

    @Override // org.wso2.carbon.databridge.agent.thrift.lb.ReceiverStateObserver
    public void notifyConnectionFailure(String str, String str2, String str3) {
        setConnectionStatus(str, str2, str3, false);
    }

    /**
     * Drains the given queue and republishes every event through this group.
     *
     * @param linkedBlockingQueue events to resend; {@code null} is ignored
     */
    @Override // org.wso2.carbon.databridge.agent.thrift.lb.ReceiverStateObserver
    public void resendEvents(LinkedBlockingQueue<Event> linkedBlockingQueue) {
        if (null == linkedBlockingQueue) {
            return;
        }
        if (!linkedBlockingQueue.isEmpty()) {
            log.info("Resending the failed events....");
        }
        Event event;
        while ((event = linkedBlockingQueue.poll()) != null) {
            publish(event);
        }
    }

    /**
     * Drains the given queue and republishes every entry through this group. Entries
     * without a stream name are published as plain events.
     *
     * @param linkedBlockingQueue published data to resend; {@code null} is ignored
     */
    @Override // org.wso2.carbon.databridge.agent.thrift.lb.ReceiverStateObserver
    public void resendPublishedData(LinkedBlockingQueue<PublishData> linkedBlockingQueue) {
        if (null == linkedBlockingQueue) {
            return;
        }
        if (!linkedBlockingQueue.isEmpty()) {
            log.info("Resending the failed published data...");
        }
        PublishData data;
        while ((data = linkedBlockingQueue.poll()) != null) {
            try {
                if (data.getStreamName() == null) {
                    publish(data.getEvent());
                } else {
                    publish(data.getStreamName(), data.getStreamVersion(), data.getEvent());
                }
            } catch (AgentException e) {
                log.error("Error while resending the failed published data", e);
            }
        }
    }

    /**
     * Marks the matching receiver as connected and resends the events buffered by
     * this group while no receiver was available.
     */
    @Override // org.wso2.carbon.databridge.agent.thrift.lb.ReceiverStateObserver
    public void notifyConnectionSuccess(String str, String str2, String str3) {
        setConnectionStatus(str, str2, str3, true);
        if (!this.receiverGroupUnsentEventQueue.isEmpty()) {
            // Resend from a snapshot: publish() re-buffers into receiverGroupUnsentEventQueue
            // when no receiver is connected, so polling that queue directly could spin forever.
            LinkedBlockingQueue<PublishData> pending = new LinkedBlockingQueue<>();
            this.receiverGroupUnsentEventQueue.drainTo(pending);
            resendPublishedData(pending);
        }
    }

    /**
     * Stops the periodic reconnection task and every generated data publisher.
     */
    public void stop() {
        // Without this the executor's non-daemon worker thread outlives the group and
        // keeps trying to reconnect publishers that have already been stopped.
        this.scheduledExecutorService.shutdownNow();
        for (DataPublisherHolder holder : this.dataPublisherCache) {
            if (null != holder.getDataPublisher()) {
                holder.getDataPublisher().stop();
            }
        }
    }
}
