/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.loadbalance.extensions.store;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;

/**
 * {@link LoadDataStore} implementation that writes load data to a Pulsar topic through a
 * {@link Producer} and reads it back through a {@link TableView} on the same topic.
 *
 * <p>The producer is created in the constructor. The table view is created lazily by
 * {@link #startTableView()} and released by {@link #closeTableView()}; the read methods
 * ({@link #get}, {@link #forEach}, {@link #entrySet}, {@link #size}) throw
 * {@link IllegalStateException} while no table view is open.
 *
 * @param <T> type of the load data values, serialized with {@code Schema.JSON}
 */
public class TableViewLoadDataStoreImpl<T>
implements LoadDataStore<T> {
    /** Currently open table view, or {@code null} when it has not been started or was closed. */
    private TableView<T> tableView;
    private final Producer<T> producer;
    private final PulsarClient client;
    private final String topic;
    private final Class<T> clazz;

    /**
     * Creates the store and eagerly creates the producer for {@code topic}.
     *
     * @param client Pulsar client used to create the producer and, later, the table view
     * @param topic  topic the load data is written to and read from
     * @param clazz  value class used to build the JSON schema
     * @throws LoadDataStoreException if creating the producer fails for any reason
     */
    public TableViewLoadDataStoreImpl(PulsarClient client, String topic, Class<T> clazz) throws LoadDataStoreException {
        try {
            this.client = client;
            this.producer = client.newProducer(Schema.JSON(clazz)).topic(topic).create();
            this.topic = topic;
            this.clazz = clazz;
        }
        catch (Exception e) {
            throw new LoadDataStoreException(e);
        }
    }

    /**
     * Sends {@code loadData} to the topic under {@code key}.
     *
     * @return a future that completes when the send completes
     */
    @Override
    public CompletableFuture<Void> pushAsync(String key, T loadData) {
        return this.producer.newMessage().key(key).value(loadData).sendAsync().thenAccept(__ -> {});
    }

    /**
     * Sends a message with a {@code null} value under {@code key}.
     *
     * @return a future that completes when the send completes
     */
    @Override
    public CompletableFuture<Void> removeAsync(String key) {
        return this.producer.newMessage().key(key).value(null).sendAsync().thenAccept(__ -> {});
    }

    /**
     * Looks up the value the table view currently holds for {@code key}.
     *
     * @return the value, or an empty {@link Optional} if the table view has none
     * @throws IllegalStateException if the table view has not been started
     */
    @Override
    public Optional<T> get(String key) {
        return Optional.ofNullable(this.startedTableView().get(key));
    }

    /**
     * Applies {@code action} to the entries of the table view.
     *
     * @throws IllegalStateException if the table view has not been started
     */
    @Override
    public void forEach(BiConsumer<String, T> action) {
        this.startedTableView().forEach(action);
    }

    /**
     * Returns the entry set of the table view.
     *
     * @throws IllegalStateException if the table view has not been started
     */
    @Override
    public Set<Map.Entry<String, T>> entrySet() {
        return this.startedTableView().entrySet();
    }

    /**
     * Returns the number of entries in the table view.
     *
     * @throws IllegalStateException if the table view has not been started
     */
    @Override
    public int size() {
        return this.startedTableView().size();
    }

    /**
     * Closes the table view if one is open; does nothing otherwise.
     * The field is cleared only after a successful close.
     *
     * @throws IOException if closing the table view fails
     */
    @Override
    public void closeTableView() throws IOException {
        TableView<T> current = this.tableView;
        if (current != null) {
            current.close();
            this.tableView = null;
        }
    }

    /**
     * Creates the table view on the topic if none is open; does nothing otherwise.
     *
     * @throws LoadDataStoreException if the Pulsar client fails to create the table view
     */
    @Override
    public void startTableView() throws LoadDataStoreException {
        if (this.tableView != null) {
            return;
        }
        try {
            this.tableView = this.client.newTableViewBuilder(Schema.JSON(this.clazz)).topic(this.topic).create();
        }
        catch (PulsarClientException e) {
            // The assignment above never completed, so the field is still null here.
            throw new LoadDataStoreException(e);
        }
    }

    /**
     * Closes the producer and then the table view.
     *
     * <p>The table view is closed even when closing the producer fails, so it is not leaked.
     * In that case the producer's failure is rethrown and any failure from closing the table
     * view is attached to it as a suppressed exception.
     *
     * @throws IOException if closing the producer or the table view fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        }
        catch (IOException | RuntimeException producerFailure) {
            try {
                this.closeTableView();
            }
            catch (IOException | RuntimeException tableViewFailure) {
                producerFailure.addSuppressed(tableViewFailure);
            }
            throw producerFailure;
        }
        this.closeTableView();
    }

    /**
     * Returns the open table view, reading the field exactly once so that the null check and
     * the subsequent use operate on the same reference.
     *
     * @throws IllegalStateException if the table view has not been started
     */
    private TableView<T> startedTableView() {
        TableView<T> current = this.tableView;
        if (current == null) {
            throw new IllegalStateException("table view has not been started");
        }
        return current;
    }
}

