/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group.runtime;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEvent;
import org.apache.kafka.coordinator.group.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.group.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.group.runtime.CoordinatorPlayback;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.group.runtime.CoordinatorTimer;
import org.apache.kafka.coordinator.group.runtime.PartitionWriter;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

public class CoordinatorRuntime<S extends CoordinatorShard<U>, U>
implements AutoCloseable {
    /** Prefix used when building the per-partition log context of each {@link CoordinatorContext}. */
    private final String logPrefix;
    /** Log context from which the runtime's own logger is created. */
    private final LogContext logContext;
    /** Logger of the runtime; also used by the nested context and event classes. */
    private final Logger log;
    /** Clock handed to every coordinator shard that gets built. */
    private final Time time;
    /** Shared timer onto which the per-partition timers add their tasks; closed by {@link #close()}. */
    private final Timer timer;
    /** Coordinator contexts keyed by the partition they are hosted on. */
    private final ConcurrentHashMap<TopicPartition, CoordinatorContext> coordinators;
    /** Processor to which every read, write and internal event is enqueued. */
    private final CoordinatorEventProcessor processor;
    /** Writer used to append records and to (de)register the high watermark listener. */
    private final PartitionWriter<U> partitionWriter;
    /** Single listener instance registered for every partition whose context becomes active. */
    private final HighWatermarkListener highWatermarklistener;
    /** Loader invoked with the freshly built shard when a context enters the LOADING state. */
    private final CoordinatorLoader<U> loader;
    /** Supplies the builder used to create a new shard each time a context enters the LOADING state. */
    private final CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
    /** True until {@link #close()} is called; public scheduling methods check it before doing anything. */
    private final AtomicBoolean isRunning = new AtomicBoolean(true);
    /** Most recent metadata image received; passed to a shard when it finishes loading. */
    private volatile MetadataImage metadataImage = MetadataImage.EMPTY;

    /**
     * Creates the runtime. The constructor is private; instances are created
     * through {@link Builder#build()}.
     *
     * @param logPrefix                        prefix used for the per-partition log contexts
     * @param logContext                       log context from which the runtime logger is created
     * @param processor                        processor that receives every scheduled event
     * @param partitionWriter                  writer used to append records
     * @param loader                           loader used when a coordinator starts loading
     * @param coordinatorShardBuilderSupplier  supplier of shard builders
     * @param time                             clock handed to the shards
     * @param timer                            timer onto which timeout tasks are added
     */
    private CoordinatorRuntime(String logPrefix, LogContext logContext, CoordinatorEventProcessor processor, PartitionWriter<U> partitionWriter, CoordinatorLoader<U> loader, CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier, Time time, Timer timer) {
        this.logPrefix = logPrefix;
        this.logContext = logContext;
        this.log = logContext.logger(CoordinatorRuntime.class);
        this.time = time;
        this.timer = timer;
        // Diamond instead of the raw type so the assignment is checked by the compiler.
        this.coordinators = new ConcurrentHashMap<>();
        this.processor = processor;
        this.partitionWriter = partitionWriter;
        this.highWatermarklistener = new HighWatermarkListener();
        this.loader = loader;
        this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
    }

    /**
     * Rejects the call with the NOT_COORDINATOR error once the runtime has
     * been closed; does nothing while it is still running.
     */
    private void throwIfNotRunning() {
        final boolean running = isRunning.get();
        if (running) {
            return;
        }
        throw Errors.NOT_COORDINATOR.exception();
    }

    /**
     * Hands an event to the event processor.
     *
     * @param event the event to enqueue
     * @throws NotCoordinatorException if the processor rejects the event; the
     *         rejection is kept as the cause
     */
    private void enqueue(CoordinatorEvent event) {
        try {
            processor.enqueue(event);
        } catch (RejectedExecutionException rejected) {
            final String message = "Can't accept an event because the processor is closed";
            throw new NotCoordinatorException(message, rejected);
        }
    }

    /**
     * Registers a new context for {@code tp} unless one is already present.
     *
     * @param tp the partition for which a context must exist
     */
    private void maybeCreateContext(TopicPartition tp) {
        coordinators.computeIfAbsent(tp, CoordinatorContext::new);
    }

    /**
     * Looks up the context registered for a partition.
     *
     * @param tp the partition to look up
     * @return the context registered for {@code tp}
     * @throws NotCoordinatorException if no context is registered for {@code tp}
     */
    CoordinatorContext contextOrThrow(TopicPartition tp) throws NotCoordinatorException {
        final CoordinatorContext found = coordinators.get(tp);
        if (found != null) {
            return found;
        }
        throw Errors.NOT_COORDINATOR.exception();
    }

    /**
     * Runs {@code func} on the context of {@code tp} while holding that
     * context's lock. The lock is always released, whether {@code func}
     * returns normally or throws.
     *
     * @param tp   the partition whose context is wanted
     * @param func the action to run with the lock held
     * @throws NotCoordinatorException if no context is registered for {@code tp}
     */
    private void withContextOrThrow(TopicPartition tp, Consumer<CoordinatorContext> func) throws NotCoordinatorException {
        CoordinatorContext context = this.contextOrThrow(tp);
        // Acquire before entering the try block so that the finally clause can
        // only ever release a lock this thread actually holds.
        context.lock.lock();
        try {
            func.accept(context);
        } finally {
            context.lock.unlock();
        }
    }

    /**
     * Runs {@code func} on the context of {@code tp} while holding that
     * context's lock, but only if the context is in the ACTIVE state.
     *
     * @param tp   the partition whose context is wanted
     * @param func the action to run with the lock held
     * @throws NotCoordinatorException if no context is registered for
     *         {@code tp}, or the context is neither ACTIVE nor LOADING
     * @throws CoordinatorLoadInProgressException if the context is LOADING
     */
    private void withActiveContextOrThrow(TopicPartition tp, Consumer<CoordinatorContext> func) throws NotCoordinatorException, CoordinatorLoadInProgressException {
        CoordinatorContext context = this.contextOrThrow(tp);
        // Acquire before entering the try block so that the finally clause can
        // only ever release a lock this thread actually holds.
        context.lock.lock();
        try {
            if (context.state == CoordinatorState.ACTIVE) {
                func.accept(context);
            } else if (context.state == CoordinatorState.LOADING) {
                throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
            } else {
                throw Errors.NOT_COORDINATOR.exception();
            }
        } finally {
            context.lock.unlock();
        }
    }

    /**
     * Schedules a write operation for the coordinator hosted on {@code tp}.
     *
     * @param name the name of the operation, used in logs and in the event's string form
     * @param tp   the partition of the targeted coordinator
     * @param op   the operation producing the records and the response
     * @param <T>  the type of the response
     * @return the future of the created write event
     * @throws NotCoordinatorException if the runtime is closed or the
     *         processor rejects the event
     */
    public <T> CompletableFuture<T> scheduleWriteOperation(String name, TopicPartition tp, CoordinatorWriteOperation<S, T, U> op) {
        throwIfNotRunning();
        log.debug("Scheduled execution of write operation {}.", name);
        final CoordinatorWriteEvent<T> writeEvent = new CoordinatorWriteEvent<>(name, tp, op);
        enqueue(writeEvent);
        return writeEvent.future;
    }

    /**
     * Schedules a read operation for the coordinator hosted on {@code tp}.
     *
     * @param name the name of the operation, used in logs and in the event's string form
     * @param tp   the partition of the targeted coordinator
     * @param op   the operation producing the response
     * @param <T>  the type of the response
     * @return the future of the created read event
     * @throws NotCoordinatorException if the runtime is closed or the
     *         processor rejects the event
     */
    public <T> CompletableFuture<T> scheduleReadOperation(String name, TopicPartition tp, CoordinatorReadOperation<S, T> op) {
        throwIfNotRunning();
        log.debug("Scheduled execution of read operation {}.", name);
        final CoordinatorReadEvent<T> readEvent = new CoordinatorReadEvent<>(name, tp, op);
        enqueue(readEvent);
        return readEvent.future;
    }

    /**
     * Wraps {@code op} into an internal event keyed by {@code tp} and enqueues it.
     * Unlike the public scheduling methods, this does not check whether the
     * runtime is still running.
     *
     * @param name the name of the operation, used in logs
     * @param tp   the partition the event is keyed by
     * @param op   the work to run when the event is processed
     */
    private void scheduleInternalOperation(String name, TopicPartition tp, Runnable op) {
        log.debug("Scheduled execution of internal operation {}.", name);
        final CoordinatorInternalEvent internalEvent = new CoordinatorInternalEvent(name, tp, op);
        enqueue(internalEvent);
    }

    /**
     * Returns the partitions that currently have a registered context.
     *
     * @return a new set, independent from the runtime's internal map
     * @throws NotCoordinatorException if the runtime is closed
     */
    public Set<TopicPartition> partitions() {
        throwIfNotRunning();
        final Set<TopicPartition> snapshot = new HashSet<>();
        snapshot.addAll(coordinators.keySet());
        return snapshot;
    }

    /**
     * Schedules the loading of the coordinator hosted on {@code tp}.
     *
     * <p>The context is registered immediately; the rest runs as internal
     * events. The request is ignored unless {@code partitionEpoch} is
     * strictly greater than the epoch recorded in the context. Loading only
     * starts from the INITIAL or FAILED state. When the loader completes, a
     * second internal event moves the context to ACTIVE, or to FAILED if the
     * load or the transition threw.
     *
     * @param tp             the partition to load the coordinator from
     * @param partitionEpoch the epoch of the partition
     * @throws NotCoordinatorException if the runtime is closed or the
     *         processor rejects the event
     */
    public void scheduleLoadOperation(TopicPartition tp, int partitionEpoch) {
        throwIfNotRunning();
        log.info("Scheduling loading of metadata from {} with epoch {}", tp, partitionEpoch);
        // Register the context up front so the queued operation can find it.
        maybeCreateContext(tp);

        final String loadName = "Load(tp=" + tp + ", epoch=" + partitionEpoch + ")";
        scheduleInternalOperation(loadName, tp, () -> withContextOrThrow(tp, context -> {
            if (context.epoch >= partitionEpoch) {
                log.info("Ignoring loading metadata from {} since current epoch {} is larger than or equals to {}.", context.tp, context.epoch, partitionEpoch);
                return;
            }

            context.epoch = partitionEpoch;
            switch (context.state) {
                case FAILED:
                case INITIAL:
                    context.transitionTo(CoordinatorState.LOADING);
                    loader.load(tp, (CoordinatorPlayback<U>) context.coordinator).whenComplete((ignoredResult, exception) -> {
                        final String completeName = "CompleteLoad(tp=" + tp + ", epoch=" + partitionEpoch + ")";
                        scheduleInternalOperation(completeName, tp, () -> withContextOrThrow(tp, ctx -> {
                            // The context may have left LOADING (e.g. been closed) in the meantime.
                            if (ctx.state != CoordinatorState.LOADING) {
                                log.info("Ignoring load completion from {} because context is in {} state.", ctx.tp, ctx.state);
                                return;
                            }
                            try {
                                if (exception != null) {
                                    throw exception;
                                }
                                ctx.transitionTo(CoordinatorState.ACTIVE);
                                log.info("Finished loading of metadata from {} with epoch {}.", tp, partitionEpoch);
                            } catch (Throwable ex) {
                                log.error("Failed to load metadata from {} with epoch {} due to {}.", tp, partitionEpoch, ex.toString());
                                ctx.transitionTo(CoordinatorState.FAILED);
                            }
                        }));
                    });
                    break;

                case LOADING:
                    log.info("The coordinator {} is already loading metadata.", tp);
                    break;

                case ACTIVE:
                    log.info("The coordinator {} is already active.", tp);
                    break;

                default:
                    log.error("Cannot load coordinator {} in state {}.", tp, context.state);
                    break;
            }
        }));
    }

    /**
     * Schedules the unloading of the coordinator hosted on {@code tp}.
     *
     * <p>When the internal event runs, the context is closed and removed from
     * the runtime, but only if {@code partitionEpoch} is strictly greater
     * than the epoch recorded in the context; otherwise the request is
     * ignored.
     *
     * @param tp             the partition of the coordinator to unload
     * @param partitionEpoch the epoch of the partition
     * @throws NotCoordinatorException if the runtime is closed or the
     *         processor rejects the event
     */
    public void scheduleUnloadOperation(TopicPartition tp, int partitionEpoch) {
        throwIfNotRunning();
        log.info("Scheduling unloading of metadata for {} with epoch {}", tp, partitionEpoch);

        final String unloadName = "UnloadCoordinator(tp=" + tp + ", epoch=" + partitionEpoch + ")";
        scheduleInternalOperation(unloadName, tp, () -> withContextOrThrow(tp, context -> {
            if (context.epoch >= partitionEpoch) {
                log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.", tp, partitionEpoch, context.epoch);
                return;
            }

            log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch);
            context.transitionTo(CoordinatorState.CLOSED);
            // Two-argument remove: only drops the mapping if it still points to this context.
            coordinators.remove(tp, context);
            log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch);
        }));
    }

    /**
     * Records a new metadata image and schedules, for every registered
     * partition, an internal event that forwards the image and delta to the
     * coordinator. Coordinators that are not ACTIVE when their event runs
     * are skipped.
     *
     * @param newImage the new metadata image
     * @param delta    the delta forwarded to the coordinators along with the image
     * @throws NotCoordinatorException if the runtime is closed or the
     *         processor rejects an event
     */
    public void onNewMetadataImage(MetadataImage newImage, MetadataDelta delta) {
        throwIfNotRunning();
        log.debug("Scheduling applying of a new metadata image with offset {}.", newImage.offset());
        metadataImage = newImage;

        for (TopicPartition tp : coordinators.keySet()) {
            final String updateName = "UpdateImage(tp=" + tp + ", offset=" + newImage.offset() + ")";
            scheduleInternalOperation(updateName, tp, () -> withContextOrThrow(tp, context -> {
                if (context.state != CoordinatorState.ACTIVE) {
                    log.debug("Ignoring new metadata image with offset {} for {} because the coordinator is not active.", newImage.offset(), tp);
                    return;
                }
                log.debug("Applying new metadata image with offset {} to {}.", newImage.offset(), tp);
                context.coordinator.onNewMetadataImage(newImage, delta);
            }));
        }
    }

    /**
     * Shuts the runtime down. Only the first call has an effect; later calls
     * log a warning and return. The loader, the timer and the event
     * processor are closed quietly, in that order, then every context is
     * moved to the CLOSED state and the context map is emptied.
     *
     * @throws Exception declared by {@link AutoCloseable#close()}
     */
    @Override
    public void close() throws Exception {
        final boolean firstClose = isRunning.compareAndSet(true, false);
        if (!firstClose) {
            log.warn("Coordinator runtime is already shutting down.");
            return;
        }

        log.info("Closing coordinator runtime.");
        Utils.closeQuietly(loader, "loader");
        Utils.closeQuietly((AutoCloseable) timer, "timer");
        Utils.closeQuietly((AutoCloseable) processor, "event processor");

        // Unload every coordinator before dropping the contexts.
        for (CoordinatorContext context : coordinators.values()) {
            context.transitionTo(CoordinatorState.CLOSED);
        }
        coordinators.clear();
        log.info("Coordinator runtime closed.");
    }

    /**
     * Listener that turns high watermark updates into internal events which
     * advance the last committed offset of the matching context.
     */
    class HighWatermarkListener implements PartitionWriter.Listener {

        /**
         * Schedules an internal event that updates the last committed offset
         * of the context of {@code tp}; the update only applies if the
         * context is active when the event runs.
         *
         * @param tp     the partition whose high watermark moved
         * @param offset the new high watermark
         */
        @Override
        public void onHighWatermarkUpdated(TopicPartition tp, long offset) {
            CoordinatorRuntime.this.log.debug("High watermark of {} incremented to {}.", tp, offset);
            final String eventName = "HighWatermarkUpdated(tp=" + tp + ", offset=" + offset + ")";
            CoordinatorRuntime.this.scheduleInternalOperation(eventName, tp, () -> {
                CoordinatorRuntime.this.withActiveContextOrThrow(tp, context -> context.updateLastCommittedOffset(offset));
            });
        }
    }

    class CoordinatorInternalEvent
    implements CoordinatorEvent {
        final TopicPartition tp;
        final String name;
        final Runnable op;

        CoordinatorInternalEvent(String name, TopicPartition tp, Runnable op) {
            this.tp = tp;
            this.name = name;
            this.op = op;
        }

        @Override
        public TopicPartition key() {
            return this.tp;
        }

        @Override
        public void run() {
            try {
                this.op.run();
            }
            catch (Throwable t) {
                this.complete(t);
            }
        }

        @Override
        public void complete(Throwable exception) {
            if (exception != null) {
                CoordinatorRuntime.this.log.error("Execution of {} failed due to {}.", new Object[]{this.name, exception.getMessage(), exception});
            }
        }

        public String toString() {
            return "InternalEvent(name=" + this.name + ")";
        }
    }

    /**
     * Event executing a read operation against an active coordinator.
     *
     * @param <T> the type of the response
     */
    class CoordinatorReadEvent<T> implements CoordinatorEvent {
        /** Partition the event is keyed by. */
        final TopicPartition tp;
        /** Name of the operation. */
        final String name;
        /** Operation producing the response. */
        final CoordinatorReadOperation<S, T> op;
        /** Future completed with the response or with the failure. */
        final CompletableFuture<T> future;
        /** Response produced by the operation; set when the event runs. */
        T response;

        CoordinatorReadEvent(String name, TopicPartition tp, CoordinatorReadOperation<S, T> op) {
            this.tp = tp;
            this.name = name;
            this.op = op;
            this.future = new CompletableFuture<>();
        }

        /** @return the partition this event is keyed by */
        @Override
        public TopicPartition key() {
            return tp;
        }

        /**
         * Generates the response from the coordinator and its last committed
         * offset, then completes the future. Any failure, including the
         * coordinator not being active, completes the future exceptionally.
         */
        @Override
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(tp, context -> {
                    response = op.generateResponse(context.coordinator, context.lastCommittedOffset);
                    complete(null);
                });
            } catch (Throwable failure) {
                complete(failure);
            }
        }

        /**
         * Completes the future.
         *
         * @param exception the failure, or null to complete with the response
         */
        @Override
        public void complete(Throwable exception) {
            if (exception != null) {
                future.completeExceptionally(exception);
                return;
            }
            future.complete(response);
        }

        @Override
        public String toString() {
            return "CoordinatorReadEvent(name=" + name + ")";
        }
    }

    public static interface CoordinatorReadOperation<S, T> {
        public T generateResponse(S var1, long var2) throws KafkaException;
    }

    /**
     * Event executing a write operation against an active coordinator.
     *
     * <p>The operation generates records and a response. Records are
     * optionally replayed on the coordinator and appended through the
     * partition writer. The event is then parked in the context's deferred
     * queue under the offset returned by the writer; the queue later calls
     * {@link #complete(Throwable)}.
     *
     * @param <T> the type of the response
     */
    class CoordinatorWriteEvent<T>
    implements CoordinatorEvent,
    DeferredEvent {
        /** Partition the event is keyed by. */
        final TopicPartition tp;
        /** Name of the operation. */
        final String name;
        /** Operation producing the records and the response. */
        final CoordinatorWriteOperation<S, T, U> op;
        /** Future completed with the response or with the failure. */
        final CompletableFuture<T> future;
        /** Result of the operation; null until the operation has been executed. */
        CoordinatorResult<T, U> result;

        CoordinatorWriteEvent(String name, TopicPartition tp, CoordinatorWriteOperation<S, T, U> op) {
            this.tp = tp;
            this.name = name;
            this.op = op;
            this.future = new CompletableFuture<>();
        }

        /** @return the partition this event is keyed by */
        @Override
        public TopicPartition key() {
            return this.tp;
        }

        /**
         * Executes the operation under the context lock. Any failure,
         * including the coordinator not being active, completes the event
         * exceptionally.
         */
        @Override
        public void run() {
            try {
                CoordinatorRuntime.this.withActiveContextOrThrow(this.tp, context -> {
                    // Remembered so that a failed write can be rolled back.
                    long prevLastWrittenOffset = context.lastWrittenOffset;
                    this.result = this.op.generateRecordsAndResult(context.coordinator);
                    if (this.result.records().isEmpty()) {
                        // Nothing to append: wait behind the highest pending write,
                        // or complete right away when nothing is pending.
                        OptionalLong pendingOffset = context.deferredEventQueue.highestPendingOffset();
                        if (pendingOffset.isPresent()) {
                            context.deferredEventQueue.add(pendingOffset.getAsLong(), this);
                        } else {
                            this.complete(null);
                        }
                    } else {
                        try {
                            if (this.result.replayRecords()) {
                                this.result.records().forEach(record -> context.coordinator.replay(record));
                            }
                            long offset = CoordinatorRuntime.this.partitionWriter.append(this.tp, this.result.records());
                            context.updateLastWrittenOffset(offset);
                            if (!this.future.isDone()) {
                                context.deferredEventQueue.add(offset, this);
                            } else {
                                this.complete(null);
                            }
                        } catch (Throwable t) {
                            // Roll the context back to its state before this write.
                            context.revertLastWrittenOffset(prevLastWrittenOffset);
                            this.complete(t);
                        }
                    }
                });
            } catch (Throwable t) {
                this.complete(t);
            }
        }

        /**
         * Completes the event's future and, when the result carries one, the
         * result's append future.
         *
         * @param exception the failure, or null to complete with the response
         */
        @Override
        public void complete(Throwable exception) {
            // The result is null when the event failed before the operation ran.
            CompletableFuture<T> appendFuture = this.result != null ? this.result.appendFuture() : null;
            if (exception == null) {
                if (appendFuture != null) {
                    appendFuture.complete(this.result.response());
                }
                this.future.complete(this.result.response());
            } else {
                if (appendFuture != null) {
                    appendFuture.completeExceptionally(exception);
                }
                this.future.completeExceptionally(exception);
            }
        }

        @Override
        public String toString() {
            return "CoordinatorWriteEvent(name=" + this.name + ")";
        }
    }

    /**
     * A write operation executed against a coordinator.
     *
     * @param <S> the type of the coordinator
     * @param <T> the type of the response
     * @param <U> the type of the records
     */
    @FunctionalInterface
    public interface CoordinatorWriteOperation<S, T, U> {
        /**
         * Generates the records and the response of the operation.
         *
         * @param coordinator the coordinator to operate on
         * @return the result holding the records and the response
         * @throws KafkaException if the operation fails
         */
        CoordinatorResult<T, U> generateRecordsAndResult(S coordinator) throws KafkaException;
    }

    /**
     * Per-partition state of a coordinator: its lifecycle state, epoch,
     * shard, snapshot registry, offsets, deferred events and timer.
     * The runtime accesses it with {@link #lock} held.
     */
    class CoordinatorContext {
        /** Lock held by the runtime while it runs an action on this context. */
        final ReentrantLock lock = new ReentrantLock();
        /** Partition this context belongs to. */
        final TopicPartition tp;
        /** Log context carrying the runtime prefix, the topic and the partition. */
        final LogContext logContext;
        /** Write events waiting for their offset to be committed. */
        final DeferredEventQueue deferredEventQueue;
        /** Timer handed to the shard. */
        final EventBasedCoordinatorTimer timer;
        /** Current lifecycle state. */
        CoordinatorState state;
        /** Epoch of the partition; -1 until a load is accepted. */
        int epoch;
        /** Snapshot registry of the shard; null when unloaded. */
        SnapshotRegistry snapshotRegistry;
        /** The coordinator shard; null when unloaded. */
        S coordinator;
        /** Offset returned by the last successful append. */
        long lastWrittenOffset;
        /** Last committed offset reported through the high watermark listener. */
        long lastCommittedOffset;

        private CoordinatorContext(TopicPartition tp) {
            this.tp = tp;
            this.logContext = new LogContext(String.format("[%s topic=%s partition=%d] ", logPrefix, tp.topic(), tp.partition()));
            this.state = CoordinatorState.INITIAL;
            this.epoch = -1;
            this.deferredEventQueue = new DeferredEventQueue(logContext);
            this.timer = new EventBasedCoordinatorTimer(tp, logContext);
        }

        /**
         * Advances the last written offset and takes a snapshot at it.
         *
         * @param offset the new offset; must be greater than the current one
         * @throws IllegalStateException if {@code offset} is not greater
         */
        private void updateLastWrittenOffset(long offset) {
            final boolean advances = offset > lastWrittenOffset;
            if (!advances) {
                throw new IllegalStateException("New last written offset " + offset + " of " + tp + " must be larger than " + lastWrittenOffset + ".");
            }
            log.debug("Update last written offset of {} to {}.", tp, offset);
            lastWrittenOffset = offset;
            snapshotRegistry.getOrCreateSnapshot(offset);
        }

        /**
         * Moves the last written offset back and reverts to the snapshot at it.
         *
         * @param offset the offset to go back to; must not exceed the current one
         * @throws IllegalStateException if {@code offset} exceeds the current one
         */
        private void revertLastWrittenOffset(long offset) {
            final boolean ahead = offset > lastWrittenOffset;
            if (ahead) {
                throw new IllegalStateException("New offset " + offset + " of " + tp + " must be smaller than " + lastWrittenOffset + ".");
            }
            log.debug("Revert last written offset of {} to {}.", tp, offset);
            lastWrittenOffset = offset;
            snapshotRegistry.revertToSnapshot(offset);
        }

        /**
         * Advances the last committed offset, completes the deferred events
         * up to it and deletes the snapshots up to it.
         *
         * @param offset the new offset; must be greater than the current one
         * @throws IllegalStateException if {@code offset} is not greater
         */
        private void updateLastCommittedOffset(long offset) {
            final boolean advances = offset > lastCommittedOffset;
            if (!advances) {
                throw new IllegalStateException("New committed offset " + offset + " of " + tp + " must be larger than " + lastCommittedOffset + ".");
            }
            log.debug("Update committed offset of {} to {}.", tp, offset);
            lastCommittedOffset = offset;
            deferredEventQueue.completeUpTo(offset);
            snapshotRegistry.deleteSnapshotsUpTo(offset);
        }

        /**
         * Moves the context to {@code newState} and performs the work tied
         * to entering that state.
         *
         * @param newState the state to enter
         * @throws IllegalStateException if the transition is not allowed from the current state
         * @throws IllegalArgumentException if entering {@code newState} is not supported
         */
        private void transitionTo(CoordinatorState newState) {
            if (!newState.canTransitionFrom(state)) {
                throw new IllegalStateException("Cannot transition from " + state + " to " + newState);
            }
            log.debug("Transition from {} to {}.", state, newState);

            switch (newState) {
                case LOADING:
                    // Start from scratch: fresh registry, offsets and shard.
                    state = CoordinatorState.LOADING;
                    snapshotRegistry = new SnapshotRegistry(logContext);
                    lastWrittenOffset = 0L;
                    lastCommittedOffset = 0L;
                    coordinator = coordinatorShardBuilderSupplier
                        .get()
                        .withLogContext(logContext)
                        .withSnapshotRegistry(snapshotRegistry)
                        .withTime(time)
                        .withTimer(this.timer)
                        .build();
                    break;

                case ACTIVE:
                    state = CoordinatorState.ACTIVE;
                    snapshotRegistry.getOrCreateSnapshot(0L);
                    partitionWriter.registerListener(tp, highWatermarklistener);
                    coordinator.onLoaded(metadataImage);
                    break;

                case FAILED:
                    state = CoordinatorState.FAILED;
                    unload();
                    break;

                case CLOSED:
                    state = CoordinatorState.CLOSED;
                    unload();
                    break;

                default:
                    throw new IllegalArgumentException("Transitioning to " + newState + " is not supported.");
            }
        }

        /**
         * Releases everything tied to the shard: deregisters the high
         * watermark listener, cancels the timers, fails the deferred events
         * with NOT_COORDINATOR, notifies the shard if there is one, and
         * drops the shard and its snapshot registry.
         */
        private void unload() {
            partitionWriter.deregisterListener(tp, highWatermarklistener);
            this.timer.cancelAll();
            deferredEventQueue.failAll((Exception) Errors.NOT_COORDINATOR.exception());
            if (coordinator != null) {
                coordinator.onUnloaded();
            }
            coordinator = null;
            snapshotRegistry = null;
        }
    }

    /**
     * Per-partition timer handed to the coordinator shard. When a timeout
     * expires, the timeout operation is executed as a write event of the
     * runtime. Pending tasks are tracked by key, so scheduling a key again
     * replaces the previous task.
     */
    class EventBasedCoordinatorTimer
    implements CoordinatorTimer<Void, U> {
        /** Logger created from the context's log context. */
        final Logger log;
        /** Partition the timer belongs to. */
        final TopicPartition tp;
        /** Pending tasks by key. */
        final Map<String, TimerTask> tasks = new HashMap<>();

        EventBasedCoordinatorTimer(TopicPartition tp, LogContext logContext) {
            this.tp = tp;
            this.log = logContext.logger(EventBasedCoordinatorTimer.class);
        }

        /**
         * Registers a timeout under {@code key}, cancelling any task
         * previously registered under the same key.
         *
         * @param key       the key identifying the timeout
         * @param delay     the delay before the timeout expires
         * @param unit      the unit of {@code delay}
         * @param retry     whether the operation is rescheduled after 500ms
         *                  when its write event fails for a reason other than
         *                  cancellation or the coordinator not being active
         * @param operation the operation generating the records when the timeout expires
         */
        @Override
        public void schedule(final String key, long delay, TimeUnit unit, final boolean retry, final CoordinatorTimer.TimeoutOperation<Void, U> operation) {
            TimerTask task = new TimerTask(unit.toMillis(delay)){

                public void run() {
                    String eventName = "Timeout(tp=" + EventBasedCoordinatorTimer.this.tp + ", key=" + key + ")";
                    // Parameterized (not raw) so that the future below is typed and the
                    // failure handler receives a Throwable.
                    CoordinatorWriteEvent<Void> event = new CoordinatorWriteEvent<Void>(eventName, EventBasedCoordinatorTimer.this.tp, coordinator -> {
                        EventBasedCoordinatorTimer.this.log.debug("Executing write event {} for timer {}.", eventName, key);
                        // "this" is the enclosing timer task: the operation only runs if this
                        // very task is still the one registered under the key.
                        if (!EventBasedCoordinatorTimer.this.tasks.remove(key, this)) {
                            throw new RejectedExecutionException("Timer " + key + " was overridden or cancelled");
                        }
                        return operation.generateRecords();
                    });
                    event.future.exceptionally(ex -> {
                        if (ex instanceof RejectedExecutionException) {
                            EventBasedCoordinatorTimer.this.log.debug("The write event {} for the timer {} was not executed because it was cancelled or overridden.", event.name, key);
                            return null;
                        }
                        if (ex instanceof NotCoordinatorException || ex instanceof CoordinatorLoadInProgressException) {
                            EventBasedCoordinatorTimer.this.log.debug("The write event {} for the timer {} failed due to {}. Ignoring it because the coordinator is not active.", event.name, key, ex.getMessage());
                            return null;
                        }
                        if (retry) {
                            EventBasedCoordinatorTimer.this.log.info("The write event {} for the timer {} failed due to {}. Rescheduling it. ", event.name, key, ex.getMessage());
                            EventBasedCoordinatorTimer.this.schedule(key, 500L, TimeUnit.MILLISECONDS, retry, operation);
                        } else {
                            EventBasedCoordinatorTimer.this.log.error("The write event {} for the timer {} failed due to {}. Ignoring it. ", event.name, key, ex.getMessage());
                        }
                        return null;
                    });
                    EventBasedCoordinatorTimer.this.log.debug("Scheduling write event {} for timer {}.", event.name, key);
                    try {
                        CoordinatorRuntime.this.enqueue(event);
                    }
                    catch (NotCoordinatorException ex2) {
                        EventBasedCoordinatorTimer.this.log.info("Failed to enqueue write event {} for timer {} because the runtime is closed. Ignoring it.", event.name, key);
                    }
                }
            };
            this.log.debug("Registering timer {} with delay of {}ms.", key, unit.toMillis(delay));
            TimerTask prevTask = this.tasks.put(key, task);
            if (prevTask != null) {
                prevTask.cancel();
            }
            CoordinatorRuntime.this.timer.add(task);
        }

        /**
         * Cancels the task registered under {@code key}, if any.
         *
         * @param key the key identifying the timeout
         */
        @Override
        public void cancel(String key) {
            TimerTask prevTask = this.tasks.remove(key);
            if (prevTask != null) {
                prevTask.cancel();
            }
        }

        /** Cancels and forgets every pending task. */
        public void cancelAll() {
            Iterator<Map.Entry<String, TimerTask>> iterator = this.tasks.entrySet().iterator();
            while (iterator.hasNext()) {
                iterator.next().getValue().cancel();
                iterator.remove();
            }
        }

        /** @return the number of pending tasks */
        public int size() {
            return this.tasks.size();
        }
    }

    /**
     * Lifecycle states of a coordinator context.
     */
    enum CoordinatorState {
        INITIAL,
        LOADING,
        ACTIVE,
        CLOSED,
        FAILED;

        /**
         * Tells whether this state may be entered from {@code state}.
         *
         * @param state the state the context is currently in
         * @return true if the transition from {@code state} to this state is allowed
         */
        boolean canTransitionFrom(CoordinatorState state) {
            switch (this) {
                case LOADING:
                    return state == INITIAL || state == FAILED;
                case ACTIVE:
                    return state == ACTIVE || state == LOADING;
                case CLOSED:
                    // Closing is allowed from any state.
                    return true;
                case FAILED:
                    return state == LOADING;
                case INITIAL:
                default:
                    // INITIAL can never be re-entered.
                    return false;
            }
        }
    }

    /**
     * Builder of {@link CoordinatorRuntime} instances.
     *
     * <p>The log prefix and log context are optional, and the time defaults
     * to {@link Time#SYSTEM}. Every other component must be set before
     * {@link #build()} is called.
     *
     * @param <S> the type of the coordinator shard
     * @param <U> the type of the records
     */
    public static class Builder<S extends CoordinatorShard<U>, U> {
        private String logPrefix;
        private LogContext logContext;
        private CoordinatorEventProcessor eventProcessor;
        private PartitionWriter<U> partitionWriter;
        private CoordinatorLoader<U> loader;
        private CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier;
        private Time time = Time.SYSTEM;
        private Timer timer;

        /** Sets the log prefix; defaults to the empty string when left unset. */
        public Builder<S, U> withLogPrefix(String logPrefix) {
            this.logPrefix = logPrefix;
            return this;
        }

        /** Sets the log context; defaults to one created from the log prefix when left unset. */
        public Builder<S, U> withLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /** Sets the event processor (required). */
        public Builder<S, U> withEventProcessor(CoordinatorEventProcessor eventProcessor) {
            this.eventProcessor = eventProcessor;
            return this;
        }

        /** Sets the partition writer (required). */
        public Builder<S, U> withPartitionWriter(PartitionWriter<U> partitionWriter) {
            this.partitionWriter = partitionWriter;
            return this;
        }

        /** Sets the coordinator loader (required). */
        public Builder<S, U> withLoader(CoordinatorLoader<U> loader) {
            this.loader = loader;
            return this;
        }

        /** Sets the supplier of coordinator shard builders (required). */
        public Builder<S, U> withCoordinatorShardBuilderSupplier(CoordinatorShardBuilderSupplier<S, U> coordinatorShardBuilderSupplier) {
            this.coordinatorShardBuilderSupplier = coordinatorShardBuilderSupplier;
            return this;
        }

        /** Sets the clock; must not be null. */
        public Builder<S, U> withTime(Time time) {
            this.time = time;
            return this;
        }

        /** Sets the timer (required). */
        public Builder<S, U> withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        /**
         * Validates the configuration and creates the runtime.
         *
         * @return the new runtime
         * @throws IllegalArgumentException if a required component is missing
         */
        public CoordinatorRuntime<S, U> build() {
            if (this.logPrefix == null) {
                this.logPrefix = "";
            }
            if (this.logContext == null) {
                this.logContext = new LogContext(this.logPrefix);
            }
            if (this.eventProcessor == null) {
                throw new IllegalArgumentException("Event processor must be set.");
            }
            if (this.partitionWriter == null) {
                throw new IllegalArgumentException("Partition writer must be set.");
            }
            if (this.loader == null) {
                throw new IllegalArgumentException("Loader must be set.");
            }
            if (this.coordinatorShardBuilderSupplier == null) {
                throw new IllegalArgumentException("State machine supplier must be set.");
            }
            if (this.time == null) {
                throw new IllegalArgumentException("Time must be set.");
            }
            if (this.timer == null) {
                throw new IllegalArgumentException("Timer must be set.");
            }
            // Diamond instead of the raw type so the result is a checked CoordinatorRuntime<S, U>.
            return new CoordinatorRuntime<>(this.logPrefix, this.logContext, this.eventProcessor, this.partitionWriter, this.loader, this.coordinatorShardBuilderSupplier, this.time, this.timer);
        }
    }
}

