/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.queries;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.queries.AbstractSimulator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class WinningBidsSimulator
extends AbstractSimulator<Event, AuctionBid> {
    private final Map<Long, Auction> openAuctions = new TreeMap<Long, Auction>();
    private final Set<Long> closedAuctions = new TreeSet<Long>();
    private final Map<Long, Bid> bestBids = new TreeMap<Long, Bid>();
    private final List<Bid> bidsWithoutAuctions = new ArrayList<Bid>();
    private long lastTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis();

    public WinningBidsSimulator(NexmarkConfiguration configuration) {
        super(NexmarkUtils.standardEventIterator(configuration));
    }

    private boolean captureBestBid(Bid bid, boolean shouldLog) {
        if (this.closedAuctions.contains(bid.auction)) {
            if (shouldLog) {
                NexmarkUtils.info("closed auction: %s", bid);
            }
            return true;
        }
        Auction auction = this.openAuctions.get(bid.auction);
        if (auction == null) {
            if (shouldLog) {
                NexmarkUtils.info("pending auction: %s", bid);
            }
            return false;
        }
        if (bid.price < auction.reserve) {
            if (shouldLog) {
                NexmarkUtils.info("below reserve: %s", bid);
            }
            return true;
        }
        Bid existingBid = this.bestBids.get(bid.auction);
        if (existingBid == null || Bid.PRICE_THEN_DESCENDING_TIME.compare(existingBid, bid) < 0) {
            this.bestBids.put(bid.auction, bid);
            if (shouldLog) {
                NexmarkUtils.info("new winning bid: %s", bid);
            }
        } else if (shouldLog) {
            NexmarkUtils.info("ignoring low bid: %s", bid);
        }
        return true;
    }

    private void flushBidsWithoutAuctions() {
        Iterator<Bid> itr = this.bidsWithoutAuctions.iterator();
        while (itr.hasNext()) {
            Bid bid = itr.next();
            if (!this.captureBestBid(bid, false)) continue;
            NexmarkUtils.info("bid now accounted for: %s", bid);
            itr.remove();
        }
    }

    @Nullable
    private TimestampedValue<AuctionBid> nextWinningBid(long timestamp) {
        TreeMap<Long, List> toBeRetired = new TreeMap<Long, List>();
        for (Map.Entry<Long, Auction> entry : this.openAuctions.entrySet()) {
            if (entry.getValue().expires > timestamp) continue;
            List idsAtTime = toBeRetired.computeIfAbsent(entry.getValue().expires, k -> new ArrayList());
            idsAtTime.add(entry.getKey());
        }
        for (Map.Entry<Long, Auction> entry : toBeRetired.entrySet()) {
            Iterator iterator = ((List)((Object)entry.getValue())).iterator();
            while (iterator.hasNext()) {
                long id = (Long)iterator.next();
                Auction auction = this.openAuctions.get(id);
                NexmarkUtils.info("retiring auction: %s", auction);
                this.openAuctions.remove(id);
                Bid bestBid = this.bestBids.get(id);
                if (bestBid == null) continue;
                TimestampedValue result = TimestampedValue.of((Object)new AuctionBid(auction, bestBid), (Instant)new Instant(auction.expires));
                NexmarkUtils.info("winning: %s", result);
                return result;
            }
        }
        return null;
    }

    @Override
    protected void run() {
        TimestampedValue timestampedEvent;
        if (this.lastTimestamp > BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()) {
            this.flushBidsWithoutAuctions();
            TimestampedValue<AuctionBid> result = this.nextWinningBid(this.lastTimestamp);
            if (result != null) {
                this.addResult(result);
                return;
            }
        }
        if ((timestampedEvent = this.nextInput()) == null) {
            TimestampedValue<AuctionBid> result = this.nextWinningBid(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
            if (result == null) {
                this.allDone();
                return;
            }
            this.addResult(result);
            return;
        }
        Event event = (Event)timestampedEvent.getValue();
        if (event.newPerson != null) {
            return;
        }
        this.lastTimestamp = timestampedEvent.getTimestamp().getMillis();
        if (event.newAuction != null) {
            this.openAuctions.put(event.newAuction.id, event.newAuction);
        } else if (!this.captureBestBid(event.bid, true)) {
            NexmarkUtils.info("bid not yet accounted for: %s", event.bid);
            this.bidsWithoutAuctions.add(event.bid);
        }
    }
}

