/*
 * Decompiled with CFR 0.152.
 */
package org.activemq.transport.stomp;

import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import java.io.BufferedReader;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.DatagramPacket;
import java.net.ProtocolException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.jms.JMSException;
import org.activeio.Packet;
import org.activeio.command.WireFormat;
import org.activemq.command.ActiveMQBytesMessage;
import org.activemq.command.ActiveMQDestination;
import org.activemq.command.ActiveMQMessage;
import org.activemq.command.ActiveMQTextMessage;
import org.activemq.command.Command;
import org.activemq.command.ConnectionId;
import org.activemq.command.ConnectionInfo;
import org.activemq.command.ConsumerId;
import org.activemq.command.FlushCommand;
import org.activemq.command.LocalTransactionId;
import org.activemq.command.MessageId;
import org.activemq.command.RemoveInfo;
import org.activemq.command.Response;
import org.activemq.command.SessionId;
import org.activemq.command.SessionInfo;
import org.activemq.command.TransactionId;
import org.activemq.transport.stomp.AckListener;
import org.activemq.transport.stomp.AsyncHelper;
import org.activemq.transport.stomp.CommandParser;
import org.activemq.transport.stomp.FrameBuilder;
import org.activemq.transport.stomp.HeaderParser;
import org.activemq.transport.stomp.ResponseListener;
import org.activemq.transport.stomp.Subscription;
import org.activemq.util.IdGenerator;

/**
 * {@link WireFormat} implementation for the STOMP transport.
 *
 * <p>Inbound frames are parsed into {@link Command} objects by a {@link CommandParser};
 * outbound commands are dispatched to registered {@link ResponseListener}s or to the
 * {@link Subscription} registered for the message destination. Two internal queues decouple
 * the two directions: commands synthesised locally (for example during the CONNECT handshake)
 * are queued on {@code pendingReadCommands} and handed out by
 * {@link #readCommand(DataInput)}, while frames produced outside a write call (for example
 * ERROR frames) are queued on {@code pendingWriteFrames} and written at the start of the next
 * {@link #writeCommand(Command, DataOutput)}.
 *
 * <p>NOTE(review): {@link #generateCommandId()}, {@link #generateSessionId()},
 * {@link #createConsumerId()}, {@link #generateMessageId()} and the {@code marshal} /
 * {@code unmarshal} methods are unimplemented stubs that always throw, so
 * {@link #initiateServerSideProtocol()} cannot currently complete.
 */
public class StompWireFormat implements WireFormat {
    /** Data structure type code that {@code writeCommand} handles as a {@link Response}. */
    private static final int RESPONSE_DATA_STRUCTURE_TYPE = 30;
    /** Data structure type code that {@code writeCommand} handles as an {@link ActiveMQTextMessage}. */
    private static final int TEXT_MESSAGE_DATA_STRUCTURE_TYPE = 28;
    /** Data structure type code that {@code writeCommand} handles as an {@link ActiveMQBytesMessage}. */
    private static final int BYTES_MESSAGE_DATA_STRUCTURE_TYPE = 24;

    static final IdGenerator PACKET_IDS = new IdGenerator();
    static final IdGenerator clientIds = new IdGenerator();
    private static int transactionIdCounter;

    private int version = 1;
    private CommandParser commandParser = new CommandParser(this);
    private HeaderParser headerParser = new HeaderParser();
    /** Inbound transport stream, set by {@link #registerTransportStreams}. */
    private DataInputStream in;
    private String clientId;
    /** Locally generated commands waiting to be returned by {@link #readCommand(DataInput)}. */
    private BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
    /** Encoded frames ({@code byte[]}) waiting to be written by the next {@code writeCommand}. */
    private BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
    private List receiptListeners = new CopyOnWriteArrayList();
    private SessionId sessionId;
    /** Maps a destination to the {@link Subscription} registered for it. */
    private Map subscriptions = new ConcurrentHashMap();
    private List ackListeners = new CopyOnWriteArrayList();
    /** Maps a client-supplied transaction key to its {@link TransactionId}. */
    private final Map transactions = new ConcurrentHashMap();
    private ConnectionId connectionId;

    /**
     * Registers a listener that is offered every {@link Response} passed to
     * {@link #writeCommand(Command, DataOutput)} until it reports the response as handled.
     *
     * @param listener the listener to add
     */
    void addResponseListener(ResponseListener listener) {
        this.receiptListeners.add(listener);
    }

    /**
     * Returns the next inbound command.
     *
     * <p>A locally queued command, if any, takes precedence; otherwise the next frame is parsed
     * from {@code in}. A {@link ProtocolException} raised by the parser is not propagated:
     * an ERROR frame carrying its message is queued for the client and
     * {@link FlushCommand#COMMAND} is returned instead.
     *
     * @param in stream to parse a frame from when no command is queued
     * @return the queued or parsed command, or {@link FlushCommand#COMMAND} after a protocol error
     * @throws IOException if reading from {@code in} fails
     * @throws JMSException if the parser reports one
     */
    public Command readCommand(DataInput in) throws IOException, JMSException {
        Command queued = (Command) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn() {
            public Object cycle() throws InterruptedException {
                // Zero timeout: only check whether something is already queued.
                return StompWireFormat.this.pendingReadCommands.poll(0L, TimeUnit.MILLISECONDS);
            }
        });
        if (queued != null) {
            return queued;
        }
        try {
            return this.commandParser.parse(in);
        } catch (ProtocolException e) {
            this.sendError(e.getMessage());
            return FlushCommand.COMMAND;
        }
    }

    /**
     * Writes any queued frames, then handles {@code packet} according to its data structure type.
     *
     * <ul>
     * <li>A {@link Response} is offered to each response listener in registration order; the
     *     first listener that accepts it is removed and processing stops.</li>
     * <li>An {@link ActiveMQTextMessage} or {@link ActiveMQBytesMessage} is handed to the
     *     {@link Subscription} registered for its JMS destination.</li>
     * <li>Any other command is ignored.</li>
     * </ul>
     *
     * @param packet the command to write; {@code null} only flushes queued frames
     * @param out the stream to write to
     * @return always {@code null}
     * @throws IOException if writing to {@code out} fails
     * @throws JMSException if handling the message reports one
     */
    public Command writeCommand(Command packet, DataOutput out) throws IOException, JMSException {
        this.flushPendingFrames(out);
        if (packet == null) {
            return null;
        }
        int type = packet.getDataStructureType();
        if (type == RESPONSE_DATA_STRUCTURE_TYPE) {
            assert packet instanceof Response;
            Response receipt = (Response) packet;
            for (int i = 0; i < this.receiptListeners.size(); ++i) {
                ResponseListener listener = (ResponseListener) this.receiptListeners.get(i);
                if (listener.onResponse(receipt, out)) {
                    // Listeners are one-shot: drop the one that consumed the response.
                    this.receiptListeners.remove(listener);
                    return null;
                }
            }
        }
        // NOTE(review): the lookups below throw NullPointerException when no subscription is
        // registered for the destination -- confirm whether that can happen in practice.
        if (type == TEXT_MESSAGE_DATA_STRUCTURE_TYPE) {
            assert packet instanceof ActiveMQTextMessage;
            ActiveMQTextMessage text = (ActiveMQTextMessage) packet;
            Subscription sub = (Subscription) this.subscriptions.get(text.getJMSDestination());
            sub.receive(text, out);
        } else if (type == BYTES_MESSAGE_DATA_STRUCTURE_TYPE) {
            assert packet instanceof ActiveMQBytesMessage;
            ActiveMQBytesMessage bytes = (ActiveMQBytesMessage) packet;
            Subscription sub = (Subscription) this.subscriptions.get(bytes.getJMSDestination());
            sub.receive(bytes, out);
        }
        return null;
    }

    /**
     * Writes every frame currently queued on {@code pendingWriteFrames} to {@code out}.
     *
     * <p>The queue is drained until it is empty. An interrupt while polling is treated as a
     * reason to retry, matching how this class retries its other queue operations.
     *
     * @param out the stream to write the frames to
     * @throws IOException if writing a frame fails
     */
    private void flushPendingFrames(DataOutput out) throws IOException {
        while (true) {
            byte[] frame;
            try {
                frame = (byte[]) this.pendingWriteFrames.poll(0L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // The thrown exception cleared the interrupt flag, so the retry cannot spin.
                continue;
            }
            if (frame == null) {
                return;
            }
            out.write(frame);
        }
    }

    /**
     * Queues an ERROR frame whose {@code message} header is {@code message}; it is written by
     * the next {@link #writeCommand(Command, DataOutput)}.
     *
     * @param message text for the frame's {@code message} header
     */
    private void sendError(final String message) {
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
            public void cycle() throws InterruptedException {
                StompWireFormat.this.pendingWriteFrames.put(
                        new FrameBuilder("ERROR").addHeader("message", message).toFrame());
            }
        });
    }

    /**
     * Remembers the inbound stream for {@link #initiateServerSideProtocol()}. The outbound
     * stream is not retained.
     *
     * @param dataOut outbound transport stream (unused)
     * @param dataIn inbound transport stream
     */
    public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn) {
        this.in = dataIn;
    }

    /**
     * Performs the server side of the connection handshake.
     *
     * <p>Reads the CONNECT frame from the registered input stream, queues a
     * {@link ConnectionInfo} built from its {@code login} / {@code passcode} headers, and
     * registers a listener chain: once the connection response arrives a {@link SessionInfo} is
     * queued, and once the session response arrives the CONNECTED frame is written.
     *
     * <p>NOTE(review): the {@link BufferedReader} used here may read ahead past the CONNECT
     * frame; bytes it buffers are not visible to later reads of the underlying stream.
     *
     * @throws IOException if the stream ends before a complete CONNECT frame was read, the
     *         first line does not start with CONNECT, or reading fails
     */
    public void initiateServerSideProtocol() throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(this.in));
        String firstLine = reader.readLine();
        if (firstLine == null) {
            // readLine() reports end of stream with null.
            throw new IOException("Connection closed before a CONNECT frame was received");
        }
        if (!firstLine.startsWith("CONNECT")) {
            throw new IOException("First line does not begin with with CONNECT");
        }
        Properties headers = this.headerParser.parse(reader);
        String login = headers.getProperty("login");
        String passcode = headers.getProperty("passcode");

        // Skip the remainder of the frame up to and including its NUL terminator.
        int c;
        while ((c = reader.read()) != 0) {
            if (c == -1) {
                // Without this check a closed stream would make the loop spin forever.
                throw new IOException("Unexpected end of stream while reading the CONNECT frame");
            }
        }

        this.clientId = clientIds.generateId();
        this.commandParser.setClientId(this.clientId);

        final ConnectionInfo connectionInfo = new ConnectionInfo();
        connectionInfo.setClientId(this.clientId);
        connectionInfo.setResponseRequired(true);
        final short connectionCommandId = StompWireFormat.generateCommandId();
        connectionInfo.setCommandId(connectionCommandId);
        connectionInfo.setUserName(login);
        connectionInfo.setPassword(passcode);
        this.enqueueCommand(connectionInfo);

        this.addResponseListener(new ResponseListener() {
            public boolean onResponse(Response receipt, DataOutput out) {
                if (receipt.getCorrelationId() != connectionCommandId) {
                    return false;
                }
                StompWireFormat.this.sessionId = StompWireFormat.this.generateSessionId();

                final SessionInfo sessionInfo = new SessionInfo();
                final short sessionCommandId = StompWireFormat.generateCommandId();
                sessionInfo.setCommandId(sessionCommandId);
                sessionInfo.setSessionId(StompWireFormat.this.sessionId);
                sessionInfo.setResponseRequired(true);
                StompWireFormat.this.enqueueCommand(sessionInfo);

                StompWireFormat.this.addResponseListener(new ResponseListener() {
                    public boolean onResponse(Response receipt, DataOutput out) throws IOException {
                        // Wait for the answer to the SessionInfo, not to the ConnectionInfo
                        // that has already been consumed by the enclosing listener.
                        if (receipt.getCorrelationId() != sessionCommandId) {
                            return false;
                        }
                        StringBuffer frame = new StringBuffer();
                        frame.append("CONNECTED").append("\n");
                        frame.append("session").append(":").append(StompWireFormat.this.clientId)
                                .append("\n").append("\n");
                        frame.append("\u0000");
                        out.writeBytes(frame.toString());
                        return true;
                    }
                });
                return true;
            }
        });
    }

    /**
     * Creates a fresh, independent wire format. No state of this instance is carried over.
     *
     * @return a new {@code StompWireFormat}
     */
    public WireFormat copy() {
        return new StompWireFormat();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public void initiateClientSideProtocol() throws IOException {
        throw new UnsupportedOperationException("Not yet implemented!");
    }

    /**
     * @param version the version to test
     * @return {@code true} if {@code version} equals {@link #getCurrentWireFormatVersion()}
     */
    public boolean canProcessWireFormatVersion(int version) {
        return version == this.getCurrentWireFormatVersion();
    }

    /**
     * @return the fixed wire format version, {@code 1}
     */
    public int getCurrentWireFormatVersion() {
        return 1;
    }

    /**
     * @return always {@code false}
     */
    public boolean isCachingEnabled() {
        return false;
    }

    /**
     * Ignored; this wire format does not cache.
     *
     * @param enableCaching unused
     */
    public void setCachingEnabled(boolean enableCaching) {
    }

    /**
     * @return always {@code false}
     */
    public boolean doesSupportMessageFragmentation() {
        return false;
    }

    /**
     * @return always {@code false}
     */
    public boolean doesSupportMessageCompression() {
        return false;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public DatagramPacket writeCommand(String channelID, Command packet) throws IOException, JMSException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Command fromBytes(byte[] bytes, int offset, int length) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Command fromBytes(byte[] bytes) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public byte[] toBytes(Command packet) throws IOException, JMSException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Command readCommand(int firstByte, DataInput in) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    public Command readCommand(String channelID, DatagramPacket dpacket) throws IOException {
        throw new UnsupportedOperationException("Will not be implemented");
    }

    /**
     * Forgets the transaction registered under {@code user_tx_id}, if any.
     *
     * @param user_tx_id the key used in {@link #registerTransactionId(String, int)}
     */
    void clearTransactionId(String user_tx_id) {
        this.transactions.remove(user_tx_id);
    }

    /**
     * @return the client id generated during the handshake, or {@code null} before it ran
     */
    String getClientId() {
        return this.clientId;
    }

    /**
     * @return the session id assigned during the handshake, or {@code null} before that
     */
    public SessionId getSessionId() {
        return this.sessionId;
    }

    /**
     * Registers {@code s} for its destination. A subscription already registered for that
     * destination is closed first and the resulting {@link RemoveInfo} is queued as an inbound
     * command.
     *
     * @param s the subscription to register
     */
    public void addSubscription(Subscription s) {
        Object destination = s.getDestination();
        // ConcurrentHashMap holds no null values, so a single lookup doubles as containsKey.
        Subscription previous = (Subscription) this.subscriptions.get(destination);
        if (previous != null) {
            RemoveInfo removal = previous.close();
            this.enqueueCommand(removal);
        }
        this.subscriptions.put(destination, s);
    }

    /**
     * Queues {@code ack} so that a later {@link #readCommand(DataInput)} returns it.
     *
     * @param ack the command to queue
     */
    public void enqueueCommand(final Command ack) {
        AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper() {
            public void cycle() throws InterruptedException {
                StompWireFormat.this.pendingReadCommands.put(ack);
            }
        });
    }

    /**
     * @param destination the destination to look up
     * @return the subscription registered for {@code destination}, or {@code null} if none
     */
    public Subscription getSubscriptionFor(ActiveMQDestination destination) {
        return (Subscription) this.subscriptions.get(destination);
    }

    /**
     * @param listener the acknowledgement listener to add
     */
    public void addAckListener(AckListener listener) {
        this.ackListeners.add(listener);
    }

    /**
     * @return the live list of registered {@link AckListener}s (not a copy)
     */
    public List getAckListeners() {
        return this.ackListeners;
    }

    /**
     * @param key the key used in {@link #registerTransactionId(String, int)}
     * @return the transaction id registered under {@code key}, or {@code null} if none
     */
    public TransactionId getTransactionId(String key) {
        return (TransactionId) this.transactions.get(key);
    }

    /**
     * Creates a {@link LocalTransactionId} for the current connection id and {@code tx_id} and
     * registers it under {@code user_tx_id}, replacing any previous registration.
     *
     * @param user_tx_id key to register the transaction under
     * @param tx_id numeric transaction id
     * @return the newly created transaction id
     */
    public TransactionId registerTransactionId(String user_tx_id, int tx_id) {
        LocalTransactionId transactionId = new LocalTransactionId(this.getConnectionId(), tx_id);
        this.transactions.put(user_tx_id, transactionId);
        return transactionId;
    }

    /**
     * @return the value last passed to {@link #setVersion(int)}; {@code 1} initially
     */
    public int getVersion() {
        return this.version;
    }

    /**
     * @param version the version value to store
     */
    public void setVersion(int version) {
        this.version = version;
    }

    /**
     * @return the connection id set via {@link #setConnectionId(ConnectionId)}, or {@code null}
     */
    public ConnectionId getConnectionId() {
        return this.connectionId;
    }

    /**
     * @param connectionId the connection id used for newly registered transactions
     */
    public void setConnectionId(ConnectionId connectionId) {
        this.connectionId = connectionId;
    }

    /**
     * @return the next value of a class-wide counter; the first call returns {@code 1}
     */
    public static synchronized int generateTransactionId() {
        return ++transactionIdCounter;
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public ConsumerId createConsumerId() {
        throw new RuntimeException("TODO!!");
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public MessageId generateMessageId() {
        throw new RuntimeException("TODO!!");
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public static short generateCommandId() {
        throw new RuntimeException("TODO!!");
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public SessionId generateSessionId() {
        throw new RuntimeException("TODO!!");
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public Packet marshal(Object arg0) throws IOException {
        throw new RuntimeException("TODO!!");
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public Object unmarshal(Packet arg0) throws IOException {
        throw new RuntimeException("TODO!!");
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public void marshal(Object arg0, DataOutputStream arg1) throws IOException {
        throw new RuntimeException("TODO!!");
    }

    /**
     * Not implemented yet.
     *
     * @throws RuntimeException always
     */
    public Object unmarshal(DataInputStream arg0) throws IOException {
        throw new RuntimeException("TODO!!");
    }
}

