package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.samza.config.Config;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;

/* loaded from: input_file:org/apache/samza/operators/impl/OperatorImpl.class */
/**
 * Base class for operator implementations that consume messages of type {@code M} and
 * produce result messages of type {@code RM}.
 *
 * <p>This class records per-operator metrics (a message counter and two timers) and forwards
 * every result returned by {@link #handleMessage} or {@link #handleTimer} to the operators
 * registered through {@link #registerNextOperator}.
 *
 * <p>{@link #init} must be called before any other method: the clock, the metrics and the
 * set of registered operators are only created there.
 *
 * @param <M> type of the messages handled by this operator
 * @param <RM> type of the results produced by this operator and consumed by the registered
 *     next operators
 */
public abstract class OperatorImpl<M, RM> {
    /** Metrics group under which all counters and timers of this class are registered. */
    private static final String METRICS_GROUP = OperatorImpl.class.getName();

    /** Set to {@code true} once {@link #init} has completed successfully. */
    private boolean initialized;

    /** Operators that receive the results of this operator; created in {@link #init}. */
    private Set<OperatorImpl<RM, ?>> registeredOperators;

    /** Clock used to measure handler durations; created in {@link #init}. */
    private HighResolutionClock highResClock;

    /** Counts the calls to {@link #onMessage}. */
    private Counter numMessage;

    /** Records the time spent in {@link #handleMessage}, as measured by {@link #highResClock}. */
    private Timer handleMessageNs;

    /** Records the time spent in {@link #handleTimer}, as measured by {@link #highResClock}. */
    private Timer handleTimerNs;

    /**
     * Initializes this operator: creates the clock, the set of registered operators and the
     * metrics, then delegates to {@link #handleInit}.
     *
     * <p>The operator is only marked as initialized after {@link #handleInit} returns, so an
     * exception thrown from it leaves the operator uninitialized.
     *
     * @param config the configuration, used to decide whether timer metrics are enabled and
     *     passed on to {@link #handleInit}
     * @param taskContext the task context that supplies the metrics registry and is passed on
     *     to {@link #handleInit}
     * @throws IllegalStateException if this operator has already been initialized
     */
    public final void init(Config config, TaskContext taskContext) {
        String opName = getOperatorSpec().getOpName();
        if (this.initialized) {
            throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opName));
        }
        this.highResClock = createHighResClock(config);
        this.registeredOperators = new HashSet<>();
        MetricsRegistry metricsRegistry = taskContext.getMetricsRegistry();
        this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opName + "-messages");
        this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-message-ns");
        this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opName + "-handle-timer-ns");
        handleInit(config, taskContext);
        this.initialized = true;
    }

    /**
     * Performs the subclass-specific initialization. Called by {@link #init} after the metrics
     * have been created and before the operator is marked as initialized.
     *
     * @param config the configuration passed to {@link #init}
     * @param taskContext the task context passed to {@link #init}
     */
    protected abstract void handleInit(Config config, TaskContext taskContext);

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Registers an operator that will receive every result produced by this operator.
     *
     * @param operatorImpl the operator to add to the set of registered operators
     * @throws IllegalStateException if this operator has not been initialized yet
     */
    public void registerNextOperator(OperatorImpl<RM, ?> operatorImpl) {
        if (!this.initialized) {
            throw new IllegalStateException(String.format("Attempted to register next operator before initializing operator %s.", getOperatorSpec().getOpName()));
        }
        this.registeredOperators.add(operatorImpl);
    }

    /**
     * Handles one message: increments the message counter, times the call to
     * {@link #handleMessage}, and forwards each result to every registered operator.
     *
     * @param m the message to handle
     * @param messageCollector the collector passed to {@link #handleMessage} and to the
     *     registered operators
     * @param taskCoordinator the coordinator passed to {@link #handleMessage} and to the
     *     registered operators
     */
    public final void onMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        this.numMessage.inc();
        long startNs = this.highResClock.nanoTime();
        Collection<RM> results = handleMessage(m, messageCollector, taskCoordinator);
        // Only the handler itself is timed; forwarding to the next operators is excluded.
        this.handleMessageNs.update(this.highResClock.nanoTime() - startNs);
        propagateResults(results, messageCollector, taskCoordinator);
    }

    /**
     * Processes a single message and returns the results to forward to the registered
     * operators.
     *
     * @param m the message to process
     * @param messageCollector the collector passed to {@link #onMessage}
     * @param taskCoordinator the coordinator passed to {@link #onMessage}
     * @return the results to forward; {@link #onMessage} iterates over the returned
     *     collection, so it must not be {@code null}
     */
    protected abstract Collection<RM> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator);

    /**
     * Handles a timer tick: times the call to {@link #handleTimer}, forwards each result to
     * every registered operator as a message, and finally invokes {@code onTimer} on every
     * registered operator.
     *
     * @param messageCollector the collector passed to {@link #handleTimer} and to the
     *     registered operators
     * @param taskCoordinator the coordinator passed to {@link #handleTimer} and to the
     *     registered operators
     */
    public final void onTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        long startNs = this.highResClock.nanoTime();
        Collection<RM> results = handleTimer(messageCollector, taskCoordinator);
        this.handleTimerNs.update(this.highResClock.nanoTime() - startNs);
        // Results emitted by the timer are delivered before the timer itself is passed on.
        propagateResults(results, messageCollector, taskCoordinator);
        this.registeredOperators.forEach(nextOperator -> nextOperator.onTimer(messageCollector, taskCoordinator));
    }

    /**
     * Produces the results to emit on a timer tick. This default implementation emits nothing.
     *
     * @param messageCollector the collector passed to {@link #onTimer}
     * @param taskCoordinator the coordinator passed to {@link #onTimer}
     * @return the results to forward to the registered operators; an empty list by default
     */
    protected Collection<RM> handleTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    /**
     * Returns the spec of this operator; its operator name is used in metric names and in
     * error messages.
     *
     * @return the operator spec
     */
    protected abstract OperatorSpec<RM> getOperatorSpec();

    /**
     * Delivers each result, in the iteration order of {@code results}, to every registered
     * operator through its {@code onMessage} method.
     */
    private void propagateResults(Collection<RM> results, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        results.forEach(result ->
                this.registeredOperators.forEach(nextOperator ->
                        nextOperator.onMessage(result, messageCollector, taskCoordinator)));
    }

    /**
     * Creates the clock used for the timer metrics.
     *
     * @param config the configuration wrapped in a {@link MetricsConfig} to read the
     *     timer-enabled flag
     * @return a clock backed by {@link System#nanoTime()} when timer metrics are enabled;
     *     otherwise a clock that always returns {@code 0}, so every recorded duration is zero
     */
    private HighResolutionClock createHighResClock(Config config) {
        return new MetricsConfig(config).getMetricsTimerEnabled() ? System::nanoTime : () -> 0L;
    }
}
