package org.wso2.carbon.event.processor.manager.core.internal;

import com.hazelcast.core.IMap;
import com.hazelcast.core.Member;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;
import org.wso2.carbon.context.PrivilegedCarbonContext;
import org.wso2.carbon.databridge.commons.Event;
import org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher;
import org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisherConfig;
import org.wso2.carbon.event.processor.manager.commons.transport.server.ConnectionCallback;
import org.wso2.carbon.event.processor.manager.commons.transport.server.StreamCallback;
import org.wso2.carbon.event.processor.manager.commons.transport.server.TCPEventServer;
import org.wso2.carbon.event.processor.manager.commons.transport.server.TCPEventServerConfig;
import org.wso2.carbon.event.processor.manager.commons.utils.HostAndPort;
import org.wso2.carbon.event.processor.manager.core.EventManagementUtil;
import org.wso2.carbon.event.processor.manager.core.EventSync;
import org.wso2.carbon.event.processor.manager.core.internal.ds.EventManagementServiceValueHolder;

/* loaded from: input_file:org/wso2/carbon/event/processor/manager/core/internal/EventHandler.class */
public class EventHandler {
    private static Logger log = Logger.getLogger(EventHandler.class);
    private HostAndPort localMember;
    private TCPEventPublisherConfig localEventPublisherConfiguration;
    private boolean isMemberNode;
    private TCPEventServer tcpEventServer = null;
    private IMap<String, HostAndPort> members = null;
    private ConcurrentHashMap<HostAndPort, TCPEventPublisher> tcpEventPublisherPool = new ConcurrentHashMap<>();
    private ConcurrentHashMap<String, EventSync> eventSyncMap = new ConcurrentHashMap<>();
    private boolean allowEventSync = true;
    private boolean allowContinueProcess = false;

    public void init(String str, HostAndPort hostAndPort, TCPEventPublisherConfig tCPEventPublisherConfig, boolean z) {
        this.isMemberNode = z;
        this.members = EventManagementServiceValueHolder.getHazelcastInstance().getMap(str);
        this.localMember = hostAndPort;
        registerLocalMember();
        this.localEventPublisherConfiguration = tCPEventPublisherConfig;
    }

    public void registerLocalMember() {
        if (!this.isMemberNode || this.members == null) {
            return;
        }
        this.members.set(EventManagementServiceValueHolder.getHazelcastInstance().getCluster().getLocalMember().getUuid(), this.localMember);
    }

    public void removeMember(String str) {
        if (this.members != null) {
            this.members.remove(str);
        }
    }

    public void shutdown() {
        if (this.members != null) {
            this.members.remove(EventManagementServiceValueHolder.getHazelcastInstance().getCluster().getLocalMember().getUuid());
        }
        Iterator<TCPEventPublisher> it = this.tcpEventPublisherPool.values().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        if (this.tcpEventServer != null) {
            this.tcpEventServer.shutdown();
        }
    }

    public void syncEvent(String str, Event event) {
        if (this.allowEventSync) {
            for (TCPEventPublisher tCPEventPublisher : this.tcpEventPublisherPool.values()) {
                if (tCPEventPublisher != null) {
                    try {
                        tCPEventPublisher.sendEvent(str, event.getTimeStamp(), ArrayUtils.addAll(ArrayUtils.addAll(event.getMetaData(), event.getCorrelationData()), event.getPayloadData()), event.getArbitraryDataMap(), true);
                    } catch (IOException e) {
                        log.error("Error sending sync events to " + str, e);
                    }
                }
            }
        }
    }

    public synchronized void registerEventSync(EventSync eventSync) {
        if (this.allowContinueProcess) {
            eventSync.setContinueProcess(this.allowContinueProcess);
        }
        this.eventSyncMap.putIfAbsent(EventManagementUtil.getSyncIdFromDatabridgeStream(eventSync.getStreamDefinition()), eventSync);
        Iterator<TCPEventPublisher> it = this.tcpEventPublisherPool.values().iterator();
        while (it.hasNext()) {
            it.next().addStreamDefinition(EventManagementUtil.constructStreamDefinition(EventManagementUtil.getSyncIdFromDatabridgeStream(eventSync.getStreamDefinition()), eventSync.getStreamDefinition()));
        }
        if (this.tcpEventServer != null) {
            this.tcpEventServer.addStreamDefinition(EventManagementUtil.constructStreamDefinition(EventManagementUtil.getSyncIdFromDatabridgeStream(eventSync.getStreamDefinition()), eventSync.getStreamDefinition()));
        }
    }

    public void unregisterEventSync(String str) {
        EventSync remove = this.eventSyncMap.remove(str);
        if (remove != null) {
            Iterator<TCPEventPublisher> it = this.tcpEventPublisherPool.values().iterator();
            while (it.hasNext()) {
                it.next().removeStreamDefinition(EventManagementUtil.constructStreamDefinition(EventManagementUtil.getSyncIdFromDatabridgeStream(remove.getStreamDefinition()), remove.getStreamDefinition()));
            }
            if (this.tcpEventServer != null) {
                this.tcpEventServer.removeStreamDefinition(EventManagementUtil.getSyncIdFromDatabridgeStream(remove.getStreamDefinition()));
            }
        }
    }

    public void startServer(HostAndPort hostAndPort) {
        if (this.tcpEventServer == null) {
            this.tcpEventServer = new TCPEventServer(new TCPEventServerConfig(hostAndPort.getHostName(), hostAndPort.getPort()), new StreamCallback() { // from class: org.wso2.carbon.event.processor.manager.core.internal.EventHandler.1
                public void receive(String str, long j, Object[] objArr, Map<String, String> map) {
                    int indexOf = str.indexOf("/");
                    if (indexOf != -1) {
                        int parseInt = Integer.parseInt(str.substring(0, indexOf));
                        try {
                            PrivilegedCarbonContext.startTenantFlow();
                            PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(parseInt);
                            PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain(true);
                            EventSync eventSync = (EventSync) EventHandler.this.eventSyncMap.get(str);
                            if (EventHandler.log.isDebugEnabled()) {
                                EventHandler.log.debug("Event Received to :" + str);
                            }
                            if (eventSync != null) {
                                eventSync.process(EventManagementUtil.getWso2Event(eventSync.getStreamDefinition(), j, objArr));
                            }
                        } catch (Exception e) {
                            EventHandler.log.error("Unable to process events for tenant :" + parseInt + " on stream:" + str.substring(indexOf), e);
                        } finally {
                            PrivilegedCarbonContext.endTenantFlow();
                        }
                    }
                }
            }, (ConnectionCallback) null);
            for (EventSync eventSync : this.eventSyncMap.values()) {
                this.tcpEventServer.addStreamDefinition(EventManagementUtil.constructStreamDefinition(EventManagementUtil.getSyncIdFromDatabridgeStream(eventSync.getStreamDefinition()), eventSync.getStreamDefinition()));
            }
            try {
                this.tcpEventServer.start();
                log.info("Event Management TCPEventServer for EventReceiver started on port " + hostAndPort.getPort());
            } catch (IOException e) {
                log.error("Unable to start TCPEventServer for EventReceiver started on port " + hostAndPort.getPort());
            }
        }
    }

    public void checkMemberUpdate() {
        cleanupMembers();
        updateEventPublishers();
    }

    private synchronized void updateEventPublishers() {
        if (this.members != null) {
            ArrayList<HostAndPort> arrayList = new ArrayList(this.members.values());
            arrayList.remove(this.localMember);
            ArrayList arrayList2 = new ArrayList(this.tcpEventPublisherPool.keySet());
            for (HostAndPort hostAndPort : arrayList) {
                if (!arrayList2.remove(hostAndPort)) {
                    addEventPublisher(hostAndPort);
                }
            }
            Iterator it = arrayList2.iterator();
            while (it.hasNext()) {
                removeEventPublisher((HostAndPort) it.next());
            }
        }
    }

    public synchronized void addEventPublisher(HostAndPort hostAndPort) {
        try {
            if (!this.tcpEventPublisherPool.containsKey(hostAndPort)) {
                TCPEventPublisher tCPEventPublisher = new TCPEventPublisher(hostAndPort.getHostName() + ":" + hostAndPort.getPort(), this.localEventPublisherConfiguration, false, (ConnectionCallback) null);
                for (EventSync eventSync : this.eventSyncMap.values()) {
                    tCPEventPublisher.addStreamDefinition(EventManagementUtil.constructStreamDefinition(EventManagementUtil.getSyncIdFromDatabridgeStream(eventSync.getStreamDefinition()), eventSync.getStreamDefinition()));
                }
                this.tcpEventPublisherPool.putIfAbsent(hostAndPort, tCPEventPublisher);
                log.info("CEP sync publisher initiated to Member '" + hostAndPort.getHostName() + ":" + hostAndPort.getPort() + "'");
            }
        } catch (IOException e) {
            log.error("Error occurred while trying to start the publisher: " + e.getMessage(), e);
        }
    }

    private synchronized void removeEventPublisher(HostAndPort hostAndPort) {
        TCPEventPublisher remove = this.tcpEventPublisherPool.remove(hostAndPort);
        if (remove != null) {
            remove.shutdown();
            log.info("CEP sync publisher disconnected from Member '" + hostAndPort.getHostName() + ":" + hostAndPort.getPort() + "'");
        }
    }

    private void cleanupMembers() {
        if (this.members != null) {
            HashSet hashSet = new HashSet();
            Iterator it = EventManagementServiceValueHolder.getHazelcastInstance().getCluster().getMembers().iterator();
            while (it.hasNext()) {
                hashSet.add(((Member) it.next()).getUuid());
            }
            for (String str : new ArrayList(this.members.keySet())) {
                if (!hashSet.contains(str)) {
                    this.members.remove(str);
                }
            }
        }
    }

    public void allowEventSync(boolean z) {
        this.allowEventSync = z;
    }

    public synchronized void allowContinueProcess(boolean z) {
        this.allowContinueProcess = z;
        Iterator<EventSync> it = this.eventSyncMap.values().iterator();
        while (it.hasNext()) {
            it.next().setContinueProcess(z);
        }
    }
}
