package org.wso2.carbon.event.processor.common.transport.client;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.wso2.carbon.event.processor.common.transport.common.EventServerUtils;
import org.wso2.carbon.event.processor.common.transport.common.StreamRuntimeInfo;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/common/transport/client/TCPEventPublisher.class */
public class TCPEventPublisher {
    public static final String DEFAULT_CHARSET = "UTF-8";
    private static Logger log = Logger.getLogger(TCPEventPublisher.class);
    private final String hostUrl;
    private Disruptor<BiteArrayHolder> disruptor;
    private RingBuffer<BiteArrayHolder> ringBuffer;
    private Map<String, StreamRuntimeInfo> streamRuntimeInfoMap;
    private OutputStream outputStream;
    private Socket clientSocket;
    private TCPEventPublisherConfig publisherConfig;
    private boolean isSynchronous;

    /* renamed from: org.wso2.carbon.event.processor.common.transport.client.TCPEventPublisher$4, reason: invalid class name */
    /* loaded from: input_file:org/wso2/carbon/event/processor/common/transport/client/TCPEventPublisher$4.class */
    static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type = new int[Attribute.Type.values().length];

        static {
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.INT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.LONG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.BOOL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.FLOAT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.DOUBLE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.STRING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/common/transport/client/TCPEventPublisher$BiteArrayHolder.class */
    public class BiteArrayHolder {
        byte[] bytes;

        BiteArrayHolder() {
        }
    }

    public TCPEventPublisher(String str, TCPEventPublisherConfig tCPEventPublisherConfig, boolean z) throws IOException {
        this.hostUrl = str;
        this.publisherConfig = tCPEventPublisherConfig;
        this.streamRuntimeInfoMap = new ConcurrentHashMap();
        this.isSynchronous = z;
        String[] split = str.split(":");
        this.clientSocket = new Socket(split[0], Integer.parseInt(split[1]));
        this.outputStream = new BufferedOutputStream(this.clientSocket.getOutputStream());
        if (!z) {
            initializeDisruptor(tCPEventPublisherConfig);
        }
        log.info("Client configured to send events to " + str);
    }

    public TCPEventPublisher(String str, boolean z) throws IOException {
        this(str, new TCPEventPublisherConfig(), z);
    }

    public String getHostUrl() {
        return this.hostUrl;
    }

    public void addStreamDefinition(StreamDefinition streamDefinition) {
        this.streamRuntimeInfoMap.put(streamDefinition.getStreamId(), EventServerUtils.createStreamRuntimeInfo(streamDefinition));
        log.info("Stream definition added for stream: " + streamDefinition.getStreamId());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v57, types: [int] */
    public void sendEvent(String str, Object[] objArr, boolean z) throws IOException {
        StreamRuntimeInfo streamRuntimeInfo = this.streamRuntimeInfoMap.get(str);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        byte streamIdSize = streamRuntimeInfo.getStreamIdSize();
        ByteBuffer allocate = ByteBuffer.allocate(streamRuntimeInfo.getFixedMessageSize() + streamIdSize + 1);
        allocate.put(streamIdSize);
        allocate.put(streamRuntimeInfo.getStreamId().getBytes(DEFAULT_CHARSET));
        int[] iArr = new int[streamRuntimeInfo.getNoOfStringAttributes()];
        int i = 0;
        short s = 0;
        Attribute.Type[] attributeTypes = streamRuntimeInfo.getAttributeTypes();
        int length = attributeTypes.length;
        for (int i2 = 0; i2 < length; i2++) {
            switch (AnonymousClass4.$SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[attributeTypes[i2].ordinal()]) {
                case 1:
                    allocate.putInt(((Integer) objArr[i2]).intValue());
                    break;
                case 2:
                    allocate.putLong(((Long) objArr[i2]).longValue());
                    break;
                case 3:
                    allocate.put((byte) (((Boolean) objArr[i2]).booleanValue() ? 1 : 0));
                    break;
                case 4:
                    allocate.putFloat(((Float) objArr[i2]).floatValue());
                    break;
                case 5:
                    allocate.putDouble(((Double) objArr[i2]).doubleValue());
                    break;
                case 6:
                    short length2 = (short) ((String) objArr[i2]).length();
                    allocate.putShort(length2);
                    iArr[i] = i2;
                    i++;
                    s += length2;
                    break;
            }
        }
        byteArrayOutputStream.write(allocate.array());
        ByteBuffer allocate2 = ByteBuffer.allocate(s);
        for (int i3 : iArr) {
            allocate2.put(((String) objArr[i3]).getBytes(DEFAULT_CHARSET));
        }
        byteArrayOutputStream.write(allocate2.array());
        if (this.isSynchronous) {
            publishEvent(byteArrayOutputStream.toByteArray(), z);
        } else {
            publishToDisruptor(byteArrayOutputStream.toByteArray());
        }
    }

    private void publishToDisruptor(byte[] bArr) {
        long next = this.ringBuffer.next();
        try {
            ((BiteArrayHolder) this.ringBuffer.get(next)).bytes = bArr;
            this.ringBuffer.publish(next);
        } catch (Throwable th) {
            this.ringBuffer.publish(next);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void publishEvent(byte[] bArr, boolean z) throws IOException {
        this.outputStream.write(bArr);
        if (z) {
            this.outputStream.flush();
        }
    }

    private void initializeDisruptor(TCPEventPublisherConfig tCPEventPublisherConfig) {
        this.disruptor = new Disruptor<>(new EventFactory<BiteArrayHolder>() { // from class: org.wso2.carbon.event.processor.common.transport.client.TCPEventPublisher.1
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public BiteArrayHolder m54newInstance() {
                return new BiteArrayHolder();
            }
        }, tCPEventPublisherConfig.getBufferSize(), Executors.newSingleThreadExecutor());
        this.ringBuffer = this.disruptor.getRingBuffer();
        this.disruptor.handleExceptionsWith(new ExceptionHandler() { // from class: org.wso2.carbon.event.processor.common.transport.client.TCPEventPublisher.2
            public void handleEventException(Throwable th, long j, Object obj) {
            }

            public void handleOnStartException(Throwable th) {
            }

            public void handleOnShutdownException(Throwable th) {
            }
        });
        this.disruptor.handleEventsWith(new EventHandler[]{new EventHandler<BiteArrayHolder>() { // from class: org.wso2.carbon.event.processor.common.transport.client.TCPEventPublisher.3
            public void onEvent(BiteArrayHolder biteArrayHolder, long j, boolean z) throws IOException {
                TCPEventPublisher.this.publishEvent(biteArrayHolder.bytes, z);
            }
        }});
        this.disruptor.start();
    }

    public void shutdown() {
        try {
            if (!this.isSynchronous) {
                this.disruptor.shutdown();
            }
            this.outputStream.flush();
            this.outputStream.close();
            this.clientSocket.close();
        } catch (IOException e) {
            log.warn("Error while closing stream to " + this.hostUrl + " : " + e.getMessage(), e);
        }
    }

    public void terminate() {
        try {
            if (!this.isSynchronous) {
                this.disruptor.halt();
            }
            this.outputStream.close();
            this.clientSocket.close();
        } catch (IOException e) {
            log.warn("Error while closing stream to " + this.hostUrl + " : " + e.getMessage(), e);
        }
    }
}
