package org.apache.beam.runners.direct;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.MoreObjects;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Optional;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Preconditions;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheBuilder;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader;
import org.apache.beam.runners.direct.repackaged.com.google.common.cache.LoadingCache;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.Sets;
import org.apache.beam.runners.direct.repackaged.javax.annotation.Nullable;
import org.apache.beam.runners.direct.repackaged.runners.core.InMemoryMultimapSideInputView;
import org.apache.beam.runners.direct.repackaged.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollectionView;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/SideInputContainer.class */
/**
 * Stores the contents of a fixed set of {@link PCollectionView}s, keyed by (view, window), and
 * creates {@link ReadyCheckingSideInputReader}s over subsets of those views.
 *
 * <p>Each (view, window) pair owns an {@link AtomicReference} that is {@code null} until contents
 * are written via {@link #write} or until the callback scheduled by
 * {@link CallbackSchedulingLoader} fills it with an empty list.
 */
public class SideInputContainer {
    /** URN of the only side-input materialization that {@link #create} accepts. */
    private static final String MULTIMAP_MATERIALIZATION_URN = "urn:beam:sideinput:materialization:multimap:0.1";

    /** The views this container was created for; readers may only be built over subsets of it. */
    private final Collection<PCollectionView<?>> containedViews;

    /** Per (view, window) holder of the current contents; the held value is null until first set. */
    private final LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> viewByWindows;

    /**
     * Cache loader that creates the (initially empty) contents holder for a (view, window) pair and
     * registers a {@link WriteEmptyViewContents} callback for it with the {@link EvaluationContext}.
     *
     * <p>Decompiler note: this class was originally declared {@code private}.
     */
    public static class CallbackSchedulingLoader extends CacheLoader<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> {
        private final EvaluationContext context;

        public CallbackSchedulingLoader(EvaluationContext evaluationContext) {
            this.context = evaluationContext;
        }

        /**
         * Creates a fresh, unset holder for {@code pCollectionViewWindow} and schedules a callback that
         * will set it to an empty list if nothing has been written by the time the callback runs.
         *
         * @param pCollectionViewWindow the (view, window) pair being loaded
         * @return the new holder, whose value is still {@code null}
         */
        @Override // org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader
        public AtomicReference<Iterable<? extends WindowedValue<?>>> load(PCollectionViewWindow<?> pCollectionViewWindow) {
            AtomicReference<Iterable<? extends WindowedValue<?>>> contents = new AtomicReference<>();
            WriteEmptyViewContents writeEmpty =
                    new WriteEmptyViewContents(pCollectionViewWindow.getView(), pCollectionViewWindow.getWindow(), contents);
            // NOTE(review): presumably the callback fires once no further output can arrive for the
            // window -- confirm against EvaluationContext#scheduleAfterOutputWouldBeProduced.
            this.context.scheduleAfterOutputWouldBeProduced(
                    pCollectionViewWindow.getView(),
                    pCollectionViewWindow.getWindow(),
                    pCollectionViewWindow.getView().getWindowingStrategyInternal(),
                    writeEmpty);
            return contents;
        }
    }

    /**
     * Cache loader that snapshots the value currently held for a (view, window) pair in the enclosing
     * container, wrapped in an {@link Optional} that is absent while the holder is still unset.
     */
    private class CurrentViewContentsLoader extends CacheLoader<PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> {
        private CurrentViewContentsLoader() {
        }

        @Override // org.apache.beam.runners.direct.repackaged.com.google.common.cache.CacheLoader
        public Optional<? extends Iterable<? extends WindowedValue<?>>> load(PCollectionViewWindow<?> pCollectionViewWindow) {
            // No raw cast here: the cache is already typed, so the Optional keeps its element type.
            return Optional.fromNullable(SideInputContainer.this.viewByWindows.getUnchecked(pCollectionViewWindow).get());
        }
    }

    /**
     * {@link ReadyCheckingSideInputReader} restricted to a subset of the container's views.
     *
     * <p>Contents are read through a reader-local cache, so once a (view, window) pair has been
     * looked up by this reader the same result is returned on later lookups.
     *
     * <p>Decompiler note: this class was originally declared {@code private}.
     */
    public final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
        private final Collection<PCollectionView<?>> readerViews;
        private final LoadingCache<PCollectionViewWindow<?>, Optional<? extends Iterable<? extends WindowedValue<?>>>> viewContents;

        private SideInputContainerSideInputReader(Collection<PCollectionView<?>> views) {
            this.readerViews = ImmutableSet.copyOf(views);
            this.viewContents = CacheBuilder.newBuilder().build(new CurrentViewContentsLoader());
        }

        /**
         * Reports whether contents are present for {@code pCollectionView} in {@code boundedWindow}.
         *
         * @throws IllegalArgumentException if this reader was not created for {@code pCollectionView}
         */
        @Override // org.apache.beam.runners.direct.repackaged.runners.core.ReadyCheckingSideInputReader
        public boolean isReady(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow) {
            Preconditions.checkArgument(this.readerViews.contains(pCollectionView), "Tried to check if view %s was ready in a SideInputReader that does not contain it. Contained views; %s", pCollectionView, this.readerViews);
            return this.viewContents.getUnchecked(PCollectionViewWindow.of(pCollectionView, boundedWindow)).isPresent();
        }

        /**
         * Applies the view's {@code ViewFn} to an in-memory multimap built from the values stored for
         * {@code pCollectionView} in {@code boundedWindow}.
         *
         * @throws IllegalArgumentException if the view is unknown to this reader or is not yet ready
         *     in the given window
         */
        @Override // org.apache.beam.runners.direct.repackaged.runners.core.SideInputReader
        @Nullable
        public <T> T get(PCollectionView<T> pCollectionView, BoundedWindow boundedWindow) {
            Preconditions.checkArgument(this.readerViews.contains(pCollectionView), "call to get(PCollectionView) with unknown view: %s", pCollectionView);
            Preconditions.checkArgument(isReady(pCollectionView, boundedWindow), "calling get() on PCollectionView %s that is not ready in window %s", pCollectionView, boundedWindow);
            // NOTE(review): getKeyCoder() is invoked directly on getCoderInternal()'s result; confirm
            // its declared type exposes getKeyCoder() (the decompiler may have dropped a cast here).
            return (T) pCollectionView.getViewFn().apply(
                    InMemoryMultimapSideInputView.fromIterable(
                            pCollectionView.getCoderInternal().getKeyCoder(),
                            Iterables.transform(
                                    this.viewContents.getUnchecked(PCollectionViewWindow.of(pCollectionView, boundedWindow)).get(),
                                    windowedValue -> windowedValue.getValue())));
        }

        @Override // org.apache.beam.runners.direct.repackaged.runners.core.SideInputReader
        public <T> boolean contains(PCollectionView<T> pCollectionView) {
            return this.readerViews.contains(pCollectionView);
        }

        @Override // org.apache.beam.runners.direct.repackaged.runners.core.SideInputReader
        public boolean isEmpty() {
            return this.readerViews.isEmpty();
        }
    }

    /**
     * Callback that sets a contents holder to an empty list, but only if it is still unset.
     *
     * <p>Decompiler note: this class was originally declared {@code private}.
     */
    public static class WriteEmptyViewContents implements Runnable {
        // view and window are kept only for toString(); run() touches nothing but contents.
        private final PCollectionView<?> view;
        private final BoundedWindow window;
        private final AtomicReference<Iterable<? extends WindowedValue<?>>> contents;

        private WriteEmptyViewContents(PCollectionView<?> pCollectionView, BoundedWindow boundedWindow, AtomicReference<Iterable<? extends WindowedValue<?>>> atomicReference) {
            this.contents = atomicReference;
            this.view = pCollectionView;
            this.window = boundedWindow;
        }

        @Override // java.lang.Runnable
        public void run() {
            // compareAndSet(null, ...) leaves any contents that were already written untouched.
            this.contents.compareAndSet(null, Collections.emptyList());
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("view", this.view).add("window", this.window).toString();
        }
    }

    /**
     * Creates a container for {@code collection}, backed by a cache whose entries are produced by a
     * {@link CallbackSchedulingLoader} bound to {@code evaluationContext}.
     *
     * @param evaluationContext context used to schedule the empty-contents callbacks
     * @param collection the views the container will hold
     * @return a new container for exactly those views
     * @throws IllegalArgumentException if any view's materialization URN is not the multimap URN
     */
    public static SideInputContainer create(EvaluationContext evaluationContext, Collection<PCollectionView<?>> collection) {
        for (PCollectionView<?> pCollectionView : collection) {
            Preconditions.checkArgument(
                    MULTIMAP_MATERIALIZATION_URN.equals(pCollectionView.getViewFn().getMaterialization().getUrn()),
                    "This handler is only capable of dealing with %s materializations but was asked to handle %s for PCollectionView with tag %s.",
                    MULTIMAP_MATERIALIZATION_URN,
                    pCollectionView.getViewFn().getMaterialization().getUrn(),
                    pCollectionView.getTagInternal().getId());
        }
        return new SideInputContainer(collection, CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(evaluationContext)));
    }

    private SideInputContainer(Collection<PCollectionView<?>> views, LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> contentsByViewWindow) {
        this.containedViews = ImmutableSet.copyOf(views);
        this.viewByWindows = contentsByViewWindow;
    }

    /**
     * Returns a reader limited to the given views.
     *
     * @param collection views the reader may access; must all be contained in this container
     * @return a reader over exactly {@code collection}
     * @throws IllegalArgumentException if {@code collection} includes a view this container does not
     *     hold; the message lists the unknown views
     */
    public ReadyCheckingSideInputReader createReaderForViews(Collection<PCollectionView<?>> collection) {
        if (!this.containedViews.containsAll(collection)) {
            throw new IllegalArgumentException(
                    "Can't create a SideInputReader with unknown views "
                            + Sets.difference(ImmutableSet.copyOf(collection), ImmutableSet.copyOf(this.containedViews)));
        }
        return new SideInputContainerSideInputReader(collection);
    }

    /**
     * Writes {@code iterable} as the contents of {@code pCollectionView}, separately for every window
     * that any of the values belongs to.
     *
     * @param pCollectionView the view being written
     * @param iterable the values to store; a value in several windows is stored under each of them
     */
    public void write(PCollectionView<?> pCollectionView, Iterable<? extends WindowedValue<?>> iterable) {
        for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> entry : indexValuesByWindow(iterable).entrySet()) {
            updatePCollectionViewWindowValues(pCollectionView, entry.getKey(), entry.getValue());
        }
    }

    /** Groups {@code values} by window; every returned collection holds at least one value. */
    private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(Iterable<? extends WindowedValue<?>> values) {
        Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow = new HashMap<>();
        for (WindowedValue<?> value : values) {
            for (BoundedWindow window : value.getWindows()) {
                valuesPerWindow.computeIfAbsent(window, unused -> new ArrayList<>()).add(value);
            }
        }
        return valuesPerWindow;
    }

    /**
     * Installs {@code newValues} as the contents of ({@code view}, {@code window}) if nothing has been
     * set yet, or if the pane index of its first element is strictly greater than the pane index of
     * the first element currently stored (existing empty contents count as index -1).
     */
    private void updatePCollectionViewWindowValues(PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> newValues) {
        AtomicReference<Iterable<? extends WindowedValue<?>>> contents =
                this.viewByWindows.getUnchecked(PCollectionViewWindow.of(view, window));
        if (contents.compareAndSet(null, newValues)) {
            // Nothing had been written for this window before, so the new values win outright.
            return;
        }
        PaneInfo newPane = newValues.iterator().next().getPane();

        Iterable<? extends WindowedValue<?>> existingValues;
        long existingPaneIndex;
        // Retry until either the stored contents are at least as recent as ours, or our swap succeeds.
        do {
            existingValues = contents.get();
            existingPaneIndex = Iterables.isEmpty(existingValues)
                    ? -1L
                    : existingValues.iterator().next().getPane().getIndex();
        } while (newPane.getIndex() > existingPaneIndex && !contents.compareAndSet(existingValues, newValues));
    }
}
