/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.utils.SimpleCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TableView<T> {
    private static final Logger log = LoggerFactory.getLogger(TableView.class);
    private static final long CACHE_EXPIRE_TIMEOUT_MS = 60000L;
    private static final long CACHE_EXPIRE_CHECK_FREQUENCY_MS = 3000L;
    @VisibleForTesting
    protected final Function<TopicName, CompletableFuture<SystemTopicClient.Reader<T>>> readerCreator;
    private final Map<String, T> snapshots = new ConcurrentHashMap<String, T>();
    private final long clientOperationTimeoutMs;
    private final SimpleCache<NamespaceName, SystemTopicClient.Reader<T>> readers;

    public TableView(Function<TopicName, CompletableFuture<SystemTopicClient.Reader<T>>> readerCreator, long clientOperationTimeoutMs, ScheduledExecutorService executor) {
        this.readerCreator = readerCreator;
        this.clientOperationTimeoutMs = clientOperationTimeoutMs;
        this.readers = new SimpleCache(executor, 60000L, 3000L);
    }

    public T readLatest(String topic) throws Exception {
        SystemTopicClient.Reader<T> reader = this.getReader(topic);
        while (this.wait(reader.hasMoreEventsAsync(), "has more events").booleanValue()) {
            Message<T> msg = this.wait(reader.readNextAsync(), "read message");
            if (msg.getKey() == null) continue;
            if (msg.getValue() != null) {
                this.snapshots.put(msg.getKey(), msg.getValue());
                continue;
            }
            this.snapshots.remove(msg.getKey());
        }
        return this.snapshots.get(topic);
    }

    @VisibleForTesting
    protected SystemTopicClient.Reader<T> getReader(String topic) {
        TopicName topicName = TopicName.get((String)topic);
        return this.readers.get(topicName.getNamespaceObject(), () -> {
            try {
                return this.wait(this.readerCreator.apply(topicName), "create reader");
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, __ -> __.closeAsync().exceptionally(e -> {
            log.warn("Failed to close reader {}", (Object)e.getMessage());
            return null;
        }));
    }

    private <R> R wait(CompletableFuture<R> future, String msg) throws Exception {
        try {
            return future.get(this.clientOperationTimeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (ExecutionException e) {
            throw new CompletionException("Failed to " + msg, e.getCause());
        }
    }
}

