package org.apache.iotdb.db.pipe.task.subtask.connector;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.ExecutorService;
import javax.validation.constraints.NotNull;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.DecoratingLock;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.class */
public class PipeConnectorSubtask extends PipeSubtask {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
    private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
    private final PipeConnector outputPipeConnector;
    private static final int HEARTBEAT_CHECK_INTERVAL = 1000;
    private int executeOnceInvokedTimes;
    protected final DecoratingLock callbackDecoratingLock;
    protected ExecutorService subtaskCallbackListeningExecutor;

    public PipeConnectorSubtask(String str, BoundedBlockingPendingQueue<Event> boundedBlockingPendingQueue, PipeConnector pipeConnector) {
        super(str);
        this.callbackDecoratingLock = new DecoratingLock();
        this.inputPendingQueue = boundedBlockingPendingQueue;
        this.outputPipeConnector = pipeConnector;
        this.executeOnceInvokedTimes = 0;
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public void bindExecutors(ListeningExecutorService listeningExecutorService, ExecutorService executorService, PipeSubtaskScheduler pipeSubtaskScheduler) {
        this.subtaskWorkerThreadPoolExecutor = listeningExecutorService;
        this.subtaskCallbackListeningExecutor = executorService;
        this.subtaskScheduler = pipeSubtaskScheduler;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask, java.util.concurrent.Callable
    public Boolean call() throws Exception {
        boolean booleanValue = super.call().booleanValue();
        this.callbackDecoratingLock.waitForDecorated();
        return Boolean.valueOf(booleanValue);
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    protected synchronized boolean executeOnce() {
        try {
            int i = this.executeOnceInvokedTimes;
            this.executeOnceInvokedTimes = i + 1;
            if (i % HEARTBEAT_CHECK_INTERVAL == 0) {
                this.outputPipeConnector.heartbeat();
            }
            Event waitedPoll = this.lastEvent != null ? this.lastEvent : this.inputPendingQueue.waitedPoll();
            this.lastEvent = waitedPoll;
            if (waitedPoll == null) {
                return false;
            }
            try {
                if (waitedPoll instanceof TabletInsertionEvent) {
                    this.outputPipeConnector.transfer((TabletInsertionEvent) waitedPoll);
                } else if (waitedPoll instanceof TsFileInsertionEvent) {
                    this.outputPipeConnector.transfer((TsFileInsertionEvent) waitedPoll);
                } else {
                    this.outputPipeConnector.transfer(waitedPoll);
                }
                releaseLastEvent();
                return true;
            } catch (PipeConnectionException e) {
                throw e;
            } catch (Exception e2) {
                throw new PipeException("Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.", e2);
            }
        } catch (Exception e3) {
            throw new PipeConnectionException("PipeConnector: failed to connect to the target system.", e3);
        }
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public void onFailure(@NotNull Throwable th) {
        if (th instanceof PipeConnectionException) {
            LOGGER.warn("PipeConnectionException occurred, retrying to connect to the target system...", th);
            int i = 0;
            while (i < 5) {
                try {
                    this.outputPipeConnector.handshake();
                    LOGGER.info("Successfully reconnected to the target system.");
                    break;
                } catch (Exception e) {
                    i++;
                    LOGGER.warn("Failed to reconnect to the target system, retrying ... after [{}/{}] time(s) retries.", new Object[]{Integer.valueOf(i), 5, e});
                    try {
                        Thread.sleep(i * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
                    } catch (InterruptedException e2) {
                        LOGGER.info("Interrupted while sleeping, perhaps need to check whether the thread is interrupted.", e2);
                        Thread.currentThread().interrupt();
                    }
                }
            }
            if (i == 5) {
                if (!(this.lastEvent instanceof EnrichedEvent)) {
                    LOGGER.error("Failed to reconnect to the target system after {} times, stopping current pipe task {} locally... Status shown when query the pipe will be 'RUNNING' instead of 'STOPPED', but the task is actually stopped. Please restart the task by executing 'START PIPE' manually if needed.", new Object[]{5, this.taskID, th});
                    return;
                } else {
                    LOGGER.warn("Failed to reconnect to the target system after {} times, stopping current pipe task {}... Status shown when query the pipe will be 'STOPPED'. Please restart the task by executing 'START PIPE' manually if needed.", new Object[]{5, this.taskID, th});
                    ((EnrichedEvent) this.lastEvent).reportException(new PipeRuntimeConnectorCriticalException(th.getMessage() + ", root cause: " + ErrorHandlingUtils.getRootCause(th).getMessage()));
                    return;
                }
            }
        } else {
            LOGGER.warn("A non-PipeConnectionException occurred, exception message: {}", th.getMessage(), th);
        }
        super.onFailure(new PipeRuntimeConnectorCriticalException(th.getMessage()));
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public void submitSelf() {
        if (this.shouldStopSubmittingSelf.get()) {
            return;
        }
        this.callbackDecoratingLock.markAsDecorating();
        try {
            Futures.addCallback(this.subtaskWorkerThreadPoolExecutor.submit(this), this, this.subtaskCallbackListeningExecutor);
        } finally {
            this.callbackDecoratingLock.markAsDecorated();
        }
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask, java.lang.AutoCloseable
    public synchronized void close() {
        try {
            this.outputPipeConnector.close();
            super.close();
        } catch (Exception e) {
            LOGGER.info("Error occurred during closing PipeConnector, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.", e);
        }
    }
}
