/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.DelegateCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.AppliedCombineFn;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;

public class Combine {
    /** Private constructor: this class is not meant to be instantiated. */
    private Combine() {
        // do not instantiate
    }

    /**
     * Builds a {@link Globally} transform from a function that reduces an iterable of values to
     * one value.
     *
     * <p>The function is wrapped in an {@link IterableCombineFn}; the display data item is derived
     * from the class of {@code combiner} itself.
     *
     * @param combiner function collapsing an {@code Iterable<V>} into a single {@code V}
     * @return the global combine transform
     */
    public static <V> Globally<V, V> globally(SerializableFunction<Iterable<V>, V> combiner) {
        IterableCombineFn<V> wrappedFn = IterableCombineFn.of(combiner);
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(combiner);
        return Combine.globally(wrappedFn, fnDisplayData);
    }

    /**
     * Builds a {@link Globally} transform from a binary function that merges two values into one.
     *
     * <p>The function is adapted through {@code BinaryCombineFn.of}; the display data item is
     * derived from the class of {@code combiner} itself.
     *
     * @param combiner function merging two {@code V} values into one
     * @return the global combine transform
     */
    public static <V> Globally<V, V> globally(SerializableBiFunction<V, V, V> combiner) {
        CombineFnBase.GlobalCombineFn<? super V, ?, V> binaryFn = BinaryCombineFn.of(combiner);
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(combiner);
        return Combine.<V, V>globally(binaryFn, fnDisplayData);
    }

    /**
     * Builds a {@link Globally} transform around an existing combine function.
     *
     * @param fn the combine function to apply to the whole input
     * @return the global combine transform, with display data derived from the class of {@code fn}
     */
    public static <InputT, OutputT> Globally<InputT, OutputT> globally(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn) {
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(fn);
        return Combine.globally(fn, fnDisplayData);
    }

    /**
     * Creates the display data item that records the runtime class of a combiner.
     *
     * @param fn the combiner object whose class is recorded; must not be {@code null}
     * @return an item keyed {@code "combineFn"} and labelled {@code "Combiner"}
     */
    private static <T> DisplayData.ItemSpec<? extends Class<?>> displayDataForFn(T fn) {
        Class<?> fnClass = fn.getClass();
        return DisplayData.item("combineFn", fnClass).withLabel("Combiner");
    }

    /**
     * Shared factory behind the public {@code globally} overloads.
     *
     * <p>NOTE(review): the {@code Globally} constructor is not visible in this part of the file;
     * the literal arguments {@code true} and {@code 0} presumably correspond to "insert default"
     * and "fanout" -- confirm against the constructor.
     *
     * @param fn the combine function to apply
     * @param fnDisplayData display data item describing the user-supplied combiner
     * @return a new {@link Globally} with no side inputs
     */
    private static <InputT, OutputT> Globally<InputT, OutputT> globally(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
        return new Globally<>(fn, fnDisplayData, true, 0, ImmutableList.of());
    }

    /**
     * Builds a {@link PerKey} transform from a function that reduces an iterable of values to one
     * value.
     *
     * <p>The function is wrapped in an {@link IterableCombineFn}; the display data item is derived
     * from the class of {@code fn} itself.
     *
     * @param fn function collapsing an {@code Iterable<V>} into a single {@code V}
     * @return the per-key combine transform
     */
    public static <K, V> PerKey<K, V, V> perKey(SerializableFunction<Iterable<V>, V> fn) {
        IterableCombineFn<V> wrappedFn = IterableCombineFn.of(fn);
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(fn);
        return Combine.perKey(wrappedFn, fnDisplayData);
    }

    /**
     * Builds a {@link PerKey} transform from a binary function that merges two values into one.
     *
     * <p>The function is adapted through {@code BinaryCombineFn.of}; the display data item is
     * derived from the class of {@code fn} itself.
     *
     * @param fn function merging two {@code V} values into one
     * @return the per-key combine transform
     */
    public static <K, V> PerKey<K, V, V> perKey(SerializableBiFunction<V, V, V> fn) {
        CombineFnBase.GlobalCombineFn<? super V, ?, V> binaryFn = BinaryCombineFn.of(fn);
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(fn);
        return Combine.<K, V, V>perKey(binaryFn, fnDisplayData);
    }

    /**
     * Builds a {@link PerKey} transform around an existing combine function.
     *
     * @param fn the combine function to apply to the values of each key
     * @return the per-key combine transform, with display data derived from the class of {@code fn}
     */
    public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn) {
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(fn);
        return Combine.perKey(fn, fnDisplayData);
    }

    /**
     * Shared factory behind the public {@code perKey} overloads; creates a {@link PerKey} with
     * its {@code fewKeys} flag set to {@code false}.
     *
     * @param fn the combine function to apply per key
     * @param fnDisplayData display data item describing the user-supplied combiner
     * @return a new {@link PerKey} with no side inputs
     */
    private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
        return new PerKey<>(fn, fnDisplayData, false);
    }

    /**
     * Creates a {@link PerKey} with its {@code fewKeys} flag set to {@code true}, which makes the
     * transform group with {@code GroupByKey.createWithFewKeys()} when expanded.
     *
     * @param fn the combine function to apply per key
     * @param fnDisplayData display data item describing the user-supplied combiner
     * @return a new {@link PerKey} with no side inputs
     */
    private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
        return new PerKey<>(fn, fnDisplayData, true);
    }

    /**
     * Builds a {@link GroupedValues} transform from a function that reduces an iterable of values
     * to one value.
     *
     * <p>The function is wrapped in an {@link IterableCombineFn}; the display data item is derived
     * from the class of {@code fn} itself.
     *
     * @param fn function collapsing an {@code Iterable<V>} into a single {@code V}
     * @return the grouped-values combine transform
     */
    public static <K, V> GroupedValues<K, V, V> groupedValues(SerializableFunction<Iterable<V>, V> fn) {
        IterableCombineFn<V> wrappedFn = IterableCombineFn.of(fn);
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(fn);
        return Combine.groupedValues(wrappedFn, fnDisplayData);
    }

    /**
     * Builds a {@link GroupedValues} transform from a binary function that merges two values into
     * one.
     *
     * <p>The function is adapted through {@code BinaryCombineFn.of}; the display data item is
     * derived from the class of {@code fn} itself.
     *
     * @param fn function merging two {@code V} values into one
     * @return the grouped-values combine transform
     */
    public static <K, V> GroupedValues<K, V, V> groupedValues(SerializableBiFunction<V, V, V> fn) {
        CombineFnBase.GlobalCombineFn<? super V, ?, V> binaryFn = BinaryCombineFn.of(fn);
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(fn);
        return Combine.<K, V, V>groupedValues(binaryFn, fnDisplayData);
    }

    /**
     * Builds a {@link GroupedValues} transform around an existing combine function.
     *
     * @param fn the combine function to apply to each group of values
     * @return the grouped-values transform, with display data derived from the class of {@code fn}
     */
    public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn) {
        DisplayData.ItemSpec<? extends Class<?>> fnDisplayData = Combine.displayDataForFn(fn);
        return Combine.groupedValues(fn, fnDisplayData);
    }

    /**
     * Shared factory behind the public {@code groupedValues} overloads.
     *
     * @param fn the combine function to apply to each group of values
     * @param fnDisplayData display data item describing the user-supplied combiner
     * @return a new {@link GroupedValues} with no side inputs
     */
    private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
        return new GroupedValues<>(fn, fnDisplayData);
    }

    /**
     * Registers a combine function's display data on {@code builder}.
     *
     * @param builder the display data builder to populate
     * @param fn the component included under the {@code "combineFn"} path
     * @param fnDisplayItem the item added after the include
     */
    private static void populateDisplayData(DisplayData.Builder builder, HasDisplayData fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayItem) {
        DisplayData.Builder afterInclude = builder.include("combineFn", fn);
        afterInclude.add(fnDisplayItem);
    }

    /**
     * Registers the display data specific to a global combine on {@code builder}.
     *
     * @param builder the display data builder to populate
     * @param fanout fanout value; added under {@code "fanout"} via {@code addIfNotDefault} with a
     *     default of {@code 0}
     * @param insertDefault value recorded under {@code "emitDefaultOnEmptyInput"}
     */
    private static void populateGlobalDisplayData(DisplayData.Builder builder, int fanout, boolean insertDefault) {
        DisplayData.Builder afterFanout = builder.addIfNotDefault(DisplayData.item("fanout", fanout).withLabel("Key Fanout Size"), 0);
        afterFanout.add(DisplayData.item("emitDefaultOnEmptyInput", insertDefault).withLabel("Emit Default On Empty Input"));
    }

    /**
     * {@code PTransform} that takes key / grouped-values pairs and, for every element, applies
     * the configured combine function to the group of values, emitting one key / combined-value
     * pair. Side inputs may be attached with {@link #withSideInputs}.
     *
     * @param <K> type of the keys, passed through unchanged
     * @param <InputT> type of the values inside each group
     * @param <OutputT> type of the combined value emitted per element
     */
    public static class GroupedValues<K, InputT, OutputT>
    extends PTransform<PCollection<? extends KV<K, ? extends Iterable<InputT>>>, PCollection<KV<K, OutputT>>> {
        /** Combine function applied to each group; a clone of the function given at construction. */
        private final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn;
        /** Display data item describing the user-supplied combiner. */
        private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
        /** Side inputs made available to the combining {@code ParDo}. */
        private final List<PCollectionView<?>> sideInputs;

        /** Creates a transform without side inputs; {@code fn} is cloned. */
        private GroupedValues(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
            this.fn = SerializableUtils.clone(fn);
            this.fnDisplayData = fnDisplayData;
            this.sideInputs = ImmutableList.of();
        }

        /** Creates a transform with the given side inputs; {@code fn} is cloned. */
        private GroupedValues(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, List<PCollectionView<?>> sideInputs) {
            this.fn = SerializableUtils.clone(fn);
            this.fnDisplayData = fnDisplayData;
            this.sideInputs = sideInputs;
        }

        /**
         * Returns a copy of this transform that uses the given side inputs.
         *
         * @param sideInputs the side input views
         * @return a new {@code GroupedValues} with the same function and display data
         */
        public GroupedValues<K, InputT, OutputT> withSideInputs(PCollectionView<?> ... sideInputs) {
            return this.withSideInputs(Arrays.asList(sideInputs));
        }

        /**
         * Returns a copy of this transform that uses the given side inputs; the iterable is
         * copied into an immutable list.
         *
         * @param sideInputs the side input views
         * @return a new {@code GroupedValues} with the same function and display data
         */
        public GroupedValues<K, InputT, OutputT> withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
            return new GroupedValues<K, InputT, OutputT>(this.fn, this.fnDisplayData, ImmutableList.copyOf(sideInputs));
        }

        /** Returns the combine function held by this transform. */
        public CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
            return this.fn;
        }

        /** Returns the side inputs configured on this transform. */
        public List<PCollectionView<?>> getSideInputs() {
            return this.sideInputs;
        }

        /**
         * Applies a {@code ParDo} that combines the grouped values of each element and, when
         * possible, sets a {@code KvCoder} on the result.
         *
         * @param input key / grouped-values pairs
         * @return key / combined-value pairs
         * @throws IllegalStateException at processing time if the function is neither a
         *     {@code CombineFnWithContext} nor a {@code CombineFn}, or at expansion time if the
         *     input coder is not a {@code KvCoder} over an {@code IterableCoder}
         */
        @Override
        public PCollection<KV<K, OutputT>> expand(PCollection<? extends KV<K, ? extends Iterable<InputT>>> input) {
            PCollection output = (PCollection)input.apply(ParDo.of(new DoFn<KV<K, ? extends Iterable<InputT>>, KV<K, OutputT>>(){

                @DoFn.ProcessElement
                public void processElement(final DoFn.ProcessContext c) {
                    Object output;
                    Object key = ((KV)c.element()).getKey();
                    if (fn instanceof CombineWithContext.CombineFnWithContext) {
                        // Context-aware functions get pipeline options and side inputs routed
                        // through the ParDo's process context.
                        output = ((CombineWithContext.CombineFnWithContext)fn).apply((Iterable)((KV)c.element()).getValue(), new CombineWithContext.Context(){

                            @Override
                            public PipelineOptions getPipelineOptions() {
                                return c.getPipelineOptions();
                            }

                            @Override
                            public <T> T sideInput(PCollectionView<T> view) {
                                return c.sideInput(view);
                            }
                        });
                    } else if (fn instanceof CombineFn) {
                        output = ((CombineFn)fn).apply((Iterable)((KV)c.element()).getValue());
                    } else {
                        throw new IllegalStateException(String.format("Unknown type of CombineFn: %s", fn.getClass()));
                    }
                    c.output(KV.of(key, output));
                }

                @Override
                public void populateDisplayData(DisplayData.Builder builder) {
                    // Delegate to the enclosing transform. Passing the anonymous DoFn's own
                    // 'this' would delegate the DoFn to itself and never surface the
                    // transform's display data.
                    builder.delegate(GroupedValues.this);
                }
            }).withSideInputs(this.sideInputs));
            try {
                KvCoder<K, InputT> kvCoder = this.getKvCoder(input.getCoder());
                Coder<OutputT> outputValueCoder = this.fn.getDefaultOutputCoder(input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder());
                output.setCoder(KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder));
            }
            catch (CannotProvideCoderException ignored) {
                // Deliberately best-effort: when no default output coder can be provided
                // here, the output is returned without an explicitly set coder.
            }
            return output;
        }

        /**
         * Builds an {@link AppliedCombineFn} for this transform's function.
         *
         * @param registry coder registry passed to {@code AppliedCombineFn.withInputCoder}
         * @param inputCoder coder of the grouped input; must be a {@code KvCoder} whose value
         *     coder is an {@code IterableCoder}
         * @param windowingStrategy windowing strategy passed through unchanged
         * @return the applied combine function
         * @throws IllegalStateException if {@code inputCoder} does not have the required shape
         */
        public AppliedCombineFn<? super K, ? super InputT, ?, OutputT> getAppliedFn(CoderRegistry registry, Coder<? extends KV<K, ? extends Iterable<InputT>>> inputCoder, WindowingStrategy<?, ?> windowingStrategy) {
            KvCoder<K, InputT> kvCoder = this.getKvCoder(inputCoder);
            return AppliedCombineFn.withInputCoder(this.fn, registry, kvCoder, this.sideInputs, windowingStrategy);
        }

        /**
         * Derives the coder for key / single-value pairs from the coder of key / grouped-values
         * pairs by unwrapping the {@code IterableCoder} around the values.
         *
         * @throws IllegalStateException if {@code inputCoder} is not a {@code KvCoder}, or its
         *     value coder is not an {@code IterableCoder}
         */
        private KvCoder<K, InputT> getKvCoder(Coder<? extends KV<K, ? extends Iterable<InputT>>> inputCoder) {
            if (!(inputCoder instanceof KvCoder)) {
                throw new IllegalStateException("Combine.GroupedValues requires its input to use KvCoder");
            }
            KvCoder kvCoder = (KvCoder)inputCoder;
            Coder keyCoder = kvCoder.getKeyCoder();
            Coder kvValueCoder = kvCoder.getValueCoder();
            if (!(kvValueCoder instanceof IterableCoder)) {
                throw new IllegalStateException("Combine.GroupedValues requires its input values to use IterableCoder");
            }
            IterableCoder inputValuesCoder = (IterableCoder)kvValueCoder;
            Coder inputValueCoder = inputValuesCoder.getElemCoder();
            return KvCoder.of(keyCoder, inputValueCoder);
        }

        /** Adds the combine function's display data on top of the superclass's. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            Combine.populateDisplayData(builder, this.fn, this.fnDisplayData);
        }
    }

    /**
     * Per-key combine that spreads "hot" keys over several sub-keys before the final combine.
     *
     * <p>For each element the {@code hotKeyFanout} function is asked for a spread. Keys with a
     * spread of at most 1 go straight to the final combine ("cold" path). Other keys are first
     * combined per {@code (key, nonce % spread)} sub-key into accumulators ("hot" path); those
     * accumulators are then merged with the cold inputs in the final combine.
     *
     * @param <K> key type
     * @param <InputT> input value type
     * @param <OutputT> combined output type
     */
    public static class PerKeyWithHotKeyFanout<K, InputT, OutputT>
    extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
        /** The user's combine function. */
        private final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn;
        /** Display data item describing the user-supplied combiner. */
        private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
        /** Maps a key to the number of sub-keys it should be spread over. */
        private final SerializableFunction<? super K, Integer> hotKeyFanout;

        private PerKeyWithHotKeyFanout(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, SerializableFunction<? super K, Integer> hotKeyFanout) {
            this.fn = fn;
            this.fnDisplayData = fnDisplayData;
            this.hotKeyFanout = hotKeyFanout;
        }

        /** Returns a kind string of the form {@code Combine.perKeyWithFanout(<fn name>)}. */
        @Override
        protected String getKindString() {
            return String.format("Combine.perKeyWithFanout(%s)", NameUtils.approximateSimpleName(this.fn));
        }

        /** Expands this transform; all work is done in {@link #applyHelper}. */
        @Override
        public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
            return this.applyHelper(input);
        }

        /**
         * Builds the two-stage combine pipeline.
         *
         * @param input the keyed input; its coder must be a {@code KvCoder}
         * @return the combined output per key
         * @throws IllegalStateException if the input coder is not a {@code KvCoder}, the
         *     accumulator coder cannot be determined, or the combine function is of an unknown
         *     kind
         */
        private <AccumT> PCollection<KV<K, OutputT>> applyHelper(PCollection<KV<K, InputT>> input) {
            CombineFnBase.AbstractGlobalCombineFn postCombine;
            CombineFnBase.AbstractGlobalCombineFn hotPreCombine;
            Coder<?> accumCoder;
            CombineFnBase.GlobalCombineFn typedFn = this.fn;
            if (!(input.getCoder() instanceof KvCoder)) {
                throw new IllegalStateException("Expected input coder to be KvCoder, but was " + input.getCoder());
            }
            final KvCoder inputCoder = (KvCoder)input.getCoder();
            try {
                accumCoder = typedFn.getAccumulatorCoder(input.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
            }
            catch (CannotProvideCoderException e) {
                throw new IllegalStateException("Unable to determine accumulator coder.", e);
            }
            InputOrAccum.InputOrAccumCoder inputOrAccumCoder = new InputOrAccum.InputOrAccumCoder(inputCoder.getValueCoder(), accumCoder);
            if (typedFn instanceof CombineFn) {
                final CombineFn fn = (CombineFn)typedFn;
                // Pre-combine for hot sub-keys: behaves like fn, except that the accumulator
                // itself is emitted instead of the extracted output.
                hotPreCombine = new CombineFn<InputT, AccumT, AccumT>(){

                    @Override
                    public AccumT createAccumulator() {
                        return fn.createAccumulator();
                    }

                    @Override
                    public AccumT addInput(AccumT accumulator, InputT value) {
                        return fn.addInput(accumulator, value);
                    }

                    @Override
                    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
                        return fn.mergeAccumulators(accumulators);
                    }

                    @Override
                    public AccumT compact(AccumT accumulator) {
                        return fn.compact(accumulator);
                    }

                    @Override
                    public AccumT extractOutput(AccumT accumulator) {
                        return accumulator;
                    }

                    @Override
                    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
                        return accumCoder;
                    }

                    @Override
                    public void populateDisplayData(DisplayData.Builder builder) {
                        // Delegate to the enclosing transform, not to this anonymous fn itself.
                        builder.delegate(PerKeyWithHotKeyFanout.this);
                    }
                };
                // Final combine: consumes either raw inputs (cold path) or pre-combined
                // accumulators (hot path).
                postCombine = new CombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT>(){

                    @Override
                    public AccumT createAccumulator() {
                        return fn.createAccumulator();
                    }

                    @Override
                    public AccumT addInput(AccumT accumulator, InputOrAccum<InputT, AccumT> value) {
                        // A null accum marks a raw input; otherwise merge the accumulator in.
                        if (value.accum == null) {
                            return fn.addInput(accumulator, value.input);
                        }
                        return fn.mergeAccumulators(ImmutableList.of(accumulator, value.accum));
                    }

                    @Override
                    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
                        return fn.mergeAccumulators(accumulators);
                    }

                    @Override
                    public AccumT compact(AccumT accumulator) {
                        return fn.compact(accumulator);
                    }

                    @Override
                    public OutputT extractOutput(AccumT accumulator) {
                        return fn.extractOutput(accumulator);
                    }

                    @Override
                    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder) throws CannotProvideCoderException {
                        // Ask the wrapped fn using the original input value coder.
                        return fn.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
                    }

                    @Override
                    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder2) throws CannotProvideCoderException {
                        return accumCoder;
                    }

                    @Override
                    public void populateDisplayData(DisplayData.Builder builder) {
                        // Delegate to the enclosing transform, not to this anonymous fn itself.
                        builder.delegate(PerKeyWithHotKeyFanout.this);
                    }
                };
            } else if (typedFn instanceof CombineWithContext.CombineFnWithContext) {
                final CombineWithContext.CombineFnWithContext fnWithContext = (CombineWithContext.CombineFnWithContext)typedFn;
                // Context-aware counterpart of the pre-combine above.
                hotPreCombine = new CombineWithContext.CombineFnWithContext<InputT, AccumT, AccumT>(){

                    @Override
                    public AccumT createAccumulator(CombineWithContext.Context c) {
                        return fnWithContext.createAccumulator(c);
                    }

                    @Override
                    public AccumT addInput(AccumT accumulator, InputT value, CombineWithContext.Context c) {
                        return fnWithContext.addInput(accumulator, value, c);
                    }

                    @Override
                    public AccumT mergeAccumulators(Iterable<AccumT> accumulators, CombineWithContext.Context c) {
                        return fnWithContext.mergeAccumulators(accumulators, c);
                    }

                    @Override
                    public AccumT compact(AccumT accumulator, CombineWithContext.Context c) {
                        return fnWithContext.compact(accumulator, c);
                    }

                    @Override
                    public AccumT extractOutput(AccumT accumulator, CombineWithContext.Context c) {
                        return accumulator;
                    }

                    @Override
                    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException {
                        return accumCoder;
                    }

                    @Override
                    public void populateDisplayData(DisplayData.Builder builder) {
                        // Delegate to the enclosing transform, not to this anonymous fn itself.
                        builder.delegate(PerKeyWithHotKeyFanout.this);
                    }
                };
                // Context-aware counterpart of the final combine above.
                postCombine = new CombineWithContext.CombineFnWithContext<InputOrAccum<InputT, AccumT>, AccumT, OutputT>(){

                    @Override
                    public AccumT createAccumulator(CombineWithContext.Context c) {
                        return fnWithContext.createAccumulator(c);
                    }

                    @Override
                    public AccumT addInput(AccumT accumulator, InputOrAccum<InputT, AccumT> value, CombineWithContext.Context c) {
                        // A null accum marks a raw input; otherwise merge the accumulator in.
                        if (value.accum == null) {
                            return fnWithContext.addInput(accumulator, value.input, c);
                        }
                        return fnWithContext.mergeAccumulators(ImmutableList.of(accumulator, value.accum), c);
                    }

                    @Override
                    public AccumT mergeAccumulators(Iterable<AccumT> accumulators, CombineWithContext.Context c) {
                        return fnWithContext.mergeAccumulators(accumulators, c);
                    }

                    @Override
                    public AccumT compact(AccumT accumulator, CombineWithContext.Context c) {
                        return fnWithContext.compact(accumulator, c);
                    }

                    @Override
                    public OutputT extractOutput(AccumT accumulator, CombineWithContext.Context c) {
                        return fnWithContext.extractOutput(accumulator, c);
                    }

                    @Override
                    public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder) throws CannotProvideCoderException {
                        // Ask the wrapped fn using the original input value coder.
                        return fnWithContext.getDefaultOutputCoder(registry, inputCoder.getValueCoder());
                    }

                    @Override
                    public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder2) throws CannotProvideCoderException {
                        return accumCoder;
                    }

                    @Override
                    public void populateDisplayData(DisplayData.Builder builder) {
                        // Delegate to the enclosing transform, not to this anonymous fn itself.
                        builder.delegate(PerKeyWithHotKeyFanout.this);
                    }
                };
            } else {
                throw new IllegalStateException(String.format("Unknown type of CombineFn: %s", typedFn.getClass()));
            }
            final TupleTag hot = new TupleTag();
            final TupleTag cold = new TupleTag();
            // Split the input: cold elements pass through unchanged, hot elements are re-keyed
            // by (key, nonce % spread).
            PCollectionTuple split = (PCollectionTuple)input.apply("AddNonce", ParDo.of(new DoFn<KV<K, InputT>, KV<K, InputT>>(){
                transient int nonce;

                @DoFn.StartBundle
                public void startBundle() {
                    // One non-negative random nonce per bundle, so every element of a bundle
                    // with the same key and spread lands on the same sub-key.
                    this.nonce = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE);
                }

                @DoFn.ProcessElement
                public void processElement(@DoFn.Element KV<K, InputT> kv, DoFn.MultiOutputReceiver receiver) {
                    int spread = (Integer)hotKeyFanout.apply(kv.getKey());
                    if (spread <= 1) {
                        receiver.get(cold).output(kv);
                    } else {
                        receiver.get(hot).output(KV.of(KV.of(kv.getKey(), this.nonce % spread), kv.getValue()));
                    }
                }
            }).withOutputTags(cold, TupleTagList.of(hot)));
            // The pre-combine runs with discarding panes even when the input accumulates.
            WindowingStrategy<?, ?> preCombineStrategy = input.getWindowingStrategy();
            if (preCombineStrategy.getMode() == WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) {
                preCombineStrategy = preCombineStrategy.withMode(WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES);
            }
            // Hot path: pre-combine per sub-key, strip the nonce, wrap accumulators, remerge
            // windows, and restore the input's windowing strategy.
            PCollection precombinedHot = ((PCollection)((PCollection)((PCollection)split.get(hot).setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()), inputCoder.getValueCoder())).setWindowingStrategyInternal(preCombineStrategy).apply("PreCombineHot", Combine.perKey(hotPreCombine, this.fnDisplayData))).apply("StripNonce", MapElements.via(new SimpleFunction<KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>(){

                @Override
                public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K, Integer>, AccumT> elem) {
                    return KV.of(elem.getKey().getKey(), InputOrAccum.accum(elem.getValue()));
                }
            }))).setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)).apply(Window.remerge())).setWindowingStrategyInternal(input.getWindowingStrategy());
            // Cold path: wrap each raw value so both paths share one element type.
            PCollection preprocessedCold = ((PCollection)split.get(cold).setCoder(inputCoder).apply("PrepareCold", MapElements.via(new SimpleFunction<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>(){

                @Override
                public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<K, InputT> element) {
                    return KV.of(element.getKey(), InputOrAccum.input(element.getValue()));
                }
            }))).setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder));
            // Merge both paths and run the final per-key combine.
            return (PCollection)((PCollection)PCollectionList.of(precombinedHot).and(preprocessedCold).apply(Flatten.pCollections())).apply("PostCombine", Combine.perKey(postCombine, this.fnDisplayData));
        }

        /**
         * Adds the combine function's display data, the fanout function's own display data when
         * it provides any, and the fanout function's class.
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            Combine.populateDisplayData(builder, this.fn, this.fnDisplayData);
            if (this.hotKeyFanout instanceof HasDisplayData) {
                builder.include("hotKeyFanout", (HasDisplayData)((Object)this.hotKeyFanout));
            }
            builder.add(DisplayData.item("fanoutFn", this.hotKeyFanout.getClass()).withLabel("Fanout Function"));
        }

        /**
         * Holder for either a raw input value or a pre-combined accumulator. The factories set
         * exactly one of the two fields and leave the other {@code null}.
         */
        private static class InputOrAccum<InputT, AccumT> {
            @Nullable
            public final InputT input;
            @Nullable
            public final AccumT accum;

            private InputOrAccum(@Nullable InputT input, @Nullable AccumT aggr) {
                this.input = input;
                this.accum = aggr;
            }

            /** Wraps a raw input value; {@code accum} is left {@code null}. */
            public static <InputT, AccumT> InputOrAccum<InputT, AccumT> input(InputT input) {
                // Diamond keeps the result typed as InputOrAccum<InputT, AccumT>.
                return new InputOrAccum<>(input, null);
            }

            /** Wraps an accumulator; {@code input} is left {@code null}. */
            public static <InputT, AccumT> InputOrAccum<InputT, AccumT> accum(AccumT aggr) {
                // Diamond keeps the result typed as InputOrAccum<InputT, AccumT>.
                return new InputOrAccum<>(null, aggr);
            }

            /**
             * Coder for {@link InputOrAccum}: a one-byte tag ({@code 0} = input, {@code 1} =
             * accumulator) followed by the payload encoded with the matching component coder.
             */
            private static class InputOrAccumCoder<InputT, AccumT>
            extends StructuredCoder<InputOrAccum<InputT, AccumT>> {
                private final Coder<InputT> inputCoder;
                private final Coder<AccumT> accumCoder;

                public InputOrAccumCoder(Coder<InputT> inputCoder, Coder<AccumT> accumCoder) {
                    this.inputCoder = inputCoder;
                    this.accumCoder = accumCoder;
                }

                /** Encodes {@code value} in the {@code NESTED} context. */
                @Override
                public void encode(InputOrAccum<InputT, AccumT> value, OutputStream outStream) throws CoderException, IOException {
                    this.encode(value, outStream, Coder.Context.NESTED);
                }

                /**
                 * Writes tag {@code 0} and the input when {@code value.input} is non-null;
                 * otherwise writes tag {@code 1} and the accumulator.
                 */
                @Override
                public void encode(InputOrAccum<InputT, AccumT> value, OutputStream outStream, Coder.Context context) throws CoderException, IOException {
                    if (value.input != null) {
                        outStream.write(0);
                        this.inputCoder.encode(value.input, outStream, context);
                    } else {
                        outStream.write(1);
                        this.accumCoder.encode(value.accum, outStream, context);
                    }
                }

                /** Decodes a value in the {@code NESTED} context. */
                @Override
                public InputOrAccum<InputT, AccumT> decode(InputStream inStream) throws CoderException, IOException {
                    return this.decode(inStream, Coder.Context.NESTED);
                }

                /**
                 * Reads the tag byte: {@code 0} selects the input coder; any other value is
                 * decoded with the accumulator coder.
                 */
                @Override
                public InputOrAccum<InputT, AccumT> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException {
                    if (inStream.read() == 0) {
                        return InputOrAccum.input(this.inputCoder.decode(inStream, context));
                    }
                    return InputOrAccum.accum(this.accumCoder.decode(inStream, context));
                }

                /** Returns the input coder followed by the accumulator coder. */
                @Override
                public List<? extends Coder<?>> getCoderArguments() {
                    return ImmutableList.of(this.inputCoder, this.accumCoder);
                }

                /** Deterministic only if both component coders are. */
                @Override
                public void verifyDeterministic() throws Coder.NonDeterministicException {
                    this.inputCoder.verifyDeterministic();
                    this.accumCoder.verifyDeterministic();
                }
            }
        }
    }

    public static class PerKey<K, InputT, OutputT>
    extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
        private final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn;
        private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
        private final boolean fewKeys;
        private final List<PCollectionView<?>> sideInputs;

        private PerKey(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) {
            this.fn = fn;
            this.fnDisplayData = fnDisplayData;
            this.fewKeys = fewKeys;
            this.sideInputs = ImmutableList.of();
        }

        private PerKey(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys, List<PCollectionView<?>> sideInputs) {
            this.fn = fn;
            this.fnDisplayData = fnDisplayData;
            this.fewKeys = fewKeys;
            this.sideInputs = sideInputs;
        }

        @Override
        protected String getKindString() {
            return String.format("Combine.perKey(%s)", NameUtils.approximateSimpleName(this.fn));
        }

        public PerKey<K, InputT, OutputT> withSideInputs(PCollectionView<?> ... sideInputs) {
            return this.withSideInputs(Arrays.asList(sideInputs));
        }

        public PerKey<K, InputT, OutputT> withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
            Preconditions.checkState(this.fn instanceof CombineWithContext.RequiresContextInternal);
            return new PerKey<K, InputT, OutputT>(this.fn, this.fnDisplayData, this.fewKeys, ImmutableList.copyOf(sideInputs));
        }

        public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(SerializableFunction<? super K, Integer> hotKeyFanout) {
            return new PerKeyWithHotKeyFanout(this.fn, this.fnDisplayData, hotKeyFanout);
        }

        public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) {
            return new PerKeyWithHotKeyFanout(this.fn, this.fnDisplayData, new SimpleFunction<K, Integer>(){

                @Override
                public void populateDisplayData(DisplayData.Builder builder) {
                    super.populateDisplayData(builder);
                    builder.add(DisplayData.item("fanout", hotKeyFanout).withLabel("Key Fanout Size"));
                }

                @Override
                public Integer apply(K unused) {
                    return hotKeyFanout;
                }
            });
        }

        public CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
            return this.fn;
        }

        public List<PCollectionView<?>> getSideInputs() {
            return this.sideInputs;
        }

        @Override
        public Map<TupleTag<?>, PValue> getAdditionalInputs() {
            return PCollectionViews.toAdditionalInputs(this.sideInputs);
        }

        /**
         * Expands this transform into a {@code GroupByKey} followed by a grouped-values combine.
         *
         * <p>When {@code fewKeys} is set, the grouping step is created with
         * {@code GroupByKey.createWithFewKeys()}; otherwise {@code GroupByKey.create()} is used.
         * The grouped values are then combined with this transform's function and side inputs.
         *
         * @param input the keyed input collection
         * @return the collection of per-key combined outputs
         */
        @Override
        public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
            // Step 1: group the values of each key.
            PCollection grouped = (PCollection)input.apply(this.fewKeys ? GroupByKey.createWithFewKeys() : GroupByKey.create());
            // Step 2: combine each key's grouped values.
            return (PCollection)grouped.apply(Combine.groupedValues(this.fn, this.fnDisplayData).withSideInputs(this.sideInputs));
        }

        /**
         * Adds the superclass display data, then the display data for the combine function.
         *
         * @param builder the builder to register display data with
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            // The qualifier is required: an unqualified call would resolve to this instance method.
            Combine.populateDisplayData(builder, fn, fnDisplayData);
        }
    }

    /**
     * Deprecated variant of {@link IterableCombineFn} that always uses a buffer size of 20.
     *
     * @param <V> the type of the values being combined
     */
    @Deprecated
    public static class SimpleCombineFn<V>
    extends IterableCombineFn<V> {
        /**
         * Creates the combine function around {@code combiner} with a buffer size of 20.
         *
         * @param combiner the function that reduces an iterable of values to one value
         */
        protected SimpleCombineFn(SerializableFunction<Iterable<V>, V> combiner) {
            super(combiner, 20);
        }

        /**
         * Static factory equivalent to calling the constructor.
         *
         * @param combiner the function that reduces an iterable of values to one value
         * @return a new {@code SimpleCombineFn} wrapping {@code combiner}
         */
        public static <V> SimpleCombineFn<V> of(SerializableFunction<Iterable<V>, V> combiner) {
            return new SimpleCombineFn<>(combiner);
        }
    }

    /**
     * A {@code CombineFn} built from a function that reduces an {@code Iterable} of values to a
     * single value of the same type.
     *
     * <p>Inputs are buffered in a list accumulator. Once the buffer holds more than
     * {@code bufferSize} elements it is collapsed, via the combiner, into a one-element list.
     *
     * @param <V> the type of the values being combined
     */
    public static class IterableCombineFn<V>
    extends CombineFn<V, List<V>, V>
    implements NameUtils.NameOverride {
        private static final int DEFAULT_BUFFER_SIZE = 20;
        private final SerializableFunction<Iterable<V>, V> combiner;
        private final int bufferSize;

        private IterableCombineFn(SerializableFunction<Iterable<V>, V> combiner, int bufferSize) {
            this.combiner = combiner;
            this.bufferSize = bufferSize;
        }

        /**
         * Creates a combine function around {@code combiner} with the default buffer size (20).
         *
         * @param combiner the reducing function
         * @return the new combine function
         */
        public static <V> IterableCombineFn<V> of(SerializableFunction<Iterable<V>, V> combiner) {
            return of(combiner, DEFAULT_BUFFER_SIZE);
        }

        /**
         * Creates a combine function around {@code combiner} with an explicit buffer size.
         *
         * @param combiner the reducing function
         * @param bufferSize number of buffered inputs above which the accumulator is collapsed
         * @return the new combine function
         */
        public static <V> IterableCombineFn<V> of(SerializableFunction<Iterable<V>, V> combiner, int bufferSize) {
            return new IterableCombineFn<>(combiner, bufferSize);
        }

        /** Returns a new, empty list accumulator. */
        @Override
        public List<V> createAccumulator() {
            return new ArrayList<>();
        }

        /**
         * Appends {@code input} to the accumulator and collapses it to a singleton list when it
         * has grown beyond {@code bufferSize}.
         */
        @Override
        public List<V> addInput(List<V> accumulator, V input) {
            accumulator.add(input);
            boolean overflowing = accumulator.size() > bufferSize;
            return overflowing ? mergeToSingleton(accumulator) : accumulator;
        }

        /** Concatenates all accumulators and reduces them to a one-element list. */
        @Override
        public List<V> mergeAccumulators(Iterable<List<V>> accumulators) {
            Iterable<V> allValues = Iterables.concat(accumulators);
            return mergeToSingleton(allValues);
        }

        /** Applies the combiner to the buffered values to produce the final output. */
        @Override
        public V extractOutput(List<V> accumulator) {
            return combiner.apply(accumulator);
        }

        /** Collapses accumulators holding more than one value; others are returned unchanged. */
        @Override
        public List<V> compact(List<V> accumulator) {
            if (accumulator.size() > 1) {
                return mergeToSingleton(accumulator);
            }
            return accumulator;
        }

        /** Registers the combiner's class as the {@code "combineFn"} display-data item. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("combineFn", combiner.getClass()).withLabel("Combiner"));
        }

        /** Reduces {@code values} with the combiner and wraps the result in a fresh list. */
        private List<V> mergeToSingleton(Iterable<V> values) {
            V reduced = combiner.apply(values);
            List<V> singleton = new ArrayList<>();
            singleton.add(reduced);
            return singleton;
        }

        /** Derives the transform name from the wrapped combiner. */
        @Override
        public String getNameOverride() {
            return NameUtils.approximateSimpleName(combiner);
        }
    }

    /**
     * A {@code PTransform} that globally combines a {@code PCollection<InputT>} and exposes the
     * result as a singleton {@code PCollectionView<OutputT>}.
     *
     * <p>Instances are created through the private constructor; within the visible code that
     * happens in {@code Globally.asSingletonView()}.
     */
    public static class GloballyAsSingletonView<InputT, OutputT>
    extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
        // The combine function applied to the whole input.
        private final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn;
        // Display-data item describing the combine function.
        private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
        // Whether the view carries the combine function's default value.
        private final boolean insertDefault;
        // Fanout forwarded to the global combine.
        private final int fanout;

        private GloballyAsSingletonView(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout) {
            this.fn = fn;
            this.fnDisplayData = fnDisplayData;
            this.insertDefault = insertDefault;
            this.fanout = fanout;
        }

        /**
         * Combines the input without defaults and with the configured fanout, then materializes
         * the combined collection as a singleton view.
         *
         * @param input the collection to combine
         * @return the singleton view over the combined value
         */
        @Override
        public PCollectionView<OutputT> expand(PCollection<InputT> input) {
            // Defaults are disabled on the combine itself; the default value (if any) is handed
            // to the singleton view below instead.
            PCollection combined = (PCollection)input.apply(Combine.globally(this.fn).withoutDefaults().withFanout(this.fanout));
            PCollection materializationInput = (PCollection)combined.apply(new View.VoidKeyToMultimapMaterialization());
            // The default passed to the view is fn.defaultValue() only when insertDefault is set,
            // otherwise null.
            PCollectionView<Object> view = PCollectionViews.singletonView(materializationInput, input.getWindowingStrategy(), this.insertDefault, this.insertDefault ? (Object)this.fn.defaultValue() : null, combined.getCoder());
            materializationInput.apply(View.CreatePCollectionView.of(view));
            return view;
        }

        /** Returns the configured fanout. */
        public int getFanout() {
            return this.fanout;
        }

        /** Returns whether a default value is supplied to the view. */
        public boolean getInsertDefault() {
            return this.insertDefault;
        }

        /** Returns the combine function held by this transform. */
        public CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> getCombineFn() {
            return this.fn;
        }

        /**
         * Adds the superclass display data, then the combine-function display data and the
         * global-combine display data (fanout and insertDefault).
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            Combine.populateDisplayData(builder, this.fn, this.fnDisplayData);
            Combine.populateGlobalDisplayData(builder, this.fanout, this.insertDefault);
        }
    }

    /**
     * A {@code PTransform} that combines all elements of a {@code PCollection<InputT>} into a
     * {@code PCollection<OutputT>} using a global combine function.
     *
     * <p>Instances are immutable: every {@code with...} method returns a new {@code Globally}
     * that differs from this one in a single setting.
     */
    public static class Globally<InputT, OutputT>
    extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
        // The combine function applied to the whole input.
        private final CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn;
        // Display-data item describing the combine function.
        private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
        // Whether the combine function's default value is emitted for an empty result.
        private final boolean insertDefault;
        // Hot-key fanout; only values of 2 or more enable fanout in expand().
        private final int fanout;
        // Side inputs made available to the combine function.
        private final List<PCollectionView<?>> sideInputs;

        private Globally(CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean insertDefault, int fanout, List<PCollectionView<?>> sideInputs) {
            this.fn = fn;
            this.fnDisplayData = fnDisplayData;
            this.insertDefault = insertDefault;
            this.fanout = fanout;
            this.sideInputs = sideInputs;
        }

        /** Returns {@code "Combine.globally(<name>)"}, where the name is derived from the combine function. */
        @Override
        protected String getKindString() {
            return String.format("Combine.globally(%s)", NameUtils.approximateSimpleName(this.fn));
        }

        /**
         * Returns a {@code GloballyAsSingletonView} configured with this transform's combine
         * function, display data, insertDefault flag and fanout.
         *
         * <p>NOTE(review): the side inputs of this transform are not passed along here.
         */
        public GloballyAsSingletonView<InputT, OutputT> asSingletonView() {
            return new GloballyAsSingletonView(this.fn, this.fnDisplayData, this.insertDefault, this.fanout);
        }

        /** Returns a copy of this transform with {@code insertDefault} set to {@code false}. */
        public Globally<InputT, OutputT> withoutDefaults() {
            return new Globally<InputT, OutputT>(this.fn, this.fnDisplayData, false, this.fanout, this.sideInputs);
        }

        /**
         * Returns a copy of this transform that uses the given fanout.
         *
         * @param fanout the new fanout value
         */
        public Globally<InputT, OutputT> withFanout(int fanout) {
            return new Globally<InputT, OutputT>(this.fn, this.fnDisplayData, this.insertDefault, fanout, this.sideInputs);
        }

        /** Varargs convenience overload of {@link #withSideInputs(Iterable)}. */
        public Globally<InputT, OutputT> withSideInputs(PCollectionView<?> ... sideInputs) {
            return this.withSideInputs(Arrays.asList(sideInputs));
        }

        /**
         * Returns a copy of this transform that uses an immutable copy of the given side inputs.
         *
         * @param sideInputs the views to make available to the combine function
         * @throws IllegalStateException if the combine function is not a
         *     {@code CombineWithContext.RequiresContextInternal}
         */
        public Globally<InputT, OutputT> withSideInputs(Iterable<? extends PCollectionView<?>> sideInputs) {
            Preconditions.checkState(this.fn instanceof CombineWithContext.RequiresContextInternal);
            return new Globally<InputT, OutputT>(this.fn, this.fnDisplayData, this.insertDefault, this.fanout, ImmutableList.copyOf(sideInputs));
        }

        /** Returns the combine function held by this transform. */
        public CombineFnBase.GlobalCombineFn<? super InputT, ?, OutputT> getFn() {
            return this.fn;
        }

        /** Returns the side-input list held by this transform (not copied). */
        public List<PCollectionView<?>> getSideInputs() {
            return this.sideInputs;
        }

        /** Exposes the side inputs as additional inputs via {@code PCollectionViews.toAdditionalInputs}. */
        @Override
        public Map<TupleTag<?>, PValue> getAdditionalInputs() {
            return PCollectionViews.toAdditionalInputs(this.sideInputs);
        }

        /** Returns whether a default value is emitted when the combined result is empty. */
        public boolean isInsertDefault() {
            return this.insertDefault;
        }

        /**
         * Expands the global combine into a per-key combine over a single {@code Void} key.
         *
         * @param input the collection to combine
         * @return the combined collection, with the default value added for empty results when
         *     {@code insertDefault} is set
         * @throws IllegalStateException if {@code insertDefault} is set and the output's window
         *     function is not compatible with {@code GlobalWindows}
         */
        @Override
        public PCollection<OutputT> expand(PCollection<InputT> input) {
            // Key every element with a null Void key so the per-key combine sees a single key.
            PCollection withKeys = ((PCollection)input.apply(WithKeys.of((Void)null))).setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
            PerKey combine = Combine.fewKeys(this.fn, this.fnDisplayData);
            if (!this.sideInputs.isEmpty()) {
                combine = combine.withSideInputs(this.sideInputs);
            }
            // A fanout below 2 means no hot-key fanout is applied.
            // NOTE(review): confirm that PerKey.withHotKeyFanout carries over the side inputs
            // attached above.
            PCollection combined = this.fanout >= 2 ? (PCollection)withKeys.apply(combine.withHotKeyFanout(this.fanout)) : (PCollection)withKeys.apply(combine);
            // Drop the Void key again.
            PCollection output = (PCollection)combined.apply(Values.create());
            if (this.insertDefault) {
                if (!output.getWindowingStrategy().getWindowFn().isCompatible(new GlobalWindows())) {
                    throw new IllegalStateException(this.fn.getIncompatibleGlobalWindowErrorMessage());
                }
                return this.insertDefaultValueIfEmpty(output);
            }
            return output;
        }

        /**
         * Adds the superclass display data, then the combine-function display data and the
         * global-combine display data (fanout and insertDefault).
         */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            Combine.populateDisplayData(builder, this.fn, this.fnDisplayData);
            Combine.populateGlobalDisplayData(builder, this.fanout, this.insertDefault);
        }

        /**
         * Returns {@code maybeEmpty} flattened with a collection that contributes the combine
         * function's default value when {@code maybeEmpty} has no elements.
         *
         * @param maybeEmpty the combined output, possibly empty
         * @return the flattened collection
         */
        private PCollection<OutputT> insertDefaultValueIfEmpty(PCollection<OutputT> maybeEmpty) {
            // The combined output is read back as an iterable side input to test for emptiness.
            final PCollectionView maybeEmptyView = (PCollectionView)maybeEmpty.apply(View.asIterable());
            final OutputT defaultValue = this.fn.defaultValue();
            // A Create of a null Void value drives the DoFn, which emits the default only when
            // the side input has no elements. The result reuses maybeEmpty's coder and windowing
            // strategy.
            PCollection<OutputT> defaultIfEmpty = ((PCollection)((PCollection)maybeEmpty.getPipeline().apply("CreateVoid", Create.of(null, new Void[0]).withCoder(VoidCoder.of()))).apply("ProduceDefault", ParDo.of(new DoFn<Void, OutputT>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c) {
                    Iterator combined = ((Iterable)c.sideInput(maybeEmptyView)).iterator();
                    if (!combined.hasNext()) {
                        c.output(defaultValue);
                    }
                }
            }).withSideInputs(maybeEmptyView))).setCoder(maybeEmpty.getCoder()).setWindowingStrategyInternal(maybeEmpty.getWindowingStrategy());
            return (PCollection)PCollectionList.of(maybeEmpty).and(defaultIfEmpty).apply(Flatten.pCollections());
        }
    }

    /**
     * A {@code CombineFn} whose accumulator type carries the combining logic itself.
     *
     * <p>Subclasses only supply {@code createAccumulator()}; adding inputs, merging and output
     * extraction are delegated to the {@link Accumulator} instances.
     *
     * @param <InputT> type of the input values
     * @param <AccumT> type of the self-combining accumulator
     * @param <OutputT> type of the output value
     */
    public static abstract class AccumulatingCombineFn<InputT, AccumT extends Accumulator<InputT, AccumT, OutputT>, OutputT>
    extends CombineFn<InputT, AccumT, OutputT> {
        /** Feeds {@code input} into {@code accumulator} and returns the same accumulator. */
        @Override
        public final AccumT addInput(AccumT accumulator, InputT input) {
            accumulator.addInput(input);
            return accumulator;
        }

        /** Merges every given accumulator into a freshly created one and returns it. */
        @Override
        public final AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
            AccumT merged = createAccumulator();
            for (AccumT partial : accumulators) {
                merged.mergeAccumulator(partial);
            }
            return merged;
        }

        /** Asks the accumulator for its output. */
        @Override
        public final OutputT extractOutput(AccumT accumulator) {
            return accumulator.extractOutput();
        }

        /**
         * The accumulator contract used by {@code AccumulatingCombineFn}.
         *
         * @param <InputT> type of the input values
         * @param <AccumT> type of accumulators that can be merged into this one
         * @param <OutputT> type of the output value
         */
        public static interface Accumulator<InputT, AccumT, OutputT> {
            /** Adds the input value {@code var1} to this accumulator. */
            public void addInput(InputT var1);

            /** Merges the accumulator {@code var1} into this accumulator. */
            public void mergeAccumulator(AccumT var1);

            /** Returns the output value for this accumulator. */
            public OutputT extractOutput();
        }
    }

    /**
     * A {@code CombineFn} over {@code Double} values defined by a binary operation on primitive
     * {@code double}s and an identity value.
     *
     * <p>The accumulator is a one-element {@code double[]} that is updated in place.
     */
    public static abstract class BinaryCombineDoubleFn
    extends CombineFn<Double, double[], Double> {
        /** Combines the two values {@code var1} and {@code var3} into one. */
        public abstract double apply(double var1, double var3);

        /** Returns the value a new accumulator starts from. */
        public abstract double identity();

        /** Returns a one-element array holding {@link #identity()}. */
        @Override
        public double[] createAccumulator() {
            return wrap(identity());
        }

        /** Folds {@code input} into slot 0 of the accumulator, in place. */
        @Override
        public double[] addInput(double[] accumulator, Double input) {
            double current = accumulator[0];
            accumulator[0] = apply(current, input);
            return accumulator;
        }

        /**
         * Folds all accumulators into the first one, which is mutated and returned. An empty
         * iterable yields a new identity accumulator.
         */
        @Override
        public double[] mergeAccumulators(Iterable<double[]> accumulators) {
            Iterator<double[]> remaining = accumulators.iterator();
            if (remaining.hasNext()) {
                double[] merged = remaining.next();
                while (remaining.hasNext()) {
                    merged[0] = apply(merged[0], remaining.next()[0]);
                }
                return merged;
            }
            return createAccumulator();
        }

        /** Returns the value held in slot 0 of the accumulator. */
        @Override
        public Double extractOutput(double[] accumulator) {
            return accumulator[0];
        }

        /** Encodes the accumulator by delegating its single value to {@code inputCoder}. */
        @Override
        public Coder<double[]> getAccumulatorCoder(CoderRegistry registry, Coder<Double> inputCoder) {
            ToDoubleCodingFunction unwrap = new ToDoubleCodingFunction();
            FromDoubleCodingFunction rewrap = new FromDoubleCodingFunction();
            return DelegateCoder.of(inputCoder, unwrap, rewrap);
        }

        /** The output is coded with the input coder. */
        @Override
        public Coder<Double> getDefaultOutputCoder(CoderRegistry registry, Coder<Double> inputCoder) {
            return inputCoder;
        }

        private static double[] wrap(double value) {
            double[] holder = new double[1];
            holder[0] = value;
            return holder;
        }

        /** Wraps a decoded {@code Double} into a one-element accumulator array. */
        private static final class FromDoubleCodingFunction
        implements DelegateCoder.CodingFunction<Double, double[]> {
            private FromDoubleCodingFunction() {
            }

            @Override
            public double[] apply(Double value) {
                return wrap(value);
            }

            /** All instances of this class are equal to each other. */
            @Override
            public boolean equals(Object o) {
                return o instanceof FromDoubleCodingFunction;
            }

            @Override
            public int hashCode() {
                return getClass().hashCode();
            }
        }

        /** Unwraps the single value of an accumulator array for encoding. */
        private static final class ToDoubleCodingFunction
        implements DelegateCoder.CodingFunction<double[], Double> {
            private ToDoubleCodingFunction() {
            }

            @Override
            public Double apply(double[] accumulator) {
                return accumulator[0];
            }

            /** All instances of this class are equal to each other. */
            @Override
            public boolean equals(Object o) {
                return o instanceof ToDoubleCodingFunction;
            }

            @Override
            public int hashCode() {
                return getClass().hashCode();
            }
        }
    }

    /**
     * A {@code CombineFn} over {@code Long} values defined by a binary operation on primitive
     * {@code long}s and an identity value.
     *
     * <p>The accumulator is a one-element {@code long[]} that is updated in place.
     */
    public static abstract class BinaryCombineLongFn
    extends CombineFn<Long, long[], Long> {
        /** Combines the two values {@code var1} and {@code var3} into one. */
        public abstract long apply(long var1, long var3);

        /** Returns the value a new accumulator starts from. */
        public abstract long identity();

        /** Returns a one-element array holding {@link #identity()}. */
        @Override
        public long[] createAccumulator() {
            return wrap(identity());
        }

        /** Folds {@code input} into slot 0 of the accumulator, in place. */
        @Override
        public long[] addInput(long[] accumulator, Long input) {
            long current = accumulator[0];
            accumulator[0] = apply(current, input);
            return accumulator;
        }

        /**
         * Folds all accumulators into the first one, which is mutated and returned. An empty
         * iterable yields a new identity accumulator.
         */
        @Override
        public long[] mergeAccumulators(Iterable<long[]> accumulators) {
            Iterator<long[]> remaining = accumulators.iterator();
            if (remaining.hasNext()) {
                long[] merged = remaining.next();
                while (remaining.hasNext()) {
                    merged[0] = apply(merged[0], remaining.next()[0]);
                }
                return merged;
            }
            return createAccumulator();
        }

        /** Returns the value held in slot 0 of the accumulator. */
        @Override
        public Long extractOutput(long[] accumulator) {
            return accumulator[0];
        }

        /** Encodes the accumulator by delegating its single value to {@code inputCoder}. */
        @Override
        public Coder<long[]> getAccumulatorCoder(CoderRegistry registry, Coder<Long> inputCoder) {
            ToLongCodingFunction unwrap = new ToLongCodingFunction();
            FromLongCodingFunction rewrap = new FromLongCodingFunction();
            return DelegateCoder.of(inputCoder, unwrap, rewrap);
        }

        /** The output is coded with the input coder. */
        @Override
        public Coder<Long> getDefaultOutputCoder(CoderRegistry registry, Coder<Long> inputCoder) {
            return inputCoder;
        }

        private static long[] wrap(long value) {
            long[] holder = new long[1];
            holder[0] = value;
            return holder;
        }

        /** Wraps a decoded {@code Long} into a one-element accumulator array. */
        private static final class FromLongCodingFunction
        implements DelegateCoder.CodingFunction<Long, long[]> {
            private FromLongCodingFunction() {
            }

            @Override
            public long[] apply(Long value) {
                return wrap(value);
            }

            /** All instances of this class are equal to each other. */
            @Override
            public boolean equals(Object o) {
                return o instanceof FromLongCodingFunction;
            }

            @Override
            public int hashCode() {
                return getClass().hashCode();
            }
        }

        /** Unwraps the single value of an accumulator array for encoding. */
        private static final class ToLongCodingFunction
        implements DelegateCoder.CodingFunction<long[], Long> {
            private ToLongCodingFunction() {
            }

            @Override
            public Long apply(long[] accumulator) {
                return accumulator[0];
            }

            /** All instances of this class are equal to each other. */
            @Override
            public boolean equals(Object o) {
                return o instanceof ToLongCodingFunction;
            }

            @Override
            public int hashCode() {
                return getClass().hashCode();
            }
        }
    }

    /**
     * A {@code CombineFn} over {@code Integer} values defined by a binary operation on primitive
     * {@code int}s and an identity value.
     *
     * <p>The accumulator is a one-element {@code int[]} that is updated in place.
     */
    public static abstract class BinaryCombineIntegerFn
    extends CombineFn<Integer, int[], Integer> {
        /** Combines the two values {@code var1} and {@code var2} into one. */
        public abstract int apply(int var1, int var2);

        /** Returns the value a new accumulator starts from. */
        public abstract int identity();

        /** Returns a one-element array holding {@link #identity()}. */
        @Override
        public int[] createAccumulator() {
            return wrap(identity());
        }

        /** Folds {@code input} into slot 0 of the accumulator, in place. */
        @Override
        public int[] addInput(int[] accumulator, Integer input) {
            int current = accumulator[0];
            accumulator[0] = apply(current, input);
            return accumulator;
        }

        /**
         * Folds all accumulators into the first one, which is mutated and returned. An empty
         * iterable yields a new identity accumulator.
         */
        @Override
        public int[] mergeAccumulators(Iterable<int[]> accumulators) {
            Iterator<int[]> remaining = accumulators.iterator();
            if (remaining.hasNext()) {
                int[] merged = remaining.next();
                while (remaining.hasNext()) {
                    merged[0] = apply(merged[0], remaining.next()[0]);
                }
                return merged;
            }
            return createAccumulator();
        }

        /** Returns the value held in slot 0 of the accumulator. */
        @Override
        public Integer extractOutput(int[] accumulator) {
            return accumulator[0];
        }

        /** Encodes the accumulator by delegating its single value to {@code inputCoder}. */
        @Override
        public Coder<int[]> getAccumulatorCoder(CoderRegistry registry, Coder<Integer> inputCoder) {
            ToIntegerCodingFunction unwrap = new ToIntegerCodingFunction();
            FromIntegerCodingFunction rewrap = new FromIntegerCodingFunction();
            return DelegateCoder.of(inputCoder, unwrap, rewrap);
        }

        /** The output is coded with the input coder. */
        @Override
        public Coder<Integer> getDefaultOutputCoder(CoderRegistry registry, Coder<Integer> inputCoder) {
            return inputCoder;
        }

        private static int[] wrap(int value) {
            int[] holder = new int[1];
            holder[0] = value;
            return holder;
        }

        /** Wraps a decoded {@code Integer} into a one-element accumulator array. */
        private static final class FromIntegerCodingFunction
        implements DelegateCoder.CodingFunction<Integer, int[]> {
            private FromIntegerCodingFunction() {
            }

            @Override
            public int[] apply(Integer value) {
                return wrap(value);
            }

            /** All instances of this class are equal to each other. */
            @Override
            public boolean equals(Object o) {
                return o instanceof FromIntegerCodingFunction;
            }

            @Override
            public int hashCode() {
                return getClass().hashCode();
            }
        }

        /** Unwraps the single value of an accumulator array for encoding. */
        private static final class ToIntegerCodingFunction
        implements DelegateCoder.CodingFunction<int[], Integer> {
            private ToIntegerCodingFunction() {
            }

            @Override
            public Integer apply(int[] accumulator) {
                return accumulator[0];
            }

            /** All instances of this class are equal to each other. */
            @Override
            public boolean equals(Object o) {
                return o instanceof ToIntegerCodingFunction;
            }

            @Override
            public int hashCode() {
                return getClass().hashCode();
            }
        }
    }

    /**
     * Coder for {@link Holder} accumulators.
     *
     * <p>Wire format: one marker byte ({@code 1} = value present, {@code 0} = absent), followed,
     * when present, by the value encoded with {@code valueCoder}.
     *
     * @param <V> type of the held value
     */
    private static class HolderCoder<V>
    extends StructuredCoder<Holder<V>> {
        private static final int ABSENT_MARKER = 0;
        private static final int PRESENT_MARKER = 1;

        private final Coder<V> valueCoder;

        /**
         * @param valueCoder coder used for the held value when one is present
         */
        public HolderCoder(Coder<V> valueCoder) {
            this.valueCoder = valueCoder;
        }

        /** Encodes {@code accumulator} in the {@code NESTED} context. */
        @Override
        public void encode(Holder<V> accumulator, OutputStream outStream) throws CoderException, IOException {
            this.encode(accumulator, outStream, Coder.Context.NESTED);
        }

        /**
         * Writes the presence marker and, if a value is present, the value itself.
         *
         * @param accumulator the holder to encode
         * @param outStream destination stream
         * @param context coding context forwarded to {@code valueCoder}
         */
        @Override
        public void encode(Holder<V> accumulator, OutputStream outStream, Coder.Context context) throws CoderException, IOException {
            if (((Holder)accumulator).present) {
                outStream.write(PRESENT_MARKER);
                this.valueCoder.encode(((Holder)accumulator).value, outStream, context);
            } else {
                outStream.write(ABSENT_MARKER);
            }
        }

        /** Decodes a holder in the {@code NESTED} context. */
        @Override
        public Holder<V> decode(InputStream inStream) throws CoderException, IOException {
            return this.decode(inStream, Coder.Context.NESTED);
        }

        /**
         * Reads the presence marker and, if it signals a value, the value itself.
         *
         * @param inStream source stream
         * @param context coding context forwarded to {@code valueCoder}
         * @return a holder with the decoded value, or an empty holder for the absent marker
         * @throws CoderException if the marker byte is neither 0 nor 1; this includes the
         *     {@code -1} returned by {@code InputStream.read()} at end of stream
         */
        @Override
        public Holder<V> decode(InputStream inStream, Coder.Context context) throws CoderException, IOException {
            int marker = inStream.read();
            if (marker == PRESENT_MARKER) {
                return new Holder(this.valueCoder.decode(inStream, context));
            }
            if (marker == ABSENT_MARKER) {
                return new Holder();
            }
            // Anything else is truncated or corrupt input. Silently mapping it to an empty
            // holder would drop data, so fail loudly instead.
            throw new CoderException(String.format("HolderCoder expects either a byte valued %s (absent) or %s (present), got %s", ABSENT_MARKER, PRESENT_MARKER, marker));
        }

        /** The only component coder is the value coder. */
        @Override
        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.singletonList(this.valueCoder);
        }

        /** Delegates the determinism check to the value coder. */
        @Override
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.valueCoder.verifyDeterministic();
        }
    }

    /**
     * A mutable cell that holds at most one value and separately records whether a value has
     * been set, so that a stored {@code null} can be told apart from "nothing stored".
     *
     * @param <V> type of the held value
     */
    public static class Holder<V> {
        @Nullable
        private V value;
        private boolean present;

        /** Creates an empty holder. */
        private Holder() {
        }

        /** Creates a holder that already contains {@code value}. */
        private Holder(V value) {
            this.present = true;
            this.value = value;
        }

        /** Stores {@code value} and marks the holder as present. */
        private void set(V value) {
            this.present = true;
            this.value = value;
        }
    }

    /**
     * A {@code CombineFn} defined by a binary operation on values of type {@code V}.
     *
     * <p>The accumulator is a {@link Holder}: empty until the first input arrives, then holding
     * the running result. An accumulator that never received a value yields {@link #identity()}.
     *
     * @param <V> type of the inputs, the running result and the output
     */
    public static abstract class BinaryCombineFn<V>
    extends CombineFn<V, Holder<V>, V> {
        /**
         * Creates a {@code BinaryCombineFn} whose {@code apply} delegates to {@code combiner}.
         *
         * @param combiner the binary operation
         * @return the new combine function
         */
        public static <V> BinaryCombineFn<V> of(final SerializableBiFunction<V, V, V> combiner) {
            return new BinaryCombineFn<V>(){

                @Override
                public V apply(V left, V right) {
                    return combiner.apply(left, right);
                }
            };
        }

        /** Combines the two values {@code var1} and {@code var2} into one. */
        public abstract V apply(V var1, V var2);

        /** Returns the output used for an accumulator without a value; {@code null} by default. */
        @Nullable
        public V identity() {
            return null;
        }

        /** Returns a new, empty holder. */
        @Override
        public Holder<V> createAccumulator() {
            return new Holder<V>();
        }

        /**
         * Stores {@code input} if the accumulator is empty; otherwise replaces the held value
         * with the combination of the held value and {@code input}.
         */
        @Override
        public Holder<V> addInput(Holder<V> accumulator, V input) {
            V updated = accumulator.present ? apply(accumulator.value, input) : input;
            accumulator.set(updated);
            return accumulator;
        }

        /**
         * Folds all accumulators into the first one, which is mutated and returned. Empty
         * accumulators are skipped; an empty iterable yields a new empty holder.
         */
        @Override
        public Holder<V> mergeAccumulators(Iterable<Holder<V>> accumulators) {
            Iterator<Holder<V>> remaining = accumulators.iterator();
            if (!remaining.hasNext()) {
                return createAccumulator();
            }
            Holder<V> merged = remaining.next();
            while (remaining.hasNext()) {
                Holder<V> next = remaining.next();
                if (next.present) {
                    // Adopt the value directly if nothing has been accumulated so far.
                    merged.set(merged.present ? apply(merged.value, next.value) : next.value);
                }
            }
            return merged;
        }

        /** Returns the held value, or {@link #identity()} if no value was ever set. */
        @Override
        public V extractOutput(Holder<V> accumulator) {
            return accumulator.present ? accumulator.value : identity();
        }

        /** Accumulators are coded with a {@code HolderCoder} around the input coder. */
        @Override
        public Coder<Holder<V>> getAccumulatorCoder(CoderRegistry registry, Coder<V> inputCoder) {
            return new HolderCoder<>(inputCoder);
        }

        /** The output is coded with the input coder. */
        @Override
        public Coder<V> getDefaultOutputCoder(CoderRegistry registry, Coder<V> inputCoder) {
            return inputCoder;
        }
    }

    /**
     * Base class for combine functions expressed in terms of an accumulator: create one, add
     * inputs to it, merge several of them, and extract the output.
     *
     * @param <InputT> type of the input values
     * @param <AccumT> type of the accumulator
     * @param <OutputT> type of the output value
     */
    public static abstract class CombineFn<InputT, AccumT, OutputT>
    extends CombineFnBase.AbstractGlobalCombineFn<InputT, AccumT, OutputT> {
        /** Returns a new accumulator. */
        public abstract AccumT createAccumulator();

        /** Adds the input {@code var2} to the accumulator {@code var1} and returns the result. */
        public abstract AccumT addInput(AccumT var1, InputT var2);

        /** Merges the accumulators in {@code var1} into a single accumulator. */
        public abstract AccumT mergeAccumulators(Iterable<AccumT> var1);

        /** Returns the output for the accumulator {@code var1}. */
        public abstract OutputT extractOutput(AccumT var1);

        /** Returns {@code accumulator} unchanged; subclasses may override. */
        public AccumT compact(AccumT accumulator) {
            return accumulator;
        }

        /**
         * Combines {@code inputs} directly: starts from a new accumulator, adds every input in
         * iteration order, and extracts the output.
         *
         * @param inputs the values to combine
         * @return the combined output
         */
        public OutputT apply(Iterable<? extends InputT> inputs) {
            AccumT running = createAccumulator();
            Iterator<? extends InputT> remaining = inputs.iterator();
            while (remaining.hasNext()) {
                running = addInput(running, remaining.next());
            }
            return extractOutput(running);
        }

        /** Returns the output extracted from a freshly created accumulator. */
        @Override
        public OutputT defaultValue() {
            AccumT untouched = createAccumulator();
            return extractOutput(untouched);
        }

        /** Returns a {@code TypeDescriptor} for {@code OutputT}, built from this object's runtime class. */
        public TypeDescriptor<OutputT> getOutputType() {
            Class<?> concreteClass = getClass();
            return new TypeDescriptor<OutputT>(concreteClass){};
        }

        /** Returns a {@code TypeDescriptor} for {@code InputT}, built from this object's runtime class. */
        public TypeDescriptor<InputT> getInputType() {
            Class<?> concreteClass = getClass();
            return new TypeDescriptor<InputT>(concreteClass){};
        }
    }
}

