package com.hazelcast.map.writebehind;

import com.hazelcast.cluster.ClusterService;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.core.MapStore;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.MapService;
import com.hazelcast.map.RecordStore;
import com.hazelcast.map.writebehind.store.MapStoreManager;
import com.hazelcast.map.writebehind.store.MapStoreManagers;
import com.hazelcast.map.writebehind.store.StoreListener;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.partition.InternalPartitionService;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.ExecutorType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/hazelcast/map/writebehind/WriteBehindQueueManager.class */
public class WriteBehindQueueManager implements WriteBehindManager {
    /** Prefix of the executor name; the map name is appended to it when the executor is registered. */
    private static final String EXECUTOR_NAME_PREFIX = "hz:scheduled:mapstore:";

    /** Default queue capacity of the write-behind executor (the registration call passes the same value). */
    private static final int EXECUTOR_DEFAULT_QUEUE_CAPACITY = 10000;

    /**
     * Orders delayed entries by ascending store time. Entries with equal store
     * times compare as equal, so their relative order is whatever the sort
     * algorithm preserves.
     */
    private static final Comparator<DelayedEntry> DELAYED_ENTRY_COMPARATOR = new Comparator<DelayedEntry>() {
        @Override
        public int compare(DelayedEntry left, DelayedEntry right) {
            final long leftTime = left.getStoreTime();
            final long rightTime = right.getStoreTime();
            if (leftTime == rightTime) {
                return 0;
            }
            // Explicit comparison instead of subtraction, so large time gaps cannot overflow.
            return leftTime < rightTime ? -1 : 1;
        }
    };
    /** Executor on which {@link #processor} is scheduled; also exposed through {@link #getScheduler()}. */
    private final ScheduledExecutorService scheduledExecutor;

    /** Periodic task that drains due entries from the per-partition write-behind queues. */
    private final StoreProcessor processor;

    /** Map service used for node access and for converting keys to {@link Data}. */
    private final MapService mapService;

    /** Performs the actual store calls and reports the entries that failed, grouped by partition id. */
    private final MapStoreManager<DelayedEntry> mapStoreManager;

    /**
     * Registered store listeners. This same list instance is handed to the map
     * store manager in the constructor, so later additions are visible to it.
     */
    private final List<StoreListener> listeners = new ArrayList<StoreListener>(2);

    /** Logger used to report entries that could not be flushed. */
    private final ILogger logger;

    /* loaded from: input_file:com/hazelcast/map/writebehind/WriteBehindQueueManager$StoreProcessor.class */
    private static final class StoreProcessor implements Runnable {
        /** Name of the map whose write-behind queues this processor drains. */
        private final String mapName;

        /** Map service giving access to the node engine and the partition containers. */
        private final MapService mapService;

        /** Manager that executes the store calls and invokes the store listeners. */
        private final MapStoreManager mapStoreManager;

        /** Configured write batch size; values greater than 1 enable chunked storing. */
        private final int writeBatchSize;

        /** Replica task delay converted to milliseconds; used to gate clean-up of backup queues. */
        private final long backupRunIntervalTime;

        /** Time (ms) of the last run in which this node stored entries as partition owner. */
        private long lastRunTime;

        /**
         * Creates the processor for one map.
         *
         * @param mapName         name of the map to process
         * @param mapService      map service of this node
         * @param mapStoreManager manager performing the store operations
         * @param mapStoreConfig  configuration from which the write batch size is read
         */
        private StoreProcessor(String mapName, MapService mapService, MapStoreManager mapStoreManager,
                               MapStoreConfig mapStoreConfig) {
            this.mapName = mapName;
            this.mapService = mapService;
            this.mapStoreManager = mapStoreManager;
            this.writeBatchSize = mapStoreConfig.getWriteBatchSize();
            // getReplicaWaitTime() reads this.mapService, so it must run after that field is assigned.
            this.backupRunIntervalTime = getReplicaWaitTime();
            this.lastRunTime = Clock.currentTimeMillis();
        }

        /**
         * Reads the {@code MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS} group property
         * and converts it from seconds to milliseconds.
         *
         * @return the replica scheduled-task delay in milliseconds
         */
        private long getReplicaWaitTime() {
            final NodeEngine nodeEngine = this.mapService.getNodeEngine();
            return TimeUnit.SECONDS.toMillis(
                    nodeEngine.getGroupProperties().MAP_REPLICA_SCHEDULED_TASK_DELAY_SECONDS.getInteger());
        }

        /**
         * Performs one write-behind pass over every partition of this map.
         * <p>
         * For each partition that has an owner and an existing record store, the
         * entries whose store time is not later than "now" are collected. Entries of
         * partitions owned by this node are gathered, sorted by store time and
         * stored (in chunks when the write batch size is greater than 1). Entries of
         * partitions owned by another node are only trimmed from the local queue via
         * {@code doInBackup}, and only while the time condition below holds.
         * <p>
         * {@code lastRunTime} is updated only when this node actually stored entries.
         */
        @Override // java.lang.Runnable
        public void run() {
            final MapService service = this.mapService;
            final long now = Clock.currentTimeMillis();
            final NodeEngine nodeEngine = service.getNodeEngine();
            final ClusterService clusterService = nodeEngine.getClusterService();
            final InternalPartitionService partitionService = nodeEngine.getPartitionService();
            final Address thisAddress = clusterService.getThisAddress();
            final int partitionCount = partitionService.getPartitionCount();

            // Both holders are created lazily so that a pass which finds no owned,
            // due entries (e.g. on a pure backup node) allocates nothing.
            Map<Integer, Integer> dueCountPerPartition = null;
            List<DelayedEntry> ownedDueEntries = null;

            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                final Address owner = partitionService.getPartition(partitionId).getOwnerOrNull();
                if (owner == null) {
                    // No owner assigned yet; nothing can be decided for this partition.
                    continue;
                }
                final RecordStore recordStore = getRecordStoreOrNull(this.mapName, partitionId);
                if (recordStore == null) {
                    continue;
                }
                final WriteBehindQueue<DelayedEntry> queue = recordStore.getWriteBehindQueue();
                final List<DelayedEntry> dueEntries =
                        WriteBehindQueueManager.filterItemsLessThanOrEqualToTime(queue, now);
                if (dueEntries.isEmpty()) {
                    continue;
                }
                if (!owner.equals(thisAddress)) {
                    // NOTE(review): backup queues are trimmed only while "now" is still
                    // earlier than lastRunTime + backupRunIntervalTime — confirm that this
                    // direction of the comparison is the intended one.
                    if (now < this.lastRunTime + this.backupRunIntervalTime) {
                        doInBackup(queue, dueEntries, partitionId);
                    }
                    continue;
                }
                if (ownedDueEntries == null) {
                    dueCountPerPartition = new HashMap<Integer, Integer>();
                    ownedDueEntries = new ArrayList<DelayedEntry>();
                }
                dueCountPerPartition.put(Integer.valueOf(partitionId), Integer.valueOf(dueEntries.size()));
                ownedDueEntries.addAll(dueEntries);
            }

            // The list is only ever created together with a non-empty addAll, so
            // "never created" is equivalent to "nothing to store".
            if (ownedDueEntries == null) {
                return;
            }

            Collections.sort(ownedDueEntries, WriteBehindQueueManager.DELAYED_ENTRY_COMPARATOR);
            if (this.writeBatchSize > 1) {
                doStoreUsingBatchSize(ownedDueEntries, dueCountPerPartition);
            } else {
                doStore(ownedDueEntries, dueCountPerPartition);
            }
            this.lastRunTime = now;
        }

        /**
         * Stores the given entries in chunks of at most {@code writeBatchSize}
         * elements, then trims the processed entries from the partition queues and
         * puts the failed ones back.
         * <p>
         * The queues are trimmed exactly once, after all chunks were handed to the
         * map store manager. {@code map} holds the number of due entries per
         * partition for the whole list, not per chunk; applying it after every chunk
         * would remove that many entries from the front of each queue once per
         * chunk and thereby drop re-queued failures and entries that are not due.
         * <p>
         * If the map store manager throws while processing a chunk, nothing is
         * removed from the queues, so the entries remain queued for a later run.
         *
         * @param list all due entries of the partitions owned by this node, sorted by store time
         * @param map  partition id to number of due entries collected from that partition's queue
         */
        private void doStoreUsingBatchSize(List<DelayedEntry> list, Map<Integer, Integer> map) {
            final Map<Integer, List<DelayedEntry>> failsPerPartition = new HashMap<Integer, List<DelayedEntry>>();
            int chunkNumber = 0;
            List<DelayedEntry> chunk = getBatchChunk(list, this.writeBatchSize, chunkNumber);
            while (chunk != null) {
                final Map<Integer, List<DelayedEntry>> chunkFails = this.mapStoreManager.process(chunk);
                // Merge this chunk's failures into the per-partition totals.
                for (Map.Entry<Integer, List<DelayedEntry>> fail : chunkFails.entrySet()) {
                    final List<DelayedEntry> failedEntries = fail.getValue();
                    if (failedEntries == null || failedEntries.isEmpty()) {
                        continue;
                    }
                    List<DelayedEntry> merged = failsPerPartition.get(fail.getKey());
                    if (merged == null) {
                        merged = new ArrayList<DelayedEntry>();
                        failsPerPartition.put(fail.getKey(), merged);
                    }
                    merged.addAll(failedEntries);
                }
                chunkNumber++;
                chunk = getBatchChunk(list, this.writeBatchSize, chunkNumber);
            }
            removeProcessedEntries(this.mapName, map);
            addFailsToQueue(this.mapName, failsPerPartition);
        }

        /**
         * Stores the given entries in a single call, trims the processed entries
         * from the partition queues and re-queues the ones that failed.
         *
         * @param entries         entries to hand to the map store manager
         * @param countPerPartition partition id to number of entries to remove from that partition's queue
         */
        private void doStore(List<DelayedEntry> entries, Map<Integer, Integer> countPerPartition) {
            final Map<Integer, List<DelayedEntry>> failsPerPartition = this.mapStoreManager.process(entries);
            final String name = this.mapName;
            // Order matters: trim first, then put failures back at the front of the queues.
            removeProcessedEntries(name, countPerPartition);
            addFailsToQueue(name, failsPerPartition);
        }

        /**
         * Returns the {@code chunkNumber}-th slice (zero-based) of {@code entries},
         * each slice holding at most {@code batchSize} elements.
         *
         * @param entries     the full list; may be {@code null}
         * @param batchSize   maximum number of elements per slice
         * @param chunkNumber zero-based index of the requested slice
         * @return a {@code subList} view of the requested slice, or {@code null}
         *         when the list is {@code null}/empty or the slice would be empty
         */
        private List<DelayedEntry> getBatchChunk(List<DelayedEntry> entries, int batchSize, int chunkNumber) {
            if (entries == null || entries.isEmpty()) {
                return null;
            }
            final int from = chunkNumber * batchSize;
            final int to = Math.min(from + batchSize, entries.size());
            return from < to ? entries.subList(from, to) : null;
        }

        /**
         * Trims due entries from a queue held for a partition that this node does
         * not own. Nothing is written to the store here; only the before/after store
         * listeners are invoked around the removal.
         * <p>
         * Ownership is looked up again, and the call is a no-op when the partition
         * has no owner or is owned by this node.
         *
         * @param queue       the write-behind queue of the partition
         * @param dueEntries  the entries that were found to be due
         * @param partitionId id of the partition the queue belongs to
         */
        private void doInBackup(WriteBehindQueue queue, List<DelayedEntry> dueEntries, int partitionId) {
            final NodeEngine nodeEngine = this.mapService.getNodeEngine();
            final ClusterService clusterService = nodeEngine.getClusterService();
            final InternalPartitionService partitionService = nodeEngine.getPartitionService();
            final Address thisAddress = clusterService.getThisAddress();
            final Address owner = partitionService.getPartition(partitionId).getOwnerOrNull();
            if (owner != null && !owner.equals(thisAddress)) {
                this.mapStoreManager.callBeforeStoreListeners(dueEntries);
                removeProcessed(queue, dueEntries.size());
                this.mapStoreManager.callAfterStoreListeners(dueEntries);
            }
        }

        /**
         * For every partition in {@code countPerPartition}, removes the recorded
         * number of entries from the front of that partition's write-behind queue.
         * Partitions without an existing record store are skipped.
         *
         * @param name              name of the map
         * @param countPerPartition partition id to number of entries to remove
         */
        private void removeProcessedEntries(String name, Map<Integer, Integer> countPerPartition) {
            for (Map.Entry<Integer, Integer> entry : countPerPartition.entrySet()) {
                final RecordStore recordStore = getRecordStoreOrNull(name, entry.getKey().intValue());
                if (recordStore == null) {
                    continue;
                }
                removeProcessed(recordStore.getWriteBehindQueue(), entry.getValue().intValue());
            }
        }

        /**
         * Looks up the already existing record store of a map in a partition.
         *
         * @param name        name of the map
         * @param partitionId id of the partition
         * @return whatever {@code getExistingRecordStore} returns for the map; callers treat
         *         {@code null} as "no record store in this partition"
         */
        private RecordStore getRecordStoreOrNull(String name, int partitionId) {
            return this.mapService
                    .getPartitionContainer(partitionId)
                    .getExistingRecordStore(name);
        }

        /**
         * Puts entries whose store operation failed back at the front of the
         * write-behind queue of their partition, sorted by store time. Partitions
         * with no failures or without an existing record store are skipped.
         *
         * @param name              name of the map
         * @param failsPerPartition partition id to the entries that failed for that partition
         */
        private void addFailsToQueue(String name, Map<Integer, List<DelayedEntry>> failsPerPartition) {
            for (Map.Entry<Integer, List<DelayedEntry>> entry : failsPerPartition.entrySet()) {
                final List<DelayedEntry> failed = entry.getValue();
                if (failed == null || failed.isEmpty()) {
                    continue;
                }
                final RecordStore recordStore = getRecordStoreOrNull(name, entry.getKey().intValue());
                if (recordStore == null) {
                    continue;
                }
                final WriteBehindQueue<DelayedEntry> queue = recordStore.getWriteBehindQueue();
                Collections.sort(failed, WriteBehindQueueManager.DELAYED_ENTRY_COMPARATOR);
                queue.addFront(failed);
            }
        }

        /**
         * Removes {@code count} entries from the front of the queue. Does nothing
         * when the queue is {@code null} or empty, or when {@code count} is less
         * than 1.
         * <p>
         * NOTE(review): the count is not capped at the queue size, and removal is
         * purely positional — this presumably relies on the due entries being at
         * the front of the queue; verify against the queue implementation.
         *
         * @param queue the write-behind queue to trim
         * @param count number of leading entries to remove
         */
        private void removeProcessed(WriteBehindQueue<DelayedEntry> queue, int count) {
            if (queue != null && queue.size() != 0 && count >= 1) {
                int remaining = count;
                while (remaining > 0) {
                    queue.removeFirst();
                    remaining--;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WriteBehindQueueManager(String str, MapService mapService, MapStore mapStore, MapStoreConfig mapStoreConfig) {
        this.scheduledExecutor = getScheduledExecutorService(str, mapService);
        this.mapService = mapService;
        this.logger = mapService.getNodeEngine().getLogger(WriteBehindQueueManager.class);
        this.mapStoreManager = MapStoreManagers.newMapStoreManager(mapService, mapStore, this.listeners);
        this.processor = new StoreProcessor(str, mapService, this.mapStoreManager, mapStoreConfig);
    }

    /**
     * Schedules the store processor on the map's executor: first run after one
     * second, then at a fixed rate of one second.
     */
    @Override // com.hazelcast.map.writebehind.WriteBehindManager
    public void start() {
        final long initialDelaySeconds = 1L;
        final long periodSeconds = 1L;
        scheduledExecutor.scheduleAtFixedRate(processor, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Shuts down the map's scheduled executor. This calls {@code shutdown()},
     * not {@code shutdownNow()}, and does not wait for termination.
     */
    @Override // com.hazelcast.map.writebehind.WriteBehindManager
    public void stop() {
        final ScheduledExecutorService executor = this.scheduledExecutor;
        executor.shutdown();
    }

    /**
     * Adds a store listener. The listener is appended to the list that was
     * handed to the map store manager in the constructor.
     *
     * @param storeListener the listener to register
     */
    @Override // com.hazelcast.map.writebehind.WriteBehindManager
    public void addStoreListener(StoreListener storeListener) {
        final List<StoreListener> registered = this.listeners;
        registered.add(storeListener);
    }

    /**
     * Drains the given queue completely and stores all of its entries at once,
     * regardless of their store time.
     *
     * @param writeBehindQueue the queue to flush
     * @return the keys of the drained entries as {@link Data}; an empty list when the queue is empty
     */
    @Override // com.hazelcast.map.writebehind.WriteBehindManager
    public Collection<Data> flush(WriteBehindQueue<DelayedEntry> writeBehindQueue) {
        if (writeBehindQueue.size() == 0) {
            return Collections.<Data>emptyList();
        }
        final List<DelayedEntry> drained = writeBehindQueue.removeAll();
        return flush0(drained);
    }

    /**
     * Sorts the entries by store time (in place), hands them to the map store
     * manager and logs the number of entries that could not be stored.
     * <p>
     * NOTE(review): the returned keys are computed from the whole input list, so
     * keys of failed entries are included too (unless {@code process} removes
     * them from the list — verify), and failed entries are not re-queued here.
     *
     * @param entries the entries drained from a queue
     * @return the keys of {@code entries} converted to {@link Data}
     */
    private Collection<Data> flush0(List<DelayedEntry> entries) {
        Collections.sort(entries, DELAYED_ENTRY_COMPARATOR);
        final Map<Integer, List<DelayedEntry>> failsPerPartition = this.mapStoreManager.process(entries);
        final boolean anyFailure = failsPerPartition.size() > 0;
        if (anyFailure) {
            printErrorLog(failsPerPartition);
        }
        return getDataKeys(entries);
    }

    /**
     * Converts the key of every entry to its {@link Data} form using the map service.
     *
     * @param entries the entries whose keys are wanted; may be {@code null}
     * @return the converted keys in list order; an empty list for {@code null} or empty input
     */
    private List<Data> getDataKeys(List<DelayedEntry> entries) {
        if (entries == null || entries.isEmpty()) {
            return Collections.emptyList();
        }
        final List<Data> keys = new ArrayList<Data>(entries.size());
        for (DelayedEntry entry : entries) {
            keys.add(this.mapService.toData(entry.getKey()));
        }
        return keys;
    }

    /**
     * Exposes the executor on which the store processor is scheduled.
     *
     * @return the scheduled executor obtained in the constructor
     */
    @Override // com.hazelcast.map.writebehind.WriteBehindManager
    public ScheduledExecutorService getScheduler() {
        final ScheduledExecutorService executor = this.scheduledExecutor;
        return executor;
    }

    /**
     * Logs, at severe level, the total number of entries that failed to be
     * stored during a flush.
     *
     * @param failsPerPartition partition id to the entries that failed for that partition
     */
    private void printErrorLog(Map<Integer, List<DelayedEntry>> failsPerPartition) {
        int failedCount = 0;
        for (List<DelayedEntry> failed : failsPerPartition.values()) {
            failedCount += failed.size();
        }
        final String message =
                String.format("Map store flush operation can not be done for %d entries", Integer.valueOf(failedCount));
        this.logger.severe(message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Collects the entries of a queue whose store time is less than or equal to
     * the given time. The whole queue is scanned by index until {@code get}
     * returns {@code null}; the queue itself is not modified.
     *
     * @param writeBehindQueue the queue to scan; may be {@code null}
     * @param j                the upper bound (inclusive) for the store time, in milliseconds
     * @return the matching entries in queue order; an empty list when the queue is
     *         {@code null} or empty or nothing matches
     */
    public static List<DelayedEntry> filterItemsLessThanOrEqualToTime(WriteBehindQueue<DelayedEntry> writeBehindQueue, long j) {
        if (writeBehindQueue == null || writeBehindQueue.size() == 0) {
            return Collections.emptyList();
        }
        int index = 0;
        DelayedEntry current = writeBehindQueue.get(index);
        if (current == null) {
            // size() reported elements but none could be read; avoid allocating a list.
            return Collections.emptyList();
        }
        final List<DelayedEntry> dueEntries = new ArrayList<DelayedEntry>();
        do {
            if (current.getStoreTime() <= j) {
                dueEntries.add(current);
            }
            index++;
            current = writeBehindQueue.get(index);
        } while (current != null);
        return dueEntries;
    }

    /**
     * Registers a cached executor named {@code EXECUTOR_NAME_PREFIX + mapName}
     * with the node's execution service and returns its scheduled variant.
     *
     * @param mapName name of the map the executor is created for
     * @param service map service giving access to the node's execution service
     * @return the scheduled executor registered under the map specific name
     */
    private ScheduledExecutorService getScheduledExecutorService(String mapName, MapService service) {
        final String executorName = EXECUTOR_NAME_PREFIX + mapName;
        final ExecutionService executionService = service.getNodeEngine().getExecutionService();
        // The second argument (1) is presumably the pool size — confirm against ExecutionService.register.
        executionService.register(executorName, 1, EXECUTOR_DEFAULT_QUEUE_CAPACITY, ExecutorType.CACHED);
        return executionService.getScheduledExecutor(executorName);
    }
}
