/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.state.providers.zookeeper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.components.state.StateProviderInitializationContext;
import org.apache.nifi.components.state.annotation.StateProviderContext;
import org.apache.nifi.components.state.exception.StateTooLargeException;
import org.apache.nifi.controller.state.StandardStateMap;
import org.apache.nifi.controller.state.providers.AbstractStateProvider;
import org.apache.nifi.framework.cluster.zookeeper.SecureClientZooKeeperFactory;
import org.apache.nifi.framework.cluster.zookeeper.ZooKeeperClientConfig;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZooKeeperStateProvider
extends AbstractStateProvider {
    // Path segment, relative to the configured root node, under which per-component state nodes live.
    private static final String COMPONENTS_RELATIVE_PATH = "/components";
    // Format for a component's ZNode path: <root node><components relative path>/<component id>.
    private static final String COMPONENTS_PATH_FORMAT = "%s%s/%s";
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateProvider.class);
    // Injected through setNiFiProperties(); combined with provider settings when building the client config.
    private NiFiProperties nifiProperties;
    // Allowable values for the "Access Control" property.
    static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client.");
    static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly", "ZNodes will be accessible only by the creator. The creator will have full access to create, read, write, delete, and administer the ZNodes.");
    // Required "Connect String" property. It carries two validators: the anonymous one below, which
    // checks that the value can be parsed by ZooKeeper's ConnectStringParser, and NON_BLANK_VALIDATOR.
    static final PropertyDescriptor CONNECTION_STRING = new PropertyDescriptor.Builder().name("Connect String").description("The ZooKeeper Connect String to use. This is a comma-separated list of hostname/IP and port tuples, such as \"host1:2181,host2:2181,127.0.0.1:2181\". If a port is not specified it defaults to the ZooKeeper client port default of 2181").addValidator(new Validator(){

        // NOTE: the value that is validated is re-read from the context for CONNECTION_STRING;
        // the 'input' argument is only echoed back into the ValidationResult.
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            String connectionString = context.getProperty(CONNECTION_STRING).getValue();
            try {
                // Constructed purely for its side effect: any exception marks the value as invalid.
                new ConnectStringParser(connectionString);
            }
            catch (Exception e) {
                return new ValidationResult.Builder().subject(subject).input(input).explanation("Invalid Connect String: " + connectionString).valid(false).build();
            }
            return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid Connect String").valid(true).build();
        }
    }).addValidator(StandardValidators.NON_BLANK_VALIDATOR).required(true).build();
    // Required "Session Timeout" property; a time period, default "30 sec". Read in init() as milliseconds.
    static final PropertyDescriptor SESSION_TIMEOUT = new PropertyDescriptor.Builder().name("Session Timeout").description("Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session").addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).defaultValue("30 sec").required(true).build();
    // Required "Root Node" property; default "/nifi". Prefix of every component state path.
    static final PropertyDescriptor ROOT_NODE = new PropertyDescriptor.Builder().name("Root Node").description("The Root Node to use in ZooKeeper to store state in").addValidator(StandardValidators.NON_EMPTY_VALIDATOR).defaultValue("/nifi").required(true).build();
    // Required "Access Control" property; one of OPEN_TO_WORLD (default) or CREATOR_ONLY.
    static final PropertyDescriptor ACCESS_CONTROL = new PropertyDescriptor.Builder().name("Access Control").description("Specifies the Access Controls that will be placed on ZooKeeper ZNodes that are created by this State Provider").allowableValues(new DescribedValue[]{OPEN_TO_WORLD, CREATOR_ONLY}).defaultValue(OPEN_TO_WORLD.getValue()).required(true).build();
    // Version byte of the serialized state format. serialize()/deserialize() in this file use the
    // literal 1 directly, so keep them in step if this value ever changes.
    private static final byte ENCODING_VERSION = 1;
    // Lazily created client; null until getZooKeeper() is first called and after shutdown().
    private ZooKeeper zooKeeper;
    // Values captured from the provider configuration in init().
    private int timeoutMillis;
    private String rootNode;
    private String connectionString;
    // Optional digest credentials added to a newly created client when non-null.
    // No assignment to this field is visible in this file.
    private byte[] auth;
    // ACL applied to component state nodes; chosen in init() from the ACCESS_CONTROL property.
    private List<ACL> acl;
    // Cached client configuration, built on first use by getZooKeeperConfig().
    private ZooKeeperClientConfig zooKeeperClientConfig;

    /**
     * Stores the NiFi properties handed to this provider. They are later merged with the
     * provider's own settings when the ZooKeeper client configuration is built.
     *
     * @param properties the NiFi properties to retain; no null check is performed here
     */
    @StateProviderContext
    public void setNiFiProperties(NiFiProperties properties) {
        nifiProperties = properties;
    }

    /**
     * Lists the configuration properties this provider understands.
     *
     * @return a new mutable list holding, in order, the Connect String, Session Timeout,
     *         Root Node and Access Control descriptors
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>(4);
        Collections.addAll(descriptors, CONNECTION_STRING, SESSION_TIMEOUT, ROOT_NODE, ACCESS_CONTROL);
        return descriptors;
    }

    /**
     * Captures the provider configuration: connect string, root node, session timeout
     * (converted to milliseconds) and the ACL to apply to created state nodes.
     *
     * @param context initialization context supplying the configured property values
     */
    @Override
    public synchronized void init(StateProviderInitializationContext context) {
        connectionString = context.getProperty(CONNECTION_STRING).getValue();
        rootNode = context.getProperty(ROOT_NODE).getValue();
        timeoutMillis = context.getProperty(SESSION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();

        // Anything other than "CreatorOnly" (compared case-insensitively) results in the open ACL.
        String accessControl = context.getProperty(ACCESS_CONTROL).getValue();
        if (accessControl.equalsIgnoreCase(CREATOR_ONLY.getValue())) {
            acl = ZooDefs.Ids.CREATOR_ALL_ACL;
        } else {
            acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;
        }
    }

    /**
     * Builds a {@link NiFiProperties} view that layers {@code additionalProperties} on top of
     * {@code nifiProps}: a key found in {@code additionalProperties} wins, otherwise the value
     * from {@code nifiProps} (if any) is used.
     *
     * @param nifiProps            the base properties; may be {@code null}, in which case only
     *                             {@code additionalProperties} contributes values and keys
     * @param additionalProperties the overriding properties; must not be {@code null}
     * @return a combined, read-through view over both sources
     */
    static NiFiProperties combineProperties(final NiFiProperties nifiProps, final Properties additionalProperties) {
        return new NiFiProperties(){

            public String getProperty(String key) {
                // The base value is only used as the default when the override does not define the key.
                String fallback = nifiProps != null ? nifiProps.getProperty(key) : null;
                return additionalProperties.getProperty(key, fallback);
            }

            public Set<String> getPropertyKeys() {
                Set<String> keys = additionalProperties.keySet().stream().map(key -> (String)key).collect(Collectors.toSet());
                // Mirror getProperty(): a missing base must not cause a NullPointerException.
                if (nifiProps != null) {
                    keys.addAll(nifiProps.getPropertyKeys());
                }
                return keys;
            }
        };
    }

    /**
     * Closes the current ZooKeeper client, if there is one, and forgets it so that the next
     * call to {@code getZooKeeper()} creates a fresh client. If the close is interrupted the
     * thread's interrupt flag is restored and the client is still discarded.
     */
    public synchronized void shutdown() {
        final ZooKeeper client = this.zooKeeper;
        if (client == null) {
            return;
        }
        try {
            client.close();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
        this.zooKeeper = null;
    }

    /**
     * Returns the ZooKeeper client, creating it on first use or after the previous client was
     * discarded. A client whose state is no longer alive is shut down and replaced.
     *
     * <p>A secure client is created through {@link SecureClientZooKeeperFactory} when the client
     * configuration reports a secure client; otherwise a standard client is created with the
     * configured {@code jute.maxbuffer}. Digest auth info is added to a newly created client
     * when {@code auth} is non-null.
     *
     * @return a non-null ZooKeeper client
     * @throws IOException if the client cannot be created; a failure of the secure factory is
     *                     reported this way (with the cause attached) instead of returning
     *                     {@code null} and letting callers fail with a NullPointerException
     */
    synchronized ZooKeeper getZooKeeper() throws IOException {
        ZooKeeperClientConfig clientConfig = this.getZooKeeperConfig();
        if (this.zooKeeper != null && !this.zooKeeper.getState().isAlive()) {
            this.invalidateClient();
        }
        if (this.zooKeeper != null) {
            return this.zooKeeper;
        }

        if (clientConfig != null && clientConfig.isClientSecure()) {
            SecureClientZooKeeperFactory factory = new SecureClientZooKeeperFactory(clientConfig);
            try {
                this.zooKeeper = factory.newZooKeeper(this.connectionString, this.timeoutMillis, new NoOpWatcher(), true);
                logger.debug("Secure ZooKeeper Client connection [{}] created", this.connectionString);
            } catch (Exception e) {
                logger.error("Secure ZooKeeper Client connection [{}] failed", this.connectionString, e);
                this.invalidateClient();
                // Surface the failure through the declared exception type rather than handing out null.
                throw new IOException("Secure ZooKeeper Client connection [" + this.connectionString + "] failed", e);
            }
        } else {
            ZKClientConfig zkClientConfig = new ZKClientConfig();
            if (clientConfig != null) {
                zkClientConfig.setProperty("jute.maxbuffer", Integer.toString(clientConfig.getJuteMaxbuffer()));
            }
            this.zooKeeper = new ZooKeeper(this.connectionString, this.timeoutMillis, new NoOpWatcher(), zkClientConfig);
            logger.debug("Standard ZooKeeper Client connection [{}] created", this.connectionString);
        }

        if (this.auth != null) {
            this.zooKeeper.addAuthInfo("digest", this.auth);
        }
        return this.zooKeeper;
    }

    /**
     * Returns the cached ZooKeeper client configuration, building it on first use from the
     * injected NiFi properties overlaid with this provider's timeout, root node and connect
     * string. The session timeout value is used for both the session and connect timeouts.
     *
     * @return the client configuration as produced by {@code ZooKeeperClientConfig.createConfig}
     */
    private ZooKeeperClientConfig getZooKeeperConfig() {
        if (zooKeeperClientConfig != null) {
            return zooKeeperClientConfig;
        }

        final String timeout = timeoutMillis + " millis";
        final Properties overrides = new Properties();
        overrides.setProperty("nifi.zookeeper.session.timeout", timeout);
        overrides.setProperty("nifi.zookeeper.connect.timeout", timeout);
        overrides.setProperty("nifi.zookeeper.root.node", rootNode);
        overrides.setProperty("nifi.zookeeper.connect.string", connectionString);

        zooKeeperClientConfig = ZooKeeperClientConfig.createConfig(combineProperties(nifiProperties, overrides));
        return zooKeeperClientConfig;
    }

    /**
     * Discards the current ZooKeeper client by delegating to {@code shutdown()}, so the next
     * {@code getZooKeeper()} call builds a new one.
     */
    private synchronized void invalidateClient() {
        shutdown();
    }

    /**
     * Builds the ZNode path that holds the state of the given component:
     * {@code <root node>/components/<componentId>}.
     *
     * @param componentId identifier of the component
     * @return the component's state path under the configured root node
     */
    private String getComponentPath(String componentId) {
        final String root = this.rootNode;
        return String.format(COMPONENTS_PATH_FORMAT, root, COMPONENTS_RELATIVE_PATH, componentId);
    }

    /**
     * Guards state operations: fails when this provider is not enabled.
     *
     * @throws IOException if {@code isEnabled()} reports {@code false}
     */
    private void verifyEnabled() throws IOException {
        if (isEnabled()) {
            return;
        }
        throw new IOException("Cannot update or retrieve cluster state because node is no longer connected to a cluster.");
    }

    /**
     * Deletes the stored state of a removed component, including any child nodes.
     *
     * <p>A missing node is treated as success. If the ZooKeeper session has expired, the client
     * is discarded and the removal is attempted again with a new client.
     *
     * @param componentId identifier of the component whose state should be removed
     * @throws IOException if ZooKeeper reports any other error, or if the calling thread is
     *                     interrupted (the interrupt flag is restored before throwing)
     */
    public void onComponentRemoved(String componentId) throws IOException {
        try {
            ZKUtil.deleteRecursive(this.getZooKeeper(), this.getComponentPath(componentId));
        } catch (KeeperException ke) {
            KeeperException.Code exceptionCode = ke.code();
            if (KeeperException.Code.NONODE == exceptionCode) {
                // Nothing stored for this component: nothing to remove.
                return;
            }
            if (KeeperException.Code.SESSIONEXPIRED == exceptionCode) {
                this.invalidateClient();
                this.onComponentRemoved(componentId);
                return;
            }
            throw new IOException("Unable to remove state for component with ID '" + componentId + "' with exception code " + exceptionCode, ke);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Failed to remove state for component with ID '" + componentId + "' from ZooKeeper due to being interrupted", e);
        }
    }

    /**
     * Reports the state scopes this provider can store.
     *
     * @return a new single-element array containing {@code Scope.CLUSTER}
     */
    public Scope[] getSupportedScopes() {
        final Scope[] scopes = {Scope.CLUSTER};
        return scopes;
    }

    /**
     * Encodes a state map into the binary form stored in ZooKeeper.
     *
     * <p>Layout: one version byte (1), a 4-byte entry count, then for every entry a presence
     * flag followed by the modified-UTF-8 key (if present) and a presence flag followed by the
     * modified-UTF-8 value (if present). Entries are written in the map's iteration order.
     *
     * @param stateValues the state to encode; keys and values may be {@code null}
     * @return the encoded bytes
     * @throws IOException if writing fails, e.g. a key or value is too long for {@code writeUTF}
     */
    private byte[] serialize(Map<String, String> stateValues) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream dos = new DataOutputStream(baos)) {
            // Encoding version; deserialize() rejects anything greater than 1.
            dos.writeByte(1);
            dos.writeInt(stateValues.size());
            for (Map.Entry<String, String> entry : stateValues.entrySet()) {
                String key = entry.getKey();
                String value = entry.getValue();

                dos.writeBoolean(key != null);
                if (key != null) {
                    dos.writeUTF(key);
                }
                dos.writeBoolean(value != null);
                if (value != null) {
                    dos.writeUTF(value);
                }
            }
            dos.flush();
            return baos.toByteArray();
        }
    }

    /**
     * Decodes bytes read from ZooKeeper into a {@link StateMap}.
     *
     * <p>Expects a version byte, a 4-byte entry count and, per entry, a presence flag plus
     * optional UTF string for the key and again for the value.
     *
     * @param data          the encoded bytes
     * @param recordVersion the ZNode version, stored (as a string) as the state version
     * @param componentId   component the state belongs to; used only in the error message
     * @return the decoded state
     * @throws IOException if the encoding version is greater than 1 or the data cannot be read
     */
    private StateMap deserialize(byte[] data, int recordVersion, String componentId) throws IOException {
        try (ByteArrayInputStream byteStream = new ByteArrayInputStream(data);
             DataInputStream in = new DataInputStream(byteStream)) {
            final byte encodingVersion = in.readByte();
            if (encodingVersion > 1) {
                throw new IOException("Retrieved a response from ZooKeeper when retrieving state for component with ID " + componentId + ", but the response was encoded using the ZooKeeperStateProvider Encoding Version of " + encodingVersion + " but this instance can only decode versions up to 1; it appears that the state was encoded using a newer version of NiFi than is currently running. This information cannot be decoded.");
            }

            final int entryCount = in.readInt();
            final Map<String, String> values = new HashMap<>(entryCount);
            int remaining = entryCount;
            while (remaining-- > 0) {
                // Each field is preceded by a flag saying whether a string follows.
                String key = in.readBoolean() ? in.readUTF() : null;
                String value = in.readBoolean() ? in.readUTF() : null;
                values.put(key, value);
            }

            return new StandardStateMap(values, Optional.of(String.valueOf(recordVersion)));
        }
    }

    /**
     * Unconditionally stores the given state for a component, overwriting whatever is there.
     *
     * <p>The data is written with version {@code -1} (any version). If the node does not exist
     * it is created; if another writer creates it in between, the whole operation is started
     * over.
     *
     * @param stateValues the state to store
     * @param componentId identifier of the component the state belongs to
     * @throws IOException if the state cannot be written
     */
    public void setState(Map<String, String> stateValues, String componentId) throws IOException {
        final StateModifier overwrite = (client, nodePath, payload) -> {
            try {
                client.setData(nodePath, payload, -1);
            } catch (KeeperException.NoNodeException missingNode) {
                try {
                    createNode(nodePath, payload, acl);
                } catch (KeeperException.NodeExistsException createdConcurrently) {
                    // Lost the race to create the node: start over so the write lands on it.
                    setState(stateValues, componentId);
                }
            }
            return true;
        };
        modifyState(stateValues, componentId, overwrite);
    }

    /**
     * Atomically replaces a component's state, but only if it has not changed since
     * {@code oldState} was read.
     *
     * <p>When {@code oldState} carries a version, the data is written with that expected version;
     * a version mismatch or a missing node means the replace did not happen. When it carries no
     * version, the state is expected not to exist yet and the node is created; an already
     * existing node means the replace did not happen.
     *
     * @param oldState    the state the caller last observed; its version (if present) must be
     *                    parseable as an integer
     * @param stateValues the new state to store
     * @param componentId identifier of the component the state belongs to
     * @return {@code true} if the new state was stored, {@code false} if the stored state no
     *         longer matches {@code oldState}
     * @throws IOException if the state cannot be written for any other reason
     */
    public boolean replace(StateMap oldState, Map<String, String> stateValues, String componentId) throws IOException {
        Optional<Integer> version = oldState.getStateVersion().map(Integer::parseInt);
        StateModifier stateModifier = (keeper, path, data) -> {
            if (version.isPresent()) {
                try {
                    keeper.setData(path, data, version.get());
                } catch (KeeperException.BadVersionException | KeeperException.NoNodeException e) {
                    return false;
                }
                // The versioned write succeeded; do not fall through to the create branch,
                // which would fail on the existing node and wrongly report failure.
                return true;
            }

            // No previous version: the node must not exist yet.
            try {
                this.createNode(path, data, this.acl);
            } catch (KeeperException.NodeExistsException e) {
                return false;
            }
            return true;
        };
        return this.modifyState(stateValues, componentId, stateModifier);
    }

    /**
     * Shared write path: verifies the provider is enabled, serializes the state, checks it
     * against the client's maximum buffer size and hands it to the given modifier.
     *
     * <p>On session expiry the client is discarded and the call is repeated with a new one.
     *
     * @param stateValues   the state to store
     * @param componentId   identifier of the component the state belongs to
     * @param stateModifier strategy that performs the actual ZooKeeper write
     * @return whatever the modifier returns
     * @throws IOException if the provider is disabled, the state is too large
     *                     ({@link StateTooLargeException} is passed through unchanged), the
     *                     thread is interrupted, or any other failure occurs
     */
    private boolean modifyState(Map<String, String> stateValues, String componentId, StateModifier stateModifier) throws IOException {
        verifyEnabled();
        try {
            final ZooKeeper client = getZooKeeper();
            final String nodePath = getComponentPath(componentId);
            final byte[] payload = serialize(stateValues);
            validateDataSize(client.getClientConfig(), payload, componentId, stateValues.size());
            return stateModifier.apply(client, nodePath, payload);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId + " due to interruption", interrupted);
        } catch (KeeperException.SessionExpiredException expired) {
            invalidateClient();
            return modifyState(stateValues, componentId, stateModifier);
        } catch (StateTooLargeException tooLarge) {
            // Rethrown as-is so it is not wrapped by the generic handler below.
            throw tooLarge;
        } catch (Exception failure) {
            throw new IOException("Failed to set cluster-wide state in ZooKeeper for component with ID " + componentId, failure);
        }
    }

    /**
     * Creates a persistent ZNode, creating missing ancestors first.
     *
     * <p>When the parent is missing, the parent is created recursively with no data and an open
     * ACL, and the original creation is then attempted again. An already existing node is
     * tolerated only for data-less (ancestor) nodes; for a node with data the
     * {@code NodeExistsException} is propagated to the caller.
     *
     * @param path the path to create
     * @param data the node content, or {@code null} for an intermediate node
     * @param acls the ACL to apply to the node
     * @throws IOException          if the ZooKeeper client cannot be obtained
     * @throws KeeperException      if ZooKeeper rejects the creation
     * @throws InterruptedException if the calling thread is interrupted
     */
    private void createNode(String path, byte[] data, List<ACL> acls) throws IOException, KeeperException, InterruptedException {
        try {
            getZooKeeper().create(path, data, acls, CreateMode.PERSISTENT);
        } catch (KeeperException.NoNodeException missingParent) {
            final String parentPath = StringUtils.substringBeforeLast(path, "/");
            createNode(parentPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE);
            createNode(path, data, acls);
        } catch (KeeperException.NodeExistsException alreadyExists) {
            if (data != null) {
                throw alreadyExists;
            }
        }
    }

    /**
     * Reads the stored state of a component.
     *
     * <p>If no node exists for the component, a state map with no values and no version is
     * returned. On session expiry the client is discarded and the read is repeated.
     *
     * @param componentId identifier of the component
     * @return the component's state, versioned with the ZNode's version
     * @throws IOException if the provider is disabled, the thread is interrupted, ZooKeeper
     *                     reports another error, or the stored data cannot be decoded
     */
    public StateMap getState(String componentId) throws IOException {
        verifyEnabled();
        try {
            final String nodePath = getComponentPath(componentId);
            final Stat nodeStat = new Stat();
            final byte[] payload = getZooKeeper().getData(nodePath, false, nodeStat);
            return deserialize(payload, nodeStat.getVersion(), componentId);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + ", due to interruption", interrupted);
        } catch (KeeperException keeperFailure) {
            final KeeperException.Code code = keeperFailure.code();
            if (code == KeeperException.Code.NONODE) {
                return new StandardStateMap(null, Optional.empty());
            } else if (code == KeeperException.Code.SESSIONEXPIRED) {
                invalidateClient();
                return getState(componentId);
            } else {
                throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId + " with exception code " + code, keeperFailure);
            }
        } catch (IOException ioFailure) {
            throw new IOException("Failed to obtain value from ZooKeeper for component with ID " + componentId, ioFailure);
        }
    }

    /**
     * Clears a component's state by overwriting it with an empty map.
     *
     * @param componentId identifier of the component
     * @throws IOException if the provider is disabled or the write fails
     */
    public void clear(String componentId) throws IOException {
        verifyEnabled();
        final Map<String, String> noState = Collections.emptyMap();
        setState(noState, componentId);
    }

    /**
     * Indicates that this provider can list the IDs of components that have stored state
     * (see {@code getStoredComponentIds()}).
     *
     * @return always {@code true}
     */
    public boolean isComponentEnumerationSupported() {
        return true;
    }

    /**
     * Lists the IDs of all components that currently have state stored, i.e. the names of the
     * children of {@code <root node>/components}.
     *
     * @return an unmodifiable collection of component IDs; empty if the components node does
     *         not exist
     * @throws IOException if ZooKeeper reports any other error or the calling thread is
     *                     interrupted (the interrupt flag is restored before throwing)
     */
    public Collection<String> getStoredComponentIds() throws IOException {
        try {
            ZooKeeper zooKeeper = this.getZooKeeper();
            String componentsPath = String.format("%s%s", this.rootNode, COMPONENTS_RELATIVE_PATH);
            List<String> children = zooKeeper.getChildren(componentsPath, false);
            return Collections.unmodifiableCollection(children);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("ZooKeeper communication interrupted", e);
        } catch (KeeperException e) {
            KeeperException.Code code = e.code();
            if (KeeperException.Code.NONODE == code) {
                // No components node yet means no component has stored state.
                return Collections.emptyList();
            }
            throw new IOException(String.format("ZooKeeper communication failed: %s", code), e);
        }
    }

    /**
     * Rejects serialized state that exceeds the client's {@code jute.maxbuffer} limit
     * (1048575 bytes when the client configuration does not define it).
     *
     * @param clientConfig     the ZooKeeper client configuration to read the limit from
     * @param data             the serialized state; {@code null} is accepted
     * @param componentId      component the state belongs to; used in the error message
     * @param totalStateValues number of state entries; used in the error message
     * @throws StateTooLargeException if {@code data} is longer than the limit
     */
    private void validateDataSize(ZKClientConfig clientConfig, byte[] data, String componentId, int totalStateValues) throws StateTooLargeException {
        final int maximumSize = clientConfig.getInt("jute.maxbuffer", 1048575);
        if (data == null || data.length <= maximumSize) {
            return;
        }
        throw new StateTooLargeException(String.format("Component [%s] State Values [%d] Data Size [%d B] exceeds nifi.zookeeper.jute.maxbuffer size [%d B]", componentId, totalStateValues, data.length, maximumSize));
    }

    /**
     * Watcher that ignores every event. It is passed when a ZooKeeper client is constructed
     * in {@code getZooKeeper()}.
     */
    private static final class NoOpWatcher
    implements Watcher {
        private NoOpWatcher() {
        }

        // Intentionally empty: events are not acted upon.
        public void process(WatchedEvent watchedEvent) {
        }
    }

    private static interface StateModifier {
        public boolean apply(ZooKeeper var1, String var2, byte[] var3) throws InterruptedException, KeeperException, IOException;
    }
}

