package org.apache.hadoop.ozone.om.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.function.SupplierWithIOException;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.ratis.OMTransactionInfo;
import org.apache.hadoop.ozone.om.ratis.helpers.DoubleBufferEntry;
import org.apache.hadoop.ozone.om.ratis.metrics.OzoneManagerDoubleBufferMetrics;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.ExitUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers OM client responses and writes them to the metadata store in batches
 * from a background daemon thread.
 *
 * <p>Two queues are used: {@link #add} appends to {@code currentBuffer} while
 * the flush thread drains {@code readyBuffer}. At the start of every flush
 * iteration the two queues are swapped, so producers never append to the queue
 * that is being flushed.
 *
 * <p>When Ratis is disabled a second pair of queues carries one
 * {@link CompletableFuture} per added response; those futures are completed
 * once the batch containing the response has been committed.
 */
public final class OzoneManagerDoubleBuffer {
    private static final Logger LOG = LoggerFactory.getLogger(OzoneManagerDoubleBuffer.class);

    /** Key under which the last flushed transaction index and term are stored. */
    private static final String TRANSACTION_INFO_KEY = "#TRANSACTIONINFO";

    /** Queue that {@link #add} appends to; swapped with {@link #readyBuffer} on each flush. */
    private Queue<DoubleBufferEntry<OMClientResponse>> currentBuffer;
    /** Queue being drained by the flush thread. */
    private Queue<DoubleBufferEntry<OMClientResponse>> readyBuffer;
    /** Futures for entries in {@link #currentBuffer}; {@code null} when Ratis is enabled. */
    private volatile Queue<CompletableFuture<Void>> currentFutureQueue;
    /** Futures for entries in {@link #readyBuffer}; {@code null} when Ratis is enabled. */
    private volatile Queue<CompletableFuture<Void>> readyFutureQueue;
    private Daemon daemon;
    private final OMMetadataManager omMetadataManager;
    private final AtomicLong flushedTransactionCount;
    private final AtomicLong flushIterations;
    private final AtomicBoolean isRunning;
    private OzoneManagerDoubleBufferMetrics ozoneManagerDoubleBufferMetrics;
    private long maxFlushedTransactionsInOneIteration;
    private final OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot;
    private final boolean isRatisEnabled;
    private final boolean isTracingEnabled;
    private Function<Long, Long> indexToTerm;

    /**
     * Builder for {@link OzoneManagerDoubleBuffer}.
     *
     * <p>Ratis and tracing are disabled unless explicitly enabled.
     */
    public static class Builder {
        private OMMetadataManager mm;
        private OzoneManagerRatisSnapshot rs;
        private boolean isRatisEnabled = false;
        private boolean isTracingEnabled = false;
        private Function<Long, Long> indexToTerm;

        public Builder setOmMetadataManager(OMMetadataManager omMetadataManager) {
            this.mm = omMetadataManager;
            return this;
        }

        public Builder setOzoneManagerRatisSnapShot(OzoneManagerRatisSnapshot ozoneManagerRatisSnapshot) {
            this.rs = ozoneManagerRatisSnapshot;
            return this;
        }

        public Builder enableRatis(boolean enable) {
            this.isRatisEnabled = enable;
            return this;
        }

        public Builder enableTracing(boolean enable) {
            this.isTracingEnabled = enable;
            return this;
        }

        public Builder setIndexToTerm(Function<Long, Long> termFunction) {
            this.indexToTerm = termFunction;
            return this;
        }

        /**
         * Creates the double buffer; the constructor starts its flush thread.
         *
         * <p>The snapshot and {@code indexToTerm} are only validated when Ratis
         * is enabled. NOTE(review): the flush loop calls the snapshot's
         * {@code updateLastAppliedIndex} after every flush regardless of the
         * Ratis flag, so a snapshot should be supplied in non-Ratis mode too.
         *
         * @return a running {@link OzoneManagerDoubleBuffer}
         * @throws NullPointerException if Ratis is enabled and the snapshot or
         *         {@code indexToTerm} has not been set
         */
        public OzoneManagerDoubleBuffer build() {
            if (isRatisEnabled) {
                Preconditions.checkNotNull(rs, "When ratis is enabled, OzoneManagerRatisSnapshot should not be null");
                Preconditions.checkNotNull(indexToTerm, "When ratis is enabled indexToTerm should not be null");
            }
            return new OzoneManagerDoubleBuffer(mm, rs, isRatisEnabled, isTracingEnabled, indexToTerm);
        }
    }

    /**
     * Initialises the buffers and metrics, then starts the daemon flush thread.
     * The future queues are only allocated when Ratis is disabled.
     */
    private OzoneManagerDoubleBuffer(OMMetadataManager omMetadataManager,
            OzoneManagerRatisSnapshot ozoneManagerRatisSnapShot, boolean isRatisEnabled,
            boolean isTracingEnabled, Function<Long, Long> indexToTerm) {
        this.flushedTransactionCount = new AtomicLong(0L);
        this.flushIterations = new AtomicLong(0L);
        this.isRunning = new AtomicBoolean(false);
        this.currentBuffer = new ConcurrentLinkedQueue<>();
        this.readyBuffer = new ConcurrentLinkedQueue<>();
        this.isRatisEnabled = isRatisEnabled;
        this.isTracingEnabled = isTracingEnabled;
        if (isRatisEnabled) {
            this.currentFutureQueue = null;
            this.readyFutureQueue = null;
        } else {
            this.currentFutureQueue = new ConcurrentLinkedQueue<>();
            this.readyFutureQueue = new ConcurrentLinkedQueue<>();
        }
        this.omMetadataManager = omMetadataManager;
        this.ozoneManagerRatisSnapShot = ozoneManagerRatisSnapShot;
        this.ozoneManagerDoubleBufferMetrics = OzoneManagerDoubleBufferMetrics.create();
        this.indexToTerm = indexToTerm;
        this.isRunning.set(true);
        // Every field the flush thread reads is assigned before the thread starts.
        this.daemon = new Daemon(this::flushTransactions);
        this.daemon.setName("OMDoubleBufferFlushThread");
        this.daemon.start();
    }

    /** Runs {@code supplier}, wrapped in a child tracing span when tracing is enabled. */
    private Void addToBatchWithTrace(OzoneManagerProtocolProtos.OMResponse omResponse,
            SupplierWithIOException<Void> supplier) throws IOException {
        if (!isTracingEnabled) {
            return supplier.get();
        }
        String spanName = "DB-addToWriteBatch-" + omResponse.getCmdType().toString();
        return TracingUtil.executeAsChildSpan(spanName, omResponse.getTraceID(), supplier);
    }

    /** Runs the batch commit, wrapped in a child tracing span when tracing is enabled. */
    private Void flushBatchWithTrace(String parentName, int batchSize,
            SupplierWithIOException<Void> supplier) throws IOException {
        if (!isTracingEnabled) {
            return supplier.get();
        }
        return TracingUtil.executeAsChildSpan("DB-commitWriteBatch-Size-" + batchSize, parentName, supplier);
    }

    /** Runs the transaction-info write, wrapped in a child tracing span when tracing is enabled. */
    private Void addToBatchTransactionInfoWithTrace(String parentName, long transactionIndex,
            SupplierWithIOException<Void> supplier) throws IOException {
        if (!isTracingEnabled) {
            return supplier.get();
        }
        return TracingUtil.executeAsChildSpan("DB-addWriteBatch-transactioninfo-" + transactionIndex,
                parentName, supplier);
    }

    /**
     * Body of the flush thread. Loops while the buffer is running: waits for
     * pending entries, swaps the buffers, commits the ready buffer as a single
     * batch and then performs the post-commit bookkeeping.
     *
     * <p>An {@link IOException} or any other {@link Throwable} results in a
     * call to {@link ExitUtils#terminate}. An interrupt does the same while the
     * buffer is still running; after {@link #stop()} it just ends the loop.
     */
    private void flushTransactions() {
        while (isRunning.get()) {
            try {
                if (canFlush()) {
                    Map<String, List<Long>> cleanupEpochs = new HashMap<>();
                    setReadyBuffer();
                    List<Long> flushedEpochs;
                    try (BatchOperation batchOperation = omMetadataManager.getStore().initBatchOperation()) {
                        flushedEpochs = writeAndCommitReadyBuffer(batchOperation, cleanupEpochs);
                    }
                    finishFlush(flushedEpochs, cleanupEpochs);
                }
            } catch (IOException ex) {
                terminate(ex);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                if (isRunning.get()) {
                    ExitUtils.terminate(1, "OMDoubleBuffer flush thread " + Thread.currentThread().getName()
                            + " encountered Interrupted exception while running", ex, LOG);
                } else {
                    LOG.info("OMDoubleBuffer flush thread {} is interrupted and will exit.",
                            Thread.currentThread().getName());
                }
            } catch (Throwable t) {
                ExitUtils.terminate(2, "OMDoubleBuffer flush thread " + Thread.currentThread().getName()
                        + " encountered Throwable error", t, LOG);
            }
        }
    }

    /**
     * Adds every entry of the ready buffer plus the transaction-info record to
     * {@code batchOperation}, commits the batch and records the commit time.
     *
     * @param batchOperation open batch that the entries are written to
     * @param cleanupEpochs filled with table name to transaction indexes for later cache cleanup
     * @return the transaction log indexes of the flushed entries, sorted ascending
     * @throws IOException if writing the transaction info or committing the batch fails
     */
    private List<Long> writeAndCommitReadyBuffer(BatchOperation batchOperation,
            Map<String, List<Long>> cleanupEpochs) throws IOException {
        AtomicReference<String> lastTraceId = new AtomicReference<>();
        readyBuffer.iterator().forEachRemaining(entry -> {
            try {
                OzoneManagerProtocolProtos.OMResponse omResponse = entry.getResponse().getOMResponse();
                lastTraceId.set(omResponse.getTraceID());
                addToBatchWithTrace(omResponse, () -> {
                    entry.getResponse().checkAndUpdateDB(omMetadataManager, batchOperation);
                    return null;
                });
                addCleanupEntry(entry, cleanupEpochs);
            } catch (IOException ex) {
                // Failure while adding to the batch: hand over to terminate().
                terminate(ex);
            }
        });

        List<Long> flushedEpochs = readyBuffer.stream()
                .map(DoubleBufferEntry::getTrxLogIndex)
                .sorted()
                .collect(Collectors.toList());
        // The list is sorted, so the last element is the highest flushed index.
        long lastTransactionIndex = flushedEpochs.get(flushedEpochs.size() - 1);
        long term = isRatisEnabled ? indexToTerm.apply(lastTransactionIndex) : -1L;

        addToBatchTransactionInfoWithTrace(lastTraceId.get(), lastTransactionIndex, () -> {
            omMetadataManager.getTransactionInfoTable().putWithBatch(batchOperation, TRANSACTION_INFO_KEY,
                    new OMTransactionInfo.Builder()
                            .setTransactionIndex(lastTransactionIndex)
                            .setCurrentTerm(term)
                            .build());
            return null;
        });

        long startTime = Time.monotonicNow();
        // One index per buffered entry, so the list size equals the batch size.
        flushBatchWithTrace(lastTraceId.get(), flushedEpochs.size(), () -> {
            omMetadataManager.getStore().commitBatchOperation(batchOperation);
            return null;
        });
        ozoneManagerDoubleBufferMetrics.updateFlushTime(Time.monotonicNow() - startTime);
        return flushedEpochs;
    }

    /**
     * Post-commit bookkeeping: completes pending futures (non-Ratis only),
     * updates counters, cleans table caches, clears the ready buffer, reports
     * the flushed indexes to the snapshot and updates metrics.
     *
     * @param flushedEpochs sorted transaction log indexes that were just committed
     * @param cleanupEpochs table name to transaction indexes whose cache entries can be cleaned
     */
    private void finishFlush(List<Long> flushedEpochs, Map<String, List<Long>> cleanupEpochs) {
        // Complete the futures first so that waiting callers are released early.
        if (!isRatisEnabled) {
            readyFutureQueue.iterator().forEachRemaining(future -> future.complete(null));
            readyFutureQueue.clear();
        }

        int flushedTransactionsSize = flushedEpochs.size();
        flushedTransactionCount.addAndGet(flushedTransactionsSize);
        flushIterations.incrementAndGet();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Sync Iteration {} flushed transactions in this iteration {}",
                    flushIterations.get(), flushedTransactionsSize);
        }

        cleanupCache(cleanupEpochs);
        readyBuffer.clear();
        ozoneManagerRatisSnapShot.updateLastAppliedIndex(flushedEpochs);
        updateMetrics(flushedTransactionsSize);
    }

    /**
     * Records the entry's transaction index against every table listed in the
     * {@link CleanupTableInfo} annotation of its response class.
     *
     * @throws RuntimeException if the response class has no {@link CleanupTableInfo} annotation
     */
    private void addCleanupEntry(DoubleBufferEntry<OMClientResponse> entry,
            Map<String, List<Long>> cleanupEpochs) {
        Class<? extends OMClientResponse> responseClass = entry.getResponse().getClass();
        CleanupTableInfo cleanupTableInfo = responseClass.getAnnotation(CleanupTableInfo.class);
        if (cleanupTableInfo == null) {
            throw new RuntimeException("CleanupTableInfo Annotation is missing for " + responseClass);
        }
        for (String table : cleanupTableInfo.cleanupTables()) {
            cleanupEpochs.computeIfAbsent(table, key -> new ArrayList<>()).add(entry.getTrxLogIndex());
        }
    }

    /** Sorts each table's transaction indexes and asks that table to clean its cache for them. */
    private void cleanupCache(Map<String, List<Long>> cleanupEpochs) {
        cleanupEpochs.forEach((tableName, epochs) -> {
            Collections.sort(epochs);
            omMetadataManager.getTable(tableName).cleanupCache(epochs);
        });
    }

    /**
     * Updates the flush metrics: operation count, total flushed transactions,
     * running average per iteration and the maximum seen in one iteration.
     *
     * @param flushedTransactionsInOneIteration number of transactions flushed in this iteration
     */
    private void updateMetrics(long flushedTransactionsInOneIteration) {
        ozoneManagerDoubleBufferMetrics.incrTotalNumOfFlushOperations();
        ozoneManagerDoubleBufferMetrics.incrTotalSizeOfFlushedTransactions(flushedTransactionsInOneIteration);
        ozoneManagerDoubleBufferMetrics.setAvgFlushTransactionsInOneIteration(
                ((float) ozoneManagerDoubleBufferMetrics.getTotalNumOfFlushedTransactions())
                        / ((float) ozoneManagerDoubleBufferMetrics.getTotalNumOfFlushOperations()));
        if (maxFlushedTransactionsInOneIteration < flushedTransactionsInOneIteration) {
            maxFlushedTransactionsInOneIteration = flushedTransactionsInOneIteration;
            ozoneManagerDoubleBufferMetrics.setMaxNumberOfTransactionsFlushedInOneIteration(
                    flushedTransactionsInOneIteration);
        }
    }

    /**
     * Stops the flush thread and unregisters the metrics. Only the first call
     * has an effect; later calls just log that the thread is not running.
     *
     * <p>If the calling thread is interrupted while waiting for the flush
     * thread to exit, its interrupt status is restored before returning.
     */
    public void stop() {
        if (!isRunning.compareAndSet(true, false)) {
            LOG.info("OMDoubleBuffer flush thread is not running.");
            return;
        }
        LOG.info("Stopping OMDoubleBuffer flush thread");
        daemon.interrupt();
        try {
            daemon.join();
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while waiting for daemon to exit.", e);
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        ozoneManagerDoubleBufferMetrics.unRegister();
    }

    /** Reports an I/O failure of the flush thread through {@link ExitUtils#terminate}. */
    private void terminate(IOException ex) {
        ExitUtils.terminate(1, "During flush to DB encountered error in OMDoubleBuffer flush thread "
                + Thread.currentThread().getName(), ex, LOG);
    }

    /**
     * @return total number of transactions flushed so far
     */
    public long getFlushedTransactionCount() {
        return flushedTransactionCount.get();
    }

    /**
     * @return number of completed flush iterations
     */
    public long getFlushIterations() {
        return flushIterations.get();
    }

    /**
     * Queues a response for the next flush and wakes the flush thread if it is
     * waiting for work.
     *
     * @param response response to be written to the DB
     * @param transactionIndex transaction log index associated with the response
     * @return {@code null} when Ratis is enabled; otherwise a future that is
     *         completed after the batch containing this response is committed
     */
    public synchronized CompletableFuture<Void> add(OMClientResponse response, long transactionIndex) {
        currentBuffer.add(new DoubleBufferEntry<>(transactionIndex, response));
        notify();
        if (isRatisEnabled) {
            return null;
        }
        CompletableFuture<Void> future = new CompletableFuture<>();
        currentFutureQueue.add(future);
        return future;
    }

    /**
     * Blocks until the current buffer holds at least one entry.
     *
     * @return always {@code true} once an entry is available
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private synchronized boolean canFlush() throws InterruptedException {
        // isEmpty() is constant time; ConcurrentLinkedQueue.size() walks the queue.
        while (currentBuffer.isEmpty()) {
            wait(Long.MAX_VALUE);
        }
        return true;
    }

    /** Swaps the current and ready buffers, and the future queues when Ratis is disabled. */
    private synchronized void setReadyBuffer() {
        Queue<DoubleBufferEntry<OMClientResponse>> filledBuffer = currentBuffer;
        currentBuffer = readyBuffer;
        readyBuffer = filledBuffer;
        if (isRatisEnabled) {
            return;
        }
        Queue<CompletableFuture<Void>> filledFutures = currentFutureQueue;
        currentFutureQueue = readyFutureQueue;
        readyFutureQueue = filledFutures;
    }

    /**
     * @return the metrics object updated by the flush thread
     */
    @VisibleForTesting
    public OzoneManagerDoubleBufferMetrics getOzoneManagerDoubleBufferMetrics() {
        return ozoneManagerDoubleBufferMetrics;
    }
}
