/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import io.prometheus.client.Gauge;
import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicatedSubscriptionsController
implements AutoCloseable,
Topic.PublishContext {
    private static final Logger log = LoggerFactory.getLogger(ReplicatedSubscriptionsController.class);
    private final PersistentTopic topic;
    private final String localCluster;
    private final ScheduledFuture<?> timer;
    private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots = new ConcurrentHashMap<String, ReplicatedSubscriptionsSnapshotBuilder>();
    private static final Gauge pendingSnapshotsMetric = (Gauge)Gauge.build((String)"pulsar_replicated_subscriptions_pending_snapshots", (String)"Counter of currently pending snapshots").register();

    public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
        this.topic = topic;
        this.localCluster = localCluster;
        this.timer = topic.getBrokerService().pulsar().getExecutor().scheduleAtFixedRate(this::startNewSnapshot, 0L, topic.getBrokerService().pulsar().getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(), TimeUnit.MILLISECONDS);
    }

    public void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
        try {
            switch (markerType) {
                case 10: {
                    this.receivedSnapshotRequest(Markers.parseReplicatedSubscriptionsSnapshotRequest((ByteBuf)payload));
                    break;
                }
                case 11: {
                    this.receivedSnapshotResponse(position, Markers.parseReplicatedSubscriptionsSnapshotResponse((ByteBuf)payload));
                    break;
                }
                case 13: {
                    this.receiveSubscriptionUpdated(Markers.parseReplicatedSubscriptionsUpdate((ByteBuf)payload));
                    break;
                }
            }
        }
        catch (IOException e) {
            log.warn("[{}] Failed to parse marker: {}", (Object)this.topic.getName(), (Object)e);
        }
    }

    public void localSubscriptionUpdated(String subscriptionName, PulsarMarkers.ReplicatedSubscriptionsSnapshot snapshot) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Updating subscription to snapshot {}", new Object[]{this.topic, subscriptionName, snapshot.getClustersList().stream().map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(), cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId())).collect(Collectors.toList())});
        }
        TreeMap<String, PulsarMarkers.MessageIdData> clusterIds = new TreeMap<String, PulsarMarkers.MessageIdData>();
        int size = snapshot.getClustersCount();
        for (int i = 0; i < size; ++i) {
            PulsarMarkers.ClusterMessageId cmid2 = snapshot.getClusters(i);
            clusterIds.put(cmid2.getCluster(), cmid2.getMessageId());
        }
        ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate((String)subscriptionName, clusterIds);
        this.topic.publishMessage(subscriptionUpdate, this);
    }

    private void receivedSnapshotRequest(PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest request) {
        PositionImpl lastMsgId = (PositionImpl)this.topic.getLastMessageId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received snapshot request. Last msg id: {}", (Object)this.topic.getName(), (Object)lastMsgId);
        }
        ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse((String)request.getSnapshotId(), (String)request.getSourceCluster(), (String)this.localCluster, (long)lastMsgId.getLedgerId(), (long)lastMsgId.getEntryId());
        this.topic.publishMessage(marker, this);
    }

    private void receivedSnapshotResponse(Position position, PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse response) {
        String snapshotId = response.getSnapshotId();
        ReplicatedSubscriptionsSnapshotBuilder builder = (ReplicatedSubscriptionsSnapshotBuilder)this.pendingSnapshots.get(snapshotId);
        if (builder == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received late reply for timed-out snapshot {} from {}", new Object[]{this.topic.getName(), snapshotId, response.getCluster().getCluster()});
            }
            return;
        }
        builder.receivedSnapshotResponse(position, response);
    }

    private void receiveSubscriptionUpdated(PulsarMarkers.ReplicatedSubscriptionsUpdate update) {
        PersistentSubscription sub;
        PulsarMarkers.MessageIdData updatedMessageId = null;
        int size = update.getClustersCount();
        for (int i = 0; i < size; ++i) {
            PulsarMarkers.ClusterMessageId cmid = update.getClusters(i);
            if (!this.localCluster.equals(cmid.getCluster())) continue;
            updatedMessageId = cmid.getMessageId();
        }
        if (updatedMessageId == null) {
            return;
        }
        PositionImpl pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received update for subscription to {}", new Object[]{this.topic, update.getSubscriptionName(), pos});
        }
        if ((sub = this.topic.getSubscription(update.getSubscriptionName())) != null) {
            sub.acknowledgeMessage(Collections.singletonList(pos), PulsarApi.CommandAck.AckType.Cumulative, Collections.emptyMap());
        } else {
            log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", new Object[]{this.topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), pos});
            this.topic.createSubscription(update.getSubscriptionName(), PulsarApi.CommandSubscribe.InitialPosition.Latest, true);
        }
    }

    private void startNewSnapshot() {
        this.cleanupTimedOutSnapshots();
        AtomicBoolean anyReplicatorDisconnected = new AtomicBoolean();
        this.topic.getReplicators().forEach((cluster, replicator) -> {
            if (!replicator.isConnected()) {
                anyReplicatorDisconnected.set(true);
            }
        });
        if (anyReplicatorDisconnected.get()) {
            return;
        }
        pendingSnapshotsMetric.inc();
        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this, this.topic.getReplicators().keys(), this.topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
        this.pendingSnapshots.put(builder.getSnapshotId(), builder);
        builder.start();
    }

    private void cleanupTimedOutSnapshots() {
        Iterator it = this.pendingSnapshots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = it.next();
            if (!((ReplicatedSubscriptionsSnapshotBuilder)entry.getValue()).isTimedOut()) continue;
            if (log.isDebugEnabled()) {
                log.debug("[{}] Snapshot creation timed out for {}", (Object)this.topic.getName(), entry.getKey());
            }
            pendingSnapshotsMetric.dec();
            it.remove();
        }
    }

    void snapshotCompleted(String snapshotId) {
        this.pendingSnapshots.remove(snapshotId);
        pendingSnapshotsMetric.dec();
    }

    void writeMarker(ByteBuf marker) {
        this.topic.publishMessage(marker, this);
    }

    @Override
    public void completed(Exception e, long ledgerId, long entryId) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Published marker at {}:{}. Exception: {}", new Object[]{this.topic.getName(), ledgerId, entryId, e});
        }
    }

    PersistentTopic topic() {
        return this.topic;
    }

    String localCluster() {
        return this.localCluster;
    }

    @Override
    public void close() {
        this.timer.cancel(true);
    }
}

