/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.ICache;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.ExecutionCallback;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.IList;
import com.hazelcast.core.IMap;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.RestartableException;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.function.DistributedBiConsumer;
import com.hazelcast.jet.function.DistributedBiFunction;
import com.hazelcast.jet.function.DistributedBinaryOperator;
import com.hazelcast.jet.function.DistributedConsumer;
import com.hazelcast.jet.function.DistributedFunction;
import com.hazelcast.jet.impl.SerializationConstants;
import com.hazelcast.jet.impl.connector.WriteBufferedP;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.map.EntryBackupProcessor;
import com.hazelcast.map.EntryProcessor;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import java.io.IOException;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public final class HazelcastWriters {
    // Private constructor: the outer class only exposes static factory methods
    // and nested helper types, so it is not meant to be instantiated.
    private HazelcastWriters() {
    }

    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier mergeMapSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedFunction<? super T, ? extends V> toValueFn, @Nonnull DistributedBinaryOperator<V> mergeFn) {
        Util.checkSerializable(toKeyFn, "toKeyFn");
        Util.checkSerializable(toValueFn, "toValueFn");
        Util.checkSerializable(mergeFn, "mergeFn");
        return HazelcastWriters.updateMapSupplier(name, clientConfig, toKeyFn, (? super V oldValue, ? super T item) -> {
            Object newValue = toValueFn.apply(item);
            if (oldValue == null) {
                return newValue;
            }
            return mergeFn.apply((Object)oldValue, (Object)newValue);
        });
    }

    /**
     * Builds a meta-supplier for a sink that updates entries of the map called
     * {@code mapName} by applying {@code updateFn} to the current value and the
     * incoming item.
     * <p>
     * Items are collected in an {@code UpdateMapContext}, which is flushed
     * against the Hazelcast instance chosen by {@code HazelcastWriterSupplier}.
     *
     * @param mapName      name of the target map
     * @param clientConfig remote cluster configuration; {@code null} means the
     *                     local instance is used
     * @param toKeyFn      extracts the map key from an item
     * @param updateFn     computes the new value from the old value and the item
     * @return a meta-supplier wrapped with {@code preferLocalParallelismOne}
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateMapSupplier(
            @Nonnull String mapName,
            @Nullable ClientConfig clientConfig,
            @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn,
            @Nonnull DistributedBiFunction<? super V, ? super T, ? extends V> updateFn
    ) {
        Util.checkSerializable(toKeyFn, "toKeyFn");
        Util.checkSerializable(updateFn, "updateFn");
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(
                new HazelcastWriterSupplier<UpdateMapContext<K, V, T>, T>(
                        Util.asXmlString(clientConfig),
                        procContext -> new UpdateMapContext<K, V, T>(mapName, toKeyFn, updateFn, isLocal),
                        UpdateMapContext::add,
                        instance -> context -> context.flush(instance),
                        DistributedConsumer.noop()
                ));
    }

    /**
     * Builds a meta-supplier for a sink that, for every incoming item, submits
     * the entry processor produced by {@code toEntryProcessorFn} to the key
     * produced by {@code toKeyFn} on the map called {@code name}.
     *
     * @param name               name of the target map
     * @param clientConfig       remote cluster configuration; {@code null} means
     *                           the local instance is used
     * @param toKeyFn            extracts the map key from an item
     * @param toEntryProcessorFn creates the entry processor for an item
     * @return a meta-supplier wrapped with {@code preferLocalParallelismOne}
     */
    @Nonnull
    public static <T, K, V> ProcessorMetaSupplier updateMapSupplier(
            @Nonnull String name,
            @Nullable ClientConfig clientConfig,
            @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn,
            @Nonnull DistributedFunction<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn
    ) {
        Util.checkSerializable(toKeyFn, "toKeyFn");
        Util.checkSerializable(toEntryProcessorFn, "toEntryProcessorFn");
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(
                new EntryProcessorWriterSupplier<T, K, V>(
                        name, Util.asXmlString(clientConfig), toKeyFn, toEntryProcessorFn, isLocal));
    }

    /**
     * Builds a meta-supplier for a sink that writes incoming {@code Map.Entry}
     * items to the map called {@code name}.
     * <p>
     * Entries are buffered in an {@code ArrayMap}; on flush the whole buffer is
     * written with a single {@code putAll} call and then cleared.
     *
     * @param name         name of the target map
     * @param clientConfig remote cluster configuration; {@code null} means the
     *                     local instance is used
     * @return a meta-supplier wrapped with {@code preferLocalParallelismOne}
     */
    @Nonnull
    public static ProcessorMetaSupplier writeMapSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(
                new HazelcastWriterSupplier<ArrayMap, Map.Entry<Object, Object>>(
                        Util.asXmlString(clientConfig),
                        procContext -> new ArrayMap(),
                        ArrayMap::add,
                        instance -> {
                            IMap<Object, Object> map = instance.getMap(name);
                            return buffer -> {
                                try {
                                    map.putAll(buffer);
                                } catch (HazelcastInstanceNotActiveException e) {
                                    throw HazelcastWriters.handleInstanceNotActive(e, isLocal);
                                }
                                // Cleared only after a successful write.
                                buffer.clear();
                            };
                        },
                        DistributedConsumer.noop()
                ));
    }

    /**
     * Builds a meta-supplier for a sink that writes incoming {@code Map.Entry}
     * items to the cache called {@code name}.
     * <p>
     * Entries are buffered in an {@code ArrayMap}; the flush function comes
     * from {@code CacheFlush.flushToCache}.
     *
     * @param name         name of the target cache
     * @param clientConfig remote cluster configuration; {@code null} means the
     *                     local instance is used
     * @return a meta-supplier wrapped with {@code preferLocalParallelismOne}
     */
    @Nonnull
    public static ProcessorMetaSupplier writeCacheSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(
                new HazelcastWriterSupplier<ArrayMap, Map.Entry<Object, Object>>(
                        Util.asXmlString(clientConfig),
                        procContext -> new ArrayMap(),
                        ArrayMap::add,
                        CacheFlush.flushToCache(name, isLocal),
                        DistributedConsumer.noop()
                ));
    }

    /**
     * Builds a meta-supplier for a sink that appends incoming items to the
     * list called {@code name}.
     * <p>
     * Items are buffered in an {@code ArrayList}; on flush the whole buffer is
     * appended with a single {@code addAll} call and then cleared.
     *
     * @param name         name of the target list
     * @param clientConfig remote cluster configuration; {@code null} means the
     *                     local instance is used
     * @return a meta-supplier wrapped with {@code preferLocalParallelismOne}
     */
    @Nonnull
    public static ProcessorMetaSupplier writeListSupplier(@Nonnull String name, @Nullable ClientConfig clientConfig) {
        boolean isLocal = clientConfig == null;
        return ProcessorMetaSupplier.preferLocalParallelismOne(
                new HazelcastWriterSupplier<ArrayList<Object>, Object>(
                        Util.asXmlString(clientConfig),
                        procContext -> new ArrayList<>(),
                        ArrayList::add,
                        instance -> {
                            IList<Object> list = instance.getList(name);
                            return buffer -> {
                                try {
                                    list.addAll(buffer);
                                } catch (HazelcastInstanceNotActiveException e) {
                                    throw HazelcastWriters.handleInstanceNotActive(e, isLocal);
                                }
                                // Cleared only after a successful write.
                                buffer.clear();
                            };
                        },
                        DistributedConsumer.noop()
                ));
    }

    /**
     * Chooses the exception to rethrow after a write hit an inactive instance.
     *
     * @param e       the failure raised by the Hazelcast call
     * @param isLocal whether the write targeted the local instance
     * @return {@code e} wrapped in a {@code RestartableException} when
     *         {@code isLocal} is true, otherwise {@code e} itself
     */
    private static RuntimeException handleInstanceNotActive(HazelcastInstanceNotActiveException e, boolean isLocal) {
        if (!isLocal) {
            return e;
        }
        return new RestartableException(e);
    }

    /**
     * Entry processor that looks up the item registered for the entry's key in
     * {@code keysToUpdate} and replaces the entry's value with
     * {@code updateFn(oldValue, item)}.
     * <p>
     * The same instance is returned as its own backup processor, so
     * {@code processBackup} applies the identical update.
     */
    public static class ApplyFnEntryProcessor<K, V, T>
            implements EntryProcessor<K, V>, EntryBackupProcessor<K, V>, IdentifiedDataSerializable {

        private Map<K, T> keysToUpdate;
        private DistributedBiFunction<? super V, ? super T, ? extends V> updateFn;

        /** No-arg constructor; the fields are populated by {@link #readData}. */
        public ApplyFnEntryProcessor() {
        }

        ApplyFnEntryProcessor(Map<K, T> keysToUpdate, DistributedBiFunction<? super V, ? super T, ? extends V> updateFn) {
            this.keysToUpdate = keysToUpdate;
            this.updateFn = updateFn;
        }

        /**
         * Applies the update function to the given entry.
         *
         * @return always {@code null}
         * @throws JetException if {@code keysToUpdate} has no mapping for the entry's key
         */
        @Override
        public Object process(Map.Entry<K, V> entry) {
            V currentValue = entry.getValue();
            K key = entry.getKey();
            T update = keysToUpdate.get(key);
            // A null item is legal as long as the key is actually mapped.
            boolean keyUnknown = update == null && !keysToUpdate.containsKey(key);
            if (keyUnknown) {
                throw new JetException("The new item not found in the map - is equals/hashCode correctly implemented for the key? Key type: " + key.getClass().getName());
            }
            entry.setValue(updateFn.apply(currentValue, update));
            return null;
        }

        @Override
        public void processBackup(Map.Entry<K, V> entry) {
            process(entry);
        }

        @Override
        public EntryBackupProcessor<K, V> getBackupProcessor() {
            return this;
        }

        // writeData and readData must handle the two fields in the same order.
        @Override
        public void writeData(ObjectDataOutput out) throws IOException {
            out.writeObject(keysToUpdate);
            out.writeObject(updateFn);
        }

        @Override
        public void readData(ObjectDataInput in) throws IOException {
            keysToUpdate = (Map)in.readObject();
            updateFn = (DistributedBiFunction)in.readObject();
        }

        @Override
        public int getFactoryId() {
            return SerializationConstants.FACTORY_ID;
        }

        @Override
        public int getId() {
            // NOTE(review): presumably mirrors a named constant in SerializationConstants - confirm.
            return 3;
        }
    }

    private static class HazelcastWriterSupplier<B, T>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String clientXml;
        private final DistributedFunction<HazelcastInstance, DistributedConsumer<B>> instanceToFlushBufferFn;
        private final DistributedFunction<Processor.Context, B> newBufferFn;
        private final DistributedBiConsumer<B, T> addToBufferFn;
        private final DistributedConsumer<B> disposeBufferFn;
        private transient DistributedConsumer<B> flushBufferFn;
        private transient HazelcastInstance client;

        HazelcastWriterSupplier(String clientXml, DistributedFunction<Processor.Context, B> newBufferFn, DistributedBiConsumer<B, T> addToBufferFn, DistributedFunction<HazelcastInstance, DistributedConsumer<B>> instanceToFlushBufferFn, DistributedConsumer<B> disposeBufferFn) {
            this.clientXml = clientXml;
            this.instanceToFlushBufferFn = instanceToFlushBufferFn;
            this.newBufferFn = newBufferFn;
            this.addToBufferFn = addToBufferFn;
            this.disposeBufferFn = disposeBufferFn;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            HazelcastInstance instance = this.isRemote() ? (this.client = HazelcastClient.newHazelcastClient(Util.asClientConfig(this.clientXml))) : context.jetInstance().getHazelcastInstance();
            this.flushBufferFn = this.instanceToFlushBufferFn.apply(instance);
        }

        @Override
        public void close(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        private boolean isRemote() {
            return this.clientXml != null;
        }

        @Nonnull
        public List<Processor> get(int count) {
            return Stream.generate(() -> new WriteBufferedP<B, T>(this.newBufferFn, this.addToBufferFn, this.flushBufferFn, this.disposeBufferFn)).limit(count).collect(Collectors.toList());
        }
    }

    private static final class EntryProcessorWriterSupplier<T, K, V>
    implements ProcessorSupplier {
        static final long serialVersionUID = 1L;
        private final String name;
        private final String clientXml;
        private final DistributedFunction<? super T, ? extends K> toKeyFn;
        private final DistributedFunction<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn;
        private final boolean isLocal;
        private transient HazelcastInstance client;
        private transient HazelcastInstance instance;

        private EntryProcessorWriterSupplier(@Nonnull String name, @Nullable String clientXml, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedFunction<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn, boolean isLocal) {
            this.name = name;
            this.clientXml = clientXml;
            this.toKeyFn = toKeyFn;
            this.toEntryProcessorFn = toEntryProcessorFn;
            this.isLocal = isLocal;
        }

        @Override
        public void init(@Nonnull ProcessorSupplier.Context context) {
            this.instance = this.clientXml != null ? (this.client = HazelcastClient.newHazelcastClient(Util.asClientConfig(this.clientXml))) : context.jetInstance().getHazelcastInstance();
        }

        @Override
        public void close(Throwable error) {
            if (this.client != null) {
                this.client.shutdown();
            }
        }

        @Nonnull
        public List<Processor> get(int count) {
            return Stream.generate(() -> new EntryProcessorWriter(this.instance, this.name, this.toKeyFn, this.toEntryProcessorFn, this.isLocal)).limit(count).collect(Collectors.toList());
        }
    }

    private static final class EntryProcessorWriter<T, K, V>
    extends AbstractProcessor {
        private static final int MAX_PARALLEL_ASYNC_OPS = 1000;
        private final AtomicInteger numConcurrentOps = new AtomicInteger();
        private final boolean isLocal;
        private final IMap<? super K, ? extends V> map;
        private final DistributedFunction<? super T, ? extends K> toKeyFn;
        private final DistributedFunction<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn;
        private final AtomicReference<Throwable> lastError = new AtomicReference();
        private final ExecutionCallback callback = Util.callbackOf(response -> this.numConcurrentOps.decrementAndGet(), exception -> {
            this.numConcurrentOps.decrementAndGet();
            if (exception != null) {
                this.lastError.compareAndSet((Throwable)null, (Throwable)exception);
            }
        });

        private EntryProcessorWriter(@Nonnull HazelcastInstance instance, @Nonnull String name, @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn, @Nonnull DistributedFunction<? super T, ? extends EntryProcessor<K, V>> toEntryProcessorFn, boolean isLocal) {
            this.map = instance.getMap(name);
            this.toKeyFn = toKeyFn;
            this.toEntryProcessorFn = toEntryProcessorFn;
            this.isLocal = isLocal;
        }

        @Override
        public boolean isCooperative() {
            return false;
        }

        @Override
        public boolean tryProcess() {
            this.checkError();
            return true;
        }

        @Override
        protected boolean tryProcess(int ordinal, @Nonnull Object object) {
            this.checkError();
            if (!Util.tryIncrement(this.numConcurrentOps, 1, 1000)) {
                return false;
            }
            try {
                Object item = object;
                EntryProcessor<K, V> entryProcessor = this.toEntryProcessorFn.apply(item);
                K key = this.toKeyFn.apply(item);
                this.map.submitToKey(key, entryProcessor, this.callback);
                return true;
            }
            catch (HazelcastInstanceNotActiveException e) {
                throw HazelcastWriters.handleInstanceNotActive(e, this.isLocal);
            }
        }

        @Override
        public boolean complete() {
            return this.ensureAllWritten();
        }

        @Override
        public boolean saveToSnapshot() {
            return this.ensureAllWritten();
        }

        private boolean ensureAllWritten() {
            boolean allWritten = this.numConcurrentOps.get() == 0;
            this.checkError();
            return allWritten;
        }

        private void checkError() {
            Throwable t = this.lastError.get();
            if (t != null) {
                throw ExceptionUtil.sneakyThrow(t);
            }
        }
    }

    /**
     * Minimal {@code Map} view over an append-only list of entries.
     * <p>
     * {@link #add} appends without checking for an existing key, so the same
     * key may occur more than once. Operations other than {@code entrySet},
     * {@code toString} and the set's {@code clear} are inherited from
     * {@code AbstractMap} / {@code AbstractSet} and work through the entry set.
     * In this file the class is used as the buffer handed to {@code putAll}.
     */
    private static final class ArrayMap extends AbstractMap<Object, Object> {
        private final List<Map.Entry<Object, Object>> entries;
        private final ArraySet set = new ArraySet();

        ArrayMap() {
            this.entries = new ArrayList<>();
        }

        @Override
        @Nonnull
        public Set<Map.Entry<Object, Object>> entrySet() {
            return set;
        }

        /**
         * Appends an entry to the backing list. The parameter stays a raw
         * {@code Map.Entry} so existing callers keep compiling.
         */
        @SuppressWarnings("unchecked")
        public void add(Map.Entry entry) {
            entries.add(entry);
        }

        @Override
        public String toString() {
            return entries.toString();
        }

        /** Set view backed directly by the enclosing map's entry list. */
        private class ArraySet extends AbstractSet<Map.Entry<Object, Object>> {
            private ArraySet() {
            }

            @Override
            @Nonnull
            public Iterator<Map.Entry<Object, Object>> iterator() {
                return entries.iterator();
            }

            @Override
            public int size() {
                return entries.size();
            }

            /**
             * Empties the backing list in one step. The inherited
             * implementation removes elements one by one through the
             * iterator, which on an {@code ArrayList} shifts the remaining
             * elements on every removal. {@code AbstractMap.clear()} delegates
             * here, so clearing the map is covered too.
             */
            @Override
            public void clear() {
                entries.clear();
            }
        }
    }

    /**
     * Holder for the flush function used by the cache sink.
     * NOTE(review): presumably kept in its own class so that the {@code ICache}
     * reference stays out of the outer class's lambdas - confirm.
     */
    private static class CacheFlush {
        private CacheFlush() {
        }

        /**
         * Returns a function that resolves the cache called {@code name} on the
         * given instance and yields a consumer writing a whole buffer to it.
         *
         * @param name    name of the target cache
         * @param isLocal forwarded to {@code handleInstanceNotActive} on failure
         */
        static DistributedFunction<HazelcastInstance, DistributedConsumer<ArrayMap>> flushToCache(String name, boolean isLocal) {
            return hz -> {
                ICache cache = hz.getCacheManager().getCache(name);
                return pending -> writeAndClear(cache, pending, isLocal);
            };
        }

        /** Writes all buffered entries with one {@code putAll}, then empties the buffer. */
        private static void writeAndClear(ICache cache, ArrayMap pending, boolean isLocal) {
            try {
                cache.putAll((Map)pending);
            } catch (HazelcastInstanceNotActiveException e) {
                throw HazelcastWriters.handleInstanceNotActive(e, isLocal);
            }
            pending.clear();
        }
    }

    /**
     * Buffer for the update-map sink: collects items and, on flush, applies
     * the shared {@code ApplyFnEntryProcessor} to their keys in batches.
     * <p>
     * A batch holds at most one item per key. When an item's key is already in
     * the current batch, that batch is executed first and a new one is started
     * with the item.
     *
     * @param <K> map key type
     * @param <V> map value type
     * @param <T> item type
     */
    private static final class UpdateMapContext<K, V, T> {
        private final String mapName;
        private final DistributedFunction<? super T, ? extends K> toKeyFn;
        private final boolean isLocal;
        private final ApplyFnEntryProcessor<K, V, T> entryProcessor;
        private final List<T> buffer = new ArrayList<>();
        // Current batch; the entry processor holds a reference to this very map.
        private final Map<K, T> tmpMap = new HashMap<>();
        // Resolved on the first flush.
        private IMap<K, V> map;

        UpdateMapContext(
                String mapName,
                @Nonnull DistributedFunction<? super T, ? extends K> toKeyFn,
                @Nonnull DistributedBiFunction<? super V, ? super T, ? extends V> updateFn,
                boolean isLocal
        ) {
            this.mapName = mapName;
            this.toKeyFn = toKeyFn;
            this.isLocal = isLocal;
            this.entryProcessor = new ApplyFnEntryProcessor<>(tmpMap, updateFn);
        }

        /** Queues an item for the next flush. */
        void add(T item) {
            buffer.add(item);
        }

        /**
         * Applies all queued items to the map of the given instance and
         * empties the queue. Does nothing beyond resolving the map when no
         * item is queued.
         */
        void flush(HazelcastInstance instance) {
            if (map == null) {
                map = instance.getMap(mapName);
            }
            if (buffer.isEmpty()) {
                return;
            }
            try {
                for (T item : buffer) {
                    K key = toKeyFn.apply(item);
                    if (tmpMap.containsKey(key)) {
                        // Same key again: run the current batch before queuing this item.
                        executeBatch();
                    }
                    tmpMap.put(key, item);
                }
                executeBatch();
            } catch (HazelcastInstanceNotActiveException e) {
                throw HazelcastWriters.handleInstanceNotActive(e, isLocal);
            }
            buffer.clear();
        }

        /** Runs the entry processor on the keys of the current batch, then resets the batch. */
        private void executeBatch() {
            map.executeOnKeys(tmpMap.keySet(), entryProcessor);
            tmpMap.clear();
        }
    }
}

