/*
 * Decompiled with CFR 0.152.
 */
package org.codehaus.activemq.transport.activeio;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketException;
import javax.jms.JMSException;
import org.activeio.AsynchChannel;
import org.activeio.AsynchChannelListener;
import org.activeio.adapter.PacketByteArrayOutputStream;
import org.activeio.adapter.PacketInputStream;
import org.activeio.net.SocketMetadata;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.io.WireFormat;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.transport.TransportChannelSupport;
import org.codehaus.activemq.transport.TransportStatusEvent;
import org.codehaus.activemq.transport.activeio.PacketAggregator;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
 * Transport channel that exchanges ActiveMQ packets over an ActiveIO
 * {@link AsynchChannel}.
 *
 * <p>Outbound packets are serialized by the wire format into a single reusable
 * buffer and then written to the channel; all access to that buffer is guarded
 * by {@code writeLock}. Inbound raw channel packets are passed to a
 * {@link PacketAggregator}; every packet it assembles is decoded with the wire
 * format and handed to {@code doConsumePacket}.
 */
public class ActiveIOTransportChannel
extends TransportChannelSupport
implements AsynchChannelListener {
    private static final Log log = LogFactory.getLog(ActiveIOTransportChannel.class);

    /** Guards {@link #outputBuffer} / {@link #dataOut} and the channel write in {@link #doAsyncSend}. */
    private final Object writeLock = new Object();
    private final AsynchChannel asynchChannel;
    /** Flipped to {@code true} exactly once, by the first call to {@link #stop()}. */
    private final SynchronizedBoolean closed = new SynchronizedBoolean(false);
    /** Reusable serialization buffer; must be empty whenever {@code writeLock} is not held. */
    private final PacketByteArrayOutputStream outputBuffer = new PacketByteArrayOutputStream();
    private final DataOutputStream dataOut = new DataOutputStream(this.outputBuffer);
    private final PacketAggregator aggregator = new PacketAggregator(){

        /**
         * Decodes one assembled raw packet with the channel's wire format and
         * dispatches it; read failures are routed to {@code onPacketError}.
         */
        protected void packetAssembled(org.activeio.Packet packet) {
            try {
                DataInputStream in = new DataInputStream(new PacketInputStream(packet));
                Packet decoded = ActiveIOTransportChannel.this.getWireFormat().readPacket(in);
                ActiveIOTransportChannel.this.doConsumePacket(decoded);
            }
            catch (IOException e) {
                ActiveIOTransportChannel.this.onPacketError(e);
            }
        }
    };

    /**
     * Creates a channel on top of the given ActiveIO channel and registers
     * itself as that channel's listener. If the channel can be narrowed to
     * {@link SocketMetadata}, TCP_NODELAY is requested on a best-effort basis.
     *
     * @param wireFormat    the wire format used to read and write packets
     * @param asynchChannel the underlying ActiveIO channel
     */
    public ActiveIOTransportChannel(WireFormat wireFormat, AsynchChannel asynchChannel) {
        super(wireFormat);
        this.asynchChannel = asynchChannel;
        asynchChannel.setAsynchChannelListener(this);
        SocketMetadata socket = (SocketMetadata)asynchChannel.narrow(SocketMetadata.class);
        if (socket != null) {
            try {
                socket.setTcpNoDelay(true);
            }
            catch (SocketException e) {
                // Best effort only: the channel still works without TCP_NODELAY,
                // but leave a trace instead of silently discarding the failure.
                log.debug("Could not enable TCP_NODELAY: " + e, e);
            }
        }
    }

    /**
     * Starts the underlying channel.
     *
     * @throws JMSException wrapping any {@link IOException} raised by the channel
     */
    public void start() throws JMSException {
        try {
            this.asynchChannel.start();
        }
        catch (IOException e) {
            throw JMSExceptionHelper.newJMSException(e.getMessage(), e);
        }
    }

    /**
     * Stops this channel and disposes the underlying channel. Only the first
     * call has any effect; later calls are no-ops.
     */
    public void stop() {
        if (this.closed.commit(false, true)) {
            super.stop();
            this.asynchChannel.dispose();
        }
    }

    /**
     * Disposes the underlying channel without marking this transport as closed
     * and without invoking {@code super.stop()}.
     */
    public void forceDisconnect() {
        log.debug("Forcing disconnect");
        this.asynchChannel.dispose();
    }

    /**
     * Sends a packet; the value returned by {@link #doAsyncSend} is discarded.
     *
     * @param packet the packet to send
     * @throws JMSException if the packet could not be written
     */
    public void asyncSend(Packet packet) throws JMSException {
        this.doAsyncSend(packet);
    }

    /**
     * Serializes the packet into the shared buffer and writes it to the
     * channel while holding {@code writeLock}.
     *
     * <p>The buffer is reset in a {@code finally} block so that a failed send
     * cannot leave partially serialized bytes behind to be prepended to the
     * next packet.
     *
     * @param packet the packet to send
     * @return whatever the wire format's {@code writePacket} returned
     * @throws JMSException if serialization or the channel write fails; an
     *         {@link IOException} is wrapped, a {@code JMSException} is rethrown as is
     */
    protected Packet doAsyncSend(Packet packet) throws JMSException {
        Packet response = null;
        try {
            synchronized (this.writeLock) {
                try {
                    response = this.getWireFormat().writePacket(packet, this.dataOut);
                    this.dataOut.flush();
                    this.asynchChannel.write(this.outputBuffer.getPacket());
                    this.asynchChannel.flush();
                }
                finally {
                    // Always leave the shared buffer empty, even when the send failed.
                    this.outputBuffer.reset();
                }
            }
        }
        catch (IOException e) {
            this.traceIfClosed(e);
            throw JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
        }
        catch (JMSException e) {
            this.traceIfClosed(e);
            throw e;
        }
        return response;
    }

    /** Logs the given failure at trace level when the channel has already been closed. */
    private void traceIfClosed(Exception e) {
        if (this.closed.get()) {
            log.trace("Caught exception while closed: " + e, e);
        }
    }

    /**
     * Channel callback: feeds a raw inbound packet to the aggregator.
     *
     * @param packet the raw packet delivered by the channel
     */
    public void onPacket(org.activeio.Packet packet) {
        try {
            this.aggregator.addRawPacket(packet);
        }
        catch (IOException e) {
            this.onPacketError(e);
        }
    }

    /**
     * Channel callback for read failures. Ignored once the channel is closed.
     * On the first error (no stop pending yet) the stop is marked pending, the
     * error is reported and a status event is fired; in every case the channel
     * is then stopped.
     *
     * @param ex the failure reported by the channel or raised while decoding
     */
    public void onPacketError(IOException ex) {
        if (this.closed.get()) {
            return;
        }
        if (!this.pendingStop) {
            this.setPendingStop(true);
            if (ex instanceof EOFException && !this.isServerSide()) {
                // An EOF seen on the non-server side is only logged, not
                // reported through onAsyncException.
                log.warn("Peer closed connection", ex);
            } else {
                this.onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
            }
            // NOTE(review): status code 2 is presumably the "disconnected"
            // status - confirm against TransportStatusEvent.
            this.fireStatusEvent(new TransportStatusEvent(this, 2));
        }
        this.stop();
    }

    /**
     * @return the underlying ActiveIO channel
     */
    public AsynchChannel getAsynchChannel() {
        return this.asynchChannel;
    }

    /**
     * @return the current version reported by this channel's wire format
     */
    public int getCurrentWireFormatVersion() {
        return this.getWireFormat().getCurrentWireFormatVersion();
    }
}

