/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.processor;

import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.datamodel.Session;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.function.DistributedSupplier;
import com.hazelcast.jet.function.DistributedToLongFunction;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.StringJoiner;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/**
 * Processor that aggregates events into per-key session windows.
 * <p>
 * Every event occupies the interval {@code [timestamp, timestamp + sessionTimeout]}.
 * Intervals of the same key that overlap are merged into a single window whose
 * accumulator receives all of their events. When a watermark arrives, every window
 * whose end is strictly below the watermark timestamp is finished and emitted as a
 * {@link Session}.
 *
 * @param <T> type of the input events
 * @param <K> type of the grouping key
 * @param <A> type of the accumulator
 * @param <R> type of the finished aggregation result
 */
public class SessionWindowP<T, K, A, R>
extends AbstractProcessor {
    /** Watermark used by {@link #complete()} to flush the remaining windows. */
    private static final Watermark COMPLETING_WM = new Watermark(Long.MAX_VALUE);

    /** Open windows of each key; a key is removed once all of its windows were emitted. */
    final Map<K, Windows<A>> keyToWindows = new HashMap<>();
    /** Index from a window end ("deadline") to the keys having a window that ends there. */
    final SortedMap<Long, Set<K>> deadlineToKeys = new TreeMap<>();

    private final long sessionTimeout;
    private final DistributedToLongFunction<? super T> getTimestampFn;
    private final DistributedFunction<? super T, K> getKeyFn;
    private final DistributedSupplier<A> newAccumulatorFn;
    private final BiConsumer<? super A, ? super T> accumulateFn;
    private final DistributedFunction<? super A, R> finishAccumulationFn;
    private final DistributedBiConsumer<? super A, ? super A> combineAccFn;
    private final AbstractProcessor.FlatMapper<Watermark, Session<K, R>> expiredSessionFlatmapper;
    /** Traverser of the snapshot in progress; {@code null} when no snapshot is being written. */
    private Traverser<Map.Entry<K, Windows<A>>> snapshotTraverser;

    /**
     * @param sessionTimeout value added to an event's timestamp to obtain the end of
     *                       the interval occupied by that event
     * @param getTimestampFn extracts the timestamp from an event
     * @param getKeyFn       extracts the grouping key from an event
     * @param aggrOp         supplies the create, accumulate, combine and finish functions
     */
    public SessionWindowP(long sessionTimeout, DistributedToLongFunction<? super T> getTimestampFn, DistributedFunction<? super T, K> getKeyFn, AggregateOperation1<? super T, A, R> aggrOp) {
        this.getTimestampFn = getTimestampFn;
        this.getKeyFn = getKeyFn;
        this.newAccumulatorFn = aggrOp.createFn();
        this.accumulateFn = aggrOp.accumulateFn();
        this.combineAccFn = aggrOp.combineFn();
        this.finishAccumulationFn = aggrOp.finishFn();
        this.sessionTimeout = sessionTimeout;
        this.expiredSessionFlatmapper = this.flatMapper(this::expiredSessionTraverser);
    }

    /**
     * Accumulates the event into the window of its key that covers the event's
     * interval, creating, extending or merging windows as needed.
     *
     * @return always {@code true}
     */
    @Override
    protected boolean tryProcess0(@Nonnull Object item) {
        // The processor API hands over untyped items; the constructor's functions define T.
        @SuppressWarnings("unchecked")
        T event = (T) item;
        long timestamp = this.getTimestampFn.applyAsLong(event);
        K key = this.getKeyFn.apply(event);
        this.addEvent(this.keyToWindows.computeIfAbsent(key, k -> new Windows<>()), key, timestamp, event);
        return true;
    }

    /** Emits the sessions whose end is below the watermark; returns the flat-mapper's result. */
    @Override
    protected boolean tryProcessWm0(@Nonnull Watermark wm) {
        return this.expiredSessionFlatmapper.tryProcess(wm);
    }

    /**
     * Flushes the remaining windows by processing {@link #COMPLETING_WM}.
     * The comparisons against the watermark are strict, so only windows ending
     * below {@code Long.MAX_VALUE} are emitted.
     */
    @Override
    public boolean complete() {
        return this.expiredSessionFlatmapper.tryProcess(COMPLETING_WM);
    }

    /**
     * Removes all deadlines below the watermark timestamp and returns a traverser
     * over the sessions closed for the affected keys. The windows themselves are
     * closed lazily, as the returned traverser is consumed.
     */
    private Traverser<Session<K, R>> expiredSessionTraverser(Watermark wm) {
        long wmTimestamp = wm.timestamp();
        // headMap is a live view with an exclusive upper bound
        SortedMap<Long, Set<K>> expired = this.deadlineToKeys.headMap(wmTimestamp);
        // Materialize the keys first: the view is cleared right below.
        List<K> distinctKeys = expired.values().stream()
                .flatMap(Collection::stream)
                .distinct()
                .collect(Collectors.toList());
        expired.clear();
        Stream<Session<K, R>> sessions = distinctKeys.stream()
                .map(key -> this.closeWindows(this.keyToWindows.get(key), key, wmTimestamp))
                .flatMap(Collection::stream);
        return Traversers.traverseStream(sessions);
    }

    /** Registers {@code key} under {@code deadline}. */
    private void addToDeadlines(K key, long deadline) {
        this.deadlineToKeys.computeIfAbsent(deadline, x -> new HashSet<>()).add(key);
    }

    /**
     * Unregisters {@code key} from {@code deadline} and drops the deadline entry
     * once no key is left. The deadline must currently be registered.
     */
    private void removeFromDeadlines(K key, long deadline) {
        Set<K> ks = this.deadlineToKeys.get(deadline);
        ks.remove(key);
        if (ks.isEmpty()) {
            this.deadlineToKeys.remove(deadline);
        }
    }

    /**
     * Emits the entries of {@link #keyToWindows} to the snapshot. The traverser is
     * kept across calls and reset to {@code null} when it yields its first
     * {@code null}, so that the next snapshot starts from a fresh traverser.
     */
    @Override
    public boolean saveToSnapshot() {
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.keyToWindows.entrySet()).onFirstNull(() -> {
                this.snapshotTraverser = null;
            });
        }
        return this.emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /** Restores one {@code key -> windows} entry as written by {@link #saveToSnapshot()}. */
    @Override
    protected void restoreFromSnapshot(@Nonnull Object key, @Nonnull Object value) {
        // Snapshot entries arrive untyped; saveToSnapshot() wrote Map.Entry<K, Windows<A>>.
        @SuppressWarnings("unchecked")
        K typedKey = (K) key;
        @SuppressWarnings("unchecked")
        Windows<A> windows = (Windows<A>) value;
        this.keyToWindows.put(typedKey, windows);
    }

    /**
     * Rebuilds {@link #deadlineToKeys} from the restored windows.
     *
     * @return always {@code true}
     */
    @Override
    public boolean finishSnapshotRestore() {
        assert (this.deadlineToKeys.isEmpty());
        for (Map.Entry<K, Windows<A>> entry : this.keyToWindows.entrySet()) {
            Windows<A> w = entry.getValue();
            // Only the first `size` slots hold windows. The backing array is usually
            // longer; iterating all of it would register bogus deadlines for unused slots.
            for (int i = 0; i < w.size; ++i) {
                this.addToDeadlines(entry.getKey(), w.ends[i]);
            }
        }
        return true;
    }

    /** Accumulates {@code event} into the accumulator resolved for its timestamp. */
    private void addEvent(Windows<A> w, K key, long timestamp, T event) {
        this.accumulateFn.accept(this.resolveAcc(w, key, timestamp), event);
    }

    /**
     * Finishes the leading windows of {@code w} whose end is strictly below
     * {@code wm} and removes them; when no window is left, the key is removed
     * from {@link #keyToWindows}.
     *
     * @return the finished sessions, in window order
     */
    private List<Session<K, R>> closeWindows(Windows<A> w, K key, long wm) {
        List<Session<K, R>> sessions = new ArrayList<>();
        int i = 0;
        for (; i < w.size && w.ends[i] < wm; ++i) {
            sessions.add(new Session<>(key, w.starts[i], w.ends[i], this.finishAccumulationFn.apply(w.accs[i])));
        }
        if (i != w.size) {
            w.removeHead(i);
        } else {
            this.keyToWindows.remove(key);
        }
        return sessions;
    }

    /**
     * Returns the accumulator of the window that covers the interval
     * {@code [timestamp, timestamp + sessionTimeout]}, adjusting the windows of
     * {@code w} and the deadline index so that such a window exists.
     */
    private A resolveAcc(Windows<A> w, K key, long timestamp) {
        long eventEnd = timestamp + this.sessionTimeout;
        int i = 0;
        for (; i < w.size && w.starts[i] <= eventEnd; ++i) {
            // window i does not start after the event interval
            if (w.ends[i] < timestamp) {
                // window i lies entirely before the event interval
                continue;
            }
            if (w.starts[i] <= timestamp && w.ends[i] >= eventEnd) {
                // window i fully covers the event interval
                return w.accs[i];
            }
            // window i overlaps the event interval
            if (i + 1 == w.size || w.starts[i + 1] > eventEnd) {
                // the next window (if any) is not touched: just stretch window i
                w.starts[i] = Math.min(w.starts[i], timestamp);
                if (w.ends[i] < eventEnd) {
                    this.removeFromDeadlines(key, w.ends[i]);
                    w.ends[i] = eventEnd;
                    this.addToDeadlines(key, w.ends[i]);
                }
                return w.accs[i];
            }
            // The event bridges windows i and i + 1: merge them. The end of window
            // i + 1 is already registered as a deadline, so only the old end of
            // window i has to be unregistered.
            this.removeFromDeadlines(key, w.ends[i]);
            w.ends[i] = w.ends[i + 1];
            this.combineAccFn.accept(w.accs[i], w.accs[i + 1]);
            w.removeWindow(i + 1);
            return w.accs[i];
        }
        // no existing window overlaps the event interval: open a new one at position i
        this.addToDeadlines(key, eventEnd);
        return this.insertWindow(w, i, timestamp, eventEnd);
    }

    /**
     * Inserts a new window with a fresh accumulator at position {@code idx},
     * shifting the following windows by one slot.
     *
     * @return the accumulator of the new window
     */
    private A insertWindow(Windows<A> w, int idx, long windowStart, long windowEnd) {
        w.expandIfNeeded();
        w.copy(idx, idx + 1, w.size - idx);
        w.size++;
        w.starts[idx] = windowStart;
        w.ends[idx] = windowEnd;
        w.accs[idx] = this.newAccumulatorFn.get();
        return w.accs[idx];
    }

    /**
     * The windows of a single key, held in three parallel arrays of which only
     * the first {@code size} slots are in use.
     *
     * @param <A> type of the accumulator
     */
    public static class Windows<A>
    implements IdentifiedDataSerializable {
        private int size;
        private long[] starts = new long[2];
        private long[] ends = new long[2];
        // A generic array cannot be created directly; the array never leaves this file.
        @SuppressWarnings("unchecked")
        private A[] accs = (A[]) new Object[2];

        /** Removes the window at {@code idx}, shifting the following windows down. */
        private void removeWindow(int idx) {
            --this.size;
            this.copy(idx + 1, idx, this.size - idx);
            // drop the stale reference so the accumulator can be garbage-collected
            this.accs[this.size] = null;
        }

        /** Removes the first {@code count} windows. */
        private void removeHead(int count) {
            int remaining = this.size - count;
            this.copy(count, 0, remaining);
            // drop the stale references left behind the surviving windows
            Arrays.fill(this.accs, remaining, this.size, null);
            this.size = remaining;
        }

        /** Moves {@code length} slots of all three arrays from {@code from} to {@code to}. */
        private void copy(int from, int to, int length) {
            System.arraycopy(this.starts, from, this.starts, to, length);
            System.arraycopy(this.ends, from, this.ends, to, length);
            System.arraycopy(this.accs, from, this.accs, to, length);
        }

        /** Doubles the capacity of the arrays when all slots are in use. */
        private void expandIfNeeded() {
            if (this.size == this.starts.length) {
                this.starts = Arrays.copyOf(this.starts, 2 * this.starts.length);
                this.ends = Arrays.copyOf(this.ends, 2 * this.ends.length);
                this.accs = Arrays.copyOf(this.accs, 2 * this.accs.length);
            }
        }

        @Override
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override
        public int getId() {
            // NOTE(review): presumably mirrors a named constant of the serializer hook
            // that the compiler inlined - confirm before changing.
            return 12;
        }

        /** Writes {@code size} followed by the (start, end, accumulator) triple of each window. */
        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeInt(this.size);
            for (int i = 0; i < this.size; ++i) {
                out.writeLong(this.starts[i]);
                out.writeLong(this.ends[i]);
                out.writeObject(this.accs[i]);
            }
        }

        /** Reads the format produced by {@link #writeData}, growing the arrays when needed. */
        @Override
        public void readData(ObjectDataInput in) throws IOException {
            this.size = in.readInt();
            if (this.size > this.starts.length) {
                // smallest power of two that is >= size
                int newSize = 1 << (32 - Integer.numberOfLeadingZeros(this.size - 1));
                @SuppressWarnings("unchecked")
                A[] newAccs = (A[]) new Object[newSize];
                this.starts = new long[newSize];
                this.ends = new long[newSize];
                this.accs = newAccs;
            }
            for (int i = 0; i < this.size; ++i) {
                this.starts[i] = in.readLong();
                this.ends[i] = in.readLong();
                this.accs[i] = in.readObject();
            }
        }

        @Override
        public String toString() {
            StringJoiner sj = new StringJoiner(", ", this.getClass().getSimpleName() + "{", "}");
            for (int i = 0; i < this.size; ++i) {
                sj.add("[s=" + this.format(this.starts[i]) + ", e=" + this.format(this.ends[i]) + ", a=" + this.accs[i] + ']');
            }
            return sj.toString();
        }

        /** Renders {@code time}, read as epoch milliseconds, as local time in the system zone. */
        private String format(long time) {
            return Instant.ofEpochMilli(time).atZone(ZoneId.systemDefault()).toLocalTime().toString();
        }
    }
}

