package org.springframework.integration.kafka.listener;

import com.gs.collections.impl.factory.Maps;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import kafka.client.ClientUtils$;
import kafka.common.ErrorMapping;
import kafka.common.OffsetAndMetadata;
import kafka.network.BlockingChannel;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.Configuration;
import org.springframework.integration.kafka.core.Connection;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.PartitionNotFoundException;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.listener.RetryListenerSupport;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaNativeOffsetManager.class */
public class KafkaNativeOffsetManager extends AbstractOffsetManager implements InitializingBean {
    private static final String PARTITION_ATTRIBUTE = "partition";
    private final Map<Partition, BrokerAddress> offsetManagerBrokerAddressCache;
    private final ZkClient zkClient;
    private RetryTemplate retryTemplate;

    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaNativeOffsetManager$ResetOffsetManagerBrokerAddressRetryListener.class */
    private class ResetOffsetManagerBrokerAddressRetryListener extends RetryListenerSupport {
        private ResetOffsetManagerBrokerAddressRetryListener() {
        }

        public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable th) {
            if (KafkaNativeOffsetManager.this.log.isWarnEnabled()) {
                KafkaNativeOffsetManager.this.log.warn("Retrying kafka operation [" + retryCallback + "] due to [" + th + "]", th);
            }
            Partition partition = (Partition) retryContext.getAttribute(KafkaNativeOffsetManager.PARTITION_ATTRIBUTE);
            if (partition != null) {
                KafkaNativeOffsetManager.this.offsetManagerBrokerAddressCache.remove(partition);
            }
        }
    }

    public KafkaNativeOffsetManager(ConnectionFactory connectionFactory, ZookeeperConnect zookeeperConnect) {
        this(connectionFactory, zookeeperConnect, Collections.emptyMap());
    }

    public KafkaNativeOffsetManager(ConnectionFactory connectionFactory, ZookeeperConnect zookeeperConnect, Map<Partition, Long> map) {
        super(connectionFactory, map);
        this.offsetManagerBrokerAddressCache = new ConcurrentHashMap();
        Assert.notNull(zookeeperConnect, "'zookeeperConnect' must not be null.");
        this.zkClient = new ZkClient(zookeeperConnect.getZkConnect(), Integer.parseInt(zookeeperConnect.getZkSessionTimeout()), Integer.parseInt(zookeeperConnect.getZkConnectionTimeout()), ZKStringSerializer$.MODULE$);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void afterPropertiesSet() throws Exception {
        if (this.retryTemplate == null) {
            this.retryTemplate = new RetryTemplate();
            this.retryTemplate.registerListener(new ResetOffsetManagerBrokerAddressRetryListener());
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            simpleRetryPolicy.setMaxAttempts(5);
            this.retryTemplate.setRetryPolicy(simpleRetryPolicy);
            ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
            exponentialBackOffPolicy.setInitialInterval(125L);
            exponentialBackOffPolicy.setMaxInterval(5000L);
            exponentialBackOffPolicy.setMultiplier(2.0d);
            this.retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
        }
    }

    @Override // org.springframework.integration.kafka.listener.AbstractOffsetManager
    protected Long doGetOffset(final Partition partition) {
        Long l = (Long) this.retryTemplate.execute(new RetryCallback<Long, RuntimeException>() { // from class: org.springframework.integration.kafka.listener.KafkaNativeOffsetManager.1
            /* renamed from: doWithRetry, reason: merged with bridge method [inline-methods] */
            public Long m8doWithRetry(RetryContext retryContext) throws RuntimeException {
                retryContext.setAttribute(KafkaNativeOffsetManager.PARTITION_ATTRIBUTE, partition);
                Result<Long> fetchStoredOffsetsForConsumer = KafkaNativeOffsetManager.this.getOffsetManagerConnection(partition).fetchStoredOffsetsForConsumer(KafkaNativeOffsetManager.this.getConsumerId(), partition);
                KafkaNativeOffsetManager.this.checkResultForErrors(fetchStoredOffsetsForConsumer, partition);
                return fetchStoredOffsetsForConsumer.getResult(partition);
            }

            public String toString() {
                return String.format("fetchStoredOffsetsForConsumer(%s, %s)", KafkaNativeOffsetManager.this.getConsumerId(), partition);
            }
        });
        if (l == null || l.longValue() >= 0) {
            return l;
        }
        return null;
    }

    @Override // org.springframework.integration.kafka.listener.AbstractOffsetManager
    protected void doUpdateOffset(final Partition partition, final long j) {
        this.retryTemplate.execute(new RetryCallback<Void, RuntimeException>() { // from class: org.springframework.integration.kafka.listener.KafkaNativeOffsetManager.2
            /* renamed from: doWithRetry, reason: merged with bridge method [inline-methods] */
            public Void m9doWithRetry(RetryContext retryContext) throws RuntimeException {
                retryContext.setAttribute(KafkaNativeOffsetManager.PARTITION_ATTRIBUTE, partition);
                KafkaNativeOffsetManager.this.checkResultForErrors(KafkaNativeOffsetManager.this.getOffsetManagerConnection(partition).commitOffsetsForConsumer(KafkaNativeOffsetManager.this.getConsumerId(), Maps.immutable.of(partition, Long.valueOf(j)).castToMap()), partition);
                return null;
            }

            public String toString() {
                return String.format("commitOffsetsForConsumer(%s, %s, %s)", KafkaNativeOffsetManager.this.getConsumerId(), partition, Long.valueOf(j));
            }
        });
    }

    @Override // org.springframework.integration.kafka.listener.AbstractOffsetManager
    protected void doRemoveOffset(Partition partition) {
        doUpdateOffset(partition, OffsetAndMetadata.InvalidOffset());
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.zkClient.close();
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
    }

    private BrokerAddress getOffsetManagerBrokerAddress(Partition partition) {
        BrokerAddress brokerAddress = this.offsetManagerBrokerAddressCache.get(partition);
        if (brokerAddress == null) {
            int i = 30000;
            int i2 = 1000;
            if (this.connectionFactory instanceof DefaultConnectionFactory) {
                Configuration configuration = ((DefaultConnectionFactory) this.connectionFactory).getConfiguration();
                i = configuration.getSocketTimeout();
                i2 = configuration.getBackOff();
            }
            BlockingChannel channelToOffsetManager = ClientUtils$.MODULE$.channelToOffsetManager(getConsumerId(), this.zkClient, i, i2);
            brokerAddress = new BrokerAddress(channelToOffsetManager.host(), channelToOffsetManager.port());
            if (this.log.isDebugEnabled()) {
                this.log.debug(String.format("Offset manager for [%s] is at [%s].", partition, brokerAddress));
            }
            this.offsetManagerBrokerAddressCache.put(partition, brokerAddress);
            channelToOffsetManager.disconnect();
        }
        return brokerAddress;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Connection getOffsetManagerConnection(Partition partition) {
        return this.connectionFactory.connect(getOffsetManagerBrokerAddress(partition));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void checkResultForErrors(Result<?> result, Partition partition) {
        if (result.getErrors().containsKey(partition)) {
            short error = result.getError(partition);
            if (error == ErrorMapping.UnknownTopicOrPartitionCode()) {
                throw new PartitionNotFoundException(partition);
            }
            if (error != ErrorMapping.NoError()) {
                throw new ConsumerException(ErrorMapping.exceptionFor(error));
            }
        }
    }
}
