/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.queries;

import com.fasterxml.jackson.annotation.JsonCreator;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.base.Preconditions;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.AuctionBid;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

public class WinningBids
extends PTransform<PCollection<Event>, PCollection<AuctionBid>> {
    private final AuctionOrBidWindowFn auctionOrBidWindowFn;

    public WinningBids(String name, NexmarkConfiguration configuration) {
        super(name);
        long[] interEventDelayUs = configuration.rateShape.interEventDelayUs(configuration.firstEventRate, configuration.nextEventRate, configuration.rateUnit, configuration.numEventGenerators);
        long longestDelayUs = 0L;
        for (long interEventDelayU : interEventDelayUs) {
            longestDelayUs = Math.max(longestDelayUs, interEventDelayU);
        }
        longestDelayUs = longestDelayUs * 50L / 3L;
        long expectedAuctionDurationMs = ((longestDelayUs *= (long)configuration.numInFlightAuctions) + 999L) / 1000L;
        NexmarkUtils.console("Expected auction duration is %d ms", expectedAuctionDurationMs);
        this.auctionOrBidWindowFn = new AuctionOrBidWindowFn(expectedAuctionDurationMs);
    }

    public PCollection<AuctionBid> expand(PCollection<Event> events) {
        events = (PCollection)events.apply("Window", (PTransform)Window.into((WindowFn)this.auctionOrBidWindowFn));
        PCollection auctionsById = (PCollection)((PCollection)events.apply(NexmarkQuery.JUST_NEW_AUCTIONS)).apply("AuctionById:", NexmarkQuery.AUCTION_BY_ID);
        PCollection bidsByAuctionId = (PCollection)((PCollection)events.apply(NexmarkQuery.JUST_BIDS)).apply("BidByAuction", NexmarkQuery.BID_BY_AUCTION);
        return (PCollection)((PCollection)KeyedPCollectionTuple.of(NexmarkQuery.AUCTION_TAG, (PCollection)auctionsById).and(NexmarkQuery.BID_TAG, bidsByAuctionId).apply((PTransform)CoGroupByKey.create())).apply(this.name + ".Join", (PTransform)ParDo.of((DoFn)new DoFn<KV<Long, CoGbkResult>, AuctionBid>(){
            private final Counter noAuctionCounter;
            private final Counter underReserveCounter;
            private final Counter noValidBidsCounter;
            {
                this.noAuctionCounter = Metrics.counter((String)WinningBids.this.name, (String)"noAuction");
                this.underReserveCounter = Metrics.counter((String)WinningBids.this.name, (String)"underReserve");
                this.noValidBidsCounter = Metrics.counter((String)WinningBids.this.name, (String)"noValidBids");
            }

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                Auction auction = (Auction)((CoGbkResult)((KV)c.element()).getValue()).getOnly(NexmarkQuery.AUCTION_TAG, null);
                if (auction == null) {
                    this.noAuctionCounter.inc();
                    return;
                }
                Bid bestBid = null;
                for (Bid bid : ((CoGbkResult)((KV)c.element()).getValue()).getAll(NexmarkQuery.BID_TAG)) {
                    Preconditions.checkState(bid.dateTime < auction.expires);
                    if (bid.price < auction.reserve) {
                        this.underReserveCounter.inc();
                        continue;
                    }
                    if (bestBid != null && Bid.PRICE_THEN_DESCENDING_TIME.compare(bid, bestBid) <= 0) continue;
                    bestBid = bid;
                }
                if (bestBid == null) {
                    this.noValidBidsCounter.inc();
                    return;
                }
                c.output((Object)new AuctionBid(auction, bestBid));
            }
        }));
    }

    public int hashCode() {
        return Objects.hash(new Object[]{this.auctionOrBidWindowFn});
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || ((Object)((Object)this)).getClass() != o.getClass()) {
            return false;
        }
        WinningBids that = (WinningBids)((Object)o);
        return ((Object)((Object)this.auctionOrBidWindowFn)).equals((Object)that.auctionOrBidWindowFn);
    }

    private static class AuctionOrBidWindowFn
    extends WindowFn<Event, AuctionOrBidWindow> {
        private final long expectedAuctionDurationMs;

        public AuctionOrBidWindowFn(long expectedAuctionDurationMs) {
            this.expectedAuctionDurationMs = expectedAuctionDurationMs;
        }

        public Collection<AuctionOrBidWindow> assignWindows(WindowFn.AssignContext c) {
            Event event = (Event)c.element();
            if (event.newAuction != null) {
                return Collections.singletonList(AuctionOrBidWindow.forAuction(c.timestamp(), event.newAuction));
            }
            if (event.bid != null) {
                return Collections.singletonList(AuctionOrBidWindow.forBid(this.expectedAuctionDurationMs, c.timestamp(), event.bid));
            }
            throw new IllegalArgumentException(String.format("%s can only assign windows to auctions and bids, but received %s", ((Object)((Object)this)).getClass().getSimpleName(), c.element()));
        }

        public void mergeWindows(WindowFn.MergeContext c) throws Exception {
            TreeMap<Long, AuctionOrBidWindow> idToTrueAuctionWindow = new TreeMap<Long, AuctionOrBidWindow>();
            TreeMap<Long, List> idToBidAuctionWindows = new TreeMap<Long, List>();
            for (AuctionOrBidWindow auctionOrBidWindow : c.windows()) {
                if (auctionOrBidWindow.isAuctionWindow()) {
                    idToTrueAuctionWindow.put(auctionOrBidWindow.auction, auctionOrBidWindow);
                    continue;
                }
                List bidWindows = idToBidAuctionWindows.computeIfAbsent(auctionOrBidWindow.auction, k -> new ArrayList());
                bidWindows.add(auctionOrBidWindow);
            }
            for (Map.Entry entry : idToTrueAuctionWindow.entrySet()) {
                long auction = (Long)entry.getKey();
                AuctionOrBidWindow auctionWindow = (AuctionOrBidWindow)((Object)entry.getValue());
                List bidWindows = (List)idToBidAuctionWindows.get(auction);
                if (bidWindows == null) continue;
                ArrayList<AuctionOrBidWindow> toBeMerged = new ArrayList<AuctionOrBidWindow>();
                for (AuctionOrBidWindow bidWindow : bidWindows) {
                    if (!bidWindow.start().isBefore((ReadableInstant)auctionWindow.end())) continue;
                    toBeMerged.add(bidWindow);
                }
                if (toBeMerged.isEmpty()) continue;
                toBeMerged.add(auctionWindow);
                c.merge(toBeMerged, (BoundedWindow)auctionWindow);
            }
        }

        public boolean isCompatible(WindowFn<?, ?> other) {
            return other instanceof AuctionOrBidWindowFn;
        }

        public Coder<AuctionOrBidWindow> windowCoder() {
            return AuctionOrBidWindowCoder.of();
        }

        public WindowMappingFn<AuctionOrBidWindow> getDefaultWindowMappingFn() {
            throw new UnsupportedOperationException("AuctionWindowFn not supported for side inputs");
        }

        public Instant getOutputTime(Instant inputTimestamp, AuctionOrBidWindow window) {
            return window.maxTimestamp();
        }
    }

    private static class AuctionOrBidWindowCoder
    extends CustomCoder<AuctionOrBidWindow> {
        private static final AuctionOrBidWindowCoder INSTANCE = new AuctionOrBidWindowCoder();
        private static final Coder<IntervalWindow> SUPER_CODER = IntervalWindow.getCoder();
        private static final Coder<Long> ID_CODER = VarLongCoder.of();
        private static final Coder<Integer> INT_CODER = VarIntCoder.of();

        private AuctionOrBidWindowCoder() {
        }

        @JsonCreator
        public static AuctionOrBidWindowCoder of() {
            return INSTANCE;
        }

        public void encode(AuctionOrBidWindow window, OutputStream outStream) throws IOException, CoderException {
            SUPER_CODER.encode((Object)window, outStream);
            ID_CODER.encode((Object)window.auction, outStream);
            INT_CODER.encode((Object)(window.isAuctionWindow ? 1 : 0), outStream);
        }

        public AuctionOrBidWindow decode(InputStream inStream) throws IOException, CoderException {
            IntervalWindow superWindow = (IntervalWindow)SUPER_CODER.decode(inStream);
            long auction = (Long)ID_CODER.decode(inStream);
            boolean isAuctionWindow = (Integer)INT_CODER.decode(inStream) != 0;
            return new AuctionOrBidWindow(superWindow.start(), superWindow.end(), auction, isAuctionWindow);
        }

        public void verifyDeterministic() throws Coder.NonDeterministicException {
        }

        public Object structuralValue(AuctionOrBidWindow value) {
            return value;
        }
    }

    private static class AuctionOrBidWindow
    extends IntervalWindow {
        public final long auction;
        public final boolean isAuctionWindow;

        private AuctionOrBidWindow() {
            super(TIMESTAMP_MIN_VALUE, TIMESTAMP_MAX_VALUE);
            this.auction = 0L;
            this.isAuctionWindow = false;
        }

        private AuctionOrBidWindow(Instant start, Instant end, long auctionId, boolean isAuctionWindow) {
            super(start, end);
            this.auction = auctionId;
            this.isAuctionWindow = isAuctionWindow;
        }

        public static AuctionOrBidWindow forAuction(Instant timestamp, Auction auction) {
            return new AuctionOrBidWindow(timestamp, new Instant(auction.expires), auction.id, true);
        }

        public static AuctionOrBidWindow forBid(long expectedAuctionDurationMs, Instant timestamp, Bid bid) {
            return new AuctionOrBidWindow(timestamp, timestamp.plus(expectedAuctionDurationMs * 2L), bid.auction, false);
        }

        public boolean isAuctionWindow() {
            return this.isAuctionWindow;
        }

        public String toString() {
            return String.format("AuctionOrBidWindow{start:%s; end:%s; auction:%d; isAuctionWindow:%s}", this.start(), this.end(), this.auction, this.isAuctionWindow);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || ((Object)((Object)this)).getClass() != o.getClass()) {
                return false;
            }
            if (!super.equals(o)) {
                return false;
            }
            AuctionOrBidWindow that = (AuctionOrBidWindow)((Object)o);
            return this.isAuctionWindow == that.isAuctionWindow && this.auction == that.auction;
        }

        public int hashCode() {
            return Objects.hash(super.hashCode(), this.isAuctionWindow, this.auction);
        }
    }
}

