package org.voltdb.importclient.kafka;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.ConsumerMetadataRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.ConsumerMetadataResponse;
import kafka.javaapi.OffsetCommitRequest;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.OffsetFetchRequest;
import kafka.javaapi.OffsetFetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.network.BlockingChannel;
import org.voltcore.logging.Level;
import org.voltcore.utils.EstTime;
import org.voltdb.importclient.kafka.util.DurableTracker;
import org.voltdb.importclient.kafka.util.HostAndPort;
import org.voltdb.importclient.kafka.util.KafkaCommitPolicy;
import org.voltdb.importclient.kafka.util.KafkaConstants;
import org.voltdb.importclient.kafka.util.KafkaUtils;
import org.voltdb.importclient.kafka.util.ProcedureInvocationCallback;
import org.voltdb.importclient.kafka.util.SimpleTracker;
import org.voltdb.importer.CommitTracker;
import org.voltdb.importer.ImporterLifecycle;
import org.voltdb.importer.ImporterLogger;

/* loaded from: input_file:org/voltdb/importclient/kafka/BaseKafkaTopicPartitionImporter.class */
public abstract class BaseKafkaTopicPartitionImporter {
    /** Offset lookup spec asking for a single offset at {@code OffsetRequest.LatestTime()}. */
    private static final PartitionOffsetRequestInfo LATEST_OFFSET = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
    /** Offset lookup spec asking for a single offset at {@code OffsetRequest.EarliestTime()}. */
    private static final PartitionOffsetRequestInfo EARLIEST_OFFSET = new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1);
    /** Topic/partition pair this importer works on; built from the config in the constructor. */
    public final TopicAndPartition m_topicAndPartition;
    /** Tracker whose {@code getSafe()} value is the offset {@code commitOffset} tries to commit. */
    protected final CommitTracker m_gapTracker;
    /** Importer configuration (brokers, group id, topic, partition leader, timeouts, commit policy). */
    protected final KafkaStreamImporterConfig m_config;
    /** Offset coordinator address; starts as the configured partition leader and is replaced by {@code getOffsetCoordinator()}. */
    private HostAndPort m_coordinator;
    /** Lifecycle handle supplied by the caller; not read by the code visible in this file. */
    protected ImporterLifecycle m_lifecycle;
    /** Logger used for all warnings and informational messages emitted by this class. */
    protected ImporterLogger m_logger;
    // NOTE(review): not referenced by any visible method; presumably used by accept() — TODO confirm.
    private final int m_waitSleepMs = 1;
    // NOTE(review): not referenced by any visible method; presumably a shutdown flag — TODO confirm.
    protected final AtomicBoolean m_dead = new AtomicBoolean(false);
    // NOTE(review): not referenced by any visible method; presumably maintained by accept() — TODO confirm.
    private final AtomicLong m_currentOffset = new AtomicLong(-1);
    /** Offset read by {@code commitOffset(true)}; -1 is treated there as "no paused offset". */
    protected final AtomicLong m_pauseOffset = new AtomicLong(-1);
    /** Offset recorded by the last successful {@code commitOffset} call; -1 until then. */
    private long m_lastCommittedOffset = -1;
    /** Channel to the offset coordinator; installed (and connected) by {@code getOffsetCoordinator()}. */
    protected final AtomicReference<BlockingChannel> m_offsetManager = new AtomicReference<>();
    /** Consumer connected to the partition leader; null until {@code resetLeader()} creates it. */
    protected SimpleConsumer m_consumer = null;
    /** Timestamp set by {@code resetCounters()} when the commit policy is TIME. */
    private long m_lastCommitTime = 0;
    /** Shared request builder; the visible code only uses it to obtain correlation ids. */
    private final FetchRequestBuilder m_fetchRequestBuilder = new FetchRequestBuilder().clientId(KafkaConstants.CLIENT_ID);

    /**
     * Hook implemented by subclasses. No method visible in this file calls it
     * (presumably the record loop in {@code accept()} does — TODO confirm).
     *
     * @param objArr arguments for the invocation
     * @param procedureInvocationCallback callback associated with the invocation
     * @return implementation-defined flag — NOTE(review): exact meaning is not visible here
     */
    public abstract boolean invoke(Object[] objArr, ProcedureInvocationCallback procedureInvocationCallback);

    public BaseKafkaTopicPartitionImporter(KafkaStreamImporterConfig kafkaStreamImporterConfig, ImporterLifecycle importerLifecycle, ImporterLogger importerLogger) {
        this.m_lifecycle = importerLifecycle;
        this.m_logger = importerLogger;
        this.m_config = kafkaStreamImporterConfig;
        this.m_coordinator = this.m_config.getPartitionLeader();
        this.m_topicAndPartition = new TopicAndPartition(kafkaStreamImporterConfig.getTopic(), kafkaStreamImporterConfig.getPartition());
        if (this.m_config.getCommitPolicy() != KafkaCommitPolicy.TIME || this.m_config.getTriggerValue() <= 0) {
            this.m_gapTracker = new DurableTracker(KafkaConstants.IMPORT_GAP_LEAD, kafkaStreamImporterConfig.getTopic(), kafkaStreamImporterConfig.getPartition());
        } else {
            this.m_gapTracker = new SimpleTracker();
        }
    }

    /**
     * Returns the resource identifier reported by the importer configuration.
     *
     * @return the configuration's resource id
     */
    public URI getResourceID() {
        final URI resourceId = m_config.getResourceID();
        return resourceId;
    }

    /**
     * Asks each configured broker in turn for the topic's metadata and returns the entry
     * describing this importer's partition.
     *
     * <p>A broker that fails to answer is logged (rate limited) and skipped. The probe
     * consumer opened for a broker is closed exactly once, in a single {@code finally},
     * regardless of how the probe ends.
     *
     * @return metadata for this partition, or {@code null} when no broker supplied it
     */
    private PartitionMetadata findLeader() {
        final String topic = m_topicAndPartition.topic();
        final int partition = m_topicAndPartition.partition();
        PartitionMetadata found = null;

        search:
        for (HostAndPort broker : m_config.getBrokers()) {
            SimpleConsumer probe = null;
            try {
                probe = new SimpleConsumer(broker.getHost(), broker.getPort(), m_config.getSocketTimeout(), m_config.getFetchSize(), "findLeader");
                TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topic));
                for (TopicMetadata topicMetadata : probe.send(request).topicsMetadata()) {
                    for (PartitionMetadata candidate : topicMetadata.partitionsMetadata()) {
                        if (candidate.partitionId() == partition) {
                            found = candidate;
                            // The finally block still runs before control leaves the outer loop.
                            break search;
                        }
                    }
                }
            } catch (Exception e) {
                m_logger.rateLimitedLog(Level.WARN, e, "Error in finding leader for " + m_topicAndPartition, new Object[0]);
            } finally {
                KafkaStreamImporterConfig.closeConsumer(probe);
            }
        }

        if (found == null) {
            m_logger.rateLimitedLog(Level.WARN, null, "Failed to find Leader for " + m_topicAndPartition, new Object[0]);
        }
        return found;
    }

    /**
     * Obtains a correlation id by building a throwaway fetch request on the shared
     * request builder and reading the id off the built request.
     *
     * @return the correlation id carried by the freshly built request
     */
    private int nextCorrelationId() {
        final String topic = m_topicAndPartition.topic();
        final int partition = m_topicAndPartition.partition();
        final FetchRequestBuilder primed = m_fetchRequestBuilder.addFetch(topic, partition, 1L, m_config.getFetchSize());
        return primed.build().correlationId();
    }

    /**
     * Looks up the current partition leader, trying up to three times with a backoff
     * sleep between attempts.
     *
     * <p>An attempt yields a result when leader metadata is available and either the
     * leader host differs (case-insensitively) from the configured one, or it is not the
     * first attempt. Every attempt that yields no result is followed by a backoff sleep.
     *
     * @return the leader's host and port, or {@code null} after three fruitless attempts
     */
    private HostAndPort findNewLeader() {
        for (int attempt = 0; attempt < 3; attempt++) {
            final PartitionMetadata metadata = findLeader();
            if (metadata != null && metadata.leader() != null) {
                final boolean sameHost = m_config.getPartitionLeader().getHost().equalsIgnoreCase(metadata.leader().host());
                // An unchanged leader is only accepted from the second attempt onwards.
                if (!sameHost || attempt != 0) {
                    return new HostAndPort(metadata.leader().host(), metadata.leader().port());
                }
            }
            KafkaUtils.backoffSleep(attempt + 1);
        }
        m_logger.rateLimitedLog(Level.WARN, null, "Failed to find new leader for " + m_topicAndPartition, new Object[0]);
        return null;
    }

    /**
     * Discovers the offset coordinator for the configured consumer group and installs a
     * connected channel to it in {@code m_offsetManager}.
     *
     * <p>Up to three rounds are made over the configured brokers. Each broker is sent a
     * consumer metadata request; the first error-free answer wins: the coordinator address
     * is stored, a new channel replaces the previous one (which is disconnected on a
     * best-effort basis) and the method returns. A broker whose probe throws, or that
     * answers with an error code, is skipped and the failure is remembered. After each
     * unsuccessful round the most recent failure is logged and a backoff sleep follows.
     *
     * <p>The probe channel is disconnected exactly once per broker, in a {@code finally}.
     * Exceptions raised while installing the new coordinator channel are not caught here.
     */
    public void getOffsetCoordinator() {
        KafkaStreamImporterException lastFailure = null;
        for (int attempt = 0; attempt < 3; attempt++) {
            for (HostAndPort broker : m_config.getBrokers()) {
                BlockingChannel probe = null;
                try {
                    ConsumerMetadataResponse response;
                    try {
                        probe = new BlockingChannel(broker.getHost(), broker.getPort(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), m_config.getSocketTimeout());
                        probe.connect();
                        probe.send(new ConsumerMetadataRequest(m_config.getGroupId(), ConsumerMetadataRequest.CurrentVersion(), nextCorrelationId(), KafkaConstants.CLIENT_ID));
                        response = ConsumerMetadataResponse.readFrom(probe.receive().buffer());
                    } catch (Exception e) {
                        lastFailure = new KafkaStreamImporterException("Failed to get Offset Coordinator for %s", e, m_topicAndPartition);
                        // No response was read from this broker, so move on to the next one
                        // instead of inspecting a response that does not exist.
                        continue;
                    }

                    if (response.errorCode() != ErrorMapping.NoError()) {
                        lastFailure = new KafkaStreamImporterException("Failed to get Offset Coordinator for %s", ErrorMapping.exceptionFor(response.errorCode()), m_topicAndPartition);
                        continue;
                    }

                    Broker coordinator = response.coordinator();
                    m_coordinator = new HostAndPort(coordinator.host(), coordinator.port());
                    BlockingChannel previous = m_offsetManager.getAndSet(new BlockingChannel(m_coordinator.getHost(), m_coordinator.getPort(), BlockingChannel.UseDefaultBufferSize(), BlockingChannel.UseDefaultBufferSize(), m_config.getSocketTimeout()));
                    m_offsetManager.get().connect();
                    m_logger.info(null, "Offset Coordinator for " + m_topicAndPartition + " is " + coordinator, new Object[0]);
                    if (previous != null) {
                        try {
                            previous.disconnect();
                        } catch (Exception ignored) {
                            // Best effort: the old channel has already been replaced.
                        }
                    }
                    return;
                } finally {
                    if (probe != null) {
                        probe.disconnect();
                    }
                }
            }
            if (lastFailure != null) {
                m_logger.warn(lastFailure, "Failed to query all brokers for the offset coordinator for " + m_topicAndPartition, new Object[0]);
            }
            KafkaUtils.backoffSleep(attempt + 1);
        }
    }

    /**
     * Sends an offset lookup for this topic partition through {@code m_consumer}, trying up
     * to three times.
     *
     * <p>If the call throws, the failure is remembered, the leader is reset when the cause
     * is an {@link IOException}, and the next attempt starts immediately; the response is
     * never inspected after a failed call. If the broker answers with an error, the mapped
     * exception is remembered and the leader is reset before retrying.
     *
     * @param partitionOffsetRequestInfo which offset to look up (for example earliest or latest)
     * @return the first error-free response, or {@code null} when all attempts failed
     */
    private OffsetResponse getTopicOffset(PartitionOffsetRequestInfo partitionOffsetRequestInfo) {
        final int partition = m_topicAndPartition.partition();
        final String topic = m_topicAndPartition.topic();
        final kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(Collections.singletonMap(m_topicAndPartition, partitionOffsetRequestInfo), OffsetRequest.CurrentVersion(), KafkaConstants.CLIENT_ID);

        Throwable failure = null;
        for (int attempt = 0; attempt < 3; attempt++) {
            OffsetResponse response;
            try {
                response = m_consumer.getOffsetsBefore(request);
            } catch (Exception e) {
                if (e instanceof IOException) {
                    resetLeader();
                }
                failure = e;
                // Nothing was received on this attempt; retry rather than dereference a
                // missing (or stale) response.
                continue;
            }
            if (!response.hasError()) {
                return response;
            }
            failure = ErrorMapping.exceptionFor(response.errorCode(topic, partition));
            resetLeader();
        }

        if (failure != null) {
            m_logger.rateLimitedLog(Level.WARN, failure, "unable to fetch earliest offset for " + m_topicAndPartition, new Object[0]);
        }
        return null;
    }

    /**
     * Fetches the offset stored for the configured consumer group on this topic partition,
     * using the channel held in {@code m_offsetManager}. Up to three attempts are made.
     *
     * <p>If an attempt throws, the failure is remembered, the coordinator is looked up again
     * when the cause is an {@link IOException}, and the next attempt starts; the error code
     * is never examined for an attempt that produced no response. If the response carries
     * an error code, a backoff sleep follows, and for "not coordinator", "coordinator not
     * available" and "unknown topic or partition" the coordinator is looked up again. The
     * last of these is not treated as a failure.
     *
     * @return the last response received, or {@code null} when the final outcome was a failure
     */
    private OffsetFetchResponse getClientTopicOffset() {
        OffsetFetchResponse response = null;
        Throwable failure = null;

        for (int attempt = 0; attempt < 3; attempt++) {
            short code;
            try {
                OffsetFetchRequest request = new OffsetFetchRequest(m_config.getGroupId(), Collections.singletonList(m_topicAndPartition), (short) 1, nextCorrelationId(), KafkaConstants.CLIENT_ID);
                BlockingChannel channel = m_offsetManager.get();
                channel.send(request.underlying());
                response = OffsetFetchResponse.readFrom(channel.receive().buffer());
                code = ((OffsetMetadataAndError) response.offsets().get(m_topicAndPartition)).error();
            } catch (Exception e) {
                if (e instanceof IOException) {
                    getOffsetCoordinator();
                }
                failure = e;
                // No error code exists for this attempt; go straight to the next one.
                continue;
            }

            if (code == ErrorMapping.NoError()) {
                failure = null;
                break;
            }

            failure = ErrorMapping.exceptionFor(code);
            KafkaUtils.backoffSleep(attempt + 1);
            if (code == ErrorMapping.NotCoordinatorForConsumerCode() || code == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                getOffsetCoordinator();
            } else if (code == ErrorMapping.UnknownTopicOrPartitionCode()) {
                getOffsetCoordinator();
                // This code is deliberately not reported as a failure.
                failure = null;
            }
        }

        if (failure != null) {
            m_logger.rateLimitedLog(Level.WARN, failure, "unable to fetch earliest offset for " + m_topicAndPartition, new Object[0]);
            response = null;
        }
        return response;
    }

    /**
     * Determines the offset to start from for this topic partition.
     *
     * <p>The earliest and latest topic offsets are looked up first; if either lookup fails
     * the result is -1. When both are equal that value is returned. Otherwise the offset
     * stored for the consumer group is fetched: if that fetch fails the earliest offset is
     * returned, else the stored offset is returned when it is at least the earliest and
     * below the latest, the earliest when it is lower, and the latest otherwise.
     *
     * @return the chosen offset, or -1 when a topic offset lookup failed
     */
    public long getLastOffset() {
        final int partition = m_topicAndPartition.partition();
        final String topic = m_topicAndPartition.topic();

        final OffsetResponse earliestResponse = getTopicOffset(EARLIEST_OFFSET);
        if (earliestResponse == null) {
            return -1L;
        }
        final long earliest = earliestResponse.offsets(topic, partition)[0];

        final OffsetResponse latestResponse = getTopicOffset(LATEST_OFFSET);
        if (latestResponse == null) {
            return -1L;
        }
        final long latest = latestResponse.offsets(topic, partition)[0];

        if (latest == earliest) {
            return latest;
        }

        final OffsetFetchResponse storedResponse = getClientTopicOffset();
        if (storedResponse == null) {
            return earliest;
        }

        final long stored = ((OffsetMetadataAndError) storedResponse.offsets().get(m_topicAndPartition)).offset();
        if (stored < earliest) {
            return earliest;
        }
        if (stored < latest) {
            return stored;
        }
        return latest;
    }

    /**
     * Closes the current consumer and opens a new one against the partition leader.
     *
     * <p>When a leader lookup succeeds and the result differs from the configured leader,
     * the configuration is updated and the change is logged. When the lookup fails, a
     * warning is logged and the configured leader is reused.
     */
    protected void resetLeader() {
        KafkaStreamImporterConfig.closeConsumer(m_consumer);
        m_consumer = null;

        HostAndPort leader = findNewLeader();
        if (leader != null) {
            if (!leader.equals(m_config.getPartitionLeader())) {
                m_logger.info(null, "Fetch Found new leader for " + m_topicAndPartition + " New Leader: " + leader, new Object[0]);
                m_config.setPartitionLeader(leader);
            }
        } else {
            m_logger.rateLimitedLog(Level.WARN, null, "Fetch Failed to find leader continue with old leader: " + m_config.getPartitionLeader(), new Object[0]);
            leader = m_config.getPartitionLeader();
        }

        m_consumer = new SimpleConsumer(leader.getHost(), leader.getPort(), m_config.getSocketTimeout(), m_config.getFetchSize(), KafkaConstants.CLIENT_ID);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't wrap try/catch for region: R(9:6|(3:8|(3:129|130|131)(3:10|11|(3:126|127|128)(3:13|14|(3:123|124|125)(2:16|17)))|63)(1:132)|18|19|20|22|(3:111|112|113)(4:24|25|26|(3:102|103|(3:108|109|110)(3:105|106|107))(6:28|29|(4:32|(3:98|99|100)(8:34|35|36|37|(2:39|(1:41)(3:42|(1:44)|45))|46|47|(2:52|53)(2:49|50))|51|30)|101|54|(2:68|69)(6:56|(2:64|65)|58|(1:60)|61|62)))|63|4) */
    /* JADX WARN: Code restructure failed: missing block: B:114:0x015c, code lost:
    
        r22 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:115:0x015e, code lost:
    
        r12.m_logger.rateLimitedLog(org.voltcore.logging.Level.WARN, r22, "Failed to fetch from " + r12.m_topicAndPartition, new java.lang.Object[0]);
     */
    /* JADX WARN: Code restructure failed: missing block: B:116:0x018b, code lost:
    
        if ((r22 instanceof java.io.IOException) != false) goto L168;
     */
    /* JADX WARN: Code restructure failed: missing block: B:118:0x0195, code lost:
    
        r17 = org.voltdb.importclient.kafka.util.KafkaUtils.backoffSleep(r17);
     */
    /* JADX WARN: Code restructure failed: missing block: B:121:0x018e, code lost:
    
        resetLeader();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder left by the decompiler: the original body could not be reconstructed, so
     * this method unconditionally throws.
     *
     * <p>NOTE(review): the real implementation must be recovered from the bytecode or the
     * upstream sources before this class can be used; nothing about its behaviour can be
     * established from this file beyond the decompiler fragments preceding the method.
     *
     * @throws UnsupportedOperationException always
     */
    public void accept() {
        /*
            Method dump skipped, instructions count: 1526
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.voltdb.importclient.kafka.BaseKafkaTopicPartitionImporter.accept():void");
    }

    /**
     * Resets commit bookkeeping for the configured commit policy. Only the TIME policy has
     * state to reset: its last-commit timestamp is set to {@code EstTime.currentTimeMillis()}.
     * Other policies are left untouched.
     */
    public void resetCounters() {
        switch (m_config.getCommitPolicy()) {
            case TIME:
                m_lastCommitTime = EstTime.currentTimeMillis();
                break;
            default:
                // No counters to reset for the remaining policies.
                break;
        }
    }

    /**
     * Commits the tracker's safe offset for this topic partition to the offset coordinator.
     *
     * <p>Nothing is sent when the safe offset is negative (reported as success), when the
     * last committed offset equals the paused offset in use, or when no paused offset is in
     * use and the safe offset has not advanced past the last committed one. Otherwise up to
     * three attempts are made: a missing coordinator channel triggers a coordinator lookup,
     * and a "not coordinator" / "coordinator not available" answer triggers a lookup and a
     * retry. When a paused offset is in use, the smaller of it and the safe offset is
     * committed.
     *
     * <p>The whole exchange with the coordinator runs inside the try block, so any failure
     * while sending or reading the commit is logged, refreshes the coordinator when it is
     * an {@link IOException}, and is reported as {@code false} instead of escaping.
     *
     * @param z when {@code true}, the paused offset ({@code m_pauseOffset}) is taken into account
     * @return {@code true} when there was nothing to commit because the safe offset is
     *         negative, or when the commit was acknowledged without error; {@code false} otherwise
     */
    public boolean commitOffset(boolean z) {
        long safe = m_gapTracker.getSafe();
        if (safe < 0) {
            return true;
        }
        final long pausedOffset = z ? m_pauseOffset.get() : -1L;
        if (m_lastCommittedOffset == pausedOffset) {
            return false;
        }
        if (safe <= m_lastCommittedOffset && pausedOffset == -1) {
            return false;
        }

        final long now = System.currentTimeMillis();
        OffsetCommitResponse response = null;
        try {
            if (pausedOffset != -1) {
                m_logger.rateLimitedLog(Level.INFO, null, m_topicAndPartition + " is using paused offset to commit: " + pausedOffset, new Object[0]);
            }

            BlockingChannel channel = null;
            int retries = 3;
            // The counter is only decremented when another attempt is actually needed.
            while (channel == null && --retries >= 0) {
                channel = m_offsetManager.get();
                if (channel == null) {
                    getOffsetCoordinator();
                    m_logger.rateLimitedLog(Level.WARN, null, "Commit Offset Failed to get offset coordinator for " + m_topicAndPartition, new Object[0]);
                    continue;
                }
                if (pausedOffset != -1) {
                    safe = Math.min(pausedOffset, safe);
                }
                channel.send(new OffsetCommitRequest(m_config.getGroupId(), Collections.singletonMap(m_topicAndPartition, new OffsetAndMetadata(safe, "commit", now)), nextCorrelationId(), KafkaConstants.CLIENT_ID, (short) 1).underlying());
                response = OffsetCommitResponse.readFrom(channel.receive().buffer());
                final short code = ((Short) response.errors().get(m_topicAndPartition)).shortValue();
                if (code == ErrorMapping.NotCoordinatorForConsumerCode() || code == ErrorMapping.ConsumerCoordinatorNotAvailableCode()) {
                    m_logger.info(null, "Not coordinator for committing offset for " + m_topicAndPartition + " Updating coordinator.", new Object[0]);
                    getOffsetCoordinator();
                    // Clearing the channel makes the loop try again with the new coordinator.
                    channel = null;
                }
            }

            if (retries < 0 || response == null) {
                return false;
            }
        } catch (Exception e) {
            m_logger.rateLimitedLog(Level.WARN, e, "Failed to commit Offset for " + m_topicAndPartition, new Object[0]);
            if (e instanceof IOException) {
                getOffsetCoordinator();
            }
            return false;
        }

        final short finalCode = ((Short) response.errors().get(m_topicAndPartition)).shortValue();
        if (finalCode != ErrorMapping.NoError()) {
            m_logger.rateLimitedLog(Level.WARN, ErrorMapping.exceptionFor(finalCode), "Commit Offset Failed to commit for " + m_topicAndPartition, new Object[0]);
            return false;
        }
        m_lastCommittedOffset = safe;
        resetCounters();
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stop hook. The implementation in this class is empty.
     */
    public void stop() {
    }
}
