package org.apache.samza.operators;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Collection;
import org.apache.samza.SamzaException;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.operators.functions.AsyncFlatMapFunction;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.operators.spec.AsyncFlatMapOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.SendToTableOperatorSpec;
import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.table.Table;

/* loaded from: input_file:org/apache/samza/operators/MessageStreamImpl.class */
/**
 * {@link MessageStream} implementation that records each chained operation as an
 * {@link OperatorSpec} linked to the spec backing this stream.
 *
 * <p>Every operator method obtains a fresh operator id from the
 * {@link StreamApplicationDescriptorImpl}, builds the matching spec through {@link OperatorSpecs},
 * registers it as the next spec of this stream's own spec, and (for non-terminal operators)
 * returns a stream wrapping the new spec.
 *
 * @param <M> type of the messages carried by this stream
 */
public class MessageStreamImpl<M> implements MessageStream<M> {
    private final StreamApplicationDescriptorImpl streamAppDesc;

    // Kept raw on purpose: several operators (stream-stream join, sendTo(Table)) register specs whose
    // declared input type is not M, so a parameterized field would force casts at every call site.
    private final OperatorSpec operatorSpec;

    /**
     * Creates a stream backed by the given operator spec.
     *
     * @param streamApplicationDescriptorImpl descriptor used to allocate operator ids and intermediate streams
     * @param operatorSpec spec whose output is this stream
     */
    public MessageStreamImpl(StreamApplicationDescriptorImpl streamApplicationDescriptorImpl, OperatorSpec<?, M> operatorSpec) {
        this.streamAppDesc = streamApplicationDescriptorImpl;
        this.operatorSpec = operatorSpec;
    }

    /**
     * Chains a map operator after this stream.
     *
     * @param mapFunction function applied by the map operator
     * @return a stream backed by the new map operator spec
     */
    @Override
    public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFunction) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.MAP);
        StreamOperatorSpec<M, TM> mapSpec = OperatorSpecs.createMapOperatorSpec(mapFunction, opId);
        this.operatorSpec.registerNextOperatorSpec(mapSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, mapSpec);
    }

    /**
     * Chains a filter operator after this stream.
     *
     * @param filterFunction predicate used by the filter operator
     * @return a stream backed by the new filter operator spec
     */
    @Override
    public MessageStream<M> filter(FilterFunction<? super M> filterFunction) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.FILTER);
        StreamOperatorSpec<M, M> filterSpec = OperatorSpecs.createFilterOperatorSpec(filterFunction, opId);
        this.operatorSpec.registerNextOperatorSpec(filterSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, filterSpec);
    }

    /**
     * Chains a flat-map operator after this stream.
     *
     * @param flatMapFunction function applied by the flat-map operator
     * @return a stream backed by the new flat-map operator spec
     */
    @Override
    public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFunction) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.FLAT_MAP);
        StreamOperatorSpec<M, TM> flatMapSpec = OperatorSpecs.createFlatMapOperatorSpec(flatMapFunction, opId);
        this.operatorSpec.registerNextOperatorSpec(flatMapSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, flatMapSpec);
    }

    /**
     * Chains an asynchronous flat-map operator after this stream.
     *
     * @param asyncFlatMapFunction function applied by the async flat-map operator
     * @return a stream backed by the new async flat-map operator spec
     */
    @Override
    public <OM> MessageStream<OM> flatMapAsync(AsyncFlatMapFunction<? super M, ? extends OM> asyncFlatMapFunction) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.ASYNC_FLAT_MAP);
        AsyncFlatMapOperatorSpec<M, OM> asyncSpec = OperatorSpecs.createAsyncOperatorSpec(asyncFlatMapFunction, opId);
        this.operatorSpec.registerNextOperatorSpec(asyncSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, asyncSpec);
    }

    /**
     * Registers a sink operator after this stream. No stream is returned, so nothing can be chained after it.
     *
     * @param sinkFunction function invoked by the sink operator
     */
    @Override
    public void sink(SinkFunction<? super M> sinkFunction) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.SINK);
        this.operatorSpec.registerNextOperatorSpec(OperatorSpecs.createSinkOperatorSpec(sinkFunction, opId));
    }

    /**
     * Chains a send-to operator targeting the given output stream.
     *
     * @param outputStream destination stream; must be an {@link OutputStreamImpl}, otherwise the cast
     *                     fails with a {@link ClassCastException}
     * @return a stream backed by the new send-to operator spec
     */
    @Override
    public MessageStream<M> sendTo(OutputStream<M> outputStream) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.SEND_TO);
        OutputOperatorSpec<M> sendToSpec =
                OperatorSpecs.createSendToOperatorSpec((OutputStreamImpl<M>) outputStream, opId);
        this.operatorSpec.registerNextOperatorSpec(sendToSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, sendToSpec);
    }

    /**
     * Chains a window operator after this stream.
     *
     * @param window window definition; must be a {@link WindowInternal}, otherwise the cast fails
     *               with a {@link ClassCastException}
     * @param str user-supplied id passed to the operator id generator
     * @return a stream of window panes backed by the new window operator spec
     */
    @Override
    public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window, String str) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.WINDOW, str);
        OperatorSpec<M, WindowPane<K, WV>> windowSpec =
                OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window, opId);
        this.operatorSpec.registerNextOperatorSpec(windowSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, windowSpec);
    }

    /**
     * Chains a stream-stream join operator fed by both this stream and {@code messageStream}.
     * The join spec is registered as the next spec of both input specs.
     *
     * @param messageStream the other stream to join with; must be a {@link MessageStreamImpl}
     * @param joinFunction function used by the join operator
     * @param serde serde for the join key
     * @param serde2 serde for messages of this stream
     * @param serde3 serde for messages of the other stream
     * @param duration time-to-live handed to the join spec, converted to milliseconds
     * @param str user-supplied id passed to the operator id generator
     * @return a stream backed by the new join operator spec
     * @throws SamzaException if {@code messageStream} equals this stream
     */
    @Override
    public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> messageStream, JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFunction, Serde<K> serde, Serde<M> serde2, Serde<OM> serde3, Duration duration, String str) {
        if (messageStream.equals(this)) {
            throw new SamzaException("Cannot join a MessageStream with itself.");
        }
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.JOIN, str);
        OperatorSpec<?, OM> otherSpec = ((MessageStreamImpl<OM>) messageStream).getOperatorSpec();
        // The wildcarded function is narrowed so its type arguments line up with the three serdes.
        JoinOperatorSpec<K, M, OM, JM> joinSpec = OperatorSpecs.createJoinOperatorSpec(
                this.operatorSpec, otherSpec, (JoinFunction<K, M, OM, JM>) joinFunction,
                serde, serde2, serde3, duration.toMillis(), opId);
        this.operatorSpec.registerNextOperatorSpec(joinSpec);
        otherSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, joinSpec);
    }

    /**
     * Chains a stream-table join operator after this stream.
     *
     * @param table table to join against; must be a {@link TableImpl}, otherwise the cast fails
     *              with a {@link ClassCastException}
     * @param streamTableJoinFunction function used by the join operator
     * @param objArr additional arguments forwarded unchanged to the join operator spec
     * @return a stream backed by the new stream-table join operator spec
     */
    @Override
    public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table, StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> streamTableJoinFunction, Object... objArr) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.JOIN);
        StreamTableJoinOperatorSpec<K, M, R, JM> joinSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
                ((TableImpl) table).getTableId(),
                (StreamTableJoinFunction<K, M, R, JM>) streamTableJoinFunction,
                opId,
                objArr);
        this.operatorSpec.registerNextOperatorSpec(joinSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, joinSpec);
    }

    /**
     * Chains a merge operator fed by this stream and every stream in {@code collection}.
     *
     * @param collection the other streams to merge; each must be a {@link MessageStreamImpl}
     * @return this stream when {@code collection} is empty, otherwise a stream backed by the new
     *         merge operator spec
     */
    @Override
    public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> collection) {
        if (collection.isEmpty()) {
            return this;
        }
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.MERGE);
        StreamOperatorSpec<M, M> mergeSpec = OperatorSpecs.createMergeOperatorSpec(opId);
        this.operatorSpec.registerNextOperatorSpec(mergeSpec);
        collection.forEach(other ->
                ((MessageStreamImpl<M>) other).getOperatorSpec().registerNextOperatorSpec(mergeSpec));
        return new MessageStreamImpl<>(this.streamAppDesc, mergeSpec);
    }

    /**
     * Chains a partition-by operator that writes to an intermediate stream and returns that stream.
     *
     * @param mapFunction first extractor handed to the partition-by spec (produces {@code K})
     * @param mapFunction2 second extractor handed to the partition-by spec (produces {@code V})
     * @param kVSerde serde used when obtaining the intermediate stream
     * @param str user-supplied id passed to the operator id generator
     * @return the intermediate stream obtained from the application descriptor
     * @throws SamzaException if the intermediate stream is not keyed
     */
    @Override
    public <K, V> MessageStream<KV<K, V>> partitionBy(MapFunction<? super M, ? extends K> mapFunction, MapFunction<? super M, ? extends V> mapFunction2, KVSerde<K, V> kVSerde, String str) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.PARTITION_BY, str);
        // The intermediate stream carries KV<K, V>, not M: it is what this method returns.
        IntermediateMessageStreamImpl<KV<K, V>> intermediateStream =
                this.streamAppDesc.getIntermediateStream(opId, kVSerde, false);
        if (!intermediateStream.isKeyed()) {
            throw new SamzaException("partitionBy can not be used with a default serde that is not a KVSerde.");
        }
        this.operatorSpec.registerNextOperatorSpec(OperatorSpecs.createPartitionByOperatorSpec(
                intermediateStream.getOutputStream(), mapFunction, mapFunction2, opId));
        return intermediateStream;
    }

    /**
     * Chains a send-to-table operator after this stream.
     *
     * @param table destination table; must be a {@link TableImpl}, otherwise the cast fails
     *              with a {@link ClassCastException}
     * @return a stream backed by the new send-to-table operator spec
     */
    @Override
    public <K, V> MessageStream<KV<K, V>> sendTo(Table<KV<K, V>> table) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.SEND_TO);
        SendToTableOperatorSpec<K, V> sendToTableSpec =
                OperatorSpecs.createSendToTableOperatorSpec(((TableImpl) table).getTableId(), opId);
        this.operatorSpec.registerNextOperatorSpec(sendToTableSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, sendToTableSpec);
    }

    /**
     * Chains a send-to-table-with-update operator after this stream.
     *
     * @param table destination table; must be a {@link TableImpl}, otherwise the cast fails
     *              with a {@link ClassCastException}
     * @param updateOptions options forwarded unchanged to the operator spec
     * @return a stream backed by the new send-to-table-with-update operator spec
     */
    @Override
    public <K, V, U> MessageStream<KV<K, UpdateMessage<U, V>>> sendTo(Table<KV<K, V>> table, UpdateOptions updateOptions) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.SEND_TO_WITH_UPDATE);
        SendToTableWithUpdateOperatorSpec<K, V, U> sendWithUpdateSpec =
                OperatorSpecs.createSendToTableWithUpdateOperatorSpec(
                        ((TableImpl) table).getTableId(), opId, updateOptions);
        this.operatorSpec.registerNextOperatorSpec(sendWithUpdateSpec);
        return new MessageStreamImpl<>(this.streamAppDesc, sendWithUpdateSpec);
    }

    /**
     * Chains a broadcast operator that writes to an intermediate stream and returns that stream.
     *
     * @param serde serde used when obtaining the intermediate stream
     * @param str user-supplied id passed to the operator id generator
     * @return the intermediate stream obtained from the application descriptor
     */
    @Override
    public MessageStream<M> broadcast(Serde<M> serde, String str) {
        String opId = this.streamAppDesc.getNextOpId(OperatorSpec.OpCode.BROADCAST, str);
        IntermediateMessageStreamImpl<M> intermediateStream =
                this.streamAppDesc.getIntermediateStream(opId, serde, true);
        this.operatorSpec.registerNextOperatorSpec(
                OperatorSpecs.createBroadCastOperatorSpec(intermediateStream.getOutputStream(), opId));
        return intermediateStream;
    }

    /**
     * Returns the operator spec backing this stream.
     *
     * @return the spec supplied at construction time
     */
    @VisibleForTesting
    public OperatorSpec<?, M> getOperatorSpec() {
        return this.operatorSpec;
    }
}
