package org.apache.iotdb.db.pipe.task.subtask.connector;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.ExecutorService;
import javax.validation.constraints.NotNull;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.DecoratingLock;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.class */
public class PipeConnectorSubtask extends PipeSubtask {
    private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
    private final PipeConnector outputPipeConnector;
    protected final DecoratingLock callbackDecoratingLock;
    protected ExecutorService subtaskCallbackListeningExecutor;
    protected volatile boolean isSubmitted;
    private final String attributeSortedString;
    private final int connectorIndex;
    private long lastHeartbeatEventInjectTime;
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
    private static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new PipeHeartbeatEvent("cron", false);
    private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS = PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();

    public PipeConnectorSubtask(String str, long j, String str2, int i, BoundedBlockingPendingQueue<Event> boundedBlockingPendingQueue, PipeConnector pipeConnector) {
        super(str, j);
        this.callbackDecoratingLock = new DecoratingLock();
        this.isSubmitted = false;
        this.lastHeartbeatEventInjectTime = System.currentTimeMillis();
        this.attributeSortedString = str2;
        this.connectorIndex = i;
        this.inputPendingQueue = boundedBlockingPendingQueue;
        this.outputPipeConnector = pipeConnector;
        PipeConnectorMetrics.getInstance().register(this);
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public void bindExecutors(ListeningExecutorService listeningExecutorService, ExecutorService executorService, PipeSubtaskScheduler pipeSubtaskScheduler) {
        this.subtaskWorkerThreadPoolExecutor = listeningExecutorService;
        this.subtaskCallbackListeningExecutor = executorService;
        this.subtaskScheduler = pipeSubtaskScheduler;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask, java.util.concurrent.Callable
    public Boolean call() throws Exception {
        boolean booleanValue = super.call().booleanValue();
        this.callbackDecoratingLock.waitForDecorated();
        return Boolean.valueOf(booleanValue);
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    protected boolean executeOnce() {
        if (this.isClosed.get()) {
            return false;
        }
        Event waitedPoll = this.lastEvent != null ? this.lastEvent : this.inputPendingQueue.waitedPoll();
        setLastEvent(waitedPoll);
        try {
            if (waitedPoll == null) {
                if (System.currentTimeMillis() - this.lastHeartbeatEventInjectTime <= CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS) {
                    return false;
                }
                transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
                return false;
            }
            if (waitedPoll instanceof TabletInsertionEvent) {
                this.outputPipeConnector.transfer((TabletInsertionEvent) waitedPoll);
                PipeConnectorMetrics.getInstance().markTabletEvent(this.taskID);
            } else if (waitedPoll instanceof TsFileInsertionEvent) {
                this.outputPipeConnector.transfer((TsFileInsertionEvent) waitedPoll);
                PipeConnectorMetrics.getInstance().markTsFileEvent(this.taskID);
            } else if (waitedPoll instanceof PipeHeartbeatEvent) {
                transferHeartbeatEvent((PipeHeartbeatEvent) waitedPoll);
            } else {
                this.outputPipeConnector.transfer(waitedPoll);
            }
            releaseLastEvent(true);
            return true;
        } catch (PipeConnectionException e) {
            if (!this.isClosed.get()) {
                throw e;
            }
            LOGGER.info("PipeConnectionException in pipe transfer, ignored because pipe is dropped.");
            releaseLastEvent(false);
            return true;
        } catch (Exception e2) {
            if (!this.isClosed.get()) {
                throw new PipeException("Error occurred during executing PipeConnector#transfer, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.", e2);
            }
            LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.");
            releaseLastEvent(false);
            return true;
        }
    }

    private void transferHeartbeatEvent(PipeHeartbeatEvent pipeHeartbeatEvent) {
        try {
            this.outputPipeConnector.heartbeat();
            this.outputPipeConnector.transfer(pipeHeartbeatEvent);
            this.lastHeartbeatEventInjectTime = System.currentTimeMillis();
            pipeHeartbeatEvent.onTransferred();
            PipeConnectorMetrics.getInstance().markPipeHeartbeatEvent(this.taskID);
        } catch (Exception e) {
            throw new PipeConnectionException("PipeConnector: " + this.outputPipeConnector.getClass().getName() + " heartbeat failed, or encountered failure when transferring generic event.", e);
        }
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public synchronized void onSuccess(Boolean bool) {
        this.isSubmitted = false;
        super.onSuccess(bool);
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public synchronized void onFailure(@NotNull Throwable th) {
        this.isSubmitted = false;
        if (this.isClosed.get()) {
            LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped.");
            releaseLastEvent(false);
            return;
        }
        if (th instanceof PipeConnectionException) {
            LOGGER.warn("PipeConnectionException occurred, retrying to connect to the target system...", th);
            int i = 0;
            while (i < 5) {
                try {
                    this.outputPipeConnector.handshake();
                    LOGGER.info("Successfully reconnected to the target system.");
                    break;
                } catch (Exception e) {
                    i++;
                    LOGGER.warn("Failed to reconnect to the target system, retrying ... after [{}/{}] time(s) retries.", new Object[]{Integer.valueOf(i), 5, e});
                    try {
                        Thread.sleep(i * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
                    } catch (InterruptedException e2) {
                        LOGGER.info("Interrupted while sleeping, perhaps need to check whether the thread is interrupted.", e2);
                        Thread.currentThread().interrupt();
                    }
                }
            }
            if (i == 5) {
                if (!(this.lastEvent instanceof EnrichedEvent)) {
                    LOGGER.error("Failed to reconnect to the target system after {} times, stopping current pipe task {} locally... Status shown when query the pipe will be 'RUNNING' instead of 'STOPPED', but the task is actually stopped. Please restart the task by executing 'START PIPE' manually if needed.", new Object[]{5, this.taskID, th});
                    return;
                } else {
                    LOGGER.warn("Failed to reconnect to the target system after {} times, stopping current pipe task {}... Status shown when query the pipe will be 'STOPPED'. Please restart the task by executing 'START PIPE' manually if needed.", new Object[]{5, this.taskID, th});
                    ((EnrichedEvent) this.lastEvent).reportException(new PipeRuntimeConnectorCriticalException(th.getMessage() + ", root cause: " + ErrorHandlingUtils.getRootCause(th).getMessage()));
                    return;
                }
            }
        } else {
            LOGGER.warn("A non-PipeConnectionException occurred, exception message: {}", th.getMessage(), th);
        }
        super.onFailure(new PipeRuntimeConnectorCriticalException(th.getMessage()));
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public synchronized void submitSelf() {
        if (this.shouldStopSubmittingSelf.get() || this.isSubmitted) {
            return;
        }
        this.callbackDecoratingLock.markAsDecorating();
        try {
            Futures.addCallback(this.subtaskWorkerThreadPoolExecutor.submit(this), this, this.subtaskCallbackListeningExecutor);
            this.isSubmitted = true;
        } finally {
            this.callbackDecoratingLock.markAsDecorated();
        }
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask, java.lang.AutoCloseable
    public void close() {
        PipeConnectorMetrics.getInstance().deregister(this.taskID);
        this.isClosed.set(true);
        try {
            this.outputPipeConnector.close();
        } catch (Exception e) {
            LOGGER.info("Error occurred during closing PipeConnector, perhaps need to check whether the implementation of PipeConnector is correct according to the pipe-api description.", e);
        } finally {
            this.inputPendingQueue.forEach(event -> {
                if (event instanceof EnrichedEvent) {
                    ((EnrichedEvent) event).clearReferenceCount(PipeEventCollector.class.getName());
                }
            });
            this.inputPendingQueue.clear();
            super.close();
        }
    }

    public String getAttributeSortedString() {
        return this.attributeSortedString;
    }

    public int getConnectorIndex() {
        return this.connectorIndex;
    }

    public Integer getTsFileInsertionEventCount() {
        return Integer.valueOf(this.inputPendingQueue.getTsFileInsertionEventCount());
    }

    public Integer getTabletInsertionEventCount() {
        return Integer.valueOf(this.inputPendingQueue.getTabletInsertionEventCount());
    }

    public Integer getPipeHeartbeatEventCount() {
        return Integer.valueOf(this.inputPendingQueue.getPipeHeartbeatEventCount());
    }
}
