package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.AbstractProcessor;
import com.hazelcast.jet.AggregateOperation;
import com.hazelcast.jet.Session;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.Watermark;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/processor/SessionWindowP.class */
public class SessionWindowP<T, K, A, R> extends AbstractProcessor {
    private static final Watermark COMPLETING_WM = new Watermark(Long.MAX_VALUE);
    private final long sessionTimeout;
    private final DistributedToLongFunction<? super T> getTimestampF;
    private final DistributedFunction<? super T, K> getKeyF;
    private final DistributedSupplier<A> newAccumulatorF;
    private final BiConsumer<? super A, ? super T> accumulateF;
    private final DistributedFunction<? super A, R> finishAccumulationF;
    private final DistributedBiConsumer<? super A, ? super A> combineAccF;
    final Map<K, SessionWindowP<T, K, A, R>.Windows> keyToWindows = new HashMap();
    final SortedMap<Long, Set<K>> deadlineToKeys = new TreeMap();
    private final AbstractProcessor.FlatMapper<Watermark, Session<K, R>> expiredSessionFlatmapper = flatMapper(this::expiredSessionTraverser);

    /* loaded from: input_file:com/hazelcast/jet/impl/processor/SessionWindowP$Windows.class */
    private class Windows {
        private int size;
        private long[] starts;
        private long[] ends;
        private A[] accs;

        private Windows() {
            this.starts = new long[2];
            this.ends = new long[2];
            this.accs = (A[]) new Object[2];
        }

        /* JADX WARN: Multi-variable type inference failed */
        void addEvent(K k, long j, T t) {
            SessionWindowP.this.accumulateF.accept(resolveAcc(k, j), t);
        }

        List<Session<K, R>> closeWindows(K k, long j) {
            ArrayList arrayList = new ArrayList();
            int i = 0;
            while (i < this.size && this.ends[i] < j) {
                arrayList.add(new Session(k, this.starts[i], this.ends[i], SessionWindowP.this.finishAccumulationF.apply(this.accs[i])));
                i++;
            }
            if (i != this.size) {
                removeHead(i);
            } else {
                SessionWindowP.this.keyToWindows.remove(k);
            }
            return arrayList;
        }

        private A resolveAcc(K k, long j) {
            long j2 = j + SessionWindowP.this.sessionTimeout;
            int i = 0;
            while (i < this.size && this.starts[i] <= j2) {
                if (this.ends[i] >= j) {
                    if (this.starts[i] <= j && this.ends[i] >= j2) {
                        return this.accs[i];
                    }
                    if (i + 1 != this.size && this.starts[i + 1] <= j2) {
                        SessionWindowP.this.removeFromDeadlines(k, this.ends[i]);
                        this.ends[i] = this.ends[i + 1];
                        SessionWindowP.this.combineAccF.accept(this.accs[i], this.accs[i + 1]);
                        removeWindow(i + 1);
                        return this.accs[i];
                    }
                    this.starts[i] = Math.min(this.starts[i], j);
                    if (this.ends[i] < j2) {
                        SessionWindowP.this.removeFromDeadlines(k, this.ends[i]);
                        this.ends[i] = j2;
                        SessionWindowP.this.addToDeadlines(k, this.ends[i]);
                    }
                    return this.accs[i];
                }
                i++;
            }
            SessionWindowP.this.addToDeadlines(k, j2);
            return (A) insertWindow(i, j, j2);
        }

        private A insertWindow(int i, long j, long j2) {
            expandIfNeeded();
            copy(i, i + 1, this.size - i);
            this.size++;
            this.starts[i] = j;
            this.ends[i] = j2;
            ((A[]) this.accs)[i] = SessionWindowP.this.newAccumulatorF.get();
            return this.accs[i];
        }

        private void removeWindow(int i) {
            this.size--;
            copy(i + 1, i, this.size - i);
        }

        private void removeHead(int i) {
            copy(i, 0, this.size - i);
            this.size -= i;
        }

        private void copy(int i, int i2, int i3) {
            System.arraycopy(this.starts, i, this.starts, i2, i3);
            System.arraycopy(this.ends, i, this.ends, i2, i3);
            System.arraycopy(this.accs, i, this.accs, i2, i3);
        }

        private void expandIfNeeded() {
            if (this.size == this.starts.length) {
                this.starts = Arrays.copyOf(this.starts, 2 * this.starts.length);
                this.ends = Arrays.copyOf(this.ends, 2 * this.ends.length);
                this.accs = (A[]) Arrays.copyOf(this.accs, 2 * this.accs.length);
            }
        }
    }

    public SessionWindowP(long j, DistributedToLongFunction<? super T> distributedToLongFunction, DistributedFunction<? super T, K> distributedFunction, AggregateOperation<? super T, A, R> aggregateOperation) {
        this.getTimestampF = distributedToLongFunction;
        this.getKeyF = distributedFunction;
        this.newAccumulatorF = aggregateOperation.createAccumulatorF();
        this.accumulateF = aggregateOperation.accumulateItemF();
        this.combineAccF = aggregateOperation.combineAccumulatorsF();
        this.finishAccumulationF = aggregateOperation.finishAccumulationF();
        this.sessionTimeout = j;
    }

    @Override // com.hazelcast.jet.AbstractProcessor
    protected boolean tryProcess0(@Nonnull Object obj) {
        long applyAsLong = this.getTimestampF.applyAsLong(obj);
        K apply = this.getKeyF.apply(obj);
        this.keyToWindows.computeIfAbsent(apply, obj2 -> {
            return new Windows();
        }).addEvent(apply, applyAsLong, obj);
        return true;
    }

    @Override // com.hazelcast.jet.AbstractProcessor
    protected boolean tryProcessWm0(@Nonnull Watermark watermark) {
        return this.expiredSessionFlatmapper.tryProcess(watermark);
    }

    @Override // com.hazelcast.jet.Processor
    public boolean complete() {
        return this.expiredSessionFlatmapper.tryProcess(COMPLETING_WM);
    }

    private Traverser<Session<K, R>> expiredSessionTraverser(Watermark watermark) {
        List list = (List) this.deadlineToKeys.headMap(Long.valueOf(watermark.timestamp())).values().stream().flatMap((v0) -> {
            return v0.stream();
        }).distinct().collect(Collectors.toList());
        this.deadlineToKeys.headMap(Long.valueOf(watermark.timestamp())).clear();
        return Traversers.traverseStream(list.stream().map(obj -> {
            return this.keyToWindows.get(obj).closeWindows(obj, watermark.timestamp());
        }).flatMap((v0) -> {
            return v0.stream();
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addToDeadlines(K k, long j) {
        this.deadlineToKeys.computeIfAbsent(Long.valueOf(j), l -> {
            return new HashSet();
        }).add(k);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void removeFromDeadlines(K k, long j) {
        Set<K> set = this.deadlineToKeys.get(Long.valueOf(j));
        set.remove(k);
        if (set.isEmpty()) {
            this.deadlineToKeys.remove(Long.valueOf(j));
        }
    }
}
