/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.splittabledofn;

import java.io.IOException;
import java.util.List;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateTag;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateTags;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_runners_java_fn_execution.com.google.common.collect.Iterables;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1_13_1.com.google.protobuf.util.Timestamps;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

/**
 * Tracks one element/restriction pair in runner state and schedules its continuation with a
 * processing-time timer.
 *
 * <p>Usage, as implemented below: call {@link #seed} (first invocation) or {@link #resume}
 * (timer-driven continuation) to bind the state cells to a namespace, optionally call
 * {@link #split} once to record a checkpoint, then call {@link #commit} to either clear the
 * state (no split recorded) or persist the residual restriction, add a watermark hold and set
 * a wake-up timer.
 *
 * @param <InputT> type of the element half of the pair
 * @param <RestrictionT> type of the restriction half of the pair
 */
public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
    private final Coder<BoundedWindow> windowCoder;
    private final Coder<WindowedValue<KV<InputT, RestrictionT>>> elementRestrictionWireCoder;
    private final StateInternals stateInternals;
    private final TimerInternals timerInternals;

    // Namespace and state cells are (re)bound by initState() on every seed()/resume().
    private StateNamespace stateNamespace;
    private final StateTag<ValueState<WindowedValue<KV<InputT, RestrictionT>>>> seedTag;
    private ValueState<WindowedValue<KV<InputT, RestrictionT>>> seedState;
    private final StateTag<ValueState<RestrictionT>> restrictionTag;
    private ValueState<RestrictionT> restrictionState;
    private final StateTag<WatermarkHoldState> watermarkHoldTag = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", TimestampCombiner.LATEST));
    private WatermarkHoldState holdState;

    // Timestamp of the seed element; lower bound enforced on the watermark hold in commit().
    private Instant inputTimestamp;

    // Recorded by split(); primaryRoots == null means "no split happened".
    private List<BeamFnApi.BundleApplication> primaryRoots;
    private List<BeamFnApi.DelayedBundleApplication> residualRoots;

    /**
     * Creates a feeder backed by the given state and timer stores.
     *
     * @param stateInternals store from which the seed, restriction and hold cells are obtained
     * @param timerInternals source of the current processing time and sink for the wake-up timer
     * @param elementWireCoder coder for the element half of the pair
     * @param restrictionWireCoder coder for the restriction half of the pair
     * @param windowCoder coder for the element's window; also used to build the state namespace
     */
    public SDFFeederViaStateAndTimers(StateInternals stateInternals, TimerInternals timerInternals, Coder<InputT> elementWireCoder, Coder<RestrictionT> restrictionWireCoder, Coder<BoundedWindow> windowCoder) {
        this.stateInternals = stateInternals;
        this.timerInternals = timerInternals;
        this.windowCoder = windowCoder;
        this.elementRestrictionWireCoder = WindowedValue.FullWindowedValueCoder.of(KvCoder.of(elementWireCoder, restrictionWireCoder), windowCoder);
        this.seedTag = StateTags.value("seed", this.elementRestrictionWireCoder);
        this.restrictionTag = StateTags.value("restriction", restrictionWireCoder);
    }

    /**
     * Stores the initial element/restriction pair and remembers its timestamp.
     *
     * @param elementRestriction the pair to store; it must be in exactly one window, because the
     *     state namespace is derived from that single window
     */
    public void seed(WindowedValue<KV<InputT, RestrictionT>> elementRestriction) {
        this.initState(StateNamespaces.window(this.windowCoder, Iterables.getOnlyElement(elementRestriction.getWindows())));
        this.seedState.write(elementRestriction);
        this.inputTimestamp = elementRestriction.getTimestamp();
    }

    /**
     * Rebinds the state cells to the timer's namespace and rebuilds the pair to process.
     *
     * @param timer the timer that fired; only its namespace is used
     * @return the stored seed with its restriction replaced by the value currently held in the
     *     restriction state cell
     */
    public WindowedValue<KV<InputT, RestrictionT>> resume(TimerInternals.TimerData timer) {
        this.initState(timer.getNamespace());
        WindowedValue<KV<InputT, RestrictionT>> seed = this.seedState.read();
        this.inputTimestamp = seed.getTimestamp();
        return seed.withValue(KV.of(seed.getValue().getKey(), this.restrictionState.read()));
    }

    /**
     * Finalizes the current invocation. Must follow {@link #seed} or {@link #resume}, which bind
     * the state cells used here.
     *
     * <p>If no split was recorded, the seed, restriction and hold cells are cleared. Otherwise the
     * single residual is decoded, its restriction is written to state, a watermark hold is added
     * and a processing-time timer named {@code "sdfContinuation"} is set.
     *
     * @throws IOException if the residual element cannot be decoded
     * @throws IllegalArgumentException if the number of residuals is not exactly one, or if the
     *     watermark hold would be before the input timestamp
     */
    public void commit() throws IOException {
        if (this.primaryRoots == null) {
            // No split was recorded: nothing to continue, so drop everything.
            this.seedState.clear();
            this.restrictionState.clear();
            this.holdState.clear();
            return;
        }

        Preconditions.checkArgument(this.residualRoots.size() == 1, "More than 1 residual is unsupported for now");
        BeamFnApi.DelayedBundleApplication residual = this.residualRoots.get(0);
        BeamFnApi.BundleApplication application = residual.getApplication();

        // Persist the residual restriction so that resume() can pick it up.
        ByteString encodedResidual = application.getElement();
        WindowedValue<KV<InputT, RestrictionT>> decodedResidual = this.elementRestrictionWireCoder.decode(encodedResidual.newInput());
        this.restrictionState.write(decodedResidual.getValue().getValue());

        // Hold at the reported output watermark (exactly one is expected when any is present),
        // falling back to the input timestamp when none was reported. The map value is handed to
        // the Instant constructor unchanged.
        Instant watermarkHold = application.getOutputWatermarksMap().isEmpty()
                ? this.inputTimestamp
                : new Instant(Iterables.getOnlyElement(application.getOutputWatermarksMap().values()));
        Preconditions.checkArgument(!watermarkHold.isBefore(this.inputTimestamp), "Watermark hold %s can not be before input timestamp %s", watermarkHold, this.inputTimestamp);
        this.holdState.add(watermarkHold);

        // Wake up at the requested time, but never earlier than "now". The clock is read once so
        // the comparison and the chosen value come from the same reading.
        Instant requestedWakeupTime = new Instant(Timestamps.toMillis(residual.getRequestedExecutionTime()));
        Instant now = this.timerInternals.currentProcessingTime();
        Instant wakeupTime = now.isBefore(requestedWakeupTime) ? requestedWakeupTime : now;
        this.timerInternals.setTimer(this.stateNamespace, "sdfContinuation", wakeupTime, TimeDomain.PROCESSING_TIME);
    }

    /**
     * Records the outcome of a split so that {@link #commit} can act on it.
     *
     * @param primaryRoots primary applications of the split; stored, and a non-null value is what
     *     marks "a split happened"
     * @param residualRoots residual applications of the split; {@link #commit} requires exactly one
     * @throws IllegalStateException if a split has already been recorded on this instance
     */
    public void split(List<BeamFnApi.BundleApplication> primaryRoots, List<BeamFnApi.DelayedBundleApplication> residualRoots) {
        Preconditions.checkState(this.primaryRoots == null, "At most 1 split supported, however got new split (%s, %s) in addition to existing (%s, %s)", primaryRoots, residualRoots, this.primaryRoots, this.residualRoots);
        this.primaryRoots = primaryRoots;
        this.residualRoots = residualRoots;
    }

    /** Binds the seed, restriction and hold state cells to the given namespace. */
    private void initState(StateNamespace ns) {
        this.stateNamespace = ns;
        this.seedState = this.stateInternals.state(ns, this.seedTag);
        this.restrictionState = this.stateInternals.state(ns, this.restrictionTag);
        this.holdState = this.stateInternals.state(ns, this.watermarkHoldTag);
    }
}

