package org.infinispan.distexec.mapreduce;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.atomic.Delta;
import org.infinispan.atomic.DeltaAware;
import org.infinispan.commands.read.MapCombineCommand;
import org.infinispan.commands.read.ReduceCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.Util;
import org.infinispan.commons.util.concurrent.ParallelIterableMap;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.Flag;
import org.infinispan.distexec.mapreduce.spi.MapReduceTaskLifecycleService;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.filter.CollectionKeyFilter;
import org.infinispan.filter.CompositeKeyFilter;
import org.infinispan.filter.KeyFilter;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.Ids;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.marshall.core.MarshalledValue;
import org.infinispan.persistence.PrimaryOwnerFilter;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.persistence.spi.AdvancedCacheLoader;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.TimeService;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl.class */
public class MapReduceManagerImpl implements MapReduceManager {
    private static final Log log = LogFactory.getLog(MapReduceManagerImpl.class);
    private ClusteringDependentLogic cdl;
    private EmbeddedCacheManager cacheManager;
    private PersistenceManager persistenceManager;
    private ExecutorService executorService;
    private TimeService timeService;
    private int chunkSize;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$CollectableCollector.class */
    public interface CollectableCollector<K, V> extends Collector<K, V> {
        Map<K, List<V>> collectedValues();

        void emitReduced(K k, V v);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DataContainerTask.class */
    public abstract class DataContainerTask<K, V> implements ParallelIterableMap.KeyValueAction<K, InternalCacheEntry<K, V>> {
        private DataContainerTask() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v12, types: [java.lang.Object] */
        /* JADX WARN: Type inference failed for: r6v0 */
        protected V getValue(InternalCacheEntry<K, V> internalCacheEntry) {
            if (internalCacheEntry == null || internalCacheEntry.isExpired(MapReduceManagerImpl.this.timeService.wallClockTime())) {
                return null;
            }
            ?? value = internalCacheEntry.getValue();
            boolean z = value instanceof MarshalledValue;
            V v = value;
            if (z) {
                v = ((MarshalledValue) value).get();
            }
            return v;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DefaultCollector.class */
    public final class DefaultCollector<K, V, KOut, VOut> implements CollectableCollector<KOut, VOut> {
        private Map<KOut, List<VOut>> store = new HashMap(1024, 0.75f);
        private final AtomicInteger emitCount = new AtomicInteger();
        private final int maxCollectorSize;
        private MapCombineCommand<K, V, KOut, VOut> mcc;

        public DefaultCollector(MapCombineCommand<K, V, KOut, VOut> mapCombineCommand, int i) {
            this.maxCollectorSize = i;
            this.mcc = mapCombineCommand;
        }

        @Override // org.infinispan.distexec.mapreduce.Collector
        public void emit(KOut kout, VOut vout) {
            List<VOut> list = this.store.get(kout);
            if (list == null) {
                list = new ArrayList(Ids.CLUSTER_EVENT_CALLABLE);
                this.store.put(kout, list);
            }
            list.add(vout);
            this.emitCount.incrementAndGet();
            if (isOverflown() && this.mcc.hasCombiner()) {
                MapReduceManagerImpl.this.combine(this.mcc, this);
            }
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.CollectableCollector
        public void emitReduced(KOut kout, VOut vout) {
            List<VOut> list = this.store.get(kout);
            int size = list.size();
            list.clear();
            list.add(vout);
            this.emitCount.addAndGet((-size) + 1);
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.CollectableCollector
        public Map<KOut, List<VOut>> collectedValues() {
            return this.store;
        }

        public void reset() {
            this.store.clear();
            this.emitCount.set(0);
        }

        public boolean isEmpty() {
            return this.store.isEmpty();
        }

        public void emit(Map<KOut, List<VOut>> map) {
            for (Map.Entry<KOut, List<VOut>> entry : map.entrySet()) {
                KOut key = entry.getKey();
                Iterator<VOut> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    emit(key, it.next());
                }
            }
        }

        public boolean isOverflown() {
            return this.emitCount.get() > this.maxCollectorSize;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DeltaAwareList.class */
    public static class DeltaAwareList<E> implements Iterable<E>, DeltaAware {
        private final List<E> list;

        public DeltaAwareList(List<E> list) {
            this.list = list;
        }

        @Override // org.infinispan.atomic.DeltaAware
        public Delta delta() {
            return new DeltaList(this.list);
        }

        @Override // org.infinispan.atomic.DeltaAware
        public void commit() {
            this.list.clear();
        }

        @Override // java.lang.Iterable
        public Iterator<E> iterator() {
            return this.list.iterator();
        }
    }

    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DeltaAwareListExternalizer.class */
    public static class DeltaAwareListExternalizer extends AbstractExternalizer<DeltaAwareList> {
        private static final long serialVersionUID = -8956663669844107351L;

        public void writeObject(ObjectOutput objectOutput, DeltaAwareList deltaAwareList) throws IOException {
            objectOutput.writeObject(deltaAwareList.list);
        }

        /* renamed from: readObject, reason: merged with bridge method [inline-methods] */
        public DeltaAwareList m178readObject(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            return new DeltaAwareList((List) objectInput.readObject());
        }

        public Integer getId() {
            return Integer.valueOf(Ids.DELTA_AWARE_MAPREDUCE_LIST_ID);
        }

        public Set<Class<? extends DeltaAwareList>> getTypeClasses() {
            return Util.asSet(new Class[]{DeltaAwareList.class});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DeltaList.class */
    public static class DeltaList<E> implements Delta {
        private final List<E> deltas;

        public DeltaList(List<E> list) {
            this.deltas = new ArrayList(list);
        }

        @Override // org.infinispan.atomic.Delta
        public DeltaAware merge(DeltaAware deltaAware) {
            DeltaAwareList deltaAwareList;
            if (deltaAware instanceof DeltaAwareList) {
                deltaAwareList = (DeltaAwareList) deltaAware;
                deltaAwareList.list.addAll(this.deltas);
            } else {
                deltaAwareList = new DeltaAwareList(this.deltas);
            }
            return deltaAwareList;
        }
    }

    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$DeltaListExternalizer.class */
    public static class DeltaListExternalizer extends AbstractExternalizer<DeltaList> {
        private static final long serialVersionUID = 5859147782602054109L;

        public void writeObject(ObjectOutput objectOutput, DeltaList deltaList) throws IOException {
            objectOutput.writeObject(deltaList.deltas);
        }

        /* renamed from: readObject, reason: merged with bridge method [inline-methods] */
        public DeltaList m179readObject(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            return new DeltaList((List) objectInput.readObject());
        }

        public Integer getId() {
            return Integer.valueOf(Ids.DELTA_MAPREDUCE_LIST_ID);
        }

        public Set<Class<? extends DeltaList>> getTypeClasses() {
            return Util.asSet(new Class[]{DeltaList.class});
        }
    }

    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$IntermediateKey.class */
    public static final class IntermediateKey<V> implements Serializable {
        private static final long serialVersionUID = 4434717760740027918L;
        private final String taskId;
        private final V key;

        public IntermediateKey(String str, V v) {
            this.taskId = str;
            this.key = v;
        }

        public String getTaskId() {
            return this.taskId;
        }

        public V getKey() {
            return this.key;
        }

        public int hashCode() {
            return (31 * ((31 * 1) + (this.key == null ? 0 : this.key.hashCode()))) + (this.taskId == null ? 0 : this.taskId.hashCode());
        }

        public boolean equals(Object obj) {
            if (obj == null || !(obj instanceof IntermediateKey)) {
                return false;
            }
            IntermediateKey intermediateKey = (IntermediateKey) obj;
            if (this.key == null) {
                if (intermediateKey.key != null) {
                    return false;
                }
            } else if (!this.key.equals(intermediateKey.key)) {
                return false;
            }
            return this.taskId == null ? intermediateKey.taskId == null : this.taskId.equals(intermediateKey.taskId);
        }

        public String toString() {
            return "IntermediateCompositeKey [taskId=" + this.taskId + ", key=" + this.key + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$IntermediateKeyFilter.class */
    public static final class IntermediateKeyFilter<T> implements KeyFilter<IntermediateKey<T>> {
        private final String taskId;
        private final boolean acceptAll;

        public IntermediateKeyFilter(String str, boolean z) {
            if (str == null || str.isEmpty()) {
                throw new IllegalArgumentException("Invalid task Id " + str);
            }
            this.taskId = str;
            this.acceptAll = z;
        }

        @Override // org.infinispan.filter.KeyFilter
        public boolean accept(IntermediateKey<T> intermediateKey) {
            if (this.acceptAll) {
                return true;
            }
            if (intermediateKey != null) {
                return this.taskId.equals(intermediateKey.getTaskId());
            }
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$MapCombineTask.class */
    public final class MapCombineTask<K, V, KOut, VOut> extends DataContainerTask<K, V> implements AdvancedCacheLoader.CacheLoaderTask<K, V> {
        private final MapCombineCommand<K, V, KOut, VOut> mcc;
        private final Set<KOut> intermediateKeys;
        private final int queueLimit;
        private final BlockingQueue<DefaultCollector<K, V, KOut, VOut>> queue;

        public MapCombineTask(MapCombineCommand<K, V, KOut, VOut> mapCombineCommand, int i) throws Exception {
            super();
            this.queueLimit = Runtime.getRuntime().availableProcessors() * 2;
            this.queue = new ArrayBlockingQueue(this.queueLimit + 1);
            this.mcc = mapCombineCommand;
            this.intermediateKeys = Collections.synchronizedSet(new HashSet());
            for (int i2 = 0; i2 < this.queueLimit; i2++) {
                this.queue.put(new DefaultCollector<>(mapCombineCommand, i));
            }
        }

        public void apply(K k, InternalCacheEntry<K, V> internalCacheEntry) {
            V value = getValue(internalCacheEntry);
            if (value != null) {
                try {
                    executeMapWithCollector(k, value);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        @Override // org.infinispan.persistence.spi.AdvancedCacheLoader.CacheLoaderTask
        public void processEntry(MarshalledEntry<K, V> marshalledEntry, AdvancedCacheLoader.TaskContext taskContext) throws InterruptedException {
            executeMapWithCollector(marshalledEntry.getKey(), getValue(marshalledEntry));
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v9, types: [java.lang.Object] */
        /* JADX WARN: Type inference failed for: r4v0 */
        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.DataContainerTask
        protected V getValue(InternalCacheEntry<K, V> internalCacheEntry) {
            if (internalCacheEntry == null) {
                return null;
            }
            ?? value = internalCacheEntry.getValue();
            boolean z = value instanceof MarshalledValue;
            V v = value;
            if (z) {
                v = ((MarshalledValue) value).get();
            }
            return v;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Set<KOut> getMigratedIntermediateKeys() {
            return this.intermediateKeys;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Map<KOut, List<VOut>> collectedValues() {
            DefaultCollector defaultCollector = new DefaultCollector(this.mcc, Integer.MAX_VALUE);
            Iterator it = this.queue.iterator();
            while (it.hasNext()) {
                DefaultCollector defaultCollector2 = (DefaultCollector) it.next();
                if (!defaultCollector2.isEmpty()) {
                    defaultCollector.emit(defaultCollector2.collectedValues());
                    defaultCollector2.reset();
                }
            }
            MapReduceManagerImpl.this.combine(this.mcc, defaultCollector);
            return defaultCollector.collectedValues();
        }

        private void executeMapWithCollector(K k, V v) throws InterruptedException {
            DefaultCollector<K, V, KOut, VOut> defaultCollector = null;
            try {
                defaultCollector = this.queue.take();
                this.mcc.getMapper().map(k, v, defaultCollector);
                migrate(defaultCollector);
                this.queue.put(defaultCollector);
            } catch (Throwable th) {
                this.queue.put(defaultCollector);
                throw th;
            }
        }

        private void migrate(DefaultCollector<K, V, KOut, VOut> defaultCollector) {
            if (defaultCollector.isOverflown()) {
                this.intermediateKeys.addAll(MapReduceManagerImpl.this.migrateIntermediateKeysAndValues(this.mcc, defaultCollector.collectedValues()));
                defaultCollector.reset();
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        private V getValue(MarshalledEntry<K, V> marshalledEntry) {
            V value = marshalledEntry.getValue();
            return value instanceof MarshalledValue ? (V) ((MarshalledValue) value).get() : value;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public /* bridge */ /* synthetic */ void apply(Object obj, Object obj2) {
            apply((MapCombineTask<K, V, KOut, VOut>) obj, (InternalCacheEntry<MapCombineTask<K, V, KOut, VOut>, V>) obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/distexec/mapreduce/MapReduceManagerImpl$SynchronizedCollector.class */
    public final class SynchronizedCollector<KOut, VOut> implements CollectableCollector<KOut, VOut> {
        private CollectableCollector<KOut, VOut> delegate;

        public SynchronizedCollector(CollectableCollector<KOut, VOut> collectableCollector) {
            this.delegate = collectableCollector;
        }

        @Override // org.infinispan.distexec.mapreduce.Collector
        public synchronized void emit(KOut kout, VOut vout) {
            this.delegate.emit(kout, vout);
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.CollectableCollector
        public synchronized void emitReduced(KOut kout, VOut vout) {
            this.delegate.emitReduced(kout, vout);
        }

        @Override // org.infinispan.distexec.mapreduce.MapReduceManagerImpl.CollectableCollector
        public synchronized Map<KOut, List<VOut>> collectedValues() {
            return this.delegate.collectedValues();
        }
    }

    @Inject
    public void init(EmbeddedCacheManager embeddedCacheManager, PersistenceManager persistenceManager, @ComponentName("org.infinispan.executors.transport") ExecutorService executorService, ClusteringDependentLogic clusteringDependentLogic, TimeService timeService, Configuration configuration) {
        this.cacheManager = embeddedCacheManager;
        this.persistenceManager = persistenceManager;
        this.cdl = clusteringDependentLogic;
        this.executorService = executorService;
        this.timeService = timeService;
        this.chunkSize = configuration.clustering().stateTransfer().chunkSize();
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public ExecutorService getExecutorService() {
        return this.executorService;
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KIn, VIn, KOut, VOut> Map<KOut, List<VOut>> mapAndCombineForLocalReduction(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        CollectableCollector<KOut, VOut> map = map(mapCombineCommand);
        combine(mapCombineCommand, map);
        return map.collectedValues();
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KIn, VIn, KOut, VOut> Set<KOut> mapAndCombineForDistributedReduction(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        try {
            return mapAndCombine(mapCombineCommand);
        } catch (Exception e) {
            throw new CacheException(e);
        }
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KOut, VOut> Map<KOut, VOut> reduce(ReduceCommand<KOut, VOut> reduceCommand) throws InterruptedException {
        ConcurrentMap makeConcurrentMap = CollectionFactory.makeConcurrentMap(256);
        reduce(reduceCommand, makeConcurrentMap);
        return makeConcurrentMap;
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <KOut, VOut> void reduce(ReduceCommand<KOut, VOut> reduceCommand, String str) throws InterruptedException {
        reduce((ReduceCommand) reduceCommand, (Map) this.cacheManager.m297getCache(str));
    }

    protected <KOut, VOut> void reduce(ReduceCommand<KOut, VOut> reduceCommand, final Map<KOut, VOut> map) throws InterruptedException {
        Set<KOut> keys = reduceCommand.getKeys();
        final String taskId = reduceCommand.getTaskId();
        if (keys == null || keys.isEmpty()) {
            throw new IllegalStateException("Reduce phase of MapReduceTask " + taskId + " on node " + this.cdl.getAddress() + " executed with empty input keys");
        }
        final Reducer<KOut, VOut> reducer = reduceCommand.getReducer();
        boolean isUseIntermediateSharedCache = reduceCommand.isUseIntermediateSharedCache();
        MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
        log.tracef("For m/r task %s invoking %s at %s", taskId, reduceCommand, this.cdl.getAddress());
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        try {
            Cache<?, ?> cache = this.cacheManager.m297getCache(reduceCommand.getCacheName());
            mapReduceTaskLifecycleService.onPreExecute(reducer, cache);
            cache.getAdvancedCache().getDataContainer().executeTask(new IntermediateKeyFilter(taskId, !isUseIntermediateSharedCache), new DataContainerTask<IntermediateKey<KOut>, List<VOut>>() { // from class: org.infinispan.distexec.mapreduce.MapReduceManagerImpl.1
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super();
                }

                /* JADX WARN: Multi-variable type inference failed */
                public void apply(IntermediateKey<KOut> intermediateKey, InternalCacheEntry<IntermediateKey<KOut>, List<VOut>> internalCacheEntry) {
                    KOut key = intermediateKey.getKey();
                    List<VOut> value = getValue(internalCacheEntry);
                    if (value == null) {
                        throw new IllegalStateException("Found invalid value in intermediate cache, for key " + key + " during reduce phase execution on " + MapReduceManagerImpl.this.cacheManager.getAddress() + " for M/R task " + taskId);
                    }
                    Object reduce = reducer.reduce(key, value.iterator());
                    map.put(key, reduce);
                    MapReduceManagerImpl.log.tracef("For m/r task %s reduced %s to %s at %s ", new Object[]{taskId, key, reduce, MapReduceManagerImpl.this.cdl.getAddress()});
                }
            });
            if (log.isTraceEnabled()) {
                log.tracef("Reduce for task %s took %s milliseconds", reduceCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(reducer);
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Reduce for task %s took %s milliseconds", reduceCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(reducer);
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected <KIn, VIn, KOut, VOut> CollectableCollector<KOut, VOut> map(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws InterruptedException {
        Cache cache = this.cacheManager.m297getCache(mapCombineCommand.getCacheName());
        Set<KIn> keys = mapCombineCommand.getKeys();
        int maxCollectorSize = mapCombineCommand.getMaxCollectorSize();
        final Mapper<KIn, VIn, KOut, VOut> mapper = mapCombineCommand.getMapper();
        boolean z = (keys == null || keys.isEmpty()) ? false : true;
        MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
        final SynchronizedCollector synchronizedCollector = new SynchronizedCollector(new DefaultCollector(mapCombineCommand, maxCollectorSize));
        DataContainer dataContainer = cache.getAdvancedCache().getDataContainer();
        log.tracef("For m/r task %s invoking %s with input keys %s", mapCombineCommand.getTaskId(), mapCombineCommand, keys);
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        try {
            mapReduceTaskLifecycleService.onPreExecute(mapper, cache);
            if (z) {
                for (KIn kin : keys) {
                    Object obj = cache.get(kin);
                    if (obj != null) {
                        mapper.map(kin, obj, synchronizedCollector);
                    }
                }
            } else {
                dataContainer.executeTask(new PrimaryOwnerFilter(this.cdl), new DataContainerTask<KIn, VIn>() { // from class: org.infinispan.distexec.mapreduce.MapReduceManagerImpl.2
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super();
                    }

                    /* JADX WARN: Multi-variable type inference failed */
                    public void apply(KIn kin2, InternalCacheEntry<KIn, VIn> internalCacheEntry) {
                        VIn value = getValue(internalCacheEntry);
                        if (value != null) {
                            mapper.map(kin2, value, synchronizedCollector);
                        }
                    }

                    /* JADX WARN: Multi-variable type inference failed */
                    public /* bridge */ /* synthetic */ void apply(Object obj2, Object obj3) {
                        apply((AnonymousClass2<KIn, VIn>) obj2, (InternalCacheEntry<AnonymousClass2<KIn, VIn>, VIn>) obj3);
                    }
                });
            }
            if (this.persistenceManager != null && !z) {
                this.persistenceManager.processOnAllStores(new CompositeKeyFilter(new PrimaryOwnerFilter(this.cdl), new CollectionKeyFilter(dataContainer.keySet())), new MapReduceCacheLoaderTask(mapper, synchronizedCollector), true, false);
            }
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            return synchronizedCollector;
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected <KIn, VIn, KOut, VOut> Set<KOut> mapAndCombine(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand) throws Exception {
        Cache cache = this.cacheManager.m297getCache(mapCombineCommand.getCacheName());
        Set<KIn> keys = mapCombineCommand.getKeys();
        int maxCollectorSize = mapCombineCommand.getMaxCollectorSize();
        Mapper<KIn, VIn, KOut, VOut> mapper = mapCombineCommand.getMapper();
        boolean z = (keys == null || keys.isEmpty()) ? false : true;
        MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
        DataContainer dataContainer = cache.getAdvancedCache().getDataContainer();
        log.tracef("For m/r task %s invoking %s with input keys %s", mapCombineCommand.getTaskId(), mapCombineCommand, mapCombineCommand.getKeys());
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        HashSet hashSet = new HashSet();
        try {
            mapReduceTaskLifecycleService.onPreExecute(mapper, cache);
            if (z) {
                DefaultCollector defaultCollector = new DefaultCollector(mapCombineCommand, maxCollectorSize);
                for (KIn kin : keys) {
                    Object obj = cache.get(kin);
                    if (obj != null) {
                        mapper.map(kin, obj, defaultCollector);
                    }
                }
                combine(mapCombineCommand, defaultCollector);
                hashSet.addAll(migrateIntermediateKeysAndValues(mapCombineCommand, defaultCollector.collectedValues()));
            } else {
                MapCombineTask mapCombineTask = new MapCombineTask(mapCombineCommand, maxCollectorSize);
                dataContainer.executeTask(new PrimaryOwnerFilter(this.cdl), mapCombineTask);
                hashSet.addAll(mapCombineTask.getMigratedIntermediateKeys());
                hashSet.addAll(migrateIntermediateKeysAndValues(mapCombineCommand, mapCombineTask.collectedValues()));
            }
            if (this.persistenceManager != null && !z) {
                CompositeKeyFilter compositeKeyFilter = new CompositeKeyFilter(new PrimaryOwnerFilter(this.cdl), new CollectionKeyFilter(dataContainer.keySet()));
                MapCombineTask mapCombineTask2 = new MapCombineTask(mapCombineCommand, maxCollectorSize);
                this.persistenceManager.processOnAllStores(compositeKeyFilter, mapCombineTask2, true, false);
                hashSet.addAll(mapCombineTask2.getMigratedIntermediateKeys());
                hashSet.addAll(migrateIntermediateKeysAndValues(mapCombineCommand, mapCombineTask2.collectedValues()));
            }
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            return hashSet;
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Map phase for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
            }
            mapReduceTaskLifecycleService.onPostExecute(mapper);
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    protected <KIn, VIn, KOut, VOut> void combine(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, CollectableCollector<KOut, VOut> collectableCollector) {
        if (mapCombineCommand.hasCombiner()) {
            Reducer<KOut, VOut> combiner = mapCombineCommand.getCombiner();
            Cache cache = this.cacheManager.m297getCache(mapCombineCommand.getCacheName());
            log.tracef("For m/r task %s invoking combiner %s at %s", mapCombineCommand.getTaskId(), mapCombineCommand, this.cdl.getAddress());
            MapReduceTaskLifecycleService mapReduceTaskLifecycleService = MapReduceTaskLifecycleService.getInstance();
            long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
            try {
                mapReduceTaskLifecycleService.onPreExecute(combiner, (Cache<?, ?>) cache);
                for (Map.Entry entry : collectableCollector.collectedValues().entrySet()) {
                    List list = (List) entry.getValue();
                    if (list.size() > 1) {
                        collectableCollector.emitReduced(entry.getKey(), combiner.reduce(entry.getKey(), list.iterator()));
                    }
                }
                if (log.isTraceEnabled()) {
                    log.tracef("Combine for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
                }
                mapReduceTaskLifecycleService.onPostExecute(combiner);
            } catch (Throwable th) {
                if (log.isTraceEnabled()) {
                    log.tracef("Combine for task %s took %s milliseconds", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)));
                }
                mapReduceTaskLifecycleService.onPostExecute(combiner);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public <KIn, VIn, KOut, VOut> Set<KOut> migrateIntermediateKeysAndValues(MapCombineCommand<KIn, VIn, KOut, VOut> mapCombineCommand, Map<KOut, List<VOut>> map) {
        String taskId = mapCombineCommand.getTaskId();
        String intermediateCacheName = mapCombineCommand.getIntermediateCacheName();
        Cache cache = this.cacheManager.m297getCache(intermediateCacheName);
        if (cache == null) {
            throw new IllegalStateException("Temporary cache for MapReduceTask " + taskId + " named " + intermediateCacheName + " not found on " + this.cdl.getAddress());
        }
        HashSet hashSet = new HashSet();
        Map mapKeysToNodes = mapKeysToNodes(cache.getAdvancedCache().getDistributionManager(), taskId, map.keySet());
        long time = log.isTraceEnabled() ? this.timeService.time() : 0L;
        AdvancedCache withFlags = cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
        try {
            for (Map.Entry entry : mapKeysToNodes.entrySet()) {
                List list = (List) entry.getValue();
                try {
                    log.tracef("For m/r task %s migrating intermediate keys %s to %s", taskId, list, entry.getKey());
                    for (Object obj : list) {
                        List<VOut> list2 = map.get(obj);
                        int i = this.chunkSize;
                        for (int i2 = 0; i2 < list2.size(); i2 += i) {
                            withFlags.put(new IntermediateKey(taskId, obj), new DeltaList(list2.subList(i2, Math.min(list2.size(), i2 + i))));
                        }
                        hashSet.add(obj);
                    }
                } catch (Exception e) {
                    throw new CacheException("Could not move intermediate keys/values for M/R task " + taskId, e);
                }
            }
            if (log.isTraceEnabled()) {
                log.tracef("Migrating keys for task %s took %s milliseconds (Migrated %s keys)", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)), Integer.valueOf(hashSet.size()));
            }
            return hashSet;
        } catch (Throwable th) {
            if (log.isTraceEnabled()) {
                log.tracef("Migrating keys for task %s took %s milliseconds (Migrated %s keys)", mapCombineCommand.getTaskId(), Long.valueOf(this.timeService.timeDuration(time, TimeUnit.MILLISECONDS)), Integer.valueOf(hashSet.size()));
            }
            throw th;
        }
    }

    @Override // org.infinispan.distexec.mapreduce.MapReduceManager
    public <T> Map<Address, List<T>> mapKeysToNodes(DistributionManager distributionManager, String str, Collection<T> collection) {
        HashMap hashMap = new HashMap();
        for (T t : collection) {
            Address primaryLocation = distributionManager.getPrimaryLocation(new IntermediateKey(str, t));
            List list = (List) hashMap.get(primaryLocation);
            if (list == null) {
                list = new ArrayList();
                hashMap.put(primaryLocation, list);
            }
            list.add(t);
        }
        return hashMap;
    }

    protected <KIn> Set<KIn> filterLocalPrimaryOwner(Set<KIn> set, DistributionManager distributionManager) {
        HashSet hashSet = new HashSet();
        for (KIn kin : set) {
            Address primaryLocation = distributionManager != null ? distributionManager.getPrimaryLocation(kin) : this.cdl.getAddress();
            if (primaryLocation != null && primaryLocation.equals(this.cdl.getAddress())) {
                hashSet.add(kin);
            }
        }
        return hashSet;
    }
}
