/*
 * Decompiled with CFR 0.152.
 */
package com.lightbend.lagom.javadsl.testkit;

import akka.Done;
import akka.japi.Pair;
import akka.stream.Graph;
import akka.stream.Materializer;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.lightbend.lagom.javadsl.persistence.AggregateEvent;
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.AggregateEventTagger;
import com.lightbend.lagom.javadsl.persistence.Offset;
import com.lightbend.lagom.javadsl.persistence.ReadSide;
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.pcollections.PSequence;
import play.inject.Injector;

@Singleton
public class ReadSideTestDriver
implements ReadSide {
    private final Injector injector;
    private final Materializer materializer;
    private ConcurrentMap<Class<?>, List<CompletionStage<Pair<ReadSideProcessor.ReadSideHandler<?>, Offset>>>> processors = new ConcurrentHashMap();

    @Inject
    public ReadSideTestDriver(Injector injector, Materializer materializer) {
        this.injector = injector;
        this.materializer = materializer;
    }

    public <Event extends AggregateEvent<Event>> void register(Class<? extends ReadSideProcessor<Event>> clazz2) {
        ReadSideProcessor readSideProcessor = (ReadSideProcessor)this.injector.instanceOf(clazz2);
        PSequence pSequence = readSideProcessor.aggregateTags();
        AggregateEventTag aggregateEventTag = (AggregateEventTag)pSequence.get(0);
        ReadSideProcessor.ReadSideHandler readSideHandler = readSideProcessor.buildHandler();
        CompletionStage<Pair> completionStage = readSideHandler.globalPrepare().thenCompose(done -> readSideHandler.prepare(aggregateEventTag)).thenApply(offset -> Pair.create((Object)readSideHandler, (Object)offset));
        List list = this.processors.computeIfAbsent(aggregateEventTag.eventType(), clazz -> new ArrayList());
        list.add(completionStage);
    }

    public <Event extends AggregateEvent<Event>> CompletionStage<Done> feed(Event Event, Offset offset) {
        AggregateEventTagger aggregateEventTagger = Event.aggregateTag();
        List list = (List)this.processors.get(aggregateEventTagger.eventType());
        if (list == null) {
            throw new RuntimeException("No processor registered for Event " + aggregateEventTagger.eventType().getCanonicalName());
        }
        List<CompletionStage<Done>> list2 = list.stream().map(completionStage -> completionStage.thenCompose(pair -> {
            ReadSideProcessor.ReadSideHandler readSideHandler = (ReadSideProcessor.ReadSideHandler)pair.first();
            Flow flow = readSideHandler.handle();
            return (CompletionStage)Source.single((Object)Pair.create((Object)Event, (Object)offset)).via((Graph)flow).runWith((Graph)Sink.ignore(), this.materializer);
        })).collect(Collectors.toList());
        return this.doAll(list2);
    }

    private CompletionStage<Done> doAll(List<CompletionStage<Done>> list) {
        CompletionStage<Done> completionStage = CompletableFuture.completedFuture(Done.getInstance());
        for (CompletionStage<Done> completionStage2 : list) {
            completionStage = completionStage.thenCombine(completionStage2, (done, object) -> Done.getInstance());
        }
        return completionStage;
    }
}

