package org.apache.beam.runners.direct.repackaged.runners.core.fn;

import java.util.concurrent.ExecutionException;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner;
import org.apache.beam.runners.direct.repackaged.runners.core.fn.SdkHarnessClient;
import org.apache.beam.runners.direct.repackaged.runners.core.java.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/direct/repackaged/runners/core/fn/SdkHarnessDoFnRunner.class */
public class SdkHarnessDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
    private final SdkHarnessClient sdkHarnessClient;
    private final String processBundleDescriptorId;

    @Nullable
    private SdkHarnessClient.ActiveBundle activeBundle;

    private SdkHarnessDoFnRunner(SdkHarnessClient sdkHarnessClient, String str) {
        this.sdkHarnessClient = sdkHarnessClient;
        this.processBundleDescriptorId = str;
    }

    public static <InputT, OutputT> SdkHarnessDoFnRunner<InputT, OutputT> create(SdkHarnessClient sdkHarnessClient, String str) {
        return new SdkHarnessDoFnRunner<>(sdkHarnessClient, str);
    }

    @Override // org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner
    public void startBundle() {
        this.activeBundle = this.sdkHarnessClient.newBundle(this.processBundleDescriptorId);
    }

    @Override // org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner
    public void processElement(WindowedValue<InputT> windowedValue) {
        Preconditions.checkState(this.activeBundle != null, "%s attempted to process an element without an active bundle", SdkHarnessDoFnRunner.class.getSimpleName());
        try {
            this.activeBundle.getInputReceiver().accept(windowedValue);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner
    public void onTimer(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
        throw new UnsupportedOperationException("Timers are not supported over the Fn API");
    }

    @Override // org.apache.beam.runners.direct.repackaged.runners.core.DoFnRunner
    public void finishBundle() {
        try {
            this.activeBundle.getBundleResponse().get();
        } catch (InterruptedException e) {
            Thread.interrupted();
        } catch (ExecutionException e2) {
            throw UserCodeException.wrap(e2);
        }
    }
}
