/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.SnappyCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.AutoValue_Watch_Growth;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Funnel;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Funnels;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.HashCode;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SPLITTABLE_DO_FN)
public class Watch {
    private static final Logger LOG = LoggerFactory.getLogger(Watch.class);

    public static <InputT, OutputT> Growth<InputT, OutputT, OutputT> growthOf(Growth.PollFn<InputT, OutputT> pollFn, Requirements requirements) {
        return new AutoValue_Watch_Growth.Builder().setTerminationPerInput(Growth.never()).setPollFn(Contextful.of(pollFn, requirements)).setOutputKeyFn(null).build();
    }

    public static <InputT, OutputT> Growth<InputT, OutputT, OutputT> growthOf(Growth.PollFn<InputT, OutputT> pollFn) {
        return Watch.growthOf(pollFn, Requirements.empty());
    }

    public static <InputT, OutputT, KeyT> Growth<InputT, OutputT, KeyT> growthOf(Contextful<Growth.PollFn<InputT, OutputT>> pollFn, SerializableFunction<OutputT, KeyT> outputKeyFn) {
        Preconditions.checkArgument(pollFn != null, "pollFn can not be null");
        Preconditions.checkArgument(outputKeyFn != null, "outputKeyFn can not be null");
        return new AutoValue_Watch_Growth.Builder().setTerminationPerInput(Growth.never()).setPollFn(pollFn).setOutputKeyFn(outputKeyFn).build();
    }

    private static class GrowthStateCoder<OutputT, KeyT, TerminationStateT>
    extends StructuredCoder<GrowthState<OutputT, KeyT, TerminationStateT>> {
        private static final Coder<Boolean> BOOLEAN_CODER = BooleanCoder.of();
        private static final Coder<Instant> INSTANT_CODER = NullableCoder.of(InstantCoder.of());
        private static final Coder<HashCode> HASH_CODE_CODER = HashCode128Coder.of();
        private final Coder<OutputT> outputCoder;
        private final Coder<Map<HashCode, Instant>> completedCoder;
        private final Coder<TimestampedValue<OutputT>> timestampedOutputCoder;
        private final Coder<TerminationStateT> terminationStateCoder;

        public static <OutputT, KeyT, TerminationStateT> GrowthStateCoder<OutputT, KeyT, TerminationStateT> of(Coder<OutputT> outputCoder, Coder<TerminationStateT> terminationStateCoder) {
            return new GrowthStateCoder<OutputT, KeyT, TerminationStateT>(outputCoder, terminationStateCoder);
        }

        private GrowthStateCoder(Coder<OutputT> outputCoder, Coder<TerminationStateT> terminationStateCoder) {
            this.outputCoder = outputCoder;
            this.terminationStateCoder = terminationStateCoder;
            this.completedCoder = MapCoder.of(HASH_CODE_CODER, INSTANT_CODER);
            this.timestampedOutputCoder = TimestampedValue.TimestampedValueCoder.of(outputCoder);
        }

        @Override
        public void encode(GrowthState<OutputT, KeyT, TerminationStateT> value, OutputStream os) throws IOException {
            this.completedCoder.encode(((GrowthState)value).completed, os);
            VarIntCoder.of().encode(((GrowthState)value).pending.size(), os);
            for (Map.Entry entry : ((GrowthState)value).pending.entrySet()) {
                HASH_CODE_CODER.encode((HashCode)entry.getKey(), os);
                this.timestampedOutputCoder.encode((TimestampedValue)entry.getValue(), os);
            }
            BOOLEAN_CODER.encode(((GrowthState)value).isOutputComplete, os);
            this.terminationStateCoder.encode(((GrowthState)value).terminationState, os);
            INSTANT_CODER.encode(((GrowthState)value).pollWatermark, os);
        }

        @Override
        public GrowthState<OutputT, KeyT, TerminationStateT> decode(InputStream is) throws IOException {
            Map<HashCode, Instant> completed = this.completedCoder.decode(is);
            int numPending = VarIntCoder.of().decode(is);
            ImmutableMap.Builder<HashCode, TimestampedValue<OutputT>> pending = ImmutableMap.builder();
            for (int i = 0; i < numPending; ++i) {
                HashCode hash = HASH_CODE_CODER.decode(is);
                TimestampedValue<OutputT> output = this.timestampedOutputCoder.decode(is);
                pending.put(hash, output);
            }
            boolean isOutputComplete = BOOLEAN_CODER.decode(is);
            TerminationStateT terminationState = this.terminationStateCoder.decode(is);
            Instant pollWatermark = INSTANT_CODER.decode(is);
            return new GrowthState(ImmutableMap.copyOf(completed), pending.build(), isOutputComplete, terminationState, pollWatermark);
        }

        @Override
        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.asList(this.outputCoder, this.terminationStateCoder);
        }

        @Override
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.outputCoder.verifyDeterministic();
        }
    }

    private static class HashCode128Coder
    extends AtomicCoder<HashCode> {
        private static final HashCode128Coder INSTANCE = new HashCode128Coder();

        private HashCode128Coder() {
        }

        public static HashCode128Coder of() {
            return INSTANCE;
        }

        @Override
        public void encode(HashCode value, OutputStream os) throws IOException {
            Preconditions.checkArgument(value.bits() == 128, "Expected a 128-bit hash code, but got %s bits", value.bits());
            byte[] res = new byte[16];
            value.writeBytesTo(res, 0, 16);
            os.write(res);
        }

        @Override
        public HashCode decode(InputStream is) throws IOException {
            byte[] res = new byte[16];
            int numRead = is.read(res, 0, 16);
            Preconditions.checkArgument(numRead == 16, "Expected to read 16 bytes, but read %s", numRead);
            return HashCode.fromBytes(res);
        }
    }

    @VisibleForTesting
    static class GrowthTracker<OutputT, KeyT, TerminationStateT>
    extends RestrictionTracker<GrowthState<OutputT, KeyT, TerminationStateT>, HashCode> {
        private final Funnel<OutputT> coderFunnel;
        private final Growth.TerminationCondition<?, TerminationStateT> terminationCondition;
        private GrowthState<OutputT, KeyT, TerminationStateT> state;
        private Map<HashCode, TimestampedValue<OutputT>> pending;
        private Map<HashCode, TimestampedValue<OutputT>> claimed = Maps.newLinkedHashMap();
        private boolean isOutputComplete;
        @Nullable
        private TerminationStateT terminationState;
        @Nullable
        private Instant pollWatermark;
        private boolean shouldStop = false;

        GrowthTracker(SerializableFunction<OutputT, KeyT> keyFn, Coder<KeyT> outputKeyCoder, GrowthState<OutputT, KeyT, TerminationStateT> state, Growth.TerminationCondition<?, TerminationStateT> terminationCondition) {
            this.coderFunnel = (from, into) -> {
                try {
                    Object outputKey = keyFn.apply(from);
                    outputKeyCoder.encode(outputKey, Funnels.asOutputStream(into));
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            };
            this.terminationCondition = terminationCondition;
            this.state = state;
            this.isOutputComplete = ((GrowthState)state).isOutputComplete;
            this.pollWatermark = ((GrowthState)state).pollWatermark;
            this.terminationState = ((GrowthState)state).terminationState;
            this.pending = Maps.newLinkedHashMap(((GrowthState)state).pending);
        }

        @Override
        public synchronized GrowthState<OutputT, KeyT, TerminationStateT> currentRestriction() {
            return this.state;
        }

        @Override
        public synchronized GrowthState<OutputT, KeyT, TerminationStateT> checkpoint() {
            Preconditions.checkState(!this.claimed.isEmpty(), "Can't checkpoint before any element was successfully claimed");
            GrowthState primary = new GrowthState(((GrowthState)this.state).completed, ImmutableMap.copyOf(this.claimed), true, null, BoundedWindow.TIMESTAMP_MAX_VALUE);
            ImmutableMap.Builder<HashCode, Instant> newCompleted = ImmutableMap.builder();
            newCompleted.putAll(((GrowthState)this.state).completed);
            for (Map.Entry<HashCode, TimestampedValue<OutputT>> claimedOutput : this.claimed.entrySet()) {
                newCompleted.put(claimedOutput.getKey(), claimedOutput.getValue().getTimestamp());
            }
            GrowthState residual = new GrowthState(newCompleted.build(), ImmutableMap.copyOf(this.pending), this.isOutputComplete, this.terminationState, this.pollWatermark);
            this.state = primary;
            this.isOutputComplete = primary.isOutputComplete;
            this.pollWatermark = primary.pollWatermark;
            this.terminationState = null;
            this.pending = Maps.newLinkedHashMap();
            this.shouldStop = true;
            return residual;
        }

        private HashCode hash128(OutputT value) {
            return Hashing.murmur3_128().hashObject(value, this.coderFunnel);
        }

        @Override
        public synchronized void checkDone() throws IllegalStateException {
            if (this.shouldStop) {
                return;
            }
            Preconditions.checkState(!this.shouldPollMore(), "Polling is still allowed to continue");
            Preconditions.checkState(this.pending.isEmpty(), "There are %s unclaimed pending outputs", this.pending.size());
        }

        @VisibleForTesting
        synchronized boolean hasPending() {
            return !this.pending.isEmpty();
        }

        private synchronized int getNumPending() {
            return this.pending.size();
        }

        @Nullable
        @VisibleForTesting
        synchronized Map.Entry<HashCode, TimestampedValue<OutputT>> getNextPending() {
            if (this.pending.isEmpty()) {
                return null;
            }
            return this.pending.entrySet().iterator().next();
        }

        @Override
        protected synchronized boolean tryClaimImpl(HashCode hash) {
            if (this.shouldStop) {
                return false;
            }
            Preconditions.checkState(!this.pending.isEmpty(), "No more unclaimed pending outputs");
            TimestampedValue<OutputT> value = this.pending.remove(hash);
            Preconditions.checkArgument(value != null, "Attempted to claim unknown hash %s", (Object)hash);
            this.claimed.put(hash, value);
            return true;
        }

        @VisibleForTesting
        synchronized boolean shouldPollMore() {
            return !this.isOutputComplete && !this.terminationCondition.canStopPolling(Instant.now(), this.terminationState);
        }

        @VisibleForTesting
        synchronized int addNewAsPending(Growth.PollResult<OutputT> pollResult) {
            Preconditions.checkState(((GrowthState)this.state).pending.isEmpty(), "Should have drained all old pending outputs before adding new, but there are %s old pending outputs", ((GrowthState)this.state).pending.size());
            HashMap<HashCode, TimestampedValue<OutputT>> newPending = Maps.newHashMap();
            for (TimestampedValue<OutputT> output : pollResult.getOutputs()) {
                OutputT value = output.getValue();
                HashCode hash = this.hash128(value);
                if (((GrowthState)this.state).completed.containsKey(hash) || newPending.containsKey(hash)) continue;
                newPending.put(hash, TimestampedValue.of(value, output.getTimestamp()));
            }
            if (!newPending.isEmpty()) {
                this.terminationState = this.terminationCondition.onSeenNewOutput(Instant.now(), this.terminationState);
            }
            List sortedPending = Ordering.natural().onResultOf(entry -> ((TimestampedValue)entry.getValue()).getTimestamp()).sortedCopy(newPending.entrySet());
            this.pending = Maps.newLinkedHashMap();
            for (Map.Entry entry2 : sortedPending) {
                this.pending.put((HashCode)entry2.getKey(), (TimestampedValue)entry2.getValue());
            }
            if (pollResult.getWatermark() != null) {
                this.pollWatermark = pollResult.getWatermark();
            } else if (!this.pending.isEmpty()) {
                this.pollWatermark = this.pending.values().iterator().next().getTimestamp();
            }
            if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(this.pollWatermark)) {
                this.isOutputComplete = true;
            }
            return this.pending.size();
        }

        @VisibleForTesting
        synchronized Instant getWatermark() {
            return Ordering.natural().nullsLast().min(this.pollWatermark, this.pending.isEmpty() ? null : this.pending.values().iterator().next().getTimestamp());
        }

        public synchronized String toString() {
            return "GrowthTracker{state=" + this.state.toString(this.terminationCondition) + ", pending=<" + this.pending.size() + " elements" + (this.pending.isEmpty() ? "" : ", earliest " + this.pending.values().iterator().next()) + ">, claimed=<" + this.claimed.size() + " elements>, isOutputComplete=" + this.isOutputComplete + ", terminationState=" + this.terminationState + ", pollWatermark=" + this.pollWatermark + ", shouldStop=" + this.shouldStop + '}';
        }
    }

    @VisibleForTesting
    static class GrowthState<OutputT, KeyT, TerminationStateT> {
        private final ImmutableMap<HashCode, Instant> completed;
        private final ImmutableMap<HashCode, TimestampedValue<OutputT>> pending;
        private final boolean isOutputComplete;
        @Nullable
        private final TerminationStateT terminationState;
        @Nullable
        private final Instant pollWatermark;

        GrowthState(TerminationStateT terminationState) {
            this.completed = ImmutableMap.of();
            this.pending = ImmutableMap.of();
            this.isOutputComplete = false;
            this.terminationState = Preconditions.checkNotNull(terminationState);
            this.pollWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
        }

        GrowthState(ImmutableMap<HashCode, Instant> completed, ImmutableMap<HashCode, TimestampedValue<OutputT>> pending, boolean isOutputComplete, @Nullable TerminationStateT terminationState, @Nullable Instant pollWatermark) {
            if (!isOutputComplete) {
                Preconditions.checkNotNull(terminationState);
            }
            this.completed = completed;
            this.pending = pending;
            this.isOutputComplete = isOutputComplete;
            this.terminationState = terminationState;
            this.pollWatermark = pollWatermark;
        }

        public String toString(Growth.TerminationCondition<?, TerminationStateT> terminationCondition) {
            return "GrowthState{completed=<" + this.completed.size() + " elements>, pending=<" + this.pending.size() + " elements" + (this.pending.isEmpty() ? "" : ", earliest " + ((ImmutableCollection)this.pending.values()).iterator().next()) + ">, isOutputComplete=" + this.isOutputComplete + ", terminationState=" + terminationCondition.toString(this.terminationState) + ", pollWatermark=" + this.pollWatermark + '}';
        }
    }

    @DoFn.UnboundedPerElement
    private static class WatchGrowthFn<InputT, OutputT, KeyT, TerminationStateT>
    extends DoFn<InputT, KV<InputT, OutputT>> {
        private final Growth<InputT, OutputT, KeyT> spec;
        private final Coder<OutputT> outputCoder;
        private final SerializableFunction<OutputT, KeyT> outputKeyFn;
        private final Coder<KeyT> outputKeyCoder;

        private WatchGrowthFn(Growth<InputT, OutputT, KeyT> spec, Coder<OutputT> outputCoder, SerializableFunction<OutputT, KeyT> outputKeyFn, Coder<KeyT> outputKeyCoder) {
            this.spec = spec;
            this.outputCoder = outputCoder;
            this.outputKeyFn = outputKeyFn;
            this.outputKeyCoder = outputKeyCoder;
        }

        @DoFn.ProcessElement
        public DoFn.ProcessContinuation process(DoFn.ProcessContext c, GrowthTracker<OutputT, KeyT, TerminationStateT> tracker) throws Exception {
            if (!tracker.hasPending() && !((GrowthState)tracker.currentRestriction()).isOutputComplete) {
                Instant now = Instant.now();
                Growth.PollResult res = (Growth.PollResult)this.spec.getPollFn().getClosure().apply(c.element(), Contextful.Fn.Context.wrapProcessContext(c));
                int numPending = tracker.addNewAsPending(res);
                if (numPending > 0) {
                    LOG.info("{} - current round of polling took {} ms and returned {} results, of which {} were new. The output is {}.", c.element(), new Duration(now, Instant.now()).getMillis(), res.getOutputs().size(), numPending, BoundedWindow.TIMESTAMP_MAX_VALUE.equals(res.getWatermark()) ? "final" : "not yet final");
                }
            }
            int numEmittedInThisRound = 0;
            int numTotalPending = ((GrowthTracker)tracker).getNumPending();
            int numPreviouslyEmitted = ((GrowthState)tracker.currentRestriction()).completed.size();
            int numTotalKnown = numPreviouslyEmitted + numTotalPending;
            while (true) {
                c.updateWatermark(tracker.getWatermark());
                Map.Entry<HashCode, TimestampedValue<OutputT>> entry = tracker.getNextPending();
                if (entry == null || !tracker.tryClaim(entry.getKey())) break;
                TimestampedValue<OutputT> nextPending = entry.getValue();
                c.outputWithTimestamp(KV.of(c.element(), nextPending.getValue()), nextPending.getTimestamp());
                ++numEmittedInThisRound;
            }
            LOG.info("{} - emitted {} new results (of {} total known: {} emitted so far, {} more to emit).", c.element(), numEmittedInThisRound, numTotalKnown, numEmittedInThisRound + numPreviouslyEmitted, numTotalPending - numEmittedInThisRound);
            Instant watermark = tracker.getWatermark();
            if (watermark != null) {
                c.updateWatermark(watermark);
            }
            if (tracker.shouldPollMore()) {
                LOG.info("{} - emitted all {} known results so far; will resume polling in {} ms", c.element(), numTotalKnown, this.spec.getPollInterval().getMillis());
                return DoFn.ProcessContinuation.resume().withResumeDelay(this.spec.getPollInterval());
            }
            return DoFn.ProcessContinuation.stop();
        }

        private Growth.TerminationCondition<InputT, TerminationStateT> getTerminationCondition() {
            return this.spec.getTerminationPerInput();
        }

        @DoFn.GetInitialRestriction
        public GrowthState<OutputT, KeyT, TerminationStateT> getInitialRestriction(InputT element) {
            return new GrowthState(this.getTerminationCondition().forNewInput(Instant.now(), element));
        }

        @DoFn.NewTracker
        public GrowthTracker<OutputT, KeyT, TerminationStateT> newTracker(GrowthState<OutputT, KeyT, TerminationStateT> restriction) {
            return new GrowthTracker<OutputT, KeyT, TerminationStateT>(this.outputKeyFn, this.outputKeyCoder, restriction, this.getTerminationCondition());
        }

        @DoFn.GetRestrictionCoder
        public Coder<GrowthState<OutputT, KeyT, TerminationStateT>> getRestrictionCoder() {
            return SnappyCoder.of(GrowthStateCoder.of(this.outputCoder, this.spec.getTerminationPerInput().getStateCoder()));
        }
    }

    @AutoValue
    public static abstract class Growth<InputT, OutputT, KeyT>
    extends PTransform<PCollection<InputT>, PCollection<KV<InputT, OutputT>>> {
        public static <InputT> Never<InputT> never() {
            return new Never();
        }

        public static <InputT, StateT> TerminationCondition<InputT, StateT> ignoreInput(TerminationCondition<?, StateT> condition) {
            return new IgnoreInput(condition);
        }

        public static <InputT> AfterTotalOf<InputT> afterTotalOf(ReadableDuration timeSinceInput) {
            return Growth.afterTotalOf(SerializableFunctions.constant(timeSinceInput));
        }

        public static <InputT> AfterTotalOf<InputT> afterTotalOf(SerializableFunction<InputT, ReadableDuration> timeSinceInput) {
            return new AfterTotalOf(timeSinceInput);
        }

        public static <InputT> AfterTimeSinceNewOutput<InputT> afterTimeSinceNewOutput(ReadableDuration timeSinceNewOutput) {
            return Growth.afterTimeSinceNewOutput(SerializableFunctions.constant(timeSinceNewOutput));
        }

        public static <InputT> AfterTimeSinceNewOutput<InputT> afterTimeSinceNewOutput(SerializableFunction<InputT, ReadableDuration> timeSinceNewOutput) {
            return new AfterTimeSinceNewOutput(timeSinceNewOutput);
        }

        public static <InputT, FirstStateT, SecondStateT> BinaryCombined<InputT, FirstStateT, SecondStateT> eitherOf(TerminationCondition<InputT, FirstStateT> first, TerminationCondition<InputT, SecondStateT> second) {
            return new BinaryCombined<InputT, FirstStateT, SecondStateT>(BinaryCombined.Operation.OR, first, second);
        }

        public static <InputT, FirstStateT, SecondStateT> BinaryCombined<InputT, FirstStateT, SecondStateT> allOf(TerminationCondition<InputT, FirstStateT> first, TerminationCondition<InputT, SecondStateT> second) {
            return new BinaryCombined<InputT, FirstStateT, SecondStateT>(BinaryCombined.Operation.AND, first, second);
        }

        abstract Contextful<PollFn<InputT, OutputT>> getPollFn();

        @Nullable
        abstract SerializableFunction<OutputT, KeyT> getOutputKeyFn();

        @Nullable
        abstract Coder<KeyT> getOutputKeyCoder();

        @Nullable
        abstract Duration getPollInterval();

        @Nullable
        abstract TerminationCondition<InputT, ?> getTerminationPerInput();

        @Nullable
        abstract Coder<OutputT> getOutputCoder();

        abstract Builder<InputT, OutputT, KeyT> toBuilder();

        public Growth<InputT, OutputT, KeyT> withOutputKeyCoder(Coder<KeyT> outputKeyCoder) {
            return this.toBuilder().setOutputKeyCoder(outputKeyCoder).build();
        }

        public Growth<InputT, OutputT, KeyT> withTerminationPerInput(TerminationCondition<InputT, ?> terminationPerInput) {
            return this.toBuilder().setTerminationPerInput(terminationPerInput).build();
        }

        public Growth<InputT, OutputT, KeyT> withPollInterval(Duration pollInterval) {
            return this.toBuilder().setPollInterval(pollInterval).build();
        }

        public Growth<InputT, OutputT, KeyT> withOutputCoder(Coder<OutputT> outputCoder) {
            return this.toBuilder().setOutputCoder(outputCoder).build();
        }

        @Override
        public PCollection<KV<InputT, OutputT>> expand(PCollection<InputT> input) {
            Preconditions.checkNotNull(this.getPollInterval(), "pollInterval");
            Preconditions.checkNotNull(this.getTerminationPerInput(), "terminationPerInput");
            Coder<Object> outputCoder = this.getOutputCoder();
            if (outputCoder == null) {
                TypeDescriptor outputT = TypeDescriptors.extractFromTypeParameters(this.getPollFn().getClosure(), PollFn.class, new TypeDescriptors.TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>(){});
                try {
                    outputCoder = input.getPipeline().getCoderRegistry().getCoder(outputT);
                }
                catch (CannotProvideCoderException e) {
                    throw new RuntimeException("Unable to infer coder for OutputT (" + outputT + "). Specify it explicitly using withOutputCoder().");
                }
            }
            Coder<Object> outputKeyCoder = this.getOutputKeyCoder();
            SerializableFunction<Object, Object> outputKeyFn = this.getOutputKeyFn();
            if (this.getOutputKeyFn() == null) {
                outputKeyCoder = outputCoder;
                outputKeyFn = SerializableFunctions.identity();
            } else {
                if (outputKeyCoder == null) {
                    TypeDescriptor<KeyT> keyT = TypeDescriptors.outputOf(this.getOutputKeyFn());
                    try {
                        outputKeyCoder = input.getPipeline().getCoderRegistry().getCoder(keyT);
                    }
                    catch (CannotProvideCoderException e) {
                        throw new RuntimeException("Unable to infer coder for KeyT (" + keyT + "). Specify it explicitly using withOutputKeyCoder().");
                    }
                }
                try {
                    outputKeyCoder.verifyDeterministic();
                }
                catch (Coder.NonDeterministicException e) {
                    throw new IllegalArgumentException("Key coder " + outputKeyCoder + " must be deterministic");
                }
            }
            return ((PCollection)input.apply(ParDo.of(new WatchGrowthFn(this, outputCoder, outputKeyFn, outputKeyCoder)).withSideInputs(this.getPollFn().getRequirements().getSideInputs()))).setCoder(KvCoder.of(input.getCoder(), outputCoder));
        }

        @AutoValue.Builder
        static abstract class Builder<InputT, OutputT, KeyT> {
            Builder() {
            }

            abstract Builder<InputT, OutputT, KeyT> setPollFn(Contextful<PollFn<InputT, OutputT>> var1);

            abstract Builder<InputT, OutputT, KeyT> setOutputKeyFn(@Nullable SerializableFunction<OutputT, KeyT> var1);

            abstract Builder<InputT, OutputT, KeyT> setOutputKeyCoder(Coder<KeyT> var1);

            abstract Builder<InputT, OutputT, KeyT> setTerminationPerInput(TerminationCondition<InputT, ?> var1);

            abstract Builder<InputT, OutputT, KeyT> setPollInterval(Duration var1);

            abstract Builder<InputT, OutputT, KeyT> setOutputCoder(Coder<OutputT> var1);

            abstract Growth<InputT, OutputT, KeyT> build();
        }

        static class BinaryCombined<InputT, FirstStateT, SecondStateT>
        implements TerminationCondition<InputT, KV<FirstStateT, SecondStateT>> {
            private final Operation operation;
            private final TerminationCondition<InputT, FirstStateT> first;
            private final TerminationCondition<InputT, SecondStateT> second;

            public BinaryCombined(Operation operation, TerminationCondition<InputT, FirstStateT> first, TerminationCondition<InputT, SecondStateT> second) {
                this.operation = operation;
                this.first = first;
                this.second = second;
            }

            @Override
            public Coder<KV<FirstStateT, SecondStateT>> getStateCoder() {
                return KvCoder.of(this.first.getStateCoder(), this.second.getStateCoder());
            }

            @Override
            public KV<FirstStateT, SecondStateT> forNewInput(Instant now, InputT input) {
                return KV.of(this.first.forNewInput(now, input), this.second.forNewInput(now, input));
            }

            @Override
            public KV<FirstStateT, SecondStateT> onSeenNewOutput(Instant now, KV<FirstStateT, SecondStateT> state) {
                return KV.of(this.first.onSeenNewOutput(now, state.getKey()), this.second.onSeenNewOutput(now, state.getValue()));
            }

            @Override
            public boolean canStopPolling(Instant now, KV<FirstStateT, SecondStateT> state) {
                switch (this.operation) {
                    case OR: {
                        return this.first.canStopPolling(now, state.getKey()) || this.second.canStopPolling(now, state.getValue());
                    }
                    case AND: {
                        return this.first.canStopPolling(now, state.getKey()) && this.second.canStopPolling(now, state.getValue());
                    }
                }
                throw new UnsupportedOperationException("Unexpected operation " + (Object)((Object)this.operation));
            }

            @Override
            public String toString(KV<FirstStateT, SecondStateT> state) {
                return (Object)((Object)this.operation) + "{first=" + this.first.toString(state.getKey()) + ", second=" + this.second.toString(state.getValue()) + '}';
            }

            private static enum Operation {
                OR,
                AND;

            }
        }

        static class AfterTimeSinceNewOutput<InputT>
        implements TerminationCondition<InputT, KV<Instant, ReadableDuration>> {
            private final SerializableFunction<InputT, ReadableDuration> maxTimeSinceNewOutput;

            private AfterTimeSinceNewOutput(SerializableFunction<InputT, ReadableDuration> maxTimeSinceNewOutput) {
                this.maxTimeSinceNewOutput = maxTimeSinceNewOutput;
            }

            @Override
            public Coder<KV<Instant, ReadableDuration>> getStateCoder() {
                return KvCoder.of(NullableCoder.of(InstantCoder.of()), DurationCoder.of());
            }

            @Override
            public KV<Instant, ReadableDuration> forNewInput(Instant now, InputT input) {
                return KV.of(null, this.maxTimeSinceNewOutput.apply(input));
            }

            @Override
            public KV<Instant, ReadableDuration> onSeenNewOutput(Instant now, KV<Instant, ReadableDuration> state) {
                return KV.of(now, state.getValue());
            }

            @Override
            public boolean canStopPolling(Instant now, KV<Instant, ReadableDuration> state) {
                Instant timeOfLastNewOutput = state.getKey();
                ReadableDuration maxTimeSinceNewOutput = state.getValue();
                return timeOfLastNewOutput != null && new Duration(timeOfLastNewOutput, now).isLongerThan(maxTimeSinceNewOutput);
            }

            @Override
            public String toString(KV<Instant, ReadableDuration> state) {
                return "AfterTimeSinceNewOutput{timeOfLastNewOutput=" + state.getKey() + ", maxTimeSinceNewOutput=" + state.getValue() + '}';
            }
        }

        static class AfterTotalOf<InputT>
        implements TerminationCondition<InputT, KV<Instant, ReadableDuration>> {
            private final SerializableFunction<InputT, ReadableDuration> maxTimeSinceInput;

            private AfterTotalOf(SerializableFunction<InputT, ReadableDuration> maxTimeSinceInput) {
                this.maxTimeSinceInput = maxTimeSinceInput;
            }

            @Override
            public Coder<KV<Instant, ReadableDuration>> getStateCoder() {
                return KvCoder.of(InstantCoder.of(), DurationCoder.of());
            }

            @Override
            public KV<Instant, ReadableDuration> forNewInput(Instant now, InputT input) {
                return KV.of(now, this.maxTimeSinceInput.apply(input));
            }

            @Override
            public KV<Instant, ReadableDuration> onSeenNewOutput(Instant now, KV<Instant, ReadableDuration> state) {
                return state;
            }

            @Override
            public boolean canStopPolling(Instant now, KV<Instant, ReadableDuration> state) {
                return new Duration(state.getKey(), now).isLongerThan(state.getValue());
            }

            @Override
            public String toString(KV<Instant, ReadableDuration> state) {
                return "AfterTotalOf{timeStarted=" + state.getKey() + ", maxTimeSinceInput=" + state.getValue() + '}';
            }
        }

        static class IgnoreInput<InputT, StateT>
        implements TerminationCondition<InputT, StateT> {
            private final TerminationCondition<?, StateT> wrapped;

            IgnoreInput(TerminationCondition<?, StateT> wrapped) {
                this.wrapped = wrapped;
            }

            @Override
            public Coder<StateT> getStateCoder() {
                return this.wrapped.getStateCoder();
            }

            @Override
            public StateT forNewInput(Instant now, InputT input) {
                return this.wrapped.forNewInput(now, null);
            }

            @Override
            public StateT onSeenNewOutput(Instant now, StateT state) {
                return this.wrapped.onSeenNewOutput(now, state);
            }

            @Override
            public boolean canStopPolling(Instant now, StateT state) {
                return this.wrapped.canStopPolling(now, state);
            }

            @Override
            public String toString(StateT state) {
                return this.wrapped.toString(state);
            }
        }

        static class Never<InputT>
        implements TerminationCondition<InputT, Integer> {
            Never() {
            }

            @Override
            public Coder<Integer> getStateCoder() {
                return VarIntCoder.of();
            }

            @Override
            public Integer forNewInput(Instant now, InputT input) {
                return 0;
            }

            @Override
            public Integer onSeenNewOutput(Instant now, Integer state) {
                return state;
            }

            @Override
            public boolean canStopPolling(Instant now, Integer state) {
                return false;
            }

            @Override
            public String toString(Integer state) {
                return "Never";
            }
        }

        public static interface TerminationCondition<InputT, StateT>
        extends Serializable {
            public Coder<StateT> getStateCoder();

            public StateT forNewInput(Instant var1, @Nullable InputT var2);

            public StateT onSeenNewOutput(Instant var1, StateT var2);

            public boolean canStopPolling(Instant var1, StateT var2);

            public String toString(StateT var1);
        }

        public static abstract class PollFn<InputT, OutputT>
        implements Contextful.Fn<InputT, PollResult<OutputT>> {
        }

        public static final class PollResult<OutputT> {
            private final List<TimestampedValue<OutputT>> outputs;
            @Nullable
            private final Instant watermark;

            private PollResult(List<TimestampedValue<OutputT>> outputs, @Nullable Instant watermark) {
                this.outputs = outputs;
                this.watermark = watermark;
            }

            List<TimestampedValue<OutputT>> getOutputs() {
                return this.outputs;
            }

            @Nullable
            Instant getWatermark() {
                return this.watermark;
            }

            public PollResult<OutputT> withWatermark(Instant watermark) {
                Preconditions.checkNotNull(watermark, "watermark");
                return new PollResult<OutputT>(this.outputs, watermark);
            }

            public static <OutputT> PollResult<OutputT> complete(List<TimestampedValue<OutputT>> outputs) {
                return new PollResult<OutputT>(outputs, BoundedWindow.TIMESTAMP_MAX_VALUE);
            }

            public static <OutputT> PollResult<OutputT> complete(Instant timestamp, List<OutputT> outputs) {
                return new PollResult<OutputT>(PollResult.addTimestamp(timestamp, outputs), BoundedWindow.TIMESTAMP_MAX_VALUE);
            }

            public static <OutputT> PollResult<OutputT> incomplete(List<TimestampedValue<OutputT>> outputs) {
                return new PollResult<OutputT>(outputs, null);
            }

            public static <OutputT> PollResult<OutputT> incomplete(Instant timestamp, List<OutputT> outputs) {
                return new PollResult<OutputT>(PollResult.addTimestamp(timestamp, outputs), null);
            }

            private static <OutputT> List<TimestampedValue<OutputT>> addTimestamp(Instant timestamp, List<OutputT> outputs) {
                ArrayList<TimestampedValue<OutputT>> res = Lists.newArrayListWithExpectedSize(outputs.size());
                for (OutputT output : outputs) {
                    res.add(TimestampedValue.of(output, timestamp));
                }
                return res;
            }
        }
    }
}

