/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.state.providers.local;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.controller.state.StateMapSerDe;
import org.apache.nifi.controller.state.StateMapUpdate;
import org.apache.nifi.controller.state.providers.AbstractStateProvider;
import org.apache.nifi.processor.util.StandardValidators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wali.MinimalLockingWriteAheadLog;
import org.wali.SerDe;
import org.wali.UpdateType;
import org.wali.WriteAheadRepository;

public class WriteAheadLocalStateProvider
extends AbstractStateProvider {
    private static final Logger logger = LoggerFactory.getLogger(WriteAheadLocalStateProvider.class);
    private final StateMapSerDe serde;
    private final ConcurrentMap<String, ComponentProvider> componentProviders = new ConcurrentHashMap<String, ComponentProvider>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory());
    static final PropertyDescriptor PATH = new PropertyDescriptor.Builder().name("Directory").description("The directory where the Provider should store its data").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("./state").required(true).build();
    private WriteAheadRepository<StateMapUpdate> writeAheadLog;
    private AtomicLong versionGenerator;

    public WriteAheadLocalStateProvider() {
        this.serde = new StateMapSerDe();
    }

    @Override
    public synchronized void init(StateProviderInitializationContext context) throws IOException {
        File basePath = new File(context.getProperty(PATH).getValue());
        if (!basePath.exists() && !basePath.mkdirs()) {
            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that directory could not be created");
        }
        if (!basePath.isDirectory()) {
            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that is a file, rather than a directory");
        }
        if (!basePath.canWrite()) {
            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that directory cannot be written to");
        }
        if (!basePath.canRead()) {
            throw new RuntimeException("Cannot Initialize Local State Provider because the 'Directory' property is set to \"" + basePath + "\", but that directory cannot be read");
        }
        this.versionGenerator = new AtomicLong(-1L);
        this.writeAheadLog = new MinimalLockingWriteAheadLog(basePath.toPath(), 16, (SerDe)this.serde, null);
        Collection updates = this.writeAheadLog.recoverRecords();
        long maxRecordVersion = -1L;
        for (StateMapUpdate update : updates) {
            if (update.getUpdateType() == UpdateType.DELETE) continue;
            long recordVersion = update.getStateMap().getVersion();
            if (recordVersion > maxRecordVersion) {
                maxRecordVersion = recordVersion;
            }
            String componentId = update.getComponentId();
            this.componentProviders.put(componentId, new ComponentProvider(this.writeAheadLog, this.versionGenerator, componentId, update.getStateMap()));
        }
        this.versionGenerator.set(maxRecordVersion);
        this.executor.scheduleWithFixedDelay(new CheckpointTask(), 2L, 2L, TimeUnit.MINUTES);
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> properties = new ArrayList<PropertyDescriptor>();
        properties.add(PATH);
        return properties;
    }

    public synchronized void shutdown() {
        this.executor.shutdown();
        try {
            this.writeAheadLog.shutdown();
        }
        catch (IOException ioe) {
            logger.warn("Failed to shut down {} successfully due to {}", (Object)this, (Object)ioe.toString());
            logger.warn("", (Throwable)ioe);
        }
    }

    private ComponentProvider getProvider(String componentId) {
        StandardStateMap stateMap;
        ComponentProvider existingComponentProvider;
        ComponentProvider componentProvider = (ComponentProvider)this.componentProviders.get(componentId);
        if (componentProvider == null && (existingComponentProvider = this.componentProviders.putIfAbsent(componentId, componentProvider = new ComponentProvider(this.writeAheadLog, this.versionGenerator, componentId, stateMap = new StandardStateMap(Collections.emptyMap(), -1L)))) != null) {
            componentProvider = existingComponentProvider;
        }
        return componentProvider;
    }

    public StateMap getState(String componentId) throws IOException {
        return this.getProvider(componentId).getState();
    }

    public void setState(Map<String, String> state, String componentId) throws IOException {
        this.getProvider(componentId).setState(state);
    }

    public boolean replace(StateMap oldValue, Map<String, String> newValue, String componentId) throws IOException {
        return this.getProvider(componentId).replace(oldValue, newValue);
    }

    public void clear(String componentId) throws IOException {
        this.getProvider(componentId).clear();
    }

    public void onComponentRemoved(String componentId) throws IOException {
        this.clear(componentId);
        this.componentProviders.remove(componentId);
    }

    public Scope[] getSupportedScopes() {
        return new Scope[]{Scope.LOCAL};
    }

    private static class NamedThreadFactory
    implements ThreadFactory {
        private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();

        private NamedThreadFactory() {
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = this.defaultFactory.newThread(r);
            t.setName("Write-Ahead Local State Provider Maintenance");
            t.setDaemon(true);
            return t;
        }
    }

    private class CheckpointTask
    implements Runnable {
        private CheckpointTask() {
        }

        @Override
        public void run() {
            try {
                logger.debug("Checkpointing Write-Ahead Log used to store components' state");
                WriteAheadLocalStateProvider.this.writeAheadLog.checkpoint();
            }
            catch (IOException e) {
                logger.error("Failed to checkpoint Write-Ahead Log used to store components' state", (Throwable)e);
            }
        }
    }

    private static class ComponentProvider {
        private final AtomicLong versionGenerator;
        private final WriteAheadRepository<StateMapUpdate> wal;
        private final String componentId;
        private StateMap stateMap;

        public ComponentProvider(WriteAheadRepository<StateMapUpdate> wal, AtomicLong versionGenerator, String componentId, StateMap stateMap) {
            this.wal = wal;
            this.versionGenerator = versionGenerator;
            this.componentId = componentId;
            this.stateMap = stateMap;
        }

        public synchronized StateMap getState() throws IOException {
            return this.stateMap;
        }

        public synchronized void setState(Map<String, String> state) throws IOException {
            this.stateMap = new StandardStateMap(state, this.versionGenerator.incrementAndGet());
            StateMapUpdate updateRecord = new StateMapUpdate(this.stateMap, this.componentId, UpdateType.UPDATE);
            this.wal.update(Collections.singleton(updateRecord), false);
        }

        public synchronized boolean replace(StateMap oldValue, Map<String, String> newValue) throws IOException {
            if (this.stateMap.getVersion() == -1L) {
                return false;
            }
            if (this.stateMap != oldValue) {
                return false;
            }
            this.stateMap = new StandardStateMap(new HashMap<String, String>(newValue), this.versionGenerator.incrementAndGet());
            StateMapUpdate updateRecord = new StateMapUpdate(this.stateMap, this.componentId, UpdateType.UPDATE);
            this.wal.update(Collections.singleton(updateRecord), false);
            return true;
        }

        public synchronized void clear() throws IOException {
            this.stateMap = new StandardStateMap(null, this.versionGenerator.incrementAndGet());
            StateMapUpdate update = new StateMapUpdate(this.stateMap, this.componentId, UpdateType.UPDATE);
            this.wal.update(Collections.singleton(update), false);
        }
    }
}

