package org.wso2.carbon.event.processor.core.internal.storm.manager;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.wso2.carbon.event.processor.common.storm.manager.service.StormManagerService;
import org.wso2.carbon.event.processor.common.storm.manager.service.exception.EndpointNotFoundException;
import org.wso2.carbon.event.processor.common.storm.manager.service.exception.NotStormCoordinatorException;

/* loaded from: input_file:org/wso2/carbon/event/processor/core/internal/storm/manager/StormManagerServiceImpl.class */
/**
 * Implementation of {@link StormManagerService.Iface} that keeps an in-memory registry of
 * Storm receiver and CEP publisher endpoints, keyed by tenant id and execution plan name.
 *
 * <p>Registration and lookup requests are rejected with {@link NotStormCoordinatorException}
 * unless this instance has been marked as the storm coordinator via
 * {@link #setStormCoordinator(boolean)}.
 *
 * <p>Thread-safety: every method that reads or mutates the endpoint sets does so while holding
 * this instance's monitor. The coordinator flag is {@code volatile} so it can be read without
 * the monitor.
 */
public class StormManagerServiceImpl implements StormManagerService.Iface {
    private static final Logger log = Logger.getLogger(StormManagerServiceImpl.class);

    /** Number of milliseconds in one minute; also the heartbeat age after which an endpoint is ignored. */
    public static final long MILLISECONDS_PER_MINUTE = 60000;

    /** Registered Storm receiver endpoints, keyed by {@code tenantId:executionPlanName}. */
    private final ConcurrentHashMap<String, Set<Endpoint>> stormReceivers = new ConcurrentHashMap<>();

    /** Registered CEP publisher endpoints, keyed by {@code tenantId:executionPlanName}. */
    private final ConcurrentHashMap<String, Set<Endpoint>> cepPublishers = new ConcurrentHashMap<>();

    /** Written by {@link #setStormCoordinator(boolean)} and read by request threads, hence volatile. */
    private volatile boolean isStormCoordinator;

    /** Identifier of this node, used only in the "not a storm coordinator" error message. */
    private final String hostPort;

    /**
     * A registered host/port pair together with the number of times it has been handed out
     * and the time of its most recent registration.
     *
     * <p>Identity ({@link #equals(Object)} / {@link #hashCode()}) is defined by host name and
     * port only; both are immutable so instances are safe to keep in hash-based collections.
     */
    public class Endpoint {
        private final int port;
        private final String hostName;
        private int connectionCount = 0;
        private long lastRegisterTimestamp = System.currentTimeMillis();

        Endpoint(int i, String str) {
            this.port = i;
            this.hostName = str;
        }

        /** @return the time, in epoch milliseconds, at which this endpoint was last registered */
        public long getLastRegisterTimestamp() {
            return this.lastRegisterTimestamp;
        }

        /** Marks this endpoint as registered "now". */
        public void updateLastRegisteredTimestamp() {
            this.lastRegisterTimestamp = System.currentTimeMillis();
        }

        public String getHostName() {
            return this.hostName;
        }

        public int getPort() {
            return this.port;
        }

        public void setConnectionCount(int i) {
            this.connectionCount = i;
        }

        public int getConnectionCount() {
            return this.connectionCount;
        }

        /**
         * Two endpoints are equal when they are of the same class and have the same host name
         * and port. Connection count and registration time are deliberately excluded.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Endpoint other = (Endpoint) obj;
            if (this.port != other.getPort()) {
                return false;
            }
            String otherHost = other.getHostName();
            return this.hostName == null ? otherHost == null : this.hostName.equals(otherHost);
        }

        /** Hash over the same fields as {@link #equals(Object)} so the equals/hashCode contract holds. */
        @Override
        public int hashCode() {
            int hostHash = this.hostName == null ? 0 : this.hostName.hashCode();
            return 31 * hostHash + this.port;
        }
    }

    /**
     * @param str identifier of this node, reported in {@link NotStormCoordinatorException} messages
     */
    public StormManagerServiceImpl(String str) {
        this.hostPort = str;
    }

    /**
     * Registers (or refreshes the registration of) a Storm receiver endpoint.
     *
     * @param i    tenant id
     * @param str  execution plan name
     * @param str2 host name of the endpoint
     * @param i2   port of the endpoint
     * @throws NotStormCoordinatorException if this instance is not the storm coordinator
     */
    public void registerStormReceiver(int i, String str, String str2, int i2) throws NotStormCoordinatorException, TException {
        if (!this.isStormCoordinator) {
            throw new NotStormCoordinatorException(this.hostPort + " not a storm coordinator");
        }
        insertToCollection(this.stormReceivers, constructKey(i, str), new Endpoint(i2, str2));
    }

    /**
     * Registers (or refreshes the registration of) a CEP publisher endpoint.
     *
     * @param i    tenant id
     * @param str  execution plan name
     * @param str2 host name of the endpoint
     * @param i2   port of the endpoint
     * @throws NotStormCoordinatorException if this instance is not the storm coordinator
     */
    public void registerCEPPublisher(int i, String str, String str2, int i2) throws NotStormCoordinatorException, TException {
        if (!this.isStormCoordinator) {
            throw new NotStormCoordinatorException(this.hostPort + " not a storm coordinator");
        }
        insertToCollection(this.cepPublishers, constructKey(i, str), new Endpoint(i2, str2));
    }

    /**
     * Picks a Storm receiver for the given execution plan and counts one more connection on it.
     *
     * @param i    tenant id
     * @param str  execution plan name
     * @param str2 host name of the requester; endpoints on this host are preferred when present
     * @return the selected endpoint formatted as {@code host:port}
     * @throws NotStormCoordinatorException if this instance is not the storm coordinator
     * @throws EndpointNotFoundException    if no suitable endpoint is registered
     */
    public synchronized String getStormReceiver(int i, String str, String str2) throws NotStormCoordinatorException, EndpointNotFoundException, TException {
        if (!this.isStormCoordinator) {
            throw new NotStormCoordinatorException(this.hostPort + " not a storm coordinator");
        }
        Endpoint endpoint = getEndpoint(this.stormReceivers.get(constructKey(i, str)), str2);
        if (endpoint == null) {
            throw new EndpointNotFoundException("No Storm Receiver for executionPlanName: " + str + " of tenantId:" + i + " for CEP Receiver form:" + str2);
        }
        return endpoint.getHostName() + ":" + endpoint.getPort();
    }

    /**
     * Picks a CEP publisher for the given execution plan and counts one more connection on it.
     *
     * @param i    tenant id
     * @param str  execution plan name
     * @param str2 host name of the requester; endpoints on this host are preferred when present
     * @return the selected endpoint formatted as {@code host:port}
     * @throws NotStormCoordinatorException if this instance is not the storm coordinator
     * @throws EndpointNotFoundException    if no suitable endpoint is registered
     */
    public synchronized String getCEPPublisher(int i, String str, String str2) throws NotStormCoordinatorException, EndpointNotFoundException, TException {
        if (!this.isStormCoordinator) {
            throw new NotStormCoordinatorException(this.hostPort + " not a storm coordinator");
        }
        Endpoint endpoint = getEndpoint(this.cepPublishers.get(constructKey(i, str)), str2);
        if (endpoint == null) {
            throw new EndpointNotFoundException("No CEP Publisher for executionPlanName: " + str + " of tenantId:" + i + " for Storm Publisher form:" + str2);
        }
        return endpoint.getHostName() + ":" + endpoint.getPort();
    }

    /**
     * Drops every CEP publisher and Storm receiver registered for the given execution plan.
     *
     * @param i   tenant id
     * @param str execution plan name
     */
    public synchronized void deleteExecPlanEndpoints(int i, String str) {
        String key = constructKey(i, str);
        // remove() is already a no-op for an absent key, so no prior existence check is needed.
        this.cepPublishers.remove(key);
        this.stormReceivers.remove(key);
        log.info("Removed all end point details related to '" + key + "' from Manager service.");
    }

    /**
     * Chooses an endpoint from {@code set}, preferring those whose host name equals {@code str},
     * and increments the chosen endpoint's connection count.
     *
     * @param set candidate endpoints; may be {@code null} or empty
     * @param str preferred host name; the empty string disables the preference
     * @return the chosen endpoint, or {@code null} when none is available
     */
    private synchronized Endpoint getEndpoint(Set<Endpoint> set, String str) {
        if (set == null || set.isEmpty()) {
            return null;
        }
        Set<Endpoint> sameHostEndpoints = new HashSet<>();
        if (!"".equals(str)) {
            for (Endpoint candidate : set) {
                if (candidate.getHostName().equals(str)) {
                    sameHostEndpoints.add(candidate);
                }
            }
        }
        // NOTE(review): when same-host endpoints exist but all are stale this yields null
        // rather than falling back to the full set; preserved from the original behaviour.
        Endpoint selected = sameHostEndpoints.isEmpty() ? selectEndpoint(set) : selectEndpoint(sameHostEndpoints);
        if (selected != null) {
            selected.setConnectionCount(selected.getConnectionCount() + 1);
        }
        return selected;
    }

    /**
     * Returns the endpoint with the lowest connection count among those registered within the
     * last {@link #MILLISECONDS_PER_MINUTE} milliseconds, or {@code null} if there is none.
     * Stale endpoints that would otherwise have been chosen are logged and skipped.
     */
    private synchronized Endpoint selectEndpoint(Set<Endpoint> set) {
        Endpoint selected = null;
        int lowestCount = Integer.MAX_VALUE;
        for (Endpoint candidate : set) {
            if (candidate.getConnectionCount() >= lowestCount) {
                continue;
            }
            // Read the clock once so the staleness test and the logged age agree.
            long ageMillis = System.currentTimeMillis() - candidate.getLastRegisterTimestamp();
            if (ageMillis <= MILLISECONDS_PER_MINUTE) {
                lowestCount = candidate.getConnectionCount();
                selected = candidate;
            } else {
                log.warn("Ignoring endpoint " + candidate.getHostName() + ":" + candidate.getPort() + " because it has not sent a heart beat for " + ((int) (ageMillis / MILLISECONDS_PER_MINUTE)) + " min(s)");
            }
        }
        return selected;
    }

    /**
     * Adds {@code endpoint} to the set stored under {@code str}, creating the set if needed.
     * If an equal endpoint is already present, only its registration timestamp is refreshed.
     *
     * <p>Synchronized on this instance (not on the class) so that it is mutually exclusive with
     * the lookup and delete methods, which iterate and modify the same non-thread-safe sets.
     */
    private synchronized void insertToCollection(ConcurrentHashMap<String, Set<Endpoint>> concurrentHashMap, String str, Endpoint endpoint) {
        Set<Endpoint> endpoints = concurrentHashMap.get(str);
        if (endpoints == null) {
            endpoints = new HashSet<>();
            concurrentHashMap.put(str, endpoints);
        }
        for (Endpoint existing : endpoints) {
            if (existing.equals(endpoint)) {
                // Already known: treat the registration as a heartbeat and keep its counters.
                existing.updateLastRegisteredTimestamp();
                return;
            }
        }
        endpoints.add(endpoint);
    }

    /** Builds the registry key {@code tenantId:executionPlanName}. */
    private static String constructKey(int i, String str) {
        return i + ":" + str;
    }

    /** @param z {@code true} to make this instance accept registration and lookup requests */
    public void setStormCoordinator(boolean z) {
        this.isStormCoordinator = z;
    }

    /** @return whether this instance currently accepts registration and lookup requests */
    public boolean isStormCoordinator() {
        return this.isStormCoordinator;
    }
}
