/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.toolkit.zkmigrator;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.nifi.toolkit.zkmigrator.DataStatAclNode;
import org.apache.nifi.toolkit.zkmigrator.ZooKeeperEndpointConfig;
import org.apache.nifi.toolkit.zkmigrator.ZooKeeperNode;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exports the nodes reachable from a ZooKeeper endpoint to a JSON stream and imports such a
 * stream into a ZooKeeper endpoint.
 *
 * <p>The JSON document is a single array: the first element is the serialized
 * {@link ZooKeeperEndpointConfig} of the source, every following element is one serialized
 * {@link DataStatAclNode}. The document is always encoded as UTF-8.
 */
class ZooKeeperMigrator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperMigrator.class);

    // Locale.ROOT keeps the scheme name stable ("digest") regardless of the JVM default locale;
    // a locale-sensitive lower-casing of "DIGEST" yields a dotless i under Turkish/Azeri locales.
    private static final String SCHEME_DIGEST = AuthMode.DIGEST.name().toLowerCase(Locale.ROOT);

    /** Session timeout, in milliseconds, handed to the ZooKeeper client constructor. */
    private static final int SESSION_TIMEOUT_MILLIS = 3000;

    /** How long, in seconds, to wait for the SyncConnected event before giving up. */
    private static final long CONNECT_TIMEOUT_SECONDS = 5L;

    private final ZooKeeperEndpointConfig zooKeeperEndpointConfig;

    /**
     * Creates a migrator bound to the given endpoint.
     *
     * @param zooKeeperConnectString connect string used to build the {@link ZooKeeperEndpointConfig}
     * @throws IllegalArgumentException if the connect string is {@code null} or empty
     */
    ZooKeeperMigrator(String zooKeeperConnectString) {
        LOGGER.debug("ZooKeeper connect string parameter: {}", zooKeeperConnectString);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(zooKeeperConnectString), "ZooKeeper connect string must not be null");
        this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(zooKeeperConnectString);
    }

    /**
     * Reads every node reachable from "/" of the configured endpoint and writes it to
     * {@code zkData} as UTF-8 JSON. Node retrieval runs asynchronously, so the order of the node
     * elements in the output is not fixed.
     *
     * <p>The ZooKeeper client is closed on every exit path. The output stream is closed (through
     * the JSON writer) only when the export completes successfully.
     *
     * @param zkData   destination of the JSON document
     * @param authMode authentication mode used for the connection
     * @param authData authentication payload, passed to the client when {@code authMode} is DIGEST
     * @throws IOException          if the connection cannot be established or the output cannot be written
     * @throws KeeperException      if discovering the node tree fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if retrieving or serializing any node fails
     */
    void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData) throws IOException, KeeperException, InterruptedException, ExecutionException {
        final ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
        try {
            final JsonWriter jsonWriter = new JsonWriter(new BufferedWriter(new OutputStreamWriter(zkData, StandardCharsets.UTF_8)));
            jsonWriter.setIndent("  ");
            final JsonParser jsonParser = new JsonParser();
            final Gson gson = new GsonBuilder().create();

            jsonWriter.beginArray();
            // The first array element identifies where the data came from.
            writeJson(gson, jsonParser, jsonWriter, zooKeeperEndpointConfig);

            LOGGER.info("Retrieving data from source ZooKeeper: {}", zooKeeperEndpointConfig);
            final List<CompletableFuture<Void>> readFutures = streamPaths(getNode(zooKeeper, "/"))
                    .parallel()
                    .map(path -> CompletableFuture
                            .supplyAsync(() -> {
                                final DataStatAclNode retrieved = retrieveNode(zooKeeper, path);
                                LOGGER.debug("retrieved node {} from {}", retrieved, zooKeeperEndpointConfig);
                                return retrieved;
                            })
                            .thenAccept(retrieved -> writeJson(gson, jsonParser, jsonWriter, retrieved)))
                    .collect(Collectors.toList());

            // Blocks until every node has been written; surfaces the first failure as ExecutionException.
            CompletableFuture.allOf(readFutures.toArray(new CompletableFuture<?>[0])).get();

            jsonWriter.endArray();
            jsonWriter.close();

            if (LOGGER.isInfoEnabled()) {
                final int readCount = readFutures.size();
                LOGGER.info("{} {} read from {}", readCount, readCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
            }
        } finally {
            closeZooKeeper(zooKeeper);
        }
    }

    /**
     * Reads a UTF-8 JSON document produced by {@link #readZooKeeper} from {@code zkData} and
     * writes every node it contains to the configured endpoint.
     *
     * <p>Before the import, the configured path is created (if missing) through a connection made
     * with the server list only. Both ZooKeeper clients and the JSON reader (and with it
     * {@code zkData}) are closed on every exit path.
     *
     * @param zkData         source of the JSON document
     * @param authMode       authentication mode used for the connections and, unless
     *                       {@code useExistingACL} is set, for choosing the ACLs to apply
     * @param authData       authentication payload, passed to the client when {@code authMode} is DIGEST
     * @param ignoreSource   when {@code true}, skips the check that source and destination differ
     * @param useExistingACL when {@code true}, the ACLs stored in the document are applied as-is
     * @throws IOException              if a connection cannot be established
     * @throws ExecutionException       if writing any node fails
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     * @throws IllegalArgumentException if the source description in the document is missing or
     *                                  invalid, or matches the destination while
     *                                  {@code ignoreSource} is {@code false}
     */
    void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource, boolean useExistingACL) throws IOException, ExecutionException, InterruptedException {
        final ZooKeeper zooKeeperRoot = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData);
        try {
            ensureNodeExists(zooKeeperRoot, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT);
        } finally {
            closeZooKeeper(zooKeeperRoot);
        }

        final ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
        try (JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(zkData, StandardCharsets.UTF_8)))) {
            final Gson gson = new GsonBuilder().create();

            jsonReader.beginArray();
            final ZooKeeperEndpointConfig sourceConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class);
            LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceConfig);
            // A JSON null as the first element deserializes to null; reject it as invalid input
            // instead of failing with a NullPointerException.
            Preconditions.checkArgument(
                    sourceConfig != null
                            && !Strings.isNullOrEmpty(sourceConfig.getConnectString())
                            && !Strings.isNullOrEmpty(sourceConfig.getPath())
                            && sourceConfig.getServers() != null
                            && !sourceConfig.getServers().isEmpty(),
                    "Source ZooKeeper %s from %s is invalid", sourceConfig, zkData);
            Preconditions.checkArgument(
                    Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceConfig.getServers())
                            || !zooKeeperEndpointConfig.getPath().equals(sourceConfig.getPath())
                            || ignoreSource,
                    "Source ZooKeeper config %s for the data provided can not contain the same server and path as the configured destination ZooKeeper config %s",
                    sourceConfig, zooKeeperEndpointConfig);

            final List<CompletableFuture<Stat>> writeFutures = streamNodes(jsonReader, gson)
                    .parallel()
                    .map(node -> writeNode(zooKeeper, node, authMode, useExistingACL))
                    .collect(Collectors.toList());

            // Blocks until every node has been transferred; surfaces the first failure as ExecutionException.
            CompletableFuture.allOf(writeFutures.toArray(new CompletableFuture<?>[0])).get();

            if (LOGGER.isInfoEnabled()) {
                final int writeCount = writeFutures.size();
                LOGGER.info("{} {} transferred to {}", writeCount, writeCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
            }
        } finally {
            closeZooKeeper(zooKeeper);
        }
    }

    /** Serializes {@code value} with Gson and appends it to {@code jsonWriter}; the append is guarded by the writer's monitor. */
    private static void writeJson(Gson gson, JsonParser jsonParser, JsonWriter jsonWriter, Object value) {
        final JsonElement element = jsonParser.parse(gson.toJson(value)).getAsJsonObject();
        synchronized (jsonWriter) {
            gson.toJson(element, jsonWriter);
        }
    }

    /** Exposes the remaining elements of the currently open JSON array as a stream of nodes, read one at a time under the reader's monitor. */
    private static Stream<DataStatAclNode> streamNodes(JsonReader jsonReader, Gson gson) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<DataStatAclNode>(0L, 0) {
            @Override
            public boolean tryAdvance(Consumer<? super DataStatAclNode> action) {
                try {
                    synchronized (jsonReader) {
                        if (!jsonReader.hasNext()) {
                            return false;
                        }
                        final DataStatAclNode next = gson.fromJson(jsonReader, DataStatAclNode.class);
                        action.accept(next);
                        return true;
                    }
                } catch (IOException e) {
                    throw new RuntimeException("unable to read nodes from json", e);
                }
            }
        }, false);
    }

    /** Builds the asynchronous pipeline for one node: choose ACLs, rebuild the node with them, make sure its path exists, then set data and ACLs. */
    private CompletableFuture<Stat> writeNode(ZooKeeper zooKeeper, DataStatAclNode node, AuthMode authMode, boolean useExistingACL) {
        final CompletableFuture<DataStatAclNode> transformed = CompletableFuture
                .supplyAsync(() -> determineACLs(node, authMode, useExistingACL))
                .thenCompose(acls -> CompletableFuture.supplyAsync(() -> transformNode(node, acls)));
        final CompletableFuture<String> ensured = transformed.thenCompose(migrated -> CompletableFuture.supplyAsync(() ->
                ensureNodeExists(zooKeeper, migrated.getPath(), migrated.getEphemeralOwner() == 0L ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL)));
        // Carry the transformed node (not the created path) forward to the transmit stage.
        final CompletableFuture<DataStatAclNode> ready = ensured.thenCombine(transformed, (createdPath, migrated) -> migrated);
        return ready.thenCompose(migrated -> CompletableFuture.supplyAsync(() -> transmitNode(zooKeeper, migrated)));
    }

    /** Flattens a node tree into a stream of paths, each node before its descendants. */
    private Stream<String> streamPaths(ZooKeeperNode node) {
        return Stream.concat(Stream.of(node.getPath()), node.getChildren().stream().flatMap(this::streamPaths));
    }

    /**
     * Recursively discovers the sub-tree rooted at {@code path}.
     *
     * @throws KeeperException      if listing the children of {@code path} itself fails
     * @throws InterruptedException if interrupted while listing the children of {@code path} itself
     * @throws RuntimeException     if discovery of any descendant fails
     */
    private ZooKeeperNode getNode(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
        LOGGER.debug("retrieving node and children at {}", path);
        final List<String> children = zooKeeper.getChildren(path, false);
        return new ZooKeeperNode(path, children.stream().map(child -> {
            // Avoid a double slash when the parent is the root.
            final String childPath = Joiner.on('/').skipNulls().join(path.equals("/") ? "" : path, child);
            try {
                return getNode(zooKeeper, childPath);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(String.format("unable to discover sub-tree from %s", childPath), e);
            } catch (KeeperException e) {
                throw new RuntimeException(String.format("unable to discover sub-tree from %s", childPath), e);
            }
        }).collect(Collectors.toList()));
    }

    /**
     * Fetches data, stat, ACLs and ephemeral owner of the node at {@code path}.
     *
     * @throws NullPointerException if {@code zooKeeper} or {@code path} is {@code null}
     * @throws RuntimeException     if the node cannot be read or the thread is interrupted
     */
    private DataStatAclNode retrieveNode(ZooKeeper zooKeeper, String path) {
        Preconditions.checkNotNull(zooKeeper, "ZooKeeper client must not be null");
        Preconditions.checkNotNull(path, "path must not be null");
        final Stat stat = new Stat();
        final byte[] data;
        final List<ACL> acls;
        final long ephemeralOwner;
        try {
            data = zooKeeper.getData(path, false, stat);
            acls = zooKeeper.getACL(path, stat);
            ephemeralOwner = stat.getEphemeralOwner();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(String.format("unable to get data, ACLs, and stats from %s for node at path %s", zooKeeper, path), e);
        } catch (KeeperException e) {
            throw new RuntimeException(String.format("unable to get data, ACLs, and stats from %s for node at path %s", zooKeeper, path), e);
        }
        return new DataStatAclNode(path, data, stat, acls, ephemeralOwner);
    }

    /**
     * Creates an empty node at {@code path} with an open ACL. Missing ancestors are created as
     * persistent nodes first; an already existing node is left untouched.
     *
     * @return the path reported by ZooKeeper for the created node, or {@code path} if it already existed
     * @throws RuntimeException if ZooKeeper rejects the creation or the thread is interrupted
     */
    private String ensureNodeExists(ZooKeeper zooKeeper, String path, CreateMode createMode) {
        try {
            LOGGER.debug("attempting to create node at {}", path);
            final List<ACL> acls = ZooDefs.Ids.OPEN_ACL_UNSAFE;
            final String createNodePath = zooKeeper.create(path, new byte[0], acls, createMode);
            LOGGER.info("created node at {}, acls: {}, createMode: {}", createNodePath, acls, createMode);
            return createNodePath;
        } catch (KeeperException e) {
            if (KeeperException.Code.NONODE.equals(e.code())) {
                // The parent is missing: create it, then retry this node.
                final List<String> pathTokens = Splitter.on('/').omitEmptyStrings().trimResults().splitToList(path);
                final String parentPath = "/" + Joiner.on('/').skipNulls().join(pathTokens.subList(0, pathTokens.size() - 1));
                LOGGER.debug("node doesn't exist, recursively attempting to create node at {}", parentPath);
                ensureNodeExists(zooKeeper, parentPath, CreateMode.PERSISTENT);
                LOGGER.debug("recursively created node at {}", parentPath);
                LOGGER.debug("retrying attempt to create node at {}", path);
                return ensureNodeExists(zooKeeper, path, createMode);
            }
            if (KeeperException.Code.NODEEXISTS.equals(e.code())) {
                return path;
            }
            throw new RuntimeException(String.format("unable to create node at path %s, ZooKeeper returned %s", path, e.code()), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(String.format("unable to create node at path %s", path), e);
        }
    }

    /** Picks the ACLs for a migrated node: the node's own when {@code useExistingACL}, otherwise open ACLs for OPEN mode and creator-all ACLs for every other mode. */
    private List<ACL> determineACLs(DataStatAclNode node, AuthMode authMode, boolean useExistingACL) {
        if (useExistingACL) {
            return node.getAcls();
        }
        return authMode.equals(AuthMode.OPEN) ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL;
    }

    /** Returns a copy of {@code node} that carries {@code acls} instead of the node's original ACLs. */
    private DataStatAclNode transformNode(DataStatAclNode node, List<ACL> acls) {
        final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), acls, node.getEphemeralOwner());
        LOGGER.info("transformed original node {} to {}", node, migratedNode);
        return migratedNode;
    }

    /**
     * Writes the data and then the ACLs of {@code node} to its path, ignoring node versions.
     *
     * @return the stat carried by {@code node} (not a stat returned by the server)
     * @throws NullPointerException if {@code zooKeeper} or {@code node} is {@code null}
     * @throws RuntimeException     if either write fails or the thread is interrupted
     */
    private Stat transmitNode(ZooKeeper zooKeeper, DataStatAclNode node) {
        Preconditions.checkNotNull(zooKeeper, "zooKeeper must not be null");
        Preconditions.checkNotNull(node, "node must not be null");
        try {
            LOGGER.debug("attempting to transfer node to {} with ACL {}: {}", zooKeeperEndpointConfig, node.getAcls(), node);
            // Version -1 matches any version of the existing node.
            zooKeeper.setData(node.getPath(), node.getData(), -1);
            zooKeeper.setACL(node.getPath(), node.getAcls(), -1);
            LOGGER.info("transferred node {} in {}", node, zooKeeperEndpointConfig);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(String.format("unable to transmit data to %s for path %s", zooKeeper, node.getPath()), e);
        } catch (KeeperException e) {
            throw new RuntimeException(String.format("unable to transmit data to %s for path %s", zooKeeper, node.getPath()), e);
        }
        return node.getStat();
    }

    /**
     * Opens a ZooKeeper client and waits for it to report SyncConnected.
     *
     * @throws NullPointerException if {@code authMode} is {@code null}
     * @throws IOException          if the connection is not established in time or the wait is interrupted
     */
    private ZooKeeper getZooKeeper(String zooKeeperConnectString, AuthMode authMode, byte[] authData) throws IOException {
        // Validate before connecting so a bad argument cannot leave an open client behind.
        Preconditions.checkNotNull(authMode, "authMode must not be null");
        final CountDownLatch connectionLatch = new CountDownLatch(1);
        final ZooKeeper zooKeeper = new ZooKeeper(zooKeeperConnectString, SESSION_TIMEOUT_MILLIS, watchedEvent -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), zooKeeperConnectString);
            }
            if (watchedEvent.getType() == Watcher.Event.EventType.None
                    && watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectionLatch.countDown();
            }
        });

        final boolean connected;
        try {
            connected = connectionLatch.await(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            closeZooKeeper(zooKeeper);
            Thread.currentThread().interrupt();
            throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", zooKeeperConnectString), e);
        }

        if (!connected) {
            closeZooKeeper(zooKeeper);
            throw new IOException(String.format("unable to connect to %s", zooKeeperConnectString));
        }

        if (authMode == AuthMode.DIGEST) {
            zooKeeper.addAuthInfo(SCHEME_DIGEST, authData);
        }
        return zooKeeper;
    }

    /** Closes the client; an interrupt during close is logged and the interrupt flag is restored. */
    private void closeZooKeeper(ZooKeeper zooKeeper) {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            LOGGER.warn("could not close ZooKeeper client due to interrupt", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the endpoint configuration this migrator was created with. */
    ZooKeeperEndpointConfig getZooKeeperEndpointConfig() {
        return this.zooKeeperEndpointConfig;
    }

    /** Authentication modes accepted by the migrator. */
    static enum AuthMode {
        OPEN,
        DIGEST,
        SASL;

    }
}

