package io.debezium.heartbeat;

import io.debezium.config.Configuration;
import io.debezium.function.BlockingConsumer;
import io.debezium.util.Clock;
import io.debezium.util.SchemaNameAdjuster;
import io.debezium.util.Threads;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/heartbeat/HeartbeatImpl.class */
/**
 * {@link Heartbeat} implementation that emits a key-only {@link SourceRecord} to a
 * dedicated topic each time the configured heartbeat interval timer reports expiry.
 *
 * <p>The record key is a single-field struct holding the {@code serverName} value passed
 * to the constructor; the record carries no value schema and no value. Source partition
 * and offset are obtained from the supplied {@link OffsetPosition} at the moment the
 * record is built.
 */
class HeartbeatImpl implements Heartbeat {
    static final int DEFAULT_HEARTBEAT_INTERVAL = 0;
    static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";

    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
    private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);
    private static final String SERVER_NAME_KEY = "serverName";

    /** Schema of the heartbeat record key: a struct with one string field, {@code serverName}. */
    private static final Schema KEY_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
            .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
            .build();

    /**
     * Kafka partition that heartbeat records are written to. Kept as its own constant so
     * that it is not coupled to {@link #DEFAULT_HEARTBEAT_INTERVAL}, which merely happens
     * to share the value 0.
     */
    private static final Integer HEARTBEAT_PARTITION = 0;

    private final String topicName;
    private final Supplier<OffsetPosition> positionSupplier;
    private final Duration heartbeatInterval;
    private final String key;

    /**
     * Timer tracking the current heartbeat period. Assigned in the constructor rather than
     * in a field initializer: field initializers run before the constructor body, at which
     * point {@link #heartbeatInterval} would still be {@code null}.
     */
    private volatile Threads.Timer heartbeatTimeout;

    /**
     * Creates a heartbeat generator.
     *
     * @param configuration    configuration from which the heartbeat interval
     *                         ({@code HEARTBEAT_INTERVAL}, in milliseconds) is read
     * @param topicName        name of the topic heartbeat records are addressed to
     * @param key              value stored in the {@code serverName} field of each record key
     * @param positionSupplier supplies the source partition and offset attached to each record
     */
    public HeartbeatImpl(Configuration configuration, String topicName, String key,
            Supplier<OffsetPosition> positionSupplier) {
        this.topicName = topicName;
        this.positionSupplier = positionSupplier;
        this.key = key;
        this.heartbeatInterval = configuration.getDuration(HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
        // Must come after heartbeatInterval is assigned; resetHeartbeat() reads it.
        this.heartbeatTimeout = resetHeartbeat();
    }

    /**
     * Passes a heartbeat record to {@code blockingConsumer} if the current timer has
     * expired, then starts a new timer. Does nothing when the timer has not expired.
     *
     * <p>The timer is restarted only after the consumer call returns; if the consumer
     * throws, the existing timer is left in place.
     *
     * @param blockingConsumer receiver of the generated heartbeat record
     * @throws InterruptedException if thrown by the consumer while accepting the record
     */
    @Override // io.debezium.heartbeat.Heartbeat
    public void heartbeat(BlockingConsumer<SourceRecord> blockingConsumer) throws InterruptedException {
        if (!this.heartbeatTimeout.expired()) {
            return;
        }
        LOGGER.debug("Generating heartbeat event");
        blockingConsumer.accept(heartbeatRecord());
        this.heartbeatTimeout = resetHeartbeat();
    }

    /**
     * Builds the record key struct.
     *
     * @param serverName value to store in the {@code serverName} field
     * @return a new struct conforming to {@link #KEY_SCHEMA}
     */
    private Struct serverNameKey(String serverName) {
        Struct result = new Struct(KEY_SCHEMA);
        result.put(SERVER_NAME_KEY, serverName);
        return result;
    }

    /**
     * Builds a heartbeat record using the position currently reported by the position
     * supplier. The record has a key but neither a value schema nor a value.
     *
     * @return the heartbeat record addressed to {@link #topicName}
     */
    private SourceRecord heartbeatRecord() {
        OffsetPosition position = this.positionSupplier.get();
        return new SourceRecord(
                position.partition(),
                position.offset(),
                this.topicName,
                HEARTBEAT_PARTITION,
                KEY_SCHEMA,
                serverNameKey(this.key),
                null,
                null);
    }

    /**
     * Creates a fresh timer for {@link #heartbeatInterval} based on the system clock.
     *
     * @return the new timer
     */
    private Threads.Timer resetHeartbeat() {
        return Threads.timer(Clock.SYSTEM, this.heartbeatInterval);
    }
}
