package org.wso2.choreo.connect.enforcer.throttle.databridge.agent.endpoint;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.databridge.commons.exception.SessionTimeoutException;
import org.wso2.carbon.databridge.commons.exception.TransportException;
import org.wso2.carbon.databridge.commons.exception.UndefinedEventTypeException;
import org.wso2.carbon.databridge.commons.utils.DataBridgeThreadFactory;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.conf.DataEndpointConfiguration;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.exception.DataEndpointAuthenticationException;
import org.wso2.choreo.connect.enforcer.throttle.databridge.agent.exception.DataEndpointException;

/* loaded from: input_file:org/wso2/choreo/connect/enforcer/throttle/databridge/agent/endpoint/DataEndpoint.class */
public abstract class DataEndpoint {
    private static final Logger log = LogManager.getLogger(DataEndpoint.class);
    /** Connection worker; {@code null} until {@code initialize} creates it. */
    private DataEndpointConnectionWorker connectionWorker;
    /** Keyed pool from which transport clients are borrowed and to which they are returned. */
    private GenericKeyedObjectPool transportPool;
    /** Executor to which batched {@code EventPublisher} jobs are submitted. */
    private EventPublisherThreadPoolExecutor threadPoolExecutor;
    /** Callback given the events when {@code handleFailedEvents} runs; set via the register method. */
    private DataEndpointFailureCallback dataEndpointFailureCallback;
    /** Single-thread executor on which {@code connect()} submits the connection worker. */
    private ExecutorService connectionService;
    /** Maximum pool size taken from the configuration in {@code initialize}. */
    private int maxPoolSize;
    /** Bounds concurrent {@code syncSend} calls; created with {@code maxPoolSize} permits. */
    private Semaphore immediateDispatchSemaphore;
    /** Number of buffered events that triggers a batch submit; replaced by the configured value in {@code initialize}. */
    private int batchSize = 100;
    /** Current endpoint state; starts as {@code INITIALIZING}. */
    private State state = State.INITIALIZING;
    /** Events buffered by {@code collectAndSend} that have not yet been submitted. */
    private List<Event> events = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/choreo/connect/enforcer/throttle/databridge/agent/endpoint/DataEndpoint$EventPublisher.class */
    /**
     * Runnable that publishes one list of events through the enclosing {@link DataEndpoint}.
     *
     * <p>This is a non-static inner class: {@code publish()} relies on the enclosing endpoint's
     * {@code getClient}, {@code send} and {@code returnClient}.
     */
    public class EventPublisher implements Runnable {
        /** Events passed to the enclosing endpoint's {@code send} by {@code publish()}. */
        List<Event> events;
        /** Optional semaphore released by {@code semaphoreRelease()}; stays {@code null} unless set. */
        private Semaphore semaphore;

        /**
         * Creates a publisher for the given events.
         *
         * @param list the events to publish; the reference is stored as-is, not copied
         */
        public EventPublisher(List<Event> list) {
            this.events = list;
        }

        /**
         * Entry point of the publisher.
         *
         * <p>NOTE: the decompiler could not reconstruct this method. The body below is a
         * placeholder that always throws {@link UnsupportedOperationException}; the original
         * logic is not available in this source. The only recovered fragment (see the JADX
         * warning below) is a case-insensitive comparison against the configured session id.
         */
        /* JADX WARN: Code restructure failed: missing block: B:65:0x006f, code lost:
        
            if (r0.equalsIgnoreCase(r7.this$0.getDataEndpointConfiguration().getSessionId()) != false) goto L15;
         */
        /* JADX WARN: Removed duplicated region for block: B:52:0x00bb  */
        /* JADX WARN: Removed duplicated region for block: B:55:0x00cd  */
        /* JADX WARN: Removed duplicated region for block: B:57:? A[RETURN, SYNTHETIC] */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 721
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.wso2.choreo.connect.enforcer.throttle.databridge.agent.endpoint.DataEndpoint.EventPublisher.run():void");
        }

        /**
         * Sets the semaphore that {@code semaphoreRelease()} will release.
         *
         * @param semaphore the semaphore to release after publishing; may be {@code null}
         */
        public void setPoolSemaphore(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        /**
         * Borrows a client from the enclosing endpoint, sends the events with it, and always
         * returns the client afterwards.
         *
         * @throws DataEndpointException propagated from {@code getClient} or {@code send}
         * @throws SessionTimeoutException propagated from {@code send}
         * @throws UndefinedEventTypeException propagated from {@code send}
         */
        private void publish() throws DataEndpointException, SessionTimeoutException, UndefinedEventTypeException {
            Object client = DataEndpoint.this.getClient();
            try {
                DataEndpoint.this.send(client, this.events);
                // Reached only when send() returned normally; a throwing send() skips the release here.
                semaphoreRelease();
            } finally {
                // The borrowed client goes back to the pool on both success and failure.
                DataEndpoint.this.returnClient(client);
            }
        }

        /** Releases one permit on the configured semaphore, if one has been set. */
        private void semaphoreRelease() {
            if (this.semaphore != null) {
                this.semaphore.release();
            }
        }
    }

    /* loaded from: input_file:org/wso2/choreo/connect/enforcer/throttle/databridge/agent/endpoint/DataEndpoint$State.class */
    /** States held in the endpoint's {@code state} field. */
    public enum State {
        /** Set by {@code activate()}. */
        ACTIVE,
        /** Set by {@code deactivate()}; {@code isConnected()} returns {@code false} in this state. */
        UNAVAILABLE,
        /** Set by {@code setStateBusy()} when at most one immediate-dispatch permit is available. */
        BUSY,
        /** Initial value of the {@code state} field. */
        INITIALIZING
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Buffers an event and, once at least {@code batchSize} events are buffered, submits the
     * whole buffer as one {@code EventPublisher} job and starts a new buffer.
     *
     * <p>This method performs no locking of its own.
     *
     * @param event the event to add to the pending batch
     */
    public void collectAndSend(Event event) {
        this.events.add(event);
        if (this.events.size() < this.batchSize) {
            return;
        }
        this.threadPoolExecutor.submitJobAndSetState(new EventPublisher(this.events), this);
        // The submitted publisher keeps the old list, so buffering continues in a fresh one.
        this.events = new ArrayList<>();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Submits any buffered events as one {@code EventPublisher} job, regardless of batch size,
     * and starts a new buffer. Does nothing when the buffer is empty.
     */
    public void flushEvents() {
        if (this.events.isEmpty()) {
            return;
        }
        this.threadPoolExecutor.submitJobAndSetState(new EventPublisher(this.events), this);
        // The submitted publisher keeps the old list, so buffering continues in a fresh one.
        this.events = new ArrayList<>();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Publishes a single event on the calling thread.
     *
     * <p>The event is wrapped in a one-element list and run through an {@code EventPublisher}
     * directly (no executor). An immediate-dispatch permit is held for the duration of the
     * publish and is released whether the publish returns or throws.
     *
     * @param event the event to publish
     */
    public void syncSend(Event event) {
        List<Event> singleEvent = new ArrayList<>(1);
        singleEvent.add(event);
        EventPublisher eventPublisher = new EventPublisher(singleEvent);
        // State is evaluated before the permit is taken, matching the original ordering.
        setStateBusy();
        acquireImmediateDispatchSemaphore();
        try {
            eventPublisher.run();
        } finally {
            releaseImmediateDispatchSemaphore();
        }
    }

    /**
     * Blocks until one immediate-dispatch permit has been acquired.
     *
     * <p>Interruption does not abort the wait. Unlike a retry loop that catches and discards
     * {@link InterruptedException}, {@code acquireUninterruptibly()} keeps the thread's
     * interrupt status set on return, so callers further up can still observe the interrupt.
     */
    private void acquireImmediateDispatchSemaphore() {
        this.immediateDispatchSemaphore.acquireUninterruptibly();
    }

    /** Returns one permit to the immediate-dispatch semaphore. */
    private void releaseImmediateDispatchSemaphore() {
        this.immediateDispatchSemaphore.release();
    }

    /**
     * Switches the endpoint to {@code State.BUSY} when the immediate-dispatch semaphore has
     * one permit or fewer available; otherwise leaves the state untouched.
     */
    private void setStateBusy() {
        int permitsLeft = this.immediateDispatchSemaphore.availablePermits();
        if (permitsLeft > 1) {
            return;
        }
        setState(State.BUSY);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Updates the endpoint state; assigning the state it already has is a no-op.
     *
     * @param state the state to switch to
     */
    public void setState(State state) {
        if (!this.state.equals(state)) {
            this.state = state;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Schedules the connection worker on the connection service executor.
     *
     * @throws DataEndpointException if no connection worker has been created yet
     * @throws TransportException declared by the signature; not thrown by this body
     * @throws DataEndpointAuthenticationException declared by the signature; not thrown by this body
     */
    public void connect() throws TransportException, DataEndpointAuthenticationException, DataEndpointException {
        final DataEndpointConnectionWorker worker = this.connectionWorker;
        if (worker != null) {
            this.connectionService.submit(worker);
            return;
        }
        throw new DataEndpointException("Data Endpoint is not initialized");
    }

    /**
     * Runs the connection worker on the calling thread, but only when {@code str} is
     * {@code null} or equals (ignoring case) the session id currently held in the endpoint
     * configuration.
     *
     * <p>The initialization check is performed first, so an uninitialized endpoint is always
     * reported as a {@link DataEndpointException} instead of failing with a
     * {@link NullPointerException} while the configuration is being read.
     *
     * @param str the session id to compare against the configured one, or {@code null} to
     *            connect unconditionally
     * @throws DataEndpointException if no connection worker has been created yet
     */
    synchronized void syncConnect(String str) throws DataEndpointException {
        if (this.connectionWorker == null) {
            throw new DataEndpointException("Data Endpoint is not initialized");
        }
        if (str == null || str.equalsIgnoreCase(getDataEndpointConfiguration().getSessionId())) {
            this.connectionWorker.run();
        }
    }

    /**
     * Wires this endpoint from the given configuration and then requests a connection.
     *
     * <p>In order: takes the transport pool and batch size from the configuration, creates and
     * initializes the connection worker, builds the publisher thread pool and the
     * single-thread connection executor, sizes the immediate-dispatch semaphore with the
     * configured maximum pool size, and finally calls {@code connect()}.
     *
     * @param dataEndpointConfiguration the configuration to read settings from
     * @throws DataEndpointException propagated from worker initialization or {@code connect()}
     * @throws DataEndpointAuthenticationException declared by the signature
     * @throws TransportException declared by the signature
     */
    public void initialize(DataEndpointConfiguration dataEndpointConfiguration) throws DataEndpointException, DataEndpointAuthenticationException, TransportException {
        this.transportPool = dataEndpointConfiguration.getTransportPool();
        this.batchSize = dataEndpointConfiguration.getBatchSize();

        DataEndpointConnectionWorker worker = new DataEndpointConnectionWorker();
        this.connectionWorker = worker;
        worker.initialize(this, dataEndpointConfiguration);

        this.threadPoolExecutor = new EventPublisherThreadPoolExecutor(
                dataEndpointConfiguration.getCorePoolSize(),
                dataEndpointConfiguration.getMaxPoolSize(),
                dataEndpointConfiguration.getKeepAliveTimeInPool(),
                dataEndpointConfiguration.getReceiverURL());

        DataBridgeThreadFactory connectionThreadFactory =
                new DataBridgeThreadFactory("ConnectionService-" + dataEndpointConfiguration.getReceiverURL());
        this.connectionService = Executors.newSingleThreadExecutor(connectionThreadFactory);

        int poolLimit = dataEndpointConfiguration.getMaxPoolSize();
        this.maxPoolSize = poolLimit;
        this.immediateDispatchSemaphore = new Semaphore(poolLimit);

        connect();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Transport-specific login, implemented by concrete endpoints.
     *
     * <p>NOTE(review): parameter meanings are not visible in this class. Presumably
     * {@code obj} is the transport client, {@code str}/{@code str2} are credentials and the
     * result is a session id; confirm against the implementations.
     *
     * @throws DataEndpointAuthenticationException if the implementation fails to authenticate
     */
    public abstract String login(Object obj, String str, String str2) throws DataEndpointAuthenticationException;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Transport-specific logout, implemented by concrete endpoints.
     *
     * <p>NOTE(review): parameter meanings are not visible in this class. Presumably
     * {@code obj} is the transport client and {@code str} the session id; confirm against
     * the implementations.
     *
     * @throws DataEndpointAuthenticationException if the implementation fails to log out
     */
    public abstract void logout(Object obj, String str) throws DataEndpointAuthenticationException;

    /**
     * Returns the current value of the endpoint's state field.
     *
     * @return the current state
     */
    public State getState() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Marks the endpoint as {@code State.ACTIVE}. */
    public void activate() {
        setState(State.ACTIVE);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Marks the endpoint as {@code State.UNAVAILABLE}; {@code isConnected()} then returns {@code false}. */
    public void deactivate() {
        setState(State.UNAVAILABLE);
    }

    /**
     * Transport-specific send, implemented by concrete endpoints. Within this class it is
     * called by {@code EventPublisher.publish()} with a client borrowed from the transport pool.
     *
     * @param obj the client borrowed from the transport pool
     * @param list the events to send
     * @throws DataEndpointException declared failure mode of the implementation
     * @throws SessionTimeoutException declared failure mode of the implementation
     * @throws UndefinedEventTypeException declared failure mode of the implementation
     */
    protected abstract void send(Object obj, List<Event> list) throws DataEndpointException, SessionTimeoutException, UndefinedEventTypeException;

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the configuration held by the connection worker.
     *
     * @return the worker's configuration, or {@code null} when no connection worker exists yet
     */
    public DataEndpointConfiguration getDataEndpointConfiguration() {
        return this.connectionWorker != null
                ? this.connectionWorker.getDataEndpointConfiguration()
                : null;
    }

    /**
     * Borrows a transport client from the pool under this endpoint's publisher key.
     *
     * @return the borrowed client
     * @throws DataEndpointException wrapping any exception raised while borrowing
     */
    private Object getClient() throws DataEndpointException {
        Object borrowed;
        try {
            borrowed = this.transportPool.borrowObject(getDataEndpointConfiguration().getPublisherKey());
        } catch (Exception cause) {
            String message = "Cannot borrow client for " + getDataEndpointConfiguration().getPublisherKey();
            throw new DataEndpointException(message, cause);
        }
        return borrowed;
    }

    /**
     * Hands a client back to the transport pool under this endpoint's publisher key. If that
     * fails, a warning is logged and the client is invalidated instead.
     *
     * @param obj the client to return
     */
    private void returnClient(Object obj) {
        try {
            Object publisherKey = getDataEndpointConfiguration().getPublisherKey();
            this.transportPool.returnObject(publisherKey, obj);
        } catch (Exception returnFailure) {
            log.warn("Error occurred while returning object to connection pool", returnFailure);
            discardClient(obj);
        }
    }

    /**
     * Invalidates a client in the transport pool. A {@code null} client is ignored, and any
     * failure during invalidation is logged rather than propagated.
     *
     * @param obj the client to invalidate; may be {@code null}
     */
    private void discardClient(Object obj) {
        if (obj == null) {
            return;
        }
        try {
            this.transportPool.invalidateObject(getDataEndpointConfiguration().getPublisherKey(), obj);
        } catch (Exception invalidationFailure) {
            log.error("Error while invalidating the client ", invalidationFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the callback that {@code handleFailedEvents} invokes.
     *
     * @param dataEndpointFailureCallback the callback to use; replaces any previous one
     */
    public void registerDataEndpointFailureCallback(DataEndpointFailureCallback dataEndpointFailureCallback) {
        this.dataEndpointFailureCallback = dataEndpointFailureCallback;
    }

    /**
     * Marks this endpoint unavailable and hands the given events to the registered failure
     * callback for a resend attempt.
     *
     * <p>NOTE(review): the callback is dereferenced without a null check, so this throws
     * {@link NullPointerException} if no callback was registered; confirm that callers
     * always register one first.
     *
     * @param list the events that could not be published
     */
    private void handleFailedEvents(List<Event> list) {
        deactivate();
        this.dataEndpointFailureCallback.tryResendEvents(list, this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Reports whether the endpoint is in any state other than {@code State.UNAVAILABLE}.
     *
     * @return {@code false} only when the state is {@code UNAVAILABLE}
     */
    public boolean isConnected() {
        return !this.state.equals(State.UNAVAILABLE);
    }

    /**
     * Describes this endpoint by its receiver and authentication URLs.
     *
     * <p>The configuration is looked up once so the null check and the URL reads operate on
     * the same object.
     *
     * @return {@code "null"} when no configuration is available, otherwise a string of the
     *         form {@code ( Receiver URL : ..., Authentication URL : ...)}
     */
    @Override
    public String toString() {
        DataEndpointConfiguration configuration = getDataEndpointConfiguration();
        if (configuration == null) {
            return "null";
        }
        return "( Receiver URL : " + configuration.getReceiverURL() + ", Authentication URL : " + configuration.getAuthURL() + ")";
    }

    /**
     * Shuts this endpoint down.
     *
     * <p>Waits (polling every 100 ms) until the publisher pool reports no active jobs, then
     * disconnects the connection worker, stops both executors and waits up to 10 seconds for
     * each to terminate.
     *
     * <p>An interrupt received while waiting does not cut the shutdown short, but it is
     * remembered and the thread's interrupt status is restored before returning, so the
     * caller can still observe it.
     */
    public void shutdown() {
        log.info("Shutdown triggered for data publisher endpoint URL - " + getDataEndpointConfiguration().getReceiverURL());
        boolean interrupted = false;
        try {
            while (this.threadPoolExecutor.getActiveCount() != 0) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    // Keep draining; re-interrupting here would make every later sleep fail immediately.
                    interrupted = true;
                }
            }
            this.connectionWorker.disconnect(getDataEndpointConfiguration());
            this.connectionService.shutdownNow();
            this.threadPoolExecutor.shutdownNow();
            try {
                this.connectionService.awaitTermination(10L, TimeUnit.SECONDS);
                this.threadPoolExecutor.awaitTermination(10L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
            }
            log.info("Completed shutdown for data publisher endpoint URL - " + getDataEndpointConfiguration().getReceiverURL());
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
