/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.base.Strings;
import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.hash.Hashing;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkOptions;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.AuctionCount;
import org.apache.beam.sdk.nexmark.model.AuctionPrice;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.BidsPerSession;
import org.apache.beam.sdk.nexmark.model.CategoryPrice;
import org.apache.beam.sdk.nexmark.model.Done;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.IdNameReserve;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.nexmark.model.NameCityStateId;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.model.SellerPrice;
import org.apache.beam.sdk.nexmark.sources.BoundedEventSource;
import org.apache.beam.sdk.nexmark.sources.UnboundedEventSource;
import org.apache.beam.sdk.nexmark.sources.generator.Generator;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NexmarkUtils {
    /** SLF4J logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(NexmarkUtils.class);
    /** Shared, publicly visible Jackson {@link ObjectMapper} instance. */
    public static final ObjectMapper MAPPER = new ObjectMapper();
    // NOTE(review): both logging switches are false and are not referenced by any code visible
    // in this decompiled file; presumably the branches they guarded were folded away by the
    // compiler -- confirm against the original source.
    private static final boolean LOG_INFO = false;
    private static final boolean LOG_TO_CONSOLE = false;
    // NOTE(review): presumably the Pub/Sub attribute names carrying an event's timestamp and id;
    // they are not used inside this file -- verify against callers.
    public static final String PUBSUB_TIMESTAMP = "timestamp";
    public static final String PUBSUB_ID = "id";
    /**
     * Epoch milliseconds of 2015-07-15T00:00:00.000Z. Passed to the generator configuration as
     * the base time whenever wallclock event time is not requested.
     */
    private static final long BASE_TIME = Instant.parse((String)"2015-07-15T00:00:00.000Z").getMillis();
    /** The epoch (instant 0) plus 365 standard days. */
    public static final Instant BEGINNING_OF_TIME = new Instant(0L).plus((ReadableDuration)Duration.standardDays((long)365L));
    /** {@link BoundedWindow#TIMESTAMP_MAX_VALUE} minus 365 standard days. */
    public static final Instant END_OF_TIME = BoundedWindow.TIMESTAMP_MAX_VALUE.minus((ReadableDuration)Duration.standardDays((long)365L));
    // The four constants below are not referenced by name in this decompiled file, but their
    // values appear as inlined literals: cpuDelay starts its search at 50000 (INIT_PLAINTEXT),
    // masks with 0xFFFF (MASK) and compares against 2259 (== HASH & MASK); the disk-busy
    // transform caps each buffer at 0x1000000 bytes (MAX_BUFFER_SIZE).
    private static final long MASK = 65535L;
    private static final long HASH = 2611923443488327891L;
    private static final long INIT_PLAINTEXT = 50000L;
    private static final int MAX_BUFFER_SIZE = 0x1000000;

    /**
     * Builds a BigQuery table spec of the form {@code project:dataset.table_suffix...}, where the
     * suffix parts depend on {@code options.getResourceNameMode()}.
     *
     * @param options   supplies the project, dataset, base table name, naming mode, runner and
     *                  streaming flag
     * @param queryName query name appended in every mode except {@code VERBATIM}
     * @param now       numeric salt appended in {@code QUERY_AND_SALT} mode
     * @param version   version suffix; only {@code QUERY_RUNNER_AND_MODE} treats {@code null}
     *                  specially (the suffix is omitted)
     * @return the formatted table spec
     * @throws RuntimeException if no base table name is configured or the mode is not recognized
     */
    static String tableSpec(NexmarkOptions options, String queryName, long now, String version) {
        final String table = options.getBigQueryTable();
        if (Strings.isNullOrEmpty(table)) {
            throw new RuntimeException("Missing --bigQueryTable");
        }
        switch (options.getResourceNameMode()) {
            case VERBATIM:
                return String.format(
                    "%s:%s.%s_%s",
                    options.getProject(), options.getBigQueryDataset(), table, version);
            case QUERY:
                return String.format(
                    "%s:%s.%s_%s_%s",
                    options.getProject(), options.getBigQueryDataset(), table, queryName, version);
            case QUERY_AND_SALT:
                return String.format(
                    "%s:%s.%s_%s_%s_%d",
                    options.getProject(), options.getBigQueryDataset(), table, queryName, version, now);
            case QUERY_RUNNER_AND_MODE: {
                // Getters are read in the same order the format arguments are listed.
                final String project = options.getProject();
                final String dataset = options.getBigQueryDataset();
                final String runner = options.getRunner().getSimpleName();
                final String mode = options.isStreaming() ? "streaming" : "batch";
                if (version == null) {
                    return String.format(
                        "%s:%s.%s_%s_%s_%s", project, dataset, table, queryName, runner, mode);
                }
                return String.format(
                    "%s:%s.%s_%s_%s_%s_%s", project, dataset, table, queryName, runner, mode, version);
            }
            default:
                break;
        }
        throw new RuntimeException("Unrecognized enum " + options.getResourceNameMode());
    }

    /**
     * Informational logging hook; in this build the body is empty, so calls have no effect.
     *
     * <p>NOTE(review): presumably the original body was guarded by the constant-false
     * {@code LOG_INFO} flag and removed at compile time -- confirm against the original source.
     *
     * @param format printf-style format string (unused here)
     * @param args   format arguments (unused here)
     */
    public static void info(String format, Object ... args) {
    }

    /**
     * Prints a line to standard output consisting of the current instant, a space, and the
     * message produced by {@code String.format(format, args)}.
     *
     * @param format printf-style format string
     * @param args   arguments for {@code format}
     */
    public static void console(String format, Object ... args) {
        // Capture the timestamp before formatting, matching the original argument order.
        final Instant stamp = Instant.now();
        final String message = String.format(format, args);
        System.out.printf("%s %s%n", stamp, message);
    }

    /**
     * Configures the coder registry of pipeline {@code p} according to {@code coderStrategy}.
     *
     * <ul>
     *   <li>{@code HAND}: registers each model class with its own {@code CODER} constant.
     *   <li>{@code AVRO}: registers the {@link AvroCoder} coder provider.
     *   <li>{@code JAVA}: registers the {@link SerializableCoder} coder provider.
     * </ul>
     *
     * @param coderStrategy which coder family to install
     * @param p             pipeline whose {@link CoderRegistry} is modified
     */
    public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
        final CoderRegistry coders = p.getCoderRegistry();
        switch (coderStrategy) {
            case HAND:
                NexmarkUtils.registerHandCoders(coders);
                break;
            case AVRO:
                coders.registerCoderProvider(AvroCoder.getCoderProvider());
                break;
            case JAVA:
                coders.registerCoderProvider(SerializableCoder.getCoderProvider());
                break;
            default:
                break;
        }
    }

    /** Registers the hand-written {@code CODER} of every Nexmark model class with {@code coders}. */
    private static void registerHandCoders(CoderRegistry coders) {
        coders.registerCoderForClass(Auction.class, Auction.CODER);
        coders.registerCoderForClass(AuctionBid.class, AuctionBid.CODER);
        coders.registerCoderForClass(AuctionCount.class, AuctionCount.CODER);
        coders.registerCoderForClass(AuctionPrice.class, AuctionPrice.CODER);
        coders.registerCoderForClass(Bid.class, Bid.CODER);
        coders.registerCoderForClass(CategoryPrice.class, CategoryPrice.CODER);
        coders.registerCoderForClass(Event.class, Event.CODER);
        coders.registerCoderForClass(IdNameReserve.class, IdNameReserve.CODER);
        coders.registerCoderForClass(NameCityStateId.class, NameCityStateId.CODER);
        coders.registerCoderForClass(Person.class, Person.CODER);
        coders.registerCoderForClass(SellerPrice.class, SellerPrice.CODER);
        coders.registerCoderForClass(Done.class, Done.CODER);
        coders.registerCoderForClass(BidsPerSession.class, BidsPerSession.CODER);
    }

    /**
     * Creates the generator configuration shared by the event sources in this class.
     *
     * <p>The base time is the current wallclock time when
     * {@code configuration.useWallclockEventTime} is set, otherwise the fixed {@code BASE_TIME};
     * the event count comes from {@code configuration.numEvents} and the two remaining numeric
     * arguments are zero.
     *
     * @param configuration Nexmark configuration to wrap
     * @return a new {@link GeneratorConfig}
     */
    private static GeneratorConfig standardGeneratorConfig(NexmarkConfiguration configuration) {
        final long baseTime;
        if (configuration.useWallclockEventTime) {
            baseTime = System.currentTimeMillis();
        } else {
            baseTime = BASE_TIME;
        }
        return new GeneratorConfig(configuration, baseTime, 0L, configuration.numEvents, 0L);
    }

    /**
     * Returns a fresh {@link Generator} over the standard generator configuration derived from
     * {@code configuration}.
     *
     * @param configuration Nexmark configuration
     * @return an iterator of timestamped events
     */
    public static Iterator<TimestampedValue<Event>> standardEventIterator(NexmarkConfiguration configuration) {
        final GeneratorConfig generatorConfig = NexmarkUtils.standardGeneratorConfig(configuration);
        return new Generator(generatorConfig);
    }

    /**
     * Returns a read transform over a {@link BoundedEventSource} built from the standard generator
     * configuration and {@code configuration.numEventGenerators}.
     *
     * @param configuration Nexmark configuration
     * @return a transform producing events from a bounded source
     */
    public static PTransform<PBegin, PCollection<Event>> batchEventsSource(NexmarkConfiguration configuration) {
        final GeneratorConfig generatorConfig = NexmarkUtils.standardGeneratorConfig(configuration);
        // Raw type on purpose: mirrors the raw cast used to select the bounded Read.from overload.
        final BoundedSource source = new BoundedEventSource(generatorConfig, configuration.numEventGenerators);
        return Read.from(source);
    }

    /**
     * Returns a read transform over an {@link UnboundedEventSource} built from the standard
     * generator configuration plus {@code numEventGenerators}, {@code watermarkHoldbackSec} and
     * {@code isRateLimited} taken from {@code configuration}.
     *
     * @param configuration Nexmark configuration
     * @return a transform producing events from an unbounded source
     */
    public static PTransform<PBegin, PCollection<Event>> streamEventsSource(NexmarkConfiguration configuration) {
        final GeneratorConfig generatorConfig = NexmarkUtils.standardGeneratorConfig(configuration);
        // Raw type on purpose: mirrors the raw cast used to select the unbounded Read.from overload.
        final UnboundedSource source = new UnboundedEventSource(
            generatorConfig,
            configuration.numEventGenerators,
            configuration.watermarkHoldbackSec,
            configuration.isRateLimited);
        return Read.from(source);
    }

    /**
     * Returns a pass-through transform that counts the events flowing by.
     *
     * <p>Every element increments the {@code "events"} counter and exactly one of
     * {@code "newPersons"}, {@code "newAuctions"}, {@code "bids"} or {@code "endOfStream"},
     * chosen by the first non-null field among {@code newPerson}, {@code newAuction} and
     * {@code bid}. All counters are created with {@code Metrics.counter(name, ...)}. The element
     * is then emitted unchanged.
     *
     * @param name first argument passed to {@code Metrics.counter} for every counter
     * @return the counting transform
     */
    public static ParDo.SingleOutput<Event, Event> snoop(final String name) {
        final DoFn<Event, Event> snooper = new DoFn<Event, Event>(){
            final Counter eventCounter = Metrics.counter(name, "events");
            final Counter newPersonCounter = Metrics.counter(name, "newPersons");
            final Counter newAuctionCounter = Metrics.counter(name, "newAuctions");
            final Counter bidCounter = Metrics.counter(name, "bids");
            final Counter endOfStreamCounter = Metrics.counter(name, "endOfStream");

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                this.eventCounter.inc();
                final Event event = (Event)c.element();
                // Pick the counter matching the first populated field; none set means end of stream.
                final Counter kindCounter;
                if (event.newPerson != null) {
                    kindCounter = this.newPersonCounter;
                } else if (event.newAuction != null) {
                    kindCounter = this.newAuctionCounter;
                } else if (event.bid != null) {
                    kindCounter = this.bidCounter;
                } else {
                    kindCounter = this.endOfStreamCounter;
                }
                kindCounter.inc();
                NexmarkUtils.info("%s snooping element %s", name, event);
                c.output((Object)event);
            }
        };
        return ParDo.of(snooper);
    }

    /**
     * Returns a transform that swallows every element: it increments a {@code "discarded"}
     * counter (created with {@code Metrics.counter(name, "discarded")}) and emits nothing.
     *
     * @param name first argument passed to {@code Metrics.counter}
     * @param <T>  element type being discarded
     * @return the discarding transform
     */
    public static <T> ParDo.SingleOutput<T, Void> devNull(final String name) {
        final DoFn<T, Void> sink = new DoFn<T, Void>(){
            final Counter discardedCounterMetric = Metrics.counter(name, "discarded");

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                // Count only; nothing is output.
                this.discardedCounterMetric.inc();
            }
        };
        return ParDo.of(sink);
    }

    /**
     * Returns a pass-through transform that logs every element at INFO level as
     * {@code "<name>: <element>"} and then emits it unchanged.
     *
     * @param name label prefixed to each logged element
     * @param <T>  element type
     * @return the logging transform
     */
    public static <T> ParDo.SingleOutput<T, T> log(final String name) {
        return ParDo.of((DoFn)new DoFn<T, T>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                // SLF4J only substitutes "{}" anchors; the previous printf-style "%s: %s" pattern
                // was logged verbatim and the name/element arguments were silently dropped.
                LOG.info("{}: {}", (Object)name, c.element());
                c.output(c.element());
            }
        });
    }

    /**
     * Returns a transform that converts each element to its {@code toString()} form, incrementing
     * a {@code "records"} counter (created with {@code Metrics.counter(name, "records")}) once per
     * element.
     *
     * @param name first argument passed to {@code Metrics.counter}
     * @param <T>  input element type
     * @return the formatting transform
     */
    public static <T> ParDo.SingleOutput<T, String> format(final String name) {
        final DoFn<T, String> formatter = new DoFn<T, String>(){
            final Counter recordCounterMetric = Metrics.counter(name, "records");

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                this.recordCounterMetric.inc();
                final String rendered = c.element().toString();
                c.output((Object)rendered);
            }
        };
        return ParDo.of(formatter);
    }

    /**
     * Returns a transform that pairs every element with the timestamp reported by its process
     * context, emitting a {@link TimestampedValue}.
     *
     * @param name not used by this implementation
     * @param <T>  element type
     * @return the stamping transform
     */
    public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) {
        final DoFn<T, TimestampedValue<T>> stamper = new DoFn<T, TimestampedValue<T>>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                final Object element = c.element();
                final Instant when = c.timestamp();
                c.output((Object)TimestampedValue.of(element, when));
            }
        };
        return ParDo.of(stamper);
    }

    /**
     * Returns a transform that reduces a collection to an XOR of per-element hashes.
     *
     * <p>The input is re-windowed into the global window with a trigger of
     * {@code AfterPane.elementCountAtLeast((int) numEvents)}, one standard day of allowed lateness
     * and discarding fired panes. Each element is hashed with murmur3_128 over its timestamp
     * millis followed by its {@code toString()} in UTF-8, and the resulting longs are combined
     * globally with XOR.
     *
     * @param numEvents element count used for the pane trigger
     * @param name      name given to the returned transform
     * @param <T>       element type
     * @return the hashing transform
     */
    public static <T> PTransform<PCollection<T>, PCollection<Long>> hash(final long numEvents, String name) {
        return new PTransform<PCollection<T>, PCollection<Long>>(name){

            public PCollection<Long> expand(PCollection<T> input) {
                // NOTE(review): numEvents is narrowed to int without a range check -- confirm
                // callers never pass a value above Integer.MAX_VALUE.
                final Window globalWindowing = Window.into((WindowFn)new GlobalWindows())
                    .triggering((Trigger)AfterPane.elementCountAtLeast((int)numEvents))
                    .withAllowedLateness(Duration.standardDays(1L))
                    .discardingFiredPanes();
                final PCollection windowed = (PCollection)input.apply((PTransform)globalWindowing);

                final DoFn<T, Long> hashFn = new DoFn<T, Long>(){

                    @DoFn.ProcessElement
                    public void processElement(DoFn.ProcessContext c) {
                        final long millis = c.timestamp().getMillis();
                        final String text = c.element().toString();
                        final long digest = Hashing.murmur3_128()
                            .newHasher()
                            .putLong(millis)
                            .putString(text, StandardCharsets.UTF_8)
                            .hash()
                            .asLong();
                        c.output((Object)digest);
                    }
                };
                final PCollection hashed = (PCollection)windowed.apply(this.name + ".Hash", (PTransform)ParDo.of(hashFn));

                final Combine.BinaryCombineFn<Long> xor = new Combine.BinaryCombineFn<Long>(){

                    public Long apply(Long left, Long right) {
                        return Long.valueOf(left.longValue() ^ right.longValue());
                    }
                };
                return (PCollection)hashed.apply((PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)xor));
            }
        };
    }

    /**
     * Returns a pass-through transform that burns CPU for roughly {@code delayMs} milliseconds
     * per element before emitting it unchanged.
     *
     * <p>The busy work is a repeated search, starting at 50000, for a long whose murmur3_128 hash
     * has low 16 bits equal to 2259; the wallclock is re-read after each completed search.
     *
     * @param name    not used by this implementation
     * @param delayMs minimum wallclock time to spin per element, in milliseconds
     * @param <T>     element type
     * @return the delaying transform
     */
    public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) {
        return ParDo.of((DoFn)new DoFn<T, T>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                long current = System.currentTimeMillis();
                final long deadline = current + delayMs;
                for (; current < deadline; current = System.currentTimeMillis()) {
                    // 50000, 0xFFFF and 2259 are the inlined INIT_PLAINTEXT, MASK and HASH & MASK.
                    long candidate = 50000L;
                    while ((Hashing.murmur3_128().hashLong(candidate).asLong() & 0xFFFFL) != 2259L) {
                        candidate++;
                    }
                }
                c.output(c.element());
            }
        });
    }

    /**
     * Returns a pass-through transform that writes {@code bytes} bytes to per-key state for every
     * element before emitting it.
     *
     * @param bytes number of bytes to write per element
     * @param <T>   element type
     * @return a new {@code DiskBusyTransform}
     */
    public static <T> PTransform<PCollection<T>, PCollection<T>> diskBusy(long bytes) {
        final PTransform<PCollection<T>, PCollection<T>> busy = new DiskBusyTransform<T>(bytes);
        return busy;
    }

    /**
     * Returns a transform that re-emits each element viewed as a plain {@link KnownSize}.
     *
     * @param <T> concrete element type
     * @return the up-casting transform
     */
    private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() {
        final DoFn<T, KnownSize> upcast = new DoFn<T, KnownSize>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                final KnownSize sized = (KnownSize)c.element();
                c.output((Object)sized);
            }
        };
        return ParDo.of(upcast);
    }

    /**
     * Wraps {@code trueCoder} in a {@code CastingCoder} so it can be used as a coder for
     * {@link KnownSize}.
     *
     * @param trueCoder coder for the concrete element type
     * @param <T>       concrete element type
     * @return a coder typed on {@link KnownSize} that delegates to {@code trueCoder}
     */
    private static <T extends KnownSize> Coder<KnownSize> makeCastingCoder(Coder<T> trueCoder) {
        final Coder<KnownSize> wrapped = new CastingCoder<T>(trueCoder);
        return wrapped;
    }

    /**
     * Converts {@code elements} into a collection typed on {@link KnownSize}.
     *
     * <p>Applies the up-casting transform under the name {@code name + ".Forget"} and sets the
     * result's coder to a casting wrapper around the input collection's coder.
     *
     * @param name     prefix for the applied transform's name
     * @param elements collection to convert
     * @param <T>      concrete element type
     * @return the converted collection
     */
    public static <T extends KnownSize> PCollection<KnownSize> castToKnownSize(String name, PCollection<T> elements) {
        final PCollection forgotten = (PCollection)elements.apply(name + ".Forget", NexmarkUtils.castToKnownSize());
        final Coder<KnownSize> castingCoder = NexmarkUtils.makeCastingCoder(elements.getCoder());
        return forgotten.setCoder(castingCoder);
    }

    /** Private constructor: code outside this class cannot create instances. */
    private NexmarkUtils() {
    }

    /**
     * Coder for {@link KnownSize} values that delegates all encoding and decoding to a coder for
     * a concrete {@code KnownSize} subtype.
     *
     * @param <T> concrete type handled by the wrapped coder
     */
    private static class CastingCoder<T extends KnownSize>
    extends CustomCoder<KnownSize> {
        /** Coder that performs the actual work. */
        private final Coder<T> trueCoder;

        /**
         * @param trueCoder coder for the concrete element type
         */
        public CastingCoder(Coder<T> trueCoder) {
            this.trueCoder = trueCoder;
        }

        /**
         * Encodes {@code value} with the wrapped coder.
         *
         * @param value     value to encode
         * @param outStream destination stream
         * @throws CoderException if the wrapped coder fails to encode
         * @throws IOException    if writing to the stream fails
         */
        @SuppressWarnings("unchecked")
        public void encode(KnownSize value, OutputStream outStream) throws CoderException, IOException {
            // Unchecked narrowing: T erases to KnownSize, so this performs no extra runtime check.
            trueCoder.encode((T)value, outStream);
        }

        /**
         * Decodes one value with the wrapped coder and returns it as a {@link KnownSize}.
         *
         * @param inStream source stream
         * @return the decoded value
         * @throws CoderException if the wrapped coder fails to decode
         * @throws IOException    if reading from the stream fails
         */
        public KnownSize decode(InputStream inStream) throws CoderException, IOException {
            final T decoded = trueCoder.decode(inStream);
            return decoded;
        }
    }

    /**
     * Pass-through transform that generates state writes for every element.
     *
     * <p>Elements are first keyed with the constant key 0. For each keyed element, {@code bytes}
     * bytes are written to a {@code byte[]} value state in chunks of at most 0x1000000 bytes
     * (each write replaces the previous value), after which the original element is emitted.
     *
     * @param <T> element type
     */
    private static class DiskBusyTransform<T>
    extends PTransform<PCollection<T>, PCollection<T>> {
        /** Total number of bytes written to state per element. */
        private long bytes;

        private DiskBusyTransform(long bytes) {
            this.bytes = bytes;
        }

        public PCollection<T> expand(PCollection<T> input) {
            // Step 1: attach the constant key 0 to every element.
            final DoFn keyFn = new DoFn<T, KV<Integer, T>>(){

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext context) {
                    final Object element = context.element();
                    context.output((Object)KV.of((Object)0, element));
                }
            };
            final PCollection keyed = (PCollection)input.apply("diskBusy.keyElements", (PTransform)ParDo.of(keyFn));

            // Step 2: write the requested number of bytes to state, then forward the value.
            final DoFn ioFn = new DoFn<KV<Integer, T>, T>(){
                private static final String DISK_BUSY = "diskBusy";
                @DoFn.StateId(DISK_BUSY)
                private final StateSpec<ValueState<byte[]>> spec = StateSpecs.value((Coder)ByteArrayCoder.of());

                @DoFn.ProcessElement
                public void processElement(DoFn.ProcessContext c, @DoFn.StateId(DISK_BUSY) ValueState<byte[]> state) {
                    long stamp = System.currentTimeMillis();
                    for (long left = bytes; left > 0L; ) {
                        final long chunkSize = Math.min(left, 0x1000000L);
                        left -= chunkSize;
                        final byte[] chunk = new byte[(int)chunkSize];
                        // Fill with the low byte of the latest clock reading.
                        final byte filler = (byte)stamp;
                        for (int idx = 0; idx < chunk.length; idx++) {
                            chunk[idx] = filler;
                        }
                        state.write((Object)chunk);
                        stamp = System.currentTimeMillis();
                    }
                    c.output(((KV)c.element()).getValue());
                }
            };
            final PCollection result = (PCollection)keyed.apply("diskBusy.generateIO", (PTransform)ParDo.of(ioFn));
            return result;
        }
    }

    /** Shape of the event-rate curve used when the rate varies between two values. */
    public static enum RateShape {
        SQUARE,
        SINE;

        /** Number of steps used to approximate one sine period. */
        private static final int N = 10;

        /**
         * Returns the delay between events for one generator at a constant overall rate.
         *
         * @param rate          overall event rate, in events per {@code unit}
         * @param unit          time unit of {@code rate}
         * @param numGenerators number of generators sharing the rate
         * @return the period for {@code rate} in microseconds multiplied by {@code numGenerators}
         */
        public long interEventDelayUs(int rate, RateUnit unit, int numGenerators) {
            final long periodUs = unit.rateToPeriodUs(rate);
            return periodUs * (long)numGenerators;
        }

        /**
         * Returns the per-generator inter-event delays, in microseconds, for each step of one
         * rate cycle.
         *
         * <p>Equal rates give a single delay. {@code SQUARE} gives two delays (first rate, then
         * next rate). {@code SINE} gives {@code N} delays whose rates follow a cosine between
         * {@code firstRate} and {@code nextRate}, each rounded to the nearest integer rate.
         *
         * @param firstRate     rate at the start of the cycle
         * @param nextRate      rate at the opposite extreme of the cycle
         * @param unit          time unit of both rates
         * @param numGenerators number of generators sharing the rate
         * @return one delay per step
         */
        public long[] interEventDelayUs(int firstRate, int nextRate, RateUnit unit, int numGenerators) {
            if (firstRate == nextRate) {
                return new long[]{unit.rateToPeriodUs(firstRate) * (long)numGenerators};
            }
            if (this == SQUARE) {
                final long firstDelay = unit.rateToPeriodUs(firstRate) * (long)numGenerators;
                final long nextDelay = unit.rateToPeriodUs(nextRate) * (long)numGenerators;
                return new long[]{firstDelay, nextDelay};
            }
            if (this == SINE) {
                final double mid = (double)(firstRate + nextRate) / 2.0;
                final double amp = (double)(firstRate - nextRate) / 2.0;
                final long[] delays = new long[N];
                int step = 0;
                while (step < N) {
                    final double angle = Math.PI * 2 * (double)step / (double)N;
                    final double stepRate = mid + amp * Math.cos(angle);
                    delays[step] = unit.rateToPeriodUs(Math.round(stepRate)) * (long)numGenerators;
                    step++;
                }
                return delays;
            }
            throw new RuntimeException();
        }

        /**
         * Returns the length of one step in seconds: {@code ratePeriodSec} divided by the number
         * of steps of this shape (2 for {@code SQUARE}, {@code N} for {@code SINE}), rounded up.
         *
         * @param ratePeriodSec length of a full rate cycle in seconds
         * @return seconds per step
         */
        public int stepLengthSec(int ratePeriodSec) {
            int steps = 0;
            if (this == SQUARE) {
                steps = 2;
            } else if (this == SINE) {
                steps = N;
            }
            // Adding (steps - 1) before dividing rounds the quotient up.
            return (ratePeriodSec + steps - 1) / steps;
        }
    }

    /** Time unit in which an event rate is expressed. */
    public static enum RateUnit {
        PER_SECOND(1000000L),
        PER_MINUTE(60000000L);

        /** Number of microseconds in one unit. */
        private final long usPerUnit;

        private RateUnit(long usPerUnit) {
            this.usPerUnit = usPerUnit;
        }

        /**
         * Converts a rate into the period between events.
         *
         * @param rate events per unit; a rate of zero causes an {@link ArithmeticException}
         * @return microseconds per event, computed as {@code (usPerUnit + rate / 2) / rate}
         */
        public long rateToPeriodUs(long rate) {
            // Adding half the divisor first rounds the quotient to nearest for positive rates.
            final long halfRate = rate / 2L;
            final long biased = this.usPerUnit + halfRate;
            return biased / rate;
        }
    }

    /**
     * Controls how resource names are derived from a base name. In this file it is consulted by
     * {@code tableSpec} when building BigQuery table names.
     */
    public static enum ResourceNameMode {
        // tableSpec: base table name plus the version suffix only.
        VERBATIM,
        // tableSpec: additionally includes the query name.
        QUERY,
        // tableSpec: additionally includes the query name and the numeric "now" salt.
        QUERY_AND_SALT,
        // tableSpec: includes query name, runner simple name, "streaming"/"batch", and the
        // version when it is non-null.
        QUERY_RUNNER_AND_MODE;

    }

    /** Selects which coders {@code setupPipeline} installs on a pipeline's coder registry. */
    public static enum CoderStrategy {
        // Register each model class with its own CODER constant.
        HAND,
        // Register the AvroCoder coder provider.
        AVRO,
        // Register the SerializableCoder coder provider.
        JAVA;

    }

    /**
     * Pub/Sub usage modes.
     *
     * <p>NOTE(review): not referenced elsewhere in this file; the names suggest publishing only,
     * subscribing only, or doing both in one run -- confirm against callers.
     */
    public static enum PubSubMode {
        PUBLISH_ONLY,
        SUBSCRIBE_ONLY,
        COMBINED;

    }

    /**
     * Kinds of sink.
     *
     * <p>NOTE(review): not referenced elsewhere in this file; presumably selects where query
     * results are sent -- confirm against callers.
     */
    public static enum SinkType {
        COUNT_ONLY,
        DEVNULL,
        PUBSUB,
        KAFKA,
        TEXT,
        AVRO,
        BIGQUERY;

    }

    /**
     * Kinds of event source.
     *
     * <p>NOTE(review): not referenced elsewhere in this file; presumably selects where events
     * are read from -- confirm against callers.
     */
    public static enum SourceType {
        DIRECT,
        AVRO,
        PUBSUB,
        KAFKA;

    }
}

