package org.apache.samza.zk;

import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.samza.zk.ZkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/zk/ZkBarrierForVersionUpgrade.class */
public class ZkBarrierForVersionUpgrade {
    // Logger shared by this class and its nested listener classes.
    private static final Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
    // ZooKeeper helper used for all reads, writes and (un)subscriptions on barrier nodes; the constructor rejects null.
    private final ZkUtils zkUtils;
    // Builds the barrier, participants and state paths underneath the barrier root.
    private final BarrierKeyBuilder keyBuilder;
    // Listener notified on barrier creation and on DONE/TIMED_OUT; empty when the constructor was given null.
    private final Optional<ZkBarrierListener> barrierListenerOptional;
    // Scheduler on which ZkBarrierChangeHandler runs its "mark barrier as DONE" action.
    private final ScheduleAfterDebounceTime debounceTimer;

    /* loaded from: input_file:org/apache/samza/zk/ZkBarrierForVersionUpgrade$BarrierKeyBuilder.class */
    /**
     * Derives the ZooKeeper paths of a barrier from a fixed root path.
     *
     * <p>For root {@code r} and barrier version {@code v} the paths are
     * {@code r/barrier_v}, {@code r/barrier_v/barrier_participants} and
     * {@code r/barrier_v/barrier_state}.
     */
    class BarrierKeyBuilder {
        private static final String BARRIER_PARTICIPANTS = "/barrier_participants";
        private static final String BARRIER_STATE = "/barrier_state";
        private final String barrierRoot;

        /**
         * @param str barrier root path; must be non-null and, after trimming, non-empty and
         *            starting with '/'. The value is stored exactly as given (it is not trimmed).
         * @throws IllegalArgumentException if the root path fails that check
         */
        BarrierKeyBuilder(String str) {
            String trimmedRoot = (str == null) ? "" : str.trim();
            // An empty string never starts with "/", so this one test also rejects null and blank roots.
            if (!trimmedRoot.startsWith("/")) {
                throw new IllegalArgumentException("Barrier root path cannot be null or empty and the path has to start with '/'");
            }
            this.barrierRoot = str;
        }

        /** @return the root path exactly as it was passed to the constructor */
        String getBarrierRoot() {
            return barrierRoot;
        }

        /**
         * @param str barrier version
         * @return {@code <root>/barrier_<version>}
         */
        String getBarrierPath(String str) {
            return String.format("%s/barrier_%s", barrierRoot, str);
        }

        /**
         * @param str barrier version
         * @return path of the node under which participants of this barrier version are registered
         */
        String getBarrierParticipantsPath(String str) {
            String barrierPath = getBarrierPath(str);
            return barrierPath.concat(BARRIER_PARTICIPANTS);
        }

        /**
         * @param str barrier version
         * @return path of the node holding the state of this barrier version
         */
        String getBarrierStatePath(String str) {
            String barrierPath = getBarrierPath(str);
            return barrierPath.concat(BARRIER_STATE);
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkBarrierForVersionUpgrade$State.class */
    /** States a barrier can be in; the text form is what gets written to the barrier state node. */
    public enum State {
        NEW("NEW"),
        TIMED_OUT("TIMED_OUT"),
        DONE("DONE");

        /** Text form of this state, returned by {@link #toString()}. */
        private final String label;

        State(String label) {
            this.label = label;
        }

        /** @return the text form supplied when the constant was declared */
        @Override
        public String toString() {
            return label;
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkBarrierForVersionUpgrade$ZkBarrierChangeHandler.class */
    /**
     * Child listener for a barrier's participants node.
     *
     * <p>When the reported children match the expected participants (same size and every
     * expected id present), it schedules an action on the debounce timer that marks the
     * barrier {@link State#DONE} if it is still {@link State#NEW}, and then unsubscribes
     * this listener from the participants path.
     */
    class ZkBarrierChangeHandler extends ZkUtils.GenerationAwareZkChildListener {
        private static final String ACTION_NAME = "ZkBarrierChangeHandler";
        private final String barrierVersion;
        private final List<String> expectedParticipantIds;

        /**
         * @param str     barrier version this handler is watching
         * @param list    ids of the participants that must all join for the barrier to complete
         * @param zkUtils handed to the base listener together with the action name
         */
        public ZkBarrierChangeHandler(String str, List<String> list, ZkUtils zkUtils) {
            super(zkUtils, ACTION_NAME);
            this.barrierVersion = str;
            this.expectedParticipantIds = list;
        }

        /**
         * Reacts to a change in the children of the participants node.
         *
         * @param str  path whose children changed
         * @param list current children of that path; a null list is logged and ignored
         */
        @Override
        public void doHandleChildChange(String str, List<String> list) {
            if (list == null) {
                ZkBarrierForVersionUpgrade.LOG.info("Received notification with null participants for barrier: {}. Ignoring it.", str);
                return;
            }
            ZkBarrierForVersionUpgrade.LOG.info("Current participants in barrier version: {} = {}.", this.barrierVersion, list);
            ZkBarrierForVersionUpgrade.LOG.info("Expected participants in barrier version: {} = {}.", this.barrierVersion, this.expectedParticipantIds);
            if (list.size() == this.expectedParticipantIds.size() && CollectionUtils.containsAll(list, this.expectedParticipantIds)) {
                // The state transition and the unsubscription run as a scheduled action with zero delay,
                // not inline in this callback.
                ZkBarrierForVersionUpgrade.this.debounceTimer.scheduleAfterDebounceTime(ACTION_NAME, 0L, () -> {
                    String barrierStatePath = ZkBarrierForVersionUpgrade.this.keyBuilder.getBarrierStatePath(this.barrierVersion);
                    State currentState = State.valueOf((String) ZkBarrierForVersionUpgrade.this.zkUtils.getZkClient().readData(barrierStatePath));
                    // Only a barrier that is still NEW may be completed; DONE and TIMED_OUT are left untouched.
                    if (currentState == State.NEW) {
                        ZkBarrierForVersionUpgrade.LOG.info("Expected participants has joined the barrier version: {}. Marking the barrier state: {} as {}.", this.barrierVersion, barrierStatePath, State.DONE);
                        ZkBarrierForVersionUpgrade.this.zkUtils.writeData(barrierStatePath, State.DONE.toString());
                    } else {
                        ZkBarrierForVersionUpgrade.LOG.debug("Barrier version: {} is at: {} state. Not marking barrier as {}.", this.barrierVersion, currentState, State.DONE);
                    }
                    ZkBarrierForVersionUpgrade.LOG.info("Unsubscribing child changes on the path: {} for barrier version: {}.", str, this.barrierVersion);
                    ZkBarrierForVersionUpgrade.this.zkUtils.unsubscribeChildChanges(str, this);
                });
            }
        }
    }

    /* loaded from: input_file:org/apache/samza/zk/ZkBarrierForVersionUpgrade$ZkBarrierReachedHandler.class */
    /**
     * Data listener for a barrier's state node.
     *
     * <p>When the state becomes {@link State#DONE} or {@link State#TIMED_OUT} it unsubscribes
     * itself from the state path and forwards the new state to the barrier listener, if one
     * was configured. Any other state is ignored.
     */
    class ZkBarrierReachedHandler extends ZkUtils.GenerationAwareZkDataListener {
        private final String barrierStatePath;
        private final String barrierVersion;

        /**
         * @param str     path of the barrier state node this handler is subscribed to
         * @param str2    barrier version the state node belongs to
         * @param zkUtils handed to the base listener together with the action name
         */
        public ZkBarrierReachedHandler(String str, String str2, ZkUtils zkUtils) {
            super(zkUtils, "ZkBarrierReachedHandler");
            this.barrierStatePath = str;
            this.barrierVersion = str2;
        }

        /**
         * Handles a change of the barrier state node's data.
         *
         * @param str path of the node whose data changed
         * @param obj new node data, expected to be the name of a {@link State}; null is logged and ignored
         */
        @Override
        public void doHandleDataChange(String str, Object obj) {
            ZkBarrierForVersionUpgrade.LOG.info("Received barrierState change notification for barrier version: {} from zkNode: {} with data: {}.", this.barrierVersion, str, obj);
            if (obj == null) {
                // State.valueOf(null) would throw a NullPointerException; a node without data cannot
                // represent a terminal state, so skip it the same way the child listener skips null children.
                ZkBarrierForVersionUpgrade.LOG.warn("Received null barrierState for barrier version: {} from zkNode: {}. Ignoring it.", this.barrierVersion, str);
                return;
            }
            State newState = State.valueOf((String) obj);
            if (newState != State.DONE && newState != State.TIMED_OUT) {
                ZkBarrierForVersionUpgrade.LOG.debug("Barrier version: {} is at state: {}. Ignoring the barrierState change notification.", this.barrierVersion, newState);
                return;
            }
            // Terminal state reached: stop listening before notifying the barrier listener.
            ZkBarrierForVersionUpgrade.this.zkUtils.unsubscribeDataChanges(this.barrierStatePath, this);
            ZkBarrierForVersionUpgrade.this.barrierListenerOptional.ifPresent(zkBarrierListener -> {
                zkBarrierListener.onBarrierStateChanged(this.barrierVersion, newState);
            });
        }

        /**
         * Logs a warning when the watched node is deleted; no other action is taken.
         *
         * @param str path of the deleted node
         */
        @Override
        public void doHandleDataDeleted(String str) {
            ZkBarrierForVersionUpgrade.LOG.warn("Data deleted in path: {} barrierVersion: {}", str, this.barrierVersion);
        }
    }

    /**
     * Creates a barrier helper rooted at the given ZooKeeper path.
     *
     * @param str                       barrier root path; validated by {@code BarrierKeyBuilder}
     * @param zkUtils                   ZooKeeper helper; must not be null
     * @param zkBarrierListener         listener for barrier events; may be null, in which case nothing is notified
     * @param scheduleAfterDebounceTime scheduler used to run the barrier-completion action
     * @throws RuntimeException         if {@code zkUtils} is null
     * @throws IllegalArgumentException if {@code BarrierKeyBuilder} rejects the barrier root path
     */
    public ZkBarrierForVersionUpgrade(String str, ZkUtils zkUtils, ZkBarrierListener zkBarrierListener, ScheduleAfterDebounceTime scheduleAfterDebounceTime) {
        if (null == zkUtils) {
            throw new RuntimeException("Cannot operate ZkBarrierForVersionUpgrade without ZkUtils.");
        }
        // Build (and thereby validate) the key builder before any field is assigned.
        BarrierKeyBuilder builder = new BarrierKeyBuilder(str);
        this.debounceTimer = scheduleAfterDebounceTime;
        this.barrierListenerOptional = Optional.ofNullable(zkBarrierListener);
        this.keyBuilder = builder;
        this.zkUtils = zkUtils;
    }

    /**
     * Sets up a new barrier for the given version: writes {@link State#NEW} to the barrier state
     * node, subscribes a {@code ZkBarrierChangeHandler} to the participants node, and finally
     * notifies the barrier listener (if any) through {@code onBarrierCreated}.
     *
     * @param str  barrier version
     * @param list ids of the participants expected to join this barrier
     */
    public void create(String str, List<String> list) {
        LOG.info("Creating barrier with version: {}, participants: {}.", str, list);
        String barrierRoot = this.keyBuilder.getBarrierRoot();
        String barrierPath = this.keyBuilder.getBarrierPath(str);
        String barrierParticipantsPath = this.keyBuilder.getBarrierParticipantsPath(str);
        String barrierStatePath = this.keyBuilder.getBarrierStatePath(str);
        // NOTE(review): presumably validatePaths creates any of these nodes that are missing - confirm against ZkUtils.
        this.zkUtils.validatePaths(new String[]{barrierRoot, barrierPath, barrierParticipantsPath, barrierStatePath});
        LOG.info("Marking the barrier state: {} as {}.", barrierStatePath, State.NEW);
        this.zkUtils.writeData(barrierStatePath, State.NEW.toString());
        LOG.info("Subscribing child changes on the path: {} for barrier version: {}.", barrierParticipantsPath, str);
        this.zkUtils.subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(str, list, this.zkUtils));
        this.barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(str));
    }

    /**
     * Joins the barrier of the given version as the given participant.
     *
     * <p>First subscribes a {@code ZkBarrierReachedHandler} to the barrier state node, then
     * creates a persistent node named after the participant under the participants node.
     *
     * @param str  barrier version to join
     * @param str2 id of the joining participant; used as the name of the created node
     */
    public void join(String str, String str2) {
        LOG.info("Joining the barrier version: {} as participant: {}.", str, str2);
        final String statePath = this.keyBuilder.getBarrierStatePath(str);

        LOG.info("Subscribing data changes on the path: {} for barrier version: {}.", statePath, str);
        ZkBarrierReachedHandler reachedHandler = new ZkBarrierReachedHandler(statePath, str, this.zkUtils);
        this.zkUtils.subscribeDataChanges(statePath, reachedHandler);

        // Register this participant only after the state subscription is in place.
        String participantNode = String.format("%s/%s", this.keyBuilder.getBarrierParticipantsPath(str), str2);
        this.zkUtils.getZkClient().createPersistent(participantNode);
    }

    /**
     * Marks the barrier of the given version as {@link State#TIMED_OUT}, but only if its state
     * node currently holds {@link State#NEW}; a barrier in any other state is left unchanged.
     *
     * <p>The state is read and then written with two separate ZooKeeper calls.
     *
     * @param str barrier version to expire
     */
    public void expire(String str) {
        String barrierStatePath = this.keyBuilder.getBarrierStatePath(str);
        State currentState = State.valueOf((String) this.zkUtils.getZkClient().readData(barrierStatePath));
        if (currentState != State.NEW) {
            LOG.debug("Barrier version: {} is at: {} state. Not marking barrier as {}.", str, currentState, State.TIMED_OUT);
            return;
        }
        LOG.info("Expiring the barrier version: {}. Marking the barrier state: {} as {}.", str, barrierStatePath, State.TIMED_OUT);
        // Reuse the path computed above instead of asking the key builder for it a second time.
        this.zkUtils.writeData(barrierStatePath, State.TIMED_OUT.toString());
    }

    /**
     * Parses the integer that follows the last {@code '_'} in the given string.
     * If the string contains no {@code '_'}, the whole string is parsed.
     *
     * @param str text ending in a decimal integer, e.g. {@code "barrier_12"}
     * @return the parsed integer
     * @throws NumberFormatException if the text after the last {@code '_'} is not a valid int
     */
    public static int getVersion(String str) {
        int separatorIndex = str.lastIndexOf('_');
        String versionText = str.substring(separatorIndex + 1);
        return Integer.parseInt(versionText);
    }
}
