package acromusashi.stream.component.kestrel.spout;

import acromusashi.stream.constants.FieldName;
import acromusashi.stream.util.JsonValueExtractor;
import backtype.storm.spout.MultiScheme;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import net.sf.json.JSONException;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/component/kestrel/spout/KestrelJsonSpout.class */
/**
 * Kestrel spout that treats each received message as a JSON string and emits it
 * as a two-field tuple: the message key taken from the JSON and the raw message.
 *
 * <p>On {@link #open(Map, TopologyContext, SpoutOutputCollector)} the spout reads
 * the {@value #KESTREL_TIMEOUT} setting, creates a {@link RestrictWatcher} for the
 * configured restrict file path, and appends the task index to the queue name.
 */
public class KestrelJsonSpout extends KestrelSpout {
    /** Configuration key: Kestrel servers. */
    public static final String KESTREL_SERVERS = "kestrel.servers";
    /** Configuration key: Kestrel queue. */
    public static final String KESTREL_QUEUE = "kestrel.queue";
    /** Configuration key: timeout, read by {@code open} as a number of seconds. */
    public static final String KESTREL_TIMEOUT = "kestrel.timeout";
    /** Configuration key: batch size. */
    public static final String KESTREL_BATCH_SIZE = "kestrel.batch.size";
    /** Configuration key: blacklist time. */
    public static final String KESTREL_BLACKLISTTIME = "kestrel.blacklist.time";
    /** First JSON tag passed to {@link JsonValueExtractor#extractValue}. */
    private static final String HEADER_TAG = "header";
    /** Second JSON tag passed to {@link JsonValueExtractor#extractValue}. */
    private static final String MESSAGEKEY_TAG = "messageKey";
    private static final long serialVersionUID = -3331796053960250415L;
    private static final Logger logger = LoggerFactory.getLogger(KestrelJsonSpout.class);

    /** Watcher consulted by {@link #isRestricted()}; transient, so it is created in {@code open}. */
    protected transient RestrictWatcher restrictWatcher;
    /** Path handed to the {@link RestrictWatcher} constructor when the spout is opened. */
    private String restrictFilePath;

    /**
     * Creates the spout, wrapping the given scheme in a {@link SchemeAsMultiScheme}.
     *
     * <p>The argument checks necessarily run after the superclass constructor,
     * because Java requires the {@code super(...)} call to come first.
     *
     * @param list   server list forwarded to the superclass constructor
     * @param str    queue name; must not be empty
     * @param scheme scheme used to deserialize messages; must not be {@code null}
     * @throws IllegalArgumentException if the queue name is empty or the scheme is {@code null}
     */
    public KestrelJsonSpout(List<String> list, String str, Scheme scheme) {
        super(list, str, (MultiScheme) new SchemeAsMultiScheme(scheme));
        if (StringUtils.isEmpty(str)) {
            throw new IllegalArgumentException("Must configure queueName");
        }
        if (scheme == null) {
            throw new IllegalArgumentException("Must configure scheme");
        }
    }

    /**
     * Opens the spout: delegates to the superclass, then applies the configured
     * timeout, creates the restrict watcher and makes the queue name task-specific
     * by appending {@code "_" + taskIndex}.
     *
     * @param map                  configuration; must contain a numeric {@value #KESTREL_TIMEOUT} (seconds)
     * @param topologyContext      topology context supplying the task index
     * @param spoutOutputCollector collector forwarded to the superclass
     * @throws IllegalArgumentException if {@value #KESTREL_TIMEOUT} is missing or not a number
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);
        this.messageTimeoutMs = resolveTimeoutMillis(map.get(KESTREL_TIMEOUT));
        this.restrictWatcher = new RestrictWatcher(this.restrictFilePath);
        setQueueName(getQueueName() + "_" + topologyContext.getThisTaskIndex());
    }

    /**
     * Converts the configured timeout (seconds) to milliseconds.
     *
     * @param configured raw configuration value for {@value #KESTREL_TIMEOUT}
     * @return the timeout in milliseconds, capped at {@link Integer#MAX_VALUE}
     * @throws IllegalArgumentException if the value is {@code null} or not a {@link Number}
     */
    private static int resolveTimeoutMillis(Object configured) {
        if (!(configured instanceof Number)) {
            // Fail with the offending key instead of a bare NullPointerException/ClassCastException.
            throw new IllegalArgumentException(
                    "Must configure numeric " + KESTREL_TIMEOUT + " (seconds) : value=" + configured);
        }
        long millis = TimeUnit.SECONDS.toMillis(((Number) configured).intValue());
        // A plain (int) cast would silently wrap for very large timeouts; cap instead.
        return (int) Math.min(millis, (long) Integer.MAX_VALUE);
    }

    /**
     * Declares the two output fields: {@code "messageKey"} followed by
     * {@link FieldName#MESSAGE_VALUE}.
     *
     * @return the output fields of this spout
     */
    @Override
    public Fields getOutputFields() {
        return new Fields(Arrays.asList("messageKey", FieldName.MESSAGE_VALUE));
    }

    /**
     * Reports the current restriction state as given by the {@link RestrictWatcher}.
     * The watcher is created in {@code open}, so this must not be called before it.
     *
     * @return the result of {@code restrictWatcher.isRestrict()}
     */
    @Override
    protected boolean isRestricted() {
        return this.restrictWatcher.isRestrict();
    }

    /**
     * Builds the item to emit from a deserialized message. The first element of
     * {@code list} is taken as the JSON text; the value extracted with the
     * {@code header} / {@code messageKey} tags becomes the first tuple value and the
     * JSON text itself the second.
     *
     * @param list            deserialized values; element 0 is cast to {@code String}
     * @param kestrelSourceId source id attached to the emit item
     * @return the emit item, or {@code null} if extraction throws a {@link JSONException}
     */
    @Override
    protected EmitItem generateEmitItem(List<Object> list, KestrelSourceId kestrelSourceId) {
        String str = (String) list.get(0);
        try {
            Object messageKey = JsonValueExtractor.extractValue(str, HEADER_TAG, MESSAGEKEY_TAG);
            return new EmitItem(Arrays.asList(messageKey, str), kestrelSourceId);
        } catch (JSONException e) {
            // Only pay for message formatting when debug output is actually enabled.
            if (logger.isDebugEnabled()) {
                logger.debug(MessageFormat.format("Received message is not json. : message={0}", str), e);
            }
            return null;
        }
    }

    /**
     * Sets the path passed to the {@link RestrictWatcher} created in {@code open};
     * it only takes effect if called before the spout is opened.
     *
     * @param str restrict file path
     */
    public void setRestrictFilePath(String str) {
        this.restrictFilePath = str;
    }
}
