package acromusashi.stream.component.infinispan.bolt;

import acromusashi.stream.bolt.AmBaseBolt;
import acromusashi.stream.component.infinispan.CacheHelper;
import acromusashi.stream.component.infinispan.TupleCacheMapper;
import acromusashi.stream.constants.FieldName;
import acromusashi.stream.entity.StreamMessage;
import acromusashi.stream.exception.ConvertFailException;
import backtype.storm.task.TopologyContext;
import java.text.MessageFormat;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/component/infinispan/bolt/InfinispanLookupBolt.class */
/**
 * Bolt that converts each incoming message into a cache key, reads the value
 * stored under that key through a {@link CacheHelper}, and hands the result to
 * {@link #onLookupAfter(StreamMessage, Object, Object)}.
 *
 * <p>Subclasses can customise behaviour by overriding
 * {@link #onLookupBefore(StreamMessage)} and
 * {@link #onLookupAfter(StreamMessage, Object, Object)}.
 *
 * @param <K> type of the cache key
 * @param <V> type of the cache value
 */
public class InfinispanLookupBolt<K, V> extends AmBaseBolt {
    private static final long serialVersionUID = 9028505967740858573L;
    private static final Logger logger = LoggerFactory.getLogger(InfinispanLookupBolt.class);

    /** Cache server URL handed to the {@link CacheHelper} constructor. */
    protected String cacheServerUrl;

    /** Cache name handed to the {@link CacheHelper} constructor. */
    protected String cacheName;

    /** Converts a received message into the key used for the lookup. */
    protected TupleCacheMapper<K, V> mapper;

    /** Cache accessor; transient, so it is (re)built in {@link #onPrepare(Map, TopologyContext)}. */
    protected transient CacheHelper<K, V> cacheHelper;

    /**
     * Stores the connection settings and the key mapper. No cache connection is
     * opened here.
     *
     * @param str              cache server URL
     * @param str2             cache name
     * @param tupleCacheMapper mapper that derives the lookup key from a message
     */
    public InfinispanLookupBolt(String str, String str2, TupleCacheMapper<K, V> tupleCacheMapper) {
        this.cacheServerUrl = str;
        this.cacheName = str2;
        this.mapper = tupleCacheMapper;
    }

    /**
     * Builds the {@link CacheHelper} from the stored URL and cache name and
     * initialises it. Neither argument is read by this implementation.
     *
     * @param map             configuration passed in by the base bolt (unused)
     * @param topologyContext topology context passed in by the base bolt (unused)
     */
    @Override
    public void onPrepare(Map map, TopologyContext topologyContext) {
        CacheHelper<K, V> helper = new CacheHelper<>(this.cacheServerUrl, this.cacheName);
        this.cacheHelper = helper;
        helper.initCache();
    }

    /**
     * Runs the lookup sequence for one message: pre-hook, key conversion, cache
     * read, post-hook. The post-hook is always invoked; key and/or value are
     * {@code null} when the conversion or the lookup did not succeed.
     *
     * @param streamMessage received message
     */
    @Override
    public void onExecute(StreamMessage streamMessage) {
        onLookupBefore(streamMessage);

        K lookupKey = convertKeyOrNull(streamMessage);
        V lookupValue = (lookupKey == null) ? null : fetchValueOrNull(lookupKey, streamMessage);

        onLookupAfter(streamMessage, lookupKey, lookupValue);
    }

    /**
     * Derives the lookup key from the message via the mapper.
     *
     * @param streamMessage received message
     * @return the converted key, or {@code null} when the mapper threw
     *         {@link ConvertFailException} (the failure is logged at WARN)
     */
    private K convertKeyOrNull(StreamMessage streamMessage) {
        try {
            return this.mapper.convertToKey(streamMessage);
        } catch (ConvertFailException convertFailure) {
            String warning = MessageFormat.format(
                    "Tuple convert to key failed. Skip lookup. : InputTuple={0}",
                    streamMessage.toString());
            logger.warn(warning, convertFailure);
            return null;
        }
    }

    /**
     * Reads the value stored under the given key.
     *
     * @param lookupKey     non-null key to look up
     * @param streamMessage received message, used only for the log output
     * @return the value returned by the cache, or {@code null} when the lookup
     *         threw (the failure is logged at WARN and processing continues)
     */
    private V fetchValueOrNull(K lookupKey, StreamMessage streamMessage) {
        try {
            return this.cacheHelper.getCache().get(lookupKey);
        } catch (Exception lookupFailure) {
            String warning = MessageFormat.format(
                    "Cache lookup failed. Continue execute. : InputTuple={0}",
                    streamMessage.toString());
            logger.warn(warning, lookupFailure);
            return null;
        }
    }

    /**
     * Hook invoked before the key conversion and lookup. The default
     * implementation does nothing.
     *
     * @param streamMessage received message
     */
    protected void onLookupBefore(StreamMessage streamMessage) {
        // Intentionally empty: extension point for subclasses.
    }

    /**
     * Hook invoked after the lookup. When a value is present, the default
     * implementation builds a new message carrying the key and the value and
     * passes it to {@code emitWithGrouping} together with the key and the key's
     * string form; when the value is {@code null}, nothing is emitted.
     *
     * @param streamMessage received message
     * @param k             lookup key; may be {@code null} if conversion failed
     * @param v             looked-up value; {@code null} if nothing was obtained
     */
    protected void onLookupAfter(StreamMessage streamMessage, K k, V v) {
        if (v == null) {
            return;
        }

        StreamMessage lookupResult = new StreamMessage();
        lookupResult.addField(FieldName.MESSAGE_KEY, k);
        lookupResult.addField(FieldName.MESSAGE_VALUE, v);
        emitWithGrouping(lookupResult, k, k.toString());
    }
}
