package alluxio.worker.nio;

import alluxio.Configuration;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.DataServer;
import alluxio.worker.DataServerMessage;
import alluxio.worker.block.BlockWorker;
import alluxio.worker.block.io.BlockReader;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/worker/nio/NIODataServer.class */
public final class NIODataServer implements Runnable, DataServer {
    private static final Logger LOG = LoggerFactory.getLogger("alluxio.logger.type");
    private final InetSocketAddress mAddress;
    private ServerSocketChannel mServerChannel;
    private Selector mSelector;
    private final BlockWorker mBlockWorker;
    private final Thread mListenerThread;
    private final Map<SocketChannel, DataServerMessage> mSendingData = Collections.synchronizedMap(new HashMap());
    private final Map<SocketChannel, DataServerMessage> mReceivingData = Collections.synchronizedMap(new HashMap());
    private volatile boolean mShutdown = false;
    private volatile boolean mShutdownComplete = false;

    public NIODataServer(InetSocketAddress inetSocketAddress, BlockWorker blockWorker, Configuration configuration) {
        LOG.info("Starting DataServer @ {}", inetSocketAddress);
        NetworkAddressUtils.assertValidPort((InetSocketAddress) Preconditions.checkNotNull(inetSocketAddress));
        this.mAddress = inetSocketAddress;
        this.mBlockWorker = (BlockWorker) Preconditions.checkNotNull(blockWorker);
        try {
            this.mSelector = initSelector();
            this.mListenerThread = new Thread(this);
            this.mListenerThread.start();
        } catch (IOException e) {
            LOG.error(e.getMessage() + this.mAddress, e);
            throw Throwables.propagate(e);
        }
    }

    private void accept(SelectionKey selectionKey) throws IOException {
        SocketChannel accept = ((ServerSocketChannel) selectionKey.channel()).accept();
        accept.configureBlocking(false);
        accept.register(this.mSelector, 1);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.mShutdown = true;
        this.mServerChannel.close();
        this.mSelector.close();
    }

    @Override // alluxio.worker.DataServer
    public String getBindHost() {
        return this.mServerChannel.socket().getInetAddress().getHostAddress();
    }

    @Override // alluxio.worker.DataServer
    public int getPort() {
        return this.mServerChannel.socket().getLocalPort();
    }

    @Override // alluxio.worker.DataServer
    public boolean isClosed() {
        return this.mShutdownComplete;
    }

    private Selector initSelector() throws IOException {
        AbstractSelector openSelector = SelectorProvider.provider().openSelector();
        try {
            this.mServerChannel = ServerSocketChannel.open();
            this.mServerChannel.configureBlocking(false);
            this.mServerChannel.socket().bind(this.mAddress);
            this.mServerChannel.register(openSelector, 16);
            return openSelector;
        } catch (IOException e) {
            try {
                openSelector.close();
            } catch (IOException e2) {
                LOG.warn("Unable to close socket selector. Exception: {}", e2.getMessage());
            }
            throw e;
        } catch (RuntimeException e3) {
            try {
                openSelector.close();
            } catch (IOException e4) {
                LOG.warn("Unable to close socket selector. Exception: {}", e4.getMessage());
            }
            throw e3;
        }
    }

    private void read(SelectionKey selectionKey) throws Exception {
        DataServerMessage createBlockRequestMessage;
        ByteBuffer byteBuffer;
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        if (this.mReceivingData.containsKey(socketChannel)) {
            createBlockRequestMessage = this.mReceivingData.get(socketChannel);
        } else {
            createBlockRequestMessage = DataServerMessage.createBlockRequestMessage();
            this.mReceivingData.put(socketChannel, createBlockRequestMessage);
        }
        try {
            if (createBlockRequestMessage.recv(socketChannel) == -1) {
                selectionKey.channel().close();
                selectionKey.cancel();
                this.mReceivingData.remove(socketChannel);
                this.mSendingData.remove(socketChannel);
                return;
            }
            if (createBlockRequestMessage.isMessageReady()) {
                if (createBlockRequestMessage.getBlockId() <= 0) {
                    LOG.error("Invalid block id {}", Long.valueOf(createBlockRequestMessage.getBlockId()));
                    return;
                }
                selectionKey.interestOps(4);
                long blockId = createBlockRequestMessage.getBlockId();
                LOG.info("Get request for blockId: {}", Long.valueOf(blockId));
                long lockId = createBlockRequestMessage.getLockId();
                long sessionId = createBlockRequestMessage.getSessionId();
                BlockReader readBlockRemote = this.mBlockWorker.readBlockRemote(sessionId, blockId, lockId);
                int i = 0;
                try {
                    try {
                        byteBuffer = readBlockRemote.read(createBlockRequestMessage.getOffset(), createBlockRequestMessage.getLength());
                        this.mBlockWorker.accessBlock(sessionId, blockId);
                        i = byteBuffer.limit();
                        readBlockRemote.close();
                    } catch (Exception e) {
                        LOG.error(e.getMessage(), e);
                        byteBuffer = null;
                        readBlockRemote.close();
                    }
                    DataServerMessage createBlockResponseMessage = DataServerMessage.createBlockResponseMessage(true, blockId, createBlockRequestMessage.getOffset(), i, byteBuffer);
                    createBlockResponseMessage.setLockId(lockId);
                    this.mSendingData.put(socketChannel, createBlockResponseMessage);
                } catch (Throwable th) {
                    readBlockRemote.close();
                    throw th;
                }
            }
        } catch (IOException e2) {
            selectionKey.cancel();
            socketChannel.close();
            this.mReceivingData.remove(socketChannel);
            this.mSendingData.remove(socketChannel);
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        while (!this.mShutdown) {
            try {
                this.mSelector.select();
                if (this.mShutdown) {
                    break;
                }
                Iterator<SelectionKey> it = this.mSelector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey next = it.next();
                    it.remove();
                    if (next.isValid()) {
                        if (next.isAcceptable()) {
                            accept(next);
                        } else if (next.isReadable()) {
                            read(next);
                        } else if (next.isWritable()) {
                            write(next);
                        }
                    }
                }
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
                if (!this.mShutdown) {
                    try {
                        close();
                    } catch (Exception e2) {
                        LOG.error("Exception when closing data server. message: {}", e2.getMessage());
                    }
                    this.mShutdownComplete = true;
                    throw new RuntimeException(e);
                }
            }
        }
        this.mShutdownComplete = true;
    }

    private void write(SelectionKey selectionKey) {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        DataServerMessage dataServerMessage = this.mSendingData.get(socketChannel);
        boolean z = false;
        try {
            dataServerMessage.send(socketChannel);
        } catch (IOException e) {
            z = true;
            LOG.error(e.getMessage());
        }
        if (dataServerMessage.finishSending() || z) {
            try {
                selectionKey.channel().close();
            } catch (IOException e2) {
                LOG.error(e2.getMessage());
            }
            selectionKey.cancel();
            this.mReceivingData.remove(socketChannel);
            this.mSendingData.remove(socketChannel);
            dataServerMessage.close();
        }
    }
}
