package com.couchbase.connect.kafka;

import com.couchbase.client.core.logging.LogRedaction;
import com.couchbase.client.core.util.CbCollections;
import com.couchbase.client.core.util.CbStrings;
import com.couchbase.connect.kafka.config.sink.CouchbaseSinkConfig;
import com.couchbase.connect.kafka.config.sink.SinkBehaviorConfig;
import com.couchbase.connect.kafka.handler.sink.N1qlSinkHandler;
import com.couchbase.connect.kafka.handler.sink.SinkAction;
import com.couchbase.connect.kafka.handler.sink.SinkDocument;
import com.couchbase.connect.kafka.handler.sink.SinkHandler;
import com.couchbase.connect.kafka.handler.sink.SinkHandlerContext;
import com.couchbase.connect.kafka.handler.sink.SinkHandlerParams;
import com.couchbase.connect.kafka.handler.sink.SubDocumentSinkHandler;
import com.couchbase.connect.kafka.util.BatchBuilder;
import com.couchbase.connect.kafka.util.DocumentIdExtractor;
import com.couchbase.connect.kafka.util.DocumentPathExtractor;
import com.couchbase.connect.kafka.util.DurabilitySetter;
import com.couchbase.connect.kafka.util.KafkaRetryHelper;
import com.couchbase.connect.kafka.util.Keyspace;
import com.couchbase.connect.kafka.util.TopicMap;
import com.couchbase.connect.kafka.util.Version;
import com.couchbase.connect.kafka.util.config.ConfigHelper;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/couchbase/connect/kafka/CouchbaseSinkTask.class */
public class CouchbaseSinkTask extends SinkTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseSinkTask.class);
    private Keyspace defaultDestCollection;
    private Map<String, Keyspace> topicToCollection;
    private KafkaCouchbaseClient client;
    private JsonConverter converter;
    private DocumentIdExtractor documentIdExtractor;
    private SinkHandler sinkHandler;
    private boolean sinkHandlerUsesKvConnections;
    private KafkaRetryHelper retryHelper;
    private DurabilitySetter durabilitySetter;
    private Optional<Duration> documentExpiry;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> map) {
        try {
            CouchbaseSinkConfig couchbaseSinkConfig = (CouchbaseSinkConfig) ConfigHelper.parse(CouchbaseSinkConfig.class, map);
            HashMap hashMap = new HashMap();
            map.forEach((str, str2) -> {
                if (!str.startsWith("couchbase.env.") || CbStrings.isNullOrEmpty(str2)) {
                    return;
                }
                hashMap.put(CbStrings.removeStart(str, "couchbase.env."), str2);
            });
            LOGGER.info("Custom ClusterEnvironment properties: {}", hashMap);
            LogRedaction.setRedactionLevel(couchbaseSinkConfig.logRedaction());
            this.client = new KafkaCouchbaseClient(couchbaseSinkConfig, hashMap);
            this.defaultDestCollection = Keyspace.parse(couchbaseSinkConfig.defaultCollection(), couchbaseSinkConfig.bucket());
            this.topicToCollection = TopicMap.parseTopicToCollection(couchbaseSinkConfig.topicToCollection(), couchbaseSinkConfig.bucket());
            this.converter = new JsonConverter();
            this.converter.configure(CbCollections.mapOf("schemas.enable", false), false);
            String documentId = couchbaseSinkConfig.documentId();
            if (documentId != null && !documentId.isEmpty()) {
                this.documentIdExtractor = new DocumentIdExtractor(documentId, couchbaseSinkConfig.removeDocumentId());
            }
            Class<? extends SinkHandler> sinkHandler = couchbaseSinkConfig.sinkHandler();
            SinkBehaviorConfig.DocumentMode documentMode = couchbaseSinkConfig.documentMode();
            if (documentMode != SinkBehaviorConfig.DocumentMode.DOCUMENT) {
                sinkHandler = documentMode == SinkBehaviorConfig.DocumentMode.N1QL ? N1qlSinkHandler.class : SubDocumentSinkHandler.class;
                LOGGER.warn("Forcing sink handler to {} because document mode is {}. The `couchbase.document.mode` config property is deprecated; please use `couchbase.sink.handler` instead.", sinkHandler, documentMode);
            }
            this.sinkHandler = (SinkHandler) Utils.newInstance(sinkHandler);
            this.sinkHandler.init(new SinkHandlerContext(this.client.cluster().reactive(), Collections.unmodifiableMap(map)));
            this.sinkHandlerUsesKvConnections = this.sinkHandler.usesKvCollections();
            if (this.sinkHandlerUsesKvConnections && couchbaseSinkConfig.bucket().isEmpty()) {
                throw new ConfigException("Missing required config property: " + ConfigHelper.keyName(CouchbaseSinkConfig.class, (v0) -> {
                    v0.bucket();
                }));
            }
            LOGGER.info("Using sink handler: {}", this.sinkHandler);
            this.durabilitySetter = DurabilitySetter.create(couchbaseSinkConfig);
            this.documentExpiry = couchbaseSinkConfig.documentExpiration().isZero() ? Optional.empty() : Optional.of(couchbaseSinkConfig.documentExpiration());
            this.retryHelper = new KafkaRetryHelper("CouchbaseSinkTask.put()", couchbaseSinkConfig.retryTimeout());
            if (usingLongKvTimeouts()) {
                LOGGER.warn("The specified KV timeout is very long, and might cause problems for the Kafka consumer session.  Consider using the '" + ConfigHelper.keyName(CouchbaseSinkConfig.class, (v0) -> {
                    v0.retryTimeout();
                }) + "' config property instead of setting a long KV timeout. The retry timeout handles more kinds of write failures and can safely be set to a duration longer than Kafka consumer session timeout.");
            }
        } catch (ConfigException e) {
            throw new ConnectException("Couldn't start CouchbaseSinkTask due to configuration error", e);
        }
    }

    private boolean usingLongKvTimeouts() {
        Duration kvTimeout = this.client.cluster().environment().timeoutConfig().kvTimeout();
        Duration kvDurableTimeout = this.client.cluster().environment().timeoutConfig().kvDurableTimeout();
        Duration ofSeconds = Duration.ofSeconds(20L);
        return kvTimeout.compareTo(ofSeconds) > 0 || kvDurableTimeout.compareTo(ofSeconds) > 0;
    }

    public void put(Collection<SinkRecord> collection) {
        this.retryHelper.runWithRetry(() -> {
            doPut(collection);
        });
    }

    private void doPut(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        SinkRecord next = collection.iterator().next();
        LOGGER.trace("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the Couchbase...", new Object[]{Integer.valueOf(collection.size()), next.topic(), next.kafkaPartition(), Long.valueOf(next.kafkaOffset())});
        ArrayList arrayList = new ArrayList();
        for (SinkRecord sinkRecord : collection) {
            Keyspace orDefault = this.topicToCollection.getOrDefault(sinkRecord.topic(), this.defaultDestCollection);
            arrayList.add(new SinkHandlerParams(this.client.cluster().reactive(), this.sinkHandlerUsesKvConnections ? this.client.collection(orDefault).reactive() : null, orDefault, sinkRecord, toSinkDocument(sinkRecord), this.documentExpiry, this.durabilitySetter));
        }
        execute(this.sinkHandler.handleBatch(arrayList));
    }

    private static void execute(List<SinkAction> list) {
        toMono(list).block(Duration.ofMinutes(10L));
    }

    static Mono<Void> toMono(List<SinkAction> list) {
        BatchBuilder batchBuilder = new BatchBuilder();
        for (SinkAction sinkAction : list) {
            batchBuilder.add(sinkAction.action(), sinkAction.concurrencyHint());
        }
        return Flux.fromStream(batchBuilder.build().stream().map(list2 -> {
            return Flux.fromIterable(list2).flatMap(mono -> {
                return mono;
            }).then();
        })).concatMap(mono -> {
            return mono;
        }).then();
    }

    private SinkDocument toSinkDocument(SinkRecord sinkRecord) {
        if (sinkRecord.value() == null) {
            return null;
        }
        byte[] fromConnectData = this.converter.fromConnectData(sinkRecord.topic(), sinkRecord.valueSchema(), sinkRecord.value());
        try {
            if (this.documentIdExtractor != null) {
                return this.documentIdExtractor.extractDocumentId(fromConnectData);
            }
        } catch (DocumentPathExtractor.DocumentPathNotFoundException e) {
            LOGGER.warn(e.getMessage() + "; letting sink handler use fallback ID");
        } catch (IOException e2) {
            throw new RuntimeException(e2);
        }
        return new SinkDocument(null, fromConnectData);
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void stop() {
        if (this.retryHelper != null) {
            this.retryHelper.close();
            this.retryHelper = null;
        }
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }
}
