package org.apache.samza.operators;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import org.apache.samza.config.Config;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.stream.OutputStreamInternal;
import org.apache.samza.operators.util.InternalInMemoryStore;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;

/* loaded from: input_file:org/apache/samza/operators/MessageStreamImpl.class */
public class MessageStreamImpl<M> implements MessageStream<M> {
    private final StreamGraphImpl graph;
    private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet();

    public MessageStreamImpl(StreamGraphImpl streamGraphImpl) {
        this.graph = streamGraphImpl;
    }

    public <TM> MessageStream<TM> map(MapFunction<? super M, ? extends TM> mapFunction) {
        StreamOperatorSpec createMapOperatorSpec = OperatorSpecs.createMapOperatorSpec(mapFunction, new MessageStreamImpl(this.graph), this.graph.getNextOpId());
        this.registeredOperatorSpecs.add(createMapOperatorSpec);
        return createMapOperatorSpec.getNextStream();
    }

    public MessageStream<M> filter(FilterFunction<? super M> filterFunction) {
        StreamOperatorSpec createFilterOperatorSpec = OperatorSpecs.createFilterOperatorSpec(filterFunction, new MessageStreamImpl(this.graph), this.graph.getNextOpId());
        this.registeredOperatorSpecs.add(createFilterOperatorSpec);
        return createFilterOperatorSpec.getNextStream();
    }

    public <TM> MessageStream<TM> flatMap(FlatMapFunction<? super M, ? extends TM> flatMapFunction) {
        StreamOperatorSpec createStreamOperatorSpec = OperatorSpecs.createStreamOperatorSpec(flatMapFunction, new MessageStreamImpl(this.graph), this.graph.getNextOpId());
        this.registeredOperatorSpecs.add(createStreamOperatorSpec);
        return createStreamOperatorSpec.getNextStream();
    }

    public void sink(SinkFunction<? super M> sinkFunction) {
        this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFunction, this.graph.getNextOpId()));
    }

    public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
        this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec((OutputStreamInternal) outputStream, this.graph.getNextOpId()));
    }

    public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
        WindowOperatorSpec createWindowOperatorSpec = OperatorSpecs.createWindowOperatorSpec((WindowInternal) window, new MessageStreamImpl(this.graph), this.graph.getNextOpId());
        this.registeredOperatorSpecs.add(createWindowOperatorSpec);
        return createWindowOperatorSpec.getNextStream();
    }

    public <K, OM, TM> MessageStream<TM> join(MessageStream<OM> messageStream, final JoinFunction<? extends K, ? super M, ? super OM, ? extends TM> joinFunction, Duration duration) {
        MessageStreamImpl messageStreamImpl = new MessageStreamImpl(this.graph);
        PartialJoinFunction<K, M, OM, TM> partialJoinFunction = new PartialJoinFunction<K, M, OM, TM>() { // from class: org.apache.samza.operators.MessageStreamImpl.1
            private KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<M>> thisStreamState;

            @Override // org.apache.samza.operators.functions.PartialJoinFunction
            public TM apply(M m, OM om) {
                return (TM) joinFunction.apply(m, om);
            }

            @Override // org.apache.samza.operators.functions.PartialJoinFunction
            public K getKey(M m) {
                return (K) joinFunction.getFirstKey(m);
            }

            @Override // org.apache.samza.operators.functions.PartialJoinFunction
            public KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<M>> getState() {
                return this.thisStreamState;
            }

            public void init(Config config, TaskContext taskContext) {
                joinFunction.init(config, taskContext);
                this.thisStreamState = new InternalInMemoryStore();
            }
        };
        PartialJoinFunction<K, OM, M, TM> partialJoinFunction2 = new PartialJoinFunction<K, OM, M, TM>() { // from class: org.apache.samza.operators.MessageStreamImpl.2
            private KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<OM>> otherStreamState;

            @Override // org.apache.samza.operators.functions.PartialJoinFunction
            public TM apply(OM om, M m) {
                return (TM) joinFunction.apply(m, om);
            }

            @Override // org.apache.samza.operators.functions.PartialJoinFunction
            public K getKey(OM om) {
                return (K) joinFunction.getSecondKey(om);
            }

            @Override // org.apache.samza.operators.functions.PartialJoinFunction
            public KeyValueStore<K, PartialJoinFunction.PartialJoinMessage<OM>> getState() {
                return this.otherStreamState;
            }

            public void init(Config config, TaskContext taskContext) {
                this.otherStreamState = new InternalInMemoryStore();
            }
        };
        this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(partialJoinFunction, partialJoinFunction2, duration.toMillis(), messageStreamImpl, this.graph.getNextOpId()));
        ((MessageStreamImpl) messageStream).registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(partialJoinFunction2, partialJoinFunction, duration.toMillis(), messageStreamImpl, this.graph.getNextOpId()));
        return messageStreamImpl;
    }

    public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> collection) {
        MessageStreamImpl messageStreamImpl = new MessageStreamImpl(this.graph);
        ArrayList arrayList = new ArrayList(collection);
        arrayList.add(this);
        arrayList.forEach(messageStream -> {
            ((MessageStreamImpl) messageStream).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperatorSpec(messageStreamImpl, this.graph.getNextOpId()));
        });
        return messageStreamImpl;
    }

    public <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> function) {
        int nextOpId = this.graph.getNextOpId();
        MessageStreamImpl<M> intermediateStream = this.graph.getIntermediateStream(String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), Integer.valueOf(nextOpId)), function, obj -> {
            return obj;
        }, (obj2, obj3) -> {
            return obj3;
        });
        this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionByOperatorSpec((OutputStreamInternal) intermediateStream, nextOpId));
        return intermediateStream;
    }

    public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
        return Collections.unmodifiableSet(this.registeredOperatorSpecs);
    }
}
