/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.flow.impl;

import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.cluster.flow.ClusterDataFlow;
import org.apache.nifi.cluster.flow.DaoException;
import org.apache.nifi.cluster.flow.DataFlowDao;
import org.apache.nifi.cluster.flow.DataFlowManagementService;
import org.apache.nifi.cluster.flow.PersistedFlowState;
import org.apache.nifi.cluster.protocol.ClusterManagerProtocolSender;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataFlowManagementServiceImpl
implements DataFlowManagementService {
    private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(DataFlowManagementServiceImpl.class));
    private final DataFlowDao flowDao;
    private final ClusterManagerProtocolSender sender;
    private final Set<NodeIdentifier> nodeIds = new CopyOnWriteArraySet<NodeIdentifier>();
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicLong lastRetrievalTime = new AtomicLong(-1L);
    private Timer flowRetriever;
    private long retrievableAfterTime = 0L;
    private AtomicInteger retrievalDelaySeconds = new AtomicInteger(0);
    private final TimingReentrantLock resourceLock = new TimingReentrantLock(new ReentrantLock());

    public DataFlowManagementServiceImpl(DataFlowDao flowDao, ClusterManagerProtocolSender sender) {
        if (flowDao == null) {
            throw new IllegalArgumentException("Flow DAO may not be null.");
        }
        if (sender == null) {
            throw new IllegalArgumentException("Cluster Manager Protocol Sender may not be null.");
        }
        this.flowDao = flowDao;
        this.sender = sender;
    }

    @Override
    public void start() {
        if (this.isRunning()) {
            throw new IllegalArgumentException("Instance is already running.");
        }
        this.stopRequested.set(false);
        this.flowRetriever = new Timer("Flow Management Service", true);
        this.flowRetriever.schedule((TimerTask)new FlowRetrieverTimerTask(), 0L, 500L);
    }

    @Override
    public boolean isRunning() {
        return this.flowRetriever != null;
    }

    @Override
    public void stop() {
        if (!this.isRunning()) {
            throw new IllegalArgumentException("Instance is already stopped.");
        }
        this.stopRequested.set(true);
        this.flowRetriever.cancel();
        this.flowRetriever = null;
    }

    @Override
    public ClusterDataFlow loadDataFlow() throws DaoException {
        this.resourceLock.lock();
        try {
            ClusterDataFlow clusterDataFlow = this.flowDao.loadDataFlow();
            return clusterDataFlow;
        }
        finally {
            this.resourceLock.unlock("loadDataFlow");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void updatePrimaryNode(NodeIdentifier nodeId) {
        this.resourceLock.lock();
        try {
            byte[] reportingTaskBytes;
            byte[] controllerServiceBytes;
            StandardDataFlow dataFlow;
            ClusterDataFlow existingClusterDataFlow = this.flowDao.loadDataFlow();
            if (existingClusterDataFlow == null) {
                dataFlow = null;
                controllerServiceBytes = new byte[]{};
                reportingTaskBytes = new byte[]{};
            } else {
                dataFlow = existingClusterDataFlow.getDataFlow();
                controllerServiceBytes = existingClusterDataFlow.getControllerServices();
                reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
            }
            this.flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
        }
        finally {
            this.resourceLock.unlock("updatePrimaryNode");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void updateControllerServices(byte[] controllerServiceBytes) throws DaoException {
        this.resourceLock.lock();
        try {
            byte[] reportingTaskBytes;
            NodeIdentifier nodeId;
            StandardDataFlow dataFlow;
            ClusterDataFlow existingClusterDataFlow = this.flowDao.loadDataFlow();
            if (existingClusterDataFlow == null) {
                dataFlow = null;
                nodeId = null;
                reportingTaskBytes = new byte[]{};
            } else {
                dataFlow = existingClusterDataFlow.getDataFlow();
                nodeId = existingClusterDataFlow.getPrimaryNodeId();
                reportingTaskBytes = existingClusterDataFlow.getReportingTasks();
            }
            this.flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
        }
        finally {
            this.resourceLock.unlock("updateControllerServices");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void updateReportingTasks(byte[] reportingTaskBytes) throws DaoException {
        this.resourceLock.lock();
        try {
            byte[] controllerServiceBytes;
            NodeIdentifier nodeId;
            StandardDataFlow dataFlow;
            ClusterDataFlow existingClusterDataFlow = this.flowDao.loadDataFlow();
            if (existingClusterDataFlow == null) {
                dataFlow = null;
                nodeId = null;
                controllerServiceBytes = null;
            } else {
                dataFlow = existingClusterDataFlow.getDataFlow();
                nodeId = existingClusterDataFlow.getPrimaryNodeId();
                controllerServiceBytes = existingClusterDataFlow.getControllerServices();
            }
            this.flowDao.saveDataFlow(new ClusterDataFlow(dataFlow, nodeId, controllerServiceBytes, reportingTaskBytes));
        }
        finally {
            this.resourceLock.unlock("updateControllerServices");
        }
    }

    @Override
    public PersistedFlowState getPersistedFlowState() {
        this.resourceLock.lock();
        try {
            PersistedFlowState persistedFlowState = this.flowDao.getPersistedFlowState();
            return persistedFlowState;
        }
        finally {
            this.resourceLock.unlock("getPersistedFlowState");
        }
    }

    @Override
    public boolean isFlowCurrent() {
        return PersistedFlowState.CURRENT == this.getPersistedFlowState();
    }

    @Override
    public void setPersistedFlowState(PersistedFlowState flowState) {
        this.resourceLock.lock();
        try {
            this.flowDao.setPersistedFlowState(flowState);
            if (PersistedFlowState.STALE == flowState) {
                this.retrievableAfterTime = new Date().getTime() + (long)(this.getRetrievalDelaySeconds() * 1000);
            } else if (PersistedFlowState.UNKNOWN == flowState || PersistedFlowState.CURRENT == flowState) {
                this.retrievableAfterTime = Long.MAX_VALUE;
            }
        }
        finally {
            this.resourceLock.unlock("setPersistedFlowState");
        }
    }

    @Override
    public Set<NodeIdentifier> getNodeIds() {
        return Collections.unmodifiableSet(this.nodeIds);
    }

    @Override
    public void setNodeIds(Set<NodeIdentifier> nodeIds) {
        if (nodeIds == null) {
            throw new IllegalArgumentException("Node IDs may not be null.");
        }
        this.resourceLock.lock();
        try {
            if (this.nodeIds.equals(nodeIds)) {
                return;
            }
            this.nodeIds.clear();
            this.nodeIds.addAll(nodeIds);
        }
        finally {
            this.resourceLock.unlock("setNodeIds");
        }
    }

    @Override
    public int getRetrievalDelaySeconds() {
        return this.retrievalDelaySeconds.get();
    }

    @Override
    public void setRetrievalDelay(String retrievalDelay) {
        this.retrievalDelaySeconds.set((int)FormatUtils.getTimeDuration((String)retrievalDelay, (TimeUnit)TimeUnit.SECONDS));
    }

    public ClusterManagerProtocolSender getSender() {
        return this.sender;
    }

    public long getLastRetrievalTime() {
        return this.lastRetrievalTime.get();
    }

    private static class TimingReentrantLock {
        private final Lock lock;
        private static final Logger logger = LoggerFactory.getLogger((String)"dataFlowManagementService.lock");
        private final ThreadLocal<Long> lockTime = new ThreadLocal();

        public TimingReentrantLock(Lock lock) {
            this.lock = lock;
        }

        public void lock() {
            this.lock.lock();
            this.lockTime.set(System.nanoTime());
        }

        public void unlock(String task) {
            long nanosLocked = System.nanoTime() - this.lockTime.get();
            this.lock.unlock();
            long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS);
            if (millisLocked > 100L) {
                logger.debug("Lock held for {} milliseconds for task: {}", (Object)millisLocked, (Object)task);
            }
        }
    }

    private class FlowRetrieverTimerTask
    extends TimerTask {
        private FlowRetrieverTimerTask() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            DataFlowManagementServiceImpl.this.resourceLock.lock();
            try {
                if (DataFlowManagementServiceImpl.this.isFlowCurrent()) {
                    return;
                }
            }
            catch (Exception ex) {
                logger.info("Encountered exception checking if flow is current caused by " + ex, (Throwable)ex);
            }
            finally {
                DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - isFlowCurrent");
            }
            FlowRequestMessage request = new FlowRequestMessage();
            for (NodeIdentifier nodeId : DataFlowManagementServiceImpl.this.getNodeIds()) {
                try {
                    request.setNodeId(nodeId);
                    long requestSentTime = new Date().getTime();
                    DataFlowManagementServiceImpl.this.resourceLock.lock();
                    try {
                        if (DataFlowManagementServiceImpl.this.stopRequested.get()) {
                            logger.debug("Stopping runnable prematurely because a request to stop was issued.");
                            return;
                        }
                        if (requestSentTime < DataFlowManagementServiceImpl.this.retrievableAfterTime) {
                            return;
                        }
                    }
                    finally {
                        DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - check stopRequested");
                    }
                    FlowResponseMessage response = DataFlowManagementServiceImpl.this.sender.requestFlow(request);
                    DataFlowManagementServiceImpl.this.resourceLock.lock();
                    try {
                        if (requestSentTime <= DataFlowManagementServiceImpl.this.retrievableAfterTime) continue;
                        logger.info("Saving retrieved flow.");
                        StandardDataFlow dataFlow = response.getDataFlow();
                        ClusterDataFlow existingClusterDataFlow = DataFlowManagementServiceImpl.this.flowDao.loadDataFlow();
                        ClusterDataFlow currentClusterDataFlow = existingClusterDataFlow == null ? new ClusterDataFlow(dataFlow, null, new byte[0], new byte[0]) : new ClusterDataFlow(dataFlow, existingClusterDataFlow.getPrimaryNodeId(), existingClusterDataFlow.getControllerServices(), existingClusterDataFlow.getReportingTasks());
                        DataFlowManagementServiceImpl.this.flowDao.saveDataFlow(currentClusterDataFlow);
                        DataFlowManagementServiceImpl.this.flowDao.setPersistedFlowState(PersistedFlowState.CURRENT);
                        DataFlowManagementServiceImpl.this.lastRetrievalTime.set(new Date().getTime());
                    }
                    finally {
                        DataFlowManagementServiceImpl.this.resourceLock.unlock("FlowRetrieverTimerTask - saveDataFlow");
                    }
                }
                catch (Throwable t) {
                    logger.info("Encountered exception retrieving flow from node " + nodeId + " caused by " + t, t);
                }
            }
        }
    }
}

