package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;

/* loaded from: input_file:org/apache/samza/operators/impl/OperatorImplGraph.class */
/**
 * Builds and holds the {@link OperatorImpl} instances that correspond to the
 * {@link OperatorSpec}s reachable from the input streams of a {@link StreamGraphImpl}.
 *
 * <p>Each input stream gets one {@link RootOperatorImpl}, looked up by the
 * {@link SystemStream} built from the stream's system name and physical name. Every
 * {@link OperatorSpec} is mapped to exactly one {@link OperatorImpl}, so a spec that is
 * reachable through more than one path shares a single implementation instance.
 */
public class OperatorImplGraph {
    /** One implementation per operator spec; also serves as the "already visited" set during traversal. */
    private final Map<OperatorSpec, OperatorImpl> operatorImpls;
    /** Root operator for each input stream, keyed by the stream's {@link SystemStream}. */
    private final Map<SystemStream, RootOperatorImpl> rootOperators;
    /** Clock handed to the window and partial-join operator implementations. */
    private final Clock clock;

    /**
     * Creates an empty graph whose window and partial-join operators use the given clock.
     *
     * @param clock the clock passed to operator implementations that take one
     */
    public OperatorImplGraph(Clock clock) {
        this.operatorImpls = new HashMap<>();
        this.rootOperators = new HashMap<>();
        this.clock = clock;
    }

    /** Creates an empty graph that uses {@link SystemClock#instance()}. */
    OperatorImplGraph() {
        this(SystemClock.instance());
    }

    /**
     * Creates the operator implementations for every input stream of the given graph and
     * records the resulting root operator under that stream's {@link SystemStream}.
     *
     * @param streamGraphImpl the graph whose input streams are traversed
     * @param config          the config passed to every operator implementation
     * @param taskContext     the task context passed to every operator implementation
     */
    public void init(StreamGraphImpl streamGraphImpl, Config config, TaskContext taskContext) {
        streamGraphImpl.getInputStreams().forEach((streamSpec, inputStreamInternal) -> {
            SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
            this.rootOperators.put(systemStream, createOperatorImpls((MessageStreamImpl) inputStreamInternal, config, taskContext));
        });
    }

    /**
     * Returns the root operator registered for the given stream.
     *
     * @param systemStream the input stream to look up
     * @return the root operator for {@code systemStream}, or {@code null} if none was registered
     */
    public RootOperatorImpl getRootOperator(SystemStream systemStream) {
        return this.rootOperators.get(systemStream);
    }

    /**
     * Returns all registered root operators.
     *
     * @return an unmodifiable view of the root operators
     */
    public Collection<RootOperatorImpl> getAllRootOperators() {
        return Collections.unmodifiableCollection(this.rootOperators.values());
    }

    /**
     * Creates and initializes a root operator for the given stream, then registers the
     * implementation of each operator spec registered on that stream as its next operator.
     *
     * @param messageStreamImpl the stream whose registered operator specs are traversed
     * @param config            the config passed to every operator implementation
     * @param taskContext       the task context passed to every operator implementation
     * @param <M>               the message type of the stream
     * @return the initialized root operator for the stream
     */
    private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> messageStreamImpl, Config config, TaskContext taskContext) {
        RootOperatorImpl<M> rootOperatorImpl = new RootOperatorImpl<>();
        rootOperatorImpl.init(config, taskContext);
        messageStreamImpl.getRegisteredOperatorSpecs().forEach(operatorSpec ->
            rootOperatorImpl.registerNextOperator(createAndRegisterOperatorImpl(operatorSpec, config, taskContext)));
        return rootOperatorImpl;
    }

    /**
     * Returns the implementation for the given spec, creating it on first use.
     *
     * <p>On first use the implementation is created, registered, initialized, and then the
     * specs registered on its next stream (if any) are processed recursively and wired in
     * as its next operators. On later calls the already-registered instance is returned
     * without traversing further.
     *
     * @param operatorSpec the spec to build an implementation for
     * @param config       the config passed to every operator implementation
     * @param taskContext  the task context passed to every operator implementation
     * @param <M>          the input message type of the operator
     * @return the (possibly pre-existing) implementation for {@code operatorSpec}
     * @throws IllegalArgumentException if the spec, or any spec reachable from it, is of an unsupported type
     */
    private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext taskContext) {
        OperatorImpl<M, ?> existing = this.operatorImpls.get(operatorSpec);
        if (existing != null) {
            // Already built and wired; do not traverse the downstream operators again.
            return existing;
        }

        OperatorImpl<M, ?> operatorImpl = createOperatorImpl(operatorSpec, config, taskContext);
        // Register before descending so that a spec reached again through another path
        // resolves to this instance instead of being created a second time.
        this.operatorImpls.put(operatorSpec, operatorImpl);
        operatorImpl.init(config, taskContext);

        MessageStreamImpl nextStream = operatorSpec.getNextStream();
        if (nextStream != null) {
            // nextStream is a raw type, so the lambda parameter is typed as Object and
            // has to be cast back to OperatorSpec for the recursive call to compile.
            nextStream.getRegisteredOperatorSpecs().forEach(downstreamSpec ->
                operatorImpl.registerNextOperator(createAndRegisterOperatorImpl((OperatorSpec) downstreamSpec, config, taskContext)));
        }
        return operatorImpl;
    }

    /**
     * Instantiates the implementation class that matches the concrete type of the spec.
     * The returned implementation has not been initialized or registered.
     *
     * @param operatorSpec the spec to instantiate an implementation for
     * @param config       the config passed to the implementation's constructor where it takes one
     * @param taskContext  the task context passed to the implementation's constructor where it takes one
     * @param <M>          the input message type of the operator
     * @return a new implementation for {@code operatorSpec}
     * @throws IllegalArgumentException if the spec is not one of the supported spec types
     */
    private <M> OperatorImpl<M, ?> createOperatorImpl(OperatorSpec operatorSpec, Config config, TaskContext taskContext) {
        if (operatorSpec instanceof StreamOperatorSpec) {
            return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec, config, taskContext);
        }
        if (operatorSpec instanceof SinkOperatorSpec) {
            return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, taskContext);
        }
        if (operatorSpec instanceof WindowOperatorSpec) {
            return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, this.clock);
        }
        if (operatorSpec instanceof PartialJoinOperatorSpec) {
            return new PartialJoinOperatorImpl((PartialJoinOperatorSpec) operatorSpec, config, taskContext, this.clock);
        }
        throw new IllegalArgumentException(String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
    }
}
