package net.oschina.j2cache.cluster;

import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import net.oschina.j2cache.CacheProviderHolder;
import net.oschina.j2cache.Command;
import net.oschina.j2cache.ehcache.EhCacheProvider;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/oschina/j2cache/cluster/RocketMQClusterPolicy.class */
public class RocketMQClusterPolicy implements ClusterPolicy, MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(RocketMQClusterPolicy.class);
    private CacheProviderHolder holder;
    private String hosts;
    private String topic;
    private DefaultMQProducer producer;
    private DefaultMQPushConsumer consumer;

    public RocketMQClusterPolicy(Properties properties) {
        this.hosts = properties.getProperty("hosts");
        String property = properties.getProperty(EhCacheProvider.KEY_EHCACHE_NAME, "j2cache");
        this.topic = properties.getProperty("topic", "j2cache");
        this.producer = new DefaultMQProducer(property);
        this.producer.setNamesrvAddr(this.hosts);
        this.consumer = new DefaultMQPushConsumer(property);
        this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        this.consumer.setNamesrvAddr(this.hosts);
        this.consumer.setMessageModel(MessageModel.BROADCASTING);
    }

    @Override // net.oschina.j2cache.cluster.ClusterPolicy
    public void evict(String str, String... strArr) {
        this.holder.getLevel1Cache(str).evict(strArr);
    }

    @Override // net.oschina.j2cache.cluster.ClusterPolicy
    public void clear(String str) {
        this.holder.getLevel1Cache(str).clear();
    }

    @Override // net.oschina.j2cache.cluster.ClusterPolicy
    public void connect(Properties properties, CacheProviderHolder cacheProviderHolder) {
        this.holder = cacheProviderHolder;
        try {
            this.producer.start();
            publish(Command.join());
            this.consumer.subscribe(this.topic, "*");
            this.consumer.registerMessageListener(this);
            this.consumer.start();
        } catch (MQClientException e) {
            log.error("Failed to start producer", e);
        }
    }

    @Override // net.oschina.j2cache.cluster.ClusterPolicy
    public void publish(Command command) {
        try {
            this.producer.send(new Message(this.topic, "", "", command.json().getBytes()));
        } catch (Exception e) {
            log.error(String.format("Failed to publish %s to RocketMQ", command.json()), e);
        }
    }

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        Iterator<MessageExt> it = list.iterator();
        while (it.hasNext()) {
            handleCommand(Command.parse(new String(it.next().getBody())));
        }
        return null;
    }

    @Override // net.oschina.j2cache.cluster.ClusterPolicy
    public void disconnect() {
        try {
            publish(Command.quit());
        } finally {
            this.producer.shutdown();
            this.consumer.shutdown();
        }
    }
}
