package org.apache.samza.task;

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
import org.apache.samza.operators.impl.RootOperatorImpl;
import org.apache.samza.operators.stream.InputStreamInternal;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;

/* loaded from: input_file:org/apache/samza/task/StreamOperatorTask.class */
public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
    private final StreamApplication streamApplication;
    private final ApplicationRunner runner;
    private final Clock clock;
    private OperatorImplGraph operatorImplGraph;
    private ContextManager contextManager;
    private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;

    public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner applicationRunner, Clock clock) {
        this.streamApplication = streamApplication;
        this.runner = applicationRunner;
        this.clock = clock;
    }

    public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner applicationRunner) {
        this(streamApplication, applicationRunner, SystemClock.instance());
    }

    public final void init(Config config, TaskContext taskContext) throws Exception {
        StreamGraphImpl streamGraphImpl = new StreamGraphImpl(this.runner, config);
        this.streamApplication.init(streamGraphImpl, config);
        this.contextManager = streamGraphImpl.getContextManager();
        if (this.contextManager != null) {
            this.contextManager.init(config, taskContext);
        }
        OperatorImplGraph operatorImplGraph = new OperatorImplGraph(this.clock);
        operatorImplGraph.init(streamGraphImpl, config, taskContext);
        this.operatorImplGraph = operatorImplGraph;
        this.inputSystemStreamToInputStream = new HashMap();
        streamGraphImpl.getInputStreams().forEach((streamSpec, inputStreamInternal) -> {
            this.inputSystemStreamToInputStream.put(new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName()), inputStreamInternal);
        });
    }

    public final void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        SystemStream systemStream = incomingMessageEnvelope.getSystemStreamPartition().getSystemStream();
        InputStreamInternal inputStreamInternal = this.inputSystemStreamToInputStream.get(systemStream);
        RootOperatorImpl rootOperator = this.operatorImplGraph.getRootOperator(systemStream);
        if (rootOperator != null) {
            rootOperator.onMessage(inputStreamInternal.getMsgBuilder().apply(incomingMessageEnvelope.getKey(), incomingMessageEnvelope.getMessage()), messageCollector, taskCoordinator);
        }
    }

    public final void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        this.operatorImplGraph.getAllRootOperators().forEach(rootOperatorImpl -> {
            rootOperatorImpl.onTimer(messageCollector, taskCoordinator);
        });
    }

    public void close() throws Exception {
        if (this.contextManager != null) {
            this.contextManager.close();
        }
    }
}
