package org.wso2.carbon.event.processor.manager.commons.transport.client;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.event.processor.manager.commons.transport.common.EventServerUtils;
import org.wso2.carbon.event.processor.manager.commons.transport.common.StreamRuntimeInfo;
import org.wso2.carbon.event.processor.manager.commons.transport.server.ConnectionCallback;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/client/TCPEventPublisher.class */
public class TCPEventPublisher {
    public static final int PING_HEADER_VALUE = -99;
    private static Log log = LogFactory.getLog(TCPEventPublisher.class);
    private final String hostUrl;
    private Disruptor<ByteArrayHolder> disruptor;
    private RingBuffer<ByteArrayHolder> ringBuffer;
    private Map<String, StreamRuntimeInfo> streamRuntimeInfoMap;
    private OutputStream outputStream;
    private Socket clientSocket;
    private TCPEventPublisherConfig publisherConfig;
    public String defaultCharset;
    private Timer connectionStatusCheckTimer;
    private ExecutorService socketIoExecutorService;
    private long socketIoTimeout;
    private boolean isSynchronous;
    private ConnectionCallback connectionCallback;
    private ConnectionFailureHandler failureHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher$6, reason: invalid class name */
    /* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/client/TCPEventPublisher$6.class */
    public static /* synthetic */ class AnonymousClass6 {
        static final /* synthetic */ int[] $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type = new int[Attribute.Type.values().length];

        static {
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.INT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.LONG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.BOOL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.FLOAT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.DOUBLE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.STRING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/client/TCPEventPublisher$ByteArrayHolder.class */
    public class ByteArrayHolder {
        byte[] bytes;

        ByteArrayHolder() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/client/TCPEventPublisher$ConnectionStatusCheckTask.class */
    public class ConnectionStatusCheckTask extends TimerTask {
        ConnectionStatusCheckTask() {
        }

        private byte[] createPing() throws IOException {
            ByteBuffer allocate = ByteBuffer.allocate(4);
            allocate.putInt(-99);
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            byteArrayOutputStream.write(allocate.array());
            return byteArrayOutputStream.toByteArray();
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                TCPEventPublisher.this.publishEvent(createPing(), true);
            } catch (IOException e) {
                TCPEventPublisher.log.warn("Ping failed to " + TCPEventPublisher.this.getHostUrl() + " with error: " + e.getMessage());
                TCPEventPublisher.this.connectionStatusCheckTimer.cancel();
                if (TCPEventPublisher.this.failureHandler != null) {
                    TCPEventPublisher.this.failureHandler.onConnectionFail(e);
                }
            }
        }
    }

    public TCPEventPublisher(String str, TCPEventPublisherConfig tCPEventPublisherConfig, boolean z, ConnectionCallback connectionCallback) throws IOException {
        this.socketIoTimeout = 2147483647L;
        this.failureHandler = null;
        this.hostUrl = str;
        this.publisherConfig = tCPEventPublisherConfig;
        this.defaultCharset = tCPEventPublisherConfig.getCharset();
        this.streamRuntimeInfoMap = new ConcurrentHashMap();
        this.isSynchronous = z;
        this.connectionCallback = connectionCallback;
        this.socketIoExecutorService = Executors.newCachedThreadPool();
        this.socketIoTimeout = tCPEventPublisherConfig.getConnectionStatusCheckInterval() / 4;
        if (z) {
            connect(str);
            return;
        }
        try {
            connect(str);
        } catch (IOException e) {
            log.error("Error connection to " + str, e);
        }
        initializeDisruptor(tCPEventPublisherConfig);
    }

    private synchronized void connect(String str) throws IOException {
        String[] split = str.split(":");
        this.clientSocket = new Socket(split[0], Integer.parseInt(split[1]));
        this.clientSocket.setKeepAlive(true);
        this.clientSocket.setTcpNoDelay(true);
        this.clientSocket.setSendBufferSize(this.publisherConfig.getTcpSendBufferSize());
        this.outputStream = new BufferedOutputStream(this.clientSocket.getOutputStream());
        log.info("Connecting to " + str);
        if (this.connectionCallback != null) {
            this.connectionCallback.onCepReceiverConnect();
        }
        this.connectionStatusCheckTimer = new Timer();
        this.connectionStatusCheckTimer.schedule(new ConnectionStatusCheckTask(), this.publisherConfig.getConnectionStatusCheckInterval(), this.publisherConfig.getConnectionStatusCheckInterval());
    }

    public TCPEventPublisher(String str, boolean z, ConnectionCallback connectionCallback) throws IOException {
        this(str, new TCPEventPublisherConfig(), z, connectionCallback);
    }

    public void addStreamDefinition(StreamDefinition streamDefinition) {
        this.streamRuntimeInfoMap.put(streamDefinition.getId(), EventServerUtils.createStreamRuntimeInfo(streamDefinition));
    }

    public void removeStreamDefinition(StreamDefinition streamDefinition) {
        this.streamRuntimeInfoMap.remove(streamDefinition.getId());
    }

    public void registerConnectionFailureHandler(ConnectionFailureHandler connectionFailureHandler) {
        this.failureHandler = connectionFailureHandler;
    }

    public void sendEvent(String str, long j, Object[] objArr, boolean z) throws IOException {
        sendEvent(str, j, objArr, null, z);
    }

    public void sendEvent(String str, long j, Object[] objArr, Map<String, String> map, boolean z) throws IOException {
        StreamRuntimeInfo streamRuntimeInfo = this.streamRuntimeInfoMap.get(str);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        boolean z2 = map != null;
        int length = streamRuntimeInfo.getStreamId().getBytes(this.defaultCharset).length;
        int i = 0;
        if (z2) {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                i = i + 4 + entry.getKey().getBytes(this.defaultCharset).length + 4 + entry.getValue().getBytes(this.defaultCharset).length;
            }
        }
        ByteBuffer allocate = ByteBuffer.allocate(streamRuntimeInfo.getFixedMessageSize() + length + 16);
        allocate.putInt(length);
        allocate.put(streamRuntimeInfo.getStreamId().getBytes(this.defaultCharset));
        allocate.putLong(j);
        allocate.putInt(i);
        int[] iArr = new int[streamRuntimeInfo.getNoOfStringAttributes()];
        int i2 = 0;
        int i3 = 0;
        Attribute.Type[] attributeTypes = streamRuntimeInfo.getAttributeTypes();
        int length2 = attributeTypes.length;
        for (int i4 = 0; i4 < length2; i4++) {
            switch (AnonymousClass6.$SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[attributeTypes[i4].ordinal()]) {
                case 1:
                    allocate.putInt(((Integer) objArr[i4]).intValue());
                    break;
                case 2:
                    allocate.putLong(((Long) objArr[i4]).longValue());
                    break;
                case 3:
                    allocate.put((byte) (((Boolean) objArr[i4]).booleanValue() ? 1 : 0));
                    break;
                case 4:
                    allocate.putFloat(((Float) objArr[i4]).floatValue());
                    break;
                case 5:
                    allocate.putDouble(((Double) objArr[i4]).doubleValue());
                    break;
                case 6:
                    if (objArr[i4] == null) {
                        allocate.putInt(-1);
                    } else {
                        int length3 = ((String) objArr[i4]).getBytes(this.defaultCharset).length;
                        allocate.putInt(length3);
                        i3 += length3;
                    }
                    iArr[i2] = i4;
                    i2++;
                    break;
            }
        }
        byteArrayOutputStream.write(allocate.array());
        ByteBuffer allocate2 = ByteBuffer.allocate(i3);
        for (int i5 : iArr) {
            if (objArr[i5] != null) {
                allocate2.put(((String) objArr[i5]).getBytes(this.defaultCharset));
            }
        }
        byteArrayOutputStream.write(allocate2.array());
        if (i > 0) {
            ByteBuffer allocate3 = ByteBuffer.allocate(i);
            for (Map.Entry<String, String> entry2 : map.entrySet()) {
                allocate3.putInt(entry2.getKey().length());
                allocate3.put(entry2.getKey().getBytes(this.defaultCharset));
                allocate3.putInt(entry2.getValue().length());
                allocate3.put(entry2.getValue().getBytes(this.defaultCharset));
            }
            byteArrayOutputStream.write(allocate3.array());
        }
        if (this.isSynchronous) {
            publishEvent(byteArrayOutputStream.toByteArray(), z);
        } else {
            publishToDisruptor(byteArrayOutputStream.toByteArray());
        }
    }

    private void publishToDisruptor(byte[] bArr) {
        long next = this.ringBuffer.next();
        try {
            ((ByteArrayHolder) this.ringBuffer.get(next)).bytes = bArr;
            this.ringBuffer.publish(next);
        } catch (Throwable th) {
            this.ringBuffer.publish(next);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void publishEvent(byte[] bArr, boolean z) throws IOException {
        doPublishEvent(bArr, z);
    }

    private void doPublishEvent(final byte[] bArr, final boolean z) throws IOException {
        try {
            this.socketIoExecutorService.submit(new Callable<Boolean>() { // from class: org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Boolean call() throws IOException {
                    TCPEventPublisher.this.outputStream.write(bArr);
                    if (z) {
                        TCPEventPublisher.this.outputStream.flush();
                    }
                    return true;
                }
            }).get(this.socketIoTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException("Could not write the data to " + this.hostUrl);
        } catch (TimeoutException e2) {
            throw new IOException("Timed out writing the data to " + this.hostUrl + " in milliseconds " + this.socketIoTimeout);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void publishEventAsync(byte[] bArr, boolean z) throws IOException {
        if (this.outputStream == null) {
            try {
                log.info("Reconnecting to " + this.hostUrl);
                disconnect();
                connect(this.hostUrl);
                doPublishEvent(bArr, z);
                return;
            } catch (IOException e) {
                log.error("Error on reconnection to " + this.hostUrl, e);
                return;
            }
        }
        try {
            doPublishEvent(bArr, z);
        } catch (IOException e2) {
            try {
                log.error("Error on sending to " + this.hostUrl, e2);
                log.info("Reconnecting to " + this.hostUrl);
                disconnect();
                connect(this.hostUrl);
                doPublishEvent(bArr, z);
            } catch (IOException e3) {
                log.error("Error on reconnection to " + this.hostUrl, e3);
            }
        }
    }

    private void initializeDisruptor(TCPEventPublisherConfig tCPEventPublisherConfig) {
        this.disruptor = new Disruptor<>(new EventFactory<ByteArrayHolder>() { // from class: org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher.2
            /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
            public ByteArrayHolder m1newInstance() {
                return new ByteArrayHolder();
            }
        }, tCPEventPublisherConfig.getBufferSize(), Executors.newCachedThreadPool(), ProducerType.MULTI, new TimeoutBlockingWaitStrategy(1L, TimeUnit.SECONDS));
        this.ringBuffer = this.disruptor.getRingBuffer();
        this.disruptor.handleEventsWith(new EventHandler[]{new EventHandler<ByteArrayHolder>() { // from class: org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher.3
            public void onEvent(ByteArrayHolder byteArrayHolder, long j, boolean z) throws IOException {
                TCPEventPublisher.this.publishEventAsync(byteArrayHolder.bytes, z);
            }
        }});
        this.disruptor.start();
    }

    public void shutdown() {
        try {
            this.outputStream.flush();
        } catch (IOException e) {
            log.warn("Error while flushing output stream to " + this.hostUrl + " : " + e.getMessage(), e);
        } finally {
            terminate();
        }
    }

    public void terminate() {
        this.connectionStatusCheckTimer.cancel();
        if (!this.isSynchronous) {
            this.disruptor.shutdown();
        }
        disconnect();
        try {
            this.socketIoExecutorService.awaitTermination(this.socketIoTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.debug("Error while terminating the executor service", e);
        }
    }

    private void disconnect() {
        if (this.connectionStatusCheckTimer != null) {
            this.connectionStatusCheckTimer.cancel();
        }
        try {
            this.socketIoExecutorService.submit(new Runnable() { // from class: org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher.4
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (TCPEventPublisher.this.outputStream != null) {
                            TCPEventPublisher.this.outputStream.close();
                            TCPEventPublisher.this.outputStream = null;
                        }
                    } catch (IOException e) {
                        TCPEventPublisher.log.debug("Error while disconnecting to " + TCPEventPublisher.this.hostUrl + " : " + e.getMessage(), e);
                    }
                }
            }).get(this.socketIoTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            log.error("Could not close the output stream properly ", e);
        }
        try {
            this.socketIoExecutorService.submit(new Runnable() { // from class: org.wso2.carbon.event.processor.manager.commons.transport.client.TCPEventPublisher.5
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (TCPEventPublisher.this.clientSocket != null) {
                            TCPEventPublisher.this.clientSocket.close();
                            TCPEventPublisher.this.clientSocket = null;
                        }
                    } catch (IOException e2) {
                        TCPEventPublisher.log.debug("Error while closing socket to " + TCPEventPublisher.this.hostUrl + " : " + e2.getMessage(), e2);
                    }
                }
            }).get(this.socketIoTimeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e2) {
            log.error("Could not close the socket properly ", e2);
        }
        if (this.connectionCallback != null) {
            this.connectionCallback.onCepReceiverDisconnect();
        }
    }

    public String getHostUrl() {
        return this.hostUrl;
    }
}
