/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.sources;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.sources.generator.Generator;
import org.apache.beam.sdk.nexmark.sources.generator.GeneratorConfig;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class BoundedEventSource
extends BoundedSource<Event> {
    private final GeneratorConfig config;
    private final int numEventGenerators;

    public BoundedEventSource(GeneratorConfig config, int numEventGenerators) {
        this.config = config;
        this.numEventGenerators = numEventGenerators;
    }

    public List<BoundedEventSource> split(long desiredBundleSizeBytes, PipelineOptions options) {
        NexmarkUtils.info("slitting bounded source %s into %d sub-sources", this.config, this.numEventGenerators);
        ArrayList<BoundedEventSource> results = new ArrayList<BoundedEventSource>();
        for (GeneratorConfig subConfig : this.config.split(this.numEventGenerators)) {
            results.add(new BoundedEventSource(subConfig, 1));
        }
        return results;
    }

    public long getEstimatedSizeBytes(PipelineOptions options) {
        return this.config.getEstimatedSizeBytes();
    }

    public EventReader createReader(PipelineOptions options) {
        NexmarkUtils.info("creating initial bounded reader for %s", this.config);
        return new EventReader(this, this.config);
    }

    public void validate() {
    }

    public Coder<Event> getDefaultOutputCoder() {
        return Event.CODER;
    }

    private static class EventReader
    extends BoundedSource.BoundedReader<Event> {
        private BoundedEventSource source;
        private final Generator generator;
        private boolean reportedStop;
        @Nullable
        private TimestampedValue<Event> currentEvent;

        public EventReader(BoundedEventSource source, GeneratorConfig config) {
            this.source = source;
            this.generator = new Generator(config);
            this.reportedStop = false;
        }

        public synchronized boolean start() {
            NexmarkUtils.info("starting bounded generator %s", this.generator);
            return this.advance();
        }

        public synchronized boolean advance() {
            if (!this.generator.hasNext()) {
                if (!this.reportedStop) {
                    this.reportedStop = true;
                    NexmarkUtils.info("stopped bounded generator %s", this.generator);
                }
                return false;
            }
            this.currentEvent = this.generator.next();
            return true;
        }

        public synchronized Event getCurrent() throws NoSuchElementException {
            if (this.currentEvent == null) {
                throw new NoSuchElementException();
            }
            return (Event)this.currentEvent.getValue();
        }

        public synchronized Instant getCurrentTimestamp() throws NoSuchElementException {
            if (this.currentEvent == null) {
                throw new NoSuchElementException();
            }
            return this.currentEvent.getTimestamp();
        }

        public void close() throws IOException {
        }

        public synchronized Double getFractionConsumed() {
            return this.generator.getFractionConsumed();
        }

        public synchronized BoundedSource<Event> getCurrentSource() {
            return this.source;
        }

        @Nullable
        public synchronized BoundedEventSource splitAtFraction(double fraction) {
            long stopId;
            long size;
            long startId = this.generator.getCurrentConfig().getStartEventId();
            long splitEventId = startId + Math.min((long)((int)((double)(size = (stopId = this.generator.getCurrentConfig().getStopEventId()) - startId) * fraction)), size);
            if (splitEventId <= this.generator.getNextEventId() || splitEventId == stopId) {
                NexmarkUtils.info("split failed for bounded generator %s at %f", this.generator, fraction);
                return null;
            }
            NexmarkUtils.info("about to split bounded generator %s at %d", this.generator, splitEventId);
            GeneratorConfig remainingConfig = this.generator.splitAtEventId(splitEventId);
            NexmarkUtils.info("split bounded generator into %s and %s", this.generator, remainingConfig);
            this.source = new BoundedEventSource(this.generator.getCurrentConfig(), this.source.numEventGenerators);
            return new BoundedEventSource(remainingConfig, this.source.numEventGenerators);
        }
    }
}

