/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl.transaction;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Generated;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.Timeout;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.TimerTask;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionImpl
implements Transaction,
TimerTask {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransactionImpl.class);
    private final PulsarClientImpl client;
    private final long transactionTimeoutMs;
    private final long txnIdLeastBits;
    private final long txnIdMostBits;
    private final TxnID txnId;
    private final Map<String, CompletableFuture<Void>> registerPartitionMap;
    private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
    private final TransactionCoordinatorClientImpl tcClient;
    private CompletableFuture<Void> opFuture;
    private volatile long opCount = 0L;
    private static final AtomicLongFieldUpdater<TransactionImpl> OP_COUNT_UPDATE = AtomicLongFieldUpdater.newUpdater(TransactionImpl.class, "opCount");
    private volatile Transaction.State state = Transaction.State.OPEN;
    private static final AtomicReferenceFieldUpdater<TransactionImpl, Transaction.State> STATE_UPDATE = AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, Transaction.State.class, "state");
    private volatile boolean hasOpsFailed = false;
    private final Timeout timeout;

    @Override
    public void run(Timeout timeout) throws Exception {
        STATE_UPDATE.compareAndSet(this, Transaction.State.OPEN, Transaction.State.TIME_OUT);
    }

    TransactionImpl(PulsarClientImpl client, long transactionTimeoutMs, long txnIdLeastBits, long txnIdMostBits) {
        this.client = client;
        this.transactionTimeoutMs = transactionTimeoutMs;
        this.txnIdLeastBits = txnIdLeastBits;
        this.txnIdMostBits = txnIdMostBits;
        this.txnId = new TxnID(this.txnIdMostBits, this.txnIdLeastBits);
        this.registerPartitionMap = new ConcurrentHashMap<String, CompletableFuture<Void>>();
        this.registerSubscriptionMap = new ConcurrentHashMap<Pair<String, String>, CompletableFuture<Void>>();
        this.tcClient = client.getTcClient();
        this.opFuture = CompletableFuture.completedFuture(null);
        this.timeout = client.getTimer().newTimeout(this, transactionTimeoutMs, TimeUnit.MILLISECONDS);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> registerProducedTopic(String topic) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        if (this.checkIfOpen(completableFuture)) {
            TransactionImpl transactionImpl = this;
            synchronized (transactionImpl) {
                return this.registerPartitionMap.compute(topic, (key, future) -> {
                    if (future != null) {
                        return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
                    }
                    return this.tcClient.addPublishPartitionToTxnAsync(this.txnId, Lists.newArrayList(topic)).thenCompose(ignored -> CompletableFuture.completedFuture(null));
                });
            }
        }
        return completableFuture;
    }

    public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
        if (OP_COUNT_UPDATE.getAndIncrement(this) == 0L) {
            this.opFuture = new CompletableFuture();
        }
        newSendFuture.whenComplete((messageId, e) -> {
            if (e != null) {
                log.error("The transaction [{}:{}] get an exception when send messages.", new Object[]{this.txnIdMostBits, this.txnIdLeastBits, e});
                if (!this.hasOpsFailed) {
                    this.hasOpsFailed = true;
                }
            }
            CompletableFuture<Void> future = this.opFuture;
            if (OP_COUNT_UPDATE.decrementAndGet(this) == 0L) {
                future.complete(null);
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<Void>();
        if (this.checkIfOpen(completableFuture)) {
            TransactionImpl transactionImpl = this;
            synchronized (transactionImpl) {
                return this.registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
                    if (future != null) {
                        return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
                    }
                    return this.tcClient.addSubscriptionToTxnAsync(this.txnId, topic, subscription).thenCompose(ignored -> CompletableFuture.completedFuture(null));
                });
            }
        }
        return completableFuture;
    }

    public void registerAckOp(CompletableFuture<Void> newAckFuture) {
        if (OP_COUNT_UPDATE.getAndIncrement(this) == 0L) {
            this.opFuture = new CompletableFuture();
        }
        newAckFuture.whenComplete((ignore, e) -> {
            if (e != null) {
                log.error("The transaction [{}:{}] get an exception when ack messages.", new Object[]{this.txnIdMostBits, this.txnIdLeastBits, e});
                if (!this.hasOpsFailed) {
                    this.hasOpsFailed = true;
                }
            }
            CompletableFuture<Void> future = this.opFuture;
            if (OP_COUNT_UPDATE.decrementAndGet(this) == 0L) {
                future.complete(null);
            }
        });
    }

    @Override
    public CompletableFuture<Void> commit() {
        this.timeout.cancel();
        return this.checkState(Transaction.State.OPEN, Transaction.State.COMMITTING).thenCompose(value -> {
            CompletableFuture commitFuture = new CompletableFuture();
            this.state = Transaction.State.COMMITTING;
            this.opFuture.whenComplete((v, e) -> {
                if (this.hasOpsFailed) {
                    ((CompletableFuture)this.checkState(Transaction.State.COMMITTING).thenCompose(__ -> this.internalAbort())).whenComplete((vx, ex) -> commitFuture.completeExceptionally(new PulsarClientException.TransactionHasOperationFailedException()));
                } else {
                    this.tcClient.commitAsync(this.txnId).whenComplete((vx, ex) -> {
                        if (ex != null) {
                            if (ex instanceof TransactionCoordinatorClientException.TransactionNotFoundException || ex instanceof TransactionCoordinatorClientException.InvalidTxnStatusException) {
                                this.state = Transaction.State.ERROR;
                            }
                            commitFuture.completeExceptionally((Throwable)ex);
                        } else {
                            this.state = Transaction.State.COMMITTED;
                            commitFuture.complete(vx);
                        }
                    });
                }
            });
            return commitFuture;
        });
    }

    @Override
    public CompletableFuture<Void> abort() {
        this.timeout.cancel();
        return this.checkState(Transaction.State.OPEN, Transaction.State.ABORTING).thenCompose(__ -> this.internalAbort());
    }

    private CompletableFuture<Void> internalAbort() {
        CompletableFuture<Void> abortFuture = new CompletableFuture<Void>();
        this.state = Transaction.State.ABORTING;
        this.opFuture.whenComplete((v, e) -> this.tcClient.abortAsync(this.txnId).whenComplete((vx, ex) -> {
            if (ex != null) {
                if (ex instanceof TransactionCoordinatorClientException.TransactionNotFoundException || ex instanceof TransactionCoordinatorClientException.InvalidTxnStatusException) {
                    this.state = Transaction.State.ERROR;
                }
                abortFuture.completeExceptionally((Throwable)ex);
            } else {
                this.state = Transaction.State.ABORTED;
                abortFuture.complete(null);
            }
        }));
        return abortFuture;
    }

    @Override
    public TxnID getTxnID() {
        return this.txnId;
    }

    @Override
    public Transaction.State getState() {
        return this.state;
    }

    public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
        if (this.state == Transaction.State.OPEN) {
            return true;
        }
        completableFuture.completeExceptionally(new TransactionCoordinatorClientException.InvalidTxnStatusException(this.txnId.toString(), this.state.name(), Transaction.State.OPEN.name()));
        return false;
    }

    private CompletableFuture<Void> checkState(Transaction.State ... expectedStates) {
        Transaction.State actualState = STATE_UPDATE.get(this);
        for (Transaction.State expectedState : expectedStates) {
            if (actualState != expectedState) continue;
            return CompletableFuture.completedFuture(null);
        }
        return FutureUtil.failedFuture(new TransactionCoordinatorClientException.InvalidTxnStatusException("[" + this.txnIdMostBits + ":" + this.txnIdLeastBits + "] with unexpected state: " + actualState.name() + ", expect: " + Arrays.toString((Object[])expectedStates)));
    }

    @Generated
    public PulsarClientImpl getClient() {
        return this.client;
    }

    @Generated
    public long getTransactionTimeoutMs() {
        return this.transactionTimeoutMs;
    }

    @Generated
    public long getTxnIdLeastBits() {
        return this.txnIdLeastBits;
    }

    @Generated
    public long getTxnIdMostBits() {
        return this.txnIdMostBits;
    }

    @Generated
    public Map<String, CompletableFuture<Void>> getRegisterPartitionMap() {
        return this.registerPartitionMap;
    }

    @Generated
    public Map<Pair<String, String>, CompletableFuture<Void>> getRegisterSubscriptionMap() {
        return this.registerSubscriptionMap;
    }

    @Generated
    public TransactionCoordinatorClientImpl getTcClient() {
        return this.tcClient;
    }

    @Generated
    public CompletableFuture<Void> getOpFuture() {
        return this.opFuture;
    }

    @Generated
    public long getOpCount() {
        return this.opCount;
    }

    @Generated
    public boolean isHasOpsFailed() {
        return this.hasOpsFailed;
    }

    @Generated
    public Timeout getTimeout() {
        return this.timeout;
    }
}

