package org.wso2.choreo.connect.enforcer.throttle.databridge.agent.endpoint;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.utils.DataBridgeThreadFactory;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.DataEndpointAgent;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.conf.DataEndpointConfiguration;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.endpoint.DataEndpoint;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.endpoint.WrappedEventFactory;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.exception.DataEndpointConfigurationException;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.exception.EventQueueFullException;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.util.DataPublisherUtil;

/* loaded from: input_file:org/wso2/choreo/connect/enforcer/throttle/databridge/agent/endpoint/DataEndpointGroup.class */
/**
 * A group of {@link DataEndpoint}s that events are published to, either in
 * {@link HAType#FAILOVER} or {@link HAType#LOADBALANCE} mode.
 *
 * <p>Unless the agent's publishing strategy is {@code "sync"} (case-insensitive), events are
 * buffered in a Disruptor ring buffer ({@link EventQueue}) and handed to an endpoint by
 * {@link EventQueueWorker}. With the {@code "sync"} strategy there is no queue and events are
 * sent on the calling thread. A {@link ReconnectionTask} is scheduled at a fixed rate of
 * {@code reconnectionInterval} seconds for the lifetime of the group.
 */
public class DataEndpointGroup implements DataEndpointFailureCallback {
    private static final Logger log = LogManager.getLogger(DataEndpointGroup.class);

    /** Index of the first endpoint; scans and the round-robin counter wrap back to this value. */
    private static final int START_INDEX = 0;

    private final HAType haType;
    /** {@code null} when the publishing strategy is "sync". */
    private final EventQueue eventQueue;
    private final int reconnectionInterval;
    private final String publishingStrategy;
    private final AtomicInteger currentDataPublisherIndex = new AtomicInteger(START_INDEX);
    /** Number of endpoints registered through {@link #addDataEndpoint(DataEndpoint)}. */
    private final AtomicInteger maximumDataPublisherIndex = new AtomicInteger();
    /** Written by {@link #shutdown()} and read by publishing threads, hence volatile. */
    private volatile boolean isShutdown = false;
    private final List<DataEndpoint> dataEndpoints = new ArrayList<>();
    private final ScheduledExecutorService reconnectionService = Executors.newScheduledThreadPool(1, new DataBridgeThreadFactory("ReconnectionService"));

    /**
     * Disruptor-backed buffer between publishing threads and {@link EventQueueWorker}.
     */
    public class EventQueue {
        private final RingBuffer<WrappedEventFactory.WrappedEvent> ringBuffer;
        private final Disruptor<WrappedEventFactory.WrappedEvent> eventQueueDisruptor;
        private final ExecutorService eventQueuePool;

        /**
         * Creates and starts the disruptor with a single {@link EventQueueWorker} handler.
         *
         * @param bufferSize ring buffer size passed to the {@link Disruptor} constructor
         */
        EventQueue(int bufferSize) {
            this.eventQueuePool = Executors.newCachedThreadPool(new DataBridgeThreadFactory("EventQueue"));
            this.eventQueueDisruptor = new Disruptor<>(new WrappedEventFactory(), bufferSize, this.eventQueuePool, ProducerType.MULTI, new BlockingWaitStrategy());
            this.eventQueueDisruptor.handleEventsWith(new EventQueueWorker());
            this.ringBuffer = this.eventQueueDisruptor.start();
        }

        /**
         * Claims one slot without blocking, stores the event in it and publishes the slot.
         *
         * @throws InsufficientCapacityException if the ring buffer has no free slot
         */
        private void enqueue(Event event) throws InsufficientCapacityException {
            long sequence = this.ringBuffer.tryNext(1);
            this.ringBuffer.get(sequence).setEvent(event);
            this.ringBuffer.publish(sequence);
        }

        /**
         * Enqueues the event or fails immediately when the ring buffer is full.
         *
         * @throws EventQueueFullException if the ring buffer has no free slot
         */
        private void tryPut(Event event) throws EventQueueFullException {
            try {
                enqueue(event);
            } catch (InsufficientCapacityException e) {
                throw new EventQueueFullException("Cannot send events because the event queue is full", e);
            }
        }

        /**
         * Enqueues the event, retrying every 2 ms while the ring buffer is full. Gives up
         * (dropping the event) once {@link DataEndpointGroup#isActiveDataEndpointExists()}
         * reports that no endpoint is available.
         */
        private void put(Event event) {
            boolean interrupted = false;
            try {
                do {
                    try {
                        enqueue(event);
                        return;
                    } catch (InsufficientCapacityException full) {
                        try {
                            Thread.sleep(2L);
                        } catch (InterruptedException e) {
                            // Keep retrying as before, but remember the interrupt so that it is
                            // not lost for the caller.
                            interrupted = true;
                        }
                    }
                } while (DataEndpointGroup.this.isActiveDataEndpointExists());
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Shuts down the worker pool and then the disruptor. */
        private void shutdown() {
            this.eventQueuePool.shutdown();
            this.eventQueueDisruptor.shutdown();
        }
    }

    /**
     * Disruptor handler that forwards each queued event to a data endpoint.
     */
    class EventQueueWorker implements EventHandler<WrappedEventFactory.WrappedEvent> {
        /** Set once an event has been dropped, so the error below is logged only once in a row. */
        boolean isLastEventDropped = false;

        EventQueueWorker() {
        }

        /**
         * Sends the event through an endpoint obtained with {@code getDataEndpoint(true)};
         * at the end of a batch all ACTIVE endpoints are flushed. When no endpoint is returned
         * the event is dropped and logged.
         */
        @Override // com.lmax.disruptor.EventHandler
        public void onEvent(WrappedEventFactory.WrappedEvent wrappedEvent, long sequence, boolean endOfBatch) {
            DataEndpoint dataEndpoint = DataEndpointGroup.this.getDataEndpoint(true);
            Event event = wrappedEvent.getEvent();
            if (dataEndpoint == null) {
                if (!this.isLastEventDropped) {
                    DataEndpointGroup.log.error("Dropping all events as DataPublisher is shutting down.");
                }
                DataEndpointGroup.log.debug("Data publisher is shutting down, dropping event : {}", event);
                this.isLastEventDropped = true;
                return;
            }
            this.isLastEventDropped = false;
            dataEndpoint.collectAndSend(event);
            if (endOfBatch) {
                DataEndpointGroup.this.flushAllDataEndpoints();
            }
        }
    }

    /** How an endpoint is picked: always scan from the first one, or rotate the start index. */
    public enum HAType {
        FAILOVER,
        LOADBALANCE
    }

    /**
     * Periodic task that reconnects disconnected endpoints and deactivates connected endpoints
     * whose receiver no longer answers.
     */
    private class ReconnectionTask implements Runnable {

        private ReconnectionTask() {
        }

        /**
         * Checks every registered endpoint once. Logs a warning listing the unreachable
         * receivers when no endpoint is connected after the pass.
         */
        @Override // java.lang.Runnable
        public void run() {
            boolean anyConnected = false;
            // Collected per run so the warning lists every failed receiver of this pass only.
            List<String> failedReceivers = new ArrayList<>();
            for (int index = START_INDEX; index < DataEndpointGroup.this.maximumDataPublisherIndex.get(); index++) {
                try {
                    DataEndpoint dataEndpoint = DataEndpointGroup.this.dataEndpoints.get(index);
                    refreshConnection(dataEndpoint);
                    if (dataEndpoint.isConnected()) {
                        anyConnected = true;
                    } else {
                        failedReceivers.add(describeReceiver(dataEndpoint));
                    }
                } catch (RuntimeException e) {
                    // An exception escaping run() would suppress all further executions of a
                    // task scheduled with scheduleAtFixedRate, so it is contained here.
                    DataEndpointGroup.log.warn("Unexpected error while checking the data endpoint at index " + index, e);
                }
            }
            if (!anyConnected) {
                DataEndpointGroup.log.warn("Receiver is not reachable at reconnection for the endpoints: {}, will try to reconnect every {} sec",
                        String.join(",", failedReceivers), DataEndpointGroup.this.reconnectionInterval);
            }
        }

        /**
         * Probes the receiver of a connected endpoint and deactivates the endpoint when the
         * probe fails; tries to connect a disconnected endpoint and deactivates it on failure.
         */
        private void refreshConnection(DataEndpoint dataEndpoint) {
            if (!dataEndpoint.isConnected()) {
                try {
                    dataEndpoint.connect();
                } catch (Exception e) {
                    dataEndpoint.deactivate();
                }
                return;
            }
            try {
                String[] protocolHostPort = DataPublisherUtil.getProtocolHostPort(dataEndpoint.getDataEndpointConfiguration().getReceiverURL());
                if (!isServerExists(dataEndpoint, protocolHostPort[0], protocolHostPort[1], Integer.parseInt(protocolHostPort[2]))) {
                    dataEndpoint.deactivate();
                }
            } catch (DataEndpointConfigurationException e) {
                DataEndpointGroup.log.warn("Data Endpoint with receiver URL:" + dataEndpoint.getDataEndpointConfiguration().getReceiverURL() + " could not be deactivated", e);
            }
        }

        /** Returns the receiver URL of the endpoint, or "Null" when it has no configuration. */
        private String describeReceiver(DataEndpoint dataEndpoint) {
            DataEndpointConfiguration configuration = dataEndpoint.getDataEndpointConfiguration();
            return configuration != null ? String.valueOf(configuration.getReceiverURL()) : "Null";
        }

        /**
         * Probes the receiver. For the TCP protocol a plain socket is opened to host/port and
         * closed again; otherwise a socket is borrowed from the endpoint's secured transport
         * pool and a zero-filled buffer of session-id length is written to it.
         *
         * @return {@code true} if the probe completed, {@code false} on any exception
         */
        private boolean isServerExists(DataEndpoint dataEndpoint, String protocol, String host, int port) {
            try {
                if (protocol.equals(DataEndpointConfiguration.Protocol.TCP.toString())) {
                    new Socket(host, port).close();
                    return true;
                }
                DataEndpointConfiguration configuration = dataEndpoint.getDataEndpointConfiguration();
                // NOTE(review): the borrowed socket is never handed back to the pool here;
                // confirm whether the pool expects a matching return call.
                OutputStream outputStream = ((Socket) configuration.getSecuredTransportPool().borrowObject(configuration.getPublisherKey())).getOutputStream();
                outputStream.write(ByteBuffer.allocate(configuration.getSessionId().length()).array());
                outputStream.flush();
                return true;
            } catch (Exception e) {
                DataEndpointGroup.log.debug("Receiver probe failed for {}:{}", host, port, e);
                return false;
            }
        }
    }

    /**
     * Creates the group, creates the event queue unless the publishing strategy is "sync",
     * and schedules the reconnection task.
     *
     * @param hAType            endpoint selection mode
     * @param dataEndpointAgent agent whose configuration supplies the reconnection interval,
     *                          publishing strategy and queue size
     */
    public DataEndpointGroup(HAType hAType, DataEndpointAgent dataEndpointAgent) {
        this.haType = hAType;
        this.reconnectionInterval = dataEndpointAgent.getAgentConfiguration().getReconnectionInterval();
        this.publishingStrategy = dataEndpointAgent.getAgentConfiguration().getPublishingStrategy();
        this.eventQueue = this.publishingStrategy.equalsIgnoreCase("sync")
                ? null
                : new EventQueue(dataEndpointAgent.getAgentConfiguration().getQueueSize());
        this.reconnectionService.scheduleAtFixedRate(new ReconnectionTask(), this.reconnectionInterval, this.reconnectionInterval, TimeUnit.SECONDS);
    }

    /**
     * Registers an endpoint with this group and installs this group as its failure callback.
     */
    public void addDataEndpoint(DataEndpoint dataEndpoint) {
        this.dataEndpoints.add(dataEndpoint);
        dataEndpoint.registerDataEndpointFailureCallback(this);
        // Incremented last: index-based scans only look below this counter.
        this.maximumDataPublisherIndex.incrementAndGet();
    }

    /**
     * Publishes without waiting: queued mode fails fast when the queue is full; sync mode sends
     * only if an endpoint is found in one pass and is a no-op after {@link #shutdown()}.
     *
     * @throws EventQueueFullException if the event queue has no free slot
     */
    public void tryPublish(Event event) throws EventQueueFullException {
        if (this.eventQueue != null) {
            this.eventQueue.tryPut(event);
            return;
        }
        if (!this.isShutdown) {
            trySyncPublish(event);
        }
    }

    /**
     * Publishes, waiting for queue capacity (queued mode) or for an endpoint (sync mode).
     * In sync mode this is a no-op after {@link #shutdown()}.
     */
    public void publish(Event event) {
        if (this.eventQueue != null) {
            this.eventQueue.put(event);
            return;
        }
        if (!this.isShutdown) {
            syncPublish(event);
        }
    }

    /** Sends on the calling thread if an endpoint is found in one pass; otherwise drops the event. */
    private void trySyncPublish(Event event) {
        try {
            DataEndpoint dataEndpoint = getDataEndpoint(false);
            if (dataEndpoint != null) {
                dataEndpoint.syncSend(event);
            } else {
                log.debug("DataEndpoint not available, dropping event : {}", event);
            }
        } catch (Throwable th) {
            log.error("Unexpected error: " + th.getMessage(), th);
        }
    }

    /** Sends on the calling thread, waiting for an endpoint; drops the event if none is returned. */
    private void syncPublish(Event event) {
        try {
            DataEndpoint dataEndpoint = getDataEndpoint(true);
            if (dataEndpoint != null) {
                dataEndpoint.syncSend(event);
            } else {
                log.error("Dropping event as DataPublisher is shutting down.");
                log.debug("Data publisher is shutting down, dropping event : {}", event);
            }
        } catch (Throwable th) {
            log.error("Unexpected error: " + th.getMessage(), th);
        }
    }

    /** Flushes every endpoint that is currently in the ACTIVE state. */
    private void flushAllDataEndpoints() {
        for (DataEndpoint dataEndpoint : this.dataEndpoints) {
            if (dataEndpoint.getState().equals(DataEndpoint.State.ACTIVE)) {
                dataEndpoint.flushEvents();
            }
        }
    }

    private DataEndpoint getDataEndpoint(boolean isBusyWait) {
        return getDataEndpoint(isBusyWait, null);
    }

    /**
     * Finds an ACTIVE endpoint other than {@code excluded}.
     *
     * <p>The scan starts at the round-robin index in LOADBALANCE mode and at the first endpoint
     * otherwise, wrapping around at the end. In FAILOVER mode a BUSY or INITIALIZING endpoint
     * is polled every millisecond instead of being skipped.
     *
     * @param isBusyWait when {@code false}, gives up after one full pass; when {@code true},
     *                   keeps scanning with 1 ms pauses and only gives up once the
     *                   reconnection service is shut down and no endpoint is available
     * @param excluded   endpoint that must not be returned, or {@code null}
     * @return an ACTIVE endpoint, or {@code null} when the search gave up
     */
    private DataEndpoint getDataEndpoint(boolean isBusyWait, DataEndpoint excluded) {
        final int firstIndex = this.haType.equals(HAType.LOADBALANCE) ? getDataPublisherIndex() : START_INDEX;
        int index = firstIndex;
        while (true) {
            DataEndpoint candidate = this.dataEndpoints.get(index);
            if (candidate.getState().equals(DataEndpoint.State.ACTIVE) && candidate != excluded) {
                return candidate;
            }
            boolean temporarilyUnusable = candidate.getState().equals(DataEndpoint.State.BUSY)
                    || candidate.getState().equals(DataEndpoint.State.INITIALIZING);
            if (this.haType.equals(HAType.FAILOVER) && temporarilyUnusable) {
                // Stay on the same endpoint until it leaves the BUSY/INITIALIZING state.
                busyWait(1L);
                continue;
            }
            index++;
            if (index > this.maximumDataPublisherIndex.get() - 1) {
                index = START_INDEX;
            }
            if (index != firstIndex) {
                continue;
            }
            // A full pass found nothing usable.
            if (!isBusyWait) {
                return null;
            }
            if (this.reconnectionService.isShutdown() && !isActiveDataEndpointExists()) {
                return null;
            }
            busyWait(1L);
        }
    }

    /** Sleeps for the given number of milliseconds; an interrupt merely ends the pause early. */
    private void busyWait(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ignored) {
            // Deliberately not re-interrupting: this also runs on the disruptor handler
            // thread, and the callers loop on this pause, so a restored flag would turn the
            // wait into a spin and surface in the event processor's own blocking wait.
        }
    }

    /** Returns whether any registered endpoint is in a state other than UNAVAILABLE. */
    private boolean isActiveDataEndpointExists() {
        for (int index = START_INDEX; index < this.maximumDataPublisherIndex.get(); index++) {
            DataEndpoint dataEndpoint = this.dataEndpoints.get(index);
            if (dataEndpoint.getState() != DataEndpoint.State.UNAVAILABLE) {
                if (log.isDebugEnabled()) {
                    log.debug("Available endpoint : " + dataEndpoint + " existing in state - " + dataEndpoint.getState());
                }
                return true;
            }
        }
        return false;
    }

    /** Returns the current round-robin index and advances it, wrapping after the last endpoint. */
    private synchronized int getDataPublisherIndex() {
        int index = this.currentDataPublisherIndex.getAndIncrement();
        if (index == this.maximumDataPublisherIndex.get() - 1) {
            this.currentDataPublisherIndex.set(START_INDEX);
        }
        return index;
    }

    /**
     * Re-sends events that {@code dataEndpoint} failed to deliver: first through other ACTIVE
     * endpoints, then, for whatever is left, through the queue (or a sync send when there is
     * no queue). Events that do not fit into the queue are logged and dropped.
     */
    @Override // org.wso2.choreo.connect.enforcer.throttle.databridge.agent.endpoint.DataEndpointFailureCallback
    public void tryResendEvents(List<Event> list, DataEndpoint dataEndpoint) {
        for (Event event : trySendActiveEndpoints(list, dataEndpoint)) {
            try {
                if (this.eventQueue != null) {
                    this.eventQueue.tryPut(event);
                } else {
                    trySyncPublish(event);
                }
            } catch (EventQueueFullException e) {
                log.error("Unable to put the event :" + event, e);
            }
        }
    }

    /**
     * Hands each event to an ACTIVE endpoint other than {@code failedEndpoint} and flushes all
     * ACTIVE endpoints afterwards.
     *
     * @return the events for which no such endpoint was found
     */
    private List<Event> trySendActiveEndpoints(List<Event> events, DataEndpoint failedEndpoint) {
        List<Event> unsent = new ArrayList<>();
        for (Event event : events) {
            DataEndpoint target = getDataEndpoint(false, failedEndpoint);
            if (target != null) {
                target.collectAndSend(event);
            } else {
                unsent.add(event);
            }
        }
        flushAllDataEndpoints();
        return unsent;
    }

    /**
     * Renders the endpoints as {@code [ e1|e2 ]} in FAILOVER mode and {@code [ e1,e2 ]}
     * otherwise.
     */
    @Override
    public String toString() {
        String separator = this.haType == HAType.FAILOVER ? "|" : ",";
        StringBuilder sb = new StringBuilder("[ ");
        for (int i = 0; i < this.dataEndpoints.size(); i++) {
            if (i > 0) {
                sb.append(separator);
            }
            sb.append(this.dataEndpoints.get(i).toString());
        }
        // Always close the bracket, including for a group without endpoints.
        return sb.append(" ]").toString();
    }

    /**
     * Stops the reconnection service, the event queue (if any) and every endpoint.
     */
    public void shutdown() {
        // Raise the flag first so sync publishers stop sending while teardown is in progress.
        this.isShutdown = true;
        this.reconnectionService.shutdownNow();
        if (this.eventQueue != null) {
            this.eventQueue.shutdown();
        }
        for (DataEndpoint dataEndpoint : this.dataEndpoints) {
            dataEndpoint.shutdown();
        }
    }
}
