package wang.moshu.message;

import javax.annotation.PostConstruct;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskRejectedException;

/* loaded from: input_file:wang/moshu/message/AbstarctMessageHandler.class */
public abstract class AbstarctMessageHandler<T> implements Runnable {
    private static Log logger = LogFactory.getLog(AbstarctMessageHandler.class);

    @Autowired
    protected MessageTrunk messageTrunk;

    @Autowired
    private RedisUtil redisUtil;
    private String messageType;
    private Class<T> clazz;
    private boolean monitor;
    private int retryTimes;

    @PostConstruct
    public void startListen() {
        new Thread(this).start();
    }

    public AbstarctMessageHandler(String str, Class<T> cls, int i) {
        this.retryTimes = 3;
        this.messageType = str;
        this.clazz = cls;
        this.retryTimes = 3;
    }

    public AbstarctMessageHandler(String str, Class<T> cls) {
        this.retryTimes = 3;
        this.messageType = str;
        this.clazz = cls;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            listen();
        }
    }

    public void listen() {
        final Object blpop = this.redisUtil.blpop(this.messageType, Integer.MAX_VALUE, Message.class);
        if (blpop == null) {
            this.monitor = false;
            logger.warn("消息分发器获取redis连接失败");
            try {
                Thread.sleep(5000L);
                return;
            } catch (InterruptedException e) {
                logger.warn("消息分发器线程暂停失败");
                return;
            }
        }
        if (!this.monitor) {
            logger.warn("消息分发开始");
            this.monitor = true;
        }
        try {
            this.messageTrunk.getThreadPool().submit(new Runnable() { // from class: wang.moshu.message.AbstarctMessageHandler.1
                /* JADX WARN: Multi-variable type inference failed */
                @Override // java.lang.Runnable
                public void run() {
                    Message message = (Message) blpop;
                    try {
                        AbstarctMessageHandler.this.handle(message.getContent());
                    } catch (Exception e2) {
                        AbstarctMessageHandler.logger.error(e2);
                        if (message.getFailTimes().intValue() >= AbstarctMessageHandler.this.retryTimes) {
                            AbstarctMessageHandler.this.handleFailed(message.getContent());
                            return;
                        }
                        message.setFailTimes(Integer.valueOf(message.getFailTimes().intValue() + 1));
                        AbstarctMessageHandler.this.messageTrunk.put(message);
                        if (AbstarctMessageHandler.logger.isDebugEnabled()) {
                            StringBuilder sb = new StringBuilder();
                            sb.append("msg:[").append(message).append("], 执行失败，准备重试。");
                            AbstarctMessageHandler.logger.debug(sb.toString());
                        }
                    }
                }
            });
        } catch (TaskRejectedException e2) {
            logger.warn("线程池已满，准备回写任务，暂停本线程");
            this.messageTrunk.put((Message) blpop);
            try {
                Thread.sleep(this.messageTrunk.getThreadPoolFullSleepSeconds() * 1000);
            } catch (InterruptedException e3) {
                logger.warn("生产者暂停异常", e2);
            }
        } catch (Exception e4) {
            logger.error("消息总线发生异常", e4);
        }
    }

    public abstract void handle(T t);

    public abstract void handleFailed(T t);
}
