/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.trident.testing;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.testing.IFeeder;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.trident.topology.TridentTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.RegisteredGlobalState;
import org.apache.storm.utils.Utils;

/**
 * Trident spout for tests whose batches are supplied programmatically through
 * {@link #feed(Object)}.
 *
 * <p>Fed batches and one {@link Semaphore} per batch are appended to two lists that are
 * registered with {@link RegisteredGlobalState} and looked up by id. The coordinator hands
 * out batches in the order they were appended and releases the matching semaphore when the
 * transaction that consumed the batch succeeds, which unblocks the {@code feed} caller.
 */
public class FeederBatchSpout
        implements ITridentSpout<Map<Integer, List<List<Object>>>>, IFeeder {
    /** Global-state id of the list holding every fed batch, in feed order. */
    String _id;
    /** Global-state id of the list holding one semaphore per fed batch, in feed order. */
    String _semaphoreId;
    /** Output fields declared by this spout. */
    Fields _outFields;
    /** When true, the coordinator reports ready only if an unconsumed batch is available. */
    boolean _waitToEmit = true;

    /**
     * Creates a spout emitting tuples with the given field names and registers two fresh,
     * empty lists (batches and semaphores) in global state.
     *
     * @param fields names of the output fields
     */
    public FeederBatchSpout(List<String> fields) {
        this._outFields = new Fields(fields);
        this._id = RegisteredGlobalState.registerState(new CopyOnWriteArrayList<Object>());
        this._semaphoreId = RegisteredGlobalState.registerState(new CopyOnWriteArrayList<Semaphore>());
    }

    /**
     * Sets whether the coordinator waits for a fed batch before reporting ready.
     *
     * @param trueIfWait {@code true} to wait for a batch, {@code false} to always be ready
     */
    public void setWaitToEmit(boolean trueIfWait) {
        this._waitToEmit = trueIfWait;
    }

    /**
     * Appends a batch and blocks until the coordinator releases the semaphore registered
     * for it (see {@link FeederCoordinator#success(long)}).
     *
     * @param tuples either a {@code Map} from partition index to tuple list, which the
     *               coordinator uses as-is, or a {@code List} of tuples, which the coordinator
     *               distributes round-robin over the partitions
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *                          is interrupted while waiting; the interrupt flag is restored
     */
    @Override
    public void feed(Object tuples) {
        Semaphore sem = new Semaphore(0);
        @SuppressWarnings("unchecked")
        List<Semaphore> semaphores = (List<Semaphore>) RegisteredGlobalState.getState(this._semaphoreId);
        @SuppressWarnings("unchecked")
        List<Object> batches = (List<Object>) RegisteredGlobalState.getState(this._id);
        // The semaphore is appended before the batch so that, once the coordinator can see
        // the batch at some index, a semaphore already exists at that index.
        // NOTE(review): the two appends are separate operations; concurrent feed() callers
        // could interleave them and mis-pair batch and semaphore -- confirm single feeder.
        semaphores.add(sem);
        batches.add(tuples);
        try {
            sem.acquire();
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns {@code null}; this spout supplies no component-specific configuration.
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Returns the output fields given at construction time.
     */
    @Override
    public Fields getOutputFields() {
        return this._outFields;
    }

    /**
     * Creates the coordinator, sized by the number of tasks of the spout component that
     * corresponds to this coordinator component.
     */
    @Override
    public ITridentSpout.BatchCoordinator<Map<Integer, List<List<Object>>>> getCoordinator(String txStateId, Map<String, Object> conf, TopologyContext context) {
        String spoutId = TridentTopologyBuilder.spoutIdFromCoordinatorId(context.getThisComponentId());
        int numTasks = context.getComponentTasks(spoutId).size();
        return new FeederCoordinator(numTasks);
    }

    /**
     * Creates an emitter bound to this task's index, which is the partition it will emit.
     */
    @Override
    public ITridentSpout.Emitter<Map<Integer, List<List<Object>>>> getEmitter(String txStateId, Map<String, Object> conf, TopologyContext context) {
        return new FeederEmitter(context.getThisTaskIndex());
    }

    /**
     * Coordinator that turns each fed batch into per-partition metadata and releases the
     * feeder's semaphore when the owning transaction succeeds.
     */
    public class FeederCoordinator
            implements ITridentSpout.BatchCoordinator<Map<Integer, List<List<Object>>>> {
        /** Number of partitions a list-style batch is spread over. */
        int _numPartitions;
        /** Index of the next batch to hand out from the global batch list. */
        int _emittedIndex = 0;
        /** Maps a transaction id to the index of the batch it was assigned. */
        Map<Long, Integer> txIndices = new HashMap<Long, Integer>();
        /** Number of times {@link #isReady(long)} has answered true because a batch was pending. */
        int _masterEmitted = 0;

        /**
         * @param numPartitions number of partitions to distribute list-style batches over
         */
        public FeederCoordinator(int numPartitions) {
            this._numPartitions = numPartitions;
        }

        /**
         * Produces the metadata for a transaction.
         *
         * @param txid         transaction id
         * @param prevMetadata metadata of the previous transaction (unused)
         * @param currMetadata existing metadata for this transaction; returned unchanged when
         *                     non-null
         * @return {@code currMetadata} if non-null; otherwise the next fed batch as a map from
         *         partition index to tuples, or an empty map when no unconsumed batch exists
         */
        @Override
        public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata, Map<Integer, List<List<Object>>> currMetadata) {
            if (currMetadata != null) {
                return currMetadata;
            }
            @SuppressWarnings("unchecked")
            List<Object> allBatches = (List<Object>) RegisteredGlobalState.getState(FeederBatchSpout.this._id);
            if (allBatches.size() <= this._emittedIndex) {
                // Nothing new has been fed: emit an empty batch.
                return new HashMap<Integer, List<List<Object>>>();
            }

            Object batchInfo = allBatches.get(this._emittedIndex);
            // Remember which batch this transaction consumed so success() can find its semaphore.
            this.txIndices.put(txid, this._emittedIndex);
            ++this._emittedIndex;

            if (batchInfo instanceof Map) {
                // The feeder supplied the partitioning itself.
                @SuppressWarnings("unchecked")
                Map<Integer, List<List<Object>>> prePartitioned = (Map<Integer, List<List<Object>>>) batchInfo;
                return prePartitioned;
            }

            @SuppressWarnings("unchecked")
            List<List<Object>> batchList = (List<List<Object>>) batchInfo;
            Map<Integer, List<List<Object>>> partitions = new HashMap<Integer, List<List<Object>>>();
            for (int p = 0; p < this._numPartitions; ++p) {
                partitions.put(p, new ArrayList<List<Object>>());
            }
            // Spread the tuples round-robin across the partitions.
            for (int i = 0; i < batchList.size(); ++i) {
                List<Object> tuple = batchList.get(i);
                partitions.get(i % this._numPartitions).add(tuple);
            }
            return partitions;
        }

        @Override
        public void close() {
        }

        /**
         * Releases the semaphore of the batch that was assigned to {@code txid}, if any.
         *
         * @param txid id of the transaction that succeeded
         */
        @Override
        public void success(long txid) {
            Integer index = this.txIndices.get(txid);
            if (index == null) {
                // This transaction was not assigned a fed batch by this coordinator.
                return;
            }
            @SuppressWarnings("unchecked")
            List<Semaphore> semaphores = (List<Semaphore>) RegisteredGlobalState.getState(FeederBatchSpout.this._semaphoreId);
            semaphores.get(index).release();
        }

        /**
         * Reports whether a new transaction may start.
         *
         * @return {@code true} always when waiting is disabled; otherwise {@code true} only
         *         when more batches have been fed than ready-signals have been given
         */
        @Override
        public boolean isReady(long txid) {
            if (!FeederBatchSpout.this._waitToEmit) {
                return true;
            }
            @SuppressWarnings("unchecked")
            List<Object> allBatches = (List<Object>) RegisteredGlobalState.getState(FeederBatchSpout.this._id);
            if (allBatches.size() > this._masterEmitted) {
                ++this._masterEmitted;
                return true;
            }
            // Brief pause before answering "not ready" (presumably 2 ms -- confirm Utils.sleep units).
            Utils.sleep(2L);
            return false;
        }
    }

    /**
     * Emitter that emits the tuples the coordinator assigned to one partition index.
     */
    private static class FeederEmitter
            implements ITridentSpout.Emitter<Map<Integer, List<List<Object>>>> {
        /** Partition index whose tuples this emitter emits. */
        int _index;

        /**
         * @param index partition index this emitter is responsible for
         */
        public FeederEmitter(int index) {
            this._index = index;
        }

        /**
         * Emits every tuple listed under this emitter's index in the coordinator metadata;
         * emits nothing when the metadata has no entry for that index.
         */
        @Override
        public void emitBatch(TransactionAttempt tx, Map<Integer, List<List<Object>>> coordinatorMeta, TridentCollector collector) {
            List<List<Object>> tuples = coordinatorMeta.get(this._index);
            if (tuples == null) {
                return;
            }
            for (List<Object> tuple : tuples) {
                collector.emit(tuple);
            }
        }

        @Override
        public void success(TransactionAttempt tx) {
        }

        @Override
        public void close() {
        }
    }
}

