package storm.trident.spout;

import backtype.storm.Config;
import backtype.storm.drpc.PrepareRequest;
import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.ICollectorCallback;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import com.alibaba.jstorm.esotericsoftware.kryo.Serializer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import storm.trident.topology.TridentBoltExecutor;
import storm.trident.tuple.ConsList;
import storm.trident.util.TridentUtils;

/* loaded from: input_file:storm/trident/spout/RichSpoutBatchTriggerer.class */
/**
 * Wraps an ordinary {@link IRichSpout} so that every tuple it emits becomes its own
 * one-tuple batch.
 *
 * <p>For each tuple emitted by the delegate this class:
 * <ol>
 *   <li>draws a random batch id and re-emits the tuple on {@code _stream}, prefixed with a
 *       {@code RichSpoutBatchId} carrying that id;</li>
 *   <li>emits a {@code (batchId, count)} tuple directly to every task in {@code _outputTasks}
 *       on the coordination stream, where {@code count} is 1 if that task received the data
 *       tuple and 0 otherwise; each of these tuples is tagged with its own random message
 *       id;</li>
 *   <li>acks the delegate's original message id once all of those coordination message ids
 *       have been acked, or fails it as soon as one of them fails.</li>
 * </ol>
 *
 * <p>NOTE(review): the bookkeeping maps are plain {@code HashMap}s; this assumes
 * {@code nextTuple}/{@code ack}/{@code fail} are all invoked from a single thread — confirm
 * against the spout executor in use.
 */
public class RichSpoutBatchTriggerer implements IRichSpout {
    /** Stream on which the delegate's tuples are re-emitted, prefixed with the batch id. */
    String _stream;
    /** The wrapped spout; all lifecycle calls are forwarded to it. */
    IRichSpout _delegate;
    /** Tasks of every component subscribed to the coordination stream; filled in {@link #open}. */
    List<Integer> _outputTasks;
    /** Source of batch ids and coordination message ids; created in {@link #open}. */
    Random _rand;
    /** Name of the coordination stream, as produced by {@code TridentBoltExecutor.COORD_STREAM}. */
    String _coordStream;
    /** Coordination message id -> id of the batch that message belongs to. */
    Map<Long, Long> _msgIdToBatchId = new HashMap<Long, Long>();
    /** Batch id -> the still-outstanding coordination message ids and the delegate's message id. */
    Map<Long, FinishCondition> _finishConditions = new HashMap<Long, FinishCondition>();

    /** Tracks what must still be acked before the delegate's message id can be acked. */
    static class FinishCondition {
        /** Coordination message ids that have not been acked yet. */
        Set<Long> vals = new HashSet<Long>();
        /** Message id the delegate supplied with the original emit (may be whatever the delegate passed). */
        Object msgId;

        FinishCondition() {
        }
    }

    /**
     * Collector handed to the delegate. It ignores the stream the delegate asks for and
     * routes everything through the enclosing triggerer's data and coordination streams.
     */
    class StreamOverrideCollector implements ISpoutOutputCollector {
        /** The real collector supplied by the framework. */
        SpoutOutputCollector _collector;

        /** Callback that accumulates the task ids reported by the underlying collector. */
        class CollectorCb implements ICollectorCallback {
            /** Destination list; every reported task id is appended to it. */
            List<Integer> tasks;

            public CollectorCb(List<Integer> list) {
                this.tasks = list;
            }

            /**
             * Appends the reported task ids to {@link #tasks}; the stream id and the tuple
             * values are ignored.
             */
            @Override // backtype.storm.task.ICollectorCallback
            public void execute(String str, List<Integer> list, List list2) {
                this.tasks.addAll(list);
            }
        }

        public StreamOverrideCollector(SpoutOutputCollector spoutOutputCollector) {
            this._collector = spoutOutputCollector;
        }

        /**
         * Re-emits the delegate's tuple as a single-tuple batch and sends the matching
         * coordination tuples.
         *
         * @param str  stream requested by the delegate; ignored, {@code _stream} is used instead
         * @param list tuple values emitted by the delegate
         * @param obj  message id supplied by the delegate; handed back to it on ack/fail
         * @return the task ids collected by the emit callback for the data tuple
         */
        @Override // backtype.storm.spout.ISpoutOutputCollector
        public List<Integer> emit(String str, List<Object> list, Object obj) {
            long batchIdValue = RichSpoutBatchTriggerer.this._rand.nextLong();
            RichSpoutBatchId batchId = new RichSpoutBatchId(batchIdValue);
            FinishCondition finish = new FinishCondition();
            finish.msgId = obj;

            List<Integer> receivers = new ArrayList<Integer>();
            List<Object> batchTuple = new ConsList(batchId, list);
            // Typed as ICollectorCallback so the callback overload of emit is selected
            // rather than the one taking a message id.
            ICollectorCallback recordReceivers = new CollectorCb(receivers);
            this._collector.emit(RichSpoutBatchTriggerer.this._stream, batchTuple, recordReceivers);
            // NOTE(review): presumably flush() guarantees the callback has fired before
            // `receivers` is read below — confirm against the collector implementation.
            this._collector.flush();

            Set<Integer> receiverSet = new HashSet<Integer>(receivers);
            for (Integer task : RichSpoutBatchTriggerer.this._outputTasks) {
                // Each coordination task is told how many data tuples of this batch it got.
                int count = receiverSet.contains(task) ? 1 : 0;
                Long coordMsgId = Long.valueOf(RichSpoutBatchTriggerer.this._rand.nextLong());
                this._collector.emitDirect(task.intValue(), RichSpoutBatchTriggerer.this._coordStream, new Values(batchId, Integer.valueOf(count)), coordMsgId);
                finish.vals.add(coordMsgId);
                RichSpoutBatchTriggerer.this._msgIdToBatchId.put(coordMsgId, Long.valueOf(batchIdValue));
            }

            // A condition with no outstanding ids can never be reached from ack()/fail()
            // (nothing in _msgIdToBatchId points at it), so storing it would only leak memory.
            if (!finish.vals.isEmpty()) {
                RichSpoutBatchTriggerer.this._finishConditions.put(Long.valueOf(batchIdValue), finish);
            }
            return receivers;
        }

        /**
         * Direct emits from the delegate are rejected.
         *
         * @throws RuntimeException always
         */
        @Override // backtype.storm.spout.ISpoutOutputCollector
        public void emitDirect(int i, String str, List<Object> list, Object obj) {
            throw new RuntimeException("Trident does not support direct emits from spouts");
        }

        /** Forwards the error to the underlying collector. */
        @Override // backtype.storm.spout.ISpoutOutputCollector
        public void reportError(Throwable th) {
            this._collector.reportError(th);
        }

        /** Always reports zero pending tuples. */
        public long getPendingCount() {
            return 0L;
        }
    }

    /**
     * @param iRichSpout spout to wrap
     * @param str        stream on which the delegate's tuples are re-emitted
     * @param str2       value passed to {@code TridentBoltExecutor.COORD_STREAM} to derive the
     *                   coordination stream name (presumably the batch group — confirm with caller)
     */
    public RichSpoutBatchTriggerer(IRichSpout iRichSpout, String str, String str2) {
        this._delegate = iRichSpout;
        this._stream = str;
        this._coordStream = TridentBoltExecutor.COORD_STREAM(str2);
    }

    /**
     * Resolves the tasks subscribed to the coordination stream, seeds the id generator and
     * then opens the delegate with a {@link StreamOverrideCollector}.
     *
     * <p>The triggerer's own state is initialised <em>before</em> the delegate is opened so
     * that a delegate which emits from within its {@code open} does not run into
     * uninitialised {@code _rand}/{@code _outputTasks}.
     */
    @Override // backtype.storm.spout.ISpout
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        List<Integer> coordTasks = new ArrayList<Integer>();
        // The default is left as a raw HashMap: typing it would need the grouping class,
        // which this file does not import.
        Map<?, ?> coordTargets = (Map<?, ?>) Utils.get(topologyContext.getThisTargets(), this._coordStream, new HashMap());
        for (Object component : coordTargets.keySet()) {
            coordTasks.addAll(topologyContext.getComponentTasks((String) component));
        }
        this._outputTasks = coordTasks;
        this._rand = new Random(Utils.secureRandomLong());

        this._delegate.open(map, topologyContext, new SpoutOutputCollector(new StreamOverrideCollector(spoutOutputCollector)));
    }

    /** Forwards to the delegate. */
    @Override // backtype.storm.spout.ISpout
    public void close() {
        this._delegate.close();
    }

    /** Forwards to the delegate. */
    @Override // backtype.storm.spout.ISpout
    public void activate() {
        this._delegate.activate();
    }

    /** Forwards to the delegate. */
    @Override // backtype.storm.spout.ISpout
    public void deactivate() {
        this._delegate.deactivate();
    }

    /** Forwards to the delegate; its emits arrive through {@link StreamOverrideCollector}. */
    @Override // backtype.storm.spout.ISpout
    public void nextTuple() {
        this._delegate.nextTuple();
    }

    /**
     * Marks one coordination message as acked and, once none remain outstanding for its
     * batch, acks the delegate's original message id.
     *
     * @param obj a coordination message id generated by this class (a {@code Long})
     */
    @Override // backtype.storm.spout.ISpout
    public void ack(Object obj) {
        Long coordMsgId = (Long) obj;
        Long batchId = this._msgIdToBatchId.remove(coordMsgId);
        FinishCondition finish = this._finishConditions.get(batchId);
        if (finish == null) {
            // Unknown id, or the batch was already failed and its condition dropped.
            return;
        }
        finish.vals.remove(coordMsgId);
        if (finish.vals.isEmpty()) {
            this._finishConditions.remove(batchId);
            this._delegate.ack(finish.msgId);
        }
    }

    /**
     * Fails the whole batch on the first failed coordination message: the batch's finish
     * condition is dropped and the delegate's original message id is failed. Later
     * acks/fails for the same batch find no condition and are ignored.
     *
     * @param obj a coordination message id generated by this class (a {@code Long})
     */
    @Override // backtype.storm.spout.ISpout
    public void fail(Object obj) {
        Long batchId = this._msgIdToBatchId.remove((Long) obj);
        FinishCondition finish = this._finishConditions.remove(batchId);
        if (finish != null) {
            this._delegate.fail(finish.msgId);
        }
    }

    /**
     * Declares the data stream as {@code "$id$"} followed by the delegate's single output
     * stream fields, and the coordination stream (declared with the direct flag set) with
     * the fields {@code PrepareRequest.ID_STREAM} and {@code "count"}.
     */
    @Override // backtype.storm.topology.IComponent
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        Fields delegateFields = TridentUtils.getSingleOutputStreamFields(this._delegate);
        outputFieldsDeclarer.declareStream(this._stream, TridentUtils.fieldsConcat(new Fields("$id$"), delegateFields));
        outputFieldsDeclarer.declareStream(this._coordStream, true, new Fields(PrepareRequest.ID_STREAM, "count"));
    }

    /**
     * Returns a copy of the delegate's component configuration (or a fresh map when the
     * delegate returns {@code null}) with the {@code RichSpoutBatchId} serializer registered.
     */
    @Override // backtype.storm.topology.IComponent
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> delegateConf = this._delegate.getComponentConfiguration();
        Map<String, Object> conf = delegateConf == null
                ? new HashMap<String, Object>()
                : new HashMap<String, Object>(delegateConf);
        Config.registerSerialization(conf, RichSpoutBatchId.class, (Class<? extends Serializer>) RichSpoutBatchIdSerializer.class);
        return conf;
    }
}
