/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.amqp_1_0.framing;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.codec.ProtocolHeaderHandler;
import org.apache.qpid.amqp_1_0.codec.ValueHandler;
import org.apache.qpid.amqp_1_0.codec.ValueWriter;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler;
import org.apache.qpid.amqp_1_0.framing.TransportFrame;
import org.apache.qpid.amqp_1_0.transport.BytesProcessor;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
import org.apache.qpid.amqp_1_0.type.AmqpErrorException;
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedShort;
import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.amqp_1_0.type.transport.Open;

public class ConnectionHandler {
    private final ConnectionEndpoint _connection;
    private ProtocolHandler _delegate;
    private static final Logger FRAME_LOGGER = Logger.getLogger("FRM");
    private static final Logger RAW_LOGGER = Logger.getLogger("RAW");

    public ConnectionHandler(ConnectionEndpoint connection) {
        this._connection = connection;
        this._delegate = new ProtocolHeaderHandler(connection);
    }

    public boolean parse(ByteBuffer in) {
        while (in.hasRemaining() && !this.isDone()) {
            this._delegate = this._delegate.parse(in);
        }
        return this.isDone();
    }

    public boolean isDone() {
        return this._delegate.isDone();
    }

    public static void main(String[] args) throws AmqpErrorException {
        byte[] buffer = new byte[76];
        ByteBuffer buf = ByteBuffer.wrap(buffer);
        AMQPDescribedTypeRegistry registry = AMQPDescribedTypeRegistry.newInstance().registerTransportLayer().registerMessagingLayer().registerTransactionLayer();
        Open open = new Open();
        open.setContainerId("venture");
        open.setChannelMax(UnsignedShort.valueOf((short)10));
        open.setHostname("foo");
        open.setOfferedCapabilities(new Symbol[]{Symbol.valueOf("one"), Symbol.valueOf("two"), Symbol.valueOf("three")});
        ValueWriter<Open> writer = registry.getValueWriter(open);
        System.out.println("------ Encode (time in ms for 1 million opens)");
        Long myLong = 32L;
        ValueWriter<Long> writer2 = registry.getValueWriter(myLong);
        Double myDouble = 3.14159265359;
        ValueWriter<Double> writer3 = registry.getValueWriter(myDouble);
        for (int n = 0; n < 1; ++n) {
            long startTime = System.currentTimeMillis();
            for (int i = 1; i != 0; --i) {
                buf.position(0);
                writer.setValue(open);
                writer.writeToBuffer(buf);
                writer2.setValue(myLong);
                writer.writeToBuffer(buf);
                writer3.setValue(myDouble);
                writer3.writeToBuffer(buf);
            }
            long midTime = System.currentTimeMillis();
            System.out.println(midTime - startTime);
        }
        ValueHandler handler = new ValueHandler(registry);
        System.out.println("------ Decode (time in ms for 1 million opens)");
        for (int n = 0; n < 100; ++n) {
            long startTime = System.currentTimeMillis();
            for (int i = 1000000; i != 0; --i) {
                buf.flip();
                handler.parse(buf);
                handler.parse(buf);
                handler.parse(buf);
            }
            long midTime = System.currentTimeMillis();
            System.out.println(midTime - startTime);
        }
    }

    public static class OutputHandler
    implements Runnable {
        private final OutputStream _outputStream;
        private FrameSource _frameSource;
        private static final int BUF_SIZE = 65536;
        private ValueWriter.Registry _registry;

        public OutputHandler(OutputStream outputStream, FrameSource source, ValueWriter.Registry registry) {
            this._outputStream = outputStream;
            this._frameSource = source;
            this._registry = registry;
        }

        @Override
        public void run() {
            boolean i = false;
            try {
                byte[] buffer = new byte[65536];
                ByteBuffer buf = ByteBuffer.wrap(buffer);
                buf.put((byte)65);
                buf.put((byte)77);
                buf.put((byte)81);
                buf.put((byte)80);
                buf.put((byte)0);
                buf.put((byte)1);
                buf.put((byte)0);
                buf.put((byte)0);
                FrameSource frameSource = this._frameSource;
                FrameWriter writer = new FrameWriter(this._registry);
                while (!frameSource.closed()) {
                    AMQFrame frame;
                    if (!writer.isComplete()) {
                        writer.writeToBuffer(buf);
                    }
                    while (buf.hasRemaining() && (frame = frameSource.getNextFrame(buf.position() == 0)) != null) {
                        writer.setValue(frame);
                        int size = writer.writeToBuffer(buf);
                    }
                    if (buf.limit() == 0) continue;
                    this._outputStream.write(buffer, 0, buf.position());
                    buf.clear();
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static class BytesOutputHandler
    implements Runnable,
    BytesProcessor {
        private final OutputStream _outputStream;
        private BytesSource _bytesSource;
        private boolean _closed;
        private ConnectionEndpoint _conn;
        private SocketExceptionHandler _exceptionHandler;

        public BytesOutputHandler(OutputStream outputStream, BytesSource source, ConnectionEndpoint conn, SocketExceptionHandler exceptionHandler) {
            this._outputStream = outputStream;
            this._bytesSource = source;
            this._conn = conn;
            this._exceptionHandler = exceptionHandler;
        }

        @Override
        public void run() {
            BytesSource bytesSource = this._bytesSource;
            while (!this._closed && !bytesSource.closed()) {
                this._bytesSource.getBytes(this, true);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void processBytes(ByteBuffer buf) {
            try {
                Object bin;
                if (RAW_LOGGER.isLoggable(Level.FINE)) {
                    bin = new Binary(buf.array(), buf.arrayOffset() + buf.position(), buf.limit() - buf.position());
                    RAW_LOGGER.fine("SEND[" + this._conn.getRemoteAddress() + "] : " + ((Binary)bin).toString());
                }
                bin = this._outputStream;
                synchronized (bin) {
                    this._outputStream.write(buf.array(), buf.arrayOffset() + buf.position(), buf.limit() - buf.position());
                }
                buf.position(buf.limit());
            }
            catch (IOException e) {
                this._closed = true;
                this._exceptionHandler.processSocketException(e);
            }
        }
    }

    public static class SequentialBytesSource
    implements BytesSource {
        private Queue<BytesSource> _sources = new LinkedList<BytesSource>();

        public SequentialBytesSource(BytesSource ... sources) {
            this._sources.addAll(Arrays.asList(sources));
        }

        public synchronized void addSource(BytesSource source) {
            this._sources.add(source);
        }

        @Override
        public synchronized void getBytes(BytesProcessor processor, boolean wait) {
            BytesSource src = this._sources.peek();
            while (src != null && src.closed()) {
                this._sources.poll();
                src = this._sources.peek();
            }
            if (src != null) {
                src.getBytes(processor, wait);
            }
        }

        @Override
        public boolean closed() {
            return this._sources.isEmpty();
        }
    }

    public static class HeaderBytesSource
    implements BytesSource {
        private final ByteBuffer _buffer;
        private ConnectionEndpoint _conn;

        public HeaderBytesSource(ConnectionEndpoint conn, byte ... headerBytes) {
            this._conn = conn;
            this._buffer = ByteBuffer.wrap(headerBytes);
        }

        @Override
        public void getBytes(BytesProcessor processor, boolean wait) {
            processor.processBytes(this._buffer);
        }

        @Override
        public boolean closed() {
            return !this._buffer.hasRemaining();
        }
    }

    public static class FrameToBytesSourceAdapter
    implements BytesSource {
        private final FrameSource _frameSource;
        private final FrameWriter _writer;
        private static final int BUF_SIZE = 65536;
        private final byte[] _bytes = new byte[65536];
        private final ByteBuffer _buffer = ByteBuffer.wrap(this._bytes);

        public FrameToBytesSourceAdapter(FrameSource frameSource, ValueWriter.Registry registry) {
            this._frameSource = frameSource;
            this._writer = new FrameWriter(registry);
        }

        @Override
        public void getBytes(BytesProcessor processor, boolean wait) {
            if (this._buffer.position() == 0 && !this._frameSource.closed()) {
                AMQFrame frame;
                if (!this._writer.isComplete()) {
                    this._writer.writeToBuffer(this._buffer);
                }
                while (this._buffer.hasRemaining() && (frame = this._frameSource.getNextFrame(wait && this._buffer.position() == 0)) != null) {
                    this._writer.setValue(frame);
                    try {
                        this._writer.writeToBuffer(this._buffer);
                    }
                    catch (RuntimeException e) {
                        e.printStackTrace();
                        throw e;
                    }
                    catch (Error e) {
                        e.printStackTrace();
                        throw e;
                    }
                }
                this._buffer.flip();
            }
            if (this._buffer.limit() != 0) {
                processor.processBytes(this._buffer);
                if (this._buffer.remaining() == 0) {
                    this._buffer.clear();
                }
            }
        }

        @Override
        public boolean closed() {
            return this._buffer.position() == 0 && this._frameSource.closed();
        }
    }

    public static interface BytesSource {
        public void getBytes(BytesProcessor var1, boolean var2);

        public boolean closed();
    }

    public static interface FrameSource<T> {
        public AMQFrame<T> getNextFrame(boolean var1);

        public boolean closed();
    }

    public static class FrameOutput<T>
    implements FrameOutputHandler<T>,
    FrameSource {
        private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.wrap(new byte[0]);
        private final BlockingQueue<AMQFrame<T>> _queue = new ArrayBlockingQueue<AMQFrame<T>>(100);
        private ConnectionEndpoint _conn;
        private final AMQFrame<T> _endOfFrameMarker = new AMQFrame<T>((Object)null){

            @Override
            public short getChannel() {
                throw new UnsupportedOperationException();
            }

            @Override
            public byte getFrameType() {
                throw new UnsupportedOperationException();
            }
        };
        private boolean _setForClose;
        private boolean _closed;
        private long _nextHeartbeat;

        public FrameOutput(ConnectionEndpoint conn) {
            this._conn = conn;
        }

        @Override
        public boolean canSend() {
            return this._queue.remainingCapacity() != 0;
        }

        @Override
        public void send(AMQFrame<T> frame) {
            this.send(frame, null);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void send(AMQFrame<T> frame, ByteBuffer payload) {
            Object object = this._conn.getLock();
            synchronized (object) {
                try {
                    int size = this._conn.getDescribedTypeRegistry().getValueWriter(frame.getFrameBody()).writeToBuffer(EMPTY_BYTEBUFFER) + 8;
                    if (size > this._conn.getMaxFrameSize()) {
                        throw new OversizeFrameException(frame, size);
                    }
                    while (!this._queue.offer(frame)) {
                        this._conn.getLock().wait(1000L);
                    }
                    this._conn.getLock().notifyAll();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void close() {
            Object object = this._conn.getLock();
            synchronized (object) {
                if (!this._queue.offer(this._endOfFrameMarker)) {
                    this._setForClose = true;
                }
                this._conn.getLock().notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public AMQFrame<T> getNextFrame(boolean wait) {
            Object object = this._conn.getLock();
            synchronized (object) {
                long time = System.currentTimeMillis();
                try {
                    AMQFrame frame = null;
                    while (!this.closed() && (frame = (AMQFrame)this._queue.poll()) == null && wait) {
                        this._conn.getLock().wait(this._conn.getIdleTimeout() / 2L);
                        if (this._conn.getIdleTimeout() <= 0L) continue;
                        time = System.currentTimeMillis();
                        if (frame != null || time <= this._nextHeartbeat) continue;
                        frame = new TransportFrame(0, null);
                        break;
                    }
                    if (frame != null) {
                        this._nextHeartbeat = time + this._conn.getIdleTimeout() / 2L;
                    }
                    if (frame == this._endOfFrameMarker) {
                        this._closed = true;
                        frame = null;
                    } else if (this._setForClose && frame != null) {
                        boolean bl = this._setForClose = !this._queue.offer(this._endOfFrameMarker);
                    }
                    if (frame != null && FRAME_LOGGER.isLoggable(Level.FINE)) {
                        FRAME_LOGGER.fine("SEND[" + this._conn.getRemoteAddress() + "|" + frame.getChannel() + "] : " + frame.getFrameBody());
                    }
                    this._conn.getLock().notifyAll();
                    return frame;
                }
                catch (InterruptedException e) {
                    this._conn.setClosedForOutput(true);
                    e.printStackTrace();
                    return null;
                }
            }
        }

        @Override
        public boolean closed() {
            return this._closed;
        }
    }
}

