package org.wso2.carbon.databridge.agent.internal.endpoint;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.databridge.agent.DataEndpointAgent;
import org.wso2.carbon.databridge.agent.exception.DataEndpointException;
import org.wso2.carbon.databridge.agent.exception.EventQueueFullException;
import org.wso2.carbon.databridge.agent.util.DataEndpointConstants;
import org.wso2.carbon.databridge.agent.util.DataPublisherUtil;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.UndefinedEventTypeException;

/* loaded from: input_file:org/wso2/carbon/databridge/agent/internal/endpoint/DataEndpointGroup.class */
public class DataEndpointGroup implements DataEndpointFailureCallback {
    /** Logger shared by this group and its nested worker / reconnection classes. */
    private static final Log log = LogFactory.getLog(DataEndpointGroup.class);
    /** High-availability mode; read when choosing an endpoint and when rendering {@code toString()}. */
    private HAType haType;
    /** Ring-buffer backed queue whose consumer is an {@code EventQueueWorker}. */
    private EventQueue eventQueue;
    /** Initial delay and period, in seconds, of the scheduled {@code ReconnectionTask}. */
    private int reconnectionInterval;
    /** Cursor read and advanced by {@code getDataPublisherIndex()}. */
    private AtomicInteger currentDataPublisherIndex = new AtomicInteger();
    /** Count of registered endpoints; incremented once per {@code addDataEndpoint} call. */
    private AtomicInteger maximumDataPublisherIndex = new AtomicInteger();
    /** Scheduler (core pool size 1) that runs the periodic {@code ReconnectionTask}. */
    private ScheduledExecutorService reconnectionService = Executors.newScheduledThreadPool(1);
    /** Index of the first endpoint; used as the wrap-around target when scanning {@code dataEndpoints}. */
    private final Integer START_INDEX = 0;
    /** Endpoints in registration order; indexed by the publisher cursor. */
    private ArrayList<DataEndpoint> dataEndpoints = new ArrayList<>();
    /** Endpoints reported through {@code addFailedDataEndpoint}; drained by {@code processFailedEventsIfExists}. */
    private ArrayList<DataEndpoint> failedEventsDataEndpoints = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/databridge/agent/internal/endpoint/DataEndpointGroup$EventQueue.class */
    public class EventQueue {
        private RingBuffer<Event> ringBuffer;
        private Disruptor<Event> eventQueue;
        public final EventFactory<Event> EVENT_FACTORY = new EventFactory<Event>() { // from class: org.wso2.carbon.databridge.agent.internal.endpoint.DataEndpointGroup.EventQueue.1
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public Event m7newInstance() {
                return new Event();
            }
        };

        public EventQueue(int i) {
            this.eventQueue = new Disruptor<>(this.EVENT_FACTORY, i, Executors.newCachedThreadPool());
            this.eventQueue.handleEventsWith(new EventHandler[]{new EventQueueWorker()});
            this.ringBuffer = this.eventQueue.start();
        }

        public void tryPut(Event event) throws EventQueueFullException {
            try {
                long tryNext = this.ringBuffer.tryNext(1);
                updateEvent((Event) this.ringBuffer.get(tryNext), event);
                this.ringBuffer.publish(tryNext);
            } catch (InsufficientCapacityException e) {
                throw new EventQueueFullException("Cannot send events because the event queue is full", e);
            }
        }

        public void tryPut(Event event, long j) throws EventQueueFullException {
            long currentTimeMillis = System.currentTimeMillis() + j;
            do {
                try {
                    long tryNext = this.ringBuffer.tryNext(1);
                    updateEvent((Event) this.ringBuffer.get(tryNext), event);
                    this.ringBuffer.publish(tryNext);
                    return;
                } catch (InsufficientCapacityException e) {
                }
            } while (currentTimeMillis <= System.currentTimeMillis());
            throw new EventQueueFullException("Cannot send events because the event queue is full", e);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void put(Event event) {
            long next = this.ringBuffer.next();
            updateEvent((Event) this.ringBuffer.get(next), event);
            this.ringBuffer.publish(next);
        }

        private void updateEvent(Event event, Event event2) {
            event.setArbitraryDataMap(event2.getArbitraryDataMap());
            event.setCorrelationData(event2.getCorrelationData());
            event.setMetaData(event2.getMetaData());
            event.setPayloadData(event2.getPayloadData());
            event.setStreamId(event2.getStreamId());
            event.setTimeStamp(event2.getTimeStamp());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void shutdown() {
            this.eventQueue.shutdown();
        }
    }

    /* loaded from: input_file:org/wso2/carbon/databridge/agent/internal/endpoint/DataEndpointGroup$EventQueueWorker.class */
    class EventQueueWorker implements EventHandler<Event> {
        EventQueueWorker() {
        }

        public void onEvent(Event event, long j, boolean z) throws Exception {
            DataEndpoint dataEndpoint = DataEndpointGroup.this.getDataEndpoint();
            DataEndpointGroup.this.processFailedEventsIfExists();
            dataEndpoint.collectAndSend(event);
            if (z) {
                DataEndpointGroup.this.flushAllDataEndpoints();
            }
        }
    }

    /* loaded from: input_file:org/wso2/carbon/databridge/agent/internal/endpoint/DataEndpointGroup$HAType.class */
    /**
     * High-availability modes a {@link DataEndpointGroup} can be created with.
     *
     * <p>The mode is consulted when {@code getDataEndpoint()} decides where to start scanning
     * for an active endpoint, and when {@code toString()} chooses the URL group separator.
     */
    public enum HAType {
        FAILOVER,
        LOADBALANCE
    }

    /* loaded from: input_file:org/wso2/carbon/databridge/agent/internal/endpoint/DataEndpointGroup$ReconnectionTask.class */
    /**
     * Periodic health check over every registered endpoint.
     *
     * <p>Connected endpoints are probed with a plain TCP connect and deactivated if the receiver
     * is not reachable; disconnected endpoints get a reconnect attempt.
     */
    private class ReconnectionTask implements Runnable {
        private ReconnectionTask() {
        }

        /** Runs one check pass and logs when no endpoint is connected afterwards. */
        @Override // java.lang.Runnable
        public void run() {
            boolean anyConnected = false;
            for (int index = START_INDEX.intValue(); index < maximumDataPublisherIndex.get(); index++) {
                DataEndpoint dataEndpoint = dataEndpoints.get(index);
                if (dataEndpoint.isConnected()) {
                    // Element [1] is used as the host and element [2] as the port of the receiver URL.
                    String[] protocolHostPort = DataPublisherUtil.getProtocolHostPort(
                            dataEndpoint.getDataEndpointConfiguration().getReceiverURL());
                    if (!isServerExists(protocolHostPort[1], Integer.parseInt(protocolHostPort[2]))) {
                        dataEndpoint.deactivate();
                    }
                } else {
                    try {
                        dataEndpoint.connect();
                    } catch (Exception e) {
                        // A failed reconnect leaves the endpoint deactivated until the next pass.
                        dataEndpoint.deactivate();
                    }
                }
                if (dataEndpoint.isConnected()) {
                    anyConnected = true;
                }
            }
            if (!anyConnected) {
                log.info("No receiver is reachable at reconnection, will try to reconnect every " + reconnectionInterval + " sec");
            }
        }

        /**
         * Checks whether a TCP connection to the given address can be opened.
         *
         * @param str host name or address to connect to
         * @param i   TCP port to connect to
         * @return {@code true} if the connection was established, {@code false} on any failure
         */
        private boolean isServerExists(String str, int i) {
            // try-with-resources closes the probe socket; previously it was left open on every check.
            try (Socket probe = new Socket(str, i)) {
                return probe.isConnected();
            } catch (Exception e) {
                return false;
            }
        }
    }

    public DataEndpointGroup(HAType hAType, DataEndpointAgent dataEndpointAgent) {
        this.haType = hAType;
        this.reconnectionInterval = dataEndpointAgent.getDataEndpointAgentConfiguration().getReconnectionInterval();
        this.eventQueue = new EventQueue(dataEndpointAgent.getDataEndpointAgentConfiguration().getQueueSize());
        this.reconnectionService.scheduleAtFixedRate(new ReconnectionTask(), this.reconnectionInterval, this.reconnectionInterval, TimeUnit.SECONDS);
    }

    /**
     * Registers an endpoint with this group and installs the group as its failure callback.
     *
     * @param dataEndpoint endpoint to append to the group
     */
    public void addDataEndpoint(DataEndpoint dataEndpoint) {
        dataEndpoints.add(dataEndpoint);
        dataEndpoint.registerDataEndpointFailureCallback(this);
        // The counter is bumped last, after the endpoint is already in the list.
        maximumDataPublisherIndex.incrementAndGet();
    }

    /**
     * Offers an event to the internal queue by delegating to {@code EventQueue.tryPut(Event)}.
     *
     * @param event event to enqueue
     * @throws EventQueueFullException if the queue reports insufficient capacity
     */
    public void tryPublish(Event event) throws EventQueueFullException {
        this.eventQueue.tryPut(event);
    }

    /**
     * Offers an event to the internal queue by delegating to {@code EventQueue.tryPut(Event, long)}.
     *
     * @param event event to enqueue
     * @param j     offset in milliseconds that the queue adds to the current time to form its deadline
     * @throws EventQueueFullException if the queue could not accept the event
     */
    public void tryPublish(Event event, long j) throws EventQueueFullException {
        this.eventQueue.tryPut(event, j);
    }

    /**
     * Enqueues an event by delegating to {@code EventQueue.put(Event)}, which claims its slot with
     * {@code RingBuffer.next()} and declares no {@link EventQueueFullException}.
     * NOTE(review): presumably this blocks the caller while the ring buffer is full - confirm
     * against the Disruptor version in use.
     *
     * @param event event to enqueue
     */
    public void publish(Event event) {
        this.eventQueue.put(event);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /** Flushes the events of every endpoint that currently reports itself as active. */
    public void flushAllDataEndpoints() {
        for (DataEndpoint endpoint : dataEndpoints) {
            if (!endpoint.isActive()) {
                continue;
            }
            endpoint.flushEvents();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Drains the list of endpoints that reported failed events.
     *
     * <p>An endpoint that is active again is simply flushed. For an endpoint that is still
     * inactive, its failed events are taken over and re-sent one by one through whichever
     * endpoint {@link #getDataEndpoint()} selects. In both cases the endpoint's failed-event
     * state is reset afterwards, and finally the list itself is cleared.
     *
     * @throws UndefinedEventTypeException propagated from re-sending an event
     * @throws DataEndpointException       propagated from re-sending an event
     */
    public synchronized void processFailedEventsIfExists() throws UndefinedEventTypeException, DataEndpointException {
        if (!failedEventsDataEndpoints.isEmpty()) {
            for (DataEndpoint failedEndpoint : failedEventsDataEndpoints) {
                if (!failedEndpoint.isActive()) {
                    for (Iterator<Event> pending = failedEndpoint.getAndResetFailedEvents().iterator();
                         pending.hasNext();) {
                        // The target endpoint is chosen afresh for every event.
                        getDataEndpoint().collectAndSend(pending.next());
                    }
                } else {
                    failedEndpoint.flushEvents();
                }
                failedEndpoint.resetFailedEvents();
            }
            failedEventsDataEndpoints.clear();
        }
    }

    /**
     * Returns an active endpoint, waiting until one becomes available.
     *
     * <p>In {@link HAType#LOADBALANCE} mode the scan starts at the rotating publisher cursor so
     * consecutive calls spread over the endpoints; in every other mode (fail-over) the scan
     * always starts at the first endpoint, so later endpoints are used only while earlier ones
     * are inactive. After each complete pass without finding an active endpoint the calling
     * thread sleeps for 100 ms before scanning again.
     *
     * @return the first active endpoint found from the starting position
     * @throws IndexOutOfBoundsException if no endpoint has been added to the group
     */
    public DataEndpoint getDataEndpoint() {
        int startIndex = this.haType == HAType.LOADBALANCE
                ? getDataPublisherIndex()
                : this.START_INDEX.intValue();
        // Never start beyond the last registered endpoint, whatever the cursor handed out.
        if (startIndex > this.maximumDataPublisherIndex.get() - 1) {
            startIndex = this.START_INDEX.intValue();
        }
        int index = startIndex;
        while (true) {
            DataEndpoint dataEndpoint = this.dataEndpoints.get(index);
            if (dataEndpoint.isActive()) {
                return dataEndpoint;
            }
            index++;
            if (index > this.maximumDataPublisherIndex.get() - 1) {
                index = this.START_INDEX.intValue();
            }
            if (index == startIndex) {
                // Full pass without an active endpoint: back off briefly before rescanning.
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: this method has no way to report interruption and
                    // keeps waiting for an endpoint to become active.
                }
            }
        }
    }

    /**
     * Hands out the next endpoint index in round-robin order.
     *
     * <p>The cursor is reset as soon as the last valid index ({@code count - 1}) has been handed
     * out, so the returned value always stays below the number of registered endpoints.
     *
     * @return the index to start scanning from
     */
    private synchronized int getDataPublisherIndex() {
        int index = this.currentDataPublisherIndex.getAndIncrement();
        // Wrap after the LAST valid index; comparing against the count itself would first
        // hand out an index one past the end of the endpoint list.
        if (index >= this.maximumDataPublisherIndex.get() - 1) {
            this.currentDataPublisherIndex.set(this.START_INDEX.intValue());
        }
        return index;
    }

    /**
     * Attempts to put each given event back on the group's queue without waiting.
     *
     * @param arrayList events to re-queue
     * @return the events that could not be queued because the queue was full, in input order
     */
    @Override // org.wso2.carbon.databridge.agent.internal.endpoint.DataEndpointFailureCallback
    public ArrayList<Event> tryResendEvents(ArrayList<Event> arrayList) {
        final ArrayList<Event> rejected = new ArrayList<>();
        for (Event candidate : arrayList) {
            try {
                eventQueue.tryPut(candidate);
            } catch (EventQueueFullException queueFull) {
                // No room for this one; hand it back to the caller.
                rejected.add(candidate);
            }
        }
        return rejected;
    }

    /**
     * Records an endpoint so that {@code processFailedEventsIfExists()} handles it on its next run.
     * NOTE(review): this method is not synchronized while {@code processFailedEventsIfExists()}
     * is and iterates the same list - confirm which threads invoke this callback.
     *
     * @param dataEndpoint endpoint to add to the failed-endpoint list
     */
    @Override // org.wso2.carbon.databridge.agent.internal.endpoint.DataEndpointFailureCallback
    public void addFailedDataEndpoint(DataEndpoint dataEndpoint) {
        this.failedEventsDataEndpoints.add(dataEndpoint);
    }

    /**
     * Renders the group as {@code "[ ep1<sep>ep2... ]"}, where the separator is the fail-over
     * separator in {@link HAType#FAILOVER} mode and the load-balance separator otherwise.
     * An empty group is rendered with both brackets as well.
     *
     * @return bracketed, separator-joined string forms of the endpoints
     */
    @Override
    public String toString() {
        StringBuilder group = new StringBuilder("[ ");
        for (int i = 0; i < this.dataEndpoints.size(); i++) {
            if (i > 0) {
                if (this.haType == HAType.FAILOVER) {
                    group.append(DataEndpointConstants.FAILOVER_URL_GROUP_SEPARATOR);
                } else {
                    group.append(DataEndpointConstants.LB_URL_GROUP_SEPARATOR);
                }
            }
            group.append(this.dataEndpoints.get(i).toString());
        }
        // Always close the bracket, including for a group without endpoints.
        return group.append(" ]").toString();
    }

    /**
     * Shuts the group down: first the event queue, then the reconnection scheduler, and finally
     * every registered endpoint in registration order.
     */
    public void shutdown() {
        eventQueue.shutdown();
        reconnectionService.shutdown();
        for (DataEndpoint endpoint : dataEndpoints) {
            endpoint.shutdown();
        }
    }
}
