package org.zbus.mq;

import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import org.zbus.broker.Broker;
import org.zbus.kit.log.Logger;
import org.zbus.mq.Protocol;
import org.zbus.net.http.Message;
import org.zbus.net.http.MessageClient;

/* loaded from: input_file:org/zbus/mq/Consumer.class */
/**
 * Message-queue consumer built on top of {@link MqAdmin}.
 *
 * <p>A consumer can be used in two ways:
 * <ul>
 *   <li>pull style: call {@link #recv(int)} directly;</li>
 *   <li>push style: register a handler with {@link #onMessage(Message.MessageHandler)} and
 *       call {@link #start()}, which runs a background thread that repeatedly calls
 *       {@link #recv(int)} and hands every non-null message to the handler.</li>
 * </ul>
 *
 * <p>The underlying {@link MessageClient} is obtained lazily from the broker on first use and
 * is released by {@link #stop()} / {@link #close()}.
 */
public class Consumer extends MqAdmin implements Closeable {
    private static final Logger log = Logger.getLogger((Class<?>) Consumer.class);

    /** Initial value of the timeout passed to {@link #recv(int)} by the background loop. */
    private static final int DEFAULT_CONSUME_TIMEOUT = 10000;

    /**
     * Lazily created broker client. Volatile because it is published through the
     * check/lock/check sequence in {@link #ensureClient()} and cleared by {@link #stop()}
     * while the consumer thread may be reading it.
     */
    private volatile MessageClient client;
    private String topic;
    /** Written by {@link #setConsumeTimeout(int)}, read by the consumer thread. */
    private volatile int consumeTimeout = DEFAULT_CONSUME_TIMEOUT;
    private volatile Thread consumerThread;
    private volatile Message.MessageHandler consumerHandler;

    /** Body of the background consumer thread; shared by both constructors. */
    private final Runnable consumerTask = new Runnable() {
        @Override
        public void run() {
            consumeLoop();
        }
    };

    /**
     * Creates a consumer; all arguments are forwarded unchanged to the {@link MqAdmin}
     * constructor. No topic is set.
     *
     * @param broker    broker forwarded to the superclass
     * @param str       forwarded to the superclass (presumably the MQ name - confirm against MqAdmin)
     * @param mqModeArr modes forwarded to the superclass
     */
    public Consumer(Broker broker, String str, Protocol.MqMode... mqModeArr) {
        super(broker, str, mqModeArr);
    }

    /**
     * Creates a consumer from a configuration object. The configuration is forwarded to the
     * {@link MqAdmin} constructor and its topic becomes this consumer's topic.
     *
     * @param mqConfig configuration to read from
     */
    public Consumer(MqConfig mqConfig) {
        super(mqConfig);
        this.topic = mqConfig.getTopic();
    }

    /**
     * Receive/dispatch loop executed by the consumer thread. Runs until the thread is
     * interrupted, {@link #recv(int)} reports an interruption, or the client has been
     * released by {@link #stop()}.
     */
    private void consumeLoop() {
        // Checking the flag lets stop() end the loop even when the interrupt arrives while
        // the handler is running rather than while recv() is blocked.
        while (!Thread.currentThread().isInterrupted()) {
            try {
                Message received = recv(this.consumeTimeout);
                if (received == null) {
                    continue;
                }
                // Read the volatile handler once so the null check and the call agree.
                Message.MessageHandler handler = this.consumerHandler;
                if (handler == null) {
                    log.warn("Missing consumer MessageHandler, call onMessage first");
                    continue;
                }
                MessageClient current = this.client;
                if (current == null) {
                    // stop() released the client between recv() and dispatch; there is no
                    // session left to hand to the handler, so end the loop.
                    return;
                }
                try {
                    handler.handle(received, current.getSession());
                } catch (IOException e) {
                    log.error(e.getMessage(), e);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (IOException e) {
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Returns the broker client, asking the broker for one on first use.
     *
     * @return the current, non-null client
     * @throws IOException if the broker cannot provide a client
     */
    private MessageClient ensureClient() throws IOException {
        MessageClient current = this.client;
        if (current == null) {
            synchronized (this) {
                current = this.client;
                if (current == null) {
                    current = this.broker.getClient(brokerHint());
                    this.client = current;
                }
            }
        }
        return current;
    }

    /**
     * Sends one {@code Consume} request and returns the reply.
     *
     * <p>If the request fails with an {@link IOException}, the error is logged, the client is
     * closed and replaced with a fresh one from the broker, and {@code null} is returned. If the
     * reply has status 404, {@code createMQ()} is called and the receive is retried.
     *
     * @param i timeout forwarded to the client's synchronous invoke (unit not visible here;
     *          presumably milliseconds - confirm against MessageClient)
     * @return the reply with its id replaced by its raw id and the raw-id header removed,
     *         or {@code null} if no reply was obtained
     * @throws IOException           if no client can be obtained from the broker
     * @throws InterruptedException  if the underlying channel was closed by a thread interrupt
     * @throws IllegalStateException if the reply was 404 and {@code createMQ()} returned false
     */
    public Message recv(int i) throws IOException, InterruptedException {
        // Work on a local snapshot: stop() may null the field concurrently.
        MessageClient current = ensureClient();

        Message request = new Message();
        request.setCmd(Protocol.Consume);
        request.setMq(this.mq);
        if (Protocol.MqMode.isEnabled(this.mode, Protocol.MqMode.PubSub) && this.topic != null) {
            request.setTopic(this.topic);
        }

        Message reply = null;
        try {
            reply = current.invokeSync(request, i);
        } catch (ClosedByInterruptException e) {
            InterruptedException interrupted = new InterruptedException(e.getMessage());
            interrupted.initCause(e);
            throw interrupted;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            try {
                // Drop the failed connection and fetch a replacement for the next call.
                this.broker.closeClient(current);
                this.client = this.broker.getClient(brokerHint());
            } catch (IOException reconnectFailure) {
                log.error(reconnectFailure.getMessage(), reconnectFailure);
            }
        }

        if (reply == null) {
            return null;
        }
        if (reply.isStatus404()) {
            if (createMQ()) {
                return recv(i);
            }
            throw new IllegalStateException("register error");
        }
        reply.setId(reply.getRawId());
        reply.removeHead(Message.RAWID);
        return reply;
    }

    /**
     * Stops the consumer thread and hands the client back to the broker.
     *
     * @throws IOException if a client is still held after {@link #stop()} and closing it fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        stop();
        // stop() only leaves the field non-null when its own close attempt failed.
        MessageClient current = this.client;
        if (current != null) {
            this.broker.closeClient(current);
        }
    }

    /**
     * Sends the given message synchronously through this consumer's client.
     *
     * @param message request to send
     * @return the reply returned by the client
     */
    @Override // org.zbus.mq.MqAdmin
    protected Message invokeCreateMQ(Message message) throws IOException, InterruptedException {
        return ensureClient().invokeSync(message);
    }

    /**
     * Marks the message as a {@code Route} command with ack disabled and sends it without
     * waiting for a reply.
     *
     * @param message message to route; its cmd and ack fields are overwritten
     * @throws IOException if no client can be obtained or sending fails
     */
    public void routeMessage(Message message) throws IOException {
        MessageClient current = ensureClient();
        message.setCmd(Protocol.Route);
        message.setAck(false);
        current.send(message);
    }

    /** @return the topic attached to consume requests, or {@code null} if none is set */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Sets the topic attached to consume requests.
     *
     * @param str the topic
     * @throws IllegalStateException if PubSub is not enabled in this consumer's mode
     */
    public void setTopic(String str) {
        if (!Protocol.MqMode.isEnabled(this.mode, Protocol.MqMode.PubSub)) {
            throw new IllegalStateException("topic require PubSub mode");
        }
        this.topic = str;
    }

    /**
     * Registers the handler invoked by the background loop for every received message.
     * This does not start consuming; call {@link #start()} for that.
     *
     * @param messageHandler handler to invoke
     * @throws IOException declared for callers; not thrown by this implementation
     */
    public void onMessage(Message.MessageHandler messageHandler) throws IOException {
        this.consumerHandler = messageHandler;
    }

    /**
     * Interrupts the consumer thread (if any) and closes the client. A failure to close the
     * client is logged rather than thrown; in that case the client reference is kept.
     */
    public void stop() {
        Thread worker = this.consumerThread;
        if (worker != null) {
            worker.interrupt();
            this.consumerThread = null;
        }
        try {
            MessageClient current = this.client;
            if (current != null) {
                this.broker.closeClient(current);
            }
            this.client = null;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Starts the background consumer thread if it is not already running. A thread that has
     * already terminated is replaced, because a {@link Thread} cannot be started twice.
     */
    public void start() {
        Thread worker = this.consumerThread;
        if (worker == null || worker.getState() == Thread.State.TERMINATED) {
            worker = new Thread(this.consumerTask);
            this.consumerThread = worker;
        }
        if (worker.isAlive()) {
            return;
        }
        worker.start();
    }

    /**
     * Sets the timeout the background loop passes to {@link #recv(int)}.
     *
     * @param i the new timeout
     */
    public void setConsumeTimeout(int i) {
        this.consumeTimeout = i;
    }
}
