package org.apache.iotdb.db.pipe.task.subtask.processor;

import com.google.common.util.concurrent.ListeningExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.execution.scheduler.PipeSubtaskScheduler;
import org.apache.iotdb.db.pipe.task.connection.EventSupplier;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.pipe.task.subtask.PipeSubtask;
import org.apache.iotdb.pipe.api.PipeProcessor;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.class */
public class PipeProcessorSubtask extends PipeSubtask {
    private static final Logger LOGGER = LoggerFactory.getLogger(PipeProcessorSubtask.class);
    private static final AtomicReference<PipeProcessorSubtaskWorkerManager> subtaskWorkerManager = new AtomicReference<>();
    private final EventSupplier inputEventSupplier;
    private final PipeProcessor pipeProcessor;
    private final PipeEventCollector outputEventCollector;
    private final AtomicBoolean isClosed;

    public PipeProcessorSubtask(String str, EventSupplier eventSupplier, PipeProcessor pipeProcessor, PipeEventCollector pipeEventCollector) {
        super(str);
        this.inputEventSupplier = eventSupplier;
        this.pipeProcessor = pipeProcessor;
        this.outputEventCollector = pipeEventCollector;
        this.isClosed = new AtomicBoolean(false);
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public void bindExecutors(ListeningExecutorService listeningExecutorService, ExecutorService executorService, PipeSubtaskScheduler pipeSubtaskScheduler) {
        this.subtaskWorkerThreadPoolExecutor = listeningExecutorService;
        this.subtaskScheduler = pipeSubtaskScheduler;
        if (subtaskWorkerManager.get() == null) {
            synchronized (PipeProcessorSubtaskWorkerManager.class) {
                if (subtaskWorkerManager.get() == null) {
                    subtaskWorkerManager.set(new PipeProcessorSubtaskWorkerManager(listeningExecutorService));
                }
            }
        }
        subtaskWorkerManager.get().schedule(this);
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    protected synchronized boolean executeOnce() throws Exception {
        Event supply = this.lastEvent != null ? this.lastEvent : this.inputEventSupplier.supply();
        this.lastEvent = supply;
        if (supply == null) {
            return this.outputEventCollector.tryCollectBufferedEvents();
        }
        try {
            if (!this.isClosed.get()) {
                if (supply instanceof TabletInsertionEvent) {
                    this.pipeProcessor.process((TabletInsertionEvent) supply, this.outputEventCollector);
                } else if (supply instanceof TsFileInsertionEvent) {
                    this.pipeProcessor.process((TsFileInsertionEvent) supply, this.outputEventCollector);
                } else if (supply instanceof PipeHeartbeatEvent) {
                    this.pipeProcessor.process(supply, this.outputEventCollector);
                    ((PipeHeartbeatEvent) supply).onProcessed();
                } else {
                    this.pipeProcessor.process(supply, this.outputEventCollector);
                }
            }
            releaseLastEvent(true);
            return true;
        } catch (Exception e) {
            throw new PipeException("Error occurred during executing PipeProcessor#process, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.", e);
        }
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask
    public void submitSelf() {
    }

    @Override // org.apache.iotdb.db.pipe.task.subtask.PipeSubtask, java.lang.AutoCloseable
    public synchronized void close() {
        try {
            this.isClosed.set(true);
            this.pipeProcessor.close();
        } catch (Exception e) {
            LOGGER.info("Error occurred during closing PipeProcessor, perhaps need to check whether the implementation of PipeProcessor is correct according to the pipe-api description.", e);
        } finally {
            this.outputEventCollector.close();
            super.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isClosed() {
        return this.isClosed.get();
    }

    public boolean equals(Object obj) {
        return (obj instanceof PipeProcessorSubtask) && this.taskID.equals(((PipeProcessorSubtask) obj).taskID);
    }

    public int hashCode() {
        return this.taskID.hashCode();
    }
}
