package org.apache.samza.operators.impl;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.apache.samza.SamzaException;
import org.apache.samza.context.Context;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.UpdateMessage;
import org.apache.samza.operators.UpdateOptions;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.SendToTableWithUpdateOperatorSpec;
import org.apache.samza.table.ReadWriteUpdateTable;
import org.apache.samza.table.RecordNotFoundException;
import org.apache.samza.table.batching.CompactBatchProvider;
import org.apache.samza.table.remote.RemoteTable;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/operators/impl/SendToTableWithUpdateOperatorImpl.class */
/**
 * Operator implementation that applies the update carried by each incoming message to the
 * table identified by the operator spec, and then forwards the message unchanged.
 *
 * <p>When the update fails because the record does not exist, the message carries a default,
 * and the spec's {@link UpdateOptions} is {@link UpdateOptions#UPDATE_WITH_DEFAULTS}, the
 * default is written first and the update is attempted a second time.
 *
 * @param <K> type of the table key
 * @param <V> type of the table value (and of the default carried by the update message)
 * @param <U> type of the update applied to the table
 */
public class SendToTableWithUpdateOperatorImpl<K, V, U> extends OperatorImpl<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> {
    private static final Logger LOG = LoggerFactory.getLogger(SendToTableWithUpdateOperatorImpl.class);
    private final SendToTableWithUpdateOperatorSpec<K, V, U> spec;
    private final ReadWriteUpdateTable<K, V, U> table;

    /**
     * Resolves the target table from the task context and rejects unsupported batching setups.
     *
     * @param sendToTableWithUpdateOperatorSpec spec supplying the table id and the update options
     * @param context context used to look up the updatable table
     * @throws SamzaException if the table is a {@link RemoteTable} whose batch provider is a
     *     {@link CompactBatchProvider}
     */
    public SendToTableWithUpdateOperatorImpl(SendToTableWithUpdateOperatorSpec<K, V, U> sendToTableWithUpdateOperatorSpec, Context context) {
        this.spec = sendToTableWithUpdateOperatorSpec;
        this.table = context.getTaskContext().getUpdatableTable(sendToTableWithUpdateOperatorSpec.getTableId());
        // Test and cast the very same reference: the table is looked up exactly once, so the
        // instanceof check and the cast can never disagree.
        if (this.table instanceof RemoteTable) {
            RemoteTable<K, V, U> remoteTable = (RemoteTable<K, V, U>) this.table;
            if (remoteTable.getBatchProvider() instanceof CompactBatchProvider) {
                throw new SamzaException("Batching is not supported with Compact Batches for partial updates");
            }
        }
    }

    /**
     * No initialization is performed by this operator.
     *
     * @param context unused
     */
    @Override
    protected void handleInit(Context context) {
    }

    /**
     * Applies the message's update to the table and emits the message itself on success.
     *
     * <p>Visibility note: the decompiler reported that it widened this method from
     * {@code protected}; it is kept {@code public} here so the visible interface is unchanged.
     *
     * @param kv key and update message to apply
     * @param messageCollector unused
     * @param taskCoordinator unused
     * @return a stage completing with a singleton collection holding {@code kv}; it completes
     *     exceptionally with a {@link SamzaException} when the update fails and the
     *     put-default fallback does not apply, or when the update retried after the fallback fails
     */
    @Override
    public CompletionStage<Collection<KV<K, UpdateMessage<U, V>>>> handleMessageAsync(KV<K, UpdateMessage<U, V>> kv, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
        final UpdateOptions updateOptions = this.spec.getUpdateOptions();
        final K key = kv.getKey();
        final UpdateMessage<U, V> updateMessage = kv.getValue();
        return this.table.updateAsync(key, updateMessage.getUpdate())
            .handle((ignored, failure) -> shouldPutDefault(failure, updateMessage, updateOptions))
            .thenCompose(putDefault -> {
                if (putDefault) {
                    return putDefaultThenUpdate(key, updateMessage);
                }
                return CompletableFuture.<Void>completedFuture(null);
            })
            .thenApply(ignored -> Collections.singleton(kv));
    }

    /**
     * Decides, from the outcome of the first update attempt, whether a default must be written.
     *
     * @param failure exception from the first update, or {@code null} if it succeeded
     * @param updateMessage message whose default availability is consulted
     * @param updateOptions options configured on the operator spec
     * @return {@code false} when the update succeeded; {@code true} when the failure's cause is
     *     a {@link RecordNotFoundException}, a default is present and the options allow defaults
     * @throws SamzaException for any other failure, or when a default would apply but the
     *     options are not {@link UpdateOptions#UPDATE_WITH_DEFAULTS}
     */
    private boolean shouldPutDefault(Throwable failure, UpdateMessage<U, V> updateMessage, UpdateOptions updateOptions) {
        if (failure == null) {
            return false;
        }
        // Only the cause is inspected, exactly as before; hasDefault() is consulted only when
        // the cause is a RecordNotFoundException (short-circuit).
        boolean recoverable = failure.getCause() instanceof RecordNotFoundException && updateMessage.hasDefault();
        if (!recoverable) {
            throw new SamzaException("Update failed with exception: ", failure);
        }
        if (updateOptions == UpdateOptions.UPDATE_WITH_DEFAULTS) {
            return true;
        }
        throw new SamzaException("Put default failed for update as the Update options was set to " + updateOptions + ". Please use UpdateOptions.UPDATE_WITH_DEFAULTS instead.");
    }

    /**
     * Writes the message's default for the key, then re-attempts the update.
     *
     * <p>A failed put is logged and deliberately ignored so that the update is still retried;
     * a failure of the retried update is surfaced as a {@link SamzaException}.
     *
     * @param key table key to write
     * @param updateMessage message supplying both the default and the update
     * @return future completing once the retried update has completed
     */
    private CompletableFuture<Void> putDefaultThenUpdate(K key, UpdateMessage<U, V> updateMessage) {
        return this.table.putAsync(key, updateMessage.getDefault())
            .exceptionally(putFailure -> {
                LOG.warn("PUT default failed due to an exception. Ignoring the exception and proceeding with update. The exception encountered is: ", putFailure);
                return null;
            })
            .thenCompose(ignored -> this.table.updateAsync(key, updateMessage.getUpdate()))
            .exceptionally(updateFailure -> {
                throw new SamzaException("Update after Put default failed with exception: ", updateFailure);
            });
    }

    /** Closes the underlying table. */
    @Override
    protected void handleClose() {
        this.table.close();
    }

    /**
     * Returns the spec this operator was created from.
     *
     * @return the operator spec passed to the constructor
     */
    @Override
    protected OperatorSpec<KV<K, UpdateMessage<U, V>>, KV<K, UpdateMessage<U, V>>> getOperatorSpec() {
        return this.spec;
    }
}
