/*
 * Decompiled with CFR 0.152.
 */
package org.apache.catalina.tribes.transport.nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.sql.Timestamp;
import org.apache.catalina.tribes.ChannelMessage;
import org.apache.catalina.tribes.RemoteProcessException;
import org.apache.catalina.tribes.UniqueId;
import org.apache.catalina.tribes.io.BufferPool;
import org.apache.catalina.tribes.io.ChannelData;
import org.apache.catalina.tribes.io.ListenCallback;
import org.apache.catalina.tribes.io.ObjectReader;
import org.apache.catalina.tribes.transport.AbstractRxTask;
import org.apache.catalina.tribes.transport.Constants;
import org.apache.catalina.tribes.transport.nio.NioReceiver;
import org.apache.catalina.tribes.util.Logs;
import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;

public class NioReplicationTask
extends AbstractRxTask {
    private static Log log = LogFactory.getLog(NioReplicationTask.class);
    private ByteBuffer buffer = null;
    private SelectionKey key;
    private int rxBufSize;
    private NioReceiver receiver;

    public NioReplicationTask(ListenCallback callback, NioReceiver receiver) {
        super(callback);
        this.receiver = receiver;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void run() {
        block15: {
            this.buffer = (this.getOptions() & 4) == 4 ? ByteBuffer.allocateDirect(this.getRxBufSize()) : ByteBuffer.allocate(this.getRxBufSize());
            if (this.key == null) {
                return;
            }
            if (log.isTraceEnabled()) {
                log.trace((Object)("Servicing key:" + this.key));
            }
            try {
                ObjectReader reader = (ObjectReader)this.key.attachment();
                if (reader == null) {
                    if (log.isTraceEnabled()) {
                        log.trace((Object)("No object reader, cancelling:" + this.key));
                    }
                    this.cancelKey(this.key);
                    break block15;
                }
                if (log.isTraceEnabled()) {
                    log.trace((Object)("Draining channel:" + this.key));
                }
                this.drainChannel(this.key, reader);
            }
            catch (Exception e) {
                if (!(e instanceof CancelledKeyException)) {
                    if (e instanceof IOException) {
                        if (log.isDebugEnabled()) {
                            log.debug((Object)("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed[" + e.getMessage() + "]."), (Throwable)e);
                        } else {
                            log.warn((Object)("IOException in replication worker, unable to drain channel. Probable cause: Keep alive socket closed[" + e.getMessage() + "]."));
                        }
                    } else if (log.isErrorEnabled()) {
                        log.error((Object)"Exception caught in TcpReplicationThread.drainChannel.", (Throwable)e);
                    }
                }
                this.cancelKey(this.key);
            }
        }
        this.key = null;
        this.getTaskPool().returnWorker(this);
    }

    public synchronized void serviceChannel(SelectionKey key) {
        ObjectReader reader;
        if (log.isTraceEnabled()) {
            log.trace((Object)("About to service key:" + key));
        }
        if ((reader = (ObjectReader)key.attachment()) != null) {
            reader.setLastAccess(System.currentTimeMillis());
        }
        this.key = key;
        key.interestOps(key.interestOps() & 0xFFFFFFFE);
        key.interestOps(key.interestOps() & 0xFFFFFFFB);
    }

    protected void drainChannel(SelectionKey key, ObjectReader reader) throws Exception {
        int count;
        reader.setLastAccess(System.currentTimeMillis());
        reader.access();
        SocketChannel channel = (SocketChannel)key.channel();
        this.buffer.clear();
        while ((count = channel.read(this.buffer)) > 0) {
            this.buffer.flip();
            if (this.buffer.hasArray()) {
                reader.append(this.buffer.array(), 0, count, false);
            } else {
                reader.append(this.buffer, count, false);
            }
            this.buffer.clear();
            if (!reader.hasPackage()) continue;
        }
        int pkgcnt = reader.count();
        if (count < 0 && pkgcnt == 0) {
            this.remoteEof(key);
            return;
        }
        ChannelMessage[] msgs = pkgcnt == 0 ? ChannelData.EMPTY_DATA_ARRAY : reader.execute();
        this.registerForRead(key, reader);
        for (int i = 0; i < msgs.length; ++i) {
            block16: {
                if (ChannelData.sendAckAsync(msgs[i].getOptions())) {
                    this.sendAck(key, channel, Constants.ACK_COMMAND);
                }
                try {
                    if (Logs.MESSAGES.isTraceEnabled()) {
                        try {
                            Logs.MESSAGES.trace((Object)("NioReplicationThread - Received msg:" + new UniqueId(msgs[i].getUniqueId()) + " at " + new Timestamp(System.currentTimeMillis())));
                        }
                        catch (Throwable t) {
                            // empty catch block
                        }
                    }
                    this.getCallback().messageDataReceived(msgs[i]);
                    if (ChannelData.sendAckSync(msgs[i].getOptions())) {
                        this.sendAck(key, channel, Constants.ACK_COMMAND);
                    }
                }
                catch (RemoteProcessException e) {
                    if (log.isDebugEnabled()) {
                        log.error((Object)"Processing of cluster message failed.", (Throwable)e);
                    }
                    if (ChannelData.sendAckSync(msgs[i].getOptions())) {
                        this.sendAck(key, channel, Constants.FAIL_ACK_COMMAND);
                    }
                }
                catch (Exception e) {
                    log.error((Object)"Processing of cluster message failed.", (Throwable)e);
                    if (!ChannelData.sendAckSync(msgs[i].getOptions())) break block16;
                    this.sendAck(key, channel, Constants.FAIL_ACK_COMMAND);
                }
            }
            if (!this.getUseBufferPool()) continue;
            BufferPool.getBufferPool().returnBuffer(msgs[i].getMessage());
            msgs[i].setMessage(null);
        }
        if (count < 0) {
            this.remoteEof(key);
            return;
        }
    }

    private void remoteEof(SelectionKey key) {
        if (log.isDebugEnabled()) {
            log.debug((Object)"Channel closed on the remote end, disconnecting");
        }
        this.cancelKey(key);
    }

    protected void registerForRead(final SelectionKey key, ObjectReader reader) {
        if (log.isTraceEnabled()) {
            log.trace((Object)("Adding key for read event:" + key));
        }
        reader.finish();
        Runnable r = new Runnable(){

            public void run() {
                try {
                    if (key.isValid()) {
                        key.selector().wakeup();
                        int resumeOps = key.interestOps() | 1;
                        key.interestOps(resumeOps);
                        if (log.isTraceEnabled()) {
                            log.trace((Object)("Registering key for read:" + key));
                        }
                    }
                }
                catch (CancelledKeyException ckx) {
                    NioReceiver.cancelledKey(key);
                    if (log.isTraceEnabled()) {
                        log.trace((Object)("CKX Cancelling key:" + key));
                    }
                }
                catch (Exception x) {
                    log.error((Object)("Error registering key for read:" + key), (Throwable)x);
                }
            }
        };
        this.receiver.addEvent(r);
    }

    private void cancelKey(final SelectionKey key) {
        ObjectReader reader;
        if (log.isTraceEnabled()) {
            log.trace((Object)("Adding key for cancel event:" + key));
        }
        if ((reader = (ObjectReader)key.attachment()) != null) {
            reader.setCancelled(true);
            reader.finish();
        }
        Runnable cx = new Runnable(){

            public void run() {
                if (log.isTraceEnabled()) {
                    log.trace((Object)("Cancelling key:" + key));
                }
                NioReceiver.cancelledKey(key);
            }
        };
        this.receiver.addEvent(cx);
    }

    protected void sendAck(SelectionKey key, SocketChannel channel, byte[] command) {
        try {
            ByteBuffer buf = ByteBuffer.wrap(command);
            for (int total = 0; total < command.length; total += channel.write(buf)) {
            }
            if (log.isTraceEnabled()) {
                log.trace((Object)("ACK sent to " + channel.socket().getPort()));
            }
        }
        catch (IOException x) {
            log.warn((Object)("Unable to send ACK back through channel, channel disconnected?: " + x.getMessage()));
        }
    }

    public void setRxBufSize(int rxBufSize) {
        this.rxBufSize = rxBufSize;
    }

    public int getRxBufSize() {
        return this.rxBufSize;
    }
}

