package storm.trident.spout;

import backtype.storm.Config;
import backtype.storm.spout.ISpoutOutputCollector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.RotatingMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import storm.trident.util.TridentUtils;

/* loaded from: input_file:storm/trident/spout/RichSpoutBatchExecutor.class */
public class RichSpoutBatchExecutor implements ITridentSpout {
    public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
    IRichSpout _spout;

    /* loaded from: input_file:storm/trident/spout/RichSpoutBatchExecutor$CaptureCollector.class */
    static class CaptureCollector implements ISpoutOutputCollector {
        TridentCollector _collector;
        public List<Object> ids;
        public int numEmitted;
        public long pendingCount;

        CaptureCollector() {
        }

        public void reset(TridentCollector tridentCollector) {
            this._collector = tridentCollector;
            this.ids = new ArrayList();
        }

        @Override // backtype.storm.spout.ISpoutOutputCollector
        public void reportError(Throwable th) {
            this._collector.reportError(th);
        }

        @Override // backtype.storm.spout.ISpoutOutputCollector
        public List<Integer> emit(String str, List<Object> list, Object obj) {
            if (obj != null) {
                this.ids.add(obj);
            }
            this.numEmitted++;
            this._collector.emit(list);
            return null;
        }

        @Override // backtype.storm.spout.ISpoutOutputCollector
        public void emitDirect(int i, String str, List<Object> list, Object obj) {
            throw new UnsupportedOperationException("Trident does not support direct streams");
        }

        public long getPendingCount() {
            return this.pendingCount;
        }
    }

    /* loaded from: input_file:storm/trident/spout/RichSpoutBatchExecutor$RichSpoutCoordinator.class */
    private static class RichSpoutCoordinator implements ITridentSpout.BatchCoordinator {
        private RichSpoutCoordinator() {
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public Object initializeTransaction(long j, Object obj, Object obj2) {
            return null;
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public void success(long j) {
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public boolean isReady(long j) {
            return true;
        }

        @Override // storm.trident.spout.ITridentSpout.BatchCoordinator
        public void close() {
        }
    }

    /* loaded from: input_file:storm/trident/spout/RichSpoutBatchExecutor$RichSpoutEmitter.class */
    class RichSpoutEmitter implements ITridentSpout.Emitter<Object> {
        int _maxBatchSize;
        CaptureCollector _collector;
        RotatingMap<Long, List<Object>> idsMap;
        Map _conf;
        TopologyContext _context;
        long rotateTime;
        boolean prepared = false;
        long lastRotate = System.currentTimeMillis();

        public RichSpoutEmitter(Map map, TopologyContext topologyContext) {
            this._conf = map;
            this._context = topologyContext;
            Number number = (Number) map.get(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF);
            this._maxBatchSize = (number == null ? 1000 : number).intValue();
            this._collector = new CaptureCollector();
            this.idsMap = new RotatingMap<>(3);
            this.rotateTime = 1000 * ((Number) map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void emitBatch(TransactionAttempt transactionAttempt, Object obj, TridentCollector tridentCollector) {
            long longValue = transactionAttempt.getTransactionId().longValue();
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastRotate > this.rotateTime) {
                Iterator<Long> it = this.idsMap.rotate().keySet().iterator();
                while (it.hasNext()) {
                    fail(it.next().longValue());
                }
                this.lastRotate = currentTimeMillis;
            }
            if (this.idsMap.containsKey(Long.valueOf(longValue))) {
                fail(longValue);
            }
            this._collector.reset(tridentCollector);
            if (!this.prepared) {
                RichSpoutBatchExecutor.this._spout.open(this._conf, this._context, new SpoutOutputCollector(this._collector));
                this.prepared = true;
            }
            for (int i = 0; i < this._maxBatchSize; i++) {
                RichSpoutBatchExecutor.this._spout.nextTuple();
                if (this._collector.numEmitted < i) {
                    break;
                }
            }
            this.idsMap.put(Long.valueOf(longValue), this._collector.ids);
            this._collector.pendingCount = this.idsMap.size();
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void success(TransactionAttempt transactionAttempt) {
            ack(transactionAttempt.getTransactionId().longValue());
        }

        private void ack(long j) {
            List list = (List) this.idsMap.remove(Long.valueOf(j));
            if (list != null) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    RichSpoutBatchExecutor.this._spout.ack(it.next());
                }
            }
        }

        private void fail(long j) {
            List list = (List) this.idsMap.remove(Long.valueOf(j));
            if (list != null) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    RichSpoutBatchExecutor.this._spout.fail(it.next());
                }
            }
        }

        @Override // storm.trident.spout.ITridentSpout.Emitter
        public void close() {
            RichSpoutBatchExecutor.this._spout.close();
        }
    }

    public RichSpoutBatchExecutor(IRichSpout iRichSpout) {
        this._spout = iRichSpout;
    }

    @Override // storm.trident.spout.ITridentSpout
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    @Override // storm.trident.spout.ITridentSpout
    public Fields getOutputFields() {
        return TridentUtils.getSingleOutputStreamFields(this._spout);
    }

    @Override // storm.trident.spout.ITridentSpout
    public ITridentSpout.BatchCoordinator getCoordinator(String str, Map map, TopologyContext topologyContext) {
        return new RichSpoutCoordinator();
    }

    @Override // storm.trident.spout.ITridentSpout
    public ITridentSpout.Emitter getEmitter(String str, Map map, TopologyContext topologyContext) {
        return new RichSpoutEmitter(map, topologyContext);
    }
}
