/*
 * Decompiled with CFR 0.152.
 */
package com.lambdaworks.redis.cluster;

import com.lambdaworks.redis.AbstractRedisAsyncCommands;
import com.lambdaworks.redis.KeyScanCursor;
import com.lambdaworks.redis.RedisClusterAsyncConnection;
import com.lambdaworks.redis.RedisFuture;
import com.lambdaworks.redis.RedisURI;
import com.lambdaworks.redis.ScanArgs;
import com.lambdaworks.redis.ScanCursor;
import com.lambdaworks.redis.StreamScanCursor;
import com.lambdaworks.redis.api.async.RedisKeyAsyncCommands;
import com.lambdaworks.redis.api.async.RedisScriptingAsyncCommands;
import com.lambdaworks.redis.api.async.RedisServerAsyncCommands;
import com.lambdaworks.redis.cluster.AbstractNodeSelection;
import com.lambdaworks.redis.cluster.ClusterConnectionProvider;
import com.lambdaworks.redis.cluster.ClusterScanSupport;
import com.lambdaworks.redis.cluster.DynamicAsyncNodeSelection;
import com.lambdaworks.redis.cluster.MultiNodeExecution;
import com.lambdaworks.redis.cluster.NodeSelectionInvocationHandler;
import com.lambdaworks.redis.cluster.PipelinedRedisFuture;
import com.lambdaworks.redis.cluster.RedisAdvancedClusterAsyncConnection;
import com.lambdaworks.redis.cluster.SlotHash;
import com.lambdaworks.redis.cluster.StatefulRedisClusterConnectionImpl;
import com.lambdaworks.redis.cluster.StaticAsyncNodeSelection;
import com.lambdaworks.redis.cluster.api.NodeSelectionSupport;
import com.lambdaworks.redis.cluster.api.StatefulRedisClusterConnection;
import com.lambdaworks.redis.cluster.api.async.AsyncNodeSelection;
import com.lambdaworks.redis.cluster.api.async.NodeSelectionAsyncCommands;
import com.lambdaworks.redis.cluster.api.async.RedisAdvancedClusterAsyncCommands;
import com.lambdaworks.redis.cluster.api.async.RedisClusterAsyncCommands;
import com.lambdaworks.redis.cluster.models.partitions.Partitions;
import com.lambdaworks.redis.cluster.models.partitions.RedisClusterNode;
import com.lambdaworks.redis.codec.RedisCodec;
import com.lambdaworks.redis.output.IntegerOutput;
import com.lambdaworks.redis.output.KeyStreamingChannel;
import com.lambdaworks.redis.output.ValueStreamingChannel;
import com.lambdaworks.redis.protocol.AsyncCommand;
import com.lambdaworks.redis.protocol.Command;
import com.lambdaworks.redis.protocol.CommandType;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;

public class RedisAdvancedClusterAsyncCommandsImpl<K, V>
extends AbstractRedisAsyncCommands<K, V>
implements RedisAdvancedClusterAsyncConnection<K, V>,
RedisAdvancedClusterAsyncCommands<K, V> {
    private Random random = new Random();

    public RedisAdvancedClusterAsyncCommandsImpl(StatefulRedisClusterConnectionImpl<K, V> connection, RedisCodec<K, V> codec) {
        super(connection, codec);
    }

    @Override
    public RedisFuture<Long> del(K ... keys) {
        return this.del((Iterable<K>)Arrays.asList(keys));
    }

    @Override
    public RedisFuture<Long> del(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.del(keys);
        }
        HashMap<Integer, RedisFuture<Long>> executions = new HashMap<Integer, RedisFuture<Long>>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<Long> del = super.del((Iterable)entry.getValue());
            executions.put(entry.getKey(), del);
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    @Override
    public RedisFuture<Long> unlink(K ... keys) {
        return this.unlink((Iterable<K>)Arrays.asList(keys));
    }

    @Override
    public RedisFuture<Long> unlink(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.unlink(keys);
        }
        HashMap<Integer, RedisFuture<Long>> executions = new HashMap<Integer, RedisFuture<Long>>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<Long> unlink = super.unlink((Iterable)entry.getValue());
            executions.put(entry.getKey(), unlink);
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    @Override
    public RedisFuture<Long> exists(K ... keys) {
        return this.exists((Iterable<K>)Arrays.asList(keys));
    }

    @Override
    public RedisFuture<Long> exists(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.exists(keys);
        }
        HashMap<Integer, RedisFuture<Long>> executions = new HashMap<Integer, RedisFuture<Long>>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<Long> exists = super.exists((Iterable)entry.getValue());
            executions.put(entry.getKey(), exists);
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    @Override
    public RedisFuture<List<V>> mget(K ... keys) {
        return this.mget((Iterable<K>)Arrays.asList(keys));
    }

    @Override
    public RedisFuture<List<V>> mget(Iterable<K> keys) {
        Map partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
        Map slots = SlotHash.getSlots(partitioned);
        HashMap executions = new HashMap();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture mget = super.mget((Iterable)entry.getValue());
            executions.put(entry.getKey(), mget);
        }
        return new PipelinedRedisFuture<List<V>>(executions, objectPipelinedRedisFuture -> {
            ArrayList<Object> result = new ArrayList<Object>();
            for (Object opKey : keys) {
                int slot = (Integer)slots.get(opKey);
                int position = ((List)partitioned.get(slot)).indexOf(opKey);
                RedisFuture listRedisFuture = (RedisFuture)executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> ((List)listRedisFuture.get()).get(position)));
            }
            return result;
        });
    }

    @Override
    public RedisFuture<Long> mget(ValueStreamingChannel<V> channel, K ... keys) {
        return this.mget(channel, (Iterable<K>)Arrays.asList(keys));
    }

    @Override
    public RedisFuture<Long> mget(ValueStreamingChannel<V> channel, Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.mget(channel, keys);
        }
        HashMap<Integer, RedisFuture<Long>> executions = new HashMap<Integer, RedisFuture<Long>>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<Long> del = super.mget(channel, (Iterable)entry.getValue());
            executions.put(entry.getKey(), del);
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    @Override
    public RedisFuture<String> mset(Map<K, V> map) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, map.keySet());
        if (partitioned.size() < 2) {
            return super.mset(map);
        }
        HashMap<Integer, RedisFuture<String>> executions = new HashMap<Integer, RedisFuture<String>>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            HashMap op = new HashMap();
            entry.getValue().forEach(k -> op.put(k, map.get(k)));
            RedisFuture<String> mset = super.mset(op);
            executions.put(entry.getKey(), mset);
        }
        return MultiNodeExecution.firstOfAsync(executions);
    }

    @Override
    public RedisFuture<Boolean> msetnx(Map<K, V> map) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, map.keySet());
        if (partitioned.size() < 2) {
            return super.msetnx(map);
        }
        HashMap<Integer, RedisFuture<Boolean>> executions = new HashMap<Integer, RedisFuture<Boolean>>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            HashMap op = new HashMap();
            entry.getValue().forEach(k -> op.put(k, map.get(k)));
            RedisFuture<Boolean> msetnx = super.msetnx(op);
            executions.put(entry.getKey(), msetnx);
        }
        return new PipelinedRedisFuture<Boolean>(executions, objectPipelinedRedisFuture -> {
            for (RedisFuture listRedisFuture : executions.values()) {
                Boolean b = MultiNodeExecution.execute(() -> (Boolean)listRedisFuture.get());
                if (b == null || !b.booleanValue()) continue;
                return true;
            }
            return false;
        });
    }

    @Override
    public RedisFuture<String> clientSetname(K name) {
        HashMap<String, RedisFuture<String>> executions = new HashMap<String, RedisFuture<String>>();
        for (RedisClusterNode redisClusterNode : this.getStatefulConnection().getPartitions()) {
            RedisURI uri;
            RedisClusterAsyncConnection byHost;
            RedisClusterAsyncConnection byNodeId = this.getConnection(redisClusterNode.getNodeId());
            if (byNodeId.isOpen()) {
                executions.put("NodeId: " + redisClusterNode.getNodeId(), byNodeId.clientSetname(name));
            }
            if (!(byHost = this.getConnection((uri = redisClusterNode.getUri()).getHost(), uri.getPort())).isOpen()) continue;
            executions.put("HostAndPort: " + redisClusterNode.getNodeId(), byHost.clientSetname(name));
        }
        return MultiNodeExecution.firstOfAsync(executions);
    }

    @Override
    public RedisFuture<List<K>> clusterGetKeysInSlot(int slot, int count) {
        RedisClusterAsyncCommands<K, V> connectionBySlot = this.findConnectionBySlot(slot);
        if (connectionBySlot != null) {
            return connectionBySlot.clusterGetKeysInSlot(slot, count);
        }
        return super.clusterGetKeysInSlot(slot, count);
    }

    @Override
    public RedisFuture<Long> clusterCountKeysInSlot(int slot) {
        RedisClusterAsyncCommands<K, V> connectionBySlot = this.findConnectionBySlot(slot);
        if (connectionBySlot != null) {
            return connectionBySlot.clusterCountKeysInSlot(slot);
        }
        return super.clusterCountKeysInSlot(slot);
    }

    @Override
    public RedisFuture<Long> dbsize() {
        Map executions = this.executeOnMasters(RedisServerAsyncCommands::dbsize);
        return MultiNodeExecution.aggregateAsync(executions);
    }

    @Override
    public RedisFuture<String> flushall() {
        Map executions = this.executeOnMasters(RedisServerAsyncCommands::flushall);
        return MultiNodeExecution.firstOfAsync(executions);
    }

    @Override
    public RedisFuture<String> flushdb() {
        Map executions = this.executeOnMasters(RedisServerAsyncCommands::flushdb);
        return MultiNodeExecution.firstOfAsync(executions);
    }

    @Override
    public RedisFuture<String> scriptFlush() {
        Map executions = this.executeOnNodes(RedisScriptingAsyncCommands::scriptFlush, redisClusterNode -> true);
        return MultiNodeExecution.firstOfAsync(executions);
    }

    @Override
    public RedisFuture<String> scriptKill() {
        Map executions = this.executeOnNodes(RedisScriptingAsyncCommands::scriptFlush, redisClusterNode -> true);
        return MultiNodeExecution.alwaysOkOfAsync(executions);
    }

    @Override
    public RedisFuture<V> randomkey() {
        Partitions partitions = this.getStatefulConnection().getPartitions();
        int index = this.random.nextInt(partitions.size());
        RedisClusterAsyncConnection connection = this.getConnection(partitions.getPartition(index).getNodeId());
        return connection.randomkey();
    }

    @Override
    public RedisFuture<List<K>> keys(K pattern) {
        Map executions = this.executeOnMasters(commands -> commands.keys(pattern));
        return new PipelinedRedisFuture<List<K>>(executions, objectPipelinedRedisFuture -> {
            ArrayList result = new ArrayList();
            for (RedisFuture future : executions.values()) {
                result.addAll(MultiNodeExecution.execute(() -> (List)future.get()));
            }
            return result;
        });
    }

    @Override
    public RedisFuture<Long> keys(KeyStreamingChannel<K> channel, K pattern) {
        Map executions = this.executeOnMasters(commands -> commands.keys(channel, pattern));
        return MultiNodeExecution.aggregateAsync(executions);
    }

    @Override
    public void shutdown(boolean save) {
        this.executeOnNodes(commands -> {
            commands.shutdown(save);
            Command command = new Command(CommandType.SHUTDOWN, new IntegerOutput(this.codec), null);
            AsyncCommand async = new AsyncCommand(command);
            async.complete();
            return async;
        }, redisClusterNode -> true);
    }

    @Override
    public RedisFuture<Long> touch(K ... keys) {
        return this.touch((Iterable<K>)Arrays.asList(keys));
    }

    @Override
    public RedisFuture<Long> touch(Iterable<K> keys) {
        Map<Integer, List<K>> partitioned = SlotHash.partition(this.codec, keys);
        if (partitioned.size() < 2) {
            return super.touch(keys);
        }
        HashMap<Integer, RedisFuture<Long>> executions = new HashMap<Integer, RedisFuture<Long>>();
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<Long> touch = super.touch((Iterable)entry.getValue());
            executions.put(entry.getKey(), touch);
        }
        return MultiNodeExecution.aggregateAsync(executions);
    }

    protected <T> Map<String, RedisFuture<T>> executeOnMasters(Function<RedisClusterAsyncCommands<K, V>, RedisFuture<T>> function) {
        return this.executeOnNodes(function, redisClusterNode -> redisClusterNode.is(RedisClusterNode.NodeFlag.MASTER));
    }

    protected <T> Map<String, RedisFuture<T>> executeOnNodes(Function<RedisClusterAsyncCommands<K, V>, RedisFuture<T>> function, Function<RedisClusterNode, Boolean> filter) {
        HashMap<String, RedisFuture<T>> executions = new HashMap<String, RedisFuture<T>>();
        for (RedisClusterNode redisClusterNode : this.getStatefulConnection().getPartitions()) {
            RedisURI uri;
            RedisClusterAsyncConnection connection;
            if (!filter.apply(redisClusterNode).booleanValue() || !(connection = this.getConnection((uri = redisClusterNode.getUri()).getHost(), uri.getPort())).isOpen()) continue;
            executions.put(redisClusterNode.getNodeId(), function.apply((RedisClusterAsyncCommands<K, V>)connection));
        }
        return executions;
    }

    private RedisClusterAsyncCommands<K, V> findConnectionBySlot(int slot) {
        RedisClusterNode node = this.getStatefulConnection().getPartitions().getPartitionBySlot(slot);
        if (node != null) {
            return this.getConnection(node.getUri().getHost(), node.getUri().getPort());
        }
        return null;
    }

    @Override
    public RedisClusterAsyncCommands<K, V> getConnection(String nodeId) {
        return this.getStatefulConnection().getConnection(nodeId).async();
    }

    @Override
    public RedisClusterAsyncCommands<K, V> getConnection(String host, int port) {
        return this.getStatefulConnection().getConnection(host, port).async();
    }

    @Override
    public StatefulRedisClusterConnection<K, V> getStatefulConnection() {
        return (StatefulRedisClusterConnection)this.connection;
    }

    @Override
    public AsyncNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate) {
        return this.nodes(predicate, false);
    }

    @Override
    public AsyncNodeSelection<K, V> readonly(Predicate<RedisClusterNode> predicate) {
        return this.nodes(predicate, ClusterConnectionProvider.Intent.READ, false);
    }

    @Override
    public AsyncNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate, boolean dynamic) {
        return this.nodes(predicate, ClusterConnectionProvider.Intent.WRITE, dynamic);
    }

    protected AsyncNodeSelection<K, V> nodes(Predicate<RedisClusterNode> predicate, ClusterConnectionProvider.Intent intent, boolean dynamic) {
        AbstractNodeSelection selection = dynamic ? new DynamicAsyncNodeSelection(this.getStatefulConnection(), predicate, intent) : new StaticAsyncNodeSelection(this.getStatefulConnection(), predicate, intent);
        NodeSelectionInvocationHandler h = new NodeSelectionInvocationHandler(selection);
        return (AsyncNodeSelection)Proxy.newProxyInstance(NodeSelectionSupport.class.getClassLoader(), new Class[]{NodeSelectionAsyncCommands.class, AsyncNodeSelection.class}, (InvocationHandler)h);
    }

    @Override
    public RedisFuture<KeyScanCursor<K>> scan() {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(), ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    @Override
    public RedisFuture<KeyScanCursor<K>> scan(ScanArgs scanArgs) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(scanArgs), ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    @Override
    public RedisFuture<KeyScanCursor<K>> scan(ScanCursor scanCursor, ScanArgs scanArgs) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan((ScanCursor)cursor, scanArgs), ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    @Override
    public RedisFuture<KeyScanCursor<K>> scan(ScanCursor scanCursor) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan((ScanCursor)cursor), ClusterScanSupport.asyncClusterKeyScanCursorMapper());
    }

    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(channel), ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanArgs scanArgs) {
        return this.clusterScan(ScanCursor.INITIAL, (connection, cursor) -> connection.scan(channel, scanArgs), ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor, ScanArgs scanArgs) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(channel, (ScanCursor)cursor, scanArgs), ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    @Override
    public RedisFuture<StreamScanCursor> scan(KeyStreamingChannel<K> channel, ScanCursor scanCursor) {
        return this.clusterScan(scanCursor, (connection, cursor) -> connection.scan(channel, (ScanCursor)cursor), ClusterScanSupport.asyncClusterStreamScanCursorMapper());
    }

    private <T extends ScanCursor> RedisFuture<T> clusterScan(ScanCursor cursor, BiFunction<RedisKeyAsyncCommands<K, V>, ScanCursor, RedisFuture<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<RedisFuture<T>> resultMapper) {
        return RedisAdvancedClusterAsyncCommandsImpl.clusterScan(this.getStatefulConnection(), cursor, scanFunction, resultMapper);
    }

    static <T extends ScanCursor, K, V> RedisFuture<T> clusterScan(StatefulRedisClusterConnection<K, V> connection, ScanCursor cursor, BiFunction<RedisKeyAsyncCommands<K, V>, ScanCursor, RedisFuture<T>> scanFunction, ClusterScanSupport.ScanCursorMapper<RedisFuture<T>> mapper) {
        List<String> nodeIds = ClusterScanSupport.getNodeIds(connection, cursor);
        String currentNodeId = ClusterScanSupport.getCurrentNodeId(cursor, nodeIds);
        ScanCursor continuationCursor = ClusterScanSupport.getContinuationCursor(cursor);
        RedisFuture<T> scanCursor = scanFunction.apply(connection.getConnection(currentNodeId).async(), continuationCursor);
        return mapper.map(nodeIds, currentNodeId, scanCursor);
    }
}

