package org.voltdb.dr2.snapshot;

import com.google_voltpatches.common.collect.ImmutableMap;
import com.google_voltpatches.common.collect.ImmutableSet;
import com.google_voltpatches.common.collect.UnmodifiableIterator;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons_voltpatches.cli.HelpFormatter;
import org.voltcore.logging.VoltLogger;
import org.voltcore.utils.CoreUtils;
import org.voltdb.DRLogSegmentId;
import org.voltdb.ExtensibleSnapshotDigestData;
import org.voltdb.VoltDB;
import org.voltdb.catalog.CatalogMap;
import org.voltdb.catalog.DatabaseConfiguration;
import org.voltdb.catalog.Table;
import org.voltdb.compiler.deploymentfile.DrRoleType;
import org.voltdb.dr2.DRPartitionStream;
import org.voltdb.dr2.DRPartitionStreamReader;
import org.voltdb.dr2.DRProducer;
import org.voltdb.dr2.ProducerDRState;
import org.voltdb.dr2.StreamingDRPartitionStream;
import org.voltdb.dr2.snapshot.SyncSnapshotResultCollector;
import org.voltdb.sysprocs.saverestore.TableSaveFile;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/voltdb/dr2/snapshot/SyncSnapshotStreamer.class */
/**
 * Validates the table files of a sync snapshot and hands them to per-partition
 * {@link SyncSnapshotStreamReader}s, which are started on a small dedicated thread pool.
 *
 * <p>Readers may be queued through {@link #streamPartition(byte, long, int)} before the
 * snapshot files are known; they are only started once
 * {@link #initialize(SyncSnapshotResultCollector.SyncSnapshotResult)} has completed
 * successfully.
 */
public class SyncSnapshotStreamer {
    private static final VoltLogger log = new VoltLogger(DatabaseConfiguration.DR_MODE_NAME);
    private final ProducerDRState m_stateMachine;
    private final ImmutableSet<Integer> m_partitionIds;
    private final AtomicBoolean m_dontWant;
    private final byte m_consumerClusterId;
    private final int m_consumerClusterPartitionCount;
    /** Fixed pool of four threads on which queued readers are started. */
    private final ExecutorService m_es = new ThreadPoolExecutor(4, 4, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(), CoreUtils.getThreadFactory("Sync Snapshot streaming thread pool", "Sync Snapshot streamer"));
    /** Registered stream readers, keyed by cluster id. */
    private final Map<Byte, DRProducer.ClusterStreamReaders> m_snapshotClusterStreamReaders = new TreeMap<>();
    /** One partition stream per id in {@link #m_partitionIds}, created in the constructor. */
    private final Map<Integer, DRPartitionStream> m_partitionStreams = new HashMap<>();
    /** Snapshot files that passed validation in {@code initialize}, in sorted order. */
    private final Queue<File> m_snapshotFiles = new ArrayDeque<>();
    /** Readers waiting to be initialized and started by {@link #stream()}. */
    private final Queue<SyncSnapshotStreamReader> m_readersToStream = new ConcurrentLinkedQueue<>();
    private volatile ImmutableMap<Integer, DRLogSegmentId> m_partitionSnapshotIds = null;
    private volatile byte[] m_mergedTrackerBytes = null;
    /** Set once {@code initialize} has validated every snapshot file; gates {@link #stream()}. */
    private volatile boolean m_initialized = false;

    /**
     * Creates the streamer and one {@link StreamingDRPartitionStream} per partition id.
     *
     * @param producerDRState state machine handed to every partition stream
     * @param immutableSet ids of the partitions to create streams for
     * @param atomicBoolean flag handed to every partition stream
     * @param b id of the consumer cluster
     * @param i partition count of the consumer cluster
     * @param i2 forwarded unchanged as the last constructor argument of each
     *           {@link StreamingDRPartitionStream} (meaning not visible here; confirm against
     *           that class)
     */
    public SyncSnapshotStreamer(ProducerDRState producerDRState, ImmutableSet<Integer> immutableSet, AtomicBoolean atomicBoolean, byte b, int i, int i2) {
        this.m_stateMachine = producerDRState;
        this.m_partitionIds = immutableSet;
        this.m_dontWant = atomicBoolean;
        this.m_consumerClusterId = b;
        this.m_consumerClusterPartitionCount = i;
        CatalogMap<Table> tables = VoltDB.instance().getCatalogContext().tables;
        for (int partitionId : this.m_partitionIds) {
            this.m_partitionStreams.put(partitionId, new StreamingDRPartitionStream(partitionId, this.m_stateMachine, tables, this.m_dontWant, i2));
        }
    }

    /**
     * Records the snapshot ids and merged trackers from the result, validates every snapshot
     * file, then starts any readers that were queued in the meantime.
     *
     * <p>The {@code snapshotFiles} array of the result is sorted in place. Each file must be
     * marked completed and all files must report the same transaction id.
     *
     * @param syncSnapshotResult the collected snapshot result to stream from
     * @throws IOException if two files report different snapshot transaction ids, or a file
     *                     cannot be opened or closed
     * @throws Exception if a snapshot file was not completed, or tracker serialization fails
     */
    public void initialize(SyncSnapshotResultCollector.SyncSnapshotResult syncSnapshotResult) throws Exception {
        log.info("Beginning conversion of sync snapshot files");
        this.m_partitionSnapshotIds = syncSnapshotResult.partitionSnapshotIdsMap;
        this.m_mergedTrackerBytes = ExtensibleSnapshotDigestData.serializeSiteConsumerDrIdTrackersToJSON(syncSnapshotResult.mergedTrackers).toString().getBytes("UTF-8");
        // -1 means "no file inspected yet"; the first file establishes the expected txn id.
        long snapshotTxnId = -1;
        Arrays.sort(syncSnapshotResult.snapshotFiles);
        for (File file : syncSnapshotResult.snapshotFiles) {
            // try-with-resources closes the stream exactly once and keeps the primary exception
            // if close() itself fails.
            // NOTE(review): the TableSaveFile itself is not closed here, matching the original;
            // confirm whether it owns resources beyond the input stream.
            try (FileInputStream fileInputStream = new FileInputStream(file)) {
                TableSaveFile tableSaveFile = new TableSaveFile(fileInputStream, 0, null);
                if (!tableSaveFile.getCompleted()) {
                    throw new Exception("Snapshot file " + file.getPath() + " was not completed");
                }
                if (snapshotTxnId == -1) {
                    snapshotTxnId = tableSaveFile.getTxnId();
                } else if (snapshotTxnId != tableSaveFile.getTxnId()) {
                    throw new IOException("Snapshot transaction ID don't match in snapshot files when doing a snapshot sync: " + snapshotTxnId + " != " + tableSaveFile.getTxnId());
                }
                this.m_snapshotFiles.add(file);
            }
        }
        // Volatile write: publishes the fields set above before stream() may observe the flag.
        this.m_initialized = true;
        stream();
    }

    /**
     * @return the per-partition snapshot ids recorded by {@code initialize}, or {@code null}
     *         if {@code initialize} has not been called yet
     */
    public ImmutableMap<Integer, DRLogSegmentId> getPartitionSnapshotIds() {
        return this.m_partitionSnapshotIds;
    }

    /**
     * Drains the queue of pending readers, initializing each one and submitting a task that
     * starts it. Does nothing until {@code initialize} has completed.
     */
    private void stream() {
        if (!this.m_initialized) {
            return;
        }
        // Loop on poll() rather than isEmpty()+poll(): this method is reachable from both
        // initialize() and streamPartition(), so another caller may drain the queue in between.
        SyncSnapshotStreamReader next;
        while ((next = this.m_readersToStream.poll()) != null) {
            final SyncSnapshotStreamReader reader = next;
            reader.initialize(this.m_snapshotFiles, this.m_partitionSnapshotIds.get(Integer.valueOf(reader.getSubscribeId().partitionId)), this.m_mergedTrackerBytes);
            Runnable startTask = () -> {
                // Only the partition-0 reader populates empty trackers, and only in XDCR mode.
                if (reader.getSubscribeId().partitionId == 0 && DrRoleType.XDCR.value().equals(VoltDB.instance().getCatalogContext().cluster.getDrrole())) {
                    VoltDB.instance().getConsumerDRGateway().populateEmptyTrackersIfNeeded(this.m_consumerClusterId, this.m_consumerClusterPartitionCount);
                }
                reader.startStreaming();
            };
            this.m_es.submit(startTask);
        }
    }

    /**
     * Sums the rows converted so far by every reader registered for a cluster.
     *
     * @param b cluster id to look up
     * @return total rows converted, or 0 if the cluster has no registered readers
     */
    public long getRowsConverted(byte b) {
        long total = 0;
        DRProducer.ClusterStreamReaders clusterStreamReaders = this.m_snapshotClusterStreamReaders.get(Byte.valueOf(b));
        if (clusterStreamReaders != null) {
            for (DRPartitionStreamReader reader : clusterStreamReaders.readers.values()) {
                total += ((StreamingDRPartitionStreamReader) reader).getRowsConverted();
            }
        }
        return total;
    }

    /**
     * Discards all pending readers and registrations, then shuts the thread pool down and
     * waits for already-submitted tasks to finish. If the wait is interrupted, a warning is
     * logged and the thread's interrupt status is restored.
     */
    public void shutdown() {
        this.m_readersToStream.clear();
        this.m_snapshotClusterStreamReaders.clear();
        this.m_partitionStreams.clear();
        this.m_es.shutdown();
        try {
            this.m_es.awaitTermination(365L, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            log.warn("Unexpected interrupt waiting for SyncSnapshotStreamer executor termination", e);
            // Preserve the interrupt so callers further up the stack can react to it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a cluster with a dummy reader for every partition. Any existing registration
     * for the same cluster id is replaced.
     *
     * @param b cluster id to register
     * @param j forwarded to the {@link DRProducer.ClusterStreamReaders} constructor and to each
     *          dummy reader (meaning not visible here; confirm against those classes)
     * @return the newly created reader collection
     */
    public DRProducer.ClusterStreamReaders registerCluster(byte b, long j) {
        DRProducer.ClusterStreamReaders clusterStreamReaders = new DRProducer.ClusterStreamReaders(j, true);
        for (int partitionId : this.m_partitionIds) {
            clusterStreamReaders.readers.put(partitionId, StreamingDRPartitionStream.createDummyStreamingPartitionStreamReader(b, j, partitionId));
        }
        this.m_snapshotClusterStreamReaders.put(Byte.valueOf(b), clusterStreamReaders);
        return clusterStreamReaders;
    }

    /**
     * Removes a cluster's registration and calls {@code delete()} on each of its readers.
     *
     * @param b cluster id to deregister
     * @return {@code true} if the cluster was registered, {@code false} otherwise
     */
    public boolean deregisterCluster(byte b) {
        DRProducer.ClusterStreamReaders removed = this.m_snapshotClusterStreamReaders.remove(Byte.valueOf(b));
        if (removed == null) {
            return false;
        }
        removed.readers.values().forEach(reader -> reader.delete());
        removed.readers.clear();
        return true;
    }

    /**
     * Drops a cluster's registration and detaches its reader from every partition stream.
     * Unlike {@link #deregisterCluster(byte)}, the readers are not deleted.
     *
     * @param b id of the cluster whose sync snapshot has completed
     */
    public void notifyOfClusterSyncSnapshotCompletion(byte b) {
        this.m_snapshotClusterStreamReaders.remove(Byte.valueOf(b));
        for (DRPartitionStream partitionStream : this.m_partitionStreams.values()) {
            partitionStream.removeDRStreamReader(b);
        }
    }

    /**
     * Reports whether no cluster is registered and no partition stream has a reader attached.
     * As a side effect, the partition streams are cleared when the answer is {@code true}.
     *
     * @return {@code true} if the sync snapshot is no longer in use
     */
    public boolean canDiscardSyncSnapshot() {
        if (!this.m_snapshotClusterStreamReaders.isEmpty()) {
            return false;
        }
        for (DRPartitionStream partitionStream : this.m_partitionStreams.values()) {
            if (!partitionStream.hasNoDRStreamReader()) {
                return false;
            }
        }
        this.m_partitionStreams.clear();
        return true;
    }

    /**
     * Attaches a reader for the cluster to the given partition's stream, records it in the
     * cluster's registration, and queues it for streaming. Unknown partition ids are ignored.
     *
     * @param b cluster id; must already be registered via {@link #registerCluster(byte, long)}
     * @param j forwarded to {@code addDRStreamReader} (meaning not visible here)
     * @param i partition id to stream
     * @throws IOException declared by the original signature; propagated from the calls made here
     * @throws IllegalStateException if the partition is known but the cluster is not registered
     */
    public void streamPartition(byte b, long j, int i) throws IOException {
        DRPartitionStream partitionStream = this.m_partitionStreams.get(Integer.valueOf(i));
        if (partitionStream == null) {
            return;
        }
        // Fail before touching the partition stream, so an unregistered cluster does not leave
        // a reader attached that nothing tracks.
        DRProducer.ClusterStreamReaders clusterStreamReaders = this.m_snapshotClusterStreamReaders.get(Byte.valueOf(b));
        if (clusterStreamReaders == null) {
            throw new IllegalStateException("Cluster " + b + " is not registered for sync snapshot streaming of partition " + i);
        }
        DRPartitionStreamReader reader = partitionStream.addDRStreamReader(b, j);
        clusterStreamReaders.readers.put(Integer.valueOf(i), reader);
        this.m_readersToStream.offer((SyncSnapshotStreamReader) reader);
        stream();
    }
}
