/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.metadata.impl;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.ClientBuilder;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Txn;
import io.etcd.jetcd.kv.DeleteResponse;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.kv.PutResponse;
import io.etcd.jetcd.kv.TxnResponse;
import io.etcd.jetcd.lease.LeaseKeepAliveResponse;
import io.etcd.jetcd.op.Cmp;
import io.etcd.jetcd.op.CmpTarget;
import io.etcd.jetcd.op.Op;
import io.etcd.jetcd.options.DeleteOption;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.support.CloseableClient;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pulsar.functions.runtime.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.pulsar.functions.runtime.shaded.io.grpc.Status;
import org.apache.pulsar.functions.runtime.shaded.io.grpc.StatusRuntimeException;
import org.apache.pulsar.functions.runtime.shaded.io.grpc.netty.GrpcSslContexts;
import org.apache.pulsar.functions.runtime.shaded.io.grpc.stub.StreamObserver;
import org.apache.pulsar.functions.runtime.shaded.io.netty.handler.ssl.SslContext;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.EtcdConfig;
import org.apache.pulsar.metadata.impl.EtcdSessionWatcher;
import org.apache.pulsar.metadata.impl.batching.AbstractBatchedMetadataStore;
import org.apache.pulsar.metadata.impl.batching.MetadataOp;
import org.apache.pulsar.metadata.impl.batching.OpDelete;
import org.apache.pulsar.metadata.impl.batching.OpGet;
import org.apache.pulsar.metadata.impl.batching.OpGetChildren;
import org.apache.pulsar.metadata.impl.batching.OpPut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EtcdMetadataStore
extends AbstractBatchedMetadataStore {
    private static final Logger log = LoggerFactory.getLogger(EtcdMetadataStore.class);
    static final String ETCD_SCHEME = "etcd";
    static final String ETCD_SCHEME_IDENTIFIER = "etcd:";
    private final int leaseTTLSeconds;
    private final Client client;
    private final KV kv;
    private volatile long leaseId;
    private volatile CloseableClient leaseClient;
    private final EtcdSessionWatcher sessionWatcher;
    private static final GetOption EXISTS_GET_OPTION = GetOption.newBuilder().withCountOnly(true).build();
    private static final GetOption SINGLE_GET_OPTION = GetOption.newBuilder().withLimit(1L).build();

    public EtcdMetadataStore(String metadataURL, MetadataStoreConfig conf, boolean enableSessionWatcher) throws MetadataStoreException {
        super(conf);
        this.leaseTTLSeconds = conf.getSessionTimeoutMillis() / 1000;
        try {
            this.client = this.newEtcdClient(metadataURL, conf);
            this.kv = this.client.getKVClient();
            this.client.getWatchClient().watch(ByteSequence.from((String)"/", (Charset)StandardCharsets.UTF_8), WatchOption.newBuilder().isPrefix(true).build(), this::handleWatchResponse);
            if (enableSessionWatcher) {
                this.sessionWatcher = new EtcdSessionWatcher(this.client, conf.getSessionTimeoutMillis(), this::receivedSessionEvent);
                this.createLease(false).join();
            } else {
                this.sessionWatcher = null;
            }
        }
        catch (Exception e) {
            throw new MetadataStoreException(e);
        }
    }

    private Client newEtcdClient(String metadataURL, MetadataStoreConfig conf) throws IOException {
        String etcdUrl = metadataURL.replaceFirst(ETCD_SCHEME_IDENTIFIER, "");
        ClientBuilder clientBuilder = Client.builder().endpoints(etcdUrl.split(","));
        if (StringUtils.isNotEmpty(conf.getConfigFilePath())) {
            try (InputStream inputStream = Files.newInputStream(Paths.get(conf.getConfigFilePath(), new String[0]), new OpenOption[0]);){
                EtcdConfig etcdConfig = new ObjectMapper(new YAMLFactory()).readValue(inputStream, EtcdConfig.class);
                if (etcdConfig.isUseTls()) {
                    File trustCertsFile = this.readFile(etcdConfig.getTlsTrustCertsFilePath());
                    File keyFile = this.readFile(etcdConfig.getTlsKeyFilePath());
                    File certFile = this.readFile(etcdConfig.getTlsCertificateFilePath());
                    SslContext context = GrpcSslContexts.forClient().trustManager(trustCertsFile).sslProvider(etcdConfig.getTlsProvider()).keyManager(certFile, keyFile).build();
                    clientBuilder.sslContext(context);
                }
                if (StringUtils.isNotEmpty(etcdConfig.getAuthority())) {
                    clientBuilder.authority(etcdConfig.getAuthority());
                }
            }
        }
        return clientBuilder.build();
    }

    private File readFile(String path) {
        return StringUtils.isEmpty(path) ? null : new File(path);
    }

    @Override
    public void close() throws Exception {
        if (this.isClosed.compareAndSet(false, true)) {
            super.close();
            if (this.sessionWatcher != null) {
                this.sessionWatcher.close();
            }
            if (this.leaseClient != null) {
                this.leaseClient.close();
            }
            if (this.leaseId != 0L) {
                this.client.getLeaseClient().revoke(this.leaseId);
            }
            this.kv.close();
            this.client.close();
        }
    }

    @Override
    protected CompletableFuture<Boolean> existsFromStore(String path) {
        return this.kv.get(ByteSequence.from((String)path, (Charset)StandardCharsets.UTF_8), EXISTS_GET_OPTION).thenApplyAsync(gr -> gr.getCount() == 1L, (Executor)this.executor);
    }

    @Override
    protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> optExpectedVersion, EnumSet<CreateOption> options) {
        if (!options.contains((Object)CreateOption.Sequential)) {
            return super.storePut(path, data, optExpectedVersion, options);
        }
        String parent = EtcdMetadataStore.parent(path);
        if (parent == null) {
            parent = "/";
        }
        return super.storePut(parent, new byte[0], Optional.empty(), EnumSet.noneOf(CreateOption.class)).thenComposeAsync(stat -> super.storePut(path + stat.getVersion(), data, optExpectedVersion, options), (Executor)this.executor);
    }

    @Override
    protected void batchOperation(List<MetadataOp> ops) {
        try {
            Txn txn = this.kv.txn();
            ops.forEach(op -> {
                switch (op.getType()) {
                    case PUT: {
                        OpPut put = op.asPut();
                        ByteSequence key = ByteSequence.from((String)put.getPath(), (Charset)StandardCharsets.UTF_8);
                        if (!put.getOptExpectedVersion().isPresent()) break;
                        long expectedVersion = put.getOptExpectedVersion().get();
                        if (expectedVersion == -1L) {
                            txn.If(new Cmp[]{new Cmp(key, Cmp.Op.EQUAL, (CmpTarget)CmpTarget.createRevision((long)0L))});
                            break;
                        }
                        txn.If(new Cmp[]{new Cmp(key, Cmp.Op.EQUAL, (CmpTarget)CmpTarget.version((long)(expectedVersion + 1L)))});
                        break;
                    }
                    case DELETE: {
                        OpDelete del = op.asDelete();
                        ByteSequence key = ByteSequence.from((String)del.getPath(), (Charset)StandardCharsets.UTF_8);
                        if (!del.getOptExpectedVersion().isPresent()) break;
                        txn.If(new Cmp[]{new Cmp(key, Cmp.Op.EQUAL, (CmpTarget)CmpTarget.version((long)(del.getOptExpectedVersion().get() + 1L)))});
                        break;
                    }
                }
            });
            ops.forEach(op -> {
                switch (op.getType()) {
                    case GET: {
                        txn.Then(new Op[]{Op.get((ByteSequence)ByteSequence.from((String)op.asGet().getPath(), (Charset)StandardCharsets.UTF_8), (GetOption)SINGLE_GET_OPTION)});
                        break;
                    }
                    case PUT: {
                        OpPut put = op.asPut();
                        ByteSequence key = ByteSequence.from((String)put.getPath(), (Charset)StandardCharsets.UTF_8);
                        if (put.getFuture().isDone()) break;
                        PutOption.Builder b = PutOption.newBuilder().withPrevKV();
                        if (put.isEphemeral()) {
                            b.withLeaseId(this.leaseId);
                        }
                        txn.Then(new Op[]{Op.put((ByteSequence)key, (ByteSequence)ByteSequence.from((byte[])put.getData()), (PutOption)b.build())});
                        break;
                    }
                    case DELETE: {
                        OpDelete del = op.asDelete();
                        ByteSequence key = ByteSequence.from((String)del.getPath(), (Charset)StandardCharsets.UTF_8);
                        txn.Then(new Op[]{Op.delete((ByteSequence)key, (DeleteOption)DeleteOption.DEFAULT)});
                        break;
                    }
                    case GET_CHILDREN: {
                        OpGetChildren opGetChildren = op.asGetChildren();
                        String path = opGetChildren.getPath();
                        ByteSequence prefix = ByteSequence.from((String)(path.equals("/") ? path : path + "/"), (Charset)StandardCharsets.UTF_8);
                        txn.Then(new Op[]{Op.get((ByteSequence)prefix, (GetOption)GetOption.newBuilder().withKeysOnly(true).withSortField(GetOption.SortTarget.KEY).withSortOrder(GetOption.SortOrder.ASCEND).isPrefix(true).build())});
                        break;
                    }
                }
            });
            ((CompletableFuture)txn.commit().thenAccept(txnResponse -> this.handleBatchOperationResult((TxnResponse)txnResponse, ops))).exceptionally(ex -> {
                Throwable cause = ex.getCause();
                if (cause instanceof ExecutionException || cause instanceof CompletionException) {
                    cause = cause.getCause();
                }
                if (ops.size() > 1 && cause instanceof StatusRuntimeException) {
                    Status.Code code = ((StatusRuntimeException)cause).getStatus().getCode();
                    if (code == Status.Code.INVALID_ARGUMENT || code == Status.Code.RESOURCE_EXHAUSTED) {
                        ops.forEach(o -> this.batchOperation(Collections.singletonList(o)));
                    }
                } else {
                    log.warn("Failed to commit: {}", (Object)cause.getMessage());
                    this.executor.execute(() -> ops.forEach(o -> o.getFuture().completeExceptionally((Throwable)ex)));
                }
                return null;
            });
        }
        catch (Throwable t) {
            log.warn("Error in committing batch: {}", (Object)t.getMessage());
        }
    }

    private void handleBatchOperationResult(TxnResponse txnResponse, List<MetadataOp> ops) {
        this.executor.execute(() -> {
            if (!txnResponse.isSucceeded()) {
                if (ops.size() > 1) {
                    ops.forEach(o -> this.batchOperation(Collections.singletonList(o)));
                } else {
                    ((MetadataOp)ops.get(0)).getFuture().completeExceptionally(new MetadataStoreException.BadVersionException("Bad version"));
                }
                return;
            }
            int getIdx = 0;
            int deletedIdx = 0;
            int putIdx = 0;
            for (MetadataOp op : ops) {
                switch (op.getType()) {
                    case GET: {
                        OpGet get = op.asGet();
                        GetResponse gr = (GetResponse)txnResponse.getGetResponses().get(getIdx++);
                        if (gr.getCount() == 0L) {
                            get.getFuture().complete(Optional.empty());
                            break;
                        }
                        KeyValue kv2 = (KeyValue)gr.getKvs().get(0);
                        boolean isEphemeral = kv2.getLease() != 0L;
                        boolean createdBySelf = kv2.getLease() == this.leaseId;
                        get.getFuture().complete(Optional.of(new GetResult(kv2.getValue().getBytes(), new Stat(get.getPath(), kv2.getVersion() - 1L, 0L, 0L, isEphemeral, createdBySelf))));
                        break;
                    }
                    case PUT: {
                        OpPut put = op.asPut();
                        PutResponse pr = (PutResponse)txnResponse.getPutResponses().get(putIdx++);
                        KeyValue prevKv = pr.getPrevKv();
                        if (prevKv == null) {
                            put.getFuture().complete(new Stat(put.getPath(), 0L, 0L, 0L, put.isEphemeral(), true));
                            break;
                        }
                        put.getFuture().complete(new Stat(put.getPath(), prevKv.getVersion(), 0L, 0L, put.isEphemeral(), true));
                        break;
                    }
                    case DELETE: {
                        OpDelete del = op.asDelete();
                        DeleteResponse dr = (DeleteResponse)txnResponse.getDeleteResponses().get(deletedIdx++);
                        if (dr.getDeleted() == 0L) {
                            del.getFuture().completeExceptionally(new MetadataStoreException.NotFoundException());
                            break;
                        }
                        del.getFuture().complete(null);
                        break;
                    }
                    case GET_CHILDREN: {
                        OpGetChildren getChildren = op.asGetChildren();
                        GetResponse gr = (GetResponse)txnResponse.getGetResponses().get(getIdx++);
                        String basePath = getChildren.getPath().equals("/") ? "/" : getChildren.getPath() + "/";
                        Set children = gr.getKvs().stream().map(kv -> kv.getKey().toString(StandardCharsets.UTF_8)).map(p -> p.replaceFirst(basePath, "")).map(k -> k.split("/", 2)[0]).collect(Collectors.toCollection(TreeSet::new));
                        getChildren.getFuture().complete(new ArrayList(children));
                    }
                }
            }
        });
    }

    private synchronized CompletableFuture<Void> createLease(boolean retryOnFailure) {
        CompletionStage future = this.client.getLeaseClient().grant((long)this.leaseTTLSeconds).thenAccept(lease -> {
            EtcdMetadataStore etcdMetadataStore = this;
            synchronized (etcdMetadataStore) {
                this.leaseId = lease.getID();
                if (this.leaseClient != null) {
                    this.leaseClient.close();
                }
                this.leaseClient = this.client.getLeaseClient().keepAlive(this.leaseId, (StreamObserver)new StreamObserver<LeaseKeepAliveResponse>(){

                    public void onNext(LeaseKeepAliveResponse leaseKeepAliveResponse) {
                        if (log.isDebugEnabled()) {
                            log.debug("On next: {}", (Object)leaseKeepAliveResponse);
                        }
                    }

                    public void onError(Throwable throwable) {
                        log.warn("Lease client error :", throwable);
                        EtcdMetadataStore.this.receivedSessionEvent(SessionEvent.SessionLost);
                    }

                    public void onCompleted() {
                        log.info("Etcd lease has expired");
                        EtcdMetadataStore.this.receivedSessionEvent(SessionEvent.SessionLost);
                    }
                });
            }
        });
        if (retryOnFailure) {
            ((CompletableFuture)future).exceptionally(ex -> {
                log.warn("Failed to create Etcd lease. Retrying later", ex);
                this.executor.schedule(() -> this.createLease(true), 1L, TimeUnit.SECONDS);
                return null;
            });
        }
        return future;
    }

    private void handleWatchResponse(WatchResponse watchResponse) {
        watchResponse.getEvents().forEach(we -> {
            String path = we.getKeyValue().getKey().toString(StandardCharsets.UTF_8);
            if (we.getEventType() == WatchEvent.EventType.PUT) {
                if (we.getKeyValue().getVersion() == 1L) {
                    this.receivedNotification(new Notification(NotificationType.Created, path));
                    this.notifyParentChildrenChanged(path);
                } else {
                    this.receivedNotification(new Notification(NotificationType.Modified, path));
                }
            } else if (we.getEventType() == WatchEvent.EventType.DELETE) {
                this.receivedNotification(new Notification(NotificationType.Deleted, path));
                this.notifyParentChildrenChanged(path);
            }
        });
    }

    @Override
    protected void receivedSessionEvent(SessionEvent event) {
        if (event == SessionEvent.SessionReestablished) {
            this.createLease(true).thenRun(() -> super.receivedSessionEvent(event));
        } else {
            super.receivedSessionEvent(event);
        }
    }
}

