package org.wso2.carbon.stream.processor.core.event.queue;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.log4j.Logger;
import org.wso2.carbon.sp.metrics.core.SPThroughputMetric;
import org.wso2.carbon.stream.processor.core.ha.HACoordinationSourceHandler;
import org.wso2.carbon.stream.processor.core.ha.exception.InvalidByteMessageException;
import org.wso2.carbon.stream.processor.core.ha.tcp.SiddhiEventConverter;
import org.wso2.carbon.stream.processor.core.internal.SiddhiAppData;
import org.wso2.carbon.stream.processor.core.internal.StreamProcessorDataHolder;
import org.wso2.carbon.stream.processor.core.util.BinaryMessageConverterUtil;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.source.Source;

/* loaded from: input_file:org/wso2/carbon/stream/processor/core/event/queue/EventListMapManager.class */
/**
 * Keeps a sequence-ordered buffer of events decoded from binary messages and
 * replays them to the matching source handlers on demand.
 *
 * <p>The buffer and the per-app "last control message" sequence numbers are
 * static, i.e. shared by every instance of this class.
 *
 * <p>NOTE(review): mutations are guarded with {@code synchronized (this)}, which
 * only excludes callers that share the same instance. Confirm that a single
 * instance is used, or switch to a class-level lock.
 */
public class EventListMapManager {
    /** Buffered events keyed by sequence number; {@code null} until {@link #initializeEventListMap()} runs. */
    private static ConcurrentSkipListMap<Long, QueuedEvent> eventListMap;
    /** Start of the current throughput measurement window (epoch millis); 0 means "not started". */
    private static long startTime;
    /** End of the last completed throughput measurement window (epoch millis). */
    private static long endTime;
    /** Number of processed events between two debug throughput log lines. */
    private static final int TPS_EVENT_THRESHOLD = 100000;
    /** Highest sequence number seen in a control message, per Siddhi app name. */
    private static final Map<String, Long> perAppLastControlMessageSequenceNumberList = new HashMap<>();
    private static final Logger log = Logger.getLogger(EventListMapManager.class);
    /** Events counted while debug logging is enabled; only used for the throughput log line. */
    private static int count = 0;
    private final SPThroughputMetric throughputTracker;

    /**
     * Creates a manager and registers the {@code org.wso2.ha.receiving.throughput}
     * throughput tracker with the configured statistics manager.
     */
    public EventListMapManager() {
        this.throughputTracker = StreamProcessorDataHolder.getStatisticsConfiguration().getFactory()
                .createThroughputTracker("org.wso2.ha.receiving.throughput",
                        StreamProcessorDataHolder.getStatisticsManager());
    }

    /**
     * Replaces the shared event buffer with a fresh, empty one. Must be called
     * before any other method that touches the buffer.
     */
    public static void initializeEventListMap() {
        eventListMap = new ConcurrentSkipListMap<>();
    }

    /**
     * Decodes a control message and trims the buffer accordingly.
     *
     * <p>The payload is read as UTF-8 text, square brackets are stripped and the
     * remainder is split on commas; the resulting entries are handed to
     * {@link #trimQueue(String[])}. An empty payload is ignored.
     *
     * @param bArr raw control message bytes
     * @throws UnsupportedEncodingException if UTF-8 is not supported by the runtime
     */
    public void parseControlMessage(byte[] bArr) throws UnsupportedEncodingException {
        String message = new String(bArr, "UTF-8");
        if (message.isEmpty()) {
            return;
        }
        trimQueue(message.replace("[", "").replace("]", "").split(","));
    }

    /**
     * Decodes a batch of events from a binary message and buffers every event
     * whose sequence number is newer than the last control message recorded for
     * its app.
     *
     * <p>Layout as read by this method: an {@code int} event count, then per
     * event a {@code long} sequence number, a length-prefixed source handler
     * element id, a length-prefixed app name, the transport sync properties
     * (see {@link #readTransportSyncProperties(ByteBuffer)}), a length-prefixed
     * attribute list string and finally the payload consumed by
     * {@code SiddhiEventConverter.getEvent}.
     *
     * @param bArr raw event batch bytes
     * @throws InvalidByteMessageException if a mandatory string has length 0
     */
    public void parseMessage(byte[] bArr) {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(bArr);
            int eventCount = buffer.getInt();
            if (this.throughputTracker != null && StreamProcessorDataHolder.isStatisticsEnabled()) {
                this.throughputTracker.eventsIn(eventCount);
            }
            for (int index = 0; index < eventCount; index++) {
                long sequenceId = buffer.getLong();
                String sourceHandlerElementId = readRequiredString(buffer, "sourceHandlerLength");
                String siddhiAppName = readRequiredString(buffer, "appNameLength");
                String[] transportSyncProperties = readTransportSyncProperties(buffer);

                // Always decode the attribute list and the payload, even for events
                // that will be discarded below: skipping them would leave the buffer
                // positioned in the middle of this event and corrupt the parsing of
                // every following event in the batch.
                String attributeList = readRequiredString(buffer, "attributeLength");
                // The list arrives wrapped in one leading and one trailing character
                // and separated by ", ".
                String[] attributeTypes =
                        attributeList.substring(1, attributeList.length() - 1).split(", ");
                Event event = SiddhiEventConverter.getEvent(buffer, attributeTypes);

                synchronized (this) {
                    // Looked up under the lock because trimQueue() updates the map
                    // while holding the same lock.
                    Long lastControlSequence = perAppLastControlMessageSequenceNumberList.get(siddhiAppName);
                    long threshold = lastControlSequence == null ? -1 : lastControlSequence.longValue();
                    if (sequenceId > threshold) {
                        addToEventListMap(sequenceId, new QueuedEvent(siddhiAppName, sourceHandlerElementId,
                                sequenceId, event, transportSyncProperties));
                    }
                    if (log.isDebugEnabled()) {
                        logThroughput();
                    }
                }
            }
        } catch (UnsupportedEncodingException e) {
            log.error("Error when converting bytes " + e.getMessage(), e);
        }
    }

    /**
     * Sends every buffered event to the source handler(s) whose element id
     * matches the event, then empties the buffer and forgets all recorded
     * control message sequence numbers.
     *
     * <p>Events whose app or handler cannot be found are dropped by the final
     * clear. Events are removed one by one as they are delivered, so an
     * exception part-way through leaves only the undelivered events buffered.
     *
     * @throws InterruptedException declared for callers; presumably propagated
     *         from the handler's {@code sendEvent} - confirm against its signature
     */
    public void trimAndSendToInputHandler() throws InterruptedException {
        Map<String, SiddhiAppData> siddhiAppMap = StreamProcessorDataHolder.getStreamProcessorService().getSiddhiAppMap();
        for (Map.Entry<Long, QueuedEvent> entry : eventListMap.entrySet()) {
            QueuedEvent queuedEvent = entry.getValue();
            SiddhiAppData siddhiAppData = siddhiAppMap.get(queuedEvent.getSiddhiAppName());
            if (siddhiAppData == null) {
                continue;
            }
            // Element types are cast explicitly so this compiles regardless of how
            // getSources() is parameterised.
            for (Object sourceGroup : siddhiAppData.getSiddhiAppRuntime().getSources()) {
                boolean delivered = false;
                for (Object candidate : (List<?>) sourceGroup) {
                    Source source = (Source) candidate;
                    if (!queuedEvent.getSourceHandlerElementId().equals(source.getMapper().getHandler().getElementId())) {
                        continue;
                    }
                    String[] syncProperties = queuedEvent.getTransportSyncProperties();
                    source.getMapper().getHandler().sendEvent(queuedEvent.getEvent(), syncProperties);
                    if (syncProperties != null && syncProperties.length != 0
                            && source.getMapper().getHandler() instanceof HACoordinationSourceHandler) {
                        ((HACoordinationSourceHandler) source.getMapper().getHandler())
                                .updateTransportSyncProperties(syncProperties);
                    }
                    eventListMap.remove(entry.getKey());
                    delivered = true;
                }
                // Every matching source in the group has been served; the remaining
                // groups are not searched once one group has accepted the event.
                if (delivered) {
                    break;
                }
            }
        }
        eventListMap.clear();
        perAppLastControlMessageSequenceNumberList.clear();
    }

    /**
     * Applies control message entries to the buffer.
     *
     * <p>Each entry is split on {@code "__"}: field 0 is a sequence number and
     * field 2 an app name (field 1 is not used here). The highest sequence number
     * per app is recorded, and every buffered event of that app with a strictly
     * smaller sequence number is removed. Blank entries are skipped silently;
     * malformed entries are logged and skipped.
     *
     * <p>Nothing happens - including no recording of sequence numbers - when the
     * buffer is empty on entry.
     *
     * @param strArr control message entries
     */
    public void trimQueue(String[] strArr) {
        synchronized (this) {
            if (eventListMap.isEmpty()) {
                return;
            }
            for (String appDetail : strArr) {
                // An empty list ("[]") arrives here as a single empty entry.
                if (appDetail.trim().isEmpty()) {
                    continue;
                }
                String[] fields = appDetail.split("__");
                if (fields.length < 3) {
                    log.warn("Ignoring malformed control message entry '" + appDetail + "'");
                    continue;
                }
                long persistedSequenceId;
                try {
                    persistedSequenceId = Long.parseLong(fields[0].trim());
                } catch (NumberFormatException e) {
                    log.warn("Ignoring control message entry with non-numeric sequence number '"
                            + appDetail + "'");
                    continue;
                }
                String siddhiAppName = fields[2].trim();

                Long recorded = perAppLastControlMessageSequenceNumberList.get(siddhiAppName);
                if (recorded == null || recorded.longValue() < persistedSequenceId) {
                    perAppLastControlMessageSequenceNumberList.put(siddhiAppName, Long.valueOf(persistedSequenceId));
                }

                // The buffer is sorted by sequence number, so only the head view
                // (keys strictly below the persisted number) can contain candidates.
                for (Map.Entry<Long, QueuedEvent> entry
                        : eventListMap.headMap(Long.valueOf(persistedSequenceId)).entrySet()) {
                    if (siddhiAppName.equals(entry.getValue().getSiddhiAppName())) {
                        eventListMap.remove(entry.getKey());
                    }
                }
            }
        }
    }

    /**
     * Stores an event in the buffer, replacing any event already stored under
     * the same sequence number.
     *
     * @param j sequence number used as the key
     * @param queuedEvent event to buffer
     */
    public void addToEventListMap(long j, QueuedEvent queuedEvent) {
        eventListMap.put(Long.valueOf(j), queuedEvent);
    }

    /** Reads an int length followed by a string of that length; a length of 0 is rejected. */
    private static String readRequiredString(ByteBuffer buffer, String lengthFieldName)
            throws UnsupportedEncodingException {
        int length = buffer.getInt();
        if (length == 0) {
            throw new InvalidByteMessageException("Invalid " + lengthFieldName + " size = 0");
        }
        return BinaryMessageConverterUtil.getString(buffer, length);
    }

    /**
     * Reads the transport sync properties section: an int total byte length
     * (0 = section absent), an int property count (0 = no properties), then
     * length-prefixed strings until the total byte length has been consumed.
     *
     * @return the properties, or {@code null} when the section is absent or empty
     */
    private static String[] readTransportSyncProperties(ByteBuffer buffer) throws UnsupportedEncodingException {
        int totalLength = buffer.getInt();
        if (totalLength == 0) {
            return null;
        }
        int propertyCount = buffer.getInt();
        if (propertyCount == 0) {
            return null;
        }
        String[] properties = new String[propertyCount];
        int consumed = 0;
        int slot = 0;
        while (totalLength != consumed) {
            int length = buffer.getInt();
            consumed += length;
            properties[slot] = BinaryMessageConverterUtil.getString(buffer, length);
            slot++;
        }
        return properties;
    }

    /** Counts one event and, every TPS_EVENT_THRESHOLD events, logs the observed throughput at debug level. */
    private static void logThroughput() {
        if (startTime == 0) {
            startTime = System.currentTimeMillis();
        }
        count++;
        if (count % TPS_EVENT_THRESHOLD == 0) {
            endTime = System.currentTimeMillis();
            // Clamp to 1 ms so a window that starts and ends within the same
            // millisecond cannot cause a division by zero.
            long elapsedMillis = Math.max(1L, endTime - startTime);
            log.debug("# of events batch : " + count + " start timestamp : " + startTime + " end time stamp : " + endTime + " Throughput is (events / sec) : " + ((TPS_EVENT_THRESHOLD * 1000L) / elapsedMillis) + " Total Event Count : " + count);
            startTime = System.currentTimeMillis();
        }
    }
}
