/*
 * Decompiled with CFR 0.152.
 */
package org.apache.zookeeper.server.quorum;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.TxnLogProposalIterator;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperThread;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.LearnerSnapshot;
import org.apache.zookeeper.server.quorum.LearnerSyncRequest;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.SnapshotThrottleException;
import org.apache.zookeeper.server.quorum.StateSummary;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class LearnerHandler
extends ZooKeeperThread {
    private static final Logger LOG = LoggerFactory.getLogger(LearnerHandler.class);
    protected final Socket sock;
    final Leader leader;
    volatile long tickOfNextAckDeadline;
    protected long sid = 0L;
    protected int version = 1;
    final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue();
    private SyncLimitCheck syncLimitCheck = new SyncLimitCheck();
    private BinaryInputArchive ia;
    private BinaryOutputArchive oa;
    private BufferedOutputStream bufferedOutput;
    private volatile boolean sendingThreadStarted = false;
    public static final String FORCE_SNAP_SYNC = "zookeeper.forceSnapshotSync";
    private boolean forceSnapSync = false;
    private boolean needOpPacket = true;
    private long leaderLastZxid;
    final QuorumPacket proposalOfDeath = new QuorumPacket();
    private QuorumPeer.LearnerType learnerType = QuorumPeer.LearnerType.PARTICIPANT;

    public Socket getSocket() {
        return this.sock;
    }

    long getSid() {
        return this.sid;
    }

    int getVersion() {
        return this.version;
    }

    LearnerHandler(Socket sock, Leader leader) throws IOException {
        super("LearnerHandler-" + sock.getRemoteSocketAddress());
        this.sock = sock;
        this.leader = leader;
        leader.addLearnerHandler(this);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("LearnerHandler ").append(this.sock);
        sb.append(" tickOfNextAckDeadline:").append(this.tickOfNextAckDeadline());
        sb.append(" synced?:").append(this.synced());
        sb.append(" queuedPacketLength:").append(this.queuedPackets.size());
        return sb.toString();
    }

    public QuorumPeer.LearnerType getLearnerType() {
        return this.learnerType;
    }

    private void sendPackets() throws InterruptedException {
        block10: {
            long traceMask = 16L;
            try {
                while (true) {
                    QuorumPacket p;
                    if ((p = this.queuedPackets.poll()) == null) {
                        this.bufferedOutput.flush();
                        p = this.queuedPackets.take();
                    }
                    if (p != this.proposalOfDeath) {
                        if (p.getType() == 5) {
                            traceMask = 128L;
                        }
                        if (p.getType() == 2) {
                            this.syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
                        }
                        if (LOG.isTraceEnabled()) {
                            ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                        }
                        this.oa.writeRecord(p, "packet");
                        continue;
                    }
                    break;
                }
            }
            catch (IOException e) {
                if (this.sock.isClosed()) break block10;
                LOG.warn("Unexpected exception at " + this, (Throwable)e);
                try {
                    this.sock.close();
                }
                catch (IOException ie) {
                    LOG.warn("Error closing socket for handler " + this, (Throwable)ie);
                }
            }
        }
    }

    public static String packetToString(QuorumPacket p) {
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public void run() {
        block48: {
            block47: {
                Object object;
                StringBuilder stringBuilder;
                block46: {
                    try {
                        try {
                            QuorumPacket newLeaderQP;
                            this.tickOfNextAckDeadline = this.leader.self.tick.get() + this.leader.self.initLimit + this.leader.self.syncLimit;
                            this.ia = BinaryInputArchive.getArchive(new BufferedInputStream(this.sock.getInputStream()));
                            this.bufferedOutput = new BufferedOutputStream(this.sock.getOutputStream());
                            this.oa = BinaryOutputArchive.getArchive(this.bufferedOutput);
                            QuorumPacket qp = new QuorumPacket();
                            this.ia.readRecord(qp, "packet");
                            if (qp.getType() != 11 && qp.getType() != 16) {
                                LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
                                Object var32_5 = null;
                                stringBuilder = new StringBuilder().append("******* GOODBYE ");
                                object = this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>";
                                break block46;
                            }
                            byte[] learnerInfoData = qp.getData();
                            if (learnerInfoData != null) {
                                long configVersion;
                                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                                if (learnerInfoData.length >= 8) {
                                    this.sid = bbsid.getLong();
                                }
                                if (learnerInfoData.length >= 12) {
                                    this.version = bbsid.getInt();
                                }
                                if (learnerInfoData.length >= 20 && (configVersion = bbsid.getLong()) > this.leader.self.getQuorumVerifier().getVersion()) {
                                    throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                                }
                            } else {
                                this.sid = this.leader.followerCounter.getAndDecrement();
                            }
                            if (this.leader.self.getView().containsKey(this.sid)) {
                                LOG.info("Follower sid: " + this.sid + " : info : " + this.leader.self.getView().get(this.sid).toString());
                            } else {
                                LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(this.leader.self.getQuorumVerifier().getVersion()));
                            }
                            if (qp.getType() == 16) {
                                this.learnerType = QuorumPeer.LearnerType.OBSERVER;
                            }
                            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
                            StateSummary ss = null;
                            long zxid = qp.getZxid();
                            long newEpoch = this.leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
                            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0L);
                            if (this.getVersion() < 65536) {
                                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                                ss = new StateSummary(epoch, zxid);
                                this.leader.waitForEpochAck(this.getSid(), ss);
                            } else {
                                byte[] ver = new byte[4];
                                ByteBuffer.wrap(ver).putInt(65536);
                                QuorumPacket newEpochPacket = new QuorumPacket(17, newLeaderZxid, ver, null);
                                this.oa.writeRecord(newEpochPacket, "packet");
                                this.bufferedOutput.flush();
                                QuorumPacket ackEpochPacket = new QuorumPacket();
                                this.ia.readRecord(ackEpochPacket, "packet");
                                if (ackEpochPacket.getType() != 18) {
                                    LOG.error(ackEpochPacket.toString() + " is not ACKEPOCH");
                                    break block47;
                                }
                                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                                this.leader.waitForEpochAck(this.getSid(), ss);
                            }
                            long peerLastZxid = ss.getLastZxid();
                            boolean needSnap = this.syncFollower(peerLastZxid, this.leader.zk.getZKDatabase(), this.leader);
                            LOG.debug("Sending NEWLEADER message to " + this.sid);
                            if (this.getVersion() < 65536) {
                                newLeaderQP = new QuorumPacket(10, newLeaderZxid, null, null);
                                this.oa.writeRecord(newLeaderQP, "packet");
                            } else {
                                newLeaderQP = new QuorumPacket(10, newLeaderZxid, this.leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
                                this.queuedPackets.add(newLeaderQP);
                            }
                            this.bufferedOutput.flush();
                            if (needSnap) {
                                boolean exemptFromThrottle = this.getLearnerType() != QuorumPeer.LearnerType.OBSERVER;
                                LearnerSnapshot snapshot = this.leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                                try {
                                    long zxidToSend = this.leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                                    this.oa.writeRecord(new QuorumPacket(15, zxidToSend, null, null), "packet");
                                    this.bufferedOutput.flush();
                                    LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, send zxid of db as 0x{}, {} concurrent snapshots, snapshot was {} from throttle", new Object[]{Long.toHexString(peerLastZxid), Long.toHexString(this.leaderLastZxid), Long.toHexString(zxidToSend), snapshot.getConcurrentSnapshotNumber(), snapshot.isEssential() ? "exempt" : "not exempt"});
                                    this.leader.zk.getZKDatabase().serializeSnapshot(this.oa);
                                    this.oa.writeString("BenWasHere", "signature");
                                    this.bufferedOutput.flush();
                                    Object var20_31 = null;
                                    snapshot.close();
                                }
                                catch (Throwable throwable) {
                                    Object var20_32 = null;
                                    snapshot.close();
                                    throw throwable;
                                }
                            }
                            this.startSendingPackets();
                            qp = new QuorumPacket();
                            this.ia.readRecord(qp, "packet");
                            if (qp.getType() != 3) {
                                LOG.error("Next packet was supposed to be an ACK");
                                break block48;
                            }
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Received NEWLEADER-ACK message from " + this.sid);
                            }
                            this.leader.waitForNewLeaderAck(this.getSid(), qp.getZxid(), this.getLearnerType());
                            this.syncLimitCheck.start();
                            this.sock.setSoTimeout(this.leader.self.tickTime * this.leader.self.syncLimit);
                            LeaderZooKeeperServer exemptFromThrottle = this.leader.zk;
                            synchronized (exemptFromThrottle) {
                                while (!this.leader.zk.isRunning() && !this.isInterrupted()) {
                                    this.leader.zk.wait(20L);
                                }
                            }
                            LOG.debug("Sending UPTODATE message to " + this.sid);
                            this.queuedPackets.add(new QuorumPacket(12, -1L, null, null));
                            block21: while (true) {
                                qp = new QuorumPacket();
                                this.ia.readRecord(qp, "packet");
                                long traceMask = 16L;
                                if (qp.getType() == 5) {
                                    traceMask = 128L;
                                }
                                if (LOG.isTraceEnabled()) {
                                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                                }
                                this.tickOfNextAckDeadline = this.leader.self.tick.get() + this.leader.self.syncLimit;
                                switch (qp.getType()) {
                                    case 3: {
                                        if (this.learnerType == QuorumPeer.LearnerType.OBSERVER && LOG.isDebugEnabled()) {
                                            LOG.debug("Received ACK from Observer  " + this.sid);
                                        }
                                        this.syncLimitCheck.updateAck(qp.getZxid());
                                        this.leader.processAck(this.sid, qp.getZxid(), this.sock.getLocalSocketAddress());
                                        break;
                                    }
                                    case 5: {
                                        int to;
                                        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                                        DataInputStream dis = new DataInputStream(bis);
                                        while (dis.available() > 0) {
                                            long sess = dis.readLong();
                                            to = dis.readInt();
                                            this.leader.zk.touch(sess, to);
                                        }
                                        continue block21;
                                    }
                                    case 6: {
                                        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
                                        DataInputStream dis = new DataInputStream(bis);
                                        long id = dis.readLong();
                                        int to = dis.readInt();
                                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                                        DataOutputStream dos = new DataOutputStream(bos);
                                        dos.writeLong(id);
                                        boolean valid = this.leader.zk.checkIfValidGlobalSession(id, to);
                                        if (valid) {
                                            try {
                                                this.leader.zk.setOwner(id, this);
                                            }
                                            catch (KeeperException.SessionExpiredException e) {
                                                LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", (Throwable)e);
                                            }
                                        }
                                        if (LOG.isTraceEnabled()) {
                                            ZooTrace.logTraceMessage(LOG, 32L, "Session 0x" + Long.toHexString(id) + " is valid: " + valid);
                                        }
                                        dos.writeBoolean(valid);
                                        qp.setData(bos.toByteArray());
                                        this.queuedPackets.add(qp);
                                        break;
                                    }
                                    case 1: {
                                        ByteBuffer bb = ByteBuffer.wrap(qp.getData());
                                        long sessionId = bb.getLong();
                                        int cxid = bb.getInt();
                                        int type = bb.getInt();
                                        bb = bb.slice();
                                        Request si = type == 9 ? new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo()) : new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                                        si.setOwner(this);
                                        this.leader.zk.submitLearnerRequest(si);
                                        break;
                                    }
                                }
                            }
                        }
                        catch (IOException e) {
                            if (this.sock != null && !this.sock.isClosed()) {
                                LOG.error("Unexpected exception causing shutdown while sock still open", (Throwable)e);
                                try {
                                    this.sock.close();
                                }
                                catch (IOException iOException) {
                                }
                            }
                            Object var32_8 = null;
                            LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                            this.shutdown();
                            return;
                        }
                        catch (InterruptedException e) {
                            LOG.error("Unexpected exception causing shutdown", (Throwable)e);
                            Object var32_9 = null;
                            LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                            this.shutdown();
                            return;
                        }
                        catch (SnapshotThrottleException e) {
                            LOG.error("too many concurrent snapshots: " + e);
                            Object var32_10 = null;
                            LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                            this.shutdown();
                            return;
                        }
                    }
                    catch (Throwable throwable) {
                        Object var32_11 = null;
                        LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
                        this.shutdown();
                        throw throwable;
                    }
                }
                LOG.warn(stringBuilder.append(object).append(" ********").toString());
                this.shutdown();
                return;
            }
            Object var32_6 = null;
            LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
            this.shutdown();
            return;
        }
        Object var32_7 = null;
        LOG.warn("******* GOODBYE " + (this.sock != null ? this.sock.getRemoteSocketAddress() : "<null>") + " ********");
        this.shutdown();
    }

    protected void startSendingPackets() {
        if (!this.sendingThreadStarted) {
            new Thread(){

                public void run() {
                    Thread.currentThread().setName("Sender-" + LearnerHandler.this.sock.getRemoteSocketAddress());
                    try {
                        LearnerHandler.this.sendPackets();
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption " + e.getMessage());
                    }
                }
            }.start();
            this.sendingThreadStarted = true;
        } else {
            LOG.error("Attempting to start sending thread after it already started");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xFFFFFFFFL) == 0L;
        long currentZxid = peerLastZxid;
        boolean needSnap = true;
        boolean txnLogSyncEnabled = db.getSnapshotSizeFactor() >= 0.0;
        ReentrantReadWriteLock lock = db.getLogLock();
        ReentrantReadWriteLock.ReadLock rl = lock.readLock();
        try {
            rl.lock();
            long maxCommittedLog = db.getmaxCommittedLog();
            long minCommittedLog = db.getminCommittedLog();
            long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
            LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{} minCommittedLog=0x{} lastProcessedZxid=0x{} peerLastZxid=0x{}", new Object[]{this.getSid(), Long.toHexString(maxCommittedLog), Long.toHexString(minCommittedLog), Long.toHexString(lastProcessedZxid), Long.toHexString(peerLastZxid)});
            if (db.getCommittedLog().isEmpty()) {
                minCommittedLog = lastProcessedZxid;
                maxCommittedLog = lastProcessedZxid;
            }
            if (this.forceSnapSync) {
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) + " for peer sid: " + this.getSid());
                this.queueOpPacket(13, peerLastZxid);
                this.needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
                LOG.debug("Sending TRUNC to follower zxidToSend=0x" + Long.toHexString(maxCommittedLog) + " for peer sid:" + this.getSid());
                this.queueOpPacket(14, maxCommittedLog);
                currentZxid = maxCommittedLog;
                this.needOpPacket = false;
                needSnap = false;
            } else if (maxCommittedLog >= peerLastZxid && minCommittedLog <= peerLastZxid) {
                LOG.info("Using committedLog for peer sid: " + this.getSid());
                Iterator<Leader.Proposal> itr = db.getCommittedLog().iterator();
                currentZxid = this.queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                long sizeLimit = db.calculateTxnLogSizeLimit();
                Iterator<Leader.Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
                if (txnLogItr.hasNext()) {
                    LOG.info("Use txnlog and committedLog for peer sid: " + this.getSid());
                    currentZxid = this.queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
                    LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
                    Iterator<Leader.Proposal> committedLogItr = db.getCommittedLog().iterator();
                    currentZxid = this.queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                    needSnap = false;
                }
                if (txnLogItr instanceof TxnLogProposalIterator) {
                    TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator)txnLogItr;
                    txnProposalItr.close();
                }
            } else {
                LOG.warn("Unhandled scenario for peer sid: " + this.getSid());
            }
            LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) + " for peer sid: " + this.getSid());
            this.leaderLastZxid = leader.startForwarding(this, currentZxid);
            Object var23_17 = null;
            rl.unlock();
        }
        catch (Throwable throwable) {
            Object var23_18 = null;
            rl.unlock();
            throw throwable;
        }
        if (this.needOpPacket && !needSnap) {
            LOG.error("Unhandled scenario for peer sid: " + this.getSid() + " fall back to use snapshot");
            needSnap = true;
        }
        return needSnap;
    }

    protected long queueCommittedProposals(Iterator<Leader.Proposal> itr, long peerLastZxid, Long maxZxid, Long lastCommitedZxid) {
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xFFFFFFFFL) == 0L;
        long queuedZxid = peerLastZxid;
        long prevProposalZxid = -1L;
        while (itr.hasNext()) {
            Leader.Proposal propose = itr.next();
            long packetZxid = propose.packet.getZxid();
            if (maxZxid != null && packetZxid > maxZxid) break;
            if (packetZxid < peerLastZxid) {
                prevProposalZxid = packetZxid;
                continue;
            }
            if (this.needOpPacket) {
                if (packetZxid == peerLastZxid) {
                    LOG.info("Sending DIFF zxid=0x" + Long.toHexString(lastCommitedZxid) + " for peer sid: " + this.getSid());
                    this.queueOpPacket(13, lastCommitedZxid);
                    this.needOpPacket = false;
                    continue;
                }
                if (isPeerNewEpochZxid) {
                    LOG.info("Sending DIFF zxid=0x" + Long.toHexString(lastCommitedZxid) + " for peer sid: " + this.getSid());
                    this.queueOpPacket(13, lastCommitedZxid);
                    this.needOpPacket = false;
                } else if (packetZxid > peerLastZxid) {
                    if (ZxidUtils.getEpochFromZxid(packetZxid) != ZxidUtils.getEpochFromZxid(peerLastZxid)) {
                        LOG.warn("Cannot send TRUNC to peer sid: " + this.getSid() + " peer zxid is from different epoch");
                        return queuedZxid;
                    }
                    LOG.info("Sending TRUNC zxid=0x" + Long.toHexString(prevProposalZxid) + " for peer sid: " + this.getSid());
                    this.queueOpPacket(14, prevProposalZxid);
                    this.needOpPacket = false;
                }
            }
            if (packetZxid <= queuedZxid) continue;
            this.queuePacket(propose.packet);
            this.queueOpPacket(4, packetZxid);
            queuedZxid = packetZxid;
        }
        if (this.needOpPacket && isPeerNewEpochZxid) {
            LOG.info("Sending DIFF zxid=0x" + Long.toHexString(lastCommitedZxid) + " for peer sid: " + this.getSid());
            this.queueOpPacket(13, lastCommitedZxid);
            this.needOpPacket = false;
        }
        return queuedZxid;
    }

    public void shutdown() {
        try {
            this.queuedPackets.put(this.proposalOfDeath);
        }
        catch (InterruptedException e) {
            LOG.warn("Ignoring unexpected exception", (Throwable)e);
        }
        try {
            if (this.sock != null && !this.sock.isClosed()) {
                this.sock.close();
            }
        }
        catch (IOException e) {
            LOG.warn("Ignoring unexpected exception during socket close", (Throwable)e);
        }
        this.interrupt();
        this.leader.removeLearnerHandler(this);
    }

    public long tickOfNextAckDeadline() {
        return this.tickOfNextAckDeadline;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void ping() {
        if (!this.sendingThreadStarted) {
            return;
        }
        if (this.syncLimitCheck.check(System.nanoTime())) {
            long id;
            Leader leader = this.leader;
            synchronized (leader) {
                id = this.leader.lastProposed;
            }
            QuorumPacket ping = new QuorumPacket(5, id, null, null);
            this.queuePacket(ping);
        } else {
            LOG.warn("Closing connection to peer due to transaction timeout.");
            this.shutdown();
        }
    }

    private void queueOpPacket(int type, long zxid) {
        QuorumPacket packet = new QuorumPacket(type, zxid, null, null);
        this.queuePacket(packet);
    }

    void queuePacket(QuorumPacket p) {
        this.queuedPackets.add(p);
    }

    public boolean synced() {
        return this.isAlive() && (long)this.leader.self.tick.get() <= this.tickOfNextAckDeadline;
    }

    public Queue<QuorumPacket> getQueuedPackets() {
        return this.queuedPackets;
    }

    public void setFirstPacket(boolean value) {
        this.needOpPacket = value;
    }

    private class SyncLimitCheck {
        private boolean started = false;
        private long currentZxid = 0L;
        private long currentTime = 0L;
        private long nextZxid = 0L;
        private long nextTime = 0L;

        private SyncLimitCheck() {
        }

        public synchronized void start() {
            this.started = true;
        }

        public synchronized void updateProposal(long zxid, long time) {
            if (!this.started) {
                return;
            }
            if (this.currentTime == 0L) {
                this.currentTime = time;
                this.currentZxid = zxid;
            } else {
                this.nextTime = time;
                this.nextZxid = zxid;
            }
        }

        public synchronized void updateAck(long zxid) {
            if (this.currentZxid == zxid) {
                this.currentTime = this.nextTime;
                this.currentZxid = this.nextZxid;
                this.nextTime = 0L;
                this.nextZxid = 0L;
            } else if (this.nextZxid == zxid) {
                LOG.warn("ACK for " + zxid + " received before ACK for " + this.currentZxid + "!!!!");
                this.nextTime = 0L;
                this.nextZxid = 0L;
            }
        }

        public synchronized boolean check(long time) {
            if (this.currentTime == 0L) {
                return true;
            }
            long msDelay = (time - this.currentTime) / 1000000L;
            return msDelay < (long)(LearnerHandler.this.leader.self.tickTime * LearnerHandler.this.leader.self.syncLimit);
        }
    }
}

