/*
 * Decompiled with CFR 0.152.
 */
package storm.trident.spout;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.spout.ISpoutPartition;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.RotatingTransactionalState;
import storm.trident.topology.state.TransactionalState;

public class PartitionedTridentSpoutExecutor
implements ITridentSpout<Integer> {
    IPartitionedTridentSpout _spout;

    public PartitionedTridentSpoutExecutor(IPartitionedTridentSpout spout) {
        this._spout = spout;
    }

    public IPartitionedTridentSpout getPartitionedSpout() {
        return this._spout;
    }

    @Override
    public ITridentSpout.BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
        return new Coordinator(conf, context);
    }

    @Override
    public ITridentSpout.Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
        return new Emitter(txStateId, conf, context);
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return this._spout.getComponentConfiguration();
    }

    @Override
    public Fields getOutputFields() {
        return this._spout.getOutputFields();
    }

    class Emitter
    implements ITridentSpout.Emitter<Object> {
        private IPartitionedTridentSpout.Emitter _emitter;
        private TransactionalState _state;
        private Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
        private int _index;
        private int _numTasks;
        Object _savedCoordinatorMeta = null;

        public Emitter(String txStateId, Map conf, TopologyContext context) {
            this._emitter = PartitionedTridentSpoutExecutor.this._spout.getEmitter(conf, context);
            this._state = TransactionalState.newUserState(conf, txStateId);
            this._index = context.getThisTaskIndex();
            this._numTasks = context.getComponentTasks(context.getThisComponentId()).size();
        }

        @Override
        public void emitBatch(final TransactionAttempt tx, Object coordinatorMeta, final TridentCollector collector) {
            if (this._savedCoordinatorMeta == null || !this._savedCoordinatorMeta.equals(coordinatorMeta)) {
                List partitions = this._emitter.getOrderedPartitions(coordinatorMeta);
                this._partitionStates.clear();
                ArrayList<ISpoutPartition> myPartitions = new ArrayList<ISpoutPartition>();
                for (int i = this._index; i < partitions.size(); i += this._numTasks) {
                    ISpoutPartition p = (ISpoutPartition)partitions.get(i);
                    String id = p.getId();
                    myPartitions.add(p);
                    this._partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(this._state, id), p));
                }
                this._emitter.refreshPartitions(myPartitions);
                this._savedCoordinatorMeta = coordinatorMeta;
            }
            for (EmitterPartitionState s : this._partitionStates.values()) {
                RotatingTransactionalState state = s.rotatingState;
                final ISpoutPartition partition = s.partition;
                Object meta = state.getStateOrCreate(tx.getTransactionId(), new RotatingTransactionalState.StateInitializer(){

                    @Override
                    public Object init(long txid, Object lastState) {
                        return Emitter.this._emitter.emitPartitionBatchNew(tx, collector, partition, lastState);
                    }
                });
                if (meta == null) continue;
                this._emitter.emitPartitionBatch(tx, collector, partition, meta);
            }
        }

        @Override
        public void success(TransactionAttempt tx) {
            for (EmitterPartitionState state : this._partitionStates.values()) {
                state.rotatingState.cleanupBefore(tx.getTransactionId());
            }
        }

        @Override
        public void close() {
            this._state.close();
            this._emitter.close();
        }
    }

    static class EmitterPartitionState {
        public RotatingTransactionalState rotatingState;
        public ISpoutPartition partition;

        public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
            this.rotatingState = s;
            this.partition = p;
        }
    }

    class Coordinator
    implements ITridentSpout.BatchCoordinator<Object> {
        private IPartitionedTridentSpout.Coordinator _coordinator;

        public Coordinator(Map conf, TopologyContext context) {
            this._coordinator = PartitionedTridentSpoutExecutor.this._spout.getCoordinator(conf, context);
        }

        @Override
        public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
            if (currMetadata != null) {
                return currMetadata;
            }
            return this._coordinator.getPartitionsForBatch();
        }

        @Override
        public void close() {
            this._coordinator.close();
        }

        @Override
        public void success(long txid) {
        }

        @Override
        public boolean isReady(long txid) {
            return this._coordinator.isReady(txid);
        }
    }
}

