/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.io.kafka.connect;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaConnectSinkConfig;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaSinkContext;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaSinkTaskContext;
import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
import org.apache.pulsar.io.kafka.connect.schema.PulsarSchemaToKafkaSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConnectSink
implements Sink<GenericObject> {
    // Class-wide SLF4J logger.
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaConnectSink.class);
    // When true, records whose schema type is KEY_VALUE are split into a separate Kafka key and value.
    private boolean unwrapKeyValueIfAvailable;
    // Connector context passed to the Kafka connector's initialize() in open().
    private PulsarKafkaSinkContext sinkContext;
    // Task context created in open(); tracks per-partition offsets and is closed in close().
    @VisibleForTesting
    PulsarKafkaSinkTaskContext taskContext;
    // Kafka Connect connector, instantiated reflectively in open().
    private SinkConnector connector;
    // Kafka Connect task that receives the converted records.
    private SinkTask task;
    // Flush threshold compared against currentBatchSize in flushIfNeeded().
    private long maxBatchSize;
    // Sum of Message.size() over the records currently held in pendingFlushQueue.
    private final AtomicLong currentBatchSize = new AtomicLong(0L);
    // Delay (ms) between periodic flushes; close() waits up to 10x this value for the executor to terminate.
    private long lingerMs;
    // Single-threaded executor on which both the periodic and the on-demand flushes are run.
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("pulsar-io-kafka-adaptor-sink-flush-%d").build());
    // Records already handed to the task but not yet acked or failed.
    @VisibleForTesting
    protected final ConcurrentLinkedDeque<Record<GenericObject>> pendingFlushQueue = new ConcurrentLinkedDeque();
    // Set while flush() is executing so that overlapping flushes are skipped.
    private final AtomicBoolean isFlushRunning = new AtomicBoolean(false);
    // True between the end of open() and the start of close(); write() fails records while false.
    private volatile boolean isRunning = false;
    // Connector configuration properties copied from the sink config in open().
    private final Properties props = new Properties();
    // Parsed sink configuration, loaded in open().
    private PulsarKafkaConnectSinkConfig kafkaSinkConfig;
    // Fallback topic name used when a record does not carry its own topic name.
    protected String topicName;
    // Selects the OPTIONAL_* Kafka schemas for primitive keys and is forwarded to the schema converters.
    protected boolean useOptionalPrimitives;
    // When true, topic names are rewritten by sanitizeNameIfNeeded() before being handed to Kafka Connect.
    private boolean sanitizeTopicName = false;
    // When true, partitioned topics are reported under the partitioned topic name plus the partition index.
    private boolean collapsePartitionedTopics = false;
    // Original topic name -> sanitized name; at most 1000 entries, expiring 30 minutes after last access.
    private final Cache<String, String> sanitizedTopicCache = CacheBuilder.newBuilder().maximumSize(1000L).expireAfterAccess(30L, TimeUnit.MINUTES).build();
    // Sanitized name -> original topic name; built without a size bound or expiry.
    private final Cache<String, String> desanitizedTopicCache = CacheBuilder.newBuilder().build();
    // Number of low-order bits of the packed entry id reserved for the batch index (see getMessageOffset()).
    private int maxBatchBitsForOffset = 12;
    // When true, the message index (if the message has one) is used as the Kafka offset.
    private boolean useIndexAsOffset = true;

    /**
     * Converts one Pulsar record to a Kafka Connect {@link SinkRecord}, hands it to the task and
     * queues it until a later flush acknowledges or fails it.
     *
     * <p>The record is failed immediately when the sink is not running or when conversion or
     * {@code task.put} throws.
     *
     * @param sourceRecord the incoming record; its message must be present
     * @throws IllegalArgumentException if the record carries no message
     */
    public void write(Record<GenericObject> sourceRecord) {
        if (log.isDebugEnabled()) {
            log.debug("Record sending to kafka, record={}.", sourceRecord);
        }

        final boolean running = this.isRunning;
        if (!running) {
            log.warn("Sink is stopped. Cannot send the record {}", sourceRecord);
            sourceRecord.fail();
            return;
        }

        // The rest of the class relies on the message being present, so verify it once here.
        Preconditions.checkArgument(sourceRecord.getMessage().isPresent());

        try {
            final SinkRecord converted = this.toSinkRecord(sourceRecord);
            this.task.put(Lists.newArrayList(converted));
        } catch (Exception putFailure) {
            log.error("Error sending the record {}", sourceRecord, putFailure);
            sourceRecord.fail();
            return;
        }

        // Only successfully delivered records are tracked for a later ack.
        this.pendingFlushQueue.add(sourceRecord);
        final long messageSize = sourceRecord.getMessage().get().size();
        this.currentBatchSize.addAndGet(messageSize);
        this.flushIfNeeded(false);
    }

    /**
     * Stops the sink: rejects further writes, requests a final flush, shuts the flush executor
     * down (waiting up to {@code 10 * lingerMs} milliseconds) and then stops the task, the
     * connector and the task context.
     *
     * <p>The task, connector and task context are only stopped if they were actually created, so
     * calling this after an {@code open()} that failed part-way does not raise a
     * {@link NullPointerException}.
     *
     * @throws Exception if waiting for the executor is interrupted or any component fails to stop
     */
    public void close() throws Exception {
        this.isRunning = false;
        this.flushIfNeeded(true);
        this.scheduledExecutor.shutdown();
        final long timeoutMs = 10L * this.lingerMs;
        if (!this.scheduledExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
            log.error("scheduledExecutor did not terminate in {} ms", timeoutMs);
        }
        // Any of these may still be null if open() threw before assigning them.
        if (this.task != null) {
            this.task.stop();
        }
        if (this.connector != null) {
            this.connector.stop();
        }
        if (this.taskContext != null) {
            this.taskContext.close();
        }
        log.info("Kafka sink stopped.");
    }

    /**
     * Initializes the sink: loads the configuration, instantiates and starts the configured Kafka
     * Connect connector and a single task, and schedules the periodic flush.
     *
     * @param config raw sink configuration, parsed by {@code PulsarKafkaConnectSinkConfig.load}
     * @param ctx    Pulsar sink context; its subscription type must be Failover or Exclusive
     * @throws NullPointerException     if no Kafka topic is configured
     * @throws IllegalArgumentException if the subscription type is unsupported,
     *                                  {@code maxBatchBitsForOffset} exceeds 20, or the connector
     *                                  does not return exactly one task config
     * @throws Exception                if the connector or task class cannot be loaded or instantiated
     */
    public void open(Map<String, Object> config, SinkContext ctx) throws Exception {
        this.kafkaSinkConfig = PulsarKafkaConnectSinkConfig.load(config);
        Objects.requireNonNull(this.kafkaSinkConfig.getTopic(), "Kafka topic is not set");
        Preconditions.checkArgument((ctx.getSubscriptionType() == SubscriptionType.Failover || ctx.getSubscriptionType() == SubscriptionType.Exclusive ? 1 : 0) != 0, (Object)"Source must run with Exclusive or Failover subscription type");
        // Copy the settings used on the hot path into fields.
        this.topicName = this.kafkaSinkConfig.getTopic();
        this.unwrapKeyValueIfAvailable = this.kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
        this.sanitizeTopicName = this.kafkaSinkConfig.isSanitizeTopicName();
        this.collapsePartitionedTopics = this.kafkaSinkConfig.isCollapsePartitionedTopics();
        this.useOptionalPrimitives = this.kafkaSinkConfig.isUseOptionalPrimitives();
        this.useIndexAsOffset = this.kafkaSinkConfig.isUseIndexAsOffset();
        this.maxBatchBitsForOffset = this.kafkaSinkConfig.getMaxBatchBitsForOffset();
        Preconditions.checkArgument((this.maxBatchBitsForOffset <= 20 ? 1 : 0) != 0, (Object)"Cannot use more than 20 bits for maxBatchBitsForOffset");
        String kafkaConnectorFQClassName = this.kafkaSinkConfig.getKafkaConnectorSinkClass();
        this.kafkaSinkConfig.getKafkaConnectorConfigProperties().forEach(this.props::put);
        // Instantiate the connector reflectively through its no-arg constructor, then start it.
        Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
        this.connector = (SinkConnector)clazz.getConstructor(new Class[0]).newInstance(new Object[0]);
        Class taskClass = this.connector.taskClass();
        this.sinkContext = new PulsarKafkaSinkContext();
        this.connector.initialize((ConnectorContext)this.sinkContext);
        this.connector.start((Map)Maps.fromProperties((Properties)this.props));
        // Exactly one task is requested and exactly one task config is expected back.
        List<Map> configs = this.connector.taskConfigs(1);
        Preconditions.checkNotNull((Object)configs);
        Preconditions.checkArgument((configs.size() == 1 ? 1 : 0) != 0);
        // Copy each config into a HashMap before adding a key
        // (presumably because the connector may return unmodifiable maps — TODO confirm).
        configs = configs.stream().map(HashMap::new).collect(Collectors.toList());
        configs.forEach(x -> x.put("offset.storage.topic", this.kafkaSinkConfig.getOffsetStorageTopic()));
        this.task = (SinkTask)taskClass.getConstructor(new Class[0]).newInstance(new Object[0]);
        // The task context receives a callback into task.open() and a Kafka-name -> Pulsar-name translator.
        this.taskContext = new PulsarKafkaSinkTaskContext((Map)configs.get(0), ctx, arg_0 -> ((SinkTask)this.task).open(arg_0), kafkaName -> {
            if (this.sanitizeTopicName) {
                // Reverse lookup of a sanitized name; falls back to the Kafka name when unknown.
                String pulsarTopicName = (String)this.desanitizedTopicCache.getIfPresent(kafkaName);
                if (log.isDebugEnabled()) {
                    log.debug("desanitizedTopicCache got: kafkaName: {}, pulsarTopicName: {}", kafkaName, (Object)pulsarTopicName);
                }
                return pulsarTopicName != null ? pulsarTopicName : kafkaName;
            }
            return kafkaName;
        });
        this.task.initialize((SinkTaskContext)this.taskContext);
        this.task.start((Map)configs.get(0));
        this.maxBatchSize = this.kafkaSinkConfig.getBatchSize();
        this.lingerMs = this.kafkaSinkConfig.getLingerTimeMs();
        // From here on write() accepts records.
        this.isRunning = true;
        // Force a flush attempt every lingerMs milliseconds regardless of the batch size.
        this.scheduledExecutor.scheduleWithFixedDelay(() -> this.flushIfNeeded(true), this.lingerMs, this.lingerMs, TimeUnit.MILLISECONDS);
        log.info("Kafka sink started : {}.", (Object)this.props);
    }

    /**
     * Submits a flush to the executor unless one is already in progress.
     *
     * @param force when {@code true} a flush is submitted regardless of the accumulated batch
     *              size; otherwise only once {@code currentBatchSize} has reached
     *              {@code maxBatchSize}
     */
    private void flushIfNeeded(boolean force) {
        if (this.isFlushRunning.get()) {
            return;
        }
        if (!force && this.currentBatchSize.get() < this.maxBatchSize) {
            // Batch not full yet and nobody insists: nothing to do.
            return;
        }
        this.scheduledExecutor.submit(this::flush);
    }

    /**
     * Asks the task which offsets are safe to commit, stores them through the task context and
     * acknowledges every pending record covered by them.
     *
     * <p>Returns immediately when nothing is pending or another flush is in progress. When the
     * task returns no committed offsets the flush is skipped and retried later. If anything
     * throws, the affected pending records are failed instead of acked: the committed offsets
     * are used when they were obtained, otherwise the offsets recorded by the task context
     * before the failure. If neither is available, nothing is failed and the records stay
     * queued for the next flush.
     */
    public void flush() {
        if (log.isDebugEnabled()) {
            log.debug("flush requested, pending: {}, batchSize: {}", (Object)this.currentBatchSize.get(), (Object)this.maxBatchSize);
        }
        if (this.pendingFlushQueue.isEmpty()) {
            return;
        }
        if (!this.isFlushRunning.compareAndSet(false, true)) {
            return;
        }
        // Records queued after this point belong to the next flush and must not be acked by this one.
        final Record<GenericObject> lastNotFlushed = this.pendingFlushQueue.getLast();
        Map<TopicPartition, OffsetAndMetadata> currentOffsets = null;
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = null;
        try {
            currentOffsets = this.taskContext.currentOffsets();
            committedOffsets = this.task.preCommit(currentOffsets);
            if (committedOffsets == null || committedOffsets.isEmpty()) {
                log.info("Task returned empty committedOffsets map; skipping flush; task will retry later");
                return;
            }
            if (log.isDebugEnabled() && !KafkaConnectSink.areMapsEqual(committedOffsets, currentOffsets)) {
                log.debug("committedOffsets {} differ from currentOffsets {}", (Object)committedOffsets, currentOffsets);
            }
            this.taskContext.flushOffsets(committedOffsets);
            this.ackUntil(lastNotFlushed, committedOffsets, Record::ack);
            log.info("Flush succeeded");
        }
        catch (Throwable t) {
            log.error("error flushing pending records", t);
            // committedOffsets is still null when currentOffsets()/preCommit() itself threw;
            // ackUntil() dereferences its map, so never hand it null.
            final Map<TopicPartition, OffsetAndMetadata> offsetsToFail = committedOffsets != null ? committedOffsets : currentOffsets;
            if (offsetsToFail != null) {
                this.ackUntil(lastNotFlushed, offsetsToFail, Record::fail);
            } else {
                log.warn("No offsets available after failed flush; pending records are left queued for the next flush");
            }
        }
        finally {
            this.isFlushRunning.compareAndSet(true, false);
        }
    }

    /**
     * Checks whether two offset maps have the same size and, for every key of {@code first},
     * an equal value in {@code second}.
     *
     * @param first  map whose entries are iterated
     * @param second map looked up by the keys of {@code first}
     * @return {@code true} if the maps hold equal offsets for the same partitions
     */
    private static boolean areMapsEqual(Map<TopicPartition, OffsetAndMetadata> first, Map<TopicPartition, OffsetAndMetadata> second) {
        if (first.size() != second.size()) {
            return false;
        }
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : first.entrySet()) {
            OffsetAndMetadata expected = entry.getValue();
            if (!expected.equals(second.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Applies {@code cb} to every pending record whose offset is covered by
     * {@code committedOffsets}, removes it from the pending queue and subtracts its size from
     * the current batch size.
     *
     * <p>Iteration stops after {@code lastNotFlushed}, so records queued after the flush started
     * are left untouched. Records whose topic/partition has no committed offset, or whose offset
     * is above the committed one, stay queued.
     *
     * <p>The Kafka topic and partition of each record are derived exactly as in
     * {@code toSinkRecord}, including the {@code collapsePartitionedTopics} case, so that the
     * lookup key matches the key under which the offset was recorded.
     *
     * @param lastNotFlushed   last record that was pending when the flush started
     * @param committedOffsets offsets reported as committed, keyed by Kafka topic/partition;
     *                         must not be {@code null}
     * @param cb               action applied to each covered record (ack or fail)
     */
    @VisibleForTesting
    protected void ackUntil(Record<GenericObject> lastNotFlushed, Map<TopicPartition, OffsetAndMetadata> committedOffsets, Consumer<Record<GenericObject>> cb) {
        // Index as topic -> partition -> offset so no TopicPartition is allocated per pending record.
        final Map<String, Map<Integer, Long>> topicOffsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committedOffsets.entrySet()) {
            final TopicPartition tp = entry.getKey();
            topicOffsets.computeIfAbsent(tp.topic(), ignored -> new HashMap<>())
                    .put(tp.partition(), entry.getValue().offset());
        }
        for (Record<GenericObject> record : this.pendingFlushQueue) {
            final String topic;
            final int partition;
            if (this.collapsePartitionedTopics && record.getTopicName().isPresent() && TopicName.get(record.getTopicName().get()).isPartitioned()) {
                // Same mapping as toSinkRecord(): partitioned topic name + partition index from the topic.
                final TopicName tn = TopicName.get(record.getTopicName().get());
                partition = tn.getPartitionIndex();
                topic = this.sanitizeNameIfNeeded(tn.getPartitionedTopicName(), this.sanitizeTopicName);
            } else {
                partition = record.getPartitionIndex().orElse(0);
                topic = this.sanitizeNameIfNeeded(record.getTopicName().orElse(this.topicName), this.sanitizeTopicName);
            }
            final Map<Integer, Long> partitionOffsets = topicOffsets.get(topic);
            final Long lastCommittedOffset = partitionOffsets == null ? null : partitionOffsets.get(partition);
            if (lastCommittedOffset != null && this.getMessageOffset(record) <= lastCommittedOffset) {
                cb.accept(record);
                this.pendingFlushQueue.remove(record);
                this.currentBatchSize.addAndGet(-1 * record.getMessage().get().size());
            }
            if (record == lastNotFlushed) {
                // Everything after this record was queued after the flush started.
                break;
            }
        }
    }

    /**
     * Derives the Kafka offset for a record.
     *
     * <p>Order of preference:
     * <ol>
     *   <li>the message index, when {@code useIndexAsOffset} is set and the message has one;</li>
     *   <li>a value packed from the message id when {@code maxBatchBitsForOffset > 0} and a
     *       batch sequence reference can be extracted: the batch index occupies the low
     *       {@code maxBatchBitsForOffset} bits, the entry id the remaining bits below bit 28,
     *       and the ledger id everything from bit 28 upwards;</li>
     *   <li>the record sequence, or {@code -1} when absent.</li>
     * </ol>
     *
     * <p>An entry id or batch index that does not fit its bit range is only logged; the offset
     * is still computed and may collide with another message's offset.
     *
     * @param sourceRecord record to compute the offset for
     * @return the offset, or {@code -1} if none could be determined
     */
    private long getMessageOffset(Record<GenericObject> sourceRecord) {
        if (sourceRecord.getMessage().isPresent()) {
            final Message<GenericObject> message = sourceRecord.getMessage().get();
            if (this.useIndexAsOffset && message.hasIndex()) {
                return message.getIndex().orElse(-1L);
            }
            final MessageId messageId = message.getMessageId();
            final BatchMessageSequenceRef messageSequenceRef = this.maxBatchBitsForOffset > 0
                    ? KafkaConnectSink.getMessageSequenceRefForBatchMessage(messageId)
                    : null;
            if (messageSequenceRef != null) {
                final long ledgerId = messageSequenceRef.getLedgerId();
                long entryId = messageSequenceRef.getEntryId();
                // Entry id gets (28 - maxBatchBitsForOffset) bits, so the first value that
                // no longer fits is exactly 1 << (28 - maxBatchBitsForOffset).
                if (entryId >= (1L << (28 - this.maxBatchBitsForOffset))) {
                    log.error("EntryId of the message {} over max, chance of duplicate offsets", (Object)entryId);
                }
                int batchIdx = messageSequenceRef.getBatchIdx();
                if (batchIdx < 0) {
                    log.error("BatchIdx {} of the message is negative, chance of duplicate offsets", (Object)batchIdx);
                    batchIdx = 0;
                }
                // Likewise 1 << maxBatchBitsForOffset is the first batch index that overflows.
                if (batchIdx >= (1 << this.maxBatchBitsForOffset)) {
                    log.error("BatchIdx of the message {} over max, chance of duplicate offsets", (Object)batchIdx);
                }
                entryId = entryId << this.maxBatchBitsForOffset | (long)batchIdx;
                return ledgerId << 28 | entryId;
            }
        }
        return sourceRecord.getRecordSequence().orElse(-1L);
    }

    /**
     * Looks up a no-argument method declared on the runtime class of {@code messageId} or on one
     * of its superclasses, starting from the most specific class.
     *
     * @param messageId object whose class hierarchy is searched
     * @param name      name of the method to find
     * @return the first matching declared method
     * @throws NoSuchMethodException the exception raised for the most specific class, if no class
     *                               in the hierarchy declares the method
     */
    private static Method getMethodOfMessageId(MessageId messageId, String name) throws NoSuchMethodException {
        NoSuchMethodException firstException = null;
        Class<?> current = messageId.getClass();
        while (current != null) {
            try {
                return current.getDeclaredMethod(name);
            } catch (NoSuchMethodException notDeclaredHere) {
                // Remember only the failure of the most specific class.
                if (firstException == null) {
                    firstException = notDeclaredHere;
                }
            }
            current = current.getSuperclass();
        }
        assert firstException != null;
        throw firstException;
    }

    /**
     * Reflectively reads the batch index, ledger id and entry id from a message id.
     *
     * @param messageId message id to inspect
     * @return the extracted reference, or {@code null} when the message id has no
     *         {@code getBatchIndex} method or reports a negative batch index
     * @throws RuntimeException wrapping any reflective failure other than a missing
     *                          {@code getBatchIndex} method
     */
    @VisibleForTesting
    static BatchMessageSequenceRef getMessageSequenceRefForBatchMessage(MessageId messageId) {
        final Method batchIndexGetter;
        try {
            batchIndexGetter = KafkaConnectSink.getMethodOfMessageId(messageId, "getBatchIndex");
        } catch (NoSuchMethodException notBatched) {
            // This message id type exposes no batch index at all.
            return null;
        }
        try {
            final int batchIdx = (Integer) batchIndexGetter.invoke(messageId);
            if (batchIdx < 0) {
                return null;
            }
            final long ledgerId = (Long) KafkaConnectSink.getMethodOfMessageId(messageId, "getLedgerId").invoke(messageId);
            final long entryId = (Long) KafkaConnectSink.getMethodOfMessageId(messageId, "getEntryId").invoke(messageId);
            return new BatchMessageSequenceRef(ledgerId, entryId, batchIdx);
        } catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException ex) {
            log.error("Unexpected error while retrieving sequenceId, messageId class: {}, error: {}", messageId.getClass().getName(), ex.getMessage(), ex);
            throw new RuntimeException(ex);
        }
    }

    /**
     * Converts a Pulsar record into a Kafka Connect {@link SinkRecord} and records its offset in
     * the task context.
     *
     * <p>Topic and partition come from the partitioned topic name when
     * {@code collapsePartitionedTopics} is set and the record's topic is partitioned, otherwise
     * from the record itself (falling back to the configured topic and partition 0). Key and value
     * come from the KeyValue payload when unwrapping is enabled and the schema type is KEY_VALUE,
     * otherwise from the message key and the record value. The timestamp is the event time when
     * present (type CREATE_TIME), else the publish time (type NO_TIMESTAMP_TYPE).
     *
     * @param sourceRecord record to convert
     * @return the Kafka Connect record
     * @throws IllegalStateException if the native object of a KEY_VALUE record is neither
     *                               {@code null} nor a {@code KeyValue}, or if no non-negative
     *                               offset can be derived for the record
     */
    protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
        // --- topic / partition ---
        TopicName collapsedName = null;
        if (this.collapsePartitionedTopics && sourceRecord.getTopicName().isPresent()) {
            final TopicName candidate = TopicName.get(sourceRecord.getTopicName().get());
            if (candidate.isPartitioned()) {
                collapsedName = candidate;
            }
        }
        final int partition;
        final String topic;
        if (collapsedName != null) {
            partition = collapsedName.getPartitionIndex();
            topic = this.sanitizeNameIfNeeded(collapsedName.getPartitionedTopicName(), this.sanitizeTopicName);
        } else {
            partition = sourceRecord.getPartitionIndex().orElse(0);
            topic = this.sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(this.topicName), this.sanitizeTopicName);
        }

        // --- key / value and their schemas ---
        final boolean unwrapKeyValue = this.unwrapKeyValueIfAvailable
                && sourceRecord.getSchema() != null
                && sourceRecord.getSchema().getSchemaInfo() != null
                && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE;
        final Schema keySchema;
        final Schema valueSchema;
        final Object key;
        final Object value;
        if (unwrapKeyValue) {
            KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
            keySchema = PulsarSchemaToKafkaSchema.getOptionalKafkaConnectSchema(kvSchema.getKeySchema(), this.useOptionalPrimitives);
            valueSchema = PulsarSchemaToKafkaSchema.getOptionalKafkaConnectSchema(kvSchema.getValueSchema(), this.useOptionalPrimitives);
            final Object nativeObject = sourceRecord.getValue().getNativeObject();
            if (nativeObject instanceof KeyValue) {
                KeyValue kv = (KeyValue) nativeObject;
                key = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getKey(), keySchema);
                value = KafkaConnectData.getKafkaConnectDataFromSchema(kv.getValue(), valueSchema);
            } else if (nativeObject == null) {
                key = null;
                value = null;
            } else {
                throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
            }
        } else {
            if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                // Binary keys are passed through as bytes.
                key = sourceRecord.getMessage().get().getKeyBytes();
                keySchema = this.useOptionalPrimitives ? Schema.OPTIONAL_BYTES_SCHEMA : Schema.BYTES_SCHEMA;
            } else {
                key = sourceRecord.getKey().orElse(null);
                keySchema = this.useOptionalPrimitives ? Schema.OPTIONAL_STRING_SCHEMA : Schema.STRING_SCHEMA;
            }
            valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(sourceRecord.getSchema(), this.useOptionalPrimitives);
            value = KafkaConnectData.getKafkaConnectData(sourceRecord.getValue().getNativeObject(), valueSchema);
        }

        // --- offset ---
        final long offset = this.getMessageOffset(sourceRecord);
        if (offset < 0L) {
            log.error("Message without sequenceId. Key: {} Value: {}", key, value);
            throw new IllegalStateException("Message without sequenceId");
        }
        this.taskContext.updateLastOffset(new TopicPartition(topic, partition), offset);

        // --- timestamp ---
        final Long timestamp;
        final TimestampType timestampType;
        if (sourceRecord.getEventTime().isPresent()) {
            timestamp = sourceRecord.getEventTime().get();
            timestampType = TimestampType.CREATE_TIME;
        } else {
            // Publish time is used as the value, but the type stays NO_TIMESTAMP_TYPE.
            timestamp = sourceRecord.getMessage().get().getPublishTime();
            timestampType = TimestampType.NO_TIMESTAMP_TYPE;
        }
        return new SinkRecord(topic, partition, keySchema, key, valueSchema, value, offset, timestamp, timestampType);
    }

    /**
     * Returns the offset the task context currently holds for a topic/partition.
     *
     * @param topic     topic name; sanitized first when topic-name sanitizing is enabled
     * @param partition partition index
     * @return the value reported by the task context
     */
    @VisibleForTesting
    protected long currentOffset(String topic, int partition) {
        final String kafkaTopic = this.sanitizeNameIfNeeded(topic, this.sanitizeTopicName);
        return this.taskContext.currentOffset(kafkaTopic, partition);
    }

    /**
     * Rewrites a topic name so that it contains only ASCII letters, digits and underscores and
     * does not start with a digit.
     *
     * <p>Every other character is replaced by {@code _}; a leading {@code _} is added when the
     * result would not start with a letter or underscore. Results are cached, and the reverse
     * mapping (sanitized name to original name) is stored as well; the first original name
     * registered for a given sanitized name is kept.
     *
     * @param name     original topic name
     * @param sanitize when {@code false} the name is returned unchanged
     * @return the sanitized name, or {@code name} itself when sanitizing is disabled
     * @throws IllegalStateException if the cache loader fails
     */
    protected String sanitizeNameIfNeeded(String name, boolean sanitize) {
        if (sanitize) {
            try {
                return this.sanitizedTopicCache.get(name, () -> {
                    final String replaced = name.replaceAll("[^a-zA-Z0-9_]", "_");
                    final String sanitizedName = replaced.matches("^[^a-zA-Z_].*") ? "_" + replaced : replaced;
                    // Register the reverse mapping used to translate Kafka names back.
                    this.desanitizedTopicCache.get(sanitizedName, () -> name);
                    return sanitizedName;
                });
            } catch (ExecutionException e) {
                log.error("Failed to get sanitized topic name for {}", name, e);
                throw new IllegalStateException("Failed to get sanitized topic name for " + name, e);
            }
        }
        return name;
    }

    /**
     * Holder for the three message-id components used to build a Kafka offset:
     * ledger id, entry id and batch index.
     */
    static class BatchMessageSequenceRef {
        long ledgerId;
        long entryId;
        int batchIdx;

        /**
         * @param ledgerId ledger id of the message
         * @param entryId  entry id of the message
         * @param batchIdx index of the message within its batch
         */
        @Generated
        public BatchMessageSequenceRef(long ledgerId, long entryId, int batchIdx) {
            this.batchIdx = batchIdx;
            this.entryId = entryId;
            this.ledgerId = ledgerId;
        }

        /** @return the ledger id */
        @Generated
        public long getLedgerId() {
            return ledgerId;
        }

        /** @return the entry id */
        @Generated
        public long getEntryId() {
            return entryId;
        }

        /** @return the batch index */
        @Generated
        public int getBatchIdx() {
            return batchIdx;
        }
    }
}

