package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.context.Context;
import org.apache.samza.context.InternalTaskContext;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.system.ControlMessage;
import org.apache.samza.system.DrainMessage;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;

/* loaded from: input_file:org/apache/samza/operators/impl/PartitionByOperatorImpl.class */
/**
 * Operator implementation that re-keys each incoming message and sends it to a single
 * {@link SystemStream}, and forwards end-of-stream, drain and watermark control messages
 * to that same stream.
 *
 * <p>This operator produces no downstream results: every handler returns an empty collection.
 *
 * @param <M> type of the incoming message
 * @param <K> type of the key extracted from the message
 * @param <V> type of the value extracted from the message
 */
class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
    private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec;
    private final SystemStream systemStream;
    private final MapFunction<? super M, ? extends K> keyFunction;
    private final MapFunction<? super M, ? extends V> valueFunction;
    private final String taskName;
    private final String runId;
    private final ControlMessageSender controlMessageSender;

    /**
     * Creates the operator.
     *
     * @param partitionByOperatorSpec spec supplying the key and value extraction functions
     * @param systemStream stream that re-keyed messages and control messages are sent to
     * @param internalTaskContext source of the task name, the job config (for the run id)
     *                            and the stream metadata cache used by the control message sender
     */
    public PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec, SystemStream systemStream, InternalTaskContext internalTaskContext) {
        this.partitionByOpSpec = partitionByOperatorSpec;
        this.systemStream = systemStream;
        this.keyFunction = partitionByOperatorSpec.getKeyFunction();
        this.valueFunction = partitionByOperatorSpec.getValueFunction();
        this.taskName = internalTaskContext.getContext().getTaskContext().getTaskModel().getTaskName().getTaskName();
        this.runId = new ApplicationConfig(internalTaskContext.getContext().getJobContext().getConfig()).getRunId();
        this.controlMessageSender = new ControlMessageSender(internalTaskContext.getStreamMetadataCache());
    }

    /**
     * Initializes the key function and then the value function with the given context.
     *
     * @param context context passed through to both functions
     */
    @Override
    protected void handleInit(Context context) {
        this.keyFunction.init(context);
        this.valueFunction.init(context);
    }

    /**
     * Extracts the key and value from the message and sends them to the target stream.
     *
     * @param m the incoming message
     * @param messageCollector collector used to send the outgoing envelope
     * @param taskCoordinator unused by this operator
     * @return an already-completed stage holding an empty collection
     */
    @Override
    protected CompletionStage<Collection<Void>> handleMessageAsync(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        K key = this.keyFunction.apply(m);
        V value = this.valueFunction.apply(m);
        // A null key gets the fixed partition key 0L; a non-null key passes null as the
        // envelope's partition-key argument, leaving the choice to whatever consumes the envelope.
        Long partitionKey = key == null ? 0L : null;
        messageCollector.send(new OutgoingMessageEnvelope(this.systemStream, partitionKey, key, value));
        return CompletableFuture.completedFuture(Collections.emptyList());
    }

    /**
     * Closes the key function and the value function.
     *
     * <p>The value function is closed even when closing the key function throws, so a
     * failure in one user function cannot leak the other. If both calls throw, the
     * exception from the value function is the one that propagates.
     */
    @Override
    protected void handleClose() {
        try {
            this.keyFunction.close();
        } finally {
            this.valueFunction.close();
        }
    }

    /**
     * @return the spec this operator was created from
     */
    @Override
    protected OperatorSpec<M, Void> getOperatorSpec() {
        return this.partitionByOpSpec;
    }

    /**
     * Sends an {@link EndOfStreamMessage} tagged with this task's name to the target stream.
     *
     * @param messageCollector collector used to send the control message
     * @param taskCoordinator unused by this operator
     * @return an empty collection
     */
    @Override
    protected Collection<Void> handleEndOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        sendControlMessage(new EndOfStreamMessage(this.taskName), messageCollector);
        return Collections.emptyList();
    }

    /**
     * Sends a {@link DrainMessage} tagged with this task's name and the run id to the target stream.
     *
     * @param messageCollector collector used to send the control message
     * @param taskCoordinator unused by this operator
     * @return an empty collection
     */
    @Override
    protected Collection<Void> handleDrain(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        sendControlMessage(new DrainMessage(this.taskName, this.runId), messageCollector);
        return Collections.emptyList();
    }

    /**
     * Sends a {@link WatermarkMessage} carrying the given watermark and this task's name
     * to the target stream.
     *
     * @param j the watermark value to forward
     * @param messageCollector collector used to send the control message
     * @param taskCoordinator unused by this operator
     * @return an empty collection
     */
    @Override
    protected Collection<Void> handleWatermark(long j, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        sendControlMessage(new WatermarkMessage(j, this.taskName), messageCollector);
        return Collections.emptyList();
    }

    /** Hands the control message to the {@link ControlMessageSender} for the target stream. */
    private void sendControlMessage(ControlMessage controlMessage, MessageCollector messageCollector) {
        this.controlMessageSender.send(controlMessage, this.systemStream, messageCollector);
    }
}
