/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.queries;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Base class for simulators that pull timestamped inputs from an iterator, let a subclass turn
 * them into timestamped results via {@link #run()}, and expose those results lazily.
 *
 * <p>Two views of the output are offered: the results themselves ({@link #results()}) and the
 * number of results recorded per one-minute window ({@link #resultsPerWindow()}). Both views
 * share the same underlying state and both drive {@link #run()} on demand.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <InputT> type of the input elements
 * @param <OutputT> type of the result elements
 */
public abstract class AbstractSimulator<InputT, OutputT> {
    /** Width of the windows used to tally results. */
    private static final Duration WINDOW_SIZE = Duration.standardMinutes(1L);

    /** Source of inputs handed out by {@link #nextInput()}. */
    private final Iterator<TimestampedValue<InputT>> input;

    /** Set by {@link #allDone()}; once true the iterators stop calling {@link #run()}. */
    private boolean isDone;

    /** Results added via {@link #addResult} that have not yet been returned by {@link #results()}. */
    private final List<TimestampedValue<OutputT>> pendingResults;

    /** Start (epoch millis) of the window currently being tallied. */
    private long currentWindow;

    /** Number of results recorded so far in {@link #currentWindow}. */
    private long currentCount;

    /** Tallies of closed windows not yet returned by {@link #resultsPerWindow()}. */
    private final List<Long> pendingCounts;

    /**
     * Creates a simulator reading from {@code input}.
     *
     * @param input iterator supplying the timestamped input elements
     */
    public AbstractSimulator(Iterator<TimestampedValue<InputT>> input) {
        this.input = input;
        this.isDone = false;
        this.pendingResults = new ArrayList<>();
        // Sentinel meaning "no window opened yet"; see updateCounts.
        this.currentWindow = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();
        this.currentCount = 0L;
        this.pendingCounts = new ArrayList<>();
    }

    /**
     * Returns the next input element, logging it, or {@code null} when the input is exhausted.
     *
     * @return the next timestamped input, or {@code null} if there is none
     */
    @Nullable
    TimestampedValue<InputT> nextInput() {
        if (!this.input.hasNext()) {
            return null;
        }
        TimestampedValue<InputT> timestampedInput = this.input.next();
        NexmarkUtils.info("input: %s", timestampedInput);
        return timestampedInput;
    }

    /**
     * Records a result for the per-window tally only; it is logged and counted but is not
     * queued for {@link #results()}.
     *
     * @param result the intermediate result to count
     */
    void addIntermediateResult(TimestampedValue<OutputT> result) {
        NexmarkUtils.info("intermediate result: %s", result);
        this.updateCounts(result.getTimestamp());
    }

    /**
     * Records a result: it is logged, queued for {@link #results()}, and counted in the
     * per-window tally.
     *
     * @param result the result to emit
     */
    void addResult(TimestampedValue<OutputT> result) {
        NexmarkUtils.info("result: %s", result);
        this.pendingResults.add(result);
        this.updateCounts(result.getTimestamp());
    }

    /**
     * Adds one to the tally of the window containing {@code timestamp}. If that window starts
     * later than the current one, the current tally is first moved to the pending counts (unless
     * no window had been opened yet) and a fresh tally is started.
     *
     * @param timestamp timestamp of the result being counted
     */
    private void updateCounts(Instant timestamp) {
        long millis = timestamp.getMillis();
        // floorMod (rather than %) keeps window starts aligned for negative epoch millis too;
        // for non-negative timestamps the two are identical.
        long window = millis - Math.floorMod(millis, WINDOW_SIZE.getMillis());
        if (window > this.currentWindow) {
            if (this.currentWindow > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
                this.pendingCounts.add(this.currentCount);
            }
            this.currentCount = 0L;
            this.currentWindow = window;
        }
        ++this.currentCount;
    }

    /** Marks the simulation as finished; the iterators will no longer call {@link #run()}. */
    void allDone() {
        this.isDone = true;
    }

    /**
     * Performs a unit of simulation work. The iterators call this repeatedly while they have
     * nothing pending and {@link #allDone()} has not been called, so an implementation that
     * never adds output and never calls {@link #allDone()} makes {@code hasNext()} loop forever.
     */
    protected abstract void run();

    /**
     * Returns an iterator over the results added via {@link #addResult}, running the simulation
     * on demand. Returned elements are removed from the pending queue.
     *
     * @return a lazily evaluated iterator of results; {@code remove()} is unsupported
     */
    public Iterator<TimestampedValue<OutputT>> results() {
        return new Iterator<TimestampedValue<OutputT>>() {

            @Override
            public boolean hasNext() {
                while (pendingResults.isEmpty()) {
                    if (isDone) {
                        return false;
                    }
                    AbstractSimulator.this.run();
                }
                return true;
            }

            @Override
            public TimestampedValue<OutputT> next() {
                // Honor the Iterator contract even when the caller skipped hasNext().
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return pendingResults.remove(0);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Returns an iterator over the per-window result tallies, running the simulation on demand.
     * Once the simulation is done, a non-zero tally for the last open window is emitted as the
     * final element. Results queued for {@link #results()} are not consumed by this iterator.
     *
     * @return a lazily evaluated iterator of per-window counts; {@code remove()} is unsupported
     */
    public Iterator<Long> resultsPerWindow() {
        return new Iterator<Long>() {

            @Override
            public boolean hasNext() {
                while (pendingCounts.isEmpty()) {
                    if (isDone) {
                        if (currentCount > 0L) {
                            // Flush the tally of the last, still-open window exactly once.
                            pendingCounts.add(currentCount);
                            currentCount = 0L;
                            currentWindow = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
                            return true;
                        }
                        return false;
                    }
                    AbstractSimulator.this.run();
                }
                return true;
            }

            @Override
            public Long next() {
                // Honor the Iterator contract even when the caller skipped hasNext().
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return pendingCounts.remove(0);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}

