package org.apache.nifi.cluster.flow.impl;

import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.cluster.flow.ClusterDataFlow;
import org.apache.nifi.cluster.flow.DaoException;
import org.apache.nifi.cluster.flow.DataFlowDao;
import org.apache.nifi.cluster.flow.DataFlowManagementService;
import org.apache.nifi.cluster.flow.PersistedFlowState;
import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl.class */
public class DataFlowManagementServiceImpl implements DataFlowManagementService {
    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
    private final DataFlowDao flowDao;
    private final ClusterManagerProtocolSender sender;
    private Timer flowRetriever;
    private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet();
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicLong lastRetrievalTime = new AtomicLong(-1);
    private long retrievableAfterTime = 0;
    private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0);
    private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock());

    /* loaded from: input_file:org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl$FlowRetrieverTimerTask.class */
    private class FlowRetrieverTimerTask extends TimerTask {
        private FlowRetrieverTimerTask() {
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            long time;
            DataFlowManagementServiceImpl.this.resourceLock.lock();
            try {
            } catch (Exception e) {
                DataFlowManagementServiceImpl.logger.info("Encountered exception checking if flow is current caused by " + e, e);
            } finally {
                DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent");
            }
            if (DataFlowManagementServiceImpl.this.isFlowCurrent()) {
                return;
            }
            FlowRequestMessage flowRequestMessage = new FlowRequestMessage();
            for (NodeIdentifier nodeIdentifier : DataFlowManagementServiceImpl.this.getNodeIds()) {
                try {
                    flowRequestMessage.setNodeId(nodeIdentifier);
                    time = new Date().getTime();
                    DataFlowManagementServiceImpl.this.resourceLock.lock();
                    try {
                    } finally {
                        DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
                    }
                } catch (Throwable th) {
                    DataFlowManagementServiceImpl.logger.info("Encountered exception retrieving flow from node " + nodeIdentifier + " caused by " + th, th);
                }
                if (DataFlowManagementServiceImpl.this.stopRequested.get()) {
                    DataFlowManagementServiceImpl.logger.debug("Stopping runnable prematurely because a request to stop was issued.");
                    return;
                }
                if (time < DataFlowManagementServiceImpl.this.retrievableAfterTime) {
                    DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
                    return;
                }
                DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
                FlowResponseMessage requestFlow = DataFlowManagementServiceImpl.this.sender.requestFlow(flowRequestMessage);
                DataFlowManagementServiceImpl.this.resourceLock.lock();
                try {
                    if (time > DataFlowManagementServiceImpl.this.retrievableAfterTime) {
                        DataFlowManagementServiceImpl.logger.info("Saving retrieved flow.");
                        StandardDataFlow dataFlow = requestFlow.getDataFlow();
                        ClusterDataFlow loadDataFlow = DataFlowManagementServiceImpl.this.flowDao.loadDataFlow();
                        DataFlowManagementServiceImpl.this.flowDao.saveDataFlow(loadDataFlow == null ? new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]) : new ClusterDataFlow(dataFlow, loadDataFlow.getPrimaryNodeId(), loadDataFlow.getControllerServices(), loadDataFlow.getReportingTasks()));
                        DataFlowManagementServiceImpl.this.flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
                        DataFlowManagementServiceImpl.this.lastRetrievalTime.set(new Date().getTime());
                    }
                    DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow");
                } catch (Throwable th2) {
                    DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow");
                    throw th2;
                }
                DataFlowManagementServiceImpl.logger.info("Encountered exception retrieving flow from node " + nodeIdentifier + " caused by " + th, th);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nifi/cluster/flow/impl/DataFlowManagementServiceImpl$TimingReentrantLock.class */
    public static class TimingReentrantLock {
        private final Lock lock;
        private static final Logger logger = LoggerFactory.getLogger("dataFlowManagementService.lock");
        private final ThreadLocal<Long> lockTime = new ThreadLocal<>();

        public TimingReentrantLock(Lock lock) {
            this.lock = lock;
        }

        public void lock() {
            this.lock.lock();
            this.lockTime.set(Long.valueOf(System.nanoTime()));
        }

        public void unlock(String str) {
            long nanoTime = System.nanoTime() - this.lockTime.get().longValue();
            this.lock.unlock();
            long convert = TimeUnit.MILLISECONDS.convert(nanoTime, TimeUnit.NANOSECONDS);
            if (convert > 100) {
                logger.debug("Lock held for {} milliseconds for task: {}", Long.valueOf(convert), str);
            }
        }
    }

    public DataFlowManagementServiceImpl(DataFlowDao dataFlowDao, ClusterManagerProtocolSender clusterManagerProtocolSender) {
        if (dataFlowDao == null) {
            throw new IllegalArgumentException("Flow DAO may not be null.");
        }
        if (clusterManagerProtocolSender == null) {
            throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null.");
        }
        this.flowDao = dataFlowDao;
        this.sender = clusterManagerProtocolSender;
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void start() {
        if (isRunning()) {
            throw new IllegalArgumentException("Instance is already running.");
        }
        this.stopRequested.set(false);
        this.flowRetriever = new Timer("Flow Management Service", true);
        this.flowRetriever.schedule(new FlowRetrieverTimerTask(), 0L, 500L);
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public boolean isRunning() {
        return this.flowRetriever != null;
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void stop() {
        if (!isRunning()) {
            throw new IllegalArgumentException("Instance is already stopped.");
        }
        this.stopRequested.set(true);
        this.flowRetriever.cancel();
        this.flowRetriever = null;
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public ClusterDataFlow loadDataFlow() throws DaoException {
        this.resourceLock.lock();
        try {
            return this.flowDao.loadDataFlow();
        } finally {
            this.resourceLock.unlock("loadDataFlow");
        }
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void updatePrimaryNode(NodeIdentifier nodeIdentifier) {
        StandardDataFlow dataFlow;
        byte[] controllerServices;
        byte[] reportingTasks;
        this.resourceLock.lock();
        try {
            ClusterDataFlow loadDataFlow = this.flowDao.loadDataFlow();
            if (loadDataFlow == null) {
                dataFlow = null;
                controllerServices = new byte[0];
                reportingTasks = new byte[0];
            } else {
                dataFlow = loadDataFlow.getDataFlow();
                controllerServices = loadDataFlow.getControllerServices();
                reportingTasks = loadDataFlow.getReportingTasks();
            }
            this.flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeIdentifier, controllerServices, reportingTasks));
            this.resourceLock.unlock("updatePrimaryNode");
        } catch (Throwable th) {
            this.resourceLock.unlock("updatePrimaryNode");
            throw th;
        }
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void updateControllerServices(byte[] bArr) throws DaoException {
        StandardDataFlow dataFlow;
        NodeIdentifier primaryNodeId;
        byte[] reportingTasks;
        this.resourceLock.lock();
        try {
            ClusterDataFlow loadDataFlow = this.flowDao.loadDataFlow();
            if (loadDataFlow == null) {
                dataFlow = null;
                primaryNodeId = null;
                reportingTasks = new byte[0];
            } else {
                dataFlow = loadDataFlow.getDataFlow();
                primaryNodeId = loadDataFlow.getPrimaryNodeId();
                reportingTasks = loadDataFlow.getReportingTasks();
            }
            this.flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, primaryNodeId, bArr, reportingTasks));
            this.resourceLock.unlock("updateControllerServices");
        } catch (Throwable th) {
            this.resourceLock.unlock("updateControllerServices");
            throw th;
        }
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void updateReportingTasks(byte[] bArr) throws DaoException {
        StandardDataFlow dataFlow;
        NodeIdentifier primaryNodeId;
        byte[] controllerServices;
        this.resourceLock.lock();
        try {
            ClusterDataFlow loadDataFlow = this.flowDao.loadDataFlow();
            if (loadDataFlow == null) {
                dataFlow = null;
                primaryNodeId = null;
                controllerServices = null;
            } else {
                dataFlow = loadDataFlow.getDataFlow();
                primaryNodeId = loadDataFlow.getPrimaryNodeId();
                controllerServices = loadDataFlow.getControllerServices();
            }
            this.flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, primaryNodeId, controllerServices, bArr));
            this.resourceLock.unlock("updateControllerServices");
        } catch (Throwable th) {
            this.resourceLock.unlock("updateControllerServices");
            throw th;
        }
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public PersistedFlowState getPersistedFlowState() {
        this.resourceLock.lock();
        try {
            return this.flowDao.getPersistedFlowState();
        } finally {
            this.resourceLock.unlock("getPersistedFlowState");
        }
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public boolean isFlowCurrent() {
        return PersistedFlowState.CURRENT == getPersistedFlowState();
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void setPersistedFlowState(PersistedFlowState persistedFlowState) {
        this.resourceLock.lock();
        try {
            this.flowDao.setPersistedFlowState(persistedFlowState);
            if (PersistedFlowState.STALE == persistedFlowState) {
                this.retrievableAfterTime = new Date().getTime() + (getRetrievalDelaySeconds() * 1000);
            } else if (PersistedFlowState.UNKNOWN == persistedFlowState || PersistedFlowState.CURRENT == persistedFlowState) {
                this.retrievableAfterTime = Long.MAX_VALUE;
            }
        } finally {
            this.resourceLock.unlock("setPersistedFlowState");
        }
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public Set<NodeIdentifier> getNodeIds() {
        return Collections.unmodifiableSet(this.nodeIds);
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void setNodeIds(Set<NodeIdentifier> set) {
        if (set == null) {
            throw new IllegalArgumentException("Node IDs may not be null.");
        }
        this.resourceLock.lock();
        try {
            if (this.nodeIds.equals(set)) {
                return;
            }
            this.nodeIds.clear();
            this.nodeIds.addAll(set);
        } finally {
            this.resourceLock.unlock("setNodeIds");
        }
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public int getRetrievalDelaySeconds() {
        return this.retrievalDelaySeconds.get();
    }

    @Override // org.apache.nifi.cluster.flow.DataFlowManagementService
    public void setRetrievalDelay(String str) {
        this.retrievalDelaySeconds.set((int) FormatUtils.getTimeDuration(str, TimeUnit.SECONDS));
    }

    public ClusterManagerProtocolSender getSender() {
        return this.sender;
    }

    public long getLastRetrievalTime() {
        return this.lastRetrievalTime.get();
    }
}
