/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl.tx;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.ClusterStreamManager;
import org.infinispan.stream.impl.KeyTrackingTerminalOperation;
import org.infinispan.stream.impl.TerminalOperation;
import org.infinispan.util.AbstractDelegatingMap;

public class TxClusterStreamManager<K>
implements ClusterStreamManager<K> {
    private final ClusterStreamManager<K> manager;
    private final LocalTxInvocationContext ctx;
    private final ConsistentHash hash;

    public TxClusterStreamManager(ClusterStreamManager<K> manager, LocalTxInvocationContext ctx, ConsistentHash hash) {
        this.manager = manager;
        this.ctx = ctx;
        this.hash = hash;
    }

    @Override
    public <R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, TerminalOperation<R> operation, ClusterStreamManager.ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate) {
        TxExcludedKeys txExcludedKeys = new TxExcludedKeys(keysToExclude, this.ctx, this.hash);
        return this.manager.remoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback, earlyTerminatePredicate);
    }

    @Override
    public <R> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, TerminalOperation<R> operation, ClusterStreamManager.ResultsCallback<R> callback, Predicate<? super R> earlyTerminatePredicate) {
        TxExcludedKeys txExcludedKeys = new TxExcludedKeys(keysToExclude, this.ctx, this.hash);
        return this.manager.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback, earlyTerminatePredicate);
    }

    @Override
    public <R> Object remoteStreamOperation(boolean parallelDistribution, boolean parallelStream, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, KeyTrackingTerminalOperation<K, R, ?> operation, ClusterStreamManager.ResultsCallback<Collection<R>> callback) {
        TxExcludedKeys txExcludedKeys = new TxExcludedKeys(keysToExclude, this.ctx, this.hash);
        return this.manager.remoteStreamOperation(parallelDistribution, parallelStream, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback);
    }

    @Override
    public <R2> Object remoteStreamOperationRehashAware(boolean parallelDistribution, boolean parallelStream, Set<Integer> segments, Set<K> keysToInclude, Map<Integer, Set<K>> keysToExclude, boolean includeLoader, KeyTrackingTerminalOperation<K, ?, R2> operation, ClusterStreamManager.ResultsCallback<Map<K, R2>> callback) {
        TxExcludedKeys txExcludedKeys = new TxExcludedKeys(keysToExclude, this.ctx, this.hash);
        return this.manager.remoteStreamOperationRehashAware(parallelDistribution, parallelStream, segments, keysToInclude, txExcludedKeys, includeLoader, operation, callback);
    }

    @Override
    public boolean isComplete(Object id) {
        return this.manager.isComplete(id);
    }

    @Override
    public boolean awaitCompletion(Object id, long time, TimeUnit unit) throws InterruptedException {
        return this.manager.awaitCompletion(id, time, unit);
    }

    @Override
    public void forgetOperation(Object id) {
        this.manager.forgetOperation(id);
    }

    @Override
    public <R1> boolean receiveResponse(Object id, Address origin, boolean complete, Set<Integer> segments, R1 response) {
        return this.manager.receiveResponse(id, origin, complete, segments, response);
    }

    private static class TxExcludedKeys<K>
    extends AbstractDelegatingMap<Integer, Set<K>> {
        private final Map<Integer, Set<K>> map;
        private final Map<Integer, Set<K>> ctxMap;

        private TxExcludedKeys(Map<Integer, Set<K>> map, LocalTxInvocationContext ctx, ConsistentHash hash) {
            this.map = map;
            this.ctxMap = this.contextToMap(ctx, hash);
        }

        Map<Integer, Set<K>> contextToMap(LocalTxInvocationContext ctx, ConsistentHash hash) {
            HashMap contextMap = new HashMap();
            ctx.getLookedUpEntries().forEach((k, v) -> {
                Integer segment = hash.getSegment(k);
                HashSet<Object> innerSet = (HashSet<Object>)contextMap.get(segment);
                if (innerSet == null) {
                    innerSet = new HashSet<Object>();
                    contextMap.put(segment, innerSet);
                }
                innerSet.add(k);
            });
            return contextMap;
        }

        @Override
        protected Map<Integer, Set<K>> delegate() {
            return this.map;
        }

        @Override
        public Set<K> get(Object key) {
            if (!(key instanceof Integer)) {
                return null;
            }
            Set<K> ctxSet = this.ctxMap.get(key);
            Set excludedSet = (Set)super.get(key);
            if (ctxSet != null) {
                if (excludedSet != null) {
                    ctxSet.addAll(excludedSet);
                }
                return ctxSet;
            }
            return excludedSet;
        }

        @Override
        public boolean isEmpty() {
            return this.ctxMap.isEmpty() && super.isEmpty();
        }
    }
}

