/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TransactionBufferHandlerImpl
implements TransactionBufferHandler,
TimerTask {
    private static final Logger log = LoggerFactory.getLogger(TransactionBufferHandlerImpl.class);
    private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
    private final ConnectionPool connectionPool;
    private final NamespaceService namespaceService;
    private final AtomicLong requestIdGenerator = new AtomicLong();
    private long operationTimeoutInMills;
    private Timeout requestTimeout;
    private HashedWheelTimer timer;
    private final Semaphore semaphore;
    private final boolean blockIfReachMaxPendingOps;

    public TransactionBufferHandlerImpl(ConnectionPool connectionPool, NamespaceService namespaceService) {
        this.connectionPool = connectionPool;
        this.pendingRequests = new ConcurrentSkipListMap();
        this.namespaceService = namespaceService;
        this.operationTimeoutInMills = 3000L;
        this.semaphore = new Semaphore(10000);
        this.blockIfReachMaxPendingOps = true;
        this.timer = new HashedWheelTimer((ThreadFactory)new DefaultThreadFactory("pulsar-transaction-buffer-client-timer"));
        this.requestTimeout = this.timer.newTimeout((TimerTask)this, this.operationTimeoutInMills, TimeUnit.MILLISECONDS);
    }

    public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction action, List<MessageId> messageIdList) {
        CompletableFuture<TxnID> cb = new CompletableFuture<TxnID>();
        if (!this.canSendRequest(cb)) {
            return cb;
        }
        long requestId = this.requestIdGenerator.getAndIncrement();
        ArrayList<PulsarApi.MessageIdData> messageIdDataList = new ArrayList<PulsarApi.MessageIdData>();
        for (MessageId messageId : messageIdList) {
            messageIdDataList.add(PulsarApi.MessageIdData.newBuilder().setLedgerId(((MessageIdImpl)messageId).getLedgerId()).setEntryId(((MessageIdImpl)messageId).getEntryId()).setPartition(((MessageIdImpl)messageId).getPartitionIndex()).build());
        }
        ByteBuf cmd = Commands.newEndTxnOnPartition((long)requestId, (long)txnIdLeastBits, (long)txnIdMostBits, (String)topic, (PulsarApi.TxnAction)action, messageIdDataList);
        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
        this.pendingRequests.put(requestId, op);
        cmd.retain();
        this.cnx(topic).whenComplete((clientCnx, throwable) -> {
            if (throwable == null) {
                try {
                    clientCnx.ctx().writeAndFlush((Object)cmd, clientCnx.ctx().voidPromise());
                }
                catch (Exception e) {
                    cb.completeExceptionally(e);
                    this.pendingRequests.remove(requestId);
                    op.recycle();
                }
            } else {
                cb.completeExceptionally((Throwable)throwable);
                this.pendingRequests.remove(requestId);
                op.recycle();
            }
        });
        return cb;
    }

    public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscription, long txnIdMostBits, long txnIdLeastBits, PulsarApi.TxnAction action) {
        CompletableFuture<TxnID> cb = new CompletableFuture<TxnID>();
        if (!this.canSendRequest(cb)) {
            return cb;
        }
        long requestId = this.requestIdGenerator.getAndIncrement();
        ByteBuf cmd = Commands.newEndTxnOnSubscription((long)requestId, (long)txnIdLeastBits, (long)txnIdMostBits, (String)topic, (String)subscription, (PulsarApi.TxnAction)action);
        OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
        this.pendingRequests.put(requestId, op);
        cmd.retain();
        this.cnx(topic).whenComplete((clientCnx, throwable) -> {
            if (throwable == null) {
                try {
                    clientCnx.ctx().writeAndFlush((Object)cmd, clientCnx.ctx().voidPromise());
                }
                catch (Exception e) {
                    cb.completeExceptionally(e);
                    this.pendingRequests.remove(requestId);
                    op.recycle();
                }
            } else {
                cb.completeExceptionally((Throwable)throwable);
                this.pendingRequests.remove(requestId);
                op.recycle();
            }
        });
        return cb;
    }

    public void handleEndTxnOnTopicResponse(long requestId, PulsarApi.CommandEndTxnOnPartitionResponse response) {
        OpRequestSend op = this.pendingRequests.remove(requestId);
        if (op == null) {
            if (log.isDebugEnabled()) {
                log.debug("Got end txn on topic response for timeout {} - {}", (Object)response.getTxnidMostBits(), (Object)response.getTxnidLeastBits());
            }
            return;
        }
        if (!response.hasError()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Got end txn on topic response for for request {}", (Object)op.topic, (Object)response.getRequestId());
            }
            log.info("[{}] Got end txn on topic response for for request {}", (Object)op.topic, (Object)response.getRequestId());
            op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
        } else {
            log.error("[{}] Got end txn on topic response for request {} error {}", new Object[]{op.topic, response.getRequestId(), response.getError()});
            op.cb.completeExceptionally((Throwable)this.getException(response.getError(), response.getMessage()));
        }
        op.recycle();
    }

    public void handleEndTxnOnSubscriptionResponse(long requestId, PulsarApi.CommandEndTxnOnSubscriptionResponse response) {
        OpRequestSend op = this.pendingRequests.remove(requestId);
        if (op == null) {
            if (log.isDebugEnabled()) {
                log.debug("Got end txn on subscription response for timeout {} - {}", (Object)response.getTxnidMostBits(), (Object)response.getTxnidLeastBits());
            }
            return;
        }
        if (!response.hasError()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Got end txn on subscription response for for request {}", (Object)op.topic, (Object)response.getRequestId());
            }
            op.cb.complete(new TxnID(response.getTxnidMostBits(), response.getTxnidLeastBits()));
        } else {
            log.error("[{}] Got end txn on subscription response for request {} error {}", new Object[]{op.topic, response.getRequestId(), response.getError()});
            op.cb.completeExceptionally((Throwable)this.getException(response.getError(), response.getMessage()));
        }
        op.recycle();
    }

    private CompletableFuture<ClientCnx> cnx(String topic) {
        return this.getServiceUrl(topic).thenCompose(serviceUrl -> {
            try {
                if (serviceUrl == null) {
                    return CompletableFuture.completedFuture(null);
                }
                URI uri = new URI((String)serviceUrl);
                return this.connectionPool.getConnection(InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort())).thenCompose(clientCnx -> {
                    clientCnx.registerTransactionBufferHandler((TransactionBufferHandler)this);
                    return CompletableFuture.completedFuture(clientCnx);
                });
            }
            catch (Exception e) {
                return FutureUtil.failedFuture((Throwable)e);
            }
        });
    }

    private CompletableFuture<String> getServiceUrl(String topic) {
        TopicName topicName = TopicName.get((String)topic);
        return ((CompletableFuture)this.namespaceService.getBundleAsync(topicName).thenCompose(this.namespaceService::getOwnerAsync)).thenCompose(ned -> {
            String serviceUrl = null;
            if (ned.isPresent()) {
                serviceUrl = ((NamespaceEphemeralData)ned.get()).getNativeUrl();
            }
            return CompletableFuture.completedFuture(serviceUrl);
        });
    }

    private TransactionBufferClientException getException(PulsarApi.ServerError serverError, String msg) {
        return new TransactionBufferClientException(msg);
    }

    private boolean canSendRequest(CompletableFuture<?> callback) {
        try {
            if (this.blockIfReachMaxPendingOps) {
                this.semaphore.acquire();
            } else if (!this.semaphore.tryAcquire()) {
                callback.completeExceptionally((Throwable)new TransactionBufferClientException("Reach max pending ops."));
                return false;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            callback.completeExceptionally((Throwable)TransactionBufferClientException.unwrap((Throwable)e));
            return false;
        }
        return true;
    }

    public void run(Timeout timeout) throws Exception {
        long diff;
        if (timeout.isCancelled()) {
            return;
        }
        OpRequestSend peeked = null;
        Map.Entry<Long, OpRequestSend> firstEntry = this.pendingRequests.firstEntry();
        OpRequestSend opRequestSend = peeked = firstEntry == null ? null : firstEntry.getValue();
        while (peeked != null && peeked.createdAt + this.operationTimeoutInMills - System.currentTimeMillis() <= 0L && !peeked.cb.isDone()) {
            peeked.cb.completeExceptionally((Throwable)new TransactionBufferClientException.RequestTimeoutException());
            this.onResponse(peeked);
            firstEntry = this.pendingRequests.firstEntry();
            peeked = firstEntry == null ? null : firstEntry.getValue();
        }
        long timeToWaitMs = peeked == null ? this.operationTimeoutInMills : ((diff = peeked.createdAt + this.operationTimeoutInMills - System.currentTimeMillis()) <= 0L ? this.operationTimeoutInMills : diff);
        this.requestTimeout = this.timer.newTimeout((TimerTask)this, timeToWaitMs, TimeUnit.MILLISECONDS);
    }

    void onResponse(OpRequestSend op) {
        ReferenceCountUtil.safeRelease((Object)op.byteBuf);
        op.recycle();
        this.semaphore.release();
    }

    public void close() {
        this.timer.stop();
    }

    private static final class OpRequestSend {
        long requestId;
        String topic;
        ByteBuf byteBuf;
        CompletableFuture<TxnID> cb;
        long createdAt;
        private final Recycler.Handle<OpRequestSend> recyclerHandle;
        private static final Recycler<OpRequestSend> RECYCLER = new Recycler<OpRequestSend>(){

            protected OpRequestSend newObject(Recycler.Handle<OpRequestSend> handle) {
                return new OpRequestSend(handle);
            }
        };

        static OpRequestSend create(long requestId, String topic, ByteBuf byteBuf, CompletableFuture<TxnID> cb) {
            OpRequestSend op = (OpRequestSend)RECYCLER.get();
            op.requestId = requestId;
            op.topic = topic;
            op.byteBuf = byteBuf;
            op.cb = cb;
            op.createdAt = System.nanoTime();
            return op;
        }

        void recycle() {
            this.recyclerHandle.recycle((Object)this);
        }

        private OpRequestSend(Recycler.Handle<OpRequestSend> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }
}

