package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.Timer;
import org.apache.samza.operators.functions.WatermarkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.HighResolutionClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/OperatorImpl.class */
public abstract class OperatorImpl<M, RM> {
    private static final Logger LOG = LoggerFactory.getLogger(OperatorImpl.class);
    private static final String METRICS_GROUP = OperatorImpl.class.getName();
    private boolean initialized;
    private boolean closed;
    private HighResolutionClock highResClock;
    private Counter numMessage;
    private Timer handleMessageNs;
    private Timer handleTimerNs;
    private TaskName taskName;
    Set<OperatorImpl<RM, ?>> registeredOperators;
    Set<OperatorImpl<?, M>> prevOperators;
    Set<SystemStream> inputStreams;
    private TaskModel taskModel;
    private EndOfStreamStates eosStates;
    private WatermarkStates watermarkStates;
    private long inputWatermark = -1;
    private long outputWatermark = -1;
    private boolean usedInCurrentTask = false;

    public final void init(Config config, TaskContext taskContext) {
        String opImplId = getOpImplId();
        if (this.initialized) {
            throw new IllegalStateException(String.format("Attempted to initialize Operator %s more than once.", opImplId));
        }
        if (this.closed) {
            throw new IllegalStateException(String.format("Attempted to initialize Operator %s after it was closed.", opImplId));
        }
        this.highResClock = createHighResClock(config);
        this.registeredOperators = new HashSet();
        this.prevOperators = new HashSet();
        this.inputStreams = new HashSet();
        MetricsRegistry metricsRegistry = taskContext.getMetricsRegistry();
        this.numMessage = metricsRegistry.newCounter(METRICS_GROUP, opImplId + "-messages");
        this.handleMessageNs = metricsRegistry.newTimer(METRICS_GROUP, opImplId + "-handle-message-ns");
        this.handleTimerNs = metricsRegistry.newTimer(METRICS_GROUP, opImplId + "-handle-timer-ns");
        this.taskName = taskContext.getTaskName();
        TaskContextImpl taskContextImpl = (TaskContextImpl) taskContext;
        this.eosStates = (EndOfStreamStates) taskContextImpl.fetchObject(EndOfStreamStates.class.getName());
        this.watermarkStates = (WatermarkStates) taskContextImpl.fetchObject(WatermarkStates.class.getName());
        if (taskContextImpl.getJobModel() != null) {
            this.taskModel = taskContextImpl.getJobModel().getContainers().get(taskContext.getSamzaContainerContext().id).getTasks().get(this.taskName);
        } else {
            this.taskModel = null;
            this.usedInCurrentTask = true;
        }
        handleInit(config, taskContext);
        this.initialized = true;
    }

    protected abstract void handleInit(Config config, TaskContext taskContext);

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerNextOperator(OperatorImpl<RM, ?> operatorImpl) {
        if (!this.initialized) {
            throw new IllegalStateException(String.format("Attempted to register next operator before initializing operator %s.", getOpImplId()));
        }
        this.registeredOperators.add(operatorImpl);
        operatorImpl.registerPrevOperator(this);
    }

    void registerPrevOperator(OperatorImpl<?, M> operatorImpl) {
        this.prevOperators.add(operatorImpl);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerInputStream(SystemStream systemStream) {
        this.inputStreams.add(systemStream);
        this.usedInCurrentTask = this.usedInCurrentTask || this.taskModel.getSystemStreamPartitions().stream().anyMatch(systemStreamPartition -> {
            return systemStreamPartition.getSystemStream().equals(systemStream);
        });
    }

    public final void onMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        this.numMessage.inc();
        long nanoTime = this.highResClock.nanoTime();
        try {
            Collection<RM> handleMessage = handleMessage(m, messageCollector, taskCoordinator);
            this.handleMessageNs.update(this.highResClock.nanoTime() - nanoTime);
            handleMessage.forEach(obj -> {
                this.registeredOperators.forEach(operatorImpl -> {
                    operatorImpl.onMessage(obj, messageCollector, taskCoordinator);
                });
            });
            WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
            if (watermarkFn != null) {
                propagateWatermark(watermarkFn.getOutputWatermark(), messageCollector, taskCoordinator);
            }
        } catch (ClassCastException e) {
            throw new SamzaException(String.format("Error applying operator %s (created at %s) to its input message. Expected input message to be of type %s, but found it to be of type %s. Are Serdes for the inputs to this operator configured correctly?", getOpImplId(), getOperatorSpec().getSourceLocation(), e.getMessage().replaceFirst(".* cannot be cast to ", ""), e.getMessage().replaceFirst(" cannot be cast to .*", "")), e);
        }
    }

    protected abstract Collection<RM> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator);

    public final void onTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        long nanoTime = this.highResClock.nanoTime();
        Collection<RM> handleTimer = handleTimer(messageCollector, taskCoordinator);
        this.handleTimerNs.update(this.highResClock.nanoTime() - nanoTime);
        handleTimer.forEach(obj -> {
            this.registeredOperators.forEach(operatorImpl -> {
                operatorImpl.onMessage(obj, messageCollector, taskCoordinator);
            });
        });
        this.registeredOperators.forEach(operatorImpl -> {
            operatorImpl.onTimer(messageCollector, taskCoordinator);
        });
    }

    protected Collection<RM> handleTimer(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    public final void aggregateEndOfStream(EndOfStreamMessage endOfStreamMessage, SystemStreamPartition systemStreamPartition, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.info("Received end-of-stream message from task {} in {}", endOfStreamMessage.getTaskName(), systemStreamPartition);
        this.eosStates.update(endOfStreamMessage, systemStreamPartition);
        SystemStream systemStream = systemStreamPartition.getSystemStream();
        if (this.eosStates.isEndOfStream(systemStream)) {
            LOG.info("Input {} reaches the end for task {}", systemStream.toString(), this.taskName.getTaskName());
            onEndOfStream(messageCollector, taskCoordinator);
            if (this.eosStates.allEndOfStream()) {
                LOG.info("All input streams have reached the end for task {}", this.taskName.getTaskName());
                taskCoordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
                taskCoordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final void onEndOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        if (this.inputStreams.stream().allMatch(systemStream -> {
            return this.eosStates.isEndOfStream(systemStream);
        })) {
            handleEndOfStream(messageCollector, taskCoordinator).forEach(obj -> {
                this.registeredOperators.forEach(operatorImpl -> {
                    operatorImpl.onMessage(obj, messageCollector, taskCoordinator);
                });
            });
            this.registeredOperators.forEach(operatorImpl -> {
                operatorImpl.onEndOfStream(messageCollector, taskCoordinator);
            });
        }
    }

    protected Collection<RM> handleEndOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    public final void aggregateWatermark(WatermarkMessage watermarkMessage, SystemStreamPartition systemStreamPartition, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        LOG.debug("Received watermark {} from {}", Long.valueOf(watermarkMessage.getTimestamp()), systemStreamPartition);
        this.watermarkStates.update(watermarkMessage, systemStreamPartition);
        long watermark = this.watermarkStates.getWatermark(systemStreamPartition.getSystemStream());
        if (watermark != -1) {
            LOG.debug("Got watermark {} from stream {}", Long.valueOf(watermark), systemStreamPartition.getSystemStream());
            onWatermark(watermark, messageCollector, taskCoordinator);
        }
    }

    private final void onWatermark(long j, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        Collection<RM> handleWatermark;
        Long valueOf;
        long longValue = this.prevOperators.isEmpty() ? j : ((Long) this.prevOperators.stream().map(operatorImpl -> {
            return Long.valueOf(operatorImpl.getOutputWatermark());
        }).min((v0, v1) -> {
            return Long.compare(v0, v1);
        }).get()).longValue();
        if (this.inputWatermark < longValue) {
            this.inputWatermark = longValue;
            LOG.trace("Advance input watermark to {} in operator {}", Long.valueOf(this.inputWatermark), getOpImplId());
            WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn();
            if (watermarkFn != null) {
                handleWatermark = watermarkFn.processWatermark(this.inputWatermark);
                valueOf = watermarkFn.getOutputWatermark();
            } else {
                handleWatermark = handleWatermark(this.inputWatermark, messageCollector, taskCoordinator);
                valueOf = Long.valueOf(getOutputWatermark());
            }
            if (!handleWatermark.isEmpty()) {
                handleWatermark.forEach(obj -> {
                    this.registeredOperators.forEach(operatorImpl2 -> {
                        operatorImpl2.onMessage(obj, messageCollector, taskCoordinator);
                    });
                });
            }
            propagateWatermark(valueOf, messageCollector, taskCoordinator);
        }
    }

    private void propagateWatermark(Long l, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        if (l != null) {
            if (this.outputWatermark < l.longValue()) {
                this.outputWatermark = l.longValue();
                LOG.debug("Advance output watermark to {} in operator {}", Long.valueOf(this.outputWatermark), getOpImplId());
                this.registeredOperators.forEach(operatorImpl -> {
                    operatorImpl.onWatermark(this.outputWatermark, messageCollector, taskCoordinator);
                });
            } else if (this.outputWatermark > l.longValue()) {
                LOG.warn("Ignore watermark {} that is smaller than the previous watermark {}.", l, Long.valueOf(this.outputWatermark));
            }
        }
    }

    protected Collection<RM> handleWatermark(long j, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        return Collections.emptyList();
    }

    final long getInputWatermark() {
        return this.inputWatermark;
    }

    protected long getOutputWatermark() {
        if (this.usedInCurrentTask) {
            return getInputWatermark();
        }
        return Long.MAX_VALUE;
    }

    public void close() {
        if (this.closed) {
            throw new IllegalStateException(String.format("Attempted to close Operator %s more than once.", getOpImplId()));
        }
        handleClose();
        this.closed = true;
    }

    protected abstract void handleClose();

    protected abstract OperatorSpec<M, RM> getOperatorSpec();

    /* JADX INFO: Access modifiers changed from: protected */
    public String getOpImplId() {
        return getOperatorSpec().getOpId();
    }

    private HighResolutionClock createHighResClock(Config config) {
        return new MetricsConfig(config).getMetricsTimerEnabled() ? System::nanoTime : () -> {
            return 0L;
        };
    }
}
