package org.wso2.carbon.stream.processor.core.ha;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.log4j.Logger;
import org.wso2.carbon.stream.processor.core.event.queue.QueuedEvent;
import org.wso2.carbon.stream.processor.core.ha.transport.EventSyncConnection;
import org.wso2.carbon.stream.processor.core.ha.transport.EventSyncConnectionPoolManager;
import org.wso2.carbon.stream.processor.core.ha.util.CoordinationConstants;
import org.wso2.carbon.stream.processor.core.ha.util.HAConstants;
import org.wso2.carbon.stream.processor.core.util.BinaryEventConverter;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.exception.ConnectionUnavailableException;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.input.source.SourceHandler;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/ha/HACoordinationSourceHandler.class */
/**
 * Source handler for an active/passive HA pair.
 *
 * <p>While this node is marked active, every incoming event is forwarded to the supplied
 * {@link InputHandler}. If a passive node has additionally been registered via
 * {@link #setPassiveNodeAdded(boolean)}, a binary copy of the event(s) is first sent to it over a
 * connection borrowed from the pool exposed by {@link EventSyncConnectionPoolManager}. While the
 * node is not active, incoming events are dropped by this handler.
 */
public class HACoordinationSourceHandler extends SourceHandler {
    private static final Logger log = Logger.getLogger(HACoordinationSourceHandler.class);

    // NOTE(review): not volatile, unlike passiveNodeAdded — confirm whether setAsActive()/setAsPassive()
    // can be invoked from a thread other than the one calling sendEvent().
    private boolean isActiveNode;
    private String sourceHandlerElementId;
    private String siddhiAppName;
    /** Pool the last connection was borrowed from; refreshed on every {@link #getTCPNettyClient()} call. */
    private GenericKeyedObjectPool eventSyncConnectionPoolFactory;
    private volatile boolean passiveNodeAdded;
    /** Timestamp of the most recent event accepted while active; reported by {@link #currentState()}. */
    private long lastProcessedEventTimestamp = 0;
    /** Sequence counter obtained from {@link EventSyncConnectionPoolManager#getSequenceID()}. */
    private final AtomicLong sequenceIDGenerator = EventSyncConnectionPoolManager.getSequenceID();

    /**
     * Stores the identifiers that are attached to every event synced to the passive node.
     *
     * @param str              name of the Siddhi app this handler belongs to
     * @param str2             element id of this source handler
     * @param streamDefinition stream definition; not used by this handler
     */
    public void init(String str, String str2, StreamDefinition streamDefinition) {
        this.sourceHandlerElementId = str2;
        this.siddhiAppName = str;
    }

    /**
     * Processes a single event. Does nothing unless this node is active.
     *
     * @param event        the event to process
     * @param inputHandler handler the event is forwarded to
     * @throws InterruptedException propagated from {@code inputHandler.send}
     */
    public void sendEvent(Event event, InputHandler inputHandler) throws InterruptedException {
        if (!this.isActiveNode) {
            return;
        }
        this.lastProcessedEventTimestamp = event.getTimestamp();
        if (this.passiveNodeAdded) {
            sendEventsToPassiveNode(event);
        }
        inputHandler.send(event);
    }

    /**
     * Processes a batch of events. Does nothing unless this node is active. A {@code null} or
     * empty batch is ignored, because there is no last event to take a timestamp from.
     *
     * @param eventArr     the events to process; the last element supplies the recorded timestamp
     * @param inputHandler handler the events are forwarded to
     * @throws InterruptedException propagated from {@code inputHandler.send}
     */
    public void sendEvent(Event[] eventArr, InputHandler inputHandler) throws InterruptedException {
        if (!this.isActiveNode || eventArr == null || eventArr.length == 0) {
            return;
        }
        this.lastProcessedEventTimestamp = eventArr[eventArr.length - 1].getTimestamp();
        if (this.passiveNodeAdded) {
            sendEventsToPassiveNode(eventArr);
        }
        inputHandler.send(eventArr);
    }

    /**
     * Enables or disables syncing of events to the passive node.
     *
     * @param z {@code true} to start sending copies of events to the passive node
     */
    public void setPassiveNodeAdded(boolean z) {
        this.passiveNodeAdded = z;
    }

    /** Marks this node as active, so that subsequent events are processed. */
    public void setAsActive() {
        this.isActiveNode = true;
    }

    /** Marks this node as passive, so that subsequent events are ignored by this handler. */
    public void setAsPassive() {
        this.isActiveNode = false;
    }

    /**
     * Returns the state to snapshot: the timestamp of the last event accepted while active.
     *
     * @return a new map holding the timestamp under
     *         {@link CoordinationConstants#ACTIVE_PROCESSED_LAST_TIMESTAMP}
     */
    public Map<String, Object> currentState() {
        Map<String, Object> state = new HashMap<>();
        state.put(CoordinationConstants.ACTIVE_PROCESSED_LAST_TIMESTAMP, Long.valueOf(this.lastProcessedEventTimestamp));
        if (log.isDebugEnabled()) {
            log.debug("Active Node: Saving state of Source Handler with Id " + getElementId() + " with timestamp " + this.lastProcessedEventTimestamp);
        }
        return state;
    }

    /**
     * Intentionally empty: this handler does not read anything back from the given snapshot.
     *
     * @param map previously saved state; ignored
     */
    public void restoreState(Map<String, Object> map) {
    }

    /**
     * @return the element id passed to {@link #init(String, String, StreamDefinition)}
     */
    public String getElementId() {
        return this.sourceHandlerElementId;
    }

    /** Syncs a single event to the passive node by delegating to the batch variant. */
    private void sendEventsToPassiveNode(Event event) {
        sendEventsToPassiveNode(new Event[]{event});
    }

    /**
     * Wraps each event in a {@link QueuedEvent} carrying a fresh sequence id, serializes the batch
     * and sends it to the passive node. Failures are logged and never propagated as checked
     * exceptions, so syncing stays best-effort and does not block local processing.
     */
    private void sendEventsToPassiveNode(Event[] eventArr) {
        EventSyncConnection connection = getTCPNettyClient();
        if (connection == null) {
            // Borrowing failed and was already logged; sequence ids are only consumed when we can send.
            return;
        }
        try {
            QueuedEvent[] queuedEvents = new QueuedEvent[eventArr.length];
            for (int i = 0; i < eventArr.length; i++) {
                queuedEvents[i] = new QueuedEvent(this.siddhiAppName, this.sourceHandlerElementId,
                        this.sequenceIDGenerator.incrementAndGet(), eventArr[i]);
            }
            ByteBuffer message = BinaryEventConverter.convertToBinaryMessage(queuedEvents);
            if (message != null) {
                connection.send(HAConstants.CHANNEL_ID_MESSAGE, message.array());
            }
        } catch (ConnectionUnavailableException e) {
            log.error("Error in sending events to the passive node. " + e.getMessage());
        } catch (IOException e) {
            log.error("Error in converting events to binary message.Hence not sending message to the passive node", e);
        } finally {
            // Always hand the connection back, including on conversion failure or an unchecked
            // exception; otherwise the borrowed object is lost to the pool.
            returnConnection(connection);
        }
    }

    /** Returns a borrowed connection to the pool it was taken from, logging any failure. */
    private void returnConnection(EventSyncConnection connection) {
        try {
            this.eventSyncConnectionPoolFactory.returnObject(HAConstants.ACTIVE_NODE_CONNECTION_POOL_ID, connection);
        } catch (Exception e) {
            log.error("Error in returning the tcpClient connection object to the pool. ", e);
        }
    }

    /**
     * Borrows a connection to the passive node from the current connection pool.
     *
     * @return the borrowed connection, or {@code null} if none could be obtained (a warning is logged)
     */
    private EventSyncConnection getTCPNettyClient() {
        this.eventSyncConnectionPoolFactory = EventSyncConnectionPoolManager.getConnectionPool();
        EventSyncConnection eventSyncConnection = null;
        try {
            eventSyncConnection = (EventSyncConnection) this.eventSyncConnectionPoolFactory.borrowObject(HAConstants.ACTIVE_NODE_CONNECTION_POOL_ID);
        } catch (Exception e) {
            log.warn("Error in obtaining a tcp connection to the passive node. Hence not sending events to the passive node. " + e.getMessage());
        }
        return eventSyncConnection;
    }
}
