package org.apache.iotdb.db.doublelive;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.rpc.BatchExecutionException;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/doublelive/OperationSyncConsumer.class */
/**
 * Consumer side of the operation-sync queue.
 *
 * <p>Each queued buffer is first offered to the secondary through the {@link SessionPool}
 * (only while {@code StorageEngine.isSecondaryAlive()} reports {@code true}). If the buffer
 * was not transmitted successfully it is handed to the {@link OperationSyncLogService}
 * instead.
 */
public class OperationSyncConsumer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);

    /** Source of pending operations; only the buffer ({@code left}) of each pair is used here. */
    private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>> operationSyncQueue;

    /** Session pool used to transmit buffers to the secondary. */
    private final SessionPool operationSyncSessionPool;

    /** Fallback log that receives every buffer that could not be transmitted. */
    private final OperationSyncLogService dmlLogService;

    /**
     * Creates a consumer.
     *
     * @param blockingQueue queue the consumer takes operations from
     * @param sessionPool pool used to transmit operations
     * @param operationSyncLogService log service used when transmission does not succeed
     */
    public OperationSyncConsumer(BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>> blockingQueue, SessionPool sessionPool, OperationSyncLogService operationSyncLogService) {
        this.operationSyncQueue = blockingQueue;
        this.operationSyncSessionPool = sessionPool;
        this.dmlLogService = operationSyncLogService;
    }

    /**
     * Takes buffers from the queue one at a time, transmitting each or falling back to the
     * log service. The loop ends when the thread is interrupted while waiting on the queue;
     * the interrupt status is restored before returning so the owner of the thread can see it.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            final ByteBuffer planBuffer;
            try {
                planBuffer = this.operationSyncQueue.take().left;
            } catch (InterruptedException e) {
                LOGGER.error("OperationSyncConsumer been interrupted: ", e);
                // Swallowing the interrupt would make this thread impossible to stop.
                Thread.currentThread().interrupt();
                return;
            }

            if (!transmit(planBuffer)) {
                persist(planBuffer);
            }
        }
    }

    /**
     * Tries to send one buffer to the secondary.
     *
     * @param planBuffer buffer to transmit; its position is reset to 0 before sending
     * @return the result of {@code operationSyncTransmit}, or {@code false} when the secondary
     *     is not alive or the transmission threw
     */
    private boolean transmit(ByteBuffer planBuffer) {
        if (!StorageEngine.isSecondaryAlive().get()) {
            return false;
        }
        // NOTE(review): several messages below say "discard it", yet a false return still
        // routes the buffer to the log service, exactly as before. Confirm which is intended.
        try {
            planBuffer.position(0);
            return this.operationSyncSessionPool.operationSyncTransmit(planBuffer);
        } catch (BatchExecutionException e) {
            // Listed before StatementExecutionException so it stays reachable if it is a subclass.
            boolean storageGroupNotReady = e.getStatusList().stream()
                    .anyMatch(status -> status.getCode() == TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode());
            if (storageGroupNotReady) {
                LOGGER.warn("OperationSyncConsumer can't transmit for STORAGE_GROUP_NOT_READY", e);
            } else {
                LOGGER.warn("OperationSyncConsumer can't transmit for batchExecutionException, discard it", e);
            }
        } catch (StatementExecutionException e) {
            if (e.getStatusCode() == TSStatusCode.STORAGE_GROUP_NOT_READY.getStatusCode()) {
                LOGGER.warn("OperationSyncConsumer can't transmit for STORAGE_GROUP_NOT_READY", e);
            } else {
                LOGGER.warn("OperationSyncConsumer can't transmit for statementExecutionException, discard it", e);
            }
        } catch (IoTDBConnectionException e) {
            LOGGER.warn("OperationSyncConsumer can't transmit for connection error", e);
        } catch (Exception e) {
            // Catch-all must come last; placed first it would shadow every handler above.
            LOGGER.error("OperationSyncConsumer can't transmit, discard it", e);
        }
        return false;
    }

    /**
     * Writes one buffer through the log service. The log writer is released exactly once,
     * and only if it was acquired.
     *
     * @param planBuffer buffer to write; its position is moved to its limit first
     */
    private void persist(ByteBuffer planBuffer) {
        // Presumably the log service expects the position at limit() — confirm against
        // OperationSyncLogService.write.
        planBuffer.position(planBuffer.limit());
        try {
            this.dmlLogService.acquireLogWriter();
            try {
                this.dmlLogService.write(planBuffer);
            } finally {
                this.dmlLogService.releaseLogWriter();
            }
        } catch (IOException e) {
            LOGGER.error("OperationSyncConsumer can't serialize physicalPlan", e);
        }
    }
}
