package org.zbus.mq.server;

import java.io.IOException;
import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol;
import org.zbus.net.core.Session;
import org.zbus.net.http.Message;

/* loaded from: input_file:org/zbus/mq/server/MQ.class */
public class MQ extends AbstractMQ {
    private static final Logger log = Logger.getLogger((Class<?>) MQ.class);
    protected final Map<String, Session> pullSessions;
    protected final BlockingQueue<PullSession> pullQ;

    public MQ(String str, AbstractQueue<Message> abstractQueue) {
        super(str, abstractQueue);
        this.pullSessions = new ConcurrentHashMap();
        this.pullQ = new LinkedBlockingQueue();
    }

    @Override // org.zbus.mq.server.AbstractMQ
    public void consume(Message message, Session session) throws IOException {
        if (!this.pullSessions.containsKey(session.id())) {
            this.pullSessions.put(session.id(), session);
        }
        for (PullSession pullSession : this.pullQ) {
            if (pullSession.getSession() == session) {
                pullSession.setPullMessage(message);
                dispatch();
                return;
            }
        }
        this.pullQ.offer(new PullSession(session, message));
        dispatch();
    }

    @Override // org.zbus.mq.server.AbstractMQ
    void dispatch() throws IOException {
        Message poll;
        while (this.pullQ.peek() != null && this.msgQ.size() > 0) {
            PullSession poll2 = this.pullQ.poll();
            if (poll2 != null && poll2.getSession().isActive() && (poll = this.msgQ.poll()) != null) {
                this.lastUpdateTime = System.currentTimeMillis();
                try {
                    Message pullMessage = poll2.getPullMessage();
                    Message copyWithoutBody = Message.copyWithoutBody(poll);
                    copyWithoutBody.setRawId(poll.getId());
                    copyWithoutBody.setId(pullMessage.getId());
                    if (copyWithoutBody.getResponseStatus() == null) {
                        copyWithoutBody.setResponseStatus(200);
                    }
                    poll2.getSession().write(copyWithoutBody);
                } catch (IOException e) {
                    log.error(e.getMessage(), e);
                    this.msgQ.offer(poll);
                }
            }
        }
    }

    @Override // org.zbus.mq.server.AbstractMQ
    public void cleanSession(Session session) {
        this.pullSessions.remove(session.id());
        Iterator it = this.pullQ.iterator();
        while (it.hasNext()) {
            if (session == ((PullSession) it.next()).session) {
                it.remove();
                return;
            }
        }
    }

    @Override // org.zbus.mq.server.AbstractMQ
    public void cleanSession() {
        Iterator it = this.pullQ.iterator();
        while (it.hasNext()) {
            PullSession pullSession = (PullSession) it.next();
            if (!pullSession.session.isActive()) {
                this.pullSessions.remove(pullSession.session.id());
                it.remove();
            }
        }
    }

    @Override // org.zbus.mq.server.AbstractMQ
    public Protocol.MqInfo getMqInfo() {
        Protocol.MqInfo mqInfo = new Protocol.MqInfo();
        mqInfo.name = this.name;
        mqInfo.lastUpdateTime = this.lastUpdateTime;
        mqInfo.creator = this.creator;
        mqInfo.mode = Protocol.MqMode.MQ.intValue();
        mqInfo.unconsumedMsgCount = this.msgQ.size();
        mqInfo.consumerCount = this.pullSessions.size();
        mqInfo.consumerInfoList = new ArrayList();
        Iterator it = this.pullQ.iterator();
        while (it.hasNext()) {
            mqInfo.consumerInfoList.add(((PullSession) it.next()).getConsumerInfo());
        }
        return mqInfo;
    }

    public String toString() {
        return "MQ [name=" + this.name + ", creator=" + this.creator + "]";
    }
}
