package org.apache.iotdb.db.pipe.task.subtask.connector;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.ExecutorService;
import javax.validation.constraints.NotNull;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.commons.pipe.task.DecoratingLock;
import org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBThriftAsyncConnector;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.metric.PipeConnectorMetrics;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask;
import org.apache.iotdb.db.utils.ErrorHandlingUtils;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.class */
public class PipeConnectorSubtask extends PipeDataNodeSubtask {
    private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
    private final PipeConnector outputPipeConnector;
    protected final DecoratingLock callbackDecoratingLock;
    protected ExecutorService subtaskCallbackListeningExecutor;
    protected volatile boolean isSubmitted;
    private final String attributeSortedString;
    private final int connectorIndex;
    private long lastHeartbeatEventInjectTime;
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeConnectorSubtask.class);
    private static final PipeHeartbeatEvent CRON_HEARTBEAT_EVENT = new PipeHeartbeatEvent("cron", false);
    private static final long CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS = PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds();

    public PipeConnectorSubtask(String str, long j, String str2, int i, BoundedBlockingPendingQueue<Event> boundedBlockingPendingQueue, PipeConnector pipeConnector) {
        super(str, j);
        this.callbackDecoratingLock = new DecoratingLock();
        this.isSubmitted = false;
        this.lastHeartbeatEventInjectTime = System.currentTimeMillis();
        this.attributeSortedString = str2;
        this.connectorIndex = i;
        this.inputPendingQueue = boundedBlockingPendingQueue;
        this.outputPipeConnector = pipeConnector;
        PipeConnectorMetrics.getInstance().register(this);
    }

    public void bindExecutors(ListeningExecutorService listeningExecutorService, ExecutorService executorService, PipeSubtaskScheduler pipeSubtaskScheduler) {
        this.subtaskWorkerThreadPoolExecutor = listeningExecutorService;
        this.subtaskCallbackListeningExecutor = executorService;
        this.subtaskScheduler = pipeSubtaskScheduler;
    }

    /* renamed from: call, reason: merged with bridge method [inline-methods] */
    public Boolean m131call() throws Exception {
        boolean booleanValue = super.call().booleanValue();
        this.callbackDecoratingLock.waitForDecorated();
        return Boolean.valueOf(booleanValue);
    }

    protected boolean executeOnce() {
        if (this.isClosed.get()) {
            return false;
        }
        Event waitedPoll = this.lastEvent != null ? this.lastEvent : this.inputPendingQueue.waitedPoll();
        setLastEvent(waitedPoll);
        try {
            if (waitedPoll == null) {
                if (System.currentTimeMillis() - this.lastHeartbeatEventInjectTime <= CRON_HEARTBEAT_EVENT_INJECT_INTERVAL_SECONDS) {
                    return false;
                }
                transferHeartbeatEvent(CRON_HEARTBEAT_EVENT);
                return false;
            }
            if (waitedPoll instanceof TabletInsertionEvent) {
                this.outputPipeConnector.transfer((TabletInsertionEvent) waitedPoll);
                PipeConnectorMetrics.getInstance().markTabletEvent(this.taskID);
            } else if (waitedPoll instanceof TsFileInsertionEvent) {
                this.outputPipeConnector.transfer((TsFileInsertionEvent) waitedPoll);
                PipeConnectorMetrics.getInstance().markTsFileEvent(this.taskID);
            } else if (waitedPoll instanceof PipeHeartbeatEvent) {
                transferHeartbeatEvent((PipeHeartbeatEvent) waitedPoll);
            } else {
                this.outputPipeConnector.transfer(waitedPoll);
            }
            releaseLastEvent(true);
            return true;
        } catch (Exception e) {
            if (!this.isClosed.get()) {
                throw new PipeException(String.format("Exception in pipe transfer, subtask: %s, last event: %s, root cause: %s", this.taskID, this.lastEvent, ErrorHandlingUtils.getRootCause(e).getMessage()), e);
            }
            LOGGER.info("Exception in pipe transfer, ignored because pipe is dropped.", e);
            releaseLastEvent(false);
            return true;
        } catch (PipeConnectionException e2) {
            if (!this.isClosed.get()) {
                throw e2;
            }
            LOGGER.info("PipeConnectionException in pipe transfer, ignored because pipe is dropped.", e2);
            releaseLastEvent(false);
            return true;
        }
    }

    private void transferHeartbeatEvent(PipeHeartbeatEvent pipeHeartbeatEvent) {
        try {
            this.outputPipeConnector.heartbeat();
            this.outputPipeConnector.transfer(pipeHeartbeatEvent);
            this.lastHeartbeatEventInjectTime = System.currentTimeMillis();
            pipeHeartbeatEvent.onTransferred();
            PipeConnectorMetrics.getInstance().markPipeHeartbeatEvent(this.taskID);
        } catch (Exception e) {
            throw new PipeConnectionException("PipeConnector: " + this.outputPipeConnector.getClass().getName() + " heartbeat failed, or encountered failure when transferring generic event.", e);
        }
    }

    public synchronized void onSuccess(Boolean bool) {
        this.isSubmitted = false;
        super.onSuccess(bool);
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeDataNodeSubtask
    public synchronized void onFailure(@NotNull Throwable th) {
        this.isSubmitted = false;
        if (this.isClosed.get()) {
            LOGGER.info("onFailure in pipe transfer, ignored because pipe is dropped.", th);
            releaseLastEvent(false);
        } else {
            if ((th instanceof PipeConnectionException) && onPipeConnectionException(th)) {
                return;
            }
            super.onFailure(new PipeRuntimeConnectorCriticalException(th.getMessage()));
        }
    }

    private boolean onPipeConnectionException(Throwable th) {
        LOGGER.warn("PipeConnectionException occurred, {} retries to handshake with the target system.", this.outputPipeConnector.getClass().getName(), th);
        int i = 0;
        while (i < 5) {
            try {
                this.outputPipeConnector.handshake();
                LOGGER.info("{} handshakes with the target system successfully.", this.outputPipeConnector.getClass().getName());
                break;
            } catch (Exception e) {
                i++;
                LOGGER.warn("{} failed to handshake with the target system for {} times, will retry at most {} times.", new Object[]{this.outputPipeConnector.getClass().getName(), Integer.valueOf(i), 5, e});
                try {
                    Thread.sleep(i * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
                } catch (InterruptedException e2) {
                    LOGGER.info("Interrupted while sleeping, will retry to handshake with the target system.", e2);
                    Thread.currentThread().interrupt();
                }
            }
        }
        if (i != 5 || !(this.lastEvent instanceof EnrichedEvent)) {
            return false;
        }
        ((EnrichedEvent) this.lastEvent).reportException(new PipeRuntimeConnectorCriticalException(th.getMessage() + ", root cause: " + ErrorHandlingUtils.getRootCause(th).getMessage()));
        LOGGER.warn("{} failed to handshake with the target system after {} times, stopping current subtask {} (creation time: {}, simple class: {}). Status shown when query the pipe will be 'STOPPED'. Please restart the task by executing 'START PIPE' manually if needed.", new Object[]{this.outputPipeConnector.getClass().getName(), 5, this.taskID, Long.valueOf(this.creationTime), getClass().getSimpleName(), th});
        return true;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public synchronized void submitSelf() {
        if (this.shouldStopSubmittingSelf.get() || this.isSubmitted) {
            return;
        }
        this.callbackDecoratingLock.markAsDecorating();
        try {
            Futures.addCallback(this.subtaskWorkerThreadPoolExecutor.submit(this), this, this.subtaskCallbackListeningExecutor);
            this.isSubmitted = true;
        } finally {
            this.callbackDecoratingLock.markAsDecorated();
        }
    }

    public void close() {
        PipeConnectorMetrics.getInstance().deregister(this.taskID);
        this.isClosed.set(true);
        try {
            this.outputPipeConnector.close();
        } catch (Exception e) {
            LOGGER.info("Exception occurred when closing pipe connector subtask {}, root cause: {}", new Object[]{this.taskID, ErrorHandlingUtils.getRootCause(e).getMessage(), e});
        } finally {
            this.inputPendingQueue.forEach(event -> {
                if (event instanceof EnrichedEvent) {
                    ((EnrichedEvent) event).clearReferenceCount(PipeEventCollector.class.getName());
                }
            });
            this.inputPendingQueue.clear();
            super.close();
        }
    }

    public void discardEventsOfPipe(String str) {
        if (this.outputPipeConnector instanceof IoTDBThriftAsyncConnector) {
            this.outputPipeConnector.discardEventsOfPipe(str);
        }
    }

    public String getAttributeSortedString() {
        return this.attributeSortedString;
    }

    public int getConnectorIndex() {
        return this.connectorIndex;
    }

    public int getTsFileInsertionEventCount() {
        return this.inputPendingQueue.getTsFileInsertionEventCount();
    }

    public int getTabletInsertionEventCount() {
        return this.inputPendingQueue.getTabletInsertionEventCount();
    }

    public int getPipeHeartbeatEventCount() {
        return this.inputPendingQueue.getPipeHeartbeatEventCount();
    }

    public int getAsyncConnectorRetryEventQueueSize() {
        if (this.outputPipeConnector instanceof IoTDBThriftAsyncConnector) {
            return this.outputPipeConnector.getRetryEventQueueSize();
        }
        return 0;
    }
}
