package org.apache.samza.operators.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.samza.config.Config;
import org.apache.samza.operators.WindowState;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.triggers.FiringType;
import org.apache.samza.operators.triggers.RepeatingTriggerImpl;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.TriggerImpl;
import org.apache.samza.operators.triggers.TriggerImpls;
import org.apache.samza.operators.util.InternalInMemoryStore;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowKey;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.internal.WindowType;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/WindowOperatorImpl.class */
public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
    private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class);
    private final WindowOperatorSpec<M, WK, WV> windowOpSpec;
    private final Clock clock;
    private final WindowInternal<M, WK, WV> window;
    private TriggerScheduler<WK> triggerScheduler;
    private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore();
    private final Map<TriggerKey<WK>, WindowOperatorImpl<M, WK, WV>.TriggerImplHandler> triggers = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/samza/operators/impl/WindowOperatorImpl$TriggerImplHandler.class */
    public class TriggerImplHandler {
        private final TriggerImpl<M, WK> impl;
        private boolean isCancelled = false;

        public TriggerImplHandler(TriggerKey<WK> triggerKey, TriggerImpl<M, WK> triggerImpl) {
            this.impl = triggerImpl;
        }

        public Optional<WindowPane<WK, WV>> onMessage(TriggerKey<WK> triggerKey, M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
            if (!this.isCancelled) {
                WindowOperatorImpl.LOG.trace("Forwarding callbacks for {}", m);
                this.impl.onMessage(m, WindowOperatorImpl.this.triggerScheduler);
                if (this.impl.shouldFire()) {
                    if (this.impl instanceof RepeatingTriggerImpl) {
                        ((RepeatingTriggerImpl) this.impl).clear();
                    }
                    return WindowOperatorImpl.this.onTriggerFired(triggerKey, messageCollector, taskCoordinator);
                }
            }
            return Optional.empty();
        }

        public Optional<WindowPane<WK, WV>> onTimer(TriggerKey<WK> triggerKey, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
            if (!this.impl.shouldFire() || this.isCancelled) {
                return Optional.empty();
            }
            WindowOperatorImpl.LOG.trace("Triggering timer triggers");
            if (this.impl instanceof RepeatingTriggerImpl) {
                ((RepeatingTriggerImpl) this.impl).clear();
            }
            return WindowOperatorImpl.this.onTriggerFired(triggerKey, messageCollector, taskCoordinator);
        }

        public void cancel() {
            this.impl.cancel();
            this.isCancelled = true;
        }

        public boolean isRepeating() {
            return this.impl instanceof RepeatingTriggerImpl;
        }
    }

    public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> windowOperatorSpec, Clock clock) {
        this.windowOpSpec = windowOperatorSpec;
        this.clock = clock;
        this.window = windowOperatorSpec.getWindow();
        this.triggerScheduler = new TriggerScheduler<>(clock);
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleInit(Config config, TaskContext taskContext) {
        WindowInternal<M, WK, WV> window = this.windowOpSpec.getWindow();
        if (window.getFoldLeftFunction() != null) {
            window.getFoldLeftFunction().init(config, taskContext);
        }
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    public Collection<WindowPane<WK, WV>> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.trace("Processing message envelope: {}", m);
        ArrayList arrayList = new ArrayList();
        WindowKey<WK> storeKey = getStoreKey(m);
        WindowState<WV> windowState = (WindowState) this.store.get(storeKey);
        LOG.trace("Store key ({}) has existing state ({})", storeKey, windowState);
        WindowState<WV> applyFoldFunction = applyFoldFunction(windowState, m);
        LOG.trace("New window value: {}, earliest timestamp: {}", applyFoldFunction.getWindowValue(), Long.valueOf(applyFoldFunction.getEarliestTimestamp()));
        this.store.put(storeKey, applyFoldFunction);
        if (this.window.getEarlyTrigger() != null) {
            TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey);
            Optional<WindowPane<WK, WV>> onMessage = getOrCreateTriggerImplHandler(triggerKey, this.window.getEarlyTrigger()).onMessage(triggerKey, m, messageCollector, taskCoordinator);
            arrayList.getClass();
            onMessage.ifPresent((v1) -> {
                r1.add(v1);
            });
        }
        if (this.window.getDefaultTrigger() != null) {
            TriggerKey<WK> triggerKey2 = new TriggerKey<>(FiringType.DEFAULT, storeKey);
            Optional<WindowPane<WK, WV>> onMessage2 = getOrCreateTriggerImplHandler(triggerKey2, this.window.getDefaultTrigger()).onMessage(triggerKey2, m, messageCollector, taskCoordinator);
            arrayList.getClass();
            onMessage2.ifPresent((v1) -> {
                r1.add(v1);
            });
        }
        return arrayList;
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        ArrayList arrayList = new ArrayList();
        for (TriggerKey<WK> triggerKey : this.triggerScheduler.runPendingCallbacks()) {
            WindowOperatorImpl<M, WK, WV>.TriggerImplHandler triggerImplHandler = this.triggers.get(triggerKey);
            if (triggerImplHandler != null) {
                Optional<WindowPane<WK, WV>> onTimer = triggerImplHandler.onTimer(triggerKey, messageCollector, taskCoordinator);
                arrayList.getClass();
                onTimer.ifPresent((v1) -> {
                    r1.add(v1);
                });
            }
        }
        return arrayList;
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected OperatorSpec<WindowPane<WK, WV>> getOperatorSpec() {
        return this.windowOpSpec;
    }

    private WindowKey<WK> getStoreKey(M m) {
        Function keyExtractor = this.window.getKeyExtractor();
        Object obj = null;
        if (keyExtractor != null) {
            obj = keyExtractor.apply(m);
        }
        String str = null;
        if (this.window.getWindowType() == WindowType.TUMBLING) {
            long millis = this.window.getDefaultTrigger().getDuration().toMillis();
            long currentTimeMillis = this.clock.currentTimeMillis();
            str = Long.valueOf(currentTimeMillis - (currentTimeMillis % millis)).toString();
        }
        return new WindowKey<>(obj, str);
    }

    private WindowState<WV> applyFoldFunction(WindowState<WV> windowState, M m) {
        Object windowValue;
        long earliestTimestamp;
        if (windowState == null) {
            LOG.trace("No existing state found for key. Invoking initializer.");
            windowValue = this.window.getInitializer().get();
            earliestTimestamp = this.clock.currentTimeMillis();
        } else {
            windowValue = windowState.getWindowValue();
            earliestTimestamp = windowState.getEarliestTimestamp();
        }
        return new WindowState<>(this.window.getFoldLeftFunction().apply(m, windowValue), earliestTimestamp);
    }

    private WindowOperatorImpl<M, WK, WV>.TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<WK> triggerKey, Trigger<M> trigger) {
        WindowOperatorImpl<M, WK, WV>.TriggerImplHandler triggerImplHandler = this.triggers.get(triggerKey);
        if (triggerImplHandler != null) {
            LOG.trace("Returning existing trigger wrapper for {}", triggerKey);
            return triggerImplHandler;
        }
        LOG.trace("Creating a new trigger wrapper for {}", triggerKey);
        WindowOperatorImpl<M, WK, WV>.TriggerImplHandler triggerImplHandler2 = new TriggerImplHandler(triggerKey, TriggerImpls.createTriggerImpl(trigger, this.clock, triggerKey));
        this.triggers.put(triggerKey, triggerImplHandler2);
        return triggerImplHandler2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Optional<WindowPane<WK, WV>> onTriggerFired(TriggerKey<WK> triggerKey, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.trace("Trigger key {} fired.", triggerKey);
        WindowOperatorImpl<M, WK, WV>.TriggerImplHandler triggerImplHandler = this.triggers.get(triggerKey);
        WindowKey<WK> key = triggerKey.getKey();
        WindowState<WV> windowState = (WindowState) this.store.get(key);
        if (windowState == null) {
            LOG.trace("No state found for triggerKey: {}", triggerKey);
            return Optional.empty();
        }
        WindowPane<WK, WV> computePaneOutput = computePaneOutput(triggerKey, windowState);
        if (this.window.getAccumulationMode() == AccumulationMode.DISCARDING) {
            LOG.trace("Clearing state for trigger key: {}", triggerKey);
            this.store.put(key, (Object) null);
        }
        if (triggerKey.getType() == FiringType.DEFAULT) {
            LOG.trace("Default trigger fired. Canceling triggers for {}", triggerKey);
            cancelTrigger(triggerKey, true);
            cancelTrigger(new TriggerKey<>(FiringType.EARLY, triggerKey.getKey()), true);
            this.store.delete(triggerKey.getKey());
        }
        if (triggerKey.getType() == FiringType.EARLY && !triggerImplHandler.isRepeating()) {
            cancelTrigger(triggerKey, false);
        }
        return Optional.of(computePaneOutput);
    }

    private WindowPane<WK, WV> computePaneOutput(TriggerKey<WK> triggerKey, WindowState<WV> windowState) {
        WindowKey<WK> key = triggerKey.getKey();
        Object windowValue = windowState.getWindowValue();
        if (this.window.getWindowType() == WindowType.SESSION) {
            key = new WindowKey<>(key.getKey(), Long.toString(windowState.getEarliestTimestamp()));
        }
        if (windowValue instanceof Collection) {
            windowValue = new ArrayList((Collection) windowValue);
        }
        WindowPane<WK, WV> windowPane = new WindowPane<>(key, windowValue, this.window.getAccumulationMode(), triggerKey.getType());
        LOG.trace("Emitting pane output for trigger key {}", triggerKey);
        return windowPane;
    }

    private void cancelTrigger(TriggerKey<WK> triggerKey, boolean z) {
        WindowOperatorImpl<M, WK, WV>.TriggerImplHandler triggerImplHandler = this.triggers.get(triggerKey);
        if (triggerImplHandler != null) {
            triggerImplHandler.cancel();
        }
        if (!z || triggerKey == null) {
            return;
        }
        this.triggers.remove(triggerKey);
    }
}
