/*
 * Decompiled with CFR 0.152.
 */
package tachyon.worker;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.Logger;
import tachyon.Constants;
import tachyon.util.CommonUtils;
import tachyon.worker.BlocksLocker;
import tachyon.worker.DataServerMessage;
import tachyon.worker.WorkerStorage;

public class DataServer
implements Runnable {
    private static final Logger LOG = Logger.getLogger((String)Constants.LOGGER_TYPE);
    private InetSocketAddress mAddress;
    private ServerSocketChannel mServerChannel;
    private Selector mSelector;
    private Map<SocketChannel, DataServerMessage> mSendingData = Collections.synchronizedMap(new HashMap());
    private Map<SocketChannel, DataServerMessage> mReceivingData = Collections.synchronizedMap(new HashMap());
    private final BlocksLocker mBlocksLocker;
    private volatile boolean mShutdown = false;
    private volatile boolean mShutdowned = false;

    public DataServer(InetSocketAddress address, WorkerStorage workerStorage) {
        LOG.info((Object)("Starting DataServer @ " + address));
        this.mAddress = address;
        this.mBlocksLocker = new BlocksLocker(workerStorage, -1);
        try {
            this.mSelector = this.initSelector();
        }
        catch (IOException e) {
            LOG.error((Object)(e.getMessage() + this.mAddress), (Throwable)e);
            CommonUtils.runtimeException(e);
        }
    }

    private Selector initSelector() throws IOException {
        AbstractSelector socketSelector = SelectorProvider.provider().openSelector();
        this.mServerChannel = ServerSocketChannel.open();
        this.mServerChannel.configureBlocking(false);
        this.mServerChannel.socket().bind(this.mAddress);
        this.mServerChannel.register(socketSelector, 16);
        return socketSelector;
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(this.mSelector, 1);
    }

    private void read(SelectionKey key) throws IOException {
        int numRead;
        DataServerMessage tMessage;
        SocketChannel socketChannel = (SocketChannel)key.channel();
        if (this.mReceivingData.containsKey(socketChannel)) {
            tMessage = this.mReceivingData.get(socketChannel);
        } else {
            tMessage = DataServerMessage.createBlockRequestMessage();
            this.mReceivingData.put(socketChannel, tMessage);
        }
        try {
            numRead = tMessage.recv(socketChannel);
        }
        catch (IOException e) {
            key.cancel();
            socketChannel.close();
            this.mReceivingData.remove(socketChannel);
            this.mSendingData.remove(socketChannel);
            return;
        }
        if (numRead == -1) {
            key.channel().close();
            key.cancel();
            this.mReceivingData.remove(socketChannel);
            this.mSendingData.remove(socketChannel);
            return;
        }
        if (tMessage.isMessageReady()) {
            if (tMessage.getBlockId() <= 0L) {
                LOG.error((Object)("Invalid block id " + tMessage.getBlockId()));
                return;
            }
            key.interestOps(4);
            LOG.info((Object)("Get request for " + tMessage.getBlockId()));
            int lockId = this.mBlocksLocker.lock(tMessage.getBlockId());
            DataServerMessage tResponseMessage = DataServerMessage.createBlockResponseMessage(true, tMessage.getBlockId(), tMessage.getOffset(), tMessage.getLength());
            tResponseMessage.setLockId(lockId);
            this.mSendingData.put(socketChannel, tResponseMessage);
        }
    }

    private void write(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel)key.channel();
        DataServerMessage sendMessage = this.mSendingData.get(socketChannel);
        boolean closeChannel = false;
        try {
            sendMessage.send(socketChannel);
        }
        catch (IOException e) {
            closeChannel = true;
            LOG.error((Object)e.getMessage());
        }
        if (sendMessage.finishSending() || closeChannel) {
            try {
                key.channel().close();
            }
            catch (IOException e) {
                LOG.error((Object)e.getMessage());
            }
            key.cancel();
            this.mReceivingData.remove(socketChannel);
            this.mSendingData.remove(socketChannel);
            sendMessage.close();
            this.mBlocksLocker.unlock(Math.abs(sendMessage.getBlockId()), sendMessage.getLockId());
        }
    }

    public void close() throws IOException {
        this.mShutdown = true;
        this.mServerChannel.close();
        this.mSelector.close();
    }

    public boolean isClosed() {
        return this.mShutdowned;
    }

    @Override
    public void run() {
        while (!this.mShutdown) {
            try {
                this.mSelector.select();
                if (this.mShutdown) break;
                Iterator<SelectionKey> selectKeys = this.mSelector.selectedKeys().iterator();
                while (selectKeys.hasNext()) {
                    SelectionKey key = selectKeys.next();
                    selectKeys.remove();
                    if (!key.isValid()) continue;
                    if (key.isAcceptable()) {
                        this.accept(key);
                        continue;
                    }
                    if (key.isReadable()) {
                        this.read(key);
                        continue;
                    }
                    if (!key.isWritable()) continue;
                    this.write(key);
                }
            }
            catch (Exception e) {
                LOG.error((Object)e.getMessage(), (Throwable)e);
                if (this.mShutdown) break;
                throw new RuntimeException(e);
            }
        }
        this.mShutdowned = true;
    }
}

