package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import kafka.utils.ZKGroupTopicDirs;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator.org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/ZookeeperOffsetHandler.class */
public class ZookeeperOffsetHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);
    private final String groupId;
    private final CuratorFramework curatorClient;

    public ZookeeperOffsetHandler(Properties properties) {
        this.groupId = properties.getProperty("group.id");
        if (this.groupId == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set");
        }
        String property = properties.getProperty("zookeeper.connect");
        if (property == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
        }
        this.curatorClient = CuratorFrameworkFactory.newClient(property, Integer.valueOf(properties.getProperty("zookeeper.session.timeout.ms", "60000")).intValue(), Integer.valueOf(properties.getProperty("zookeeper.connection.timeout.ms", "15000")).intValue(), new ExponentialBackoffRetry(Integer.valueOf(properties.getProperty("flink.zookeeper.base-sleep-time.ms", "100")).intValue(), Integer.valueOf(properties.getProperty("flink.zookeeper.max-retries", "10")).intValue()));
        this.curatorClient.start();
    }

    public void prepareAndCommitOffsets(Map<KafkaTopicPartition, Long> map) throws Exception {
        for (Map.Entry<KafkaTopicPartition, Long> entry : map.entrySet()) {
            KafkaTopicPartition key = entry.getKey();
            Long value = entry.getValue();
            if (value != null && value.longValue() >= 0) {
                setOffsetInZooKeeper(this.curatorClient, this.groupId, key.getTopic(), key.getPartition(), value.longValue() + 1);
            }
        }
    }

    public Long getCommittedOffset(KafkaTopicPartition kafkaTopicPartition) throws Exception {
        return getOffsetFromZooKeeper(this.curatorClient, this.groupId, kafkaTopicPartition.getTopic(), kafkaTopicPartition.getPartition());
    }

    public void close() throws IOException {
        this.curatorClient.close();
    }

    public static void setOffsetInZooKeeper(CuratorFramework curatorFramework, String str, String str2, int i, long j) throws Exception {
        String str3 = new ZKGroupTopicDirs(str, str2).consumerOffsetDir() + "/" + i;
        curatorFramework.newNamespaceAwareEnsurePath(str3).ensure(curatorFramework.getZookeeperClient());
        curatorFramework.setData().forPath(str3, Long.toString(j).getBytes(ConfigConstants.DEFAULT_CHARSET));
    }

    public static Long getOffsetFromZooKeeper(CuratorFramework curatorFramework, String str, String str2, int i) throws Exception {
        String str3 = new ZKGroupTopicDirs(str, str2).consumerOffsetDir() + "/" + i;
        curatorFramework.newNamespaceAwareEnsurePath(str3).ensure(curatorFramework.getZookeeperClient());
        byte[] bArr = (byte[]) curatorFramework.getData().forPath(str3);
        if (bArr == null) {
            return null;
        }
        String str4 = new String(bArr, ConfigConstants.DEFAULT_CHARSET);
        if (str4.length() == 0) {
            return null;
        }
        try {
            return Long.valueOf(str4);
        } catch (NumberFormatException e) {
            LOG.error("The offset in ZooKeeper for group '{}', topic '{}', partition {} is a malformed string: {}", new Object[]{str, str2, Integer.valueOf(i), str4});
            return null;
        }
    }
}
