/*
 * Decompiled with CFR 0.152.
 */
package com.tangosol.internal.util;

import com.oracle.coherence.common.util.Options;
import com.tangosol.internal.util.processor.CacheProcessors;
import com.tangosol.net.AsyncNamedCache;
import com.tangosol.net.CacheService;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.util.Filter;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.aggregator.AsynchronousAggregator;
import com.tangosol.util.processor.AsynchronousProcessor;
import com.tangosol.util.processor.SingleEntryAsynchronousProcessor;
import com.tangosol.util.processor.StreamingAsynchronousProcessor;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class DefaultAsyncNamedCache<K, V>
implements AsyncNamedCache<K, V> {
    protected final NamedCache<K, V> m_cache;
    protected final Options<AsyncNamedCache.Option> m_options;

    public DefaultAsyncNamedCache(NamedCache<K, V> cache) {
        this(cache, null);
    }

    public DefaultAsyncNamedCache(NamedCache<K, V> cache, AsyncNamedCache.Option[] options) {
        this.m_cache = cache;
        this.m_options = Options.from(AsyncNamedCache.Option.class, options);
    }

    @Override
    public NamedCache<K, V> getNamedCache() {
        return this.m_cache;
    }

    @Override
    public <R> CompletableFuture<R> invoke(K key, InvocableMap.EntryProcessor<K, V, R> processor) {
        SingleEntryAsynchronousProcessor<K, V, R> asyncProcessor = this.instantiateSingleEntryAsyncProcessor(processor);
        this.m_cache.invoke(key, asyncProcessor);
        return asyncProcessor.getCompletableFuture();
    }

    @Override
    public <R> CompletableFuture<Map<K, R>> invokeAll(Collection<? extends K> collKeys, InvocableMap.EntryProcessor<K, V, R> processor) {
        AsynchronousProcessor<K, V, R> asyncProcessor = this.instantiateMultiEntryAsyncProcessor(processor);
        this.m_cache.invokeAll(collKeys, asyncProcessor);
        return asyncProcessor.getCompletableFuture();
    }

    @Override
    public <R> CompletableFuture<Map<K, R>> invokeAll(Filter filter, InvocableMap.EntryProcessor<K, V, R> processor) {
        AsynchronousProcessor<K, V, R> asyncProcessor = this.instantiateMultiEntryAsyncProcessor(processor);
        this.m_cache.invokeAll(filter, asyncProcessor);
        return asyncProcessor.getCompletableFuture();
    }

    @Override
    public <R> CompletableFuture<Void> invokeAll(Collection<? extends K> collKeys, InvocableMap.EntryProcessor<K, V, R> processor, Consumer<? super Map.Entry<? extends K, ? extends R>> callback) {
        StreamingAsynchronousProcessor<K, V, R> asyncProcessor = this.instantiateStreamingAsyncProcessor(processor, callback);
        this.m_cache.invokeAll(collKeys, asyncProcessor);
        return asyncProcessor.getCompletableFuture();
    }

    @Override
    public <R> CompletableFuture<Void> invokeAll(Filter filter, InvocableMap.EntryProcessor<K, V, R> processor, Consumer<? super Map.Entry<? extends K, ? extends R>> callback) {
        StreamingAsynchronousProcessor<K, V, R> asyncProcessor = this.instantiateStreamingAsyncProcessor(processor, callback);
        this.m_cache.invokeAll(filter, asyncProcessor);
        return asyncProcessor.getCompletableFuture();
    }

    @Override
    public <R> CompletableFuture<R> aggregate(Collection<? extends K> collKeys, InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator) {
        AsynchronousAggregator<K, V, ?, R> asyncAggregator = this.instantiateAsyncAggregator(aggregator);
        this.m_cache.aggregate(collKeys, asyncAggregator);
        return asyncAggregator.getCompletableFuture();
    }

    @Override
    public <R> CompletableFuture<R> aggregate(Filter filter, InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator) {
        AsynchronousAggregator<K, V, ?, R> asyncAggregator = this.instantiateAsyncAggregator(aggregator);
        this.m_cache.aggregate(filter, asyncAggregator);
        return asyncAggregator.getCompletableFuture();
    }

    @Override
    public CompletableFuture<Void> putAll(Map<? extends K, ? extends V> map) {
        CacheService service = this.m_cache.getCacheService();
        if (service instanceof PartitionedService) {
            HashMap<Member, HashMap<K, V>> mapByOwner = new HashMap<Member, HashMap<K, V>>();
            PartitionedService svcPart = (PartitionedService)((Object)service);
            for (Map.Entry<K, V> entry : map.entrySet()) {
                K oKey = entry.getKey();
                Member member = svcPart.getKeyOwner(oKey);
                HashMap<K, V> mapMember = (HashMap<K, V>)mapByOwner.get(member);
                if (mapMember == null) {
                    mapMember = new HashMap<K, V>();
                    mapByOwner.put(member, mapMember);
                }
                mapMember.put(oKey, entry.getValue());
            }
            CompletableFuture[] aFuture = new CompletableFuture[mapByOwner.size()];
            int i = 0;
            for (Map mapMember : mapByOwner.values()) {
                aFuture[i++] = this.invokeAll(mapMember.keySet(), CacheProcessors.putAll(mapMember));
            }
            return CompletableFuture.allOf(aFuture);
        }
        return AsyncNamedCache.super.putAll(map);
    }

    protected <R> SingleEntryAsynchronousProcessor<K, V, R> instantiateSingleEntryAsyncProcessor(InvocableMap.EntryProcessor<K, V, R> processor) {
        return processor instanceof SingleEntryAsynchronousProcessor ? (SingleEntryAsynchronousProcessor)processor : new SingleEntryAsynchronousProcessor<K, V, R>(processor, this.getOrderId());
    }

    protected <R> AsynchronousProcessor<K, V, R> instantiateMultiEntryAsyncProcessor(InvocableMap.EntryProcessor<K, V, R> processor) {
        return processor instanceof AsynchronousProcessor ? (AsynchronousProcessor)processor : new AsynchronousProcessor<K, V, R>(processor, this.getOrderId());
    }

    protected <R> StreamingAsynchronousProcessor<K, V, R> instantiateStreamingAsyncProcessor(InvocableMap.EntryProcessor<K, V, R> processor, Consumer<? super Map.Entry<? extends K, ? extends R>> callback) {
        return processor instanceof StreamingAsynchronousProcessor ? (StreamingAsynchronousProcessor)processor : new StreamingAsynchronousProcessor<K, V, R>(processor, this.getOrderId(), callback);
    }

    protected <R> AsynchronousAggregator<? super K, ? super V, ?, R> instantiateAsyncAggregator(InvocableMap.EntryAggregator<? super K, ? super V, R> aggregator) {
        if (aggregator instanceof AsynchronousAggregator) {
            return (AsynchronousAggregator)aggregator;
        }
        if (aggregator instanceof InvocableMap.StreamingAggregator) {
            return new AsynchronousAggregator((InvocableMap.StreamingAggregator)aggregator, this.getOrderId());
        }
        throw new IllegalArgumentException("Aggregator must be a StreamingAggregator or AsynchronousAggregator");
    }

    protected int getOrderId() {
        return this.m_options.get(AsyncNamedCache.OrderBy.class).getOrderId();
    }
}

