package org.apache.samza.storage;

import java.util.Collections;
import java.util.Set;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.container.RunLoopTask;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.scheduler.EpochTimeScheduler;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.ReadableCoordinator;
import org.apache.samza.task.TaskCallback;
import org.apache.samza.task.TaskCallbackFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/storage/SideInputTask.class */
/**
 * {@link RunLoopTask} implementation that forwards every incoming envelope to a
 * {@link TaskSideInputHandler} and flushes that handler on commit.
 *
 * <p>Windowing and scheduling are rejected with {@link UnsupportedOperationException};
 * end-of-stream and drain notifications are only logged. {@link #process} and
 * {@link #commit} are both {@code synchronized} on this instance, so they never
 * run concurrently with each other for the same task object.
 */
public class SideInputTask implements RunLoopTask {
    private static final Logger LOG = LoggerFactory.getLogger(SideInputTask.class);

    private final TaskName taskName;
    private final Set<SystemStreamPartition> taskSSPs;
    private final TaskSideInputHandler taskSideInputHandler;
    private final TaskInstanceMetrics metrics;

    /**
     * Stores the supplied collaborators as-is; no copies are made and no validation is performed.
     *
     * @param taskName name reported by {@link #taskName()}
     * @param set partitions reported by {@link #systemStreamPartitions()}
     * @param taskSideInputHandler handler that receives envelopes and flush requests
     * @param taskInstanceMetrics metrics holder whose counters are incremented by this task
     */
    public SideInputTask(
            TaskName taskName,
            Set<SystemStreamPartition> set,
            TaskSideInputHandler taskSideInputHandler,
            TaskInstanceMetrics taskInstanceMetrics) {
        this.taskName = taskName;
        this.taskSSPs = set;
        this.taskSideInputHandler = taskSideInputHandler;
        this.metrics = taskInstanceMetrics;
    }

    // ---------------------------------------------------------------------
    // Message handling and commit
    // ---------------------------------------------------------------------

    /**
     * Hands one envelope to the side input handler and reports the outcome through a callback
     * obtained from {@code taskCallbackFactory}.
     *
     * <p>The {@code processes} counter is incremented before the handler is invoked. On success
     * the {@code messagesActuallyProcessed} counter is incremented and the callback is completed.
     * Any {@link Exception} raised inside the guarded section is passed to the callback's
     * {@code failure} method instead of being propagated to the caller.
     *
     * @param incomingMessageEnvelope envelope to forward to the handler
     * @param readableCoordinator not used by this implementation
     * @param taskCallbackFactory source of the callback used to signal completion or failure
     */
    @Override
    public synchronized void process(
            IncomingMessageEnvelope incomingMessageEnvelope,
            ReadableCoordinator readableCoordinator,
            TaskCallbackFactory taskCallbackFactory) {
        final TaskCallback callback = taskCallbackFactory.createCallback();
        metrics.processes().inc();
        try {
            taskSideInputHandler.process(incomingMessageEnvelope);
            metrics.messagesActuallyProcessed().inc();
            callback.complete();
        } catch (Exception processingFailure) {
            // Surface the problem via the callback rather than throwing out of process().
            callback.failure(processingFailure);
        }
    }

    /**
     * Flushes the side input handler, then increments the {@code commits} counter.
     */
    @Override
    public synchronized void commit() {
        taskSideInputHandler.flush();
        metrics.commits().inc();
    }

    // ---------------------------------------------------------------------
    // Lifecycle notifications
    // ---------------------------------------------------------------------

    /**
     * Logs that this task has reached end of stream; performs no other work.
     *
     * @param readableCoordinator not used by this implementation
     */
    @Override
    public void endOfStream(ReadableCoordinator readableCoordinator) {
        LOG.info("Task {} has reached end of stream", taskName);
    }

    /**
     * Logs that this task has drained; performs no other work.
     *
     * @param readableCoordinator not used by this implementation
     */
    @Override
    public void drain(ReadableCoordinator readableCoordinator) {
        LOG.info("Task {} has drained", taskName);
    }

    // ---------------------------------------------------------------------
    // Unsupported operations
    // ---------------------------------------------------------------------

    /**
     * Always fails: windowing is not supported by this task.
     *
     * @param readableCoordinator not used by this implementation
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public void window(ReadableCoordinator readableCoordinator) {
        throw new UnsupportedOperationException("Windowing is not applicable for side input tasks.");
    }

    /**
     * Always fails: scheduling is not supported by this task.
     *
     * @param readableCoordinator not used by this implementation
     * @throws UnsupportedOperationException on every call
     */
    @Override
    public void scheduler(ReadableCoordinator readableCoordinator) {
        throw new UnsupportedOperationException("Scheduling is not applicable for side input tasks.");
    }

    // ---------------------------------------------------------------------
    // Accessors
    // ---------------------------------------------------------------------

    /**
     * @return the task name supplied at construction
     */
    @Override
    public TaskName taskName() {
        return taskName;
    }

    /**
     * @return the same partition set instance supplied at construction (not a copy)
     */
    @Override
    public Set<SystemStreamPartition> systemStreamPartitions() {
        return taskSSPs;
    }

    /**
     * @return the metrics holder supplied at construction
     */
    @Override
    public TaskInstanceMetrics metrics() {
        return metrics;
    }

    /**
     * @return always {@code false}
     */
    @Override
    public boolean isWindowableTask() {
        return false;
    }

    /**
     * @return always an empty, immutable set
     */
    @Override
    public Set<String> intermediateStreams() {
        return Collections.emptySet();
    }

    /**
     * @return always {@code null}; this task exposes no offset manager
     */
    @Override
    public OffsetManager offsetManager() {
        return null;
    }

    /**
     * @return always {@code null}; this task exposes no epoch time scheduler
     */
    @Override
    public EpochTimeScheduler epochTimeScheduler() {
        return null;
    }
}
