package org.voltdb.importclient.kafka10;

import au.com.bytecode.opencsv_voltpatches.CSVParser;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons_voltpatches.cli.HelpFormatter;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.voltcore.logging.Level;
import org.voltcore.logging.VoltLogger;
import org.voltcore.utils.EstTime;
import org.voltdb.client.ProcedureCallback;
import org.voltdb.importclient.kafka.util.DurableTracker;
import org.voltdb.importclient.kafka.util.KafkaCommitPolicy;
import org.voltdb.importclient.kafka.util.KafkaConstants;
import org.voltdb.importclient.kafka.util.KafkaUtils;
import org.voltdb.importclient.kafka.util.PendingWorkTracker;
import org.voltdb.importclient.kafka.util.ProcedureInvocationCallback;
import org.voltdb.importclient.kafka.util.SimpleTracker;
import org.voltdb.importer.CommitTracker;
import org.voltdb.importer.ImporterLifecycle;
import org.voltdb.importer.formatter.FormatException;
import org.voltdb.importer.formatter.Formatter;
import org.voltdb.importer.formatter.FormatterBuilder;
import org.voltdb.utils.CatalogUtil;

/* loaded from: input_file:org/voltdb/importclient/kafka10/KafkaConsumerRunner.class */
public abstract class KafkaConsumerRunner implements Runnable {
    /** Logger for all consumer runners; uses the "KAFKAIMPORTER" category. */
    private static final VoltLogger LOGGER = new VoltLogger("KAFKAIMPORTER");

    /** Kafka consumer driven by {@link #run()}; {@code run()} closes it and sets this to {@code null} on exit. */
    protected Consumer<ByteBuffer, ByteBuffer> m_consumer;

    /** Importer configuration (topics, group id, commit policy, poll timeout, formatter builders). */
    protected final KafkaStreamImporterConfig m_config;

    /** Lifecycle handle; {@code shouldRun()} is polled to decide when the run loop must stop. */
    protected final ImporterLifecycle m_lifecycle;

    /**
     * Last committed offset per partition. The map is never mutated in place: a modified copy is
     * published via {@code set(...)} when partitions are added or revoked.
     */
    private final AtomicReference<Map<TopicPartition, AtomicLong>> m_lastCommittedOffSets = new AtomicReference<>();

    /** Commit tracker per partition; replaced wholesale in the same way as {@link #m_lastCommittedOffSets}. */
    private final AtomicReference<Map<TopicPartition, CommitTracker>> m_trackerMap = new AtomicReference<>();

    /** Offset each partition was last repositioned to by {@code seek(...)}. */
    private final Map<TopicPartition, AtomicLong> m_lastSeekedOffSets = new ConcurrentHashMap<>();

    /** Per-partition pause offset, initialised to -1 and handed to each procedure callback. */
    private final Map<TopicPartition, AtomicLong> m_pauseOffsets = new ConcurrentHashMap<>();

    /** Per-partition tracker of procedure invocations that were submitted via {@code produceWork()}. */
    private final Map<TopicPartition, PendingWorkTracker> m_workTrackers = new ConcurrentHashMap<>();

    /** Set to {@code true} by {@link #shutdown()} and when {@link #run()} finishes. */
    protected final AtomicBoolean m_done = new AtomicBoolean(false);

    /** Cache of formatters keyed by topic name, filled lazily by {@code getFormatter(...)}. */
    private final Map<String, Formatter> m_formatters = new HashMap<>();

    /** Timestamp (from {@code EstTime}) of the last successful offset commit; 0 until the first commit. */
    private long m_lastCommitTime = 0;

    /** Short pause in milliseconds; the value matches the idle sleep performed by {@link #run()}. */
    private final int m_waitSleepMs = 10;

    /** Serialises {@code commitOffsets(...)} calls. */
    private final ReentrantLock m_offsetLock = new ReentrantLock();

    public KafkaConsumerRunner(ImporterLifecycle importerLifecycle, KafkaStreamImporterConfig kafkaStreamImporterConfig, Consumer<ByteBuffer, ByteBuffer> consumer) throws Exception {
        this.m_lifecycle = importerLifecycle;
        this.m_consumer = consumer;
        this.m_config = kafkaStreamImporterConfig;
        this.m_lastCommittedOffSets.set(new HashMap());
        this.m_trackerMap.set(new HashMap());
    }

    /**
     * Subscribes the consumer to every configured topic and installs a rebalance listener.
     * When partitions are revoked the listener commits their offsets and then drops all
     * per-partition state kept for them; assignments are only logged.
     */
    protected void subscribe() {
        List<String> topics = Arrays.asList(m_config.getTopics().split(CatalogUtil.SIGNATURE_DELIMITER));
        ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
                if (!revoked.isEmpty()) {
                    LOGGER.info("Consumer group " + m_config.getGroupId() + " drops topic/partitions:" + revoked);

                    // Commit what is safe for the departing partitions before forgetting them.
                    List<TopicPartition> revokedList = new ArrayList<>(revoked);
                    commitOffsets(revokedList);

                    // Work on copies and publish them at the end, so readers never see a half-updated map.
                    Map<TopicPartition, CommitTracker> remainingTrackers = new HashMap<>(m_trackerMap.get());
                    Map<TopicPartition, AtomicLong> remainingOffsets = new HashMap<>(m_lastCommittedOffSets.get());
                    for (TopicPartition partition : revoked) {
                        remainingTrackers.remove(partition);
                        remainingOffsets.remove(partition);
                        m_pauseOffsets.remove(partition);
                        m_workTrackers.remove(partition);
                    }
                    m_trackerMap.set(remainingTrackers);
                    m_lastCommittedOffSets.set(remainingOffsets);
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
                LOGGER.info("Consumer group " + m_config.getGroupId() + " is assigned with topic/partition" + assigned);
            }
        };
        m_consumer.subscribe(topics, rebalanceListener);
    }

    /**
     * Creates per-partition state for every partition in {@code collection} that does not yet
     * have a commit tracker: a tracker, the last committed offset read from Kafka, a pause
     * offset of -1 and a pending-work tracker. The tracker and committed-offset maps are only
     * re-published when at least one new partition was found.
     *
     * @param collection partitions for which records were just polled
     */
    private void calculateTrackers(Collection<TopicPartition> collection) {
        Map<TopicPartition, CommitTracker> trackers = new HashMap<>(this.m_trackerMap.get());
        Map<TopicPartition, AtomicLong> lastCommittedOffsets = new HashMap<>(this.m_lastCommittedOffSets.get());
        boolean newPartitionFound = false;
        for (TopicPartition partition : collection) {
            if (this.m_trackerMap.get().get(partition) != null) {
                continue;
            }
            newPartitionFound = true;

            // Both tracker implementations are used through the CommitTracker interface; the
            // variable must not be typed as DurableTracker because SimpleTracker is also assigned.
            CommitTracker commitTracker;
            if (this.m_config.getCommitPolicy() == KafkaCommitPolicy.TIME && this.m_config.getTriggerValue() > 0) {
                commitTracker = new SimpleTracker();
            } else {
                commitTracker = new DurableTracker(KafkaConstants.IMPORT_GAP_LEAD, partition.topic(), partition.partition(), this.m_config.getGroupId());
            }
            trackers.put(partition, commitTracker);

            // -1 means "no committed offset known"; it is kept when the lookup fails.
            long startOffset = -1;
            try {
                OffsetAndMetadata committed = this.m_consumer.committed(partition);
                startOffset = committed != null ? committed.offset() : -1L;
                if (startOffset > -1) {
                    commitTracker.resetTo(startOffset);
                }
            } catch (KafkaException e) {
                LOGGER.error("Failed to read committed offsets for group " + this.m_config.getGroupId() + partition + HelpFormatter.DEFAULT_LONG_OPT_SEPARATOR + e.getMessage(), e);
            }
            lastCommittedOffsets.put(partition, new AtomicLong(startOffset));
            this.m_pauseOffsets.put(partition, new AtomicLong(-1L));
            this.m_workTrackers.put(partition, new PendingWorkTracker());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Starting offset for group:" + this.m_config.getGroupId() + ":" + startOffset + " partition:" + partition);
            }
        }
        if (newPartitionFound) {
            this.m_trackerMap.set(trackers);
            this.m_lastCommittedOffSets.set(lastCommittedOffsets);
        }
    }

    /**
     * Requests the run loop to stop: marks the runner as done and wakes the consumer so a
     * blocking poll returns. Does nothing when the consumer has already been released.
     */
    public void shutdown() {
        if (m_consumer != null) {
            LOGGER.info("Shutdown Kafka consumer for group " + m_config.getGroupId());
            m_done.set(true);
            try {
                m_consumer.wakeup();
            } catch (Exception wakeupFailure) {
                // Best effort: a failed wakeup is only reported, never propagated.
                LOGGER.warn("Kafka wakeup interuption while cleaning up Kafka consumer:" + wakeupFailure.getMessage());
            }
        }
    }

    /**
     * Hands one consumed record to the concrete importer. {@link #run()} calls this once per
     * record after the payload has been parsed.
     *
     * @param str the record value decoded as a UTF-8 string
     * @param j the Kafka offset of the record
     * @param str2 the topic the record was read from
     * @param objArr the parameters produced by the topic's formatter, or by the CSV parser when no formatter is configured
     * @param procedureCallback callback created by {@code run()} for this record
     * @return {@code true} when the work was accepted (the caller then registers pending work);
     *         {@code false} when it was not, in which case the caller commits the record's offset immediately
     * @throws Exception on failure; not handled per record by {@code run()} except for {@code IOException}/{@code FormatException}
     */
    public abstract boolean invoke(String str, long j, String str2, Object[] objArr, ProcedureCallback procedureCallback) throws Exception;

    /**
     * Main loop of the runner. Subscribes to the configured topics and then, while the
     * lifecycle allows it, repeatedly:
     * <ol>
     *   <li>rewinds partitions that delivered records older than their committed offset,</li>
     *   <li>polls the consumer,</li>
     *   <li>commits offsets and pauses briefly when nothing was returned, or</li>
     *   <li>submits each record through {@link #invoke} and commits according to the commit policy.</li>
     * </ol>
     * A {@code KafkaException} raised anywhere in one iteration is logged and followed by a
     * back-off sleep before the next iteration. On exit the pause offsets are committed and the
     * consumer is closed exactly once, the runner is marked done and a summary is logged.
     */
    @Override // java.lang.Runnable
    public void run() {
        LOGGER.info("Starting Kafka consumer for group:" + this.m_config.getGroupId() + " topics:" + this.m_config.getTopics());
        // Partitions whose position must be moved back to the committed offset before the next poll.
        List<TopicPartition> seekList = new ArrayList<>();
        CSVParser csvParser = new CSVParser();
        try {
            subscribe();
            int sleepCounter = 1;
            while (this.m_lifecycle.shouldRun()) {
                try {
                    seek(seekList);
                    seekList.clear();
                    if (!this.m_lifecycle.shouldRun()) {
                        break;
                    }

                    ConsumerRecords<ByteBuffer, ByteBuffer> records = null;
                    try {
                        records = this.m_consumer.poll(this.m_config.getPollTimeout());
                    } catch (WakeupException e) {
                        // wakeup() is issued by shutdown(); any other wakeup is treated as an empty poll.
                        if (this.m_done.get()) {
                            break;
                        }
                    } catch (Throwable t) {
                        if (this.m_done.get()) {
                            break;
                        }
                        LOGGER.warn("Errors seen when polling data from Kafka:" + t.getMessage());
                    }

                    if (records == null || records.isEmpty()) {
                        // Idle iteration: flush whatever has become safe to commit, then pause briefly.
                        commitOffsets(new ArrayList<>(this.m_lastCommittedOffSets.get().keySet()));
                        try {
                            Thread.sleep(this.m_waitSleepMs);
                        } catch (InterruptedException ignored) {
                            // Deliberately ignored: the loop is stopped through m_lifecycle / m_done,
                            // both of which are re-checked on every iteration.
                        }
                        continue;
                    }

                    calculateTrackers(records.partitions());
                    List<TopicPartition> submittedPartitions = new ArrayList<>();
                    for (TopicPartition partition : records.partitions()) {
                        if (!this.m_lifecycle.shouldRun()) {
                            break;
                        }
                        Formatter formatter = getFormatter(partition.topic());
                        CommitTracker commitTracker = getCommitTracker(partition);
                        AtomicLong lastCommitted = this.m_lastCommittedOffSets.get().get(partition);
                        if (commitTracker == null || lastCommitted == null) {
                            // No state for this partition (e.g. it was revoked in the meantime).
                            continue;
                        }
                        int submitted = submitRecords(partition, records.records(partition), formatter, csvParser, commitTracker, lastCommitted, seekList);
                        if (submitted > 0) {
                            submittedPartitions.add(partition);
                        }
                    }
                    if (!this.m_lifecycle.shouldRun()) {
                        break;
                    }
                    if (!submittedPartitions.isEmpty() && KafkaCommitPolicy.shouldCommit(this.m_config.getCommitPolicy(), this.m_config.getTriggerValue(), this.m_lastCommitTime)) {
                        commitOffsets(submittedPartitions);
                    }
                } catch (KafkaException e) {
                    LOGGER.error("Error seen when processing message " + this.m_config.getTopics(), e);
                    if (this.m_done.get()) {
                        break;
                    }
                    sleepCounter = KafkaUtils.backoffSleep(sleepCounter);
                }
            }
        } catch (Exception e) {
            LOGGER.error("Failed to start topic partition fetcher for " + this.m_config.getTopics(), e);
        } finally {
            closeConsumer();
        }
        this.m_done.set(true);
        StringBuilder summary = new StringBuilder("Import detail for group " + this.m_config.getGroupId());
        summary.append(" \n         Submitted Counts:").append(this.m_workTrackers);
        summary.append(" \n         Committed Offsets: ").append(this.m_lastCommittedOffSets.get());
        LOGGER.info(summary.toString());
    }

    /**
     * Submits the polled records of one partition, in order, through {@link #invoke}.
     * Stops early when the lifecycle ends or when a record older than the committed offset is
     * seen; in the latter case the partition is added to {@code seekList}.
     *
     * @return the number of records that were handled (submitted or rejected as untransformable)
     * @throws Exception whatever {@link #invoke} throws, other than the transform failures handled here
     */
    private int submitRecords(TopicPartition partition, List<ConsumerRecord<ByteBuffer, ByteBuffer>> messages,
            Formatter formatter, CSVParser csvParser, CommitTracker commitTracker, AtomicLong lastCommitted,
            List<TopicPartition> seekList) throws Exception {
        String callbackName = "group " + this.m_config.getGroupId() + HelpFormatter.DEFAULT_OPT_PREFIX + partition;
        int handled = 0;
        int count = messages.size();
        for (int i = 0; i < count && this.m_lifecycle.shouldRun(); i++) {
            ConsumerRecord<ByteBuffer, ByteBuffer> message = messages.get(i);
            long offset = message.offset();
            if (lastCommitted.longValue() > -1 && offset < lastCommitted.longValue()) {
                // Already committed data is being re-delivered: reposition before the next poll.
                seekList.add(partition);
                break;
            }
            // The offset to commit once this record is done: the next record's offset, or offset + 1 for the last one.
            long nextOffset = (i == count - 1) ? offset + 1 : messages.get(i + 1).offset();
            String text = null;
            try {
                // NOTE(review): a record with a null value would raise a NullPointerException here — confirm whether such records can occur.
                text = new String(message.value().array(), StandardCharsets.UTF_8);
                // Re-encode with the same charset used for decoding instead of the platform default.
                Object[] params = formatter != null
                        ? formatter.transform(ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8)))
                        : csvParser.parseLine(text);
                commitTracker.submit(nextOffset);
                handled++;
                ProcedureInvocationCallback callback = new ProcedureInvocationCallback(offset, nextOffset, this.m_workTrackers.get(partition), commitTracker, this.m_done, this.m_pauseOffsets.get(partition), callbackName);
                if (invoke(text, offset, partition.topic(), params, callback)) {
                    this.m_workTrackers.get(partition).produceWork();
                } else {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Failed to process. possibly bad data: " + Arrays.toString(params));
                    }
                    commitTracker.commit(nextOffset);
                }
            } catch (IOException | FormatException e) {
                handled++;
                // Report the raw message: no parsed parameters exist when the transformation itself failed.
                LOGGER.rateLimitedLog(60L, Level.WARN, e, "Failed to tranform data: %s", text);
                commitTracker.commit(nextOffset);
            }
        }
        return handled;
    }

    /**
     * Commits the pause offsets and releases the consumer. Each step is attempted independently
     * so that a failed commit cannot prevent the consumer from being closed.
     */
    private void closeConsumer() {
        try {
            commitPauseOffsets();
        } catch (Exception e) {
            LOGGER.warn("Failed to commit offsets while closing Kafka consumer for group " + this.m_config.getGroupId(), e);
        }
        try {
            if (this.m_consumer != null) {
                this.m_consumer.close();
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to close Kafka consumer for group " + this.m_config.getGroupId(), e);
        } finally {
            this.m_consumer = null;
        }
    }

    /**
     * Moves the consumer position of each listed partition to its last committed offset.
     * A partition is skipped when it has no committed offset (absent or -1) or when the
     * previous seek already targeted the same offset.
     *
     * @param list partitions to reposition
     */
    private void seek(List<TopicPartition> list) {
        for (TopicPartition partition : list) {
            AtomicLong committed = m_lastCommittedOffSets.get().get(partition);
            if (committed == null || committed.get() <= -1) {
                continue;
            }
            AtomicLong previousSeek = m_lastSeekedOffSets.get(partition);
            if (previousSeek != null && previousSeek.get() == committed.get()) {
                continue;
            }
            m_consumer.seek(partition, committed.longValue());
            m_lastSeekedOffSets.put(partition, new AtomicLong(committed.get()));
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Moves offset for group " + m_config.getGroupId() + " -" + partition + " to " + committed);
            }
        }
    }

    /**
     * Looks up the commit tracker of a partition in the currently published tracker map.
     *
     * @return the tracker, or {@code null} when none is registered for the partition
     */
    private CommitTracker getCommitTracker(TopicPartition topicPartition) {
        Map<TopicPartition, CommitTracker> trackers = m_trackerMap.get();
        return trackers.get(topicPartition);
    }

    /**
     * Builds and commits a final set of offsets for all known partitions; called by
     * {@code run()} right before the consumer is closed.
     * <p>
     * For each partition that has both a pending-work tracker and a pause offset, the offset
     * committed is the tracker's safe offset, capped at the pause offset when one was recorded
     * (pause offset != -1). Partitions are skipped when waiting for pending work fails and the
     * pause offset is not below the last committed offset.
     * <p>
     * NOTE(review): unlike {@code commitOffsets}, this method does not take {@code m_offsetLock}
     * while updating the committed-offset values — confirm this is intended.
     */
    private void commitPauseOffsets() {
        CommitTracker commitTracker;
        AtomicLong atomicLong;
        // Offsets to hand to Kafka, keyed by partition.
        Map<TopicPartition, OffsetAndMetadata> hashMap = new HashMap<>();
        for (Map.Entry<TopicPartition, AtomicLong> entry : this.m_lastCommittedOffSets.get().entrySet()) {
            PendingWorkTracker pendingWorkTracker = this.m_workTrackers.get(entry.getKey());
            AtomicLong atomicLong2 = this.m_pauseOffsets.get(entry.getKey());
            if (pendingWorkTracker != null && atomicLong2 != null) {
                // Snapshot of the pause offset; -1 is the initial value assigned in calculateTrackers.
                long j = atomicLong2.get();
                // Set when this partition must not be committed.
                boolean z = false;
                // Only wait for outstanding work when a pause offset was recorded; per the log
                // messages below, a false result means the wait timed out.
                if (j != -1 && !pendingWorkTracker.waitForWorkToFinish()) {
                    if (atomicLong2.get() < entry.getValue().get()) {
                        LOGGER.warn("Committing paused offset even though a timeout occurred waiting for pending stored procedures to finish.");
                    } else {
                        LOGGER.warn("Refusing to commit paused offset because a timeout occurred waiting for pending stored procedures to finish.");
                        z = true;
                    }
                }
                if (!z && (commitTracker = getCommitTracker(entry.getKey())) != null) {
                    long safe = commitTracker.getSafe();
                    // Commit only when the tracker reports a non-negative safe offset, the last
                    // committed value differs from the pause offset, and either the safe offset
                    // moved forward or a pause offset exists.
                    if (safe >= 0 && (atomicLong = this.m_lastCommittedOffSets.get().get(entry.getKey())) != null && atomicLong.get() != j && (safe > atomicLong.get() || j != -1)) {
                        if (j != -1) {
                            // Never commit past the pause offset.
                            safe = Math.min(j, safe);
                        }
                        hashMap.put(entry.getKey(), new OffsetAndMetadata(safe));
                        atomicLong.set(safe);
                    }
                }
            }
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Consumer group " + this.m_config.getGroupId() + " committs offsets upon pausing.");
        }
        // commit(...) is a no-op for an empty map.
        commit(hashMap);
    }

    /**
     * Commits, for each listed partition, the tracker's safe offset when it is non-negative and
     * greater than the last committed offset. The whole operation runs under {@code m_offsetLock}.
     * Partitions without a tracker are reported at debug level and skipped.
     *
     * @param list partitions whose offsets should be considered for commit
     */
    public void commitOffsets(List<TopicPartition> list) {
        m_offsetLock.lock();
        try {
            Map<TopicPartition, OffsetAndMetadata> pending = new HashMap<>();
            for (TopicPartition partition : list) {
                CommitTracker tracker = getCommitTracker(partition);
                if (tracker == null) {
                    LOGGER.debug("Consumner group " + m_config.getGroupId() + " does't have" + partition + " assigned any more.");
                    continue;
                }
                long safeOffset = tracker.getSafe();
                if (safeOffset < 0) {
                    continue;
                }
                AtomicLong lastCommitted = m_lastCommittedOffSets.get().get(partition);
                if (lastCommitted == null || safeOffset <= lastCommitted.longValue()) {
                    continue;
                }
                pending.put(partition, new OffsetAndMetadata(safeOffset));
                lastCommitted.set(safeOffset);
            }
            commit(pending);
        } finally {
            m_offsetLock.unlock();
        }
    }

    /**
     * Synchronously commits the given offsets and records the commit time. An empty map is
     * ignored. A {@code CommitFailedException} is logged; a {@code WakeupException} triggers a
     * single retry whose {@code KafkaException} failures are logged as well.
     *
     * @param map offsets to commit, keyed by partition
     */
    private void commit(Map<TopicPartition, OffsetAndMetadata> map) {
        if (!map.isEmpty()) {
            if (LOGGER.isDebugEnabled()) {
                StringBuilder text = new StringBuilder("Consumer group " + m_config.getGroupId() + " committs offsets:");
                for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : map.entrySet()) {
                    text.append(committed.getKey()).append(":").append(committed.getValue().offset());
                }
                LOGGER.debug(text.toString());
            }
            try {
                m_consumer.commitSync(map);
                m_lastCommitTime = EstTime.currentTimeMillis();
            } catch (WakeupException interrupted) {
                // The commit was interrupted by a wakeup; try once more.
                try {
                    m_consumer.commitSync(map);
                    m_lastCommitTime = EstTime.currentTimeMillis();
                } catch (KafkaException retryFailure) {
                    LOGGER.warn("Consumer group " + m_config.getGroupId() + " commit offsets:" + retryFailure.getMessage());
                }
            } catch (CommitFailedException commitFailure) {
                LOGGER.warn("Consumer group " + m_config.getGroupId() + " commit offsets:" + commitFailure.getMessage());
            }
        }
    }

    /**
     * Returns the formatter for a topic, creating and caching it on first use.
     *
     * @param str topic name
     * @return the cached or newly created formatter, or {@code null} when the configuration
     *         has no formatter builder for the topic
     */
    private Formatter getFormatter(String str) {
        Formatter result = m_formatters.get(str);
        if (result == null) {
            FormatterBuilder builder = m_config.getFormatterBuilder(str);
            if (builder != null) {
                result = builder.create();
                m_formatters.put(str, result);
            }
        }
        return result;
    }

    /**
     * Counts the partitions of all configured topics. For each topic the partition list is
     * requested repeatedly until it is non-empty; failures are logged (rate limited) and retried
     * after a short pause so the loop does not spin at full speed.
     *
     * @return the total number of partitions, or -1 when the runner was marked done while waiting
     */
    public int getKafkaTopicPartitionCount() {
        int totalPartitions = 0;
        for (String topic : this.m_config.getTopics().split(CatalogUtil.SIGNATURE_DELIMITER)) {
            int partitions = 0;
            while (partitions == 0) {
                try {
                    List<?> partitionInfo = this.m_consumer.partitionsFor(topic);
                    if (partitionInfo != null) {
                        partitions = partitionInfo.size();
                    }
                } catch (Exception e) {
                    LOGGER.rateLimitedLog(60L, Level.WARN, e, "Failed to get Kafka partition info", new Object[0]);
                }
                // Checked after every attempt, even a successful one: a stopped runner reports -1.
                if (this.m_done.get()) {
                    return -1;
                }
                if (partitions == 0) {
                    // Nothing known yet: pause briefly before asking again.
                    try {
                        Thread.sleep(this.m_waitSleepMs);
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to callers; the loop still ends only via m_done or success.
                        Thread.currentThread().interrupt();
                    }
                }
            }
            totalPartitions += partitions;
        }
        return totalPartitions;
    }
}
