package org.apache.samza.operators;

import java.time.Duration;
import java.util.Collection;
import java.util.function.Function;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;

/* loaded from: input_file:org/apache/samza/operators/MessageStreamImpl.class */
/**
 * {@link MessageStream} implementation that wraps a single {@link OperatorSpec} inside a
 * {@link StreamGraphImpl}.
 *
 * <p>Every operator method follows the same pattern: ask the graph for a new operator id,
 * build the matching spec through {@link OperatorSpecs}, register that spec as a successor of
 * this stream's own spec, and (for non-terminal operators) return a new
 * {@code MessageStreamImpl} wrapping the new spec.
 *
 * @param <M> type of the messages carried by this stream
 */
public class MessageStreamImpl<M> implements MessageStream<M> {
    /** Graph that issues operator ids and intermediate streams for this stream. */
    private final StreamGraphImpl graph;

    /**
     * Spec that produces the messages of this stream.
     *
     * <p>Intentionally left raw: {@link #join} registers one and the same join spec as the
     * successor of both of its inputs, whose message types ({@code M} and {@code OM}) differ,
     * so no single parameterization type-checks every registration. Type safety is enforced at
     * the boundary instead, by the constructor parameter and by {@link #getOperatorSpec()}.
     */
    private final OperatorSpec operatorSpec;

    /**
     * Creates a stream backed by the given operator spec.
     *
     * @param streamGraphImpl graph this stream belongs to
     * @param operatorSpec spec whose output messages form this stream
     */
    public MessageStreamImpl(StreamGraphImpl streamGraphImpl, OperatorSpec<?, M> operatorSpec) {
        this.graph = streamGraphImpl;
        this.operatorSpec = operatorSpec;
    }

    /**
     * Registers a map operator after this stream.
     *
     * @param mapFunction function handed to the map operator spec
     * @param <TM> message type of the returned stream
     * @return a stream backed by the new map operator spec
     */
    @Override
    public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFunction) {
        String opId = this.graph.getNextOpId(OperatorSpec.OpCode.MAP);
        StreamOperatorSpec<M, TM> mapSpec = OperatorSpecs.createMapOperatorSpec(mapFunction, opId);
        this.operatorSpec.registerNextOperatorSpec(mapSpec);
        return new MessageStreamImpl<>(this.graph, mapSpec);
    }

    /**
     * Registers a filter operator after this stream.
     *
     * @param filterFunction function handed to the filter operator spec
     * @return a stream backed by the new filter operator spec
     */
    @Override
    public MessageStream<M> filter(FilterFunction<? super M> filterFunction) {
        String opId = this.graph.getNextOpId(OperatorSpec.OpCode.FILTER);
        StreamOperatorSpec<M, M> filterSpec = OperatorSpecs.createFilterOperatorSpec(filterFunction, opId);
        this.operatorSpec.registerNextOperatorSpec(filterSpec);
        return new MessageStreamImpl<>(this.graph, filterSpec);
    }

    /**
     * Registers a flat-map operator after this stream.
     *
     * @param flatMapFunction function handed to the flat-map operator spec
     * @param <TM> message type of the returned stream
     * @return a stream backed by the new flat-map operator spec
     */
    @Override
    public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFunction) {
        String opId = this.graph.getNextOpId(OperatorSpec.OpCode.FLAT_MAP);
        StreamOperatorSpec<M, TM> flatMapSpec = OperatorSpecs.createFlatMapOperatorSpec(flatMapFunction, opId);
        this.operatorSpec.registerNextOperatorSpec(flatMapSpec);
        return new MessageStreamImpl<>(this.graph, flatMapSpec);
    }

    /**
     * Registers a terminal sink operator after this stream.
     *
     * @param sinkFunction function handed to the sink operator spec
     */
    @Override
    public void sink(SinkFunction<? super M> sinkFunction) {
        String opId = this.graph.getNextOpId(OperatorSpec.OpCode.SINK);
        this.operatorSpec.registerNextOperatorSpec(OperatorSpecs.createSinkOperatorSpec(sinkFunction, opId));
    }

    /**
     * Registers a terminal send-to operator after this stream.
     *
     * @param outputStream destination stream; must be an {@link OutputStreamImpl}
     * @throws ClassCastException if {@code outputStream} is not an {@link OutputStreamImpl}
     */
    @Override
    public void sendTo(OutputStream<M> outputStream) {
        String opId = this.graph.getNextOpId(OperatorSpec.OpCode.SEND_TO);
        OutputStreamImpl<M> target = (OutputStreamImpl<M>) outputStream;
        this.operatorSpec.registerNextOperatorSpec(OperatorSpecs.createSendToOperatorSpec(target, opId));
    }

    /**
     * Registers a window operator after this stream.
     *
     * @param window window definition; must be a {@link WindowInternal}
     * @param str identifier passed to the graph when generating the operator id
     * @param <K> window key type
     * @param <WV> window value type
     * @return a stream of window panes backed by the new window operator spec
     * @throws ClassCastException if {@code window} is not a {@link WindowInternal}
     */
    @Override
    public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String str) {
        String opId = this.graph.getNextOpId(OperatorSpec.OpCode.WINDOW, str);
        WindowOperatorSpec<M, K, WV> windowSpec =
                OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId);
        this.operatorSpec.registerNextOperatorSpec(windowSpec);
        return new MessageStreamImpl<>(this.graph, windowSpec);
    }

    /**
     * Registers a join operator fed by both this stream and {@code messageStream}.
     *
     * @param messageStream the other input; must be a {@code MessageStreamImpl} and must not
     *     equal this stream
     * @param joinFunction function handed to the join operator spec
     * @param serde serde for the join key
     * @param serde2 serde for messages of this stream
     * @param serde3 serde for messages of the other stream
     * @param duration converted to milliseconds and handed to the join operator spec
     *     (presumably how long un-joined messages are retained -- confirm against
     *     {@code OperatorSpecs.createJoinOperatorSpec})
     * @param str identifier passed to the graph when generating the operator id
     * @param <K> join key type
     * @param <OM> message type of the other stream
     * @param <JM> message type of the joined stream
     * @return a stream backed by the new join operator spec
     * @throws SamzaException if {@code messageStream} equals this stream
     * @throws ClassCastException if {@code messageStream} is not a {@code MessageStreamImpl}
     */
    @Override
    @SuppressWarnings("unchecked") // casts below only re-bind type arguments that are erased at runtime
    public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> messageStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFunction, Serde<K> serde, Serde<M> serde2, Serde<OM> serde3, Duration duration, String str) {
        if (messageStream.equals(this)) {
            throw new SamzaException("Cannot join a MessageStream with itself.");
        }
        // The other input carries OM messages, so its spec is typed on OM, not on this stream's M.
        OperatorSpec<?, OM> otherSpec = ((MessageStreamImpl<OM>) messageStream).getOperatorSpec();
        JoinOperatorSpec<K, M, OM, JM> joinSpec = OperatorSpecs.createJoinOperatorSpec(
                this.operatorSpec,
                otherSpec,
                (JoinFunction<K, M, OM, JM>) joinFunction,
                serde,
                serde2,
                serde3,
                duration.toMillis(),
                this.graph.getNextOpId(OperatorSpec.OpCode.JOIN, str));
        // The single join spec is the successor of both inputs.
        this.operatorSpec.registerNextOperatorSpec(joinSpec);
        otherSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinSpec);
        return new MessageStreamImpl<>(this.graph, joinSpec);
    }

    /**
     * Registers a merge operator fed by this stream and every stream in {@code collection}.
     *
     * @param collection the other streams to merge; each must be a {@code MessageStreamImpl}
     * @return this stream unchanged when {@code collection} is empty (no operator id is
     *     consumed in that case); otherwise a stream backed by the new merge operator spec
     * @throws ClassCastException if an element is not a {@code MessageStreamImpl}
     */
    @Override
    @SuppressWarnings("unchecked") // each element yields a subtype of M, so viewing it as a stream of M is safe for reads
    public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> collection) {
        if (collection.isEmpty()) {
            return this;
        }
        StreamOperatorSpec<M, M> mergeSpec =
                OperatorSpecs.createMergeOperatorSpec(this.graph.getNextOpId(OperatorSpec.OpCode.MERGE));
        this.operatorSpec.registerNextOperatorSpec(mergeSpec);
        for (MessageStream<? extends M> other : collection) {
            ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(mergeSpec);
        }
        return new MessageStreamImpl<>(this.graph, mergeSpec);
    }

    /**
     * Registers a partition-by operator after this stream, writing into an intermediate stream
     * obtained from the graph, and returns that intermediate stream.
     *
     * @param function extracts the key from a message
     * @param function2 extracts the value from a message
     * @param kVSerde serde handed to the graph when obtaining the intermediate stream; may be
     *     {@code null} (the three-argument overload passes {@code null})
     * @param str identifier passed to the graph when generating the operator id
     * @param <K> key type of the returned stream
     * @param <V> value type of the returned stream
     * @return the intermediate stream of key/value pairs
     * @throws SamzaException if the intermediate stream obtained from the graph is not keyed
     */
    @Override
    public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> function, Function<? super M, ? extends V> function2, KVSerde<K, V> kVSerde, String str) {
        String opId = this.graph.getNextOpId(OperatorSpec.OpCode.PARTITION_BY, str);
        // The intermediate stream carries KV<K, V> pairs, not this stream's M.
        IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opId, kVSerde);
        if (!intermediateStream.isKeyed()) {
            throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde.");
        }
        this.operatorSpec.registerNextOperatorSpec(
                OperatorSpecs.createPartitionByOperatorSpec(intermediateStream.getOutputStream(), function, function2, opId));
        return intermediateStream;
    }

    /**
     * Same as {@link #partitionBy(Function, Function, KVSerde, String)} with a {@code null}
     * serde.
     *
     * @param function extracts the key from a message
     * @param function2 extracts the value from a message
     * @param str identifier passed to the graph when generating the operator id
     * @param <K> key type of the returned stream
     * @param <V> value type of the returned stream
     * @return the intermediate stream of key/value pairs
     * @throws SamzaException if the intermediate stream obtained from the graph is not keyed
     */
    @Override
    public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> function, Function<? super M, ? extends V> function2, String str) {
        return partitionBy(function, function2, null, str);
    }

    /**
     * Returns the operator spec backing this stream.
     *
     * @return the spec supplied at construction time
     */
    @SuppressWarnings("unchecked") // the field is only ever assigned from the typed constructor parameter
    protected OperatorSpec<?, M> getOperatorSpec() {
        return this.operatorSpec;
    }
}
