package org.apache.qpidity;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.qpidity.transport.ConnectionDelegate;
import org.apache.qpidity.transport.Data;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.Header;
import org.apache.qpidity.transport.MessageAcquire;
import org.apache.qpidity.transport.MessageFlow;
import org.apache.qpidity.transport.MessageFlush;
import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.transport.MessageSubscribe;
import org.apache.qpidity.transport.MessageTransfer;
import org.apache.qpidity.transport.QueueBind;
import org.apache.qpidity.transport.QueueDeclare;
import org.apache.qpidity.transport.QueueQuery;
import org.apache.qpidity.transport.QueueQueryResult;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.Session;
import org.apache.qpidity.transport.SessionDelegate;
import org.apache.qpidity.transport.Struct;
import org.apache.qpidity.transport.network.mina.MinaHandler;
import org.apache.qpidity.transport.util.Functions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/qpid-common-1.0-incubating-M3-615355.jar:org/apache/qpidity/ToyBroker.class */
/**
 * Minimal in-memory broker used as a session delegate.
 *
 * <p>One instance is created per session (see {@link #main(String[])}). It
 * assembles an incoming message from its transfer, header and data segments,
 * routes the assembled message through the shared {@link ToyExchange}, and
 * forwards queued messages to subscribed consumers while they have credit.
 *
 * <p>All activity is traced to {@code System.out}.
 */
public class ToyBroker extends SessionDelegate {
    /** Exchange used to create, bind and look up queues and to route messages. */
    private final ToyExchange exchange;

    /** Transfer command of the message currently being assembled, or null. */
    private MessageTransfer xfr = null;

    /** Delivery properties taken from the header of the current message, or null. */
    private DeliveryProperties props = null;

    /** Header segment of the current message, or null if none was received. */
    private Header header = null;

    /** Data segments received so far for the current message, or null. */
    private List<Data> body = null;

    /** Subscriptions of this session, keyed by subscription destination. */
    private final Map<String, Consumer> consumers = new ConcurrentHashMap<String, Consumer>();

    /** Per-subscription state: the source queue and the remaining credit. */
    public class Consumer {
        /** Number of messages that may still be sent to this consumer. */
        long _credit;

        /** Name of the queue this consumer reads from. */
        String _queueName;

        private Consumer() {
        }
    }

    /** A fully assembled message: an optional header plus its data segments. */
    public class Message {
        private final Header header;
        private final List<Data> body;

        /**
         * @param header header segment of the message; may be null
         * @param list data segments making up the message body
         */
        public Message(Header header, List<Data> list) {
            this.header = header;
            this.body = list;
        }

        /**
         * Renders the header structs separated by single spaces, followed by
         * each body fragment prefixed with {@code " | "}.
         */
        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            if (this.header != null) {
                boolean first = true;
                for (Struct struct : this.header.getStructs()) {
                    if (first) {
                        first = false;
                    } else {
                        sb.append(" ");
                    }
                    sb.append(struct);
                }
            }
            for (Data segment : this.body) {
                for (ByteBuffer fragment : segment.getFragments()) {
                    sb.append(" | ");
                    sb.append(Functions.str(fragment));
                }
            }
            return sb.toString();
        }
    }

    /**
     * @param toyExchange exchange this broker routes messages through
     */
    public ToyBroker(ToyExchange toyExchange) {
        this.exchange = toyExchange;
    }

    /** Acknowledges an acquire request by reporting the same transfers as acquired. */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void messageAcquire(Session session, MessageAcquire messageAcquire) {
        System.out.println("\n==================> messageAcquire ");
        session.messageAcquired(messageAcquire.getTransfers());
    }

    /** Creates the requested queue on the exchange. */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void queueDeclare(Session session, QueueDeclare queueDeclare) {
        this.exchange.createQueue(queueDeclare.getQueue());
        System.out.println("\n==================> declared queue: " + queueDeclare.getQueue() + "\n");
    }

    /** Binds the requested queue to the exchange under the given routing key. */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void queueBind(Session session, QueueBind queueBind) {
        this.exchange.bindQueue(queueBind.getExchange(), queueBind.getRoutingKey(), queueBind.getQueue());
        System.out.println("\n==================> bound queue: " + queueBind.getQueue() + " with routing key " + queueBind.getRoutingKey() + "\n");
    }

    /** Answers a queue query with a result that echoes the queried queue name. */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void queueQuery(Session session, QueueQuery queueQuery) {
        session.executionResult(queueQuery.getId(), new QueueQueryResult().queue(queueQuery.getQueue()));
    }

    /** Registers a consumer for the subscription's destination; its credit starts at zero. */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void messageSubscribe(Session session, MessageSubscribe messageSubscribe) {
        Consumer consumer = new Consumer();
        consumer._queueName = messageSubscribe.getQueue();
        this.consumers.put(messageSubscribe.getDestination(), consumer);
        System.out.println("\n==================> message subscribe : " + messageSubscribe.getDestination() + " queue: " + messageSubscribe.getQueue() + "\n");
    }

    /**
     * Sets the credit of the consumer registered for the flow's destination.
     * A flow for a destination without a subscription is logged and ignored.
     */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void messageFlow(Session session, MessageFlow messageFlow) {
        Consumer consumer = findConsumer(messageFlow.getDestination());
        if (consumer == null) {
            // Previously this dereferenced a null map lookup and threw NullPointerException.
            System.out.println("\n==================> message flow for unknown consumer : " + messageFlow.getDestination() + "\n");
            return;
        }
        consumer._credit = messageFlow.getValue();
        System.out.println("\n==================> message flow : " + messageFlow.getDestination() + " credit: " + messageFlow.getValue() + "\n");
    }

    /** Sends queued messages to the consumer named by the flush's destination. */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void messageFlush(Session session, MessageFlush messageFlush) {
        System.out.println("\n==================> message flush for consumer : " + messageFlush.getDestination() + "\n");
        checkAndSendMessagesToConsumer(session, messageFlush.getDestination());
    }

    /** Starts assembling a new incoming message. */
    @Override // org.apache.qpidity.transport.MethodDelegate
    public void messageTransfer(Session session, MessageTransfer messageTransfer) {
        this.xfr = messageTransfer;
        this.body = new ArrayList<Data>();
        // Drop header state left over from the previous message so that a transfer
        // without a header segment cannot pick up a stale header or routing key.
        this.header = null;
        this.props = null;
        System.out.println("received transfer " + messageTransfer.getDestination());
    }

    /**
     * Records the header segment of the message being assembled. If no transfer
     * is in progress the connection is closed with code 503.
     */
    @Override // org.apache.qpidity.transport.SessionDelegate, org.apache.qpidity.transport.ProtocolDelegate
    public void header(Session session, Header header) {
        if (this.xfr == null || this.body == null) {
            session.connectionClose(503, "no method segment", 0, 0);
            session.close();
            return;
        }
        this.props = (DeliveryProperties) header.get(DeliveryProperties.class);
        if (this.props != null) {
            System.out.println("received headers routing_key " + this.props.getRoutingKey());
        }
        MessageProperties messageProperties = (MessageProperties) header.get(MessageProperties.class);
        System.out.println("MP: " + messageProperties);
        if (messageProperties != null) {
            System.out.println(messageProperties.getApplicationHeaders());
        }
        this.header = header;
    }

    /**
     * Appends a data segment to the message being assembled. On the last
     * segment the message is routed; if routing succeeds queued messages are
     * dispatched, otherwise the transfer goes through {@link #reject(Session)}.
     * The transfer is then marked processed and the assembly state is cleared.
     * If no transfer is in progress the connection is closed with code 503.
     */
    @Override // org.apache.qpidity.transport.SessionDelegate, org.apache.qpidity.transport.ProtocolDelegate
    public void data(Session session, Data data) {
        if (this.xfr == null || this.body == null) {
            session.connectionClose(503, "no method segment", 0, 0);
            session.close();
            return;
        }
        this.body.add(data);
        if (!data.isLast()) {
            return;
        }
        String destination = this.xfr.getDestination();
        Message message = new Message(this.header, this.body);
        // Without delivery properties there is no routing key; treat the message as
        // unroutable instead of throwing NullPointerException. reject() already
        // handles the null-props case.
        boolean routed = this.props != null
                && this.exchange.route(destination, this.props.getRoutingKey(), message);
        if (routed) {
            System.out.println("queued " + message);
            dispatchMessages(session);
        } else {
            reject(session);
        }
        session.processed(this.xfr);
        this.xfr = null;
        this.body = null;
    }

    /**
     * Rejects the current transfer, unless its delivery properties ask for
     * unroutable messages to be discarded.
     */
    private void reject(Session session) {
        if (this.props == null || !this.props.getDiscardUnroutable()) {
            RangeSet rangeSet = new RangeSet();
            rangeSet.add(this.xfr.getId());
            session.messageReject(rangeSet, 0, "no such destination");
        }
    }

    /** Sends one message (transfer, header, then every body fragment) to the peer. */
    private void transferMessageToPeer(Session session, String str, Message message) {
        System.out.println("\n==================> Transfering message to: " + str + "\n");
        session.messageTransfer(str, (short) 0, (short) 0);
        session.header(message.header);
        for (Data segment : message.body) {
            for (ByteBuffer fragment : segment.getFragments()) {
                session.data(fragment);
            }
        }
        session.endData();
    }

    /** Attempts delivery to every consumer registered on this session. */
    private void dispatchMessages(Session session) {
        for (String destination : this.consumers.keySet()) {
            checkAndSendMessagesToConsumer(session, destination);
        }
    }

    /**
     * Sends queued messages to the given consumer while it has credit, using
     * one unit of credit per message.
     *
     * <p>Credit is checked before a message is taken off the queue, so a
     * message is only removed when it is actually going to be sent. Unknown
     * destinations and consumers whose queue cannot be found are ignored.
     */
    private void checkAndSendMessagesToConsumer(Session session, String str) {
        Consumer consumer = findConsumer(str);
        if (consumer == null) {
            return;
        }
        LinkedBlockingQueue<Message> queue = this.exchange.getQueue(consumer._queueName);
        if (queue == null) {
            return;
        }
        while (consumer._credit > 0) {
            Message message = queue.poll();
            if (message == null) {
                return;
            }
            transferMessageToPeer(session, str, message);
            consumer._credit--;
        }
    }

    /**
     * Looks up the consumer for a destination.
     *
     * @return the consumer, or null when the destination is null or has no subscription
     */
    private Consumer findConsumer(String destination) {
        // ConcurrentHashMap rejects null keys, so guard before the lookup.
        return destination == null ? null : this.consumers.get(destination);
    }

    /**
     * Starts a broker listening on all interfaces on the default port, with
     * every session served by its own {@code ToyBroker} over one shared exchange.
     */
    public static final void main(String[] strArr) throws IOException {
        final ToyExchange toyExchange = new ToyExchange();
        ConnectionDelegate connectionDelegate = new ConnectionDelegate() { // from class: org.apache.qpidity.ToyBroker.1
            @Override // org.apache.qpidity.transport.ConnectionDelegate
            public SessionDelegate getSessionDelegate() {
                // Use the captured local; a qualified "ToyExchange.this" is not valid here.
                return new ToyBroker(toyExchange);
            }
        };
        connectionDelegate.setUsername("guest");
        connectionDelegate.setPassword("guest");
        MinaHandler.accept("0.0.0.0", org.apache.qpid.jms.BrokerDetails.DEFAULT_PORT, connectionDelegate);
    }
}
