package acromusashi.stream.spout;

import acromusashi.stream.config.ConfigFileWatcher;
import acromusashi.stream.config.StormConfigGenerator;
import acromusashi.stream.constants.FieldName;
import acromusashi.stream.entity.StreamMessage;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:acromusashi/stream/spout/AmBaseSpout.class */
public abstract class AmBaseSpout extends AmConfigurationSpout {
    private static final long serialVersionUID = -3966364804089682434L;
    private static final Logger logger = LoggerFactory.getLogger(AmBaseSpout.class);
    protected static final long DEFAULT_INTERVAL = 30;
    protected String taskId;
    protected boolean recordHistory = true;
    protected boolean reloadConfig = false;
    protected long reloadConfigIntervalSec = DEFAULT_INTERVAL;
    protected Map<String, Object> specificConfig;
    protected transient ConfigFileWatcher watcher;

    @Override // acromusashi.stream.spout.AmConfigurationSpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        super.open(map, topologyContext, spoutOutputCollector);
        this.taskId = topologyContext.getThisComponentId() + "_" + topologyContext.getThisTaskId();
        if (this.reloadConfig && map.containsKey(StormConfigGenerator.INIT_CONFIG_KEY)) {
            String obj = map.get(StormConfigGenerator.INIT_CONFIG_KEY).toString();
            logger.info(MessageFormat.format("Config reload watch start. : WatchPath={0}, Interval(Sec)={1}", obj, Long.valueOf(this.reloadConfigIntervalSec)));
            this.watcher = new ConfigFileWatcher(obj, this.reloadConfigIntervalSec);
            this.watcher.init();
        }
        onOpen(map, topologyContext);
    }

    public abstract void onOpen(Map map, TopologyContext topologyContext);

    public void nextTuple() {
        if (this.reloadConfig && this.watcher != null) {
            Map<String, Object> map = null;
            try {
                map = this.watcher.readIfUpdated();
            } catch (IOException e) {
                logger.warn("Config file reload failed. Skip reload config.", e);
            }
            if (map != null) {
                onUpdate(map);
            }
        }
        onNextTuple();
    }

    public abstract void onNextTuple();

    public void onUpdate(Map<String, Object> map) {
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        ArrayList newArrayList = Lists.newArrayList(new String[]{FieldName.MESSAGE_KEY, FieldName.MESSAGE_VALUE});
        outputFieldsDeclarer.declare(new Fields(newArrayList));
        Iterator<String> it = getOutputStreams().iterator();
        while (it.hasNext()) {
            outputFieldsDeclarer.declareStream(it.next(), new Fields(newArrayList));
        }
    }

    protected List<String> getOutputStreams() {
        return Lists.newArrayList();
    }

    protected Object extractSpecificConfig(String str) {
        if (this.specificConfig == null) {
            return null;
        }
        return this.specificConfig.get(str);
    }

    protected Map<String, Object> getSpecificConfig() {
        if (this.specificConfig == null) {
            return null;
        }
        return Collections.unmodifiableMap(this.specificConfig);
    }

    protected void emit(StreamMessage streamMessage, Object obj, Object obj2) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(new Values(new Object[]{"", streamMessage}), obj2);
    }

    protected void emitWithGrouping(StreamMessage streamMessage, Object obj, Object obj2, String str) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(new Values(new Object[]{str, streamMessage}), obj2);
    }

    protected void emitWithStream(StreamMessage streamMessage, Object obj, Object obj2, String str) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(str, new Values(new Object[]{"", streamMessage}), obj2);
    }

    protected void emitWithGroupingStream(StreamMessage streamMessage, Object obj, Object obj2, String str, String str2) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(str2, new Values(new Object[]{str, streamMessage}), obj2);
    }

    protected void emitWithNoKeyId(StreamMessage streamMessage) {
        getCollector().emit(new Values(new Object[]{"", streamMessage}));
    }

    protected void emitWithNoKeyIdAndGrouping(StreamMessage streamMessage, String str) {
        getCollector().emit(new Values(new Object[]{str, streamMessage}));
    }

    protected void emitWithNoKeyIdAndStream(StreamMessage streamMessage, String str) {
        getCollector().emit(str, new Values(new Object[]{"", streamMessage}));
    }

    protected void emitWithNoKeyIdAndGroupingStream(StreamMessage streamMessage, String str, String str2) {
        getCollector().emit(str2, new Values(new Object[]{str, streamMessage}));
    }

    protected void emitWithOnlyKey(StreamMessage streamMessage, Object obj) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(new Values(new Object[]{"", streamMessage}));
    }

    protected void emitWithOnlyKeyAndGrouping(StreamMessage streamMessage, Object obj, String str) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(new Values(new Object[]{str, streamMessage}));
    }

    protected void emitWithOnlyKeyAndStream(StreamMessage streamMessage, Object obj, String str) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(str, new Values(new Object[]{"", streamMessage}));
    }

    protected void emitWithOnlyKeyAndGroupingStream(StreamMessage streamMessage, Object obj, String str, String str2) {
        if (this.recordHistory) {
            streamMessage.getHeader().addHistory(obj.toString());
        }
        getCollector().emit(str2, new Values(new Object[]{str, streamMessage}));
    }

    public void setRecordHistory(boolean z) {
        this.recordHistory = z;
    }

    public void setReloadConfig(boolean z) {
        this.reloadConfig = z;
    }

    public void setReloadConfigIntervalSec(long j) {
        this.reloadConfigIntervalSec = j;
    }

    public void setSpecificConfig(Map<String, Object> map) {
        this.specificConfig = map;
    }
}
