package org.apache.nifi.processors.kafka;

import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;

/* loaded from: input_file:org/apache/nifi/processors/kafka/AbstractKafkaProcessor.class */
abstract class AbstractKafkaProcessor<T extends Closeable> extends AbstractSessionFactoryProcessor {
    private volatile boolean acceptTask = true;
    private final AtomicInteger taskCounter = new AtomicInteger();
    volatile T kafkaResource;

    public final void onTrigger(ProcessContext processContext, ProcessSessionFactory processSessionFactory) throws ProcessException {
        if (!this.acceptTask) {
            processContext.yield();
            return;
        }
        this.taskCounter.incrementAndGet();
        ProcessSession createSession = processSessionFactory.createSession();
        try {
            try {
                synchronized (this) {
                    if (this.kafkaResource == null) {
                        this.kafkaResource = buildKafkaResource(processContext, createSession);
                    }
                }
                boolean rendezvousWithKafka = rendezvousWithKafka(processContext, createSession);
                createSession.commit();
                if (rendezvousWithKafka) {
                    postCommit(processContext);
                } else {
                    processContext.yield();
                }
                synchronized (this) {
                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
                        close();
                        this.acceptTask = true;
                    }
                }
            } catch (Throwable th) {
                this.acceptTask = false;
                createSession.rollback(true);
                getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, th});
                synchronized (this) {
                    if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
                        close();
                        this.acceptTask = true;
                    }
                }
            }
        } catch (Throwable th2) {
            synchronized (this) {
                if (this.taskCounter.decrementAndGet() == 0 && !this.acceptTask) {
                    close();
                    this.acceptTask = true;
                }
                throw th2;
            }
        }
    }

    @OnStopped
    public void close() {
        if (this.taskCounter.get() == 0) {
            try {
                if (this.kafkaResource != null) {
                    try {
                        this.kafkaResource.close();
                    } catch (Exception e) {
                        getLogger().warn("Failed while closing " + this.kafkaResource, e);
                    }
                }
            } finally {
                this.kafkaResource = null;
            }
        }
    }

    protected void postCommit(ProcessContext processContext) {
    }

    protected abstract boolean rendezvousWithKafka(ProcessContext processContext, ProcessSession processSession);

    protected abstract T buildKafkaResource(ProcessContext processContext, ProcessSession processSession) throws ProcessException;
}
