/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SegmentCompletionManager {
    // Class-level logger for the manager; each FSM creates its own per-segment logger.
    public static Logger LOGGER = LoggerFactory.getLogger(SegmentCompletionManager.class);
    // Consulted (isConnected) before any completion request is processed.
    private final HelixManager _helixManager;
    // Live state machines, keyed by the segment name string.
    private final Map<String, SegmentCompletionFSM> _fsmMap = new ConcurrentHashMap<String, SegmentCompletionFSM>();
    // Looked up by raw table name when an FSM is constructed, to seed its initial commit time budget.
    private final Map<String, Long> _commitTimeMap = new ConcurrentHashMap<String, Long>();
    // Source of segment ZK metadata, commit timeouts, split-commit flag and controller VIP URL.
    private final PinotLLCRealtimeSegmentManager _segmentManager;
    // Used by the FSMs to meter state-machine aborts per table.
    private final ControllerMetrics _controllerMetrics;
    // Answers whether this controller is the leader for a given table.
    private final LeadControllerManager _leadControllerManager;
    // Lock stripes; a segment picks its lock by hashing its name, so FSM lookup/creation is serialized per stripe.
    private final Lock[] _fsmLocks;
    // Number of lock stripes (the constructor allocates this many locks).
    private static final int NUM_FSM_LOCKS = 20;
    // Upper bound, in seconds, applied to any segment's commit time budget (30 minutes).
    private static final int MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS = 1800;

    /**
     * Returns the upper bound on the commit time allowed for any segment.
     *
     * @return the cap in seconds (1800, i.e. 30 minutes)
     */
    public static int getMaxCommitTimeForAllSegmentsSeconds() {
        final int maxCommitTimeSeconds = 1800;
        return maxCommitTimeSeconds;
    }

    public SegmentCompletionManager(HelixManager helixManager, PinotLLCRealtimeSegmentManager segmentManager, ControllerMetrics controllerMetrics, LeadControllerManager leadControllerManager, int segmentCommitTimeoutSeconds) {
        this._helixManager = helixManager;
        this._segmentManager = segmentManager;
        this._controllerMetrics = controllerMetrics;
        this._leadControllerManager = leadControllerManager;
        SegmentCompletionProtocol.setMaxSegmentCommitTimeMs((long)TimeUnit.MILLISECONDS.convert(segmentCommitTimeoutSeconds, TimeUnit.SECONDS));
        this._fsmLocks = new Lock[20];
        for (int i = 0; i < 20; ++i) {
            this._fsmLocks[i] = new ReentrantLock();
        }
    }

    /**
     * Reports whether split commit is enabled, as configured on the segment manager.
     *
     * @return the segment manager's split-commit flag
     */
    public boolean isSplitCommitEnabled() {
        final boolean splitCommitEnabled = _segmentManager.getIsSplitCommitEnabled();
        return splitCommitEnabled;
    }

    /**
     * Returns the controller VIP URL, as reported by the segment manager.
     *
     * @return the controller VIP URL
     */
    public String getControllerVipUrl() {
        final String vipUrl = _segmentManager.getControllerVipUrl();
        return vipUrl;
    }

    /**
     * Returns the current wall-clock time. Protected so subclasses can substitute
     * their own time source.
     *
     * @return current time in milliseconds since the epoch
     */
    protected long getCurrentTimeMs() {
        final long nowMs = System.currentTimeMillis();
        return nowMs;
    }

    /**
     * Returns the state machine for the given segment, creating and registering one
     * if none exists yet.
     *
     * <p>Lookup and creation are serialized per lock stripe; the stripe is chosen from the
     * hash of the segment name. A newly created FSM starts in a state derived from the
     * segment's ZK metadata and the message type:
     * <ul>
     *   <li>metadata status DONE: an FSM already in the committed state, with the
     *       metadata's end offset as winning offset;</li>
     *   <li>otherwise, message type "segmentStoppedConsuming": an FSM in partial-consuming;</li>
     *   <li>otherwise: an FSM in holding.</li>
     * </ul>
     *
     * @param segmentName parsed LLC segment name
     * @param msgType protocol message type that triggered the lookup
     * @return the existing or newly created FSM (never null)
     * @throws RuntimeException wrapping any exception raised while looking up or creating the FSM
     */
    private SegmentCompletionFSM lookupOrCreateFsm(LLCSegmentName segmentName, String msgType) {
        String segmentNameStr = segmentName.getSegmentName();
        // Mask the sign bit so the modulo result is a valid, non-negative array index.
        int lockIndex = (segmentNameStr.hashCode() & Integer.MAX_VALUE) % _fsmLocks.length;
        Lock lock = _fsmLocks[lockIndex];
        // Acquire BEFORE entering try: if lock() itself failed inside the try, the finally
        // block would call unlock() on a lock this thread does not hold.
        lock.lock();
        try {
            SegmentCompletionFSM fsm = _fsmMap.get(segmentNameStr);
            if (fsm == null) {
                String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
                LLCRealtimeSegmentZKMetadata segmentMetadata = _segmentManager.getSegmentZKMetadata(realtimeTableName, segmentNameStr, null);
                if (segmentMetadata.getStatus().equals((Object)CommonConstants.Segment.Realtime.Status.DONE)) {
                    // Segment is already committed in ZK; the FSM only needs to answer late arrivals.
                    long endOffset = segmentMetadata.getEndOffset();
                    fsm = SegmentCompletionFSM.fsmInCommit(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas(), endOffset);
                } else if ("segmentStoppedConsuming".equals(msgType)) {
                    fsm = SegmentCompletionFSM.fsmStoppedConsuming(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
                } else {
                    fsm = SegmentCompletionFSM.fsmInHolding(_segmentManager, this, segmentName, segmentMetadata.getNumReplicas());
                }
                LOGGER.info("Created FSM {}", (Object)fsm);
                _fsmMap.put(segmentNameStr, fsm);
            }
            return fsm;
        }
        catch (Exception e) {
            LOGGER.error("Exception getting FSM for segment {}", (Object)segmentNameStr, (Object)e);
            throw new RuntimeException("Exception getting FSM for segment " + segmentNameStr, e);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Handles a "segment consumed" request by routing it to the segment's state machine.
     *
     * <p>Returns {@code RESP_NOT_LEADER} when this controller is not the leader for the
     * table or Helix is disconnected. Exceptions raised while locating or driving the FSM
     * are logged and result in {@code RESP_FAILED}. An FSM that reports done afterwards is
     * removed from the map.
     *
     * @param reqParams request parameters (segment name, instance id, offset, stop reason)
     * @return the FSM's response, or one of the fallback responses described above
     */
    public SegmentCompletionProtocol.Response segmentConsumed(SegmentCompletionProtocol.Request.Params reqParams) {
        final String segmentNameStr = reqParams.getSegmentName();
        // The table name is the first "__"-separated token of the segment name.
        final String tableName = segmentNameStr.split("__")[0];
        if (!(isLeader(tableName) && _helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = reqParams.getInstanceId();
        final String reason = reqParams.getReason();
        final long reportedOffset = reqParams.getOffset();
        final LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionFSM stateMachine = null;
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        try {
            stateMachine = lookupOrCreateFsm(llcSegmentName, "segmentConsumed");
            result = stateMachine.segmentConsumed(serverInstance, reportedOffset, reason);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentConsumed for segment {}", segmentNameStr, e);
        }

        if (stateMachine == null || !stateMachine.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", stateMachine.toString());
        _fsmMap.remove(segmentNameStr);
        return result;
    }

    /**
     * Handles a "commit start" request by routing it to the segment's state machine.
     *
     * <p>Returns {@code RESP_NOT_LEADER} when this controller is not the leader for the
     * table or Helix is disconnected. Exceptions raised while locating or driving the FSM
     * are logged and result in {@code RESP_FAILED}. An FSM that reports done afterwards is
     * removed from the map.
     *
     * @param reqParams request parameters (segment name, instance id, offset)
     * @return the FSM's response, or one of the fallback responses described above
     */
    public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params reqParams) {
        final String segmentNameStr = reqParams.getSegmentName();
        // The table name is the first "__"-separated token of the segment name.
        final String tableName = segmentNameStr.split("__")[0];
        if (!(isLeader(tableName) && _helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = reqParams.getInstanceId();
        final long reportedOffset = reqParams.getOffset();
        final LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionFSM stateMachine = null;
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        try {
            stateMachine = lookupOrCreateFsm(llcSegmentName, "segmentCommit");
            result = stateMachine.segmentCommitStart(serverInstance, reportedOffset);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentCommitStart for segment {}", segmentNameStr, e);
        }

        if (stateMachine == null || !stateMachine.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", stateMachine.toString());
        _fsmMap.remove(segmentNameStr);
        return result;
    }

    /**
     * Handles a request for additional segment build time by routing it to the
     * segment's state machine.
     *
     * <p>Returns {@code RESP_NOT_LEADER} when this controller is not the leader for the
     * table or Helix is disconnected. Exceptions raised while locating or driving the FSM
     * are logged and result in {@code RESP_FAILED}. An FSM that reports done afterwards is
     * removed from the map.
     *
     * @param reqParams request parameters (segment name, instance id, offset, extra seconds)
     * @return the FSM's response, or one of the fallback responses described above
     */
    public SegmentCompletionProtocol.Response extendBuildTime(SegmentCompletionProtocol.Request.Params reqParams) {
        final String segmentNameStr = reqParams.getSegmentName();
        // The table name is the first "__"-separated token of the segment name.
        final String tableName = segmentNameStr.split("__")[0];
        if (!(isLeader(tableName) && _helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = reqParams.getInstanceId();
        final long reportedOffset = reqParams.getOffset();
        final int extraSeconds = reqParams.getExtraTimeSec();
        final LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionFSM stateMachine = null;
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        try {
            stateMachine = lookupOrCreateFsm(llcSegmentName, "segmentCommit");
            result = stateMachine.extendBuildTime(serverInstance, reportedOffset, extraSeconds);
        } catch (Exception e) {
            LOGGER.error("Caught exception in extendBuildTime for segment {}", segmentNameStr, e);
        }

        if (stateMachine == null || !stateMachine.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", stateMachine.toString());
        _fsmMap.remove(segmentNameStr);
        return result;
    }

    /**
     * Handles a "stopped consuming" notification by routing it to the segment's
     * state machine.
     *
     * <p>Returns {@code RESP_NOT_LEADER} when this controller is not the leader for the
     * table or Helix is disconnected. Exceptions raised while locating or driving the FSM
     * are logged and result in {@code RESP_FAILED}. An FSM that reports done afterwards is
     * removed from the map.
     *
     * @param reqParams request parameters (segment name, instance id, offset, reason)
     * @return the FSM's response, or one of the fallback responses described above
     */
    public SegmentCompletionProtocol.Response segmentStoppedConsuming(SegmentCompletionProtocol.Request.Params reqParams) {
        final String segmentNameStr = reqParams.getSegmentName();
        // The table name is the first "__"-separated token of the segment name.
        final String tableName = segmentNameStr.split("__")[0];
        if (!(isLeader(tableName) && _helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final String serverInstance = reqParams.getInstanceId();
        final long reportedOffset = reqParams.getOffset();
        final String stopReason = reqParams.getReason();
        final LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionFSM stateMachine = null;
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        try {
            stateMachine = lookupOrCreateFsm(llcSegmentName, "segmentStoppedConsuming");
            result = stateMachine.stoppedConsuming(serverInstance, reportedOffset, stopReason);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentStoppedConsuming for segment {}", segmentNameStr, e);
        }

        if (stateMachine == null || !stateMachine.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", stateMachine.toString());
        _fsmMap.remove(segmentNameStr);
        return result;
    }

    /**
     * Handles a "commit end" request by routing it to the segment's state machine.
     *
     * <p>Returns {@code RESP_NOT_LEADER} when this controller is not the leader for the
     * table or Helix is disconnected. Exceptions raised while locating or driving the FSM
     * are logged and result in {@code RESP_FAILED}. An FSM that reports done afterwards is
     * removed from the map.
     *
     * @param reqParams request parameters (segment name, instance id, offset)
     * @param success whether the segment upload succeeded
     * @param isSplitCommit whether the commit uses the split-commit path
     * @param committingSegmentDescriptor descriptor of the segment being committed
     * @return the FSM's response, or one of the fallback responses described above
     */
    public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
        final String segmentNameStr = reqParams.getSegmentName();
        // The table name is the first "__"-separated token of the segment name.
        final String tableName = segmentNameStr.split("__")[0];
        if (!(isLeader(tableName) && _helixManager.isConnected())) {
            return SegmentCompletionProtocol.RESP_NOT_LEADER;
        }

        final LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);

        SegmentCompletionFSM stateMachine = null;
        SegmentCompletionProtocol.Response result = SegmentCompletionProtocol.RESP_FAILED;
        try {
            stateMachine = lookupOrCreateFsm(llcSegmentName, "segmentCommit");
            result = stateMachine.segmentCommitEnd(reqParams, success, isSplitCommit, committingSegmentDescriptor);
        } catch (Exception e) {
            LOGGER.error("Caught exception in segmentCommitEnd for segment {}", segmentNameStr, e);
        }

        if (stateMachine == null || !stateMachine.isDone()) {
            return result;
        }
        LOGGER.info("Removing FSM (if present):{}", stateMachine.toString());
        _fsmMap.remove(segmentNameStr);
        return result;
    }

    /**
     * Asks the lead-controller manager whether this controller leads the given table.
     *
     * @param tableName table name to check
     * @return {@code true} if this controller is the leader for the table
     */
    @VisibleForTesting
    protected boolean isLeader(String tableName) {
        final boolean leaderForTable = _leadControllerManager.isLeaderForTable(tableName);
        return leaderForTable;
    }

    private static class SegmentCompletionFSM {
        // Time after FSM start (ms) used to derive _maxTimeToPickWinnerMs.
        private static final long MAX_TIME_TO_PICK_WINNER_MS = 3300L;
        // Time after FSM start (ms) used to derive _maxTimeToNotifyWinnerMs; also added to the commit time budget.
        private static final long MAX_TIME_TO_NOTIFY_WINNER_MS = 6600L;
        // Per-segment logger, named "SegmentCompletionFSM_" + segment name.
        public final Logger LOGGER;
        // Current protocol state; guarded by synchronized (this) in the public entry points.
        State _state = State.HOLDING;
        // Time (ms) at which this FSM was created, from the manager's clock.
        final long _startTimeMs;
        private final LLCSegmentName _segmentName;
        private final String _rawTableName;
        private final String _realtimeTableName;
        private final int _numReplicas;
        // Despite the name this is a Set: instances that reported stoppedConsuming and have not
        // since sent segmentConsumed.
        private final Set<String> _excludedServerStateMap;
        // Latest offset reported by each instance via segmentConsumed.
        private final Map<String, Long> _commitStateMap;
        // Offset of the chosen committer; -1 until one is known.
        private long _winningOffset = -1L;
        // Instance id of the chosen committer ("UNKNOWN" for FSMs created in the committed state).
        private String _winner;
        private final PinotLLCRealtimeSegmentManager _segmentManager;
        private final SegmentCompletionManager _segmentCompletionManager;
        // Absolute deadlines (ms), computed as _startTimeMs plus the constants above.
        private final long _maxTimeToPickWinnerMs;
        private final long _maxTimeToNotifyWinnerMs;
        // Commit time budget (ms) computed at construction, capped at 1800000.
        private final long _initialCommitTimeMs;
        // Absolute commit deadline (ms); starts at _startTimeMs + _initialCommitTimeMs.
        // Not final — presumably moved by build-time extension; confirm in code outside this view.
        private long _maxTimeAllowedToCommitMs;
        // Snapshots of the manager's split-commit flag and VIP URL taken at construction.
        private final boolean _isSplitCommitEnabled;
        private final String _controllerVipUrl;

        /**
         * Creates an FSM in its default initial state (holding).
         *
         * @param segmentManager provider of commit configuration
         * @param segmentCompletionManager owning manager
         * @param segmentName segment this FSM tracks
         * @param numReplicas number of replicas of the segment
         * @return a new FSM
         */
        public static SegmentCompletionFSM fsmInHolding(PinotLLCRealtimeSegmentManager segmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) {
            final SegmentCompletionFSM holdingFsm = new SegmentCompletionFSM(segmentManager, segmentCompletionManager, segmentName, numReplicas);
            return holdingFsm;
        }

        /**
         * Creates an FSM that is already in the committed state with a known winning offset.
         *
         * @param segmentManager provider of commit configuration
         * @param segmentCompletionManager owning manager
         * @param segmentName segment this FSM tracks
         * @param numReplicas number of replicas of the segment
         * @param winningOffset offset at which the segment was committed
         * @return a new FSM
         */
        public static SegmentCompletionFSM fsmInCommit(PinotLLCRealtimeSegmentManager segmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas, long winningOffset) {
            final SegmentCompletionFSM committedFsm = new SegmentCompletionFSM(segmentManager, segmentCompletionManager, segmentName, numReplicas, winningOffset);
            return committedFsm;
        }

        /**
         * Creates an FSM whose initial state is partial-consuming.
         *
         * @param segmentManager provider of commit configuration
         * @param segmentCompletionManager owning manager
         * @param segmentName segment this FSM tracks
         * @param numReplicas number of replicas of the segment
         * @return a new FSM
         */
        public static SegmentCompletionFSM fsmStoppedConsuming(PinotLLCRealtimeSegmentManager segmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) {
            // Build a default (holding) FSM, then override its starting state.
            final SegmentCompletionFSM partialFsm = fsmInHolding(segmentManager, segmentCompletionManager, segmentName, numReplicas);
            partialFsm._state = State.PARTIAL_CONSUMING;
            return partialFsm;
        }

        /**
         * Initializes an FSM and computes its deadlines.
         *
         * <p>The commit time budget starts as 6600 ms plus the table's configured commit
         * timeout, is raised to the value saved for the raw table in the manager's
         * commit-time map when that is larger, and is finally capped at 1800000 ms.
         *
         * @param segmentManager provider of commit configuration
         * @param segmentCompletionManager owning manager (clock, saved commit times, flags)
         * @param segmentName segment this FSM tracks
         * @param numReplicas number of replicas of the segment
         */
        private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas) {
            _segmentName = segmentName;
            _rawTableName = _segmentName.getTableName();
            _realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(_rawTableName);
            _numReplicas = numReplicas;
            _segmentManager = segmentManager;
            _commitStateMap = new HashMap<String, Long>(_numReplicas);
            _excludedServerStateMap = new HashSet<String>(_numReplicas);
            _segmentCompletionManager = segmentCompletionManager;

            _startTimeMs = _segmentCompletionManager.getCurrentTimeMs();
            _maxTimeToPickWinnerMs = _startTimeMs + 3300L;
            _maxTimeToNotifyWinnerMs = _startTimeMs + 6600L;

            long commitBudgetMs = 6600L + _segmentManager.getCommitTimeoutMS(_realtimeTableName);
            final Long previousCommitTimeMs = _segmentCompletionManager._commitTimeMap.get(_rawTableName);
            if (previousCommitTimeMs != null) {
                // Never start below a commit time previously recorded for this table.
                commitBudgetMs = Math.max(commitBudgetMs, previousCommitTimeMs);
            }

            LOGGER = LoggerFactory.getLogger("SegmentCompletionFSM_" + segmentName.getSegmentName());
            if (commitBudgetMs > 1800000L) {
                LOGGER.info("Configured max commit time {}s too high for table {}, changing to {}s", commitBudgetMs / 1000L, _realtimeTableName, 1800);
                commitBudgetMs = 1800000L;
            }
            _initialCommitTimeMs = commitBudgetMs;
            _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;

            _isSplitCommitEnabled = segmentCompletionManager.isSplitCommitEnabled();
            _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
        }

        /**
         * Initializes an FSM directly in the committed state. The committer is recorded as
         * "UNKNOWN" and the given offset becomes the winning offset.
         *
         * @param segmentManager provider of commit configuration
         * @param segmentCompletionManager owning manager
         * @param segmentName segment this FSM tracks
         * @param numReplicas number of replicas of the segment
         * @param winningOffset offset at which the segment was committed
         */
        private SegmentCompletionFSM(PinotLLCRealtimeSegmentManager segmentManager, SegmentCompletionManager segmentCompletionManager, LLCSegmentName segmentName, int numReplicas, long winningOffset) {
            this(segmentManager, segmentCompletionManager, segmentName, numReplicas);
            _state = State.COMMITTED;
            _winningOffset = winningOffset;
            _winner = "UNKNOWN";
        }

        /**
         * Renders the FSM as
         * {@code {segmentName,state,startTimeMs,winner,winningOffset,splitCommit,vipUrl}}.
         */
        public String toString() {
            final StringBuilder text = new StringBuilder("{");
            text.append(_segmentName.getSegmentName()).append(",");
            text.append(_state).append(",");
            text.append(_startTimeMs).append(",");
            text.append(_winner).append(",");
            text.append(_winningOffset).append(",");
            text.append(_isSplitCommitEnabled).append(",");
            text.append(_controllerVipUrl).append("}");
            return text.toString();
        }

        /**
         * Reports whether the FSM has reached a terminal state.
         *
         * @return {@code true} when the state is COMMITTED or ABORTED
         */
        public boolean isDone() {
            switch (_state) {
                case COMMITTED:
                case ABORTED:
                    return true;
                default:
                    return false;
            }
        }

        /**
         * Processes a "segment consumed" message from a server.
         *
         * <p>Under the FSM monitor: the instance is removed from the excluded set if it was
         * there, its offset is recorded, and the message is dispatched to the handler for
         * the current state. The ABORTED state answers HOLD; an unrecognized state fails.
         *
         * @param instanceId reporting server instance
         * @param offset offset the instance has consumed to
         * @param stopReason reason the instance stopped consuming
         * @return the protocol response for this instance
         */
        public SegmentCompletionProtocol.Response segmentConsumed(String instanceId, long offset, String stopReason) {
            // The timestamp is taken before waiting for the monitor.
            final long now = _segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                LOGGER.info("Processing segmentConsumed({}, {})", instanceId, offset);
                if (_excludedServerStateMap.contains(instanceId)) {
                    LOGGER.info("Marking instance {} alive again", instanceId);
                    _excludedServerStateMap.remove(instanceId);
                }
                _commitStateMap.put(instanceId, offset);
                switch (_state) {
                    case PARTIAL_CONSUMING:
                        return PARTIAL_CONSUMING__consumed(instanceId, offset, now, stopReason);
                    case HOLDING:
                        return HOLDING__consumed(instanceId, offset, now, stopReason);
                    case COMMITTER_DECIDED:
                        return COMMITTER_DECIDED__consumed(instanceId, offset, now);
                    case COMMITTER_NOTIFIED:
                        return COMMITTER_NOTIFIED__consumed(instanceId, offset, now);
                    case COMMITTER_UPLOADING:
                        return COMMITTER_UPLOADING__consumed(instanceId, offset, now);
                    case COMMITTING:
                        return COMMITTING__consumed(instanceId, offset, now);
                    case COMMITTED:
                        return COMMITTED__consumed(instanceId, offset);
                    case ABORTED:
                        return hold(instanceId, offset);
                    default:
                        return fail(instanceId, offset);
                }
            }
        }

        /**
         * Processes a "commit start" message from a server.
         *
         * <p>An instance that is currently in the excluded set (it reported that it stopped
         * consuming) is rejected with {@code RESP_FAILED}. Otherwise the message is
         * dispatched to the handler for the current state. The ABORTED state answers HOLD;
         * an unrecognized state fails.
         *
         * <p>The excluded-set check is performed while holding the FSM monitor: the set is a
         * plain {@code HashSet} that other entry points mutate under the same monitor, so
         * reading it without the lock would be a data race.
         *
         * @param instanceId server instance asking to commit
         * @param offset offset the instance wants to commit at
         * @return the protocol response for this instance
         */
        public SegmentCompletionProtocol.Response segmentCommitStart(String instanceId, long offset) {
            // The timestamp is taken before waiting for the monitor.
            long now = this._segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                if (this._excludedServerStateMap.contains(instanceId)) {
                    this.LOGGER.warn("Not accepting commit from {} since it had stoppd consuming", (Object)instanceId);
                    return SegmentCompletionProtocol.RESP_FAILED;
                }
                this.LOGGER.info("Processing segmentCommitStart({}, {})", (Object)instanceId, (Object)offset);
                switch (this._state) {
                    case PARTIAL_CONSUMING: {
                        return this.PARTIAL_CONSUMING__commit(instanceId, offset, now);
                    }
                    case HOLDING: {
                        return this.HOLDING__commit(instanceId, offset, now);
                    }
                    case COMMITTER_DECIDED: {
                        return this.COMMITTER_DECIDED__commit(instanceId, offset, now);
                    }
                    case COMMITTER_NOTIFIED: {
                        return this.COMMITTER_NOTIFIED__commit(instanceId, offset, now);
                    }
                    case COMMITTER_UPLOADING: {
                        return this.COMMITTER_UPLOADING__commit(instanceId, offset, now);
                    }
                    case COMMITTING: {
                        return this.COMMITTING__commit(instanceId, offset, now);
                    }
                    case COMMITTED: {
                        return this.COMMITTED__commit(instanceId, offset);
                    }
                    case ABORTED: {
                        return this.hold(instanceId, offset);
                    }
                }
                return this.fail(instanceId, offset);
            }
        }

        /**
         * Processes a "stopped consuming" message from a server.
         *
         * <p>Under the FSM monitor the instance is added to the excluded set and the message
         * is dispatched to the handler for the current state. In the ABORTED state the
         * message is ignored and {@code RESP_PROCESSED} is returned; an unrecognized state
         * fails.
         *
         * @param instanceId server instance that stopped consuming
         * @param offset offset the instance had reached
         * @param reason reason reported by the instance
         * @return the protocol response for this instance
         */
        public SegmentCompletionProtocol.Response stoppedConsuming(String instanceId, long offset, String reason) {
            synchronized (this) {
                LOGGER.info("Processing stoppedConsuming({}, {})", instanceId, offset);
                _excludedServerStateMap.add(instanceId);
                switch (_state) {
                    case PARTIAL_CONSUMING:
                        return PARTIAL_CONSUMING__stoppedConsuming(instanceId, offset, reason);
                    case HOLDING:
                        return HOLDING_stoppedConsuming(instanceId, offset, reason);
                    case COMMITTER_DECIDED:
                        return COMMITTER_DECIDED__stoppedConsuming(instanceId, offset, reason);
                    case COMMITTER_NOTIFIED:
                        return COMMITTER_NOTIFIED__stoppedConsuming(instanceId, offset, reason);
                    case COMMITTER_UPLOADING:
                        return COMMITTER_UPLOADING__stoppedConsuming(instanceId, offset, reason);
                    case COMMITTING:
                        return COMMITTING__stoppedConsuming(instanceId, offset, reason);
                    case COMMITTED:
                        return COMMITTED__stoppedConsuming(instanceId, offset, reason);
                    case ABORTED:
                        LOGGER.info("Ignoring StoppedConsuming message from {} in state {}", instanceId, _state);
                        return SegmentCompletionProtocol.RESP_PROCESSED;
                    default:
                        return fail(instanceId, offset);
                }
            }
        }

        /**
         * Processes a request for extra build time.
         *
         * <p>Only the COMMITTER_NOTIFIED state honours the request; every other state
         * answers with a failure.
         *
         * @param instanceId server instance asking for more time
         * @param offset offset reported by the instance
         * @param extTimeSec additional build time requested, in seconds
         * @return the protocol response for this instance
         */
        public SegmentCompletionProtocol.Response extendBuildTime(String instanceId, long offset, int extTimeSec) {
            // The timestamp is taken before waiting for the monitor.
            final long now = _segmentCompletionManager.getCurrentTimeMs();
            synchronized (this) {
                LOGGER.info("Processing extendBuildTime({}, {}, {})", instanceId, offset, extTimeSec);
                switch (_state) {
                    case COMMITTER_NOTIFIED:
                        return COMMITTER_NOTIFIED__extendBuildlTime(instanceId, offset, extTimeSec, now);
                    default:
                        return fail(instanceId, offset);
                }
            }
        }

        /**
         * Processes a "commit end" message from the committing server.
         *
         * <p>The FSM is aborted and {@code RESP_FAILED} returned when any of the following
         * holds: the instance is in the excluded set; the FSM is not in COMMITTER_UPLOADING,
         * or the instance/offset do not match the recorded winner; the upload failed; or
         * committing the segment does not yield {@code RESP_COMMIT_SUCCESS}.
         *
         * @param reqParams request parameters (instance id and offset are read from here)
         * @param success whether the segment upload succeeded
         * @param isSplitCommit whether the commit uses the split-commit path
         * @param committingSegmentDescriptor descriptor of the segment being committed
         * @return the commit response on success, otherwise {@code RESP_FAILED}
         */
        public SegmentCompletionProtocol.Response segmentCommitEnd(SegmentCompletionProtocol.Request.Params reqParams, boolean success, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
            final String instanceId = reqParams.getInstanceId();
            final long offset = reqParams.getOffset();
            synchronized (this) {
                if (_excludedServerStateMap.contains(instanceId)) {
                    LOGGER.warn("Not accepting commitEnd from {} since it had stoppd consuming", instanceId);
                    return abortAndReturnFailed();
                }
                LOGGER.info("Processing segmentCommitEnd({}, {})", instanceId, offset);

                final boolean expectedCommitter = _state.equals(State.COMMITTER_UPLOADING) && instanceId.equals(_winner) && offset == _winningOffset;
                if (!expectedCommitter) {
                    LOGGER.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state, _segmentName.getSegmentName(), _winner, _winningOffset);
                    return abortAndReturnFailed();
                }
                if (!success) {
                    LOGGER.error("Segment upload failed");
                    return abortAndReturnFailed();
                }

                final SegmentCompletionProtocol.Response commitResult = commitSegment(reqParams, isSplitCommit, committingSegmentDescriptor);
                if (commitResult.equals(SegmentCompletionProtocol.RESP_COMMIT_SUCCESS)) {
                    return commitResult;
                }
                return abortAndReturnFailed();
            }
        }

        /**
         * Logs a FAIL decision for the instance and returns {@code RESP_FAILED}.
         *
         * @param instanceId instance being answered
         * @param offset offset the instance reported
         * @return {@code RESP_FAILED}
         */
        private SegmentCompletionProtocol.Response fail(String instanceId, long offset) {
            LOGGER.info("{}:FAIL for instance={} offset={}", _state, instanceId, offset);
            return SegmentCompletionProtocol.RESP_FAILED;
        }

        /**
         * Builds a COMMIT response for the instance.
         *
         * <p>The response carries the offset, the allowed build time in seconds (commit
         * deadline minus FSM start time) and the split-commit flag; the controller VIP URL
         * is attached only when split commit is enabled.
         *
         * @param instanceId instance being told to commit
         * @param offset offset to commit at
         * @return a new COMMIT response
         */
        private SegmentCompletionProtocol.Response commit(String instanceId, long offset) {
            final long buildTimeSec = (_maxTimeAllowedToCommitMs - _startTimeMs) / 1000L;
            LOGGER.info("{}:COMMIT for instance={} offset={} buldTimeSec={}", _state, instanceId, offset, buildTimeSec);
            final SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params()
                    .withOffset(offset)
                    .withBuildTimeSeconds(buildTimeSec)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
                    .withSplitCommit(_isSplitCommitEnabled);
            if (_isSplitCommitEnabled) {
                responseParams.withControllerVipUrl(_controllerVipUrl);
            }
            return new SegmentCompletionProtocol.Response(responseParams);
        }

        /**
         * Logs a DISCARD decision for the instance and returns {@code RESP_DISCARD}.
         *
         * @param instanceId instance being answered
         * @param offset offset the instance reported
         * @return {@code RESP_DISCARD}
         */
        private SegmentCompletionProtocol.Response discard(String instanceId, long offset) {
            LOGGER.warn("{}:DISCARD for instance={} offset={}", _state, instanceId, offset);
            return SegmentCompletionProtocol.RESP_DISCARD;
        }

        /**
         * Builds a KEEP response carrying the instance's own offset.
         *
         * @param instanceId instance being answered
         * @param offset offset the instance reported
         * @return a new KEEP response
         */
        private SegmentCompletionProtocol.Response keep(String instanceId, long offset) {
            LOGGER.info("{}:KEEP for instance={} offset={}", _state, instanceId, offset);
            final SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params()
                    .withOffset(offset)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.KEEP);
            return new SegmentCompletionProtocol.Response(responseParams);
        }

        /**
         * Builds a CATCH_UP response. The response carries the winning offset, not the
         * offset the instance reported.
         *
         * @param instanceId instance being answered
         * @param offset offset the instance reported (used for logging only)
         * @return a new CATCH_UP response
         */
        private SegmentCompletionProtocol.Response catchup(String instanceId, long offset) {
            LOGGER.info("{}:CATCHUP for instance={} offset={}", _state, instanceId, offset);
            final SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params()
                    .withOffset(_winningOffset)
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.CATCH_UP);
            return new SegmentCompletionProtocol.Response(responseParams);
        }

        /**
         * Builds a HOLD response carrying the instance's own offset.
         *
         * @param instanceId instance being answered
         * @param offset offset the instance reported
         * @return a new HOLD response
         */
        private SegmentCompletionProtocol.Response hold(String instanceId, long offset) {
            LOGGER.info("{}:HOLD for instance={} offset={}", _state, instanceId, offset);
            final SegmentCompletionProtocol.Response.Params responseParams = new SegmentCompletionProtocol.Response.Params()
                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD)
                    .withOffset(offset);
            return new SegmentCompletionProtocol.Response(responseParams);
        }

        /**
         * Moves the FSM to ABORTED, meters the abort for the table, and answers HOLD.
         *
         * @param now current time in ms (not used by this method)
         * @param instanceId instance being answered
         * @param offset offset the instance reported
         * @return a HOLD response
         */
        private SegmentCompletionProtocol.Response abortAndReturnHold(long now, String instanceId, long offset) {
            _state = State.ABORTED;
            _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_rawTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1L);
            final SegmentCompletionProtocol.Response holdResponse = hold(instanceId, offset);
            return holdResponse;
        }

        /**
         * Moves the FSM to ABORTED, meters the abort for the table, and answers
         * {@code RESP_FAILED}.
         *
         * @return {@code RESP_FAILED}
         */
        private SegmentCompletionProtocol.Response abortAndReturnFailed() {
            _state = State.ABORTED;
            _segmentCompletionManager._controllerMetrics.addMeteredTableValue(_rawTableName, (AbstractMetrics.Meter)ControllerMeter.LLC_STATE_MACHINE_ABORTS, 1L);
            return SegmentCompletionProtocol.RESP_FAILED;
        }

        /**
         * Aborts the FSM when the commit deadline has passed.
         *
         * @param now current time in ms
         * @param instanceId instance being answered
         * @param offset offset the instance reported
         * @return a HOLD response if the FSM was aborted, or {@code null} when the deadline
         *         has not passed and the caller should carry on
         */
        private SegmentCompletionProtocol.Response abortIfTooLateAndReturnHold(long now, String instanceId, long offset) {
            if (now <= _maxTimeAllowedToCommitMs) {
                return null;
            }
            LOGGER.warn("{}:Aborting FSM (too late) instance={} offset={} now={} start={}", _state, instanceId, offset, now, _startTimeMs);
            return abortAndReturnHold(now, instanceId, offset);
        }

        /**
         * Returns the replica count minus the number of currently excluded instances.
         *
         * @return number of replicas still expected to report
         */
        private int numReplicasToLookFor() {
            final int excludedCount = _excludedServerStateMap.size();
            return _numReplicas - excludedCount;
        }

        /**
         * Handles segmentConsumed in PARTIAL_CONSUMING: switches to HOLDING and processes
         * the message with the HOLDING handler.
         */
        private SegmentCompletionProtocol.Response PARTIAL_CONSUMING__consumed(String instanceId, long offset, long now, String stopReason) {
            _state = State.HOLDING;
            final SegmentCompletionProtocol.Response reply = HOLDING__consumed(instanceId, offset, now, stopReason);
            return reply;
        }

        /**
         * Handles segmentCommitStart in PARTIAL_CONSUMING by delegating to the shared
         * commit-while-holding-or-partial-consuming handler.
         */
        private SegmentCompletionProtocol.Response PARTIAL_CONSUMING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response reply = processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now);
            return reply;
        }

        /**
         * Handles stoppedConsuming in PARTIAL_CONSUMING by delegating to
         * {@code processStoppedConsuming} with its boolean flag set to {@code true}.
         */
        private SegmentCompletionProtocol.Response PARTIAL_CONSUMING__stoppedConsuming(String instanceId, long offset, String reason) {
            final SegmentCompletionProtocol.Response reply = processStoppedConsuming(instanceId, offset, reason, true);
            return reply;
        }

        /**
         * Handles segmentConsumed in HOLDING.
         *
         * <p>If no winner is picked yet, the instance is told to HOLD. If a winner is picked
         * and it is this instance, it is told to COMMIT and the FSM moves to
         * COMMITTER_NOTIFIED. Otherwise the instance is told to CATCH_UP and the FSM moves
         * to COMMITTER_DECIDED.
         */
        private SegmentCompletionProtocol.Response HOLDING__consumed(String instanceId, long offset, long now, String stopReason) {
            if (!isWinnerPicked(instanceId, now, stopReason)) {
                return hold(instanceId, offset);
            }
            if (_winner.equals(instanceId)) {
                LOGGER.info("{}:Committer notified winner instance={} offset={}", _state, instanceId, offset);
                // Build the response before the transition so it is logged with the current state.
                final SegmentCompletionProtocol.Response commitReply = commit(instanceId, offset);
                _state = State.COMMITTER_NOTIFIED;
                return commitReply;
            }
            LOGGER.info("{}:Committer decided winner={} offset={}", _state, _winner, _winningOffset);
            final SegmentCompletionProtocol.Response catchupReply = catchup(instanceId, offset);
            _state = State.COMMITTER_DECIDED;
            return catchupReply;
        }

        /**
         * Handles segmentCommitStart in HOLDING by delegating to the shared
         * commit-while-holding-or-partial-consuming handler.
         */
        private SegmentCompletionProtocol.Response HOLDING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response reply = processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now);
            return reply;
        }

        /**
         * Handles stoppedConsuming in HOLDING by delegating to
         * {@code processStoppedConsuming} with its boolean flag set to {@code true}.
         */
        private SegmentCompletionProtocol.Response HOLDING_stoppedConsuming(String instanceId, long offset, String reason) {
            final SegmentCompletionProtocol.Response reply = processStoppedConsuming(instanceId, offset, reason, true);
            return reply;
        }

        /**
         * Consumed-notification handler carrying the COMMITTER_DECIDED prefix.
         *
         * <ul>
         *   <li>An offset beyond {@code _winningOffset} aborts immediately via {@code abortAndReturnHold}.</li>
         *   <li>A non-winner gets {@code hold(...)} when it is at the winning offset, else {@code catchup(...)}.</li>
         *   <li>The winner at the winning offset gets {@code commit(...)} and the state becomes
         *       COMMITTER_NOTIFIED; the winner at any other offset triggers {@code abortAndReturnHold}.</li>
         *   <li>Finally, if {@code now} is past {@code _maxTimeToNotifyWinnerMs}, whatever response was
         *       computed is replaced by {@code abortAndReturnHold}.</li>
         * </ul>
         *
         * @param instanceId instance reporting that it has consumed up to {@code offset}
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response to send back to the reporting instance
         */
        private SegmentCompletionProtocol.Response COMMITTER_DECIDED__consumed(String instanceId, long offset, long now) {
            if (offset > _winningOffset) {
                LOGGER.warn("{}:Aborting FSM (offset larger than winning) instance={} offset={} now={} winning={}", _state, instanceId, offset, now, _winningOffset);
                return abortAndReturnHold(now, instanceId, offset);
            }

            SegmentCompletionProtocol.Response reply;
            if (!_winner.equals(instanceId)) {
                // A replica other than the winner is reporting.
                if (offset == _winningOffset) {
                    reply = hold(instanceId, offset);
                } else {
                    reply = catchup(instanceId, offset);
                }
            } else if (_winningOffset == offset) {
                LOGGER.info("{}:Notifying winner instance={} offset={}", _state, instanceId, offset);
                reply = commit(instanceId, offset);
                _state = State.COMMITTER_NOTIFIED;
            } else {
                LOGGER.warn("{}:Winner coming back with different offset for instance={} offset={} prevWinnOffset={}", _state, instanceId, offset, _winningOffset);
                reply = abortAndReturnHold(now, instanceId, offset);
            }

            // The deadline check runs last and overrides any response chosen above.
            if (now > _maxTimeToNotifyWinnerMs) {
                reply = abortAndReturnHold(now, instanceId, offset);
            }
            return reply;
        }

        /**
         * Commit-request handler carrying the COMMITTER_DECIDED prefix.
         *
         * <p>Hands the request, unchanged, to {@code processCommitWhileHoldingOrPartialConsuming}.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response produced by the shared handler
         */
        private SegmentCompletionProtocol.Response COMMITTER_DECIDED__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response reply =
                    processCommitWhileHoldingOrPartialConsuming(instanceId, offset, now);
            return reply;
        }

        /**
         * Stopped-consuming handler carrying the COMMITTER_DECIDED prefix.
         *
         * <p>Forwards to {@code processStoppedConsuming} with {@code createNew = false}.
         *
         * @param instanceId instance that stopped consuming
         * @param offset offset reported by that instance
         * @param reason reason string supplied with the notification
         * @return the response produced by {@code processStoppedConsuming}
         */
        private SegmentCompletionProtocol.Response COMMITTER_DECIDED__stoppedConsuming(String instanceId, long offset, String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * Consumed-notification handler carrying the COMMITTER_NOTIFIED prefix.
         *
         * <p>A non-null result from {@code abortIfTooLateAndReturnHold} is returned as-is. Otherwise:
         * a non-winner below the winning offset gets {@code catchup(...)}, and a non-winner at or above
         * it gets {@code hold(...)}. The winner at the winning offset gets {@code commit(...)} again.
         * The winner at any other offset gets {@code discard(...)} and the state becomes ABORTED.
         *
         * @param instanceId instance reporting that it has consumed up to {@code offset}
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response to send back to the reporting instance
         */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__consumed(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                return tooLate;
            }

            if (!instanceId.equals(_winner)) {
                // Equal and greater offsets both resolve to hold; only a lagging replica catches up.
                return offset < _winningOffset ? catchup(instanceId, offset) : hold(instanceId, offset);
            }

            if (offset == _winningOffset) {
                return commit(instanceId, offset);
            }

            // Winner reported an unexpected offset: build the discard response, then mark the FSM aborted.
            final SegmentCompletionProtocol.Response discarded = discard(instanceId, offset);
            LOGGER.warn("{}:Aborting for instance={} offset={}", _state, instanceId, offset);
            _state = State.ABORTED;
            return discarded;
        }

        /**
         * Commit-request handler carrying the COMMITTER_NOTIFIED prefix.
         *
         * <p>If {@code checkBadCommitRequest} rejects the request, its response is returned. Otherwise
         * the state becomes COMMITTER_UPLOADING. When the elapsed time since {@code _startTimeMs}
         * exceeds {@code _initialCommitTimeMs}, that elapsed time is stored in the manager's
         * {@code _commitTimeMap} under this segment's table name.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the rejection response, or {@code RESP_COMMIT_CONTINUE} when the commit may proceed
         */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response rejection = checkBadCommitRequest(instanceId, offset, now);
            if (rejection != null) {
                return rejection;
            }

            LOGGER.info("{}:Uploading for instance={} offset={}", _state, instanceId, offset);
            _state = State.COMMITTER_UPLOADING;

            final long elapsedMs = now - _startTimeMs;
            if (elapsedMs > _initialCommitTimeMs) {
                _segmentCompletionManager._commitTimeMap.put(_segmentName.getTableName(), elapsedMs);
            }
            return SegmentCompletionProtocol.RESP_COMMIT_CONTINUE;
        }

        /**
         * Stopped-consuming handler carrying the COMMITTER_NOTIFIED prefix.
         *
         * <p>Forwards to {@code processStoppedConsuming} with {@code createNew = false}.
         *
         * @param instanceId instance that stopped consuming
         * @param offset offset reported by that instance
         * @param reason reason string supplied with the notification
         * @return the response produced by {@code processStoppedConsuming}
         */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__stoppedConsuming(String instanceId, long offset, String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * Build-time extension handler carrying the COMMITTER_NOTIFIED prefix.
         *
         * <p>A non-null result from {@code abortIfTooLateAndReturnHold} is returned as-is. Otherwise the
         * requested deadline is {@code now} plus {@code extTimeSec} seconds. If that deadline lies more
         * than 1,800,000 ms after {@code _startTimeMs} (1800 s, the value of
         * {@code MAX_COMMIT_TIME_FOR_ALL_SEGMENTS_SECONDS}), the request is refused through
         * {@code abortAndReturnFailed()}. Otherwise the deadline is stored in
         * {@code _maxTimeAllowedToCommitMs}.
         *
         * @param instanceId instance asking for more time
         * @param offset offset reported by that instance
         * @param extTimeSec requested extension, in seconds, counted from {@code now}
         * @param now caller-supplied current time in milliseconds
         * @return the too-late response, the failure response, or {@code RESP_PROCESSED} on success
         */
        private SegmentCompletionProtocol.Response COMMITTER_NOTIFIED__extendBuildlTime(String instanceId, long offset, int extTimeSec, long now) {
            SegmentCompletionProtocol.Response response = this.abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (response == null) {
                // Widen to long BEFORE multiplying: an int product overflows for large extTimeSec and
                // the wrapped (possibly negative) deadline would slip past the upper-bound check below.
                long maxTimeAllowedToCommitMs = now + (long) extTimeSec * 1000L;
                if (maxTimeAllowedToCommitMs > this._startTimeMs + 1800000L) {
                    this.LOGGER.warn("Not accepting lease extension from {} startTime={} requestedTime={}", new Object[]{instanceId, this._startTimeMs, maxTimeAllowedToCommitMs});
                    return this.abortAndReturnFailed();
                }
                this._maxTimeAllowedToCommitMs = maxTimeAllowedToCommitMs;
                response = SegmentCompletionProtocol.RESP_PROCESSED;
            }
            return response;
        }

        /**
         * Consumed-notification handler carrying the COMMITTER_UPLOADING prefix.
         *
         * <p>Forwards, unchanged, to {@code processConsumedAfterCommitStart}.
         *
         * @param instanceId instance reporting that it has consumed up to {@code offset}
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response produced by the shared handler
         */
        private SegmentCompletionProtocol.Response COMMITTER_UPLOADING__consumed(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response reply = processConsumedAfterCommitStart(instanceId, offset, now);
            return reply;
        }

        /**
         * Commit-request handler carrying the COMMITTER_UPLOADING prefix.
         *
         * <p>Forwards, unchanged, to {@code processCommitWhileUploading}.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response produced by the shared handler
         */
        private SegmentCompletionProtocol.Response COMMITTER_UPLOADING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response reply = processCommitWhileUploading(instanceId, offset, now);
            return reply;
        }

        /**
         * Stopped-consuming handler carrying the COMMITTER_UPLOADING prefix.
         *
         * <p>Forwards to {@code processStoppedConsuming} with {@code createNew = false}.
         *
         * @param instanceId instance that stopped consuming
         * @param offset offset reported by that instance
         * @param reason reason string supplied with the notification
         * @return the response produced by {@code processStoppedConsuming}
         */
        private SegmentCompletionProtocol.Response COMMITTER_UPLOADING__stoppedConsuming(String instanceId, long offset, String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * Consumed-notification handler carrying the COMMITTING prefix.
         *
         * <p>Forwards, unchanged, to {@code processConsumedAfterCommitStart}.
         *
         * @param instanceId instance reporting that it has consumed up to {@code offset}
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response produced by the shared handler
         */
        private SegmentCompletionProtocol.Response COMMITTING__consumed(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response reply = processConsumedAfterCommitStart(instanceId, offset, now);
            return reply;
        }

        /**
         * Commit-request handler carrying the COMMITTING prefix.
         *
         * <p>Forwards, unchanged, to {@code processCommitWhileUploading}.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response produced by the shared handler
         */
        private SegmentCompletionProtocol.Response COMMITTING__commit(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response reply = processCommitWhileUploading(instanceId, offset, now);
            return reply;
        }

        /**
         * Stopped-consuming handler carrying the COMMITTING prefix.
         *
         * <p>Forwards to {@code processStoppedConsuming} with {@code createNew = false}.
         *
         * @param instanceId instance that stopped consuming
         * @param offset offset reported by that instance
         * @param reason reason string supplied with the notification
         * @return the response produced by {@code processStoppedConsuming}
         */
        private SegmentCompletionProtocol.Response COMMITTING__stoppedConsuming(String instanceId, long offset, String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * Consumed-notification handler carrying the COMMITTED prefix.
         *
         * <p>An instance at exactly {@code _winningOffset} gets the {@code keep(...)} response; any
         * other offset gets {@code discard(...)}.
         *
         * @param instanceId instance reporting that it has consumed up to {@code offset}
         * @param offset offset reported by that instance
         * @return the keep or discard response
         */
        private SegmentCompletionProtocol.Response COMMITTED__consumed(String instanceId, long offset) {
            if (offset != _winningOffset) {
                return discard(instanceId, offset);
            }
            return keep(instanceId, offset);
        }

        /**
         * Commit-request handler carrying the COMMITTED prefix.
         *
         * <p>An instance at exactly {@code _winningOffset} gets the {@code keep(...)} response; any
         * other offset gets {@code discard(...)}.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @return the keep or discard response
         */
        private SegmentCompletionProtocol.Response COMMITTED__commit(String instanceId, long offset) {
            final boolean atWinningOffset = offset == _winningOffset;
            return atWinningOffset ? keep(instanceId, offset) : discard(instanceId, offset);
        }

        /**
         * Stopped-consuming handler carrying the COMMITTED prefix.
         *
         * <p>Forwards to {@code processStoppedConsuming} with {@code createNew = false}.
         *
         * @param instanceId instance that stopped consuming
         * @param offset offset reported by that instance
         * @param reason reason string supplied with the notification
         * @return the response produced by {@code processStoppedConsuming}
         */
        private SegmentCompletionProtocol.Response COMMITTED__stoppedConsuming(String instanceId, long offset, String reason) {
            final boolean createNew = false;
            return processStoppedConsuming(instanceId, offset, reason, createNew);
        }

        /**
         * Shared implementation behind every {@code *stoppedConsuming} handler.
         *
         * <p>Logs the notification and calls {@code _segmentManager.segmentStoppedConsuming} for this
         * segment and instance. Within this method {@code offset}, {@code reason} and
         * {@code createNew} are only logged.
         *
         * @param instanceId instance that stopped consuming
         * @param offset offset reported by that instance
         * @param reason reason string supplied with the notification
         * @param createNew flag passed by the state-specific handler
         * @return {@code RESP_PROCESSED} on success, or {@code RESP_FAILED} if the segment manager threw
         */
        private SegmentCompletionProtocol.Response processStoppedConsuming(String instanceId, long offset, String reason, boolean createNew) {
            LOGGER.info("Instance {} stopped consuming segment {} at offset {}, state {}, createNew: {}, reason:{}", instanceId, _segmentName, offset, _state, createNew, reason);
            try {
                _segmentManager.segmentStoppedConsuming(_segmentName, instanceId);
                return SegmentCompletionProtocol.RESP_PROCESSED;
            } catch (Exception e) {
                // The exception is passed as the trailing argument so the logger records its stack trace.
                LOGGER.error("Caught exception while processing stopped CONSUMING segment: {} on instance: {}", _segmentName.getSegmentName(), instanceId, e);
                return SegmentCompletionProtocol.RESP_FAILED;
            }
        }

        /**
         * Shared consumed-notification handling used by the COMMITTER_UPLOADING and COMMITTING handlers.
         *
         * <p>A non-null result from {@code abortIfTooLateAndReturnHold} is returned to the caller
         * as-is. If the reporting instance is the winner, the FSM is aborted through
         * {@code abortAndReturnHold}. Any other instance gets {@code catchup(...)} when it is behind
         * the winning offset and {@code hold(...)} when it is at or beyond it.
         *
         * @param instanceId instance reporting that it has consumed up to {@code offset}
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response to send back to the reporting instance; never {@code null} as long as
         *         the helper methods called here return non-null responses
         */
        private SegmentCompletionProtocol.Response processConsumedAfterCommitStart(String instanceId, long offset, long now) {
            SegmentCompletionProtocol.Response response = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (response != null) {
                // Propagate the too-late response; returning null here would hand the caller no response at all.
                return response;
            }
            if (instanceId.equals(_winner)) {
                LOGGER.warn("{}:Aborting FSM because winner is reporting a segment while it is also committing instance={} offset={} now={}", _state, instanceId, offset, now);
                return abortAndReturnHold(now, instanceId, offset);
            }
            // Equal and greater offsets both hold; only a lagging replica is told to catch up.
            if (offset < _winningOffset) {
                return catchup(instanceId, offset);
            }
            return hold(instanceId, offset);
        }

        /**
         * Performs the commit of the segment on behalf of the instance named in {@code reqParams}.
         *
         * <p>The commit is refused with {@code RESP_FAILED} unless the FSM is in COMMITTER_UPLOADING.
         * Otherwise the state becomes COMMITTING, the segment file is committed first (split commit
         * only), then the segment metadata, and finally the state becomes COMMITTED. An exception from
         * either segment-manager call is logged and turned into {@code RESP_FAILED}; the state is left
         * at COMMITTING in that case.
         *
         * @param reqParams request parameters; only the instance id and offset are read here
         * @param isSplitCommit whether {@code commitSegmentFile} must run before the metadata commit
         * @param committingSegmentDescriptor descriptor handed to the segment manager calls
         * @return {@code RESP_COMMIT_SUCCESS} on success, otherwise {@code RESP_FAILED}
         */
        private SegmentCompletionProtocol.Response commitSegment(SegmentCompletionProtocol.Request.Params reqParams, boolean isSplitCommit, CommittingSegmentDescriptor committingSegmentDescriptor) {
            final String committer = reqParams.getInstanceId();
            final long committedOffset = reqParams.getOffset();

            if (!_state.equals(State.COMMITTER_UPLOADING)) {
                LOGGER.warn("State change during upload: state={} segment={} winner={} winningOffset={}", _state, _segmentName.getSegmentName(), _winner, _winningOffset);
                return SegmentCompletionProtocol.RESP_FAILED;
            }

            LOGGER.info("Committing segment {} at offset {} winner {}", _segmentName.getSegmentName(), committedOffset, committer);
            _state = State.COMMITTING;

            // Step 1 (split commit only): commit the segment file.
            if (isSplitCommit) {
                try {
                    _segmentManager.commitSegmentFile(_realtimeTableName, committingSegmentDescriptor);
                } catch (Exception fileFailure) {
                    LOGGER.error("Caught exception while committing segment file for segment: {}", _segmentName.getSegmentName(), fileFailure);
                    return SegmentCompletionProtocol.RESP_FAILED;
                }
            }

            // Step 2: commit the segment metadata.
            try {
                _segmentManager.commitSegmentMetadata(_realtimeTableName, committingSegmentDescriptor);
            } catch (Exception metadataFailure) {
                LOGGER.error("Caught exception while committing segment metadata for segment: {}", _segmentName.getSegmentName(), metadataFailure);
                return SegmentCompletionProtocol.RESP_FAILED;
            }

            _state = State.COMMITTED;
            LOGGER.info("Committed segment {} at offset {} winner {}", _segmentName.getSegmentName(), committedOffset, committer);
            return SegmentCompletionProtocol.RESP_COMMIT_SUCCESS;
        }

        /**
         * Shared commit-request handling used by the COMMITTER_UPLOADING and COMMITTING handlers.
         *
         * <p>A non-null result from {@code abortIfTooLateAndReturnHold} is returned as-is. Otherwise a
         * fresh response with status HOLD and the reported offset is returned.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the too-late response, or a new HOLD response carrying {@code offset}
         */
        private SegmentCompletionProtocol.Response processCommitWhileUploading(String instanceId, long offset, long now) {
            LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            return tooLate != null
                    ? tooLate
                    : new SegmentCompletionProtocol.Response(
                            new SegmentCompletionProtocol.Response.Params()
                                    .withOffset(offset)
                                    .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.HOLD));
        }

        /**
         * Validates a commit request before the upload is allowed to start.
         *
         * <p>A non-null result from {@code abortIfTooLateAndReturnHold} is returned as-is. If the
         * request comes from the winner but at an offset other than {@code _winningOffset}, the FSM is
         * aborted through {@code abortAndReturnHold}.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the response to send when the request is rejected, or {@code null} when it is acceptable
         */
        private SegmentCompletionProtocol.Response checkBadCommitRequest(String instanceId, long offset, long now) {
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            if (tooLate != null) {
                return tooLate;
            }

            final boolean winnerAtWrongOffset = instanceId.equals(_winner) && offset != _winningOffset;
            if (!winnerAtWrongOffset) {
                return null;
            }

            LOGGER.warn("{}:Aborting FSM (bad commit req) instance={} offset={} now={} winning={}", _state, instanceId, offset, now, _winningOffset);
            return abortAndReturnHold(now, instanceId, offset);
        }

        /**
         * Shared commit-request handling used by the PARTIAL_CONSUMING, HOLDING and COMMITTER_DECIDED
         * handlers.
         *
         * <p>A non-null result from {@code abortIfTooLateAndReturnHold} is returned as-is; otherwise the
         * instance gets the {@code hold(...)} response.
         *
         * @param instanceId instance that sent the commit request
         * @param offset offset reported by that instance
         * @param now caller-supplied current time
         * @return the too-late response, or the hold response
         */
        private SegmentCompletionProtocol.Response processCommitWhileHoldingOrPartialConsuming(String instanceId, long offset, long now) {
            LOGGER.info("Processing segmentCommit({}, {})", instanceId, offset);
            final SegmentCompletionProtocol.Response tooLate = abortIfTooLateAndReturnHold(now, instanceId, offset);
            return tooLate != null ? tooLate : hold(instanceId, offset);
        }

        /**
         * Decides whether a winner can be chosen now and, if so, records it in {@code _winner} and
         * {@code _winningOffset}.
         *
         * <p>Two situations pick a winner:
         * <ol>
         *   <li>The stop reason is {@code "rowLimit"} and exactly one instance is recorded in
         *       {@code _commitStateMap}: the preferred instance wins with its recorded offset.</li>
         *   <li>{@code now} is past {@code _maxTimeToPickWinnerMs}, or the number of recorded instances
         *       equals {@code numReplicasToLookFor()}: the instance with the largest recorded offset
         *       wins, and the preferred instance is chosen when its offset equals that maximum.</li>
         * </ol>
         *
         * <p>NOTE(review): both paths unbox {@code _commitStateMap.get(preferredInstance)}, so the
         * preferred instance is assumed to already be recorded in the map - confirm against the caller.
         *
         * @param preferredInstance instance to favour when it ties for the highest offset
         * @param now caller-supplied current time
         * @param stopReason reason the reporting instance stopped consuming
         * @return {@code true} if a winner was recorded, {@code false} if the FSM must keep waiting
         */
        private boolean isWinnerPicked(String preferredInstance, long now, String stopReason) {
            final boolean soleRowLimitReporter = "rowLimit".equals(stopReason) && _commitStateMap.size() == 1;
            if (soleRowLimitReporter) {
                _winner = preferredInstance;
                _winningOffset = _commitStateMap.get(preferredInstance);
                return true;
            }

            // Short-circuit matters: numReplicasToLookFor() is only consulted while still within the time limit.
            if (!(now > _maxTimeToPickWinnerMs || _commitStateMap.size() == numReplicasToLookFor())) {
                return false;
            }

            LOGGER.info("{}:Picking winner time={} size={}", _state, now - _startTimeMs, _commitStateMap.size());
            long highestOffset = -1L;
            String leader = null;
            for (Map.Entry<String, Long> candidate : _commitStateMap.entrySet()) {
                if (candidate.getValue() > highestOffset) {
                    highestOffset = candidate.getValue();
                    leader = candidate.getKey();
                }
            }
            _winningOffset = highestOffset;
            // Prefer the reporting instance over whichever tied entry the iteration happened to find first.
            if (_commitStateMap.get(preferredInstance) == highestOffset) {
                leader = preferredInstance;
            }
            _winner = leader;
            return true;
        }
    }

    /**
     * States of the per-segment completion state machine.
     *
     * <p>The transitions noted on each constant are the ones visible in the handler methods of this
     * file; other transitions may exist in code not described here.
     */
    private static enum State {
        // Handled by the PARTIAL_CONSUMING__* methods. NOTE(review): how this state is entered is
        // not documented here - confirm against the FSM constructor/dispatcher.
        PARTIAL_CONSUMING,
        // Waiting for a winner to be picked; HOLDING__consumed leaves this state once isWinnerPicked(...) is true.
        HOLDING,
        // A winner was picked but the instance that triggered the pick was not the winner.
        COMMITTER_DECIDED,
        // The winner has been sent the commit(...) response.
        COMMITTER_NOTIFIED,
        // Entered by COMMITTER_NOTIFIED__commit after the commit request passed checkBadCommitRequest.
        COMMITTER_UPLOADING,
        // Entered by commitSegment just before the segment file/metadata commit calls.
        COMMITTING,
        // Entered by commitSegment after commitSegmentMetadata returned without throwing.
        COMMITTED,
        // Entered by COMMITTER_NOTIFIED__consumed when the winner reports a non-winning offset;
        // presumably also set by the abort helpers - TODO confirm.
        ABORTED;

    }
}

