package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
import org.apache.samza.table.ReadWriteUpdateTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;

/* loaded from: input_file:org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.class */
class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M, JM> {
    private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
    private final ReadWriteUpdateTable<K, ?, ?> table;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> streamTableJoinOperatorSpec, Context context) {
        this.joinOpSpec = streamTableJoinOperatorSpec;
        this.table = context.getTaskContext().getUpdatableTable(streamTableJoinOperatorSpec.getTableId());
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleInit(Context context) {
        this.joinOpSpec.getJoinFn().init(context);
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected CompletionStage<Collection<JM>> handleMessageAsync(M m, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        if (m == null) {
            return CompletableFuture.completedFuture(Collections.emptyList());
        }
        Object messageKey = this.joinOpSpec.getJoinFn().getMessageKey(m);
        Object[] args = this.joinOpSpec.getArgs();
        return (CompletionStage) Optional.ofNullable(messageKey).map(obj -> {
            return this.table.getAsync(obj, args).thenApply(obj -> {
                return getJoinOutput(obj, obj, m);
            });
        }).orElseGet(() -> {
            return CompletableFuture.completedFuture(getJoinOutput(messageKey, null, m));
        });
    }

    private Collection<JM> getJoinOutput(K k, Object obj, M m) {
        Object apply = this.joinOpSpec.getJoinFn().apply(m, obj == null ? null : KV.of(k, obj));
        return apply != null ? Collections.singletonList(apply) : Collections.emptyList();
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected void handleClose() {
        this.joinOpSpec.getJoinFn().close();
    }

    @Override // org.apache.samza.operators.impl.OperatorImpl
    protected OperatorSpec<M, JM> getOperatorSpec() {
        return this.joinOpSpec;
    }
}
