package org.springframework.integration.kafka.listener;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import kafka.common.ErrorMapping;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.integration.kafka.core.BrokerAddress;
import org.springframework.integration.kafka.core.ConnectionFactory;
import org.springframework.integration.kafka.core.ConsumerException;
import org.springframework.integration.kafka.core.KafkaConsumerDefaults;
import org.springframework.integration.kafka.core.Partition;
import org.springframework.integration.kafka.core.PartitionNotFoundException;
import org.springframework.integration.kafka.core.Result;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/integration/kafka/listener/AbstractOffsetManager.class */
/**
 * Base {@link OffsetManager} that leaves the actual offset storage to subclasses
 * (see {@link #doUpdateOffset}, {@link #doRemoveOffset} and {@link #doGetOffset}).
 *
 * <p>On top of the subclass storage this class keeps two in-memory maps: the initial
 * offsets supplied at construction time and the highest offset passed to
 * {@link #updateOffset} for each partition. When neither the subclass storage nor the
 * initial offsets know a partition, the offset is fetched from the partition leader
 * obtained through the {@link ConnectionFactory}.
 *
 * <p>{@link #updateOffset}, {@link #getOffset}, {@link #resetOffsets} and
 * {@link #deleteOffset} are synchronized on this instance; the setters are not.
 */
public abstract class AbstractOffsetManager implements OffsetManager, DisposableBean {

    /** Logger named after the concrete subclass. */
    protected final Log log;

    /** Consumer identifier; starts as {@link KafkaConsumerDefaults#GROUP_ID}. */
    protected String consumerId;

    /**
     * Reference timestamp handed to the broker when an initial offset has to be fetched;
     * starts as {@link KafkaConsumerDefaults#DEFAULT_OFFSET_RESET}.
     */
    protected long referenceTimestamp;

    /** Used to locate partition leaders and to open connections to them. */
    protected ConnectionFactory connectionFactory;

    /** Private copy of the initial offsets passed to the constructor. */
    protected Map<Partition, Long> initialOffsets;

    /** Highest offset accepted by {@link #updateOffset} so far, per partition. */
    protected Map<Partition, Long> highestUpdatedOffsets;

    /**
     * Creates a manager without any initial offsets.
     *
     * @param connectionFactory the connection factory; must not be {@code null}
     */
    public AbstractOffsetManager(ConnectionFactory connectionFactory) {
        this(connectionFactory, new HashMap<Partition, Long>());
    }

    /**
     * Creates a manager with the given initial offsets.
     *
     * @param connectionFactory the connection factory; must not be {@code null}
     * @param map the initial offsets per partition; must not be {@code null}. The map is
     *     copied, so later changes to the argument are not seen by this manager.
     */
    public AbstractOffsetManager(ConnectionFactory connectionFactory, Map<Partition, Long> map) {
        this.log = LogFactory.getLog(getClass());
        this.consumerId = KafkaConsumerDefaults.GROUP_ID;
        this.referenceTimestamp = KafkaConsumerDefaults.DEFAULT_OFFSET_RESET;
        this.highestUpdatedOffsets = new ConcurrentHashMap<Partition, Long>();
        Assert.notNull(connectionFactory, "A 'connectionFactory' can't be null");
        Assert.notNull(map, "An initialOffsets can't be null");
        this.connectionFactory = connectionFactory;
        this.initialOffsets = new HashMap<Partition, Long>(map);
    }

    /**
     * @return the current consumer identifier
     */
    public String getConsumerId() {
        return this.consumerId;
    }

    /**
     * @param str the consumer identifier to use from now on
     */
    public void setConsumerId(String str) {
        this.consumerId = str;
    }

    /**
     * @param j the reference timestamp passed to the broker when an initial offset is fetched
     */
    public void setReferenceTimestamp(long j) {
        this.referenceTimestamp = j;
    }

    /**
     * Flushes and then closes this manager. An {@link IOException} raised by either step
     * is logged and not rethrown.
     *
     * <p>{@code close()} runs in a {@code finally} block so that it is still attempted when
     * {@code flush()} fails with something other than an {@link IOException}.
     * {@code flush()} and {@code close()} are not declared in this class; presumably they
     * are inherited through {@link OffsetManager}.
     *
     * @throws Exception declared by {@link DisposableBean#destroy()}
     */
    @Override
    public void destroy() throws Exception {
        try {
            flush();
        } catch (IOException e) {
            this.log.error("Error while flushing the OffsetManager", e);
        } finally {
            try {
                close();
            } catch (IOException e2) {
                this.log.error("Error while closing the OffsetManager", e2);
            }
        }
    }

    /**
     * Records a new offset for the partition, but only when it is strictly greater than
     * the highest offset recorded so far for that partition (or when none is recorded).
     * Any other update is silently ignored.
     *
     * @param partition the partition whose offset is updated
     * @param j the new offset
     */
    @Override
    public final synchronized void updateOffset(Partition partition, long j) {
        Long highest = this.highestUpdatedOffsets.get(partition);
        if (highest == null || highest.longValue() < j) {
            this.highestUpdatedOffsets.put(partition, Long.valueOf(j));
            doUpdateOffset(partition, j);
        }
    }

    /**
     * Resolves the offset for a partition, trying in order: the subclass storage
     * ({@link #doGetOffset}), the initial offsets, and finally a fetch from the partition
     * leader using the configured reference timestamp.
     *
     * @param partition the partition to look up
     * @return the resolved offset
     * @throws PartitionNotFoundException if the connection factory reports no leader for the partition
     * @throws ConsumerException if the broker fetch result contains any errors
     * @throws IllegalStateException if the broker fetch result has no value for the partition
     */
    @Override
    public final synchronized long getOffset(Partition partition) {
        Long stored = doGetOffset(partition);
        if (stored != null) {
            return stored.longValue();
        }
        if (this.initialOffsets.containsKey(partition)) {
            return this.initialOffsets.get(partition).longValue();
        }

        // Nothing known locally: ask the broker that currently leads the partition.
        BrokerAddress leader = this.connectionFactory.getLeader(partition);
        if (leader == null) {
            throw new PartitionNotFoundException(partition);
        }
        Result<Long> fetched =
                this.connectionFactory.connect(leader).fetchInitialOffset(this.referenceTimestamp, partition);
        if (fetched.getErrors().size() > 0) {
            throw new ConsumerException(ErrorMapping.exceptionFor(fetched.getError(partition)));
        }
        if (fetched.getResults().containsKey(partition)) {
            return fetched.getResult(partition).longValue();
        }
        throw new IllegalStateException("Result does not contain an expected value");
    }

    /**
     * Forgets everything known about the given partitions: the stored offset (through
     * {@link #doRemoveOffset}), the initial offset and the highest updated offset.
     *
     * @param collection the partitions to reset
     */
    @Override
    public synchronized void resetOffsets(Collection<Partition> collection) {
        for (Partition partition : collection) {
            doRemoveOffset(partition);
            this.initialOffsets.remove(partition);
            this.highestUpdatedOffsets.remove(partition);
        }
    }

    /**
     * Removes the stored offset for the partition by delegating to {@link #doRemoveOffset}.
     *
     * <p>NOTE(review): unlike {@link #resetOffsets}, this leaves the initial offset and the
     * highest updated offset for the partition in place, so a later {@link #updateOffset}
     * with a value not greater than the recorded highest is still ignored. Confirm that
     * this is intended.
     *
     * @param partition the partition whose stored offset is removed
     */
    @Override
    public synchronized void deleteOffset(Partition partition) {
        doRemoveOffset(partition);
    }

    /**
     * Stores the offset for the partition. Called from {@link #updateOffset} while the
     * instance lock is held, and only for offsets that passed the highest-offset check.
     *
     * @param partition the partition whose offset is stored
     * @param j the offset to store
     */
    protected abstract void doUpdateOffset(Partition partition, long j);

    /**
     * Removes the stored offset for the partition. Called from {@link #resetOffsets} and
     * {@link #deleteOffset} while the instance lock is held.
     *
     * @param partition the partition whose stored offset is removed
     */
    protected abstract void doRemoveOffset(Partition partition);

    /**
     * Looks up the stored offset for the partition. Called from {@link #getOffset} while
     * the instance lock is held.
     *
     * @param partition the partition to look up
     * @return the stored offset, or {@code null} to make {@link #getOffset} fall back to the
     *     initial offsets and then to the broker
     */
    protected abstract Long doGetOffset(Partition partition);
}
