/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.nexmark.queries;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.nexmark.NexmarkConfiguration;
import org.apache.beam.sdk.nexmark.NexmarkUtils;
import org.apache.beam.sdk.nexmark.model.Auction;
import org.apache.beam.sdk.nexmark.model.Event;
import org.apache.beam.sdk.nexmark.model.KnownSize;
import org.apache.beam.sdk.nexmark.model.NameCityStateId;
import org.apache.beam.sdk.nexmark.model.Person;
import org.apache.beam.sdk.nexmark.queries.NexmarkQuery;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Query3
extends NexmarkQuery {
    private static final Logger LOG = LoggerFactory.getLogger(Query3.class);
    private final JoinDoFn joinDoFn;

    public Query3(NexmarkConfiguration configuration) {
        super(configuration, "Query3");
        this.joinDoFn = new JoinDoFn(this.name, configuration.maxAuctionsWaitingTime);
    }

    private PCollection<NameCityStateId> applyTyped(PCollection<Event> events) {
        int numEventsInPane = 30;
        PCollection eventsWindowed = (PCollection)events.apply((PTransform)Window.into((WindowFn)new GlobalWindows()).triggering((Trigger)Repeatedly.forever((Trigger)AfterPane.elementCountAtLeast((int)numEventsInPane))).discardingFiredPanes().withAllowedLateness(Duration.ZERO));
        PCollection auctionsBySellerId = (PCollection)((PCollection)((PCollection)eventsWindowed.apply(JUST_NEW_AUCTIONS)).apply(this.name + ".InCategory", (PTransform)Filter.by((SerializableFunction & Serializable)auction -> auction.category == 10L))).apply("AuctionBySeller", (PTransform)AUCTION_BY_SELLER);
        PCollection personsById = (PCollection)((PCollection)((PCollection)eventsWindowed.apply(JUST_NEW_PERSONS)).apply(this.name + ".InState", (PTransform)Filter.by((SerializableFunction & Serializable)person -> "OR".equals(person.state) || "ID".equals(person.state) || "CA".equals(person.state)))).apply("PersonById", (PTransform)PERSON_BY_ID);
        return (PCollection)((PCollection)((PCollection)KeyedPCollectionTuple.of((TupleTag)AUCTION_TAG, (PCollection)auctionsBySellerId).and(PERSON_TAG, personsById).apply((PTransform)CoGroupByKey.create())).apply(this.name + ".Join", (PTransform)ParDo.of((DoFn)this.joinDoFn))).apply(this.name + ".Project", (PTransform)ParDo.of((DoFn)new DoFn<KV<Auction, Person>, NameCityStateId>(){

            @DoFn.ProcessElement
            public void processElement(DoFn.ProcessContext c) {
                Auction auction = (Auction)((KV)c.element()).getKey();
                Person person = (Person)((KV)c.element()).getValue();
                c.output((Object)new NameCityStateId(person.name, person.city, person.state, auction.id));
            }
        }));
    }

    @Override
    protected PCollection<KnownSize> applyPrim(PCollection<Event> events) {
        return NexmarkUtils.castToKnownSize(this.name, this.applyTyped(events));
    }

    private static class JoinDoFn
    extends DoFn<KV<Long, CoGbkResult>, KV<Auction, Person>> {
        private final int maxAuctionsWaitingTime;
        private static final String AUCTIONS = "auctions";
        private static final String PERSON = "person";
        @DoFn.StateId(value="person")
        private static final StateSpec<ValueState<Person>> personSpec = StateSpecs.value(Person.CODER);
        private static final String PERSON_STATE_EXPIRING = "personStateExpiring";
        @DoFn.StateId(value="auctions")
        private final StateSpec<ValueState<List<Auction>>> auctionsSpec = StateSpecs.value((Coder)ListCoder.of(Auction.CODER));
        @DoFn.TimerId(value="personStateExpiring")
        private final TimerSpec timerSpec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
        private final String name;
        private final Counter newAuctionCounter;
        private final Counter newPersonCounter;
        private final Counter newNewOutputCounter;
        private final Counter newOldOutputCounter;
        private final Counter oldNewOutputCounter;
        private final Counter fatalCounter;

        private JoinDoFn(String name, int maxAuctionsWaitingTime) {
            this.name = name;
            this.maxAuctionsWaitingTime = maxAuctionsWaitingTime;
            this.newAuctionCounter = Metrics.counter((String)name, (String)"newAuction");
            this.newPersonCounter = Metrics.counter((String)name, (String)"newPerson");
            this.newNewOutputCounter = Metrics.counter((String)name, (String)"newNewOutput");
            this.newOldOutputCounter = Metrics.counter((String)name, (String)"newOldOutput");
            this.oldNewOutputCounter = Metrics.counter((String)name, (String)"oldNewOutput");
            this.fatalCounter = Metrics.counter((String)name, (String)"fatal");
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c, @DoFn.TimerId(value="personStateExpiring") Timer timer, @DoFn.StateId(value="person") ValueState<Person> personState, @DoFn.StateId(value="auctions") ValueState<List<Auction>> auctionsState) {
            Person existingPerson = (Person)personState.read();
            if (existingPerson != null) {
                for (Auction newAuction : ((CoGbkResult)((KV)c.element()).getValue()).getAll(NexmarkQuery.AUCTION_TAG)) {
                    this.newAuctionCounter.inc();
                    this.newOldOutputCounter.inc();
                    c.output((Object)KV.of((Object)newAuction, (Object)existingPerson));
                }
                return;
            }
            Person theNewPerson = null;
            for (Person newPerson : ((CoGbkResult)((KV)c.element()).getValue()).getAll(NexmarkQuery.PERSON_TAG)) {
                if (theNewPerson != null) {
                    if (theNewPerson.equals(newPerson)) {
                        LOG.error("Duplicate person {}", (Object)theNewPerson);
                    } else {
                        LOG.error("Conflicting persons {} and {}", (Object)theNewPerson, (Object)newPerson);
                    }
                    this.fatalCounter.inc();
                    continue;
                }
                theNewPerson = newPerson;
                this.newPersonCounter.inc();
                List pendingAuctions = (List)auctionsState.read();
                if (pendingAuctions != null) {
                    for (Auction pendingAuction : pendingAuctions) {
                        this.oldNewOutputCounter.inc();
                        c.output((Object)KV.of((Object)pendingAuction, (Object)newPerson));
                    }
                    auctionsState.clear();
                }
                for (Auction newAuction : ((CoGbkResult)((KV)c.element()).getValue()).getAll(NexmarkQuery.AUCTION_TAG)) {
                    this.newAuctionCounter.inc();
                    this.newNewOutputCounter.inc();
                    c.output((Object)KV.of((Object)newAuction, (Object)newPerson));
                }
                personState.write((Object)newPerson);
                Instant firingTime = new Instant(newPerson.dateTime).plus((ReadableDuration)Duration.standardSeconds((long)this.maxAuctionsWaitingTime));
                timer.set(firingTime);
            }
            if (theNewPerson != null) {
                return;
            }
            ArrayList<Auction> pendingAuctions = (ArrayList<Auction>)auctionsState.read();
            if (pendingAuctions == null) {
                pendingAuctions = new ArrayList<Auction>();
            }
            for (Auction newAuction : ((CoGbkResult)((KV)c.element()).getValue()).getAll(NexmarkQuery.AUCTION_TAG)) {
                this.newAuctionCounter.inc();
                pendingAuctions.add(newAuction);
            }
            auctionsState.write(pendingAuctions);
        }

        @DoFn.OnTimer(value="personStateExpiring")
        public void onTimerCallback(DoFn.OnTimerContext context, @DoFn.StateId(value="person") ValueState<Person> personState) {
            personState.clear();
        }
    }
}

