/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WithTimestamps<T>
extends PTransform<PCollection<T>, PCollection<T>> {
    private final SerializableFunction<T, Instant> fn;
    private final Duration allowedTimestampSkew;

    public static <T> WithTimestamps<T> of(SerializableFunction<T, Instant> fn) {
        return new WithTimestamps<T>(fn, Duration.ZERO);
    }

    private WithTimestamps(SerializableFunction<T, Instant> fn, Duration allowedTimestampSkew) {
        this.fn = Preconditions.checkNotNull(fn, "WithTimestamps fn cannot be null");
        this.allowedTimestampSkew = allowedTimestampSkew;
    }

    @Deprecated
    public WithTimestamps<T> withAllowedTimestampSkew(Duration allowedTimestampSkew) {
        return new WithTimestamps<T>(this.fn, allowedTimestampSkew);
    }

    @Deprecated
    public Duration getAllowedTimestampSkew() {
        return this.allowedTimestampSkew;
    }

    @Override
    public PCollection<T> expand(PCollection<T> input) {
        return (PCollection)input.apply("AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(this.fn, this.allowedTimestampSkew)));
    }

    private static class AddTimestampsDoFn<T>
    extends DoFn<T, T> {
        private final SerializableFunction<T, Instant> fn;
        private final Duration allowedTimestampSkew;

        public AddTimestampsDoFn(SerializableFunction<T, Instant> fn, Duration allowedTimestampSkew) {
            this.fn = fn;
            this.allowedTimestampSkew = allowedTimestampSkew;
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element T element, DoFn.OutputReceiver<T> r) {
            Instant timestamp = this.fn.apply(element);
            Preconditions.checkNotNull(timestamp, "Timestamps for WithTimestamps cannot be null. Timestamp provided by %s.", this.fn);
            r.outputWithTimestamp(element, timestamp);
        }

        @Override
        public Duration getAllowedTimestampSkew() {
            return this.allowedTimestampSkew;
        }
    }
}

