package org.apache.samza.operators.impl;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.impl.store.TimeSeriesStore;
import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl;
import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.triggers.FiringType;
import org.apache.samza.operators.triggers.RepeatingTriggerImpl;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.TriggerImpl;
import org.apache.samza.operators.triggers.TriggerImpls;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowKey;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.internal.WindowType;
import org.apache.samza.storage.kv.ClosableIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/WindowOperatorImpl.class */
/**
 * Operator implementation that groups incoming messages into window panes and emits a
 * {@link WindowPane} whenever one of the window's triggers fires.
 *
 * <p>Per-window state is kept in a {@link TimeSeriesStore} addressed by (key, window timestamp).
 * When the window has a {@link FoldLeftFunction}, at most one aggregated value is kept per window;
 * otherwise every message is stored and the pane carries the list of stored values.
 *
 * <p>Trigger bookkeeping lives in {@link #triggers}, keyed by {@link TriggerKey}
 * (firing type + key + window timestamp).
 *
 * @param <M> type of the input message
 * @param <K> type of the window key extracted from a message
 */
public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Object>> {
    private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class);

    private final WindowOperatorSpec<M, K, Object> windowOpSpec;
    private final Clock clock;
    private final WindowInternal<M, K, Object> window;
    /** Aggregation function; {@code null} means messages are stored un-aggregated. */
    private final FoldLeftFunction<M, Object> foldLeftFn;
    /** Supplies the initial aggregate when a window has no stored state yet. */
    private final Supplier<Object> initializer;
    /** Extracts the window key from a message; may be {@code null}, in which case the key is {@code null}. */
    private final Function<M, K> keyFn;
    private final TriggerScheduler<K> triggerScheduler;
    /** Live trigger handlers, one per (firing type, key, window timestamp). */
    private final Map<TriggerKey<K>, TriggerImplHandler> triggers = new HashMap<>();
    /** Created in {@link #handleInit(Config, TaskContext)}; {@code null} until then. */
    private TimeSeriesStore<K, Object> timeSeriesStore;

    /**
     * Wraps a {@link TriggerImpl} and remembers whether it has been cancelled, so that callbacks
     * arriving after cancellation are ignored.
     */
    public class TriggerImplHandler {
        private final TriggerImpl<M, K> impl;
        private boolean isCancelled = false;

        /**
         * @param triggerKey  key this handler is registered under (not retained by the handler)
         * @param triggerImpl trigger implementation to delegate to
         */
        public TriggerImplHandler(TriggerKey<K> triggerKey, TriggerImpl<M, K> triggerImpl) {
            this.impl = triggerImpl;
        }

        /**
         * Forwards a message to the wrapped trigger and, if the trigger then reports that it should
         * fire, emits the pane for {@code triggerKey}.
         *
         * @return the emitted pane, or empty if the handler is cancelled, the trigger did not fire,
         *         or there was no state to emit
         */
        public Optional<WindowPane<K, Object>> onMessage(TriggerKey<K> triggerKey, M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
            if (this.isCancelled) {
                return Optional.empty();
            }
            WindowOperatorImpl.LOG.trace("Forwarding callbacks for {}", m);
            this.impl.onMessage(m, WindowOperatorImpl.this.triggerScheduler);
            if (!this.impl.shouldFire()) {
                return Optional.empty();
            }
            clearIfRepeating();
            return WindowOperatorImpl.this.onTriggerFired(triggerKey, messageCollector, taskCoordinator);
        }

        /**
         * Invoked for a scheduled callback; emits the pane for {@code triggerKey} if the wrapped
         * trigger reports that it should fire and the handler has not been cancelled.
         *
         * @return the emitted pane, or empty if nothing was fired
         */
        public Optional<WindowPane<K, Object>> onTimer(TriggerKey<K> triggerKey, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
            if (!this.impl.shouldFire() || this.isCancelled) {
                return Optional.empty();
            }
            WindowOperatorImpl.LOG.trace("Triggering timer triggers");
            clearIfRepeating();
            return WindowOperatorImpl.this.onTriggerFired(triggerKey, messageCollector, taskCoordinator);
        }

        /** Cancels the wrapped trigger and marks this handler so later callbacks are ignored. */
        public void cancel() {
            this.impl.cancel();
            this.isCancelled = true;
        }

        /** @return {@code true} if the wrapped trigger is a {@link RepeatingTriggerImpl} */
        public boolean isRepeating() {
            return this.impl instanceof RepeatingTriggerImpl;
        }

        /** Calls {@code clear()} on the wrapped trigger when it is a repeating trigger, before a firing. */
        private void clearIfRepeating() {
            if (this.impl instanceof RepeatingTriggerImpl) {
                ((RepeatingTriggerImpl<?, ?>) this.impl).clear();
            }
        }
    }

    /**
     * @param windowOperatorSpec spec describing the window (fold function, initializer, key
     *                           extractor, triggers, accumulation mode)
     * @param clock              time source used for window timestamps and trigger scheduling
     */
    public WindowOperatorImpl(WindowOperatorSpec<M, K, Object> windowOperatorSpec, Clock clock) {
        this.windowOpSpec = windowOperatorSpec;
        this.clock = clock;
        this.window = windowOperatorSpec.getWindow();
        this.foldLeftFn = this.window.getFoldLeftFunction();
        this.initializer = this.window.getInitializer();
        this.keyFn = this.window.getKeyExtractor();
        this.triggerScheduler = new TriggerScheduler<>(clock);
    }

    /**
     * Looks up the backing store registered under this operator's id, initialises the fold
     * function if there is one, and builds the time-series store on top of it.
     *
     * <p>The boolean passed to {@link TimeSeriesStoreImpl} is {@code true} for non-aggregating
     * windows and {@code false} for aggregating ones (presumably an append-vs-overwrite switch;
     * confirm against {@code TimeSeriesStoreImpl}).
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"}) // store is obtained untyped from the task context
    protected void handleInit(Config config, TaskContext taskContext) {
        KeyValueStore keyValueStore = (KeyValueStore) taskContext.getStore(this.windowOpSpec.getOpId());
        if (this.foldLeftFn == null) {
            this.timeSeriesStore = new TimeSeriesStoreImpl(keyValueStore, true);
        } else {
            this.foldLeftFn.init(config, taskContext);
            this.timeSeriesStore = new TimeSeriesStoreImpl(keyValueStore, false);
        }
    }

    /**
     * Stores (or folds) the message into its window's state, then gives the early and default
     * triggers, when configured, a chance to fire.
     *
     * @return panes emitted as a result of this message; possibly empty, never {@code null}
     * @throws IllegalStateException if an aggregating window holds more than one stored entry
     */
    @Override
    public Collection<WindowPane<K, Object>> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.trace("Processing message envelope: {}", m);
        List<WindowPane<K, Object>> results = new ArrayList<>();
        K key = this.keyFn != null ? this.keyFn.apply(m) : null;
        long windowTimestamp = getWindowTimestamp(m);

        if (this.foldLeftFn == null) {
            this.timeSeriesStore.put(key, m, windowTimestamp);
        } else {
            List<Object> existing = getValues(key, windowTimestamp);
            // Guava formats the template only when the check fails, so nothing is built per message.
            Preconditions.checkState(existing.size() <= 1,
                    "WindowState for aggregating windows must not contain more than one entry per window. Current size: %s",
                    existing.size());
            Object previous;
            if (existing.isEmpty()) {
                LOG.trace("No existing state found for key {} Invoking initializer.", key);
                previous = this.initializer.get();
            } else {
                previous = existing.get(0);
            }
            this.timeSeriesStore.put(key, this.foldLeftFn.apply(m, previous), windowTimestamp);
        }

        // Early trigger is consulted before the default trigger.
        dispatchToTrigger(FiringType.EARLY, this.window.getEarlyTrigger(), key, windowTimestamp, m, messageCollector, taskCoordinator, results);
        dispatchToTrigger(FiringType.DEFAULT, this.window.getDefaultTrigger(), key, windowTimestamp, m, messageCollector, taskCoordinator, results);
        return results;
    }

    /**
     * Forwards the message to the handler for the given trigger (creating it on first use) and
     * appends the resulting pane, if any, to {@code results}. Does nothing when {@code trigger}
     * is {@code null}.
     */
    private void dispatchToTrigger(FiringType type, Trigger<M> trigger, K key, long windowTimestamp, M m,
            MessageCollector messageCollector, TaskCoordinator taskCoordinator, List<WindowPane<K, Object>> results) {
        if (trigger == null) {
            return;
        }
        TriggerKey<K> triggerKey = new TriggerKey<>(type, key, windowTimestamp);
        getOrCreateTriggerImplHandler(triggerKey, trigger)
                .onMessage(triggerKey, m, messageCollector, taskCoordinator)
                .ifPresent(results::add);
    }

    /**
     * Runs the scheduler's pending callbacks and lets the handler registered for each returned
     * trigger key fire. Keys with no registered handler are skipped.
     *
     * @return panes emitted by timer-driven firings; possibly empty, never {@code null}
     */
    @Override
    public Collection<WindowPane<K, Object>> handleTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.trace("Processing timer.");
        List<WindowPane<K, Object>> results = new ArrayList<>();
        for (TriggerKey<K> triggerKey : this.triggerScheduler.runPendingCallbacks()) {
            TriggerImplHandler handler = this.triggers.get(triggerKey);
            if (handler != null) {
                handler.onTimer(triggerKey, messageCollector, taskCoordinator).ifPresent(results::add);
            }
        }
        LOG.trace("Triggered panes: {}", results.size());
        return results;
    }

    @Override
    protected OperatorSpec<M, WindowPane<K, Object>> getOperatorSpec() {
        return this.windowOpSpec;
    }

    /**
     * Fires every trigger key that is registered at the time of the call and collects the panes.
     *
     * @return panes emitted for the registered trigger keys; possibly empty, never {@code null}
     */
    @Override
    protected Collection<WindowPane<K, Object>> handleEndOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        List<WindowPane<K, Object>> results = new ArrayList<>();
        // Iterate over a snapshot: onTriggerFired may remove entries from the triggers map.
        for (TriggerKey<K> triggerKey : new HashSet<>(this.triggers.keySet())) {
            onTriggerFired(triggerKey, messageCollector, taskCoordinator).ifPresent(results::add);
        }
        return results;
    }

    /** Closes the fold function and the time-series store, whichever of them exist. */
    @Override
    protected void handleClose() {
        if (this.foldLeftFn != null) {
            this.foldLeftFn.close();
        }
        if (this.timeSeriesStore != null) {
            this.timeSeriesStore.close();
        }
    }

    /**
     * Returns the handler registered for {@code triggerKey}, creating and registering one from
     * {@code trigger} if none exists yet.
     */
    private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<K> triggerKey, Trigger<M> trigger) {
        TriggerImplHandler existing = this.triggers.get(triggerKey);
        if (existing != null) {
            LOG.trace("Returning existing trigger wrapper for {}", triggerKey);
            return existing;
        }
        LOG.trace("Creating a new trigger wrapper for {}", triggerKey);
        TriggerImplHandler created = new TriggerImplHandler(triggerKey, TriggerImpls.createTriggerImpl(trigger, this.clock, triggerKey));
        this.triggers.put(triggerKey, created);
        return created;
    }

    /**
     * Builds the pane for a fired trigger and performs the associated clean-up:
     * <ul>
     *   <li>in {@link AccumulationMode#DISCARDING} mode the window's stored state is removed;</li>
     *   <li>a {@link FiringType#DEFAULT} firing cancels and unregisters both the default and the
     *       early trigger for the window and removes the window's stored state;</li>
     *   <li>a non-repeating {@link FiringType#EARLY} trigger is cancelled but stays registered.</li>
     * </ul>
     *
     * @return the pane, or empty if there is no stored state for the window
     */
    public Optional<WindowPane<K, Object>> onTriggerFired(TriggerKey<K> triggerKey, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.trace("Trigger key {} fired.", triggerKey);
        TriggerImplHandler handler = this.triggers.get(triggerKey);
        long timestamp = triggerKey.getTimestamp();
        K key = triggerKey.getKey();
        List<Object> values = getValues(key, timestamp);
        if (values == null || values.isEmpty()) {
            LOG.trace("No state found for triggerKey: {}", triggerKey);
            return Optional.empty();
        }

        // Non-aggregating windows emit the whole list; aggregating windows emit the single aggregate.
        Object paneValue = this.window.getFoldLeftFunction() == null ? values : values.get(0);
        WindowPane<K, Object> pane = computePaneOutput(triggerKey, paneValue);

        if (this.window.getAccumulationMode() == AccumulationMode.DISCARDING) {
            LOG.trace("Clearing state for trigger key: {}", triggerKey);
            this.timeSeriesStore.remove(key, timestamp);
        }
        if (triggerKey.getType() == FiringType.DEFAULT) {
            LOG.trace("Default trigger fired. Canceling triggers for {}", triggerKey);
            cancelTrigger(triggerKey, true);
            cancelTrigger(new TriggerKey<>(FiringType.EARLY, triggerKey.getKey(), triggerKey.getTimestamp()), true);
            this.timeSeriesStore.remove(key, timestamp);
        }
        if (triggerKey.getType() == FiringType.EARLY && !handler.isRepeating()) {
            cancelTrigger(triggerKey, false);
        }
        return Optional.of(pane);
    }

    /**
     * Creates the pane for {@code triggerKey}; the window key's second component is the window
     * timestamp rendered as a decimal string.
     */
    private WindowPane<K, Object> computePaneOutput(TriggerKey<K> triggerKey, Object obj) {
        WindowKey<K> windowKey = new WindowKey<>(triggerKey.getKey(), Long.toString(triggerKey.getTimestamp()));
        WindowPane<K, Object> pane = new WindowPane<>(windowKey, obj, this.window.getAccumulationMode(), triggerKey.getType());
        LOG.trace("Emitting pane output for trigger key {}", triggerKey);
        return pane;
    }

    /**
     * Cancels the handler registered for {@code triggerKey}, if any.
     *
     * @param shouldRemove when {@code true} (and the key is non-null) the handler is also
     *                     removed from the registry
     */
    private void cancelTrigger(TriggerKey<K> triggerKey, boolean shouldRemove) {
        TriggerImplHandler handler = this.triggers.get(triggerKey);
        if (handler != null) {
            handler.cancel();
        }
        if (shouldRemove && triggerKey != null) {
            this.triggers.remove(triggerKey);
        }
    }

    /**
     * Determines the window timestamp a message belongs to.
     *
     * <p>For {@link WindowType#TUMBLING} windows this is the current clock time rounded down to a
     * multiple of the default trigger's duration. For every other window type the store is queried
     * for the message's key over {@code [0, Long.MAX_VALUE]} (the trailing {@code 1} looks like a
     * result limit; confirm against {@code TimeSeriesStore}) and the timestamp of the first entry
     * returned is used, falling back to the current clock time when nothing is stored.
     */
    private long getWindowTimestamp(M m) {
        if (this.window.getWindowType() != WindowType.TUMBLING) {
            List<TimestampedValue<Object>> stored = toList(this.timeSeriesStore.get(this.keyFn.apply(m), 0L, Long.MAX_VALUE, 1));
            return stored.isEmpty() ? this.clock.currentTimeMillis() : stored.get(0).getTimestamp();
        }
        long millis = this.window.getDefaultTrigger().getDuration().toMillis();
        long now = this.clock.currentTimeMillis();
        return now - (now % millis);
    }

    /** Returns the values stored for the given key and window timestamp, without their timestamps. */
    private List<Object> getValues(K k, long j) {
        List<Object> values = toList(this.timeSeriesStore.get(k, j)).stream()
                .map(TimestampedValue::getValue)
                .collect(Collectors.toList());
        LOG.trace("Returning {} for key {} and timestamp {}", values, k, j);
        return values;
    }

    /**
     * Drains an iterator into an unmodifiable list and closes it exactly once, whether draining
     * completes normally or throws.
     *
     * @param closableIterator iterator to drain; closed before this method returns
     * @param <V>              element type
     * @return unmodifiable list of the iterator's elements in iteration order
     */
    static <V> List<V> toList(ClosableIterator<V> closableIterator) {
        List<V> values = new ArrayList<>();
        try {
            while (closableIterator.hasNext()) {
                values.add(closableIterator.next());
            }
        } finally {
            if (closableIterator != null) {
                closableIterator.close();
            }
        }
        return Collections.unmodifiableList(values);
    }
}
