package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.system.ControlMessage;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.WatermarkMessage;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/samza/operators/impl/PartitionByOperatorImpl.class */
/**
 * Operator implementation that extracts a key and a value from every incoming message and sends the
 * pair to the output stream described by its {@link PartitionByOperatorSpec}.
 *
 * <p>End-of-stream and watermark events are handed to a {@link ControlMessageSender} for that same
 * output stream. Every handler returns an empty collection, so nothing is propagated to operators
 * chained after this one.
 *
 * @param <M> type of the incoming message
 * @param <K> type of the key extracted from the message
 * @param <V> type of the value extracted from the message
 */
public class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
    private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec;
    /** Destination for both data and control messages; resolved once in the constructor. */
    private final SystemStream systemStream;
    private final Function<? super M, ? extends K> keyFunction;
    private final Function<? super M, ? extends V> valueFunction;
    /** Name of the owning task; stamped on every control message this operator creates. */
    private final String taskName;
    private final ControlMessageSender controlMessageSender;

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator from its spec.
     *
     * @param partitionByOperatorSpec spec supplying the output stream and the key/value functions
     * @param config job configuration; not read by this constructor
     * @param taskContext context of the owning task; it is cast to {@link TaskContextImpl} to obtain
     *     the stream metadata cache, so any other implementation fails with a
     *     {@link ClassCastException}
     */
    public PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec, Config config, TaskContext taskContext) {
        this.partitionByOpSpec = partitionByOperatorSpec;
        OutputStreamImpl<KV<K, V>> outputStream = partitionByOperatorSpec.getOutputStream();
        this.systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(), outputStream.getStreamSpec().getPhysicalName());
        this.keyFunction = partitionByOperatorSpec.getKeyFunction();
        this.valueFunction = partitionByOperatorSpec.getValueFunction();
        this.taskName = taskContext.getTaskName().getTaskName();
        this.controlMessageSender = new ControlMessageSender(((TaskContextImpl) taskContext).getStreamMetadataCache());
    }

    /** No initialization is required for this operator. */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleInit(Config config, TaskContext taskContext) {
    }

    /**
     * Applies the key and value functions to the message and sends the result to the output stream.
     *
     * @param m the incoming message
     * @param messageCollector collector used to send the outgoing envelope
     * @param taskCoordinator not used by this operator
     * @return always an empty collection
     */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    public Collection<Void> handleMessage(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        K key = this.keyFunction.apply(m);
        V value = this.valueFunction.apply(m);
        // A message whose key is null is given the fixed partition key 0; for a non-null key no
        // explicit partition key is supplied. The conditional has type Long, so the null branch is
        // never unboxed.
        Long partitionKey = (key == null) ? 0L : null;
        messageCollector.send(new OutgoingMessageEnvelope(this.systemStream, partitionKey, key, value));
        return Collections.emptyList();
    }

    /** No resources to release. */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleClose() {
    }

    /**
     * @return the spec this operator was created from
     */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected OperatorSpec<M, Void> getOperatorSpec() {
        return this.partitionByOpSpec;
    }

    /**
     * Sends an {@link EndOfStreamMessage} carrying this task's name to the output stream.
     *
     * @param messageCollector collector passed on to the control message sender
     * @param taskCoordinator not used by this operator
     * @return always an empty collection
     */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected Collection<Void> handleEndOfStream(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        sendControlMessage(new EndOfStreamMessage(this.taskName), messageCollector);
        return Collections.emptyList();
    }

    /**
     * Sends a {@link WatermarkMessage} carrying the given watermark and this task's name to the
     * output stream.
     *
     * @param j the watermark value
     * @param messageCollector collector passed on to the control message sender
     * @param taskCoordinator not used by this operator
     * @return always an empty collection
     */
    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected Collection<Void> handleWatermark(long j, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        sendControlMessage(new WatermarkMessage(j, this.taskName), messageCollector);
        return Collections.emptyList();
    }

    /**
     * Hands a control message to the {@link ControlMessageSender} for the output stream.
     *
     * <p>The stream resolved in the constructor is reused rather than being re-derived from the spec
     * on every call, so data and control messages are always addressed to the same destination.
     */
    private void sendControlMessage(ControlMessage controlMessage, MessageCollector messageCollector) {
        this.controlMessageSender.send(controlMessage, this.systemStream, messageCollector);
    }
}
