package org.apache.flink.runtime.iterative.task;

import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.runtime.event.task.AbstractTaskEvent;
import org.apache.flink.runtime.event.task.EventListener;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.types.Value;

/* loaded from: input_file:org/apache/flink/runtime/iterative/task/SyncEventHandler.class */
public class SyncEventHandler implements EventListener {
    private final ClassLoader userCodeClassLoader;
    private final Map<String, Aggregator<?>> aggregators;
    private final int numberOfEventsUntilEndOfSuperstep;
    private int workerDoneEventCounter;
    private boolean endOfSuperstep;

    public SyncEventHandler(int i, Map<String, Aggregator<?>> map, ClassLoader classLoader) {
        Preconditions.checkArgument(i > 0);
        this.userCodeClassLoader = classLoader;
        this.numberOfEventsUntilEndOfSuperstep = i;
        this.aggregators = map;
    }

    @Override // org.apache.flink.runtime.event.task.EventListener
    public void eventOccurred(AbstractTaskEvent abstractTaskEvent) {
        if (!WorkerDoneEvent.class.equals(abstractTaskEvent.getClass())) {
            throw new IllegalStateException("Unable to handle event " + abstractTaskEvent.getClass().getName());
        }
        onWorkerDoneEvent((WorkerDoneEvent) abstractTaskEvent);
    }

    private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
        if (this.endOfSuperstep) {
            throw new RuntimeException("Encountered WorderDoneEvent when still in End-of-Superstep status.");
        }
        this.workerDoneEventCounter++;
        String[] aggregatorNames = workerDoneEvent.getAggregatorNames();
        Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
        if (aggregatorNames.length != aggregates.length) {
            throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
        }
        for (int i = 0; i < aggregatorNames.length; i++) {
            this.aggregators.get(aggregatorNames[i]).aggregate(aggregates[i]);
        }
        if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
            this.endOfSuperstep = true;
            Thread.currentThread().interrupt();
        }
    }

    public boolean isEndOfSuperstep() {
        return this.endOfSuperstep;
    }

    public void resetEndOfSuperstep() {
        this.endOfSuperstep = false;
    }
}
