package org.springframework.integration.kafka.listener;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import kafka.admin.AdminUtils$;
import kafka.api.OffsetRequest;
import kafka.common.ErrorMapping$;
import kafka.common.TopicExistsException;
import kafka.serializer.Decoder;
import kafka.utils.ZKStringSerializer$;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.DefaultConnectionFactory;
import org.springframework.integration.kafka.core.FetchRequest;
import org.springframework.integration.kafka.core.KafkaMessage;
import org.springframework.integration.kafka.core.KafkaMessageBatch;
import org.springframework.integration.kafka.core.KafkaTemplate;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.Result;
import org.springframework.integration.kafka.core.TopicNotFoundException;
import org.springframework.integration.kafka.core.ZookeeperConfiguration;
import org.springframework.integration.kafka.support.ProducerFactoryBean;
import org.springframework.integration.kafka.support.ProducerMetadata;
import org.springframework.integration.kafka.support.ZookeeperConnect;
import org.springframework.integration.kafka.util.LoggingUtils;
import org.springframework.integration.kafka.util.MessageUtils;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaTopicOffsetManager.class */
public class KafkaTopicOffsetManager extends AbstractOffsetManager implements InitializingBean {
    private static final KeySerializerDecoder KEY_CODEC = new KeySerializerDecoder();
    private static final LongSerializerDecoder VALUE_CODEC = new LongSerializerDecoder();
    public static final String CLEANUP_POLICY = "cleanup.policy";
    public static final String CLEANUP_POLICY_COMPACT = "compact";
    public static final String DELETE_RETENTION = "delete.retention.ms";
    public static final String SEGMENT_BYTES = "segment.bytes";
    private final ZookeeperConnect zookeeperConnect;
    private final String topic;
    private final KafkaTemplate kafkaTemplate;
    private final ConcurrentMap<Partition, Long> data;
    private ProducerMetadata.CompressionType compressionType;
    private Producer<Key, Long> producer;
    private int maxSize;
    private int segmentSize;
    private int retentionTime;
    private int replicationFactor;
    private int batchBytes;
    private int requiredAcks;
    private int maxQueueBufferingTime;

    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaTopicOffsetManager$Key.class */
    public static class Key {
        String consumerId;
        Partition partition;

        public Key(String str, Partition partition) {
            Assert.notNull(str, "Consumer Id cannot be null");
            Assert.notNull(partition, "Partition cannot be null");
            this.consumerId = str;
            this.partition = partition;
        }

        public String getConsumerId() {
            return this.consumerId;
        }

        public Partition getPartition() {
            return this.partition;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Key key = (Key) obj;
            return this.consumerId.equals(key.consumerId) && this.partition.equals(key.partition);
        }

        public int hashCode() {
            return (31 * this.consumerId.hashCode()) + this.partition.hashCode();
        }
    }

    /* loaded from: input_file:org/springframework/integration/kafka/listener/KafkaTopicOffsetManager$KeySerializerDecoder.class */
    public static class KeySerializerDecoder implements Serializer<Key>, Decoder<Key> {
        private static final Log log = LogFactory.getLog(KeySerializerDecoder.class);

        /* renamed from: fromBytes, reason: merged with bridge method [inline-methods] */
        public Key m13fromBytes(byte[] bArr) {
            if (bArr == null || bArr.length <= 0) {
                return null;
            }
            try {
                int intFromBytes = KafkaTopicOffsetManager.intFromBytes(bArr, 0);
                int intFromBytes2 = KafkaTopicOffsetManager.intFromBytes(bArr, intFromBytes + 4);
                return new Key(new String(bArr, 4, intFromBytes), new Partition(new String(bArr, intFromBytes + 8, intFromBytes2), KafkaTopicOffsetManager.intFromBytes(bArr, intFromBytes + intFromBytes2 + 8)));
            } catch (Exception e) {
                if (!log.isDebugEnabled()) {
                    return null;
                }
                log.debug("Cannot decode key:" + LoggingUtils.asCommaSeparatedHexDump(bArr));
                return null;
            }
        }

        public void configure(Map<String, ?> map, boolean z) {
        }

        public byte[] serialize(String str, Key key) {
            if (key == null) {
                return null;
            }
            try {
                byte[] bytes = key.consumerId.getBytes("UTF-8");
                byte[] bytes2 = key.partition.getTopic().getBytes("UTF-8");
                byte[] intToBytes = KafkaTopicOffsetManager.intToBytes(Integer.valueOf(key.partition.getId()));
                byte[] bArr = new byte[4 + bytes.length + 4 + bytes2.length + 4];
                System.arraycopy(KafkaTopicOffsetManager.intToBytes(Integer.valueOf(bytes.length)), 0, bArr, 0, 4);
                System.arraycopy(bytes, 0, bArr, 4, bytes.length);
                System.arraycopy(KafkaTopicOffsetManager.intToBytes(Integer.valueOf(bytes2.length)), 0, bArr, bytes.length + 4, 4);
                System.arraycopy(bytes2, 0, bArr, bytes.length + 8, bytes2.length);
                System.arraycopy(intToBytes, 0, bArr, bytes.length + bytes2.length + 8, 4);
                return bArr;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public void close() {
        }
    }

    public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, String str) {
        this(zookeeperConnect, str, new HashMap());
    }

    public KafkaTopicOffsetManager(ZookeeperConnect zookeeperConnect, String str, Map<Partition, Long> map) {
        super(new DefaultConnectionFactory(new ZookeeperConfiguration(zookeeperConnect)), map);
        this.data = new ConcurrentHashMap();
        this.compressionType = ProducerMetadata.CompressionType.none;
        this.maxSize = 10240;
        this.segmentSize = 25600;
        this.retentionTime = 60000;
        this.batchBytes = 200;
        this.requiredAcks = 1;
        Assert.notNull(zookeeperConnect);
        this.zookeeperConnect = zookeeperConnect;
        this.kafkaTemplate = new KafkaTemplate(this.connectionFactory);
        this.topic = str;
    }

    public void setMaxSize(int i) {
        this.maxSize = i;
    }

    public void setCompressionCodec(ProducerMetadata.CompressionType compressionType) {
        this.compressionType = compressionType;
    }

    public void setMaxQueueBufferingTime(int i) {
        this.maxQueueBufferingTime = i;
    }

    public void setSegmentSize(int i) {
        this.segmentSize = i;
    }

    public void setRetentionTime(int i) {
        this.retentionTime = i;
    }

    public void setReplicationFactor(int i) {
        this.replicationFactor = i;
    }

    public void setBatchBytes(int i) {
        this.batchBytes = i;
    }

    public void setRequiredAcks(int i) {
        this.requiredAcks = i;
    }

    public void afterPropertiesSet() throws Exception {
        ((DefaultConnectionFactory) this.connectionFactory).afterPropertiesSet();
        ZkClient zkClient = new ZkClient(this.zookeeperConnect.getZkConnect(), Integer.parseInt(this.zookeeperConnect.getZkSessionTimeout()), Integer.parseInt(this.zookeeperConnect.getZkConnectionTimeout()), ZKStringSerializer$.MODULE$);
        try {
            createCompactedTopicIfNotFound(zkClient);
            validateOffsetTopic(zkClient);
            Partition partition = new Partition(this.topic, 0);
            BrokerAddress leader = this.connectionFactory.getLeader(partition);
            readOffsetData(partition, leader);
            initializeProducer(leader);
        } finally {
            try {
                zkClient.close();
            } catch (ZkInterruptedException e) {
                this.log.error("Error while closing Zookeeper client", e);
            }
        }
    }

    @Override // org.springframework.integration.kafka.listener.AbstractOffsetManager
    protected void doUpdateOffset(Partition partition, long j) {
        this.data.put(partition, Long.valueOf(j));
        this.producer.send(new ProducerRecord(this.topic, new Key(this.consumerId, partition), Long.valueOf(j)));
    }

    @Override // org.springframework.integration.kafka.listener.AbstractOffsetManager
    protected void doRemoveOffset(Partition partition) {
        this.data.remove(partition);
        this.producer.send(new ProducerRecord(this.topic, new Key(this.consumerId, partition), (Object) null));
    }

    @Override // org.springframework.integration.kafka.listener.AbstractOffsetManager
    protected Long doGetOffset(Partition partition) {
        return this.data.get(partition);
    }

    @Override // java.io.Flushable
    public void flush() throws IOException {
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.producer.close();
        try {
            ((DefaultConnectionFactory) this.connectionFactory).destroy();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private void createCompactedTopicIfNotFound(ZkClient zkClient) {
        Properties properties = new Properties();
        properties.setProperty(CLEANUP_POLICY, CLEANUP_POLICY_COMPACT);
        properties.setProperty(DELETE_RETENTION, String.valueOf(this.retentionTime));
        properties.setProperty(SEGMENT_BYTES, String.valueOf(this.segmentSize));
        try {
            this.replicationFactor = 1;
            AdminUtils$.MODULE$.createTopic(zkClient, this.topic, 1, this.replicationFactor, properties);
        } catch (TopicExistsException e) {
            this.log.debug("Topic already exists", e);
        }
    }

    private void validateOffsetTopic(ZkClient zkClient) throws Exception {
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(10, Collections.singletonMap(TopicNotFoundException.class, true)));
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
        exponentialBackOffPolicy.setInitialInterval(100L);
        exponentialBackOffPolicy.setMaxInterval(1000L);
        exponentialBackOffPolicy.setMultiplier(2.0d);
        retryTemplate.setBackOffPolicy(exponentialBackOffPolicy);
        if (((Collection) retryTemplate.execute(new RetryCallback<Collection<Partition>, Exception>() { // from class: org.springframework.integration.kafka.listener.KafkaTopicOffsetManager.1
            /* renamed from: doWithRetry, reason: merged with bridge method [inline-methods] */
            public Collection<Partition> m11doWithRetry(RetryContext retryContext) throws Exception {
                return KafkaTopicOffsetManager.this.connectionFactory.getPartitions(KafkaTopicOffsetManager.this.topic);
            }
        })).size() > 1) {
            throw new BeanInitializationException("Offset management topic cannot have more than one partition");
        }
        Properties fetchTopicConfig = AdminUtils$.MODULE$.fetchTopicConfig(zkClient, this.topic);
        if (!fetchTopicConfig.containsKey(CLEANUP_POLICY) || !CLEANUP_POLICY_COMPACT.equals(fetchTopicConfig.getProperty(CLEANUP_POLICY))) {
            throw new BeanInitializationException("Property 'cleanup.policy' must be set to 'compact' on offset topic");
        }
    }

    private void readOffsetData(Partition partition, BrokerAddress brokerAddress) {
        Result<Long> fetchInitialOffset = this.connectionFactory.connect(brokerAddress).fetchInitialOffset(OffsetRequest.EarliestTime(), partition);
        if (fetchInitialOffset.getErrors().size() > 0) {
            throw new BeanInitializationException("Cannot initialize offset manager, unable to read earliest offset", ErrorMapping$.MODULE$.exceptionFor(fetchInitialOffset.getError(partition)));
        }
        Result<Long> fetchInitialOffset2 = this.connectionFactory.connect(brokerAddress).fetchInitialOffset(OffsetRequest.LatestTime(), partition);
        if (fetchInitialOffset2.getErrors().size() > 0) {
            throw new BeanInitializationException("Cannot initialize offset manager, unable to read latest offset");
        }
        long longValue = fetchInitialOffset.getResult(partition).longValue();
        long longValue2 = fetchInitialOffset2.getResult(partition).longValue();
        long j = longValue;
        while (j < longValue2) {
            Result<KafkaMessageBatch> receive = this.kafkaTemplate.receive(Collections.singleton(new FetchRequest(partition, j, this.maxSize)));
            if (receive.getErrors().size() > 0) {
                throw new BeanInitializationException("Error while fetching initial offsets:", ErrorMapping$.MODULE$.exceptionFor(receive.getError(partition)));
            }
            for (KafkaMessage kafkaMessage : receive.getResult(partition).getMessages()) {
                checkAndAddData(kafkaMessage);
                j = kafkaMessage.getMetadata().getNextOffset();
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug(this.data.size() + " entries in the final map");
            }
            if (this.log.isTraceEnabled()) {
                for (Map.Entry<Partition, Long> entry : this.data.entrySet()) {
                    this.log.trace(String.format("Final value for %s : %s", entry.getKey().toString(), String.valueOf(entry.getValue())));
                }
            }
        }
    }

    private void checkAndAddData(KafkaMessage kafkaMessage) {
        Key key = (Key) MessageUtils.decodeKey(kafkaMessage, KEY_CODEC);
        Long l = (Long) MessageUtils.decodePayload(kafkaMessage, VALUE_CODEC);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Loading key " + key + " with value " + l);
        }
        if (key == null || !ObjectUtils.nullSafeEquals(this.consumerId, key.getConsumerId())) {
            return;
        }
        if (null != l) {
            this.data.put(key.getPartition(), l);
        } else if (this.data.containsKey(key.getPartition())) {
            this.data.remove(key.getPartition());
        }
    }

    private void initializeProducer(BrokerAddress brokerAddress) throws Exception {
        ProducerMetadata producerMetadata = new ProducerMetadata(this.topic, Key.class, Long.class, KEY_CODEC, VALUE_CODEC);
        producerMetadata.setBatchBytes(this.batchBytes);
        producerMetadata.setCompressionType(this.compressionType);
        Properties properties = new Properties();
        properties.setProperty("linger.ms", Integer.toString(this.maxQueueBufferingTime));
        properties.setProperty("acks", Integer.toString(this.requiredAcks));
        this.producer = new ProducerFactoryBean(producerMetadata, brokerAddress.toString(), properties).m25getObject();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static int intFromBytes(byte[] bArr, int i) {
        return (bArr[i] << 24) | ((bArr[i + 1] & 255) << 16) | ((bArr[i + 2] & 255) << 8) | (bArr[i + 3] & 255);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static byte[] intToBytes(Integer num) {
        int intValue = num.intValue();
        return new byte[]{(byte) (intValue >>> 24), (byte) (intValue >>> 16), (byte) (intValue >>> 8), (byte) intValue};
    }
}
