/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.sources;

import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.sources.generator.Generator;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorCheckpoint;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnboundedEventSource
extends UnboundedSource<Event, GeneratorCheckpoint> {
    private static final Duration BACKLOG_PERIOD = Duration.standardSeconds((long)30L);
    private static final Logger LOG = LoggerFactory.getLogger(UnboundedEventSource.class);
    private final GeneratorConfig config;
    private final int numEventGenerators;
    private final long watermarkHoldbackSec;
    private final boolean isRateLimited;

    public UnboundedEventSource(GeneratorConfig config, int numEventGenerators, long watermarkHoldbackSec, boolean isRateLimited) {
        this.config = config;
        this.numEventGenerators = numEventGenerators;
        this.watermarkHoldbackSec = watermarkHoldbackSec;
        this.isRateLimited = isRateLimited;
    }

    public Coder<GeneratorCheckpoint> getCheckpointMarkCoder() {
        return GeneratorCheckpoint.CODER_INSTANCE;
    }

    public List<UnboundedEventSource> split(int desiredNumSplits, PipelineOptions options) {
        LOG.trace("splitting unbounded source into {} sub-sources", (Object)this.numEventGenerators);
        ArrayList<UnboundedEventSource> results = new ArrayList<UnboundedEventSource>();
        for (GeneratorConfig subConfig : this.config.split(this.numEventGenerators)) {
            results.add(new UnboundedEventSource(subConfig, 1, this.watermarkHoldbackSec, this.isRateLimited));
        }
        return results;
    }

    public EventReader createReader(PipelineOptions options, @Nullable GeneratorCheckpoint checkpoint) {
        if (checkpoint == null) {
            LOG.trace("creating initial unbounded reader for {}", (Object)this.config);
            return new EventReader(this.config);
        }
        LOG.trace("resuming unbounded reader from {}", (Object)checkpoint);
        return new EventReader(checkpoint.toGenerator(this.config));
    }

    public void validate() {
    }

    public Coder<Event> getDefaultOutputCoder() {
        return Event.CODER;
    }

    public String toString() {
        return String.format("UnboundedEventSource(%d, %d)", this.config.getStartEventId(), this.config.getStopEventId());
    }

    private class EventReader
    extends UnboundedSource.UnboundedReader<Event> {
        private final Generator generator;
        private long watermark;
        private long backlogDurationMs;
        @Nullable
        private Long backlogBytes;
        private long lastReportedBacklogWallclock;
        private long timestampAtLastReportedBacklogMs;
        @Nullable
        private TimestampedValue<Event> pendingEvent;
        private long pendingEventWallclockTime;
        @Nullable
        private TimestampedValue<Event> currentEvent;
        private final Queue<Generator.NextEvent> heldBackEvents = new PriorityQueue<Generator.NextEvent>();

        public EventReader(Generator generator) {
            this.generator = generator;
            this.watermark = NexmarkUtils.BEGINNING_OF_TIME.getMillis();
            this.lastReportedBacklogWallclock = -1L;
            this.pendingEventWallclockTime = -1L;
            this.timestampAtLastReportedBacklogMs = -1L;
        }

        public EventReader(GeneratorConfig config) {
            this(new Generator(config));
        }

        public boolean start() {
            LOG.trace("starting unbounded generator {}", (Object)this.generator);
            return this.advance();
        }

        public boolean advance() {
            long now = System.currentTimeMillis();
            while (this.pendingEvent == null) {
                if (!this.generator.hasNext() && this.heldBackEvents.isEmpty()) {
                    if (UnboundedEventSource.this.isRateLimited) {
                        this.updateBacklog(System.currentTimeMillis(), 0L);
                    }
                    if (this.watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                        this.watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
                        LOG.trace("stopped unbounded generator {}", (Object)this.generator);
                    }
                    return false;
                }
                Generator.NextEvent next = this.heldBackEvents.peek();
                if (next != null && next.wallclockTimestamp <= now) {
                    this.heldBackEvents.poll();
                    LOG.debug("replaying held-back event {}ms behind watermark", (Object)(this.watermark - next.eventTimestamp));
                } else if (this.generator.hasNext()) {
                    next = this.generator.nextEvent();
                    if (UnboundedEventSource.this.isRateLimited && UnboundedEventSource.this.config.getProbDelayedEvent() > 0.0 && UnboundedEventSource.this.config.getOccasionalDelaySec() > 0L && ThreadLocalRandom.current().nextDouble() < UnboundedEventSource.this.config.getProbDelayedEvent()) {
                        long delayMs = ThreadLocalRandom.current().nextLong(UnboundedEventSource.this.config.getOccasionalDelaySec() * 1000L) + 1L;
                        LOG.debug("delaying event by {}ms", (Object)delayMs);
                        this.heldBackEvents.add(next.withDelay(delayMs));
                        continue;
                    }
                } else {
                    if (UnboundedEventSource.this.isRateLimited) {
                        this.updateBacklog(now, 0L);
                    }
                    return false;
                }
                this.pendingEventWallclockTime = next.wallclockTimestamp;
                this.pendingEvent = TimestampedValue.of((Object)next.event, (Instant)new Instant(next.eventTimestamp));
                long newWatermark = next.watermark - Duration.standardSeconds((long)UnboundedEventSource.this.watermarkHoldbackSec).getMillis();
                if (newWatermark <= this.watermark) continue;
                this.watermark = newWatermark;
            }
            if (UnboundedEventSource.this.isRateLimited) {
                if (this.pendingEventWallclockTime > now) {
                    this.updateBacklog(now, 0L);
                    return false;
                }
                this.updateBacklog(now, now - this.pendingEventWallclockTime);
            }
            this.currentEvent = this.pendingEvent;
            this.pendingEvent = null;
            return true;
        }

        private void updateBacklog(long now, long newBacklogDurationMs) {
            this.backlogDurationMs = newBacklogDurationMs;
            long interEventDelayUs = this.generator.currentInterEventDelayUs();
            if (interEventDelayUs != 0L) {
                long backlogEvents = (this.backlogDurationMs * 1000L + interEventDelayUs - 1L) / interEventDelayUs;
                this.backlogBytes = this.generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
            }
            if (this.lastReportedBacklogWallclock < 0L || now - this.lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) {
                double timeDialation = Double.NaN;
                if (this.pendingEvent != null && this.lastReportedBacklogWallclock >= 0L && this.timestampAtLastReportedBacklogMs >= 0L) {
                    long wallclockProgressionMs = now - this.lastReportedBacklogWallclock;
                    long eventTimeProgressionMs = this.pendingEvent.getTimestamp().getMillis() - this.timestampAtLastReportedBacklogMs;
                    timeDialation = (double)eventTimeProgressionMs / (double)wallclockProgressionMs;
                }
                LOG.debug("unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay with {} time dilation", new Object[]{this.backlogDurationMs, this.backlogBytes, interEventDelayUs, timeDialation});
                this.lastReportedBacklogWallclock = now;
                if (this.pendingEvent != null) {
                    this.timestampAtLastReportedBacklogMs = this.pendingEvent.getTimestamp().getMillis();
                }
            }
        }

        public Event getCurrent() {
            if (this.currentEvent == null) {
                throw new NoSuchElementException();
            }
            return (Event)this.currentEvent.getValue();
        }

        public Instant getCurrentTimestamp() {
            if (this.currentEvent == null) {
                throw new NoSuchElementException();
            }
            return this.currentEvent.getTimestamp();
        }

        public void close() {
        }

        public UnboundedEventSource getCurrentSource() {
            return UnboundedEventSource.this;
        }

        public Instant getWatermark() {
            return new Instant(this.watermark);
        }

        public GeneratorCheckpoint getCheckpointMark() {
            return this.generator.toCheckpoint();
        }

        public long getSplitBacklogBytes() {
            return this.backlogBytes == null ? -1L : this.backlogBytes;
        }

        public String toString() {
            return String.format("EventReader(%d, %d, %d)", this.generator.getCurrentConfig().getStartEventId(), this.generator.getNextEventId(), this.generator.getCurrentConfig().getStopEventId());
        }
    }
}

