package storm.trident.windowing;

import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.windowing.EvictionPolicy;
import backtype.storm.windowing.TriggerPolicy;
import backtype.storm.windowing.WindowLifecycleListener;
import backtype.storm.windowing.WindowManager;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import shade.storm.com.google.common.collect.Lists;
import storm.trident.operation.Aggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;
import storm.trident.windowing.WindowsStore;
import storm.trident.windowing.config.WindowConfig;
import storm.trident.windowing.strategy.WindowStrategy;

/* loaded from: input_file:storm/trident/windowing/AbstractTridentWindowManager.class */
public abstract class AbstractTridentWindowManager<T> implements ITridentWindowManager {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractTridentWindowManager.class);
    protected final Aggregator aggregator;
    protected final BatchOutputCollector delegateCollector;
    protected final String windowTaskId;
    protected final WindowsStore windowStore;
    private final String windowTriggerCountId;
    private final TriggerPolicy<T> triggerPolicy;
    protected final Queue<TriggerResult> pendingTriggers = new ConcurrentLinkedQueue();
    protected final AtomicInteger triggerId = new AtomicInteger();
    protected final WindowManager<T> windowManager = new WindowManager<>(new TridentWindowLifeCycleListener());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/trident/windowing/AbstractTridentWindowManager$AccumulatedTuplesCollector.class */
    public static class AccumulatedTuplesCollector implements TridentCollector {
        final List<List<Object>> values = new ArrayList();
        private final BatchOutputCollector delegateCollector;

        public AccumulatedTuplesCollector(BatchOutputCollector batchOutputCollector) {
            this.delegateCollector = batchOutputCollector;
        }

        @Override // storm.trident.operation.TridentCollector
        public void emit(List<Object> list) {
            this.values.add(list);
        }

        @Override // storm.trident.operation.TridentCollector
        public void reportError(Throwable th) {
            this.delegateCollector.reportError(th);
        }
    }

    /* loaded from: input_file:storm/trident/windowing/AbstractTridentWindowManager$TridentWindowLifeCycleListener.class */
    class TridentWindowLifeCycleListener implements WindowLifecycleListener<T> {
        TridentWindowLifeCycleListener() {
        }

        @Override // backtype.storm.windowing.WindowLifecycleListener
        public void onExpiry(List<T> list) {
            AbstractTridentWindowManager.LOG.debug("onExpiry is invoked");
            AbstractTridentWindowManager.this.onTuplesExpired(list);
        }

        @Override // backtype.storm.windowing.WindowLifecycleListener
        public void onActivation(List<T> list, List<T> list2, List<T> list3) {
            AbstractTridentWindowManager.LOG.debug("onActivation is invoked with events size: [{}]", Integer.valueOf(list.size()));
            AbstractTridentWindowManager.this.execAggregatorAndStoreResult(AbstractTridentWindowManager.this.triggerId.incrementAndGet(), list);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:storm/trident/windowing/AbstractTridentWindowManager$TriggerResult.class */
    public static class TriggerResult {
        final int id;
        final List<List<Object>> result;

        public TriggerResult(int i, List<List<Object>> list) {
            this.id = i;
            this.result = list;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return (obj instanceof TriggerResult) && this.id == ((TriggerResult) obj).id;
        }

        public int hashCode() {
            return this.id;
        }

        public String toString() {
            return "TriggerResult{id=" + this.id + ", result=" + this.result + '}';
        }
    }

    public AbstractTridentWindowManager(WindowConfig windowConfig, String str, WindowsStore windowsStore, Aggregator aggregator, BatchOutputCollector batchOutputCollector) {
        this.windowTaskId = str;
        this.windowStore = windowsStore;
        this.aggregator = aggregator;
        this.delegateCollector = batchOutputCollector;
        this.windowTriggerCountId = WindowTridentProcessor.TRIGGER_COUNT_PREFIX + str;
        WindowStrategy<T> windowStrategy = windowConfig.getWindowStrategy();
        EvictionPolicy<T> evictionPolicy = windowStrategy.getEvictionPolicy();
        this.windowManager.setEvictionPolicy(evictionPolicy);
        this.triggerPolicy = windowStrategy.getTriggerPolicy(this.windowManager, evictionPolicy);
        this.windowManager.setTriggerPolicy(this.triggerPolicy);
    }

    @Override // storm.trident.windowing.ITridentWindowManager
    public void prepare() {
        preInitialize();
        initialize();
        postInitialize();
    }

    private void preInitialize() {
        LOG.debug("Getting current trigger count for this component/task");
        Object obj = this.windowStore.get(this.windowTriggerCountId);
        Integer num = 0;
        if (obj == null) {
            LOG.info("No current trigger count in windows store.");
        } else {
            num = Integer.valueOf(((Integer) obj).intValue() + 1);
        }
        this.windowStore.put(this.windowTriggerCountId, num);
        this.triggerId.set(num.intValue());
    }

    private void postInitialize() {
        this.triggerPolicy.start();
    }

    protected abstract void initialize();

    protected abstract void onTuplesExpired(List<T> list);

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public void execAggregatorAndStoreResult(int i, List<T> list) {
        List<TridentTuple> tridentTuples = getTridentTuples(list);
        AccumulatedTuplesCollector accumulatedTuplesCollector = new AccumulatedTuplesCollector(this.delegateCollector);
        Object init = this.aggregator.init(Integer.valueOf(i), accumulatedTuplesCollector);
        Iterator<TridentTuple> it = tridentTuples.iterator();
        while (it.hasNext()) {
            this.aggregator.aggregate(init, it.next(), accumulatedTuplesCollector);
        }
        this.aggregator.complete(init, accumulatedTuplesCollector);
        List<List<Object>> list2 = accumulatedTuplesCollector.values;
        this.windowStore.putAll(Lists.newArrayList(new WindowsStore.Entry(this.windowTriggerCountId, Integer.valueOf(i + 1)), new WindowsStore.Entry(WindowTridentProcessor.generateWindowTriggerKey(this.windowTaskId, i), list2)));
        this.pendingTriggers.add(new TriggerResult(i, list2));
    }

    protected abstract List<TridentTuple> getTridentTuples(List<T> list);

    @Override // storm.trident.windowing.ITridentWindowManager
    public Queue<TriggerResult> getPendingTriggers() {
        return this.pendingTriggers;
    }

    @Override // storm.trident.windowing.ITridentWindowManager
    public void shutdown() {
        try {
            LOG.info("window manager [{}] is being shutdown", this.windowManager);
            this.windowManager.shutdown();
            LOG.info("window store [{}] is being shutdown", this.windowStore);
            this.windowStore.shutdown();
        } catch (Throwable th) {
            LOG.info("window store [{}] is being shutdown", this.windowStore);
            this.windowStore.shutdown();
            throw th;
        }
    }
}
