/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link PTransform} that collects the values of a keyed {@link PCollection} into batches and
 * emits each batch as a {@code KV<K, Iterable<InputT>>}.
 *
 * <p>Elements are buffered per key and window in state. A batch is emitted as soon as the number
 * of buffered elements reaches {@code batchSize}; whatever is still buffered when the
 * end-of-window timer fires is emitted as a final (possibly smaller) batch.
 *
 * <p>The input {@link PCollection} must use a {@link KvCoder}; its key and value coders are
 * reused for the state cells.
 *
 * @param <K> type of the keys
 * @param <InputT> type of the values being batched
 */
public class GroupIntoBatches<K, InputT>
extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> {
    /** Number of buffered elements that triggers the emission of a batch. */
    private final long batchSize;

    private GroupIntoBatches(long batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Creates a transform that emits a batch once {@code batchSize} elements have been buffered.
     *
     * @param batchSize number of elements per batch
     * @param <K> type of the keys
     * @param <InputT> type of the values being batched
     * @return a new {@code GroupIntoBatches} configured with {@code batchSize}
     */
    public static <K, InputT> GroupIntoBatches<K, InputT> ofSize(long batchSize) {
        return new GroupIntoBatches<K, InputT>(batchSize);
    }

    /**
     * Applies the batching {@link DoFn} to {@code input}.
     *
     * @param input keyed collection whose coder must be a {@link KvCoder}
     * @return collection of batches, one {@code KV} per emitted batch
     * @throws IllegalArgumentException if the coder of {@code input} is not a {@link KvCoder}
     */
    @Override
    public PCollection<KV<K, Iterable<InputT>>> expand(PCollection<KV<K, InputT>> input) {
        Duration allowedLateness = input.getWindowingStrategy().getAllowedLateness();
        Preconditions.checkArgument(input.getCoder() instanceof KvCoder, "coder specified in the input PCollection is not a KvCoder");
        // The instanceof check above makes this downcast safe; keeping the type arguments lets the
        // key/value coders flow into the DoFn without raw types or unchecked casts.
        KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) input.getCoder();
        Coder<K> keyCoder = inputCoder.getKeyCoder();
        Coder<InputT> valueCoder = inputCoder.getValueCoder();
        return input.apply(ParDo.of(new GroupIntoBatchesDoFn<K, InputT>(this.batchSize, allowedLateness, keyCoder, valueCoder)));
    }

    /**
     * Stateful {@link DoFn} that buffers values in a bag, counts them, and emits the bag either
     * when the count reaches {@code batchSize} or when the end-of-window timer fires.
     *
     * @param <K> type of the keys
     * @param <InputT> type of the values being batched
     */
    @VisibleForTesting
    static class GroupIntoBatchesDoFn<K, InputT>
    extends DoFn<KV<K, InputT>, KV<K, Iterable<InputT>>> {
        private static final Logger LOG = LoggerFactory.getLogger(GroupIntoBatchesDoFn.class);
        // Identifiers tying the spec fields below to the annotated method parameters.
        private static final String END_OF_WINDOW_ID = "endOFWindow";
        private static final String BATCH_ID = "batch";
        private static final String NUM_ELEMENTS_IN_BATCH_ID = "numElementsInBatch";
        private static final String KEY_ID = "key";
        private final long batchSize;
        private final Duration allowedLateness;
        /** Event-time timer used to flush the remaining elements. */
        @DoFn.TimerId(END_OF_WINDOW_ID)
        private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
        /** Bag holding the values buffered for the current batch. */
        @DoFn.StateId(BATCH_ID)
        private final StateSpec<BagState<InputT>> batchSpec;
        /** Running count of the values currently held in the bag. */
        @DoFn.StateId(NUM_ELEMENTS_IN_BATCH_ID)
        private final StateSpec<CombiningState<Long, long[], Long>> numElementsInBatchSpec;
        /** Key of the most recently processed element, needed when flushing from the timer. */
        @DoFn.StateId(KEY_ID)
        private final StateSpec<ValueState<K>> keySpec;
        /** Every how many buffered elements a {@code readLater()} hint is issued on the bag. */
        private final long prefetchFrequency;

        /**
         * @param batchSize number of buffered elements that triggers a flush
         * @param allowedLateness added to the window's max timestamp to obtain the timer instant
         * @param inputKeyCoder coder used for the key state cell
         * @param inputValueCoder coder used for the buffered values
         */
        GroupIntoBatchesDoFn(long batchSize, Duration allowedLateness, Coder<K> inputKeyCoder, Coder<InputT> inputValueCoder) {
            this.batchSize = batchSize;
            this.allowedLateness = allowedLateness;
            this.batchSpec = StateSpecs.bag(inputValueCoder);
            // Plain long sum: the combining state acts as an element counter.
            this.numElementsInBatchSpec = StateSpecs.combining(new Combine.BinaryCombineLongFn(){

                @Override
                public long identity() {
                    return 0L;
                }

                @Override
                public long apply(long left, long right) {
                    return left + right;
                }
            });
            this.keySpec = StateSpecs.value(inputKeyCoder);
            // Hint a prefetch every fifth of a batch. For small batches (batchSize / 5 <= 1) the
            // frequency is Long.MAX_VALUE, so the modulo check in processElement is effectively
            // never satisfied and no prefetch hint is issued.
            long fifthOfBatch = batchSize / 5L;
            this.prefetchFrequency = fifthOfBatch <= 1L ? Long.MAX_VALUE : fifthOfBatch;
        }

        /**
         * Buffers one element and flushes the batch once {@code batchSize} elements are buffered.
         *
         * <p>On every element the timer is (re)set to the window's max timestamp plus the allowed
         * lateness, and the element's key is stored so the timer callback can emit with it.
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.TimerId(END_OF_WINDOW_ID) Timer timer, @DoFn.StateId(BATCH_ID) BagState<InputT> batch, @DoFn.StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch, @DoFn.StateId(KEY_ID) ValueState<K> key, @DoFn.Element KV<K, InputT> element, BoundedWindow window, DoFn.OutputReceiver<KV<K, Iterable<InputT>>> receiver) {
            Instant windowExpires = window.maxTimestamp().plus(this.allowedLateness);
            // Arguments are passed as objects so toString() only runs when debug is enabled.
            LOG.debug("*** SET TIMER *** to point in time {} for window {}", windowExpires, window);
            timer.set(windowExpires);
            key.write(element.getKey());
            batch.add(element.getValue());
            LOG.debug("*** BATCH *** Add element for window {} ", window);
            numElementsInBatch.add(1L);
            long num = numElementsInBatch.read();
            if (num % this.prefetchFrequency == 0L) {
                // Hint that the bag contents will be read soon.
                batch.readLater();
            }
            if (num >= this.batchSize) {
                LOG.debug("*** END OF BATCH *** for window {}", window);
                this.flushBatch(receiver, key, batch, numElementsInBatch);
            }
        }

        /**
         * Timer callback: emits whatever is still buffered for the key and clears the state.
         */
        @DoFn.OnTimer(END_OF_WINDOW_ID)
        public void onTimerCallback(DoFn.OutputReceiver<KV<K, Iterable<InputT>>> receiver, @DoFn.Timestamp Instant timestamp, @DoFn.StateId(KEY_ID) ValueState<K> key, @DoFn.StateId(BATCH_ID) BagState<InputT> batch, @DoFn.StateId(NUM_ELEMENTS_IN_BATCH_ID) CombiningState<Long, long[], Long> numElementsInBatch, BoundedWindow window) {
            LOG.debug("*** END OF WINDOW *** for timer timestamp {} in windows {}", timestamp, window);
            this.flushBatch(receiver, key, batch, numElementsInBatch);
        }

        /**
         * Emits the buffered values (if any) under the stored key, then resets the bag and the
         * element counter.
         *
         * @param receiver where the batch is emitted
         * @param key state cell holding the key to emit with
         * @param batch bag of buffered values
         * @param numElementsInBatch counter of buffered values
         */
        private void flushBatch(DoFn.OutputReceiver<KV<K, Iterable<InputT>>> receiver, ValueState<K> key, BagState<InputT> batch, CombiningState<Long, long[], Long> numElementsInBatch) {
            Iterable<InputT> values = batch.read();
            // The bag can be empty when the timer fires right after a size-triggered flush.
            if (!Iterables.isEmpty(values)) {
                receiver.output(KV.of(key.read(), values));
            }
            batch.clear();
            LOG.debug("*** BATCH *** clear");
            numElementsInBatch.clear();
        }
    }
}

