/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.transport.stomp;

import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.ProtocolException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.jms.JMSException;
import org.activemq.io.WireFormat;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQTextMessage;
import org.activemq.message.ConnectionInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.Packet;
import org.activemq.message.Receipt;
import org.activemq.message.SessionInfo;
import org.activemq.transport.stomp.AckListener;
import org.activemq.transport.stomp.AsyncHelper;
import org.activemq.transport.stomp.CommandParser;
import org.activemq.transport.stomp.FlushPacket;
import org.activemq.transport.stomp.FrameBuilder;
import org.activemq.transport.stomp.HeaderParser;
import org.activemq.transport.stomp.ReceiptListener;
import org.activemq.transport.stomp.Stomp;
import org.activemq.transport.stomp.Subscription;
import org.activemq.util.IdGenerator;

public class StompWireFormat
implements WireFormat {
    static final IdGenerator PACKET_IDS;
    static final IdGenerator clientIds;
    private CommandParser commandParser = new CommandParser(this);
    private HeaderParser headerParser = new HeaderParser();
    private DataInputStream in;
    private String clientId;
    private Channel pendingReadPackets = new LinkedQueue();
    private Channel pendingWriteFrames = new LinkedQueue();
    private List receiptListeners = new CopyOnWriteArrayList();
    private short sessionId;
    private Map subscriptions = new ConcurrentHashMap();
    private List ackListeners = new CopyOnWriteArrayList();
    private final Map transactions = new ConcurrentHashMap();
    static final /* synthetic */ boolean $assertionsDisabled;

    void addReceiptListener(ReceiptListener listener) {
        this.receiptListeners.add(listener);
    }

    public Packet readPacket(DataInput in) throws IOException {
        Packet pending = (Packet)AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn(){

            public Object cycle() throws InterruptedException {
                return StompWireFormat.this.pendingReadPackets.poll(0L);
            }
        });
        if (pending != null) {
            return pending;
        }
        try {
            return this.commandParser.parse(in);
        }
        catch (ProtocolException e) {
            this.sendError(e.getMessage());
            return FlushPacket.PACKET;
        }
    }

    public Packet writePacket(Packet packet, DataOutput out) throws IOException, JMSException {
        this.flushPendingFrames(out);
        if (packet == null) {
            return null;
        }
        if (packet.getPacketType() == 16) {
            if (!$assertionsDisabled && !(packet instanceof Receipt)) {
                throw new AssertionError();
            }
            Receipt receipt = (Receipt)packet;
            for (int i = 0; i < this.receiptListeners.size(); ++i) {
                ReceiptListener listener = (ReceiptListener)this.receiptListeners.get(i);
                if (!listener.onReceipt(receipt, out)) continue;
                this.receiptListeners.remove(listener);
                return null;
            }
        }
        if (packet.getPacketType() == 7) {
            if (!$assertionsDisabled && !(packet instanceof ActiveMQTextMessage)) {
                throw new AssertionError();
            }
            ActiveMQTextMessage msg = (ActiveMQTextMessage)packet;
            Subscription sub = (Subscription)this.subscriptions.get(msg.getJMSDestination());
            sub.receive(msg, out);
        }
        return null;
    }

    private void flushPendingFrames(DataOutput out) throws IOException {
        boolean interrupted = false;
        do {
            try {
                String frame = (String)this.pendingWriteFrames.poll(0L);
                if (frame == null) {
                    return;
                }
                out.writeBytes(frame);
            }
            catch (InterruptedException e) {
                interrupted = true;
            }
        } while (interrupted);
    }

    private void sendError(final String message) {
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper(){

            public void cycle() throws InterruptedException {
                StompWireFormat.this.pendingWriteFrames.put((Object)new FrameBuilder("ERROR").addHeader("message", message).toFrame());
            }
        });
    }

    public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) {
        this.in = dataIn;
    }

    public void initiateServerSideProtocol() throws IOException {
        BufferedReader in = new BufferedReader(new InputStreamReader(this.in));
        String first_line = in.readLine();
        if (!first_line.startsWith("CONNECT")) {
            throw new IOException("First line does not begin with with CONNECT");
        }
        Properties headers = this.headerParser.parse(in);
        String login = headers.getProperty("login");
        String passcode = headers.getProperty("passcode");
        while (in.read() != 0) {
        }
        final ConnectionInfo info = new ConnectionInfo();
        final Short packet_id = new Short(PACKET_IDS.getNextShortSequence());
        this.clientId = clientIds.generateId();
        this.commandParser.setClientId(this.clientId);
        info.setClientId(this.clientId);
        info.setReceiptRequired(true);
        info.setClientVersion(Integer.toString(this.getCurrentWireFormatVersion()));
        info.setClosed(false);
        info.setHostName("ttmp.fake.host.name");
        info.setId(packet_id);
        info.setUserName(login);
        info.setPassword(passcode);
        info.setStartTime(System.currentTimeMillis());
        info.setStarted(true);
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper(){

            public void cycle() throws InterruptedException {
                StompWireFormat.this.pendingReadPackets.put((Object)info);
            }
        });
        this.addReceiptListener(new ReceiptListener(){

            public boolean onReceipt(Receipt receipt, DataOutput out) {
                if (receipt.getCorrelationId() != packet_id.shortValue()) {
                    return false;
                }
                Short session_packet_id = new Short(PACKET_IDS.getNextShortSequence());
                StompWireFormat.this.sessionId = clientIds.getNextShortSequence();
                SessionInfo info = new SessionInfo();
                info.setStartTime(System.currentTimeMillis());
                info.setId(session_packet_id);
                info.setClientId(StompWireFormat.this.clientId);
                info.setSessionId(StompWireFormat.this.sessionId);
                info.setStarted(true);
                info.setSessionMode(1);
                info.setReceiptRequired(true);
                AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper(this, info){
                    private final /* synthetic */ SessionInfo val$info;
                    private final /* synthetic */ 4 this$1;
                    {
                        this.this$1 = this$1;
                        this.val$info = val$info;
                    }

                    public void cycle() throws InterruptedException {
                        StompWireFormat.access$000(4.access$400(this.this$1)).put((Object)this.val$info);
                    }
                });
                StompWireFormat.this.addReceiptListener(new ReceiptListener(this, session_packet_id){
                    private final /* synthetic */ Short val$session_packet_id;
                    private final /* synthetic */ 4 this$1;
                    {
                        this.this$1 = this$1;
                        this.val$session_packet_id = val$session_packet_id;
                    }

                    public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException {
                        if (receipt.getCorrelationId() != this.val$session_packet_id.shortValue()) {
                            return false;
                        }
                        StringBuffer buffer = new StringBuffer();
                        buffer.append("CONNECTED").append(Stomp.NEWLINE);
                        buffer.append("session").append(":").append(StompWireFormat.access$300(4.access$400(this.this$1))).append(Stomp.NEWLINE).append(Stomp.NEWLINE);
                        buffer.append("\u0000");
                        out.writeBytes(buffer.toString());
                        return true;
                    }
                });
                return true;
            }

            static /* synthetic */ StompWireFormat access$400(4 x0) {
                return x0.StompWireFormat.this;
            }
        });
    }

    public WireFormat copy() {
        return new StompWireFormat();
    }

    public void initiateClientSideProtocol() throws IOException {
        throw new UnsupportedOperationException("Not yet implemented!");
    }

    public boolean canProcessWireFormatVersion(int version) {
        return version == this.getCurrentWireFormatVersion();
    }

    public int getCurrentWireFormatVersion() {
        return 1;
    }

    public boolean isCachingEnabled() {
        return false;
    }

    public void setCachingEnabled(boolean enableCaching) {
    }

    public boolean doesSupportMessageFragmentation() {
        return false;
    }

    public boolean doesSupportMessageCompression() {
        return false;
    }

    public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    public Packet fromBytes(byte[] bytes) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    public byte[] toBytes(Packet packet) throws IOException, JMSException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    public Packet readPacket(int firstByte, DataInput in) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    void clearTransactionId(String user_tx_id) {
        this.transactions.remove(user_tx_id);
    }

    String getClientId() {
        return this.clientId;
    }

    public short getSessionId() {
        return this.sessionId;
    }

    public void addSubscription(Subscription s) {
        if (this.subscriptions.containsKey(s.getDestination())) {
            Subscription old = (Subscription)this.subscriptions.get(s.getDestination());
            ConsumerInfo p = old.close();
            this.enqueuePacket(p);
            this.subscriptions.put(s.getDestination(), s);
        } else {
            this.subscriptions.put(s.getDestination(), s);
        }
    }

    public void enqueuePacket(final Packet ack) {
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper(){

            public void cycle() throws InterruptedException {
                StompWireFormat.this.pendingReadPackets.put((Object)ack);
            }
        });
    }

    public Subscription getSubscriptionFor(ActiveMQDestination destination) {
        return (Subscription)this.subscriptions.get(destination);
    }

    public void addAckListener(AckListener listener) {
        this.ackListeners.add(listener);
    }

    public List getAckListeners() {
        return this.ackListeners;
    }

    public String getTransactionId(String key) {
        return (String)this.transactions.get(key);
    }

    public void registerTransactionId(String user_tx_id, String tx_id) {
        this.transactions.put(user_tx_id, tx_id);
    }

    public boolean isInDefaultTransaction() {
        return this.transactions.containsKey("  ~~!  DEFAULT TRANSACTION !~~  ");
    }

    static {
        $assertionsDisabled = !StompWireFormat.class.desiredAssertionStatus();
        PACKET_IDS = new IdGenerator();
        clientIds = new IdGenerator();
    }
}

