/*
 * Decompiled with CFR 0.152.
 */
package io.siddhi.extension.io.kafka.multidc.source;

import io.siddhi.core.stream.input.source.SourceEventListener;
import io.siddhi.extension.io.kafka.multidc.source.SourceSynchronizer;
import io.siddhi.query.api.definition.StreamDefinition;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.log4j.Logger;

class Interceptor
implements SourceEventListener {
    private static final Logger LOG = Logger.getLogger(Interceptor.class);
    private String sourceId;
    private SourceSynchronizer synchronizer;
    private boolean isBinaryMessage;

    public Interceptor(String sourceId, SourceSynchronizer synchronizer, boolean isBinaryMessage) {
        this.sourceId = sourceId;
        this.synchronizer = synchronizer;
        this.isBinaryMessage = isBinaryMessage;
    }

    public void onEvent(Object event, String[] transportProperties) {
        this.onEventReceive(event, transportProperties, null);
    }

    public void onEvent(Object eventObject, Object[] transportProperties, String[] transportSyncProperties) {
        this.onEventReceive(eventObject, transportProperties, transportSyncProperties);
    }

    public void onEvent(Object event, String[] transportProperties, String[] transportSyncProperties) {
        this.onEventReceive(event, transportProperties, transportSyncProperties);
    }

    public StreamDefinition getStreamDefinition() {
        return null;
    }

    public void onEvent(Object eventObject, Object[] transportProperties) {
        this.onEventReceive(eventObject, transportProperties, null);
    }

    private void onEventReceive(Object event, Object[] transportProperties, Object[] transportSyncProperties) {
        if (!this.isBinaryMessage) {
            String eventString = (String)event;
            int headerStartingIndex = eventString.indexOf("~");
            if (headerStartingIndex > 0) {
                String eventBody = eventString.substring(headerStartingIndex + 1);
                String header = eventString.substring(0, headerStartingIndex);
                String[] headerElements = header.split(":");
                Integer seqNo = Integer.parseInt(headerElements[1]);
                this.synchronizer.onEvent(this.sourceId, seqNo.intValue(), eventBody, transportProperties);
            } else {
                LOG.warn((Object)("Sequence number is not contained in the message. Dropping the message :" + eventString));
            }
        } else {
            byte[] byteEvents = (byte[])event;
            int stringSize = ByteBuffer.wrap(byteEvents).getInt();
            String header = new String(byteEvents, 4, stringSize - 1, Charset.defaultCharset());
            if (!header.isEmpty()) {
                String[] headerElements = header.split(":");
                Integer seqNo = Integer.parseInt(headerElements[1]);
                byte[] eventBody = Arrays.copyOfRange(byteEvents, stringSize + 4, byteEvents.length);
                this.synchronizer.onEvent(this.sourceId, seqNo.intValue(), eventBody, transportProperties);
            } else {
                LOG.warn((Object)"Sequence number is not contained in the message. Dropping the message");
            }
        }
    }
}

