/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.base.Preconditions;
import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.base.Strings;
import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.collect.Lists;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.nexmark.Monitor;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkOptions;
import org.apache.beam.sdk.nexmark.NexmarkPerf;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.PubsubHelper;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionCount;
import org.apache.beam.sdk.nexmark.model.AuctionPrice;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.nexmark.model.NameCityStateId;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
import org.apache.beam.sdk.nexmark.queries.NexmarkQueryModel;
import org.apache.beam.sdk.nexmark.queries.Query0;
import org.apache.beam.sdk.nexmark.queries.Query0Model;
import org.apache.beam.sdk.nexmark.queries.Query1;
import org.apache.beam.sdk.nexmark.queries.Query10;
import org.apache.beam.sdk.nexmark.queries.Query11;
import org.apache.beam.sdk.nexmark.queries.Query12;
import org.apache.beam.sdk.nexmark.queries.Query1Model;
import org.apache.beam.sdk.nexmark.queries.Query2;
import org.apache.beam.sdk.nexmark.queries.Query2Model;
import org.apache.beam.sdk.nexmark.queries.Query3;
import org.apache.beam.sdk.nexmark.queries.Query3Model;
import org.apache.beam.sdk.nexmark.queries.Query4;
import org.apache.beam.sdk.nexmark.queries.Query4Model;
import org.apache.beam.sdk.nexmark.queries.Query5;
import org.apache.beam.sdk.nexmark.queries.Query5Model;
import org.apache.beam.sdk.nexmark.queries.Query6;
import org.apache.beam.sdk.nexmark.queries.Query6Model;
import org.apache.beam.sdk.nexmark.queries.Query7;
import org.apache.beam.sdk.nexmark.queries.Query7Model;
import org.apache.beam.sdk.nexmark.queries.Query8;
import org.apache.beam.sdk.nexmark.queries.Query8Model;
import org.apache.beam.sdk.nexmark.queries.Query9;
import org.apache.beam.sdk.nexmark.queries.Query9Model;
import org.apache.beam.sdk.nexmark.queries.sql.NexmarkSqlQuery;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery0;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery1;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery2;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery3;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery5;
import org.apache.beam.sdk.nexmark.queries.sql.SqlQuery7;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NexmarkLauncher<OptionT extends NexmarkOptions> {
    /** Logger used for diagnostics such as missing metrics. */
    private static final Logger LOG = LoggerFactory.getLogger(NexmarkLauncher.class);
    // Not referenced in the visible part of this class; presumably selects the SQL query suite -- verify against usage.
    private static final String SQL = "sql";
    /** Minimum number of usable snapshots before a steady-state event rate is computed (see captureSteadyState). */
    private static final int MIN_SAMPLES = 9;
    /** Minimum length of the sampling window before a steady-state event rate is computed. */
    private static final Duration MIN_WINDOW = Duration.standardMinutes((long)2L);
    /** Pause between two consecutive polls of the monitored job. */
    private static final Duration PERF_DELAY = Duration.standardSeconds((long)15L);
    /** Quiet period after which a streaming job that has seen all its events is considered finished. */
    private static final Duration DONE_DELAY = Duration.standardMinutes((long)1L);
    /** Quiet period after which a streaming job is reported as possibly stuck. */
    private static final Duration STUCK_WARNING_DELAY = Duration.standardMinutes((long)10L);
    /** Quiet period after which a streaming job is cancelled as stuck. */
    private static final Duration STUCK_TERMINATE_DELAY = Duration.standardDays((long)3L);
    /** Command line options this launcher was created with. */
    private final OptionT options;
    // Configuration of the query being run; read by most methods below, assigned outside the visible part of the class.
    @Nullable
    private NexmarkConfiguration configuration;
    // Not referenced in the visible part of this class.
    @Nullable
    private Monitor<Event> publisherMonitor;
    // Result of the separate publisher pipeline, if one exists; monitor() cancels it when non-null.
    @Nullable
    private PipelineResult publisherResult;
    // Result of the main pipeline; polled by monitor().
    @Nullable
    private PipelineResult mainResult;
    // Name of the current query; used as a prefix for transform names and in generated resource names.
    @Nullable
    private String queryName;
    // Pubsub topic events are written to; sinkEventsToPubsub requires it to be non-null.
    @Nullable
    private String pubsubTopic;
    // Pubsub subscription events are read from in sourceEventsFromPubsub.
    @Nullable
    private String pubsubSubscription;
    // Not referenced in the visible part of this class.
    @Nullable
    private PubsubHelper pubsubHelper;
    /** Serializes every incoming {@link Event} to a byte array using {@code Event.CODER}. */
    static final DoFn<Event, byte[]> EVENT_TO_BYTEARRAY = new DoFn<Event, byte[]>(){

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) throws IOException {
            Event event = (Event)context.element();
            byte[] serialized = CoderUtils.encodeToByteArray(Event.CODER, (Object)event);
            context.output((Object)serialized);
        }
    };
    /** Decodes the value of every incoming key/value pair back into an {@link Event} using {@code Event.CODER}; the key is ignored. */
    static final DoFn<KV<Long, byte[]>, Event> BYTEARRAY_TO_EVENT = new DoFn<KV<Long, byte[]>, Event>(){

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext context) throws IOException {
            KV entry = (KV)context.element();
            byte[] payload = (byte[])entry.getValue();
            Event decoded = (Event)CoderUtils.decodeFromByteArray(Event.CODER, (byte[])payload);
            context.output((Object)decoded);
        }
    };
    // Tuple tags for String outputs. Neither is referenced in the visible part of this class;
    // presumably they distinguish the main and the side output of a multi-output transform -- verify against usage.
    private static final TupleTag<String> MAIN = new TupleTag<String>(){};
    private static final TupleTag<String> SIDE = new TupleTag<String>(){};

    /**
     * Creates a launcher that reads all of its settings from the given options.
     *
     * @param options the options object stored and consulted by this launcher
     */
    public NexmarkLauncher(OptionT options) {
        this.options = options;
    }

    /** Returns whether the launcher's options request streaming mode. */
    private boolean isStreaming() {
        return options.isStreaming();
    }

    /** Returns a fixed worker limit of 5; not called from the visible part of this class. */
    private int maxNumWorkers() {
        return 5;
    }

    /**
     * Returns the attempted value of the first counter matching the given namespace and name.
     *
     * @param result the pipeline result whose metrics are queried
     * @param namespace the metric namespace to filter on
     * @param name the metric name to filter on
     * @param defaultValue the value returned when no matching counter exists
     * @return the counter's attempted value, or {@code defaultValue} if the query yields no counter
     */
    private long getCounterMetric(PipelineResult result, String namespace, String name, long defaultValue) {
        MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named((String)namespace, (String)name)).build());
        java.util.Iterator<?> counters = metrics.getCounters().iterator();
        // Check for emptiness explicitly instead of catching NoSuchElementException, so that an
        // exception raised inside the metric implementation is not mistaken for "no such metric".
        if (!counters.hasNext()) {
            LOG.error("Failed to get metric {}, from namespace {}", (Object)name, (Object)namespace);
            return defaultValue;
        }
        MetricResult metricResult = (MetricResult)counters.next();
        return (Long)metricResult.getAttempted();
    }

    /**
     * Returns the minimum or maximum of the first distribution matching the given namespace and name.
     *
     * @param result the pipeline result whose metrics are queried
     * @param namespace the metric namespace to filter on
     * @param name the metric name to filter on
     * @param distType which aggregate of the distribution to return ({@code MIN} or {@code MAX})
     * @param defaultValue the value returned when no matching distribution exists or
     *     {@code distType} is neither {@code MIN} nor {@code MAX}
     * @return the requested aggregate of the attempted distribution, or {@code defaultValue}
     */
    private long getDistributionMetric(PipelineResult result, String namespace, String name, DistributionType distType, long defaultValue) {
        MetricQueryResults metrics = result.metrics().queryMetrics(MetricsFilter.builder().addNameFilter(MetricNameFilter.named((String)namespace, (String)name)).build());
        java.util.Iterator<?> distributions = metrics.getDistributions().iterator();
        // Check for emptiness explicitly instead of catching NoSuchElementException, so that an
        // exception raised inside the metric implementation is not mistaken for "no such metric".
        if (!distributions.hasNext()) {
            LOG.error("Failed to get distribution metric {} for namespace {}", (Object)name, (Object)namespace);
            return defaultValue;
        }
        MetricResult distributionResult = (MetricResult)distributions.next();
        switch (distType) {
            case MIN: {
                return ((DistributionResult)distributionResult.getAttempted()).getMin();
            }
            case MAX: {
                return ((DistributionResult)distributionResult.getAttempted()).getMax();
            }
        }
        return defaultValue;
    }

    /**
     * Filters out implausible timestamps.
     *
     * @param now the current time in milliseconds
     * @param value the candidate timestamp in milliseconds
     * @return {@code value} if it lies within 10000 days of {@code now}, otherwise {@code -1}
     */
    private long getTimestampMetric(long now, long value) {
        long maxDistanceMs = Duration.standardDays((long)10000L).getMillis();
        long distanceMs = Math.abs(value - now);
        return distanceMs > maxDistanceMs ? -1L : value;
    }

    /**
     * Revises {@code perf.eventsPerSec} from the collected snapshots when running in streaming mode.
     *
     * <p>Leading snapshots without event or result counts and trailing snapshots without activity
     * are discarded. The middle third of what remains is then fitted with a line through the
     * origin (events against runtime) whose slope becomes the new events-per-second value.
     * Nothing is changed when there are too few samples or the sampled window is too short.
     *
     * @param perf the performance record whose {@code eventsPerSec} may be overwritten
     * @param snapshots the progress snapshots collected so far, oldest first
     */
    private void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
        if (!this.options.isStreaming()) {
            return;
        }

        // Skip leading snapshots taken before both counters became available.
        int firstValid = 0;
        while (firstValid < snapshots.size()) {
            NexmarkPerf.ProgressSnapshot candidate = snapshots.get(firstValid);
            if (candidate.numEvents >= 0L && candidate.numResults >= 0L) {
                break;
            }
            ++firstValid;
        }

        // Drop trailing snapshots that show no activity relative to their predecessor.
        int lastActive = snapshots.size() - 1;
        while (lastActive > firstValid && !snapshots.get(lastActive).anyActivity(snapshots.get(lastActive - 1))) {
            --lastActive;
        }

        int sampleCount = lastActive - firstValid + 1;
        // 9 is the inlined value of MIN_SAMPLES.
        if (sampleCount < 9) {
            NexmarkUtils.console("%d samples not enough to calculate steady-state event rate", sampleCount);
            return;
        }

        // Keep only the middle third of the usable samples.
        int third = sampleCount / 3;
        int windowStart = firstValid + third;
        int windowEnd = lastActive - third;
        double windowSec = snapshots.get(windowEnd).secSinceStart - snapshots.get(windowStart).secSinceStart;
        if (windowSec < (double)MIN_WINDOW.getStandardSeconds()) {
            NexmarkUtils.console("sample of %.1f sec not long enough to calculate steady-state event rate", windowSec);
            return;
        }

        // Least-squares slope of a line through the origin: sum(x*y) / sum(x*x).
        double sumXX = 0.0;
        double sumXY = 0.0;
        long lastSeenEvents = -1L;
        for (NexmarkPerf.ProgressSnapshot snapshot : snapshots.subList(windowStart, windowEnd + 1)) {
            if (snapshot.numEvents == lastSeenEvents) {
                // Event count unchanged since the previous contributing sample; skip it.
                continue;
            }
            double x = snapshot.runtimeSec;
            lastSeenEvents = snapshot.numEvents;
            double y = lastSeenEvents;
            sumXX += x * x;
            sumXY += x * y;
        }
        double revisedRate = sumXY / sumXX;
        NexmarkUtils.console("revising events/sec from %.1f to %.1f", perf.eventsPerSec, revisedRate);
        perf.eventsPerSec = revisedRate;
    }

    /**
     * Builds a performance record from the metrics currently reported by the pipeline.
     *
     * <p>Counters and distributions are looked up under the namespaces and prefixes of the two
     * monitors. A missing or implausible metric is represented as {@code -1} and leaves the
     * corresponding {@link NexmarkPerf} field untouched. A progress snapshot is appended to
     * {@code snapshots} on every call, after which {@code captureSteadyState} is applied.
     *
     * @param startMsSinceEpoch time at which monitoring started, in milliseconds
     * @param now current time, in milliseconds
     * @param result the pipeline result to read metrics from
     * @param snapshots list of progress snapshots; one entry is added by this call
     * @param eventMonitor monitor attached to the event stream
     * @param resultMonitor monitor attached to the result stream
     * @return the performance record derived from the current metrics
     */
    private NexmarkPerf currentPerf(long startMsSinceEpoch, long now, PipelineResult result, List<NexmarkPerf.ProgressSnapshot> snapshots, Monitor<?> eventMonitor, Monitor<?> resultMonitor) {
        NexmarkPerf perf = new NexmarkPerf();

        // Event side metrics.
        long numEvents = this.getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".elements", -1L);
        long numEventBytes = this.getCounterMetric(result, eventMonitor.name, eventMonitor.prefix + ".bytes", -1L);
        long eventStart = this.getTimestampMetric(now, this.getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".startTime", DistributionType.MIN, -1L));
        long eventEnd = this.getTimestampMetric(now, this.getDistributionMetric(result, eventMonitor.name, eventMonitor.prefix + ".endTime", DistributionType.MAX, -1L));

        // Result side metrics.
        long numResults = this.getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".elements", -1L);
        long numResultBytes = this.getCounterMetric(result, resultMonitor.name, resultMonitor.prefix + ".bytes", -1L);
        long resultStart = this.getTimestampMetric(now, this.getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTime", DistributionType.MIN, -1L));
        long resultEnd = this.getTimestampMetric(now, this.getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTime", DistributionType.MAX, -1L));
        long timestampStart = this.getTimestampMetric(now, this.getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".startTimestamp", DistributionType.MIN, -1L));
        long timestampEnd = this.getTimestampMetric(now, this.getDistributionMetric(result, resultMonitor.name, resultMonitor.prefix + ".endTimestamp", DistributionType.MAX, -1L));

        // The run ends at the later of the two end times, or at whichever one is known.
        long effectiveEnd;
        if (eventEnd < 0L) {
            effectiveEnd = resultEnd >= 0L ? resultEnd : -1L;
        } else if (resultEnd < 0L) {
            effectiveEnd = eventEnd;
        } else {
            effectiveEnd = Math.max(eventEnd, resultEnd);
        }
        if (eventStart >= 0L && effectiveEnd >= eventStart) {
            perf.runtimeSec = (double)(effectiveEnd - eventStart) / 1000.0;
        }

        // Rates are only meaningful once a positive runtime is known.
        boolean runtimeKnown = perf.runtimeSec > 0.0;
        if (numEvents >= 0L) {
            perf.numEvents = numEvents;
            if (runtimeKnown) {
                perf.eventsPerSec = (double)numEvents / perf.runtimeSec;
            }
        }
        if (runtimeKnown && numEventBytes >= 0L) {
            perf.eventBytesPerSec = (double)numEventBytes / perf.runtimeSec;
        }
        if (numResults >= 0L) {
            perf.numResults = numResults;
            if (runtimeKnown) {
                perf.resultsPerSec = (double)numResults / perf.runtimeSec;
            }
        }
        if (runtimeKnown && numResultBytes >= 0L) {
            perf.resultBytesPerSec = (double)numResultBytes / perf.runtimeSec;
        }

        // Delays between the individual phases of the run.
        if (eventStart >= 0L) {
            perf.startupDelaySec = (double)(eventStart - startMsSinceEpoch) / 1000.0;
            if (resultStart >= eventStart) {
                perf.processingDelaySec = (double)(resultStart - eventStart) / 1000.0;
            }
        }
        if (runtimeKnown && timestampStart >= 0L && timestampEnd >= 0L) {
            double eventRuntimeSec = (double)(timestampEnd - timestampStart) / 1000.0;
            perf.timeDilation = eventRuntimeSec / perf.runtimeSec;
        }
        if (resultEnd >= 0L) {
            perf.shutdownDelaySec = (double)(now - resultEnd) / 1000.0;
        }

        // Record where the run stands right now.
        NexmarkPerf.ProgressSnapshot progress = new NexmarkPerf.ProgressSnapshot();
        progress.secSinceStart = (double)(now - startMsSinceEpoch) / 1000.0;
        progress.runtimeSec = perf.runtimeSec;
        progress.numEvents = numEvents;
        progress.numResults = numResults;
        snapshots.add(progress);

        this.captureSteadyState(perf, snapshots);
        return perf;
    }

    /**
     * Runs the given builder with options temporarily adjusted for a publish-only pipeline.
     *
     * <p>Job and application names are prefixed with {@code "p-"}, and the worker counts are capped
     * at the configured number of event generators when both values are positive. The original
     * option values are restored afterwards, even if the builder throws.
     *
     * @param builder the pipeline builder to invoke with the adjusted options
     */
    private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder<NexmarkOptions> builder) {
        // Remember the current settings so they can be restored afterwards.
        String savedJobName = this.options.getJobName();
        String savedAppName = this.options.getAppName();
        int savedNumWorkers = this.options.getNumWorkers();
        int savedMaxNumWorkers = this.options.getMaxNumWorkers();

        this.options.setJobName("p-" + savedJobName);
        this.options.setAppName("p-" + savedAppName);
        int generators = this.configuration.numEventGenerators;
        if (generators > 0) {
            if (savedNumWorkers > 0) {
                this.options.setNumWorkers(Math.min(savedNumWorkers, generators));
            }
            if (savedMaxNumWorkers > 0) {
                this.options.setMaxNumWorkers(Math.min(savedMaxNumWorkers, generators));
            }
        }

        try {
            builder.build((NexmarkOptions)this.options);
        }
        finally {
            this.options.setJobName(savedJobName);
            this.options.setAppName(savedAppName);
            this.options.setNumWorkers(savedNumWorkers);
            this.options.setMaxNumWorkers(savedMaxNumWorkers);
        }
    }

    /**
     * Polls the main pipeline until it reaches a terminal state and reports what was observed.
     *
     * <p>Every {@code PERF_DELAY} the job state is read and, when {@code configuration.debug} is
     * set, a fresh performance record is computed. The job is cancelled when the configured
     * running time has elapsed, and in streaming mode also when the pipeline reports fatal errors
     * or has shown no activity for {@code STUCK_TERMINATE_DELAY}. After the main job has ended,
     * a publisher pipeline, if present, is cancelled and given up to five minutes to finish.
     *
     * @param query the query being run; supplies the monitors and the name used for metrics
     * @return {@code null} if job monitoring is disabled in the options; otherwise the last
     *     performance record with the collected errors and snapshots attached. When
     *     {@code configuration.debug} is off no measurements are taken, and the returned record
     *     carries only the errors and an empty snapshot list.
     * @throws RuntimeException if cancelling the main or publisher job fails with an I/O error
     */
    @Nullable
    private NexmarkPerf monitor(NexmarkQuery query) {
        if (!this.options.getMonitorJobs()) {
            return null;
        }
        if (this.configuration.debug) {
            NexmarkUtils.console("Waiting for main pipeline to 'finish'");
        } else {
            NexmarkUtils.console("--debug=false, so job will not self-cancel");
        }
        PipelineResult job = this.mainResult;
        PipelineResult publisherJob = this.publisherResult;
        ArrayList<NexmarkPerf.ProgressSnapshot> snapshots = new ArrayList<NexmarkPerf.ProgressSnapshot>();
        long startMsSinceEpoch = System.currentTimeMillis();
        // Deadline for the whole test, or -1 when no running time was requested.
        long endMsSinceEpoch = -1L;
        if (this.options.getRunningTimeMinutes() != null) {
            endMsSinceEpoch = startMsSinceEpoch + Duration.standardMinutes((long)this.options.getRunningTimeMinutes()).getMillis() - Duration.standardSeconds((long)this.configuration.preloadSeconds).getMillis();
        }
        long lastActivityMsSinceEpoch = -1L;
        NexmarkPerf perf = null;
        boolean waitingForShutdown = false;
        boolean cancelJob = false;
        boolean publisherCancelled = false;
        ArrayList<String> errors = new ArrayList<String>();
        while (true) {
            long now = System.currentTimeMillis();
            if (endMsSinceEpoch >= 0L && now > endMsSinceEpoch && !waitingForShutdown) {
                NexmarkUtils.console("Reached end of test, cancelling job");
                try {
                    cancelJob = true;
                    job.cancel();
                }
                catch (IOException e) {
                    throw new RuntimeException("Unable to cancel main job: ", e);
                }
                if (this.publisherResult != null) {
                    try {
                        publisherJob.cancel();
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Unable to cancel publisher job: ", e);
                    }
                    publisherCancelled = true;
                }
                waitingForShutdown = true;
            }
            PipelineResult.State state = job.getState();
            NexmarkUtils.console("%s %s%s", state, this.queryName, waitingForShutdown ? " (waiting for shutdown)" : "");
            // Performance is only sampled in debug mode; otherwise currPerf (and perf) stay null.
            NexmarkPerf currPerf = this.configuration.debug ? this.currentPerf(startMsSinceEpoch, now, job, snapshots, query.eventMonitor, query.resultMonitor) : null;
            if (perf == null || perf.anyActivity(currPerf)) {
                lastActivityMsSinceEpoch = now;
            }
            if (this.options.isStreaming() && !waitingForShutdown) {
                Duration quietFor = new Duration(lastActivityMsSinceEpoch, now);
                long fatalCount = this.getCounterMetric(job, query.getName(), "fatal", 0L);
                if (fatalCount > 0L) {
                    NexmarkUtils.console("job has fatal errors, cancelling.");
                    errors.add(String.format("Pipeline reported %s fatal errors", fatalCount));
                    waitingForShutdown = true;
                    cancelJob = true;
                } else if (this.configuration.debug && this.configuration.numEvents > 0L && currPerf.numEvents == this.configuration.numEvents && currPerf.numResults >= 0L && quietFor.isLongerThan((ReadableDuration)DONE_DELAY)) {
                    NexmarkUtils.console("streaming query appears to have finished waiting for completion.");
                    waitingForShutdown = true;
                } else if (quietFor.isLongerThan((ReadableDuration)STUCK_TERMINATE_DELAY)) {
                    NexmarkUtils.console("streaming query appears to have gotten stuck, cancelling job.");
                    errors.add("Cancelling streaming job since it appeared stuck");
                    waitingForShutdown = true;
                    cancelJob = true;
                } else if (quietFor.isLongerThan((ReadableDuration)STUCK_WARNING_DELAY)) {
                    NexmarkUtils.console("WARNING: streaming query appears to have been stuck for %d min.", quietFor.getStandardMinutes());
                    errors.add(String.format("Streaming query was stuck for %d min", quietFor.getStandardMinutes()));
                }
                if (cancelJob) {
                    try {
                        job.cancel();
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Unable to cancel main job: ", e);
                    }
                }
            }
            perf = currPerf;
            boolean running = true;
            switch (state) {
                case UNKNOWN:
                case STOPPED:
                case RUNNING: {
                    // Keep going.
                    break;
                }
                case DONE: {
                    running = false;
                    break;
                }
                case CANCELLED: {
                    running = false;
                    // A cancellation is only an error when this monitor did not request it.
                    if (cancelJob) break;
                    errors.add("Job was unexpectedly cancelled");
                    break;
                }
                case FAILED: {
                    running = false;
                    // Report a failure as such rather than as an update.
                    errors.add("Job unexpectedly failed");
                    break;
                }
                case UPDATED: {
                    running = false;
                    errors.add("Job was unexpectedly updated");
                    break;
                }
            }
            if (!running) break;
            if (lastActivityMsSinceEpoch == now) {
                NexmarkUtils.console("new perf %s", perf);
            } else {
                NexmarkUtils.console("no activity");
            }
            try {
                Thread.sleep(PERF_DELAY.getMillis());
            }
            catch (InterruptedException e) {
                // Monitoring deliberately continues after an interrupt. The interrupt flag is left
                // cleared because restoring it would make every following sleep return at once
                // and turn this loop into a busy loop.
                Thread.interrupted();
                NexmarkUtils.console("Interrupted: pipeline is still running");
            }
        }
        if (perf == null) {
            // Without --debug no performance record is ever sampled. Use an empty record so the
            // collected errors still reach the caller instead of failing on a null reference.
            perf = new NexmarkPerf();
        }
        perf.errors = errors;
        perf.snapshots = snapshots;
        if (this.publisherResult != null) {
            NexmarkUtils.console("Shutting down publisher pipeline.");
            try {
                if (!publisherCancelled) {
                    publisherJob.cancel();
                }
                publisherJob.waitUntilFinish(Duration.standardMinutes((long)5L));
            }
            catch (IOException e) {
                throw new RuntimeException("Unable to cancel publisher job: ", e);
            }
        }
        return perf;
    }

    /**
     * Derives the Pubsub topic name from {@code --pubsubTopic} and the resource name mode.
     *
     * @param now timestamp used as salt in {@code QUERY_AND_SALT} mode
     * @return the topic name for the current query
     * @throws RuntimeException if {@code --pubsubTopic} is missing or the mode is not recognized
     */
    private String shortTopic(long now) {
        String topicPrefix = this.options.getPubsubTopic();
        if (Strings.isNullOrEmpty(topicPrefix)) {
            throw new RuntimeException("Missing --pubsubTopic");
        }
        switch (this.options.getResourceNameMode()) {
            case VERBATIM:
                return topicPrefix;
            case QUERY:
                return String.format("%s_%s_source", topicPrefix, this.queryName);
            case QUERY_AND_SALT:
                return String.format("%s_%s_%d_source", topicPrefix, this.queryName, now);
            case QUERY_RUNNER_AND_MODE:
                return String.format("%s_%s_%s_%s_source", topicPrefix, this.queryName, this.options.getRunner().getSimpleName(), this.options.isStreaming());
            default:
                throw new RuntimeException("Unrecognized enum " + this.options.getResourceNameMode());
        }
    }

    /**
     * Derives the Pubsub subscription name from {@code --pubsubSubscription} and the resource name mode.
     *
     * @param now timestamp used as salt in {@code QUERY_AND_SALT} mode
     * @return the subscription name for the current query
     * @throws RuntimeException if {@code --pubsubSubscription} is missing or the mode is not recognized
     */
    private String shortSubscription(long now) {
        String subscriptionPrefix = this.options.getPubsubSubscription();
        if (Strings.isNullOrEmpty(subscriptionPrefix)) {
            throw new RuntimeException("Missing --pubsubSubscription");
        }
        switch (this.options.getResourceNameMode()) {
            case VERBATIM:
                return subscriptionPrefix;
            case QUERY:
                return String.format("%s_%s_source", subscriptionPrefix, this.queryName);
            case QUERY_AND_SALT:
                return String.format("%s_%s_%d_source", subscriptionPrefix, this.queryName, now);
            case QUERY_RUNNER_AND_MODE:
                return String.format("%s_%s_%s_%s_source", subscriptionPrefix, this.queryName, this.options.getRunner().getSimpleName(), this.options.isStreaming());
            default:
                throw new RuntimeException("Unrecognized enum " + this.options.getResourceNameMode());
        }
    }

    /**
     * Derives the text output location from {@code --outputPath} and the resource name mode.
     *
     * <p>Unlike the other generated names, the one for {@code QUERY_RUNNER_AND_MODE} carries no
     * {@code .txt} extension.
     *
     * @param now timestamp used as salt in {@code QUERY_AND_SALT} mode
     * @return the file name (prefix) for text results of the current query
     * @throws RuntimeException if {@code --outputPath} is missing or the mode is not recognized
     */
    private String textFilename(long now) {
        String outputDir = this.options.getOutputPath();
        if (Strings.isNullOrEmpty(outputDir)) {
            throw new RuntimeException("Missing --outputPath");
        }
        switch (this.options.getResourceNameMode()) {
            case VERBATIM:
                return outputDir;
            case QUERY:
                return String.format("%s/nexmark_%s.txt", outputDir, this.queryName);
            case QUERY_AND_SALT:
                return String.format("%s/nexmark_%s_%d.txt", outputDir, this.queryName, now);
            case QUERY_RUNNER_AND_MODE:
                return String.format("%s/nexmark_%s_%s_%s", outputDir, this.queryName, this.options.getRunner().getSimpleName(), this.options.isStreaming());
            default:
                throw new RuntimeException("Unrecognized enum " + this.options.getResourceNameMode());
        }
    }

    /**
     * Derives the logs directory from {@code --outputPath} and the resource name mode.
     *
     * @param now timestamp used as salt in {@code QUERY_AND_SALT} mode
     * @return the directory for logs of the current query
     * @throws RuntimeException if {@code --outputPath} is missing or the mode is not recognized
     */
    private String logsDir(long now) {
        String outputDir = this.options.getOutputPath();
        if (Strings.isNullOrEmpty(outputDir)) {
            throw new RuntimeException("Missing --outputPath");
        }
        switch (this.options.getResourceNameMode()) {
            case VERBATIM:
                return outputDir;
            case QUERY:
                return String.format("%s/logs_%s", outputDir, this.queryName);
            case QUERY_AND_SALT:
                return String.format("%s/logs_%s_%d", outputDir, this.queryName, now);
            case QUERY_RUNNER_AND_MODE:
                return String.format("%s/logs_%s_%s_%s", outputDir, this.queryName, this.options.getRunner().getSimpleName(), this.options.isStreaming());
            default:
                throw new RuntimeException("Unrecognized enum " + this.options.getResourceNameMode());
        }
    }

    /**
     * Adds a synthetic event source to the pipeline, unbounded in streaming mode and bounded otherwise.
     *
     * @param p the pipeline to read into
     * @return the generated events
     */
    private PCollection<Event> sourceEventsFromSynthetic(Pipeline p) {
        if (!this.isStreaming()) {
            NexmarkUtils.console("Generating %d events in batch mode", this.configuration.numEvents);
            String boundedStep = this.queryName + ".ReadBounded";
            return (PCollection)p.apply(boundedStep, NexmarkUtils.batchEventsSource(this.configuration));
        }
        NexmarkUtils.console("Generating %d events in streaming mode", this.configuration.numEvents);
        String unboundedStep = this.queryName + ".ReadUnbounded";
        return (PCollection)p.apply(unboundedStep, NexmarkUtils.streamEventsSource(this.configuration));
    }

    /**
     * Reads events from the configured Pubsub subscription.
     *
     * <p>Messages are read with their attributes, using the {@code "id"} attribute as record id.
     * Unless the configuration asks for Pubsub publish time, the {@code "timestamp"} attribute
     * supplies the element timestamp.
     *
     * @param p the pipeline to read into
     * @return the events converted from the Pubsub messages
     */
    private PCollection<Event> sourceEventsFromPubsub(Pipeline p) {
        NexmarkUtils.console("Reading events from Pubsub %s", this.pubsubSubscription);

        PubsubIO.Read reader = PubsubIO.readMessagesWithAttributes().fromSubscription(this.pubsubSubscription).withIdAttribute("id");
        if (!this.configuration.usePubsubPublishTime) {
            reader = reader.withTimestampAttribute("timestamp");
        }

        PCollection messages = (PCollection)p.apply(this.queryName + ".ReadPubsubEvents", (PTransform)reader);
        return (PCollection)messages.apply(this.queryName + ".PubsubMessageToEvent", (PTransform)ParDo.of((DoFn)new PubsubMessageEventDoFn()));
    }

    /**
     * Writes the events, encoded as byte arrays, to the configured Kafka topic.
     *
     * @param events the events to publish
     * @throws IllegalArgumentException if {@code --bootstrapServers} is not set
     */
    private void sinkEventsToKafka(PCollection<Event> events) {
        String bootstrapServers = this.options.getBootstrapServers();
        Preconditions.checkArgument(bootstrapServers != null, "Missing --bootstrapServers");
        String topic = this.options.getKafkaTopic();
        NexmarkUtils.console("Writing events to Kafka Topic %s", topic);

        PCollection encoded = (PCollection)events.apply("Event to bytes", (PTransform)ParDo.of(EVENT_TO_BYTEARRAY));
        encoded.apply(
            KafkaIO.write()
                .withBootstrapServers(bootstrapServers)
                .withTopic(topic)
                .withValueSerializer(ByteArraySerializer.class)
                .values());
    }

    /**
     * Reads byte-array encoded events from the configured Kafka topic and decodes them.
     *
     * <p>Reading starts at {@code now} and is limited to {@code --numEvents} records when that
     * option is set, otherwise to {@code Long.MAX_VALUE}.
     *
     * @param p the pipeline to read into
     * @param now the start read time passed to the Kafka reader
     * @return the decoded events
     * @throws IllegalArgumentException if {@code --bootstrapServers} is not set
     */
    private PCollection<Event> sourceEventsFromKafka(Pipeline p, Instant now) {
        String bootstrapServers = this.options.getBootstrapServers();
        Preconditions.checkArgument(bootstrapServers != null, "Missing --bootstrapServers");
        String topic = this.options.getKafkaTopic();
        NexmarkUtils.console("Reading events from Kafka Topic %s", topic);

        long maxRecords = this.options.getNumEvents() != null ? this.options.getNumEvents() : Long.MAX_VALUE;
        KafkaIO.Read reader = KafkaIO.read().withBootstrapServers(bootstrapServers).withTopic(topic).withKeyDeserializer(LongDeserializer.class).withValueDeserializer(ByteArrayDeserializer.class).withStartReadTime(now).withMaxNumRecords(maxRecords);

        PCollection records = (PCollection)p.apply(this.queryName + ".ReadKafkaEvents", reader.withoutMetadata());
        return (PCollection)records.apply(this.queryName + ".KafkaToEvents", (PTransform)ParDo.of(BYTEARRAY_TO_EVENT));
    }

    /**
     * Reads events from the Avro files matching {@code <inputPath>*.avro} and applies
     * {@code NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA} to them.
     *
     * @param p the pipeline to read into
     * @return the events read from the files
     * @throws RuntimeException if {@code --inputPath} is missing
     */
    private PCollection<Event> sourceEventsFromAvro(Pipeline p) {
        String inputPrefix = this.options.getInputPath();
        if (Strings.isNullOrEmpty(inputPrefix)) {
            throw new RuntimeException("Missing --inputPath");
        }
        NexmarkUtils.console("Reading events from Avro files at %s", inputPrefix);

        String filePattern = inputPrefix + "*.avro";
        PCollection rawEvents = (PCollection)p.apply(this.queryName + ".ReadAvroEvents", (PTransform)AvroIO.read(Event.class).from(filePattern));
        return (PCollection)rawEvents.apply("OutputWithTimestamp", NexmarkQuery.EVENT_TIMESTAMP_FROM_DATA);
    }

    /**
     * Converts the events to Pubsub messages and writes them to the configured topic.
     *
     * <p>The {@code "id"} attribute is used as record id. Unless the configuration asks for
     * Pubsub publish time, the writer is also configured with the {@code "timestamp"} attribute.
     *
     * @param events the events to publish
     * @throws IllegalStateException if the Pubsub topic has not been set up yet
     */
    private void sinkEventsToPubsub(PCollection<Event> events) {
        Preconditions.checkState(this.pubsubTopic != null, "Pubsub topic needs to be set up before initializing sink");
        NexmarkUtils.console("Writing events to Pubsub %s", this.pubsubTopic);

        PubsubIO.Write writer = PubsubIO.writeMessages().to(this.pubsubTopic).withIdAttribute("id");
        if (!this.configuration.usePubsubPublishTime) {
            writer = writer.withTimestampAttribute("timestamp");
        }

        PCollection messages = (PCollection)events.apply(this.queryName + ".EventToPubsubMessage", (PTransform)ParDo.of((DoFn)new EventPubsubMessageDoFn()));
        messages.apply(this.queryName + ".WritePubsubEvents", (PTransform)writer);
    }

    /**
     * Writes the formatted results as strings to the configured Kafka results topic.
     *
     * @param formattedResults the results to publish
     * @throws IllegalArgumentException if {@code --bootstrapServers} is not set
     */
    private void sinkResultsToKafka(PCollection<String> formattedResults) {
        String bootstrapServers = this.options.getBootstrapServers();
        Preconditions.checkArgument(bootstrapServers != null, "Missing --bootstrapServers");
        String resultsTopic = this.options.getKafkaResultsTopic();
        NexmarkUtils.console("Writing results to Kafka Topic %s", resultsTopic);

        formattedResults.apply(
            this.queryName + ".WriteKafkaResults",
            KafkaIO.write()
                .withBootstrapServers(bootstrapServers)
                .withTopic(resultsTopic)
                .withValueSerializer(StringSerializer.class)
                .values());
    }

    /**
     * Writes the formatted results as strings to the Pubsub topic derived by {@code shortTopic}.
     *
     * <p>The {@code "id"} attribute is used as record id. Unless the configuration asks for
     * Pubsub publish time, the writer is also configured with the {@code "timestamp"} attribute.
     *
     * @param formattedResults the results to publish
     * @param now timestamp forwarded to {@code shortTopic}
     */
    private void sinkResultsToPubsub(PCollection<String> formattedResults, long now) {
        String topic = this.shortTopic(now);
        NexmarkUtils.console("Writing results to Pubsub %s", topic);

        PubsubIO.Write writer = PubsubIO.writeStrings().to(topic).withIdAttribute("id");
        if (!this.configuration.usePubsubPublishTime) {
            writer = writer.withTimestampAttribute("timestamp");
        }

        String stepName = this.queryName + ".WritePubsubResults";
        formattedResults.apply(stepName, (PTransform)writer);
    }

    /**
     * Writes the events to Avro files below {@code --outputPath}.
     *
     * <p>Four sets of files are produced: all events under {@code /event}, and the outputs of
     * {@code NexmarkQuery.JUST_BIDS}, {@code JUST_NEW_AUCTIONS} and {@code JUST_NEW_PERSONS}
     * under {@code /bid}, {@code /auction} and {@code /person}.
     *
     * @param source the events to write
     * @throws RuntimeException if {@code --outputPath} is missing
     */
    private void sinkEventsToAvro(PCollection<Event> source) {
        String outputDir = this.options.getOutputPath();
        if (Strings.isNullOrEmpty(outputDir)) {
            throw new RuntimeException("Missing --outputPath");
        }
        NexmarkUtils.console("Writing events to Avro files at %s", outputDir);

        // All events.
        source.apply(this.queryName + ".WriteAvroEvents", (PTransform)AvroIO.write(Event.class).to(outputDir + "/event").withSuffix(".avro"));

        // Bids only.
        PCollection bids = (PCollection)source.apply(NexmarkQuery.JUST_BIDS);
        bids.apply(this.queryName + ".WriteAvroBids", (PTransform)AvroIO.write(Bid.class).to(outputDir + "/bid").withSuffix(".avro"));

        // New auctions only.
        PCollection auctions = (PCollection)source.apply(NexmarkQuery.JUST_NEW_AUCTIONS);
        auctions.apply(this.queryName + ".WriteAvroAuctions", (PTransform)AvroIO.write(Auction.class).to(outputDir + "/auction").withSuffix(".avro"));

        // New persons only.
        PCollection persons = (PCollection)source.apply(NexmarkQuery.JUST_NEW_PERSONS);
        persons.apply(this.queryName + ".WriteAvroPeople", (PTransform)AvroIO.write(Person.class).to(outputDir + "/person").withSuffix(".avro"));
    }

    /**
     * Writes the formatted query results to text files at the location given by
     * {@code textFilename(now)}.
     *
     * @param formattedResults the already formatted result strings
     * @param now launch time in milliseconds, used to derive the file name
     */
    private void sinkResultsToText(PCollection<String> formattedResults, long now) {
        String target = this.textFilename(now);
        NexmarkUtils.console("Writing results to text files at %s", target);

        String stepName = this.queryName + ".WriteTextResults";
        formattedResults.apply(stepName, (PTransform)TextIO.write().to(target));
    }

    /**
     * Converts the formatted results to table rows and appends them to a BigQuery table.
     *
     * <p>The table schema has a STRING field {@code result} and a REPEATED RECORD field
     * {@code records} whose entries carry an INTEGER {@code index} and a STRING {@code value}.
     * The table is created if needed and rows are appended.
     *
     * @param formattedResults the already formatted result strings
     * @param now launch time in milliseconds, passed on to the table-spec helper
     * @param version label passed on to the table-spec helper
     */
    private void sinkResultsToBigQuery(PCollection<String> formattedResults, long now, String version) {
        String tableSpec = NexmarkUtils.tableSpec(this.options, this.queryName, now, version);

        // Nested record layout: records[].index / records[].value.
        TableFieldSchema indexField = new TableFieldSchema().setName("index").setType("INTEGER");
        TableFieldSchema valueField = new TableFieldSchema().setName("value").setType("STRING");
        TableFieldSchema resultField = new TableFieldSchema().setName("result").setType("STRING");
        TableFieldSchema recordsField = new TableFieldSchema()
                .setName("records")
                .setMode("REPEATED")
                .setType("RECORD")
                .setFields(ImmutableList.of(indexField, valueField));
        TableSchema schema = new TableSchema().setFields(ImmutableList.of(resultField, recordsField));

        NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);

        BigQueryIO.Write writer = BigQueryIO.write()
                .to(tableSpec)
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);

        PCollection rows = (PCollection)formattedResults.apply(
                this.queryName + ".StringToTableRow",
                (PTransform)ParDo.of((DoFn)new StringToTableRow()));
        rows.apply(this.queryName + ".WriteBigQueryResults", (PTransform)writer);
    }

    /**
     * Resolves the Pubsub topic and, unless running in PUBLISH_ONLY mode, the subscription.
     *
     * <p>The topic is created only when resources are managed and the mode is not SUBSCRIBE_ONLY;
     * otherwise an existing topic is reused. The subscription is created when resources are
     * managed and reused otherwise.
     *
     * @param now launch time in milliseconds, used to derive the topic and subscription names
     */
    private void setupPubSubResources(long now) throws IOException {
        String topicName = this.shortTopic(now);
        String subscriptionName = this.shortSubscription(now);
        boolean manageResources = this.options.getManageResources();

        boolean reuseExistingTopic =
                !manageResources || this.configuration.pubSubMode == NexmarkUtils.PubSubMode.SUBSCRIBE_ONLY;
        if (reuseExistingTopic) {
            this.pubsubTopic = this.pubsubHelper.reuseTopic(topicName).getPath();
        } else {
            this.pubsubTopic = this.pubsubHelper.createTopic(topicName).getPath();
        }

        if (this.configuration.pubSubMode == NexmarkUtils.PubSubMode.PUBLISH_ONLY) {
            // Nothing is read back in this mode, so no subscription is needed.
            return;
        }
        if (manageResources) {
            this.pubsubSubscription = this.pubsubHelper.createSubscription(topicName, subscriptionName).getPath();
        } else {
            this.pubsubSubscription = this.pubsubHelper.reuseSubscription(topicName, subscriptionName).getPath();
        }
    }

    /**
     * Builds the event source of the main pipeline according to the configured source type.
     *
     * <p>DIRECT and AVRO sources are read straight into {@code p}. For KAFKA and PUBSUB the
     * outcome depends on the pub/sub mode:
     * <ul>
     *   <li>PUBLISH_ONLY: synthetic events are written to the external system and nothing is
     *       read back, so {@code null} is returned;</li>
     *   <li>COMBINED: a separate publisher pipeline is built and started, then the events are
     *       read back into {@code p};</li>
     *   <li>SUBSCRIBE_ONLY: the events are only read.</li>
     * </ul>
     * For PUBSUB the topic/subscription are set up first.
     *
     * @param p the main pipeline
     * @param now launch instant; its millis name the Pubsub resources and, in COMBINED mode,
     *     it is handed to the Kafka reader ({@code null} is passed in SUBSCRIBE_ONLY mode)
     * @return the event collection, or {@code null} when this pipeline reads no events
     */
    private PCollection<Event> createSource(Pipeline p, Instant now) throws IOException {
        PCollection<Event> source = null;
        switch (this.configuration.sourceType) {
            case DIRECT: {
                source = this.sourceEventsFromSynthetic(p);
                break;
            }
            case AVRO: {
                source = this.sourceEventsFromAvro(p);
                break;
            }
            case KAFKA:
            case PUBSUB: {
                if (this.configuration.sourceType == NexmarkUtils.SourceType.PUBSUB) {
                    this.setupPubSubResources(now.getMillis());
                }
                switch (this.configuration.pubSubMode) {
                    case PUBLISH_ONLY: {
                        // Publish synthetic events only; source stays null.
                        PCollection snooped = (PCollection)this.sourceEventsFromSynthetic(p).apply(this.queryName + ".Snoop", NexmarkUtils.snoop(this.queryName));
                        if (this.configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
                            this.sinkEventsToKafka((PCollection<Event>)snooped);
                        } else {
                            this.sinkEventsToPubsub((PCollection<Event>)snooped);
                        }
                        break;
                    }
                    case COMBINED: {
                        // Start the publisher as its own pipeline, then read the events back.
                        this.invokeBuilderForPublishOnlyPipeline(publishOnlyOptions -> {
                            Pipeline publisherPipeline = Pipeline.create((PipelineOptions)publishOnlyOptions);
                            NexmarkUtils.setupPipeline(this.configuration.coderStrategy, publisherPipeline);
                            this.publisherMonitor = new Monitor(this.queryName, "publisher");
                            PCollection monitored = (PCollection)this.sourceEventsFromSynthetic(publisherPipeline).apply(this.queryName + ".Monitor", this.publisherMonitor.getTransform());
                            if (this.configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
                                this.sinkEventsToKafka((PCollection<Event>)monitored);
                            } else {
                                this.sinkEventsToPubsub((PCollection<Event>)monitored);
                            }
                            this.publisherResult = publisherPipeline.run();
                            NexmarkUtils.console("Publisher job is started.", new Object[0]);
                        });
                        if (this.configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
                            source = this.sourceEventsFromKafka(p, now);
                        } else {
                            source = this.sourceEventsFromPubsub(p);
                        }
                        break;
                    }
                    case SUBSCRIBE_ONLY: {
                        if (this.configuration.sourceType == NexmarkUtils.SourceType.KAFKA) {
                            source = this.sourceEventsFromKafka(p, (Instant)null);
                        } else {
                            source = this.sourceEventsFromPubsub(p);
                        }
                        break;
                    }
                }
                break;
            }
        }
        return source;
    }

    /**
     * Sends the query results to the sink selected by the configuration.
     *
     * <p>COUNT_ONLY discards the raw results without formatting them. For every other sink type
     * the results are formatted first and, if result logging is enabled, passed through a logging
     * step. AVRO only prints a warning here and attaches no results sink. BIGQUERY partitions the
     * results into a main and a side output and writes three tables ("main", "side", "copy").
     *
     * @param results the timestamped query results
     * @param now launch time in milliseconds, forwarded to the sinks that derive names from it
     */
    private void sink(PCollection<TimestampedValue<KnownSize>> results, long now) {
        if (this.configuration.sinkType == NexmarkUtils.SinkType.COUNT_ONLY) {
            // Skip formatting altogether.
            results.apply(this.queryName + ".DevNull", NexmarkUtils.devNull(this.queryName));
            return;
        }

        PCollection formatted = (PCollection)results.apply(this.queryName + ".Format", NexmarkUtils.format(this.queryName));
        if (this.options.getLogResults()) {
            String logName = this.queryName + ".Results";
            formatted = (PCollection)formatted.apply(this.queryName + ".Results.Log", NexmarkUtils.log(logName));
        }

        switch (this.configuration.sinkType) {
            case DEVNULL: {
                formatted.apply(this.queryName + ".DevNull", NexmarkUtils.devNull(this.queryName));
                break;
            }
            case PUBSUB: {
                this.sinkResultsToPubsub((PCollection<String>)formatted, now);
                break;
            }
            case KAFKA: {
                this.sinkResultsToKafka((PCollection<String>)formatted);
                break;
            }
            case TEXT: {
                this.sinkResultsToText((PCollection<String>)formatted, now);
                break;
            }
            case AVRO: {
                NexmarkUtils.console("WARNING: with --sinkType=AVRO, actual query results will be discarded.", new Object[0]);
                break;
            }
            case BIGQUERY: {
                PCollectionTuple partitioned = (PCollectionTuple)formatted.apply(
                        this.queryName + ".Partition",
                        (PTransform)ParDo.of((DoFn)new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE)));
                this.sinkResultsToBigQuery((PCollection<String>)partitioned.get(MAIN), now, "main");
                this.sinkResultsToBigQuery((PCollection<String>)partitioned.get(SIDE), now, "side");
                this.sinkResultsToBigQuery((PCollection<String>)formatted, now, "copy");
                break;
            }
            case COUNT_ONLY: {
                // Already handled by the early return above.
                throw new RuntimeException();
            }
        }
    }

    /**
     * Prints a summary of the per-window result counts produced by the model's simulator.
     *
     * <p>With fewer than five samples only the sample count is printed. Otherwise the sorted
     * counts are sampled at the minimum, the first quarter, the middle, the last quarter and the
     * maximum. Note that the value printed under the {@code mean} label is the middle element
     * of the sorted counts.
     *
     * @param model the query model whose simulator supplies the counts
     */
    private void modelResultRates(NexmarkQueryModel model) {
        List<Long> sortedCounts = Lists.newArrayList(model.simulator().resultsPerWindow());
        Collections.sort(sortedCounts);

        int samples = sortedCounts.size();
        if (samples < 5) {
            NexmarkUtils.console("Query%d: only %d samples", model.configuration.query, samples);
            return;
        }

        int quarter = samples / 4;
        int last = samples - 1;
        Long smallest = sortedCounts.get(0);
        Long lowerQuarter = sortedCounts.get(quarter);
        Long middle = sortedCounts.get(samples / 2);
        Long upperQuarter = sortedCounts.get(last - quarter);
        Long largest = sortedCounts.get(last);
        NexmarkUtils.console(
                "Query%d: N:%d; min:%d; 1st%%:%d; mean:%d; 3rd%%:%d; max:%d",
                model.configuration.query, samples, smallest, lowerQuarter, middle, upperQuarter, largest);
    }

    /**
     * Runs the query described by {@code runConfiguration} and returns its performance summary.
     *
     * <p>The launcher may only run one configuration at a time: {@code configuration} and
     * {@code queryName} must both be unset on entry and are always cleared again on exit, as is
     * the Pubsub helper (after its {@code cleanup()}).
     *
     * <p>{@code null} is returned without running a pipeline when the configuration is disabled
     * ({@code numEvents < 0}), when no query exists for the configured query number, or when
     * only the model result rates were requested.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     *
     * @param runConfiguration the configuration to run
     * @return the result of {@code monitor(query)}, or {@code null} if the run was skipped
     * @throws RuntimeException if {@code --manageResources} is used without {@code --monitorJobs},
     *     or if a model is required (model result rates / correctness assertion) but none exists
     * @throws IllegalStateException if another run is still in progress on this launcher
     * @throws IOException declared for the Pubsub helper creation and source setup calls
     */
    @Nullable
    public NexmarkPerf run(NexmarkConfiguration runConfiguration) throws IOException {
        if (this.options.getManageResources() && !this.options.getMonitorJobs()) {
            throw new RuntimeException("If using --manageResources then must also use --monitorJobs.");
        }
        Preconditions.checkState(this.configuration == null);
        Preconditions.checkState(this.queryName == null);
        this.configuration = runConfiguration;
        try {
            // Created inside the try so that a failure here still resets the launcher state in
            // the finally block; otherwise every later run() would fail the checkState above.
            if (this.configuration.sourceType.equals((Object)NexmarkUtils.SourceType.PUBSUB)) {
                this.pubsubHelper = PubsubHelper.create(this.options);
            }
            NexmarkUtils.console("Running %s", this.configuration.toShortString());
            if (this.configuration.numEvents < 0L) {
                NexmarkUtils.console("skipping since configuration is disabled", new Object[0]);
                return null;
            }
            NexmarkQuery query = this.getNexmarkQuery();
            if (query == null) {
                NexmarkUtils.console("skipping since configuration is not implemented", new Object[0]);
                return null;
            }
            this.queryName = query.getName();
            NexmarkQueryModel model = this.getNexmarkQueryModel();
            if (this.options.getJustModelResultRate()) {
                if (model == null) {
                    throw new RuntimeException(String.format("No model for %s", this.queryName));
                }
                this.modelResultRates(model);
                return null;
            }
            Instant now = Instant.now();
            Pipeline p = Pipeline.create(this.options);
            NexmarkUtils.setupPipeline(this.configuration.coderStrategy, p);
            PCollection source = this.createSource(p, now);
            // createSource returns null when events are only published (PUBLISH_ONLY mode); in
            // that case there is nothing to log, query or sink in this pipeline.
            if (source != null) {
                if (this.options.getLogEvents()) {
                    source = (PCollection)source.apply(this.queryName + ".Events.Log", NexmarkUtils.log(this.queryName + ".Events"));
                }
                if (this.configuration.sinkType == NexmarkUtils.SinkType.AVRO) {
                    this.sinkEventsToAvro(source);
                }
                if (this.configuration.query == 10) {
                    // Query 10 additionally needs an output path and the worker count.
                    String path = null;
                    if (this.options.getOutputPath() != null && !this.options.getOutputPath().isEmpty()) {
                        path = this.logsDir(now.getMillis());
                    }
                    ((Query10)query).setOutputPath(path);
                    ((Query10)query).setMaxNumWorkers(this.maxNumWorkers());
                }
                PCollection results = (PCollection)source.apply((PTransform)query);
                if (this.options.getAssertCorrectness()) {
                    if (model == null) {
                        throw new RuntimeException(String.format("No model for %s", this.queryName));
                    }
                    results.setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
                    PAssert.that((PCollection)results).satisfies(model.assertionFor());
                }
                this.sink((PCollection<TimestampedValue<KnownSize>>)results, now.getMillis());
            }
            this.mainResult = p.run();
            this.mainResult.waitUntilFinish(Duration.standardSeconds((long)this.configuration.streamTimeout));
            return this.monitor(query);
        }
        finally {
            if (this.pubsubHelper != null) {
                this.pubsubHelper.cleanup();
                this.pubsubHelper = null;
            }
            this.configuration = null;
            this.queryName = null;
        }
    }

    /**
     * Tells whether the query language selected in the options is SQL (case-insensitive match
     * against the {@code SQL} constant).
     */
    private boolean isSql() {
        String queryLanguage = this.options.getQueryLanguage();
        return SQL.equalsIgnoreCase(queryLanguage);
    }

    /**
     * Looks up the reference model for the configured query number.
     *
     * <p>The model lists contain {@code null} entries for queries without a model, and the SQL
     * and Java lists differ in length. A query number outside the list is therefore treated the
     * same way as a missing model instead of raising an index error, mirroring the bounds check
     * in {@code getNexmarkQuery()}.
     *
     * @return the model for the configured query, or {@code null} if there is none
     */
    @Nullable
    private NexmarkQueryModel getNexmarkQueryModel() {
        List<NexmarkQueryModel> models = this.createQueryModels();
        int queryNumber = this.configuration.query;
        if (queryNumber < 0 || queryNumber >= models.size()) {
            return null;
        }
        return models.get(queryNumber);
    }

    /**
     * Looks up the query for the configured query number.
     *
     * @return the query, or {@code null} when the number is at or beyond the end of the query
     *     list (individual list entries may themselves be {@code null})
     */
    private NexmarkQuery getNexmarkQuery() {
        List<NexmarkQuery> available = this.createQueries();
        int queryNumber = this.configuration.query;
        return queryNumber >= available.size() ? null : available.get(queryNumber);
    }

    /**
     * Returns the query models for the selected query language, indexed by query number.
     */
    private List<NexmarkQueryModel> createQueryModels() {
        if (this.isSql()) {
            return this.createSqlQueryModels();
        }
        return this.createJavaQueryModels();
    }

    /**
     * Returns the model list for SQL queries: twelve slots, all {@code null}, i.e. no SQL query
     * has a model.
     */
    private List<NexmarkQueryModel> createSqlQueryModels() {
        // A freshly allocated reference array is filled with nulls.
        NexmarkQueryModel[] noModels = new NexmarkQueryModel[12];
        return Arrays.asList(noModels);
    }

    /**
     * Returns the model list for the Java queries, indexed by query number. Queries 0-9 have a
     * model; the slots for queries 10-12 are {@code null}.
     */
    private List<NexmarkQueryModel> createJavaQueryModels() {
        NexmarkQueryModel[] models = {
            new Query0Model(this.configuration),
            new Query1Model(this.configuration),
            new Query2Model(this.configuration),
            new Query3Model(this.configuration),
            new Query4Model(this.configuration),
            new Query5Model(this.configuration),
            new Query6Model(this.configuration),
            new Query7Model(this.configuration),
            new Query8Model(this.configuration),
            new Query9Model(this.configuration),
            null,
            null,
            null,
        };
        return Arrays.asList(models);
    }

    /**
     * Returns the queries for the selected query language, indexed by query number.
     */
    private List<NexmarkQuery> createQueries() {
        if (this.isSql()) {
            return this.createSqlQueries();
        }
        return this.createJavaQueries();
    }

    /**
     * Returns the SQL queries, indexed by query number. The list has eight slots (queries 0-7);
     * the slots for queries 4 and 6 are {@code null}.
     */
    private List<NexmarkQuery> createSqlQueries() {
        NexmarkQuery[] sqlQueries = {
            new NexmarkSqlQuery<Bid>(this.configuration, new SqlQuery0()),
            new NexmarkSqlQuery<Bid>(this.configuration, new SqlQuery1()),
            new NexmarkSqlQuery<AuctionPrice>(this.configuration, new SqlQuery2(this.configuration.auctionSkip)),
            new NexmarkSqlQuery<NameCityStateId>(this.configuration, new SqlQuery3(this.configuration)),
            null,
            new NexmarkSqlQuery<AuctionCount>(this.configuration, new SqlQuery5(this.configuration)),
            null,
            new NexmarkSqlQuery<Bid>(this.configuration, new SqlQuery7(this.configuration)),
        };
        return Arrays.asList(sqlQueries);
    }

    /**
     * Returns the Java queries 0-12, indexed by query number.
     */
    private List<NexmarkQuery> createJavaQueries() {
        NexmarkQuery[] javaQueries = {
            new Query0(this.configuration),
            new Query1(this.configuration),
            new Query2(this.configuration),
            new Query3(this.configuration),
            new Query4(this.configuration),
            new Query5(this.configuration),
            new Query6(this.configuration),
            new Query7(this.configuration),
            new Query8(this.configuration),
            new Query9(this.configuration),
            new Query10(this.configuration),
            new Query11(this.configuration),
            new Query12(this.configuration),
        };
        return Arrays.asList(javaQueries);
    }

    /**
     * Turns each {@link Event} into a {@link PubsubMessage} whose payload is the event encoded
     * with {@code Event.CODER} and whose attribute map is empty.
     */
    private static class EventPubsubMessageDoFn
    extends DoFn<Event, PubsubMessage> {
        private EventPubsubMessageDoFn() {
        }

        /** Encodes the current element and emits it as a Pubsub message. */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws IOException {
            Event event = (Event)c.element();
            byte[] encoded = CoderUtils.encodeToByteArray(Event.CODER, (Object)event);
            PubsubMessage message = new PubsubMessage(encoded, Collections.emptyMap());
            c.output((Object)message);
        }
    }

    /**
     * Turns each {@link PubsubMessage} back into an {@link Event} by decoding the message payload
     * with {@code Event.CODER}.
     */
    private static class PubsubMessageEventDoFn
    extends DoFn<PubsubMessage, Event> {
        private PubsubMessageEventDoFn() {
        }

        /** Decodes the payload of the current message and emits the resulting event. */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) throws IOException {
            PubsubMessage message = (PubsubMessage)c.element();
            byte[] encoded = message.getPayload();
            c.output((Object)((Event)CoderUtils.decodeFromByteArray(Event.CODER, (byte[])encoded)));
        }
    }

    /**
     * Splits strings into two outputs by the parity of their hash code: even hash codes go to
     * the main output, odd ones to the {@code SIDE} output.
     */
    private static class PartitionDoFn
    extends DoFn<String, String> {
        private PartitionDoFn() {
        }

        /** Routes the current element to the main or the side output. */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            String element = (String)c.element();
            boolean evenHash = element.hashCode() % 2 == 0;
            if (!evenHash) {
                c.output(SIDE, (Object)element);
                return;
            }
            c.output((Object)element);
        }
    }

    /**
     * Wraps each result string in a {@link TableRow} with a {@code result} field and a
     * {@code records} list. The list holds a random number (0 to 9) of rows, each carrying its
     * position as {@code index} and that position rendered as text as {@code value}.
     */
    private static class StringToTableRow
    extends DoFn<String, TableRow> {
        private StringToTableRow() {
        }

        /** Builds and emits the table row for the current element. */
        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            int recordCount = ThreadLocalRandom.current().nextInt(10);
            List<TableRow> nested = new ArrayList<>(recordCount);
            int position = 0;
            while (position < recordCount) {
                TableRow entry = new TableRow();
                entry.set("index", (Object)position);
                entry.set("value", (Object)Integer.toString(position));
                nested.add(entry);
                ++position;
            }
            TableRow row = new TableRow().set("result", c.element()).set("records", nested);
            c.output((Object)row);
        }
    }

    static interface PipelineBuilder<OptionT extends NexmarkOptions> {
        public void build(OptionT var1);
    }

    private static enum DistributionType {
        MIN,
        MAX;

    }
}

