/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.queries;

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Done;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Query10
extends NexmarkQuery {
    private static final Logger LOG = LoggerFactory.getLogger(Query10.class);
    private static final int NUM_SHARDS_PER_WORKER = 5;
    private static final Duration LATE_BATCHING_PERIOD = Duration.standardSeconds((long)10L);
    @Nullable
    private String outputPath;
    private int maxNumWorkers;

    public Query10(NexmarkConfiguration configuration) {
        super(configuration, "Query10");
    }

    public void setOutputPath(@Nullable String outputPath) {
        this.outputPath = outputPath;
    }

    public void setMaxNumWorkers(int maxNumWorkers) {
        this.maxNumWorkers = maxNumWorkers;
    }

    private WritableByteChannel openWritableGcsFile(GcsOptions options, String filename) throws IOException {
        throw new UnsupportedOperationException("Disabled after removal of GcsIOChannelFactory");
    }

    private String timingToString(PaneInfo.Timing timing) {
        switch (timing) {
            case EARLY: {
                return "E";
            }
            case ON_TIME: {
                return "O";
            }
            case LATE: {
                return "L";
            }
            case UNKNOWN: {
                return "U";
            }
        }
        throw new RuntimeException();
    }

    private OutputFile outputFileFor(BoundedWindow window, String shard, PaneInfo pane) {
        String filename = this.outputPath == null ? null : String.format("%s/LOG-%s-%s-%03d-%s-%x", this.outputPath, window.maxTimestamp(), shard, pane.getIndex(), this.timingToString(pane.getTiming()), ThreadLocalRandom.current().nextLong());
        return new OutputFile(window.maxTimestamp(), shard, pane.getIndex(), pane.getTiming(), filename);
    }

    @Nullable
    private String indexPathFor(BoundedWindow window) {
        if (this.outputPath == null) {
            return null;
        }
        return String.format("%s/INDEX-%s", this.outputPath, window.maxTimestamp());
    }

    private PCollection<Done> applyTyped(PCollection<Event> events) {
        final int numLogShards = this.maxNumWorkers * 5;
        return (PCollection)((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)events.apply(this.name + ".ShardEvents", (PTransform)ParDo.of((DoFn)new DoFn<Event, KV<String, Event>>(){
            private final Counter lateCounter;
            private final Counter onTimeCounter;
            {
                this.lateCounter = Metrics.counter((String)Query10.this.name, (String)"actuallyLateEvent");
                this.onTimeCounter = Metrics.counter((String)Query10.this.name, (String)"onTimeCounter");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                if (((Event)c.element()).hasAnnotation("LATE")) {
                    this.lateCounter.inc();
                    LOG.info("Observed late: %s", c.element());
                } else {
                    this.onTimeCounter.inc();
                }
                int shardNum = (int)Math.abs((long)((Event)c.element()).hashCode() % (long)numLogShards);
                String shard = String.format("shard-%05d-of-%05d", shardNum, numLogShards);
                c.output((Object)KV.of((Object)shard, (Object)((Event)c.element())));
            }
        }))).apply(this.name + ".WindowEvents", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)this.configuration.windowSizeSec))).triggering((Trigger)AfterEach.inOrder((Trigger[])new Trigger[]{Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)this.configuration.maxLogEvents)).orFinally((Trigger.OnceTrigger)AfterWatermark.pastEndOfWindow()), Repeatedly.forever((Trigger)AfterFirst.of((Trigger.OnceTrigger[])new Trigger.OnceTrigger[]{AfterPane.elementCountAtLeast((int)this.configuration.maxLogEvents), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(LATE_BATCHING_PERIOD)}))})).discardingFiredPanes().withAllowedLateness(Duration.standardDays((long)1L)))).apply(this.name + ".GroupByKey", (PTransform)GroupByKey.create())).apply(this.name + ".CheckForLateEvents", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, Iterable<Event>>, KV<String, Iterable<Event>>>(){
            private final Counter earlyCounter;
            private final Counter onTimeCounter;
            private final Counter lateCounter;
            private final Counter unexpectedLatePaneCounter;
            private final Counter unexpectedOnTimeElementCounter;
            {
                this.earlyCounter = Metrics.counter((String)Query10.this.name, (String)"earlyShard");
                this.onTimeCounter = Metrics.counter((String)Query10.this.name, (String)"onTimeShard");
                this.lateCounter = Metrics.counter((String)Query10.this.name, (String)"lateShard");
                this.unexpectedLatePaneCounter = Metrics.counter((String)Query10.this.name, (String)"ERROR_unexpectedLatePane");
                this.unexpectedOnTimeElementCounter = Metrics.counter((String)Query10.this.name, (String)"ERROR_unexpectedOnTimeElement");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c, BoundedWindow window) {
                int numLate = 0;
                int numOnTime = 0;
                for (Event event : (Iterable)((KV)c.element()).getValue()) {
                    if (event.hasAnnotation("LATE")) {
                        ++numLate;
                        continue;
                    }
                    ++numOnTime;
                }
                String shard = (String)((KV)c.element()).getKey();
                LOG.info(String.format("%s with timestamp %s has %d actually late and %d on-time elements in pane %s for window %s", shard, c.timestamp(), numLate, numOnTime, c.pane(), window.maxTimestamp()));
                if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
                    if (numLate == 0) {
                        LOG.error("ERROR! No late events in late pane for %s", (Object)shard);
                        this.unexpectedLatePaneCounter.inc();
                    }
                    if (numOnTime > 0) {
                        LOG.error("ERROR! Have %d on-time events in late pane for %s", (Object)numOnTime, (Object)shard);
                        this.unexpectedOnTimeElementCounter.inc();
                    }
                    this.lateCounter.inc();
                } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
                    if (numOnTime + numLate < Query10.this.configuration.maxLogEvents) {
                        LOG.error("ERROR! Only have %d events in early pane for %s", (Object)(numOnTime + numLate), (Object)shard);
                    }
                    this.earlyCounter.inc();
                } else {
                    this.onTimeCounter.inc();
                }
                c.output((Object)((KV)c.element()));
            }
        }))).apply(this.name + ".UploadEvents", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, Iterable<Event>>, KV<Void, OutputFile>>(){
            private final Counter savedFileCounter;
            private final Counter writtenRecordsCounter;
            {
                this.savedFileCounter = Metrics.counter((String)Query10.this.name, (String)"savedFile");
                this.writtenRecordsCounter = Metrics.counter((String)Query10.this.name, (String)"writtenRecords");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c, BoundedWindow window) throws IOException {
                String shard = (String)((KV)c.element()).getKey();
                GcsOptions options = (GcsOptions)c.getPipelineOptions().as(GcsOptions.class);
                OutputFile outputFile = Query10.this.outputFileFor(window, shard, c.pane());
                LOG.info(String.format("Writing %s with record timestamp %s, window timestamp %s, pane %s", shard, c.timestamp(), window.maxTimestamp(), c.pane()));
                if (outputFile.filename != null) {
                    LOG.info("Beginning write to '%s'", (Object)outputFile.filename);
                    int n = 0;
                    try (OutputStream output = Channels.newOutputStream(Query10.this.openWritableGcsFile(options, outputFile.filename));){
                        for (Event event : (Iterable)((KV)c.element()).getValue()) {
                            Event.CODER.encode((Object)event, output, Coder.Context.OUTER);
                            this.writtenRecordsCounter.inc();
                            if (++n % 10000 != 0) continue;
                            LOG.info("So far written %d records to '%s'", (Object)n, (Object)outputFile.filename);
                        }
                    }
                    LOG.info("Written all %d records to '%s'", (Object)n, (Object)outputFile.filename);
                }
                this.savedFileCounter.inc();
                c.output((Object)KV.of(null, (Object)outputFile));
            }
        }))).apply(this.name + ".WindowLogFiles", (PTransform)Window.into((WindowFn)FixedWindows.of((Duration)Duration.standardSeconds((long)this.configuration.windowSizeSec))).triggering((Trigger)AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.standardDays((long)1L)).discardingFiredPanes())).apply(this.name + ".GroupByKey2", (PTransform)GroupByKey.create())).apply(this.name + ".Index", (PTransform)ParDo.of((DoFn)new DoFn<KV<Void, Iterable<OutputFile>>, Done>(){
            private final Counter unexpectedLateCounter;
            private final Counter unexpectedEarlyCounter;
            private final Counter unexpectedIndexCounter;
            private final Counter finalizedCounter;
            {
                this.unexpectedLateCounter = Metrics.counter((String)Query10.this.name, (String)"ERROR_unexpectedLate");
                this.unexpectedEarlyCounter = Metrics.counter((String)Query10.this.name, (String)"ERROR_unexpectedEarly");
                this.unexpectedIndexCounter = Metrics.counter((String)Query10.this.name, (String)"ERROR_unexpectedIndex");
                this.finalizedCounter = Metrics.counter((String)Query10.this.name, (String)"indexed");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c, BoundedWindow window) throws IOException {
                if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
                    this.unexpectedLateCounter.inc();
                    LOG.error("ERROR! Unexpected LATE pane: %s", (Object)c.pane());
                } else if (c.pane().getTiming() == PaneInfo.Timing.EARLY) {
                    this.unexpectedEarlyCounter.inc();
                    LOG.error("ERROR! Unexpected EARLY pane: %s", (Object)c.pane());
                } else if (c.pane().getTiming() == PaneInfo.Timing.ON_TIME && c.pane().getIndex() != 0L) {
                    this.unexpectedIndexCounter.inc();
                    LOG.error("ERROR! Unexpected ON_TIME pane index: %s", (Object)c.pane());
                } else {
                    GcsOptions options = (GcsOptions)c.getPipelineOptions().as(GcsOptions.class);
                    LOG.info("Index with record timestamp %s, window timestamp %s, pane %s", new Object[]{c.timestamp(), window.maxTimestamp(), c.pane()});
                    String filename = Query10.this.indexPathFor(window);
                    if (filename != null) {
                        LOG.info("Beginning write to '%s'", (Object)filename);
                        int n = 0;
                        try (OutputStream output = Channels.newOutputStream(Query10.this.openWritableGcsFile(options, filename));){
                            for (OutputFile outputFile : (Iterable)((KV)c.element()).getValue()) {
                                output.write(outputFile.toString().getBytes(StandardCharsets.UTF_8));
                                ++n;
                            }
                        }
                        LOG.info("Written all %d lines to '%s'", (Object)n, (Object)filename);
                    }
                    c.output((Object)new Done("written for timestamp " + window.maxTimestamp()));
                    this.finalizedCounter.inc();
                }
            }
        }));
    }

    @Override
    protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
        return NexmarkUtils.castToKnownSize(this.name, this.applyTyped(events));
    }

    private static class OutputFile
    implements Serializable {
        private final Instant maxTimestamp;
        private final String shard;
        private final long index;
        private final PaneInfo.Timing timing;
        @Nullable
        private final String filename;

        public OutputFile(Instant maxTimestamp, String shard, long index, PaneInfo.Timing timing, @Nullable String filename) {
            this.maxTimestamp = maxTimestamp;
            this.shard = shard;
            this.index = index;
            this.timing = timing;
            this.filename = filename;
        }

        public String toString() {
            return String.format("%s %s %d %s %s%n", this.maxTimestamp, this.shard, this.index, this.timing, this.filename);
        }
    }
}

