package org.apache.reef.wake.examples.accumulate;

import java.lang.Comparable;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.reef.wake.Stage;
import org.apache.reef.wake.rx.Observer;

/* loaded from: input_file:org/apache/reef/wake/examples/accumulate/CombinerStage.class */
/**
 * A stage that merges incoming key/value pairs per key and forwards the merged
 * pairs to a downstream observer from a dedicated output thread.
 *
 * <p>Pairs pushed into the observer returned by {@link #wireIn()} are folded into
 * an internal sorted register using the supplied {@link Combiner}. A single
 * background thread (started by the constructor) repeatedly removes entries from
 * the register in ascending key order and hands them to the downstream observer.
 * Once the input observer has been completed and the register has been fully
 * drained, the downstream observer's {@code onCompleted()} is invoked and the
 * thread ends.
 *
 * @param <K> key type; keys are ordered by their natural ordering
 * @param <V> value type
 */
public class CombinerStage<K extends Comparable<K>, V> implements Stage {
    /** Merge function applied when a pair arrives. */
    private final Combiner<K, V> c;
    /** Downstream observer that receives the merged pairs. */
    private final Observer<Map.Entry<K, V>> o;
    /** The only thread that removes entries from {@link #register}. */
    private final OutputThread worker = new OutputThread();
    /** Pending merged values; also used as the monitor for wait/notify. */
    private final ConcurrentSkipListMap<K, V> register = new ConcurrentSkipListMap<>();
    /** Set once the input side has signalled completion. */
    private volatile boolean done = false;

    /**
     * Merges a newly arrived value into the value currently registered for a key.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public interface Combiner<K extends Comparable<K>, V> {
        /**
         * Combines two values for the same key.
         *
         * @param k  the key both values belong to
         * @param v  the value currently registered for {@code k}, or {@code null}
         *           when nothing is registered yet
         * @param v2 the newly arrived value
         * @return the value to store for {@code k}
         */
        V combine(K k, V v, V v2);
    }

    /** Background thread that drains the register into the downstream observer. */
    private class OutputThread extends Thread {
        OutputThread() {
            super("grouper-output-thread");
        }

        /**
         * Waits for work, then emits register entries in ascending key order until
         * the input is completed and the register is empty.
         *
         * @throws IllegalStateException if the thread is interrupted while waiting
         */
        @Override
        public void run() {
            while (true) {
                if (register.isEmpty()) {
                    synchronized (register) {
                        while (register.isEmpty() && !done) {
                            try {
                                register.wait();
                            } catch (InterruptedException e) {
                                // Keep the interrupt visible to whoever observes this thread.
                                Thread.currentThread().interrupt();
                                throw new IllegalStateException(e);
                            }
                        }
                        // Only stop once everything has been drained; entries that
                        // arrived just before completion must still be emitted.
                        if (done && register.isEmpty()) {
                            break;
                        }
                    }
                }

                // One ascending pass over the register. The pass ends when no higher
                // key exists; the outer loop then re-checks for new work.
                Map.Entry<K, V> cursor = register.pollFirstEntry();
                while (cursor != null) {
                    o.onNext(cursor);
                    final K nextKey = register.higherKey(cursor.getKey());
                    // This thread is the only one removing entries, so the key found
                    // by higherKey() is still present when remove() runs.
                    cursor = (nextKey == null) ? null : new Pair<>(nextKey, register.remove(nextKey));
                }
            }
            // Invoked outside the monitor so downstream code never runs under our lock.
            o.onCompleted();
        }
    }

    /**
     * Immutable key/value pair whose equality, hash code and ordering are based
     * on the key only.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static class Pair<K extends Comparable<K>, V> implements Map.Entry<K, V>, Comparable<Map.Entry<K, V>> {
        private final K k;
        private final V v;

        /**
         * @param k the key
         * @param v the value
         */
        public Pair(K k, V v) {
            this.k = k;
            this.v = v;
        }

        /**
         * Two pairs are equal when they are of the same class and their keys
         * compare as equal; the values are not considered.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            // Same runtime class was checked above; the type arguments are erased.
            @SuppressWarnings("unchecked")
            final Pair<K, V> other = (Pair<K, V>) obj;
            return this.k.compareTo(other.getKey()) == 0;
        }

        /** Hash code of the key only. */
        @Override
        public int hashCode() {
            return this.k.hashCode();
        }

        /** Orders pairs by key. */
        @Override
        public int compareTo(Map.Entry<K, V> entry) {
            return this.k.compareTo(entry.getKey());
        }

        @Override
        public K getKey() {
            return this.k;
        }

        @Override
        public V getValue() {
            return this.v;
        }

        /**
         * Not supported; pairs are immutable.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public V setValue(V v) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Creates the stage and starts its output thread.
     *
     * @param combiner merge function applied to values that share a key
     * @param observer downstream observer that receives the merged pairs
     */
    public CombinerStage(Combiner<K, V> combiner, Observer<Map.Entry<K, V>> observer) {
        this.c = combiner;
        this.o = observer;
        this.worker.start();
    }

    /**
     * Returns a new observer through which pairs are fed into this stage.
     *
     * <p>{@code onNext} merges the pair into the register and wakes the output
     * thread; {@code onError} is forwarded directly to the downstream observer on
     * the calling thread; {@code onCompleted} marks the input as finished so the
     * output thread can terminate after draining.
     *
     * @return the input observer of this stage
     */
    public Observer<Map.Entry<K, V>> wireIn() {
        return new Observer<Map.Entry<K, V>>() {
            @Override
            public void onNext(Map.Entry<K, V> entry) {
                final K key = entry.getKey();
                boolean stored = false;
                // Optimistic retry: recompute the merged value until it can be
                // installed without a concurrent change to the same key.
                while (!stored) {
                    final V old = register.get(key);
                    final V merged = c.combine(key, old, entry.getValue());
                    stored = (old == null)
                            ? register.putIfAbsent(key, merged) == null
                            : register.replace(key, old, merged);
                }
                // Always signal after inserting. Notifying only when the register
                // looked empty beforehand can miss the case where the output thread
                // drains the last entry and starts waiting in between.
                synchronized (register) {
                    register.notify();
                }
            }

            @Override
            public void onError(Exception exc) {
                o.onError(exc);
            }

            @Override
            public void onCompleted() {
                synchronized (register) {
                    done = true;
                    // Wake the output thread unconditionally so it re-evaluates
                    // its exit condition.
                    register.notify();
                }
            }
        };
    }

    /**
     * Blocks until the output thread has terminated.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    @Override
    public void close() throws Exception {
        this.worker.join();
    }
}
