/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.etcd3;

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.etcd3.Etcd3Configuration;
import org.apache.camel.component.etcd3.Etcd3Endpoint;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.StringHelper;

class Etcd3Consumer
extends DefaultConsumer
implements Watch.Listener {
    private final Etcd3Configuration configuration;
    private final String path;
    private final Client client;
    private final Watch watch;
    private final AtomicLong revision;
    private final Charset keyCharset;
    private final AtomicReference<Watch.Watcher> watcher = new AtomicReference();

    Etcd3Consumer(Etcd3Endpoint endpoint, Processor processor, Etcd3Configuration configuration, String path) {
        super((Endpoint)endpoint, processor);
        this.configuration = configuration;
        this.path = StringHelper.notEmpty((String)path, (String)"path");
        this.client = configuration.createClient();
        this.watch = this.client.getWatchClient();
        this.revision = new AtomicLong(configuration.getFromIndex());
        this.keyCharset = Charset.forName(configuration.getKeyCharset());
    }

    protected void doStart() throws Exception {
        this.doWatch();
        super.doStart();
    }

    protected void doStop() throws Exception {
        try {
            this.client.close();
        }
        finally {
            super.doStop();
        }
    }

    private void doWatch() {
        if (!this.isRunAllowed()) {
            return;
        }
        this.watcher.getAndUpdate(w -> {
            if (w != null) {
                w.close();
            }
            return this.watch.watch(ByteSequence.from((String)this.path, (Charset)this.keyCharset), WatchOption.newBuilder().isPrefix(this.configuration.isPrefix()).withRevision(this.revision.get()).build(), (Watch.Listener)this);
        });
    }

    public void onNext(WatchResponse response) {
        for (WatchEvent event : response.getEvents()) {
            Exchange exchange = this.createExchange(false);
            KeyValue keyValue = event.getKeyValue();
            exchange.getIn().setHeader("CamelEtcdPath", (Object)keyValue.getKey().toString(this.keyCharset));
            exchange.getIn().setBody((Object)event);
            this.revision.getAndUpdate(r -> Math.max(r, keyValue.getModRevision() + 1L));
            try {
                this.getProcessor().process(exchange);
            }
            catch (Exception e) {
                this.getExceptionHandler().handleException("Error processing exchange", exchange, (Throwable)e);
            }
            this.releaseExchange(exchange, false);
        }
    }

    public void onError(Throwable throwable) {
        this.handleException("Error processing etcd response", throwable);
    }

    public void onCompleted() {
        this.doWatch();
    }
}

