/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.queries.sql;

import org.apache.beam.repackaged.beam_sdks_java_nexmark.com.google.common.base.Joiner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.model.AuctionCount;
import org.apache.beam.sdk.nexmark.model.Bid;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.sql.ToRow;
import org.apache.beam.sdk.nexmark.model.sql.adapter.ModelAdaptersMapping;
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

public class SqlQuery5
extends PTransform<PCollection<Event>, PCollection<AuctionCount>> {
    private static final String QUERY_TEMPLATE = Joiner.on("\n\t").join(" SELECT AuctionBids.auction, AuctionBids.num", " FROM (", "   SELECT", "     B1.auction,", "     count(*) AS num,", "     HOP_START(B1.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND) AS starttime", "   FROM Bid B1 ", "   GROUP BY ", "     B1.auction,", "     HOP(B1.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND)", " ) AS AuctionBids", " JOIN (", "   SELECT ", "     max(CountBids.num) AS maxnum, ", "     CountBids.starttime", "   FROM (", "     SELECT", "       count(*) AS num,", "       HOP_START(B2.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND) AS starttime", "     FROM Bid B2 ", "     GROUP BY ", "       B2.auction, ", "       HOP(B2.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d' SECOND)", "     ) AS CountBids", "   GROUP BY CountBids.starttime", " ) AS MaxBids ", " ON AuctionBids.starttime = MaxBids.starttime AND AuctionBids.num >= MaxBids.maxnum ");
    private final PTransform<PInput, PCollection<Row>> query;

    public SqlQuery5(NexmarkConfiguration configuration) {
        super("SqlQuery5");
        String queryString = String.format(QUERY_TEMPLATE, configuration.windowPeriodSec, configuration.windowSizeSec);
        this.query = SqlTransform.query((String)queryString);
    }

    public PCollection<AuctionCount> expand(PCollection<Event> allEvents) {
        RowCoder bidRecordCoder = this.getBidRowCoder();
        PCollection bids = ((PCollection)((PCollection)allEvents.apply((PTransform)Filter.by(NexmarkQuery.IS_BID))).apply(this.getName() + ".ToRow", ToRow.parDo())).setCoder((Coder)bidRecordCoder);
        PCollection queryResultsRows = (PCollection)PCollectionTuple.of((TupleTag)new TupleTag("Bid"), (PCollection)bids).apply(this.query);
        return ((PCollection)queryResultsRows.apply(this.auctionCountParDo())).setCoder(AuctionCount.CODER);
    }

    private RowCoder getBidRowCoder() {
        return ModelAdaptersMapping.ADAPTERS.get(Bid.class).getSchema().getRowCoder();
    }

    private ParDo.SingleOutput<Row, AuctionCount> auctionCountParDo() {
        return ModelAdaptersMapping.ADAPTERS.get(AuctionCount.class).parDo();
    }
}

