package org.apache.samza.zk;

import java.time.Duration;
import java.util.List;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.DistributedLock;
import org.apache.samza.zk.ZkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/zk/ZkDistributedLock.class */
/**
 * {@link DistributedLock} implementation backed by ZooKeeper ephemeral sequential nodes.
 *
 * <p>Every call to {@link #lock(Duration)} creates an ephemeral sequential child under the lock
 * path. The participant whose child name sorts first among all children of the lock path is
 * considered the lock holder. Waiting participants are woken up by a child-change listener that
 * is registered on the lock path in the constructor.
 *
 * <p>The instance keeps a single {@code nodePath}, so it tracks at most one lock attempt at a
 * time.
 */
public class ZkDistributedLock implements DistributedLock {
    public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);
    // NOTE(review): not referenced anywhere in this class; the value looks like a typo of
    // "state_initialized" but is left untouched in case it mirrors a persisted name - confirm.
    private static final String STATE_INITED = "sate_initialized";
    private final ZkUtils zkUtils;
    private final String lockPath;
    private final String participantId;
    private final ZkKeyBuilder keyBuilder;
    /** Path of the ephemeral node created by the latest {@code lock} call; {@code null} after {@code unlock}. */
    private String nodePath = null;
    /** Monitor shared by {@link #lock(Duration)} (waiter) and {@link ParticipantChangeHandler} (notifier). */
    private final Object mutex = new Object();

    /**
     * Child listener on the lock path. Wakes up threads blocked in {@link #lock(Duration)} whenever
     * the set of children changes so that they re-evaluate their position.
     */
    class ParticipantChangeHandler extends ZkUtils.GenerationAwareZkChildListener {
        public ParticipantChangeHandler(ZkUtils zkUtils) {
            super(zkUtils, "ParticipantChangeHandler");
        }

        /**
         * Handles a change of the children of the lock path.
         *
         * @param str path whose children changed
         * @param list current children of that path; a {@code null} list is logged and ignored
         */
        @Override
        public void doHandleChildChange(String str, List<String> list) throws Exception {
            synchronized (ZkDistributedLock.this.mutex) {
                if (list == null) {
                    ZkDistributedLock.LOG.warn("handleChildChange on path " + str + " was invoked with NULL list of children");
                } else {
                    ZkDistributedLock.LOG.info("ParticipantChangeHandler::handleChildChange - Path: {} Current Children: {} ", str, list);
                    // notifyAll rather than notify: every waiter re-checks its own condition, so a
                    // wake-up can never be consumed by the "wrong" thread.
                    ZkDistributedLock.this.mutex.notifyAll();
                }
            }
        }
    }

    /**
     * Creates a lock whose path is {@code <root path>/lock_<str2>} and subscribes to child changes
     * of that path.
     *
     * @param str id of this participant; stored as the data of the ephemeral lock node
     * @param zkUtils ZooKeeper helper providing the client and the key builder
     * @param str2 lock id, used as the suffix of the lock path
     */
    public ZkDistributedLock(String str, ZkUtils zkUtils, String str2) {
        this.zkUtils = zkUtils;
        this.participantId = str;
        this.keyBuilder = zkUtils.getKeyBuilder();
        this.lockPath = String.format("%s/lock_%s", this.keyBuilder.getRootPath(), str2);
        // Presumably makes sure the lock path exists before it is watched - see ZkUtils.validatePaths.
        zkUtils.validatePaths(new String[]{this.lockPath});
        zkUtils.getZkClient().subscribeChildChanges(this.lockPath, new ParticipantChangeHandler(zkUtils));
    }

    /**
     * Tries to acquire the lock, waiting at most {@code duration}.
     *
     * <p>An ephemeral sequential node is created under the lock path; the lock is acquired once no
     * other child of the lock path has a name that sorts before this participant's node. When the
     * method returns {@code false} the created node is NOT removed; call {@link #unlock()} to
     * delete it.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is retried until the timeout
     * expires (as before) and the thread's interrupt status is restored before returning.
     *
     * @param duration maximum time to wait for the lock
     * @return {@code true} if the lock was acquired, {@code false} if the timeout elapsed first
     * @throws SamzaException if the lock path has no children or this participant's node is missing
     */
    @Override
    public boolean lock(Duration duration) {
        this.nodePath = this.zkUtils.getZkClient().createEphemeralSequential(this.lockPath + "/", this.participantId);
        final long startMillis = System.currentTimeMillis();
        final long timeoutMillis = duration.toMillis();
        boolean interrupted = false;
        try {
            while (System.currentTimeMillis() - startMillis < timeoutMillis) {
                // The children are read while holding the mutex so that a notification issued by
                // the change handler cannot slip in between the check and the wait.
                synchronized (this.mutex) {
                    List<String> children = this.zkUtils.getZkClient().getChildren(this.lockPath);
                    String ownId = ZkKeyBuilder.parseIdFromPath(this.nodePath);
                    int rank = rankOf(children, ownId);
                    if (children.isEmpty() || rank == -1) {
                        throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
                    }
                    if (rank == 0) {
                        LOG.info("Acquired lock for participant id: {}", this.participantId);
                        return true;
                    }
                    // Wait only for what is left of the timeout. A non-positive value must never be
                    // passed to wait(): wait(0) blocks indefinitely.
                    long remainingMillis = timeoutMillis - (System.currentTimeMillis() - startMillis);
                    if (remainingMillis <= 0) {
                        break;
                    }
                    try {
                        this.mutex.wait(remainingMillis);
                    } catch (InterruptedException e) {
                        // Keep trying until the timeout, but remember the interrupt so that it is
                        // not lost for the caller.
                        interrupted = true;
                    }
                    LOG.info("Trying to acquire lock again...");
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        LOG.info("Failed to acquire lock within {} milliseconds.", timeoutMillis);
        return false;
    }

    /**
     * Releases the lock by deleting the ephemeral node created by {@link #lock(Duration)}.
     * Logs a warning and does nothing when no node is recorded.
     */
    @Override
    public void unlock() {
        if (this.nodePath == null) {
            LOG.warn("Ephemeral lock node you want to delete doesn't exist");
            return;
        }
        this.zkUtils.getZkClient().delete(this.nodePath);
        this.nodePath = null;
        LOG.info("Ephemeral lock node deleted. Unlocked!");
    }

    /**
     * Computes the position {@code id} would have if {@code children} were sorted in natural
     * (lexicographic) order. The list returned by ZooKeeper is not guaranteed to be ordered, so the
     * raw list index must not be used to decide who holds the lock.
     *
     * @param children names of the children of the lock path
     * @param id name of this participant's node
     * @return number of children that sort strictly before {@code id}, or {@code -1} if
     *     {@code id} is not among the children
     */
    private static int rankOf(List<String> children, String id) {
        boolean found = false;
        int smaller = 0;
        for (String child : children) {
            int cmp = child.compareTo(id);
            if (cmp == 0) {
                found = true;
            } else if (cmp < 0) {
                smaller++;
            }
        }
        return found ? smaller : -1;
    }
}
