package org.wso2.carbon.event.processor.manager.commons.transport.server;

import java.io.BufferedInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.log4j.Logger;
import org.wso2.carbon.event.processor.manager.commons.transport.common.EventServerUtils;
import org.wso2.carbon.event.processor.manager.commons.transport.common.StreamRuntimeInfo;
import org.wso2.siddhi.query.api.definition.Attribute;
import org.wso2.siddhi.query.api.definition.StreamDefinition;

/* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/server/TCPEventServer.class */
public class TCPEventServer {
    /** Logger shared by the server and its nested worker classes. */
    private static final Logger log = Logger.getLogger(TCPEventServer.class);

    /** Host name and port the listening socket is bound to; assigned by the constructor. */
    private TCPEventServerConfig tcpEventServerConfig;

    /** Receives every decoded event; invoked without a null check. */
    private final StreamCallback streamCallback;

    /** Notified when a client connects or disconnects; every use is guarded by a null check. */
    private final ConnectionCallback connectionCallback;

    /** Decoding metadata keyed by stream id; read by connection threads while callers add and remove entries. */
    private final Map<String, StreamRuntimeInfo> streamRuntimeInfoMap = new ConcurrentHashMap<String, StreamRuntimeInfo>();

    /** Accept loop; start() runs it on its own thread. */
    private final ServerWorker serverWorker = new ServerWorker(this, null);

    /** Runs one ListenerProcessor task per accepted connection. */
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    /* renamed from: org.wso2.carbon.event.processor.manager.commons.transport.server.TCPEventServer$1, reason: invalid class name */
    /* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/server/TCPEventServer$1.class */
    /**
     * Lookup table that maps {@code Attribute.Type} ordinals to the small integer case labels
     * (1..6) used by the {@code switch} in {@code ListenerProcessor.run()}.
     *
     * <p>The array is sized from {@code Attribute.Type.values()}, so any type that is not
     * registered below keeps the default value 0 and matches none of the switch cases.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type = new int[Attribute.Type.values().length];

        // Each constant is registered in its own try block: a NoSuchFieldError raised while
        // resolving one constant is ignored and leaves only that constant's slot at 0.
        static {
            try {
                // INT -> case 1
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.INT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                // LONG -> case 2
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.LONG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                // BOOL -> case 3
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.BOOL.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                // FLOAT -> case 4
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.FLOAT.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                // DOUBLE -> case 5
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.DOUBLE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                // STRING -> case 6
                $SwitchMap$org$wso2$siddhi$query$api$definition$Attribute$Type[Attribute.Type.STRING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/server/TCPEventServer$ServerWorker.class */
    private class ServerWorker implements Runnable {
        /** Listening socket; created by startServerWorker() and closed by shutdownServerWorker(). */
        private ServerSocket receiverSocket;

        /**
         * Accept-loop flag. Declared volatile because run() polls it on the accept thread while
         * shutdownServerWorker() clears it from the caller's thread; without volatile nothing
         * guarantees that the write becomes visible to the polling thread.
         */
        private volatile boolean isRunning;

        /* loaded from: input_file:org/wso2/carbon/event/processor/manager/commons/transport/server/TCPEventServer$ServerWorker$ListenerProcessor.class */
        private class ListenerProcessor implements Runnable {
            /** Accepted client connection that this processor reads events from. */
            private final Socket connectionSocket;

            /**
             * Creates a processor for one accepted client connection.
             *
             * @param socket the accepted connection to read from
             */
            public ListenerProcessor(Socket socket) {
                connectionSocket = socket;
            }

            /**
             * Serves one client connection until it ends.
             *
             * <p>Frame layout as read below: a 4-byte length header, that many bytes of stream id,
             * {@code 8 + getFixedMessageSize()} bytes holding a leading long followed by the
             * fixed-size attributes (for STRING attributes only their 4-byte length), and finally
             * the bytes of each STRING attribute in attribute order. A header value of -99 is a
             * frame with no body and is skipped.
             *
             * <p>A clean close by the peer (EOFException) is logged at info level; any other failure
             * is logged as an error. In every case the disconnect callback is invoked exactly once
             * and the connection socket is closed.
             */
            @Override // java.lang.Runnable
            public void run() {
                try {
                    if (TCPEventServer.this.connectionCallback != null) {
                        TCPEventServer.this.connectionCallback.onPublisherBoltConnect();
                    }
                    BufferedInputStream in = new BufferedInputStream(this.connectionSocket.getInputStream());
                    while (true) {
                        int streamIdLength = ByteBuffer.wrap(loadData(in, new byte[4])).getInt();
                        if (streamIdLength == -99) {
                            // Sentinel header: nothing follows it, so move on to the next frame.
                            // NOTE(review): presumably a keep-alive ping from the publisher - confirm.
                            continue;
                        }

                        // NOTE(review): lengths come straight from the peer and are not capped, so a
                        // hostile or corrupt header can request a very large allocation.
                        byte[] streamIdBytes = loadData(in, new byte[checkedLength(streamIdLength, "stream id")]);
                        String streamId = new String(streamIdBytes, 0, streamIdBytes.length);

                        // Events can arrive before the stream is registered: poll once a second until
                        // addStreamDefinition() has supplied the decoding metadata.
                        StreamRuntimeInfo runtimeInfo = TCPEventServer.this.streamRuntimeInfoMap.get(streamId);
                        while (runtimeInfo == null) {
                            Thread.sleep(1000L);
                            TCPEventServer.log.warn("TCP server on port :'" + TCPEventServer.this.tcpEventServerConfig.getPort() + "' waiting for streamId:'" + streamId + "' to process incoming events");
                            runtimeInfo = TCPEventServer.this.streamRuntimeInfoMap.get(streamId);
                        }

                        Object[] attributes = new Object[runtimeInfo.getNoOfAttributes()];
                        byte[] fixedBytes = loadData(in, new byte[8 + runtimeInfo.getFixedMessageSize()]);
                        ByteBuffer fixed = ByteBuffer.wrap(fixedBytes, 0, fixedBytes.length);
                        // Leading 8 bytes; presumably the event timestamp - confirm against StreamCallback.
                        long timestamp = fixed.getLong();

                        Attribute.Type[] types = runtimeInfo.getAttributeTypes();
                        ArrayList<Integer> stringLengths = decodeFixedAttributes(fixed, types, attributes);

                        // String payloads trail the fixed section, in attribute order.
                        int nextString = 0;
                        for (int idx = 0; idx < types.length; idx++) {
                            if (Attribute.Type.STRING == types[idx]) {
                                int length = stringLengths.get(nextString).intValue();
                                nextString++;
                                byte[] raw = loadData(in, new byte[checkedLength(length, "string attribute")]);
                                attributes[idx] = new String(raw, 0, raw.length);
                            }
                        }
                        TCPEventServer.this.streamCallback.receive(streamId, timestamp, attributes);
                    }
                } catch (EOFException e) {
                    // Must precede the IOException handler: EOF is the normal end of a connection.
                    TCPEventServer.log.info("Closing listener socket. " + e.getMessage());
                } catch (IOException e) {
                    TCPEventServer.log.error("Error reading data from receiver socket:" + e.getMessage(), e);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this pool thread.
                    Thread.currentThread().interrupt();
                    TCPEventServer.log.error("Error :" + e.getMessage(), e);
                } catch (Throwable th) {
                    TCPEventServer.log.error("Error :" + th.getMessage(), th);
                } finally {
                    try {
                        if (TCPEventServer.this.connectionCallback != null) {
                            TCPEventServer.this.connectionCallback.onPublisherBoltDisconnect();
                        }
                    } finally {
                        closeConnection();
                    }
                }
            }

            /**
             * Decodes the fixed-size part of one event.
             *
             * @param fixed  buffer positioned just after the leading long
             * @param types  attribute types of the stream, in attribute order
             * @param target receives the decoded INT, LONG, BOOL, FLOAT and DOUBLE values at their attribute index
             * @return the byte lengths of the STRING attributes, in attribute order
             */
            private ArrayList<Integer> decodeFixedAttributes(ByteBuffer fixed, Attribute.Type[] types, Object[] target) {
                ArrayList<Integer> stringLengths = new ArrayList<Integer>();
                for (int idx = 0; idx < types.length; idx++) {
                    switch (types[idx]) {
                        case INT:
                            target[idx] = Integer.valueOf(fixed.getInt());
                            break;
                        case LONG:
                            target[idx] = Long.valueOf(fixed.getLong());
                            break;
                        case BOOL:
                            target[idx] = Boolean.valueOf(fixed.get() == 1);
                            break;
                        case FLOAT:
                            target[idx] = Float.valueOf(fixed.getFloat());
                            break;
                        case DOUBLE:
                            target[idx] = Double.valueOf(fixed.getDouble());
                            break;
                        case STRING:
                            // Only the length sits in the fixed section; the bytes follow later.
                            stringLengths.add(Integer.valueOf(fixed.getInt()));
                            break;
                        default:
                            // No bytes are consumed for any other type; the slot stays null.
                            break;
                    }
                }
                return stringLengths;
            }

            /**
             * Rejects negative length fields with a descriptive error instead of letting the
             * array allocation fail with a bare NegativeArraySizeException.
             *
             * @param length length field read from the wire
             * @param what   name of the field, used in the error message
             * @return {@code length} unchanged when it is not negative
             * @throws IOException if {@code length} is negative
             */
            private int checkedLength(int length, String what) throws IOException {
                if (length < 0) {
                    throw new IOException("Invalid " + what + " length received: " + length);
                }
                return length;
            }

            /** Closes the client socket; a failure to close is logged and otherwise ignored. */
            private void closeConnection() {
                try {
                    this.connectionSocket.close();
                } catch (IOException e) {
                    TCPEventServer.log.warn("Error closing listener socket: " + e.getMessage(), e);
                }
            }

            /**
             * Reads a single byte from the stream.
             *
             * @param bufferedInputStream stream to read from
             * @return the byte read, in the range 0..255
             * @throws EOFException if the stream has reached its end
             * @throws IOException  if the underlying read fails
             */
            private int loadData(BufferedInputStream bufferedInputStream) throws IOException {
                int value = bufferedInputStream.read();
                if (value == -1) {
                    throw new EOFException("Connection closed from remote end.");
                }
                return value;
            }

            /**
             * Fills the given array completely from the stream, issuing as many reads as needed.
             *
             * @param bufferedInputStream stream to read from
             * @param bArr                destination array; its length is the number of bytes wanted
             * @return {@code bArr}, fully populated
             * @throws EOFException if the stream ends before the array is full
             * @throws IOException  if the underlying read fails
             */
            private byte[] loadData(BufferedInputStream bufferedInputStream, byte[] bArr) throws IOException {
                int filled = 0;
                while (true) {
                    // At least one read is always issued, even for an empty array.
                    int count = bufferedInputStream.read(bArr, filled, bArr.length - filled);
                    if (count == -1) {
                        throw new EOFException("Connection closed from remote end.");
                    }
                    filled += count;
                    if (filled == bArr.length) {
                        return bArr;
                    }
                }
            }
        }

        /** Creates a worker in the stopped state; no socket is opened until startServerWorker() is called. */
        private ServerWorker() {
            isRunning = false;
        }

        /**
         * Reports the accept-loop flag.
         *
         * @return {@code true} from a successful startServerWorker() until the worker is shut down or its loop ends
         */
        public boolean isRunning() {
            return isRunning;
        }

        /**
         * Opens the listening socket on the configured host and port and marks the worker as running.
         *
         * <p>SO_REUSEADDR is enabled on an unbound socket before binding, because changing it on
         * an already bound socket has no defined effect. The running flag is raised only once
         * the socket is bound, so a failed start never leaves the worker reporting itself as running.
         *
         * @throws IOException if the host name cannot be resolved or the socket cannot be bound
         */
        public void startServerWorker() throws IOException {
            InetAddress bindAddress = InetAddress.getByName(TCPEventServer.this.tcpEventServerConfig.getHostName());
            int port = TCPEventServer.this.tcpEventServerConfig.getPort();
            TCPEventServer.log.info("EventServer starting event listener on " + bindAddress.getHostAddress() + ":" + port);

            ServerSocket socket = new ServerSocket();
            boolean bound = false;
            try {
                socket.setReuseAddress(true);
                // Backlog of 50 matches the value previously passed to the ServerSocket constructor.
                socket.bind(new InetSocketAddress(bindAddress, port), 50);
                bound = true;
            } finally {
                if (!bound) {
                    try {
                        socket.close();
                    } catch (IOException ignored) {
                        // The bind failure already propagating is the error worth reporting.
                    }
                }
            }
            this.receiverSocket = socket;
            this.isRunning = true;
        }

        /**
         * Stops the accept loop: clears the running flag, then closes the listening socket if
         * one was opened. A failure to close is logged and not propagated.
         */
        public void shutdownServerWorker() {
            this.isRunning = false;
            if (this.receiverSocket == null) {
                // Never started, so there is nothing to close.
                return;
            }
            try {
                this.receiverSocket.close();
            } catch (IOException closeFailure) {
                TCPEventServer.log.error("Error occurred while trying to shutdown socket: " + closeFailure.getMessage(), closeFailure);
            }
        }

        /**
         * Accept loop: while the running flag is set, accepts a connection, enables keep-alive
         * on it and hands it to the executor as a ListenerProcessor.
         *
         * <p>The first failure ends the loop. It is logged as an error if the worker was still
         * meant to be running, otherwise as an informational shutdown message. The running flag
         * is always cleared on exit.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                while (this.isRunning) {
                    Socket client = this.receiverSocket.accept();
                    client.setKeepAlive(true);
                    TCPEventServer.this.executorService.execute(new ListenerProcessor(client));
                }
            } catch (Throwable failure) {
                if (!this.isRunning) {
                    // Flag already cleared: the failure follows a requested shutdown.
                    TCPEventServer.log.info("EventServer stopped listening for socket connections, " + failure.getMessage());
                    if (TCPEventServer.log.isDebugEnabled()) {
                        TCPEventServer.log.debug("EventServer stopped listening for socket connections", failure);
                    }
                } else {
                    TCPEventServer.log.error("Error while the server was listening for events: " + failure.getMessage(), failure);
                }
            } finally {
                this.isRunning = false;
            }
        }

        /**
         * Bridge constructor marked synthetic by the decompiler. The enclosing class's field
         * initializer calls it as {@code new ServerWorker(this, null)}; both arguments are
         * ignored and construction is delegated to the private no-argument constructor.
         */
        /* synthetic */ ServerWorker(TCPEventServer tCPEventServer, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    /**
     * Creates a server that is not yet listening; call {@link #start()} to open the socket.
     *
     * @param tCPEventServerConfig host name and port to listen on; dereferenced without a null check when the server starts
     * @param streamCallback       receives every decoded event; invoked without a null check
     * @param connectionCallback   notified when a client connects or disconnects; may be {@code null}, in which case notifications are skipped
     */
    public TCPEventServer(TCPEventServerConfig tCPEventServerConfig, StreamCallback streamCallback, ConnectionCallback connectionCallback) {
        // The former "0.0.0.0":7211 default was overwritten on the very next line and never
        // observable, so the throwaway allocation has been removed.
        this.tcpEventServerConfig = tCPEventServerConfig;
        this.streamCallback = streamCallback;
        this.connectionCallback = connectionCallback;
    }

    /**
     * Registers a stream so that incoming events carrying its id can be decoded. An existing
     * entry with the same id is replaced.
     *
     * @param streamDefinition definition of the stream to register
     */
    public void addStreamDefinition(StreamDefinition streamDefinition) {
        String streamId = streamDefinition.getId();
        StreamRuntimeInfo runtimeInfo = EventServerUtils.createStreamRuntimeInfo(streamDefinition);
        streamRuntimeInfoMap.put(streamId, runtimeInfo);
    }

    /**
     * Unregisters a stream. Connections that later receive an event for this id wait until
     * the stream is registered again.
     *
     * @param str id of the stream to remove
     */
    public void removeStreamDefinition(String str) {
        streamRuntimeInfoMap.remove(str);
    }

    /**
     * Starts listening if the server is not already running: opens the listening socket and
     * launches the accept loop on a new thread. Does nothing when the worker reports that it
     * is running.
     *
     * @throws IOException if the listening socket cannot be opened
     */
    public synchronized void start() throws IOException {
        if (!serverWorker.isRunning()) {
            serverWorker.startServerWorker();
            Thread acceptThread = new Thread(serverWorker);
            acceptThread.start();
        }
    }

    /**
     * Stops accepting new connections by shutting down the accept worker. Only the listening
     * socket is closed here; the executor and any connection tasks already running are not
     * touched by this method.
     */
    public synchronized void shutdown() {
        serverWorker.shutdownServerWorker();
    }
}
