/*
 * Decompiled with CFR 0.152.
 */
package org.ballerinalang.stdlib.socket.tcp;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.ballerinalang.jvm.BallerinaValues;
import org.ballerinalang.jvm.runtime.BLangThreadFactory;
import org.ballerinalang.jvm.types.BArrayType;
import org.ballerinalang.jvm.types.BPackage;
import org.ballerinalang.jvm.types.BTupleType;
import org.ballerinalang.jvm.types.BType;
import org.ballerinalang.jvm.types.BTypes;
import org.ballerinalang.jvm.values.ArrayValue;
import org.ballerinalang.jvm.values.ErrorValue;
import org.ballerinalang.jvm.values.MapValue;
import org.ballerinalang.stdlib.socket.SocketConstants;
import org.ballerinalang.stdlib.socket.exceptions.SelectorInitializeException;
import org.ballerinalang.stdlib.socket.tcp.ChannelRegisterCallback;
import org.ballerinalang.stdlib.socket.tcp.ReadPendingCallback;
import org.ballerinalang.stdlib.socket.tcp.ReadPendingSocketMap;
import org.ballerinalang.stdlib.socket.tcp.ReadReadySocketMap;
import org.ballerinalang.stdlib.socket.tcp.SelectorDispatcher;
import org.ballerinalang.stdlib.socket.tcp.SocketReader;
import org.ballerinalang.stdlib.socket.tcp.SocketService;
import org.ballerinalang.stdlib.socket.tcp.SocketUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide owner of the NIO {@link Selector} used by the socket module.
 *
 * <p>A single instance (see {@link #getInstance()}) runs one selector loop on a
 * single-thread executor. Each loop iteration registers queued channels,
 * re-dispatches sockets that were reported as read-ready, and then handles the
 * accept/read events returned by {@link Selector#select()}.
 *
 * <p>Thread-safety: {@link #start()} and {@link #stop()} are serialized on an
 * internal lock. Registration and read-ready requests are handed to the loop
 * through concurrent queues followed by {@link Selector#wakeup()}.
 */
public class SelectorManager {
    private static final Logger log = LoggerFactory.getLogger(SelectorManager.class);

    /**
     * Expected-length value for which a read is completed after a single channel
     * read and the buffer is sized from the socket's receive-buffer size.
     * NOTE(review): presumably mirrors a constant declared in SocketConstants
     * that the compiler inlined - confirm and reference it directly if so.
     */
    private static final int DEFAULT_EXPECTED_READ_LENGTH = -100;

    // Casts on project API calls are kept on purpose throughout this class:
    // they pin the overload that the compiled code selected.
    private static final BTupleType receiveFromResultTuple = new BTupleType(Arrays.asList(new BArrayType(BTypes.typeByte), BTypes.typeInt, BallerinaValues.createRecordValue((BPackage)SocketConstants.SOCKET_PACKAGE_ID, (String)"Address").getType()));
    private static final BTupleType tcpReadResultTuple = new BTupleType(Arrays.asList(new BArrayType(BTypes.typeByte), BTypes.typeInt));

    private final Selector selector;
    private final ThreadFactory threadFactory = new BLangThreadFactory("socket-selector");
    private ExecutorService executor = null;
    /** Guarded by {@link #startStopLock}. */
    private boolean running = false;
    /**
     * Loop flag. Written by {@link #stop()} and read by the selector thread
     * without holding the lock, hence {@code volatile}.
     */
    private volatile boolean executing = true;
    private final ConcurrentLinkedQueue<ChannelRegisterCallback> registerPendingSockets = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue<Integer> readReadySockets = new ConcurrentLinkedQueue<>();
    private final Object startStopLock = new Object();

    private SelectorManager() throws IOException {
        this.selector = Selector.open();
    }

    /**
     * Returns the shared instance, creating it (and opening the selector) on
     * first use through the holder class.
     *
     * @return the singleton manager
     * @throws SelectorInitializeException declared for callers; note that a
     *         failure raised inside the holder's static initializer reaches the
     *         caller wrapped in {@link ExceptionInInitializerError}
     */
    public static SelectorManager getInstance() throws SelectorInitializeException {
        return SelectorManagerHolder.manager;
    }

    /**
     * Queues a channel for registration and wakes the selector so that the loop
     * performs the registration on its own thread.
     *
     * @param callback carries the socket service, the initial interest set and
     *                 the notification hooks
     */
    public void registerChannel(ChannelRegisterCallback callback) {
        this.registerPendingSockets.add(callback);
        this.selector.wakeup();
    }

    /**
     * Cancels the channel's key for this selector, if the channel has one.
     *
     * @param channel the channel to detach
     */
    public void unRegisterChannel(SelectableChannel channel) {
        SelectionKey selectionKey = channel.keyFor(this.selector);
        if (selectionKey != null) {
            selectionKey.cancel();
        }
    }

    /**
     * Queues a socket (identified by its channel hash code) for read-ready
     * dispatch on the next loop iteration and wakes the selector.
     */
    void invokePendingReadReadyResources(int socketHashCode) {
        this.readReadySockets.add(socketHashCode);
        this.selector.wakeup();
    }

    /**
     * Starts the selector loop if it is not already running. A new
     * single-thread executor is created when none exists or the previous one
     * has terminated.
     */
    public void start() {
        synchronized (this.startStopLock) {
            if (this.running) {
                return;
            }
            if (this.executor == null || this.executor.isTerminated()) {
                this.executor = Executors.newSingleThreadExecutor(this.threadFactory);
            }
            this.running = true;
            this.executing = true;
            this.executor.execute(this::execute);
        }
    }

    /**
     * Selector loop body; runs until {@link #executing} is cleared. Any failure
     * inside one iteration is logged and the loop continues.
     */
    private void execute() {
        while (this.executing) {
            try {
                this.registerChannels();
                this.invokeReadReadyResources();
                if (this.selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> keyIterator = this.selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    // Remove before handling so the key is not processed again.
                    keyIterator.remove();
                    this.performAction(key);
                }
            }
            catch (Throwable e) {
                log.error("An error occurred in selector loop: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Drains the registration queue, registering each channel with the selector
     * and notifying the callback of success or failure.
     */
    private void registerChannels() {
        ChannelRegisterCallback channelRegisterCallback;
        while ((channelRegisterCallback = this.registerPendingSockets.poll()) != null) {
            SocketService socketService = channelRegisterCallback.getSocketService();
            try {
                socketService.getSocketChannel().register(this.selector, channelRegisterCallback.getInitialInterest(), socketService);
            }
            catch (ClosedChannelException e) {
                channelRegisterCallback.notifyFailure("socket already closed");
                continue;
            }
            boolean serviceAttached = socketService.getService() != null
                    && channelRegisterCallback.getInitialInterest() == SelectionKey.OP_READ;
            channelRegisterCallback.notifyRegister(serviceAttached);
        }
    }

    /**
     * Dispatches read-ready for every queued socket that has a reader in
     * {@link ReadReadySocketMap}. Entries without a reader stay queued.
     */
    private void invokeReadReadyResources() {
        Iterator<Integer> iterator = this.readReadySockets.iterator();
        while (iterator.hasNext()) {
            Integer socketHashCode = iterator.next();
            SocketReader socketReader = ReadReadySocketMap.getInstance().get(socketHashCode);
            if (socketReader == null) {
                continue;
            }
            iterator.remove();
            this.invokeReadReadyResource(socketReader.getSocketService());
        }
    }

    /** Routes a selected key to the accept or read handler; invalid keys are cancelled. */
    private void performAction(SelectionKey key) {
        if (!key.isValid()) {
            key.cancel();
        } else if (key.isAcceptable()) {
            this.onAccept(key);
        } else if (key.isReadable()) {
            this.onReadReady(key);
        }
    }

    /**
     * Accepts a pending client on the server channel attached to {@code key},
     * registers it for reads and dispatches the connect event. Failures are
     * reported through {@link SelectorDispatcher#invokeOnError}.
     */
    private void onAccept(SelectionKey key) {
        SocketService socketService = (SocketService)key.attachment();
        ServerSocketChannel server = (ServerSocketChannel)socketService.getSocketChannel();
        try {
            SocketChannel client = server.accept();
            if (client == null) {
                // A non-blocking accept returns null when no connection is
                // actually pending; there is nothing to set up in that case.
                return;
            }
            client.configureBlocking(false);
            SocketService clientSocketService = new SocketService(client, socketService.getScheduler(), socketService.getService(), socketService.getReadTimeout());
            client.register(this.selector, SelectionKey.OP_READ, clientSocketService);
            SelectorDispatcher.invokeOnConnect(clientSocketService);
        }
        catch (ClosedByInterruptException e) {
            SelectorDispatcher.invokeOnError(new SocketService(socketService.getScheduler(), socketService.getService()), "client accept interrupt by another process");
        }
        catch (AsynchronousCloseException e) {
            SelectorDispatcher.invokeOnError(new SocketService(socketService.getScheduler(), socketService.getService()), "client closed by another process");
        }
        catch (ClosedChannelException e) {
            SelectorDispatcher.invokeOnError(new SocketService(socketService.getScheduler(), socketService.getService()), "client is already closed");
        }
        catch (IOException e) {
            log.error("An error occurred while accepting new client", e);
            SelectorDispatcher.invokeOnError(new SocketService(socketService.getScheduler(), socketService.getService()), "unable to accept a new client. " + e.getMessage());
        }
    }

    /**
     * Handles a readable key: clears its interest set, records the socket as
     * read-ready and attempts to serve a read for it.
     */
    private void onReadReady(SelectionKey key) {
        SocketService socketService = (SocketService)key.attachment();
        // No further events for this key until a read re-arms OP_READ.
        key.interestOps(0);
        ReadReadySocketMap.getInstance().add(new SocketReader(socketService, key));
        this.invokeRead(key.channel().hashCode(), socketService.getService() != null);
    }

    /**
     * Serves a read for the given socket. When a read callback is pending and
     * the socket is read-ready, the data is read and the callback completed.
     * When nothing is pending and a service is attached, the read-ready
     * resource is dispatched instead.
     *
     * @param socketHashId          hash code of the socket channel
     * @param clientServiceAttached whether a service is attached to the socket
     */
    public void invokeRead(int socketHashId, boolean clientServiceAttached) {
        ReadPendingSocketMap readPendingSocketMap = ReadPendingSocketMap.getInstance();
        // The pending entry can disappear between isPending() and get(); treat
        // that the same as "nothing pending" instead of locking on null.
        ReadPendingCallback readPendingCallback =
                readPendingSocketMap.isPending(socketHashId) ? readPendingSocketMap.get(socketHashId) : null;
        if (readPendingCallback != null) {
            synchronized (readPendingCallback) {
                ReadReadySocketMap readReadySocketMap = ReadReadySocketMap.getInstance();
                if (readReadySocketMap.isReadReady(socketHashId)) {
                    SocketReader socketReader = readReadySocketMap.remove(socketHashId);
                    ReadPendingCallback callback = readPendingSocketMap.remove(socketHashId);
                    SelectableChannel channel = socketReader.getSocketService().getSocketChannel();
                    if (channel instanceof SocketChannel) {
                        this.readTcpSocket(socketReader, callback);
                    } else if (channel instanceof DatagramChannel) {
                        this.readUdpSocket(socketReader, callback);
                    }
                }
            }
        } else if (clientServiceAttached) {
            SocketReader socketReader = ReadReadySocketMap.getInstance().get(socketHashId);
            // The reader may already have been consumed by a concurrent read.
            if (socketReader != null) {
                this.invokeReadReadyResource(socketReader.getSocketService());
            }
        }
    }

    /**
     * Receives one datagram for a pending receiveFrom callback. Completes the
     * callback with the data and sender address, or re-queues the callback when
     * a specific length is expected and has not been reached yet.
     */
    private void readUdpSocket(SocketReader socketReader, ReadPendingCallback callback) {
        DatagramChannel channel = (DatagramChannel)socketReader.getSocketService().getSocketChannel();
        try {
            ByteBuffer buffer = this.createBuffer(callback, channel);
            InetSocketAddress remoteAddress = (InetSocketAddress)channel.receive(buffer);
            callback.resetTimeout();
            int bufferPosition = buffer.position();
            callback.updateCurrentLength(bufferPosition);
            // Re-arm read interest and wake the selector so it picks up the change.
            socketReader.getSelectionKey().interestOps(SelectionKey.OP_READ);
            this.selector.wakeup();
            boolean lengthExpected = callback.getExpectedLength() != DEFAULT_EXPECTED_READ_LENGTH;
            if (lengthExpected) {
                if (callback.getBuffer() == null) {
                    callback.setBuffer(ByteBuffer.allocate(buffer.capacity()));
                }
                buffer.flip();
                callback.getBuffer().put(buffer);
            }
            if (lengthExpected && callback.getExpectedLength() != callback.getCurrentLength()) {
                // Not enough data yet: park the callback again and retry.
                ReadPendingSocketMap.getInstance().add(channel.hashCode(), callback);
                this.invokeRead(channel.hashCode(), false);
                return;
            }
            byte[] bytes = SocketUtils.getByteArrayFromByteBuffer(callback.getBuffer() == null ? buffer : callback.getBuffer());
            callback.getCallback().setReturnValues((Object)this.createUdpSocketReturnValue(callback, bytes, remoteAddress));
            callback.getCallback().notifySuccess();
            callback.cancelTimeout();
        }
        catch (CancelledKeyException | ClosedChannelException e) {
            this.processError(callback, null, "connection closed");
        }
        catch (IOException e) {
            log.error("Error while data receive.", e);
            this.processError(callback, null, e.getMessage());
        }
        catch (Throwable e) {
            log.error(e.getMessage(), e);
            // NOTE(review): generic failures are reported with ReadTimedOutError;
            // confirm that this error code is intended here.
            this.processError(callback, SocketConstants.ErrorCode.ReadTimedOutError, "error while on receiveFrom operation");
        }
    }

    /**
     * Reads from a TCP channel for a pending read callback. On end-of-stream
     * the channel is unregistered. Otherwise the data is accumulated and the
     * callback is either completed or re-queued until the expected length is
     * reached.
     */
    private void readTcpSocket(SocketReader socketReader, ReadPendingCallback callback) {
        SocketChannel socketChannel = (SocketChannel)socketReader.getSocketService().getSocketChannel();
        try {
            ByteBuffer buffer = this.createBuffer(callback, socketChannel);
            int read = socketChannel.read(buffer);
            callback.resetTimeout();
            if (read < 0) {
                // End of stream: stop selecting on this channel.
                this.unRegisterChannel(socketChannel);
            } else {
                callback.updateCurrentLength(read);
                // Re-arm read interest and wake the selector so it picks up the change.
                socketReader.getSelectionKey().interestOps(SelectionKey.OP_READ);
                this.selector.wakeup();
                if (callback.getBuffer() == null) {
                    callback.setBuffer(ByteBuffer.allocate(buffer.capacity()));
                }
                buffer.flip();
                callback.getBuffer().put(buffer);
                if (callback.getExpectedLength() != DEFAULT_EXPECTED_READ_LENGTH
                        && callback.getExpectedLength() != callback.getCurrentLength()) {
                    // Not enough data yet: park the callback again and retry.
                    ReadPendingSocketMap.getInstance().add(socketChannel.hashCode(), callback);
                    this.invokeRead(socketChannel.hashCode(), socketReader.getSocketService().getService() != null);
                    return;
                }
            }
            byte[] bytes = SocketUtils.getByteArrayFromByteBuffer(callback.getBuffer() == null ? buffer : callback.getBuffer());
            callback.getCallback().setReturnValues((Object)this.createTcpSocketReturnValue(callback, bytes));
            callback.getCallback().notifySuccess();
            callback.cancelTimeout();
        }
        catch (NotYetConnectedException e) {
            this.processError(callback, null, "connection not yet connected");
        }
        catch (CancelledKeyException | ClosedChannelException e) {
            this.processError(callback, null, "connection closed");
        }
        catch (IOException e) {
            log.error("Error while read.", e);
            this.processError(callback, null, e.getMessage());
        }
        catch (Throwable e) {
            log.error(e.getMessage(), e);
            this.processError(callback, null, "error while on read operation");
        }
    }

    /**
     * Completes the callback with a socket error value.
     *
     * @param callback the pending read to complete
     * @param code     error code, or {@code null} to create the error from the message only
     * @param msg      error message
     */
    private void processError(ReadPendingCallback callback, SocketConstants.ErrorCode code, String msg) {
        ErrorValue socketError = code == null ? SocketUtils.createSocketError(msg) : SocketUtils.createSocketError(code, msg);
        callback.getCallback().setReturnValues((Object)socketError);
        callback.getCallback().notifySuccess();
        // NOTE(review): the success paths call callback.cancelTimeout() after
        // completing the callback; this path does not - confirm whether the
        // timeout should be cancelled here as well.
    }

    /** Builds the TCP read result tuple: the bytes and the callback's current length. */
    private ArrayValue createTcpSocketReturnValue(ReadPendingCallback callback, byte[] bytes) {
        ArrayValue contentTuple = new ArrayValue((BType)tcpReadResultTuple);
        contentTuple.add(0L, (Object)new ArrayValue(bytes));
        contentTuple.add(1L, (Object)callback.getCurrentLength());
        return contentTuple;
    }

    /**
     * Builds the UDP receive result tuple: the bytes, the callback's current
     * length and an "Address" record holding the sender's port and host.
     */
    private ArrayValue createUdpSocketReturnValue(ReadPendingCallback callback, byte[] bytes, InetSocketAddress remoteAddress) {
        MapValue address = BallerinaValues.createRecordValue((BPackage)SocketConstants.SOCKET_PACKAGE_ID, (String)"Address");
        address.put((Object)"port", (Object)remoteAddress.getPort());
        address.put((Object)"host", (Object)remoteAddress.getHostName());
        ArrayValue contentTuple = new ArrayValue((BType)receiveFromResultTuple);
        contentTuple.add(0L, (Object)new ArrayValue(bytes));
        contentTuple.add(1L, (Object)callback.getCurrentLength());
        contentTuple.add(2L, (Object)address);
        return contentTuple;
    }

    /**
     * Allocates the buffer for one channel read: {@code osBufferSize} bytes when
     * no specific length is expected, otherwise exactly the number of bytes
     * still missing from the expected length.
     */
    private ByteBuffer createBuffer(ReadPendingCallback callback, int osBufferSize) {
        if (callback.getExpectedLength() == DEFAULT_EXPECTED_READ_LENGTH) {
            return ByteBuffer.allocate(osBufferSize);
        }
        int remaining = callback.getExpectedLength() - callback.getCurrentLength();
        return ByteBuffer.allocate(remaining);
    }

    /** Allocates a read buffer using the TCP socket's receive-buffer size as the default. */
    private ByteBuffer createBuffer(ReadPendingCallback callback, SocketChannel socketChannel) throws SocketException {
        return this.createBuffer(callback, socketChannel.socket().getReceiveBufferSize());
    }

    /** Allocates a read buffer using the datagram socket's receive-buffer size as the default. */
    private ByteBuffer createBuffer(ReadPendingCallback callback, DatagramChannel socketChannel) throws SocketException {
        return this.createBuffer(callback, socketChannel.socket().getReceiveBufferSize());
    }

    /**
     * Dispatches the read-ready resource only if the service's resource lock
     * can be acquired without waiting.
     */
    private void invokeReadReadyResource(SocketService socketService) {
        if (socketService.getResourceLock().tryAcquire()) {
            SelectorDispatcher.invokeReadReady(socketService);
        }
    }

    /**
     * Requests the selector loop to stop: clears the loop flags, wakes the
     * selector so a blocked {@code select()} returns, and shuts the executor
     * down. Failures are logged rather than propagated.
     */
    public void stop() {
        synchronized (this.startStopLock) {
            try {
                log.debug("Stopping the selector loop.");
                this.executing = false;
                this.running = false;
                this.selector.wakeup();
                SocketUtils.shutdownExecutor(this.executor);
            }
            catch (Throwable e) {
                log.error("Error occurred while stopping the selector loop: " + e.getMessage(), e);
            }
        }
    }

    /** Initialization-on-demand holder for the singleton instance. */
    private static class SelectorManagerHolder {
        private static final SelectorManager manager;

        private SelectorManagerHolder() {
        }

        static {
            try {
                manager = new SelectorManager();
            }
            catch (IOException e) {
                throw new SelectorInitializeException("Unable to initialize the selector", e);
            }
        }
    }
}

