/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.hystrix.metric.consumer;

import com.netflix.hystrix.metric.CachedValuesHistogram;
import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.HdrHistogram.Histogram;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.Subscribers;
import rx.subjects.BehaviorSubject;

/**
 * Maintains a rolling distribution of values derived from a stream of {@link HystrixEvent}s.
 *
 * <p>Events read from the source stream are grouped into time buckets of {@code bucketSizeInMs}
 * milliseconds. Each bucket is reduced to a single {@link Histogram}, and a sliding window over
 * the {@code numBuckets} most recent bucket histograms is merged into one histogram, which is
 * emitted wrapped in a {@link CachedValuesHistogram}.
 *
 * <p>Histograms are obtained from, and handed back to, a static pool shared by all instances of
 * this class (see {@link #HISTOGRAM_POOL}).
 *
 * @param <Event> type of event consumed from the underlying {@link HystrixEventStream}
 */
public class RollingDistributionStream<Event extends HystrixEvent> {
    /** Value passed to the {@link Histogram} constructor (number of significant value digits). */
    private static final int HISTOGRAM_SIGNIFICANT_DIGITS = 3;

    /** Merges {@code distributionToAdd} into {@code initialDistribution} in place and returns the latter. */
    private static final Func2<Histogram, Histogram, Histogram> distributionAggregator = new Func2<Histogram, Histogram, Histogram>() {
        @Override
        public Histogram call(Histogram initialDistribution, Histogram distributionToAdd) {
            initialDistribution.add(distributionToAdd);
            return initialDistribution;
        }
    };

    /** Collapses one window of bucket histograms into a single histogram using {@link #distributionAggregator}. */
    private static final Func1<Observable<Histogram>, Observable<Histogram>> reduceWindowToSingleDistribution = new Func1<Observable<Histogram>, Observable<Histogram>>() {
        @Override
        public Observable<Histogram> call(Observable<Histogram> window) {
            return window.reduce(distributionAggregator);
        }
    };

    /**
     * Given a list of exactly two consecutive distributions, resets the histogram backing the
     * first (older) one and returns it to the pool. Lists of any other size are ignored.
     */
    private static final Action1<List<CachedValuesHistogram>> releaseOlderOfTwoDistributions = new Action1<List<CachedValuesHistogram>>() {
        @Override
        public void call(List<CachedValuesHistogram> histograms) {
            if (histograms != null && histograms.size() == 2) {
                releaseHistogram(histograms.get(0).getUnderlying());
            }
        }
    };

    /** Wraps a raw histogram in a {@link CachedValuesHistogram}. */
    private static final Func1<Histogram, CachedValuesHistogram> cacheHistogramValues = new Func1<Histogram, CachedValuesHistogram>() {
        @Override
        public CachedValuesHistogram call(Histogram histogram) {
            return CachedValuesHistogram.backedBy(histogram);
        }
    };

    /** Collects a window of distributions into a {@link List}. */
    private static final Func1<Observable<CachedValuesHistogram>, Observable<List<CachedValuesHistogram>>> convertToList = new Func1<Observable<CachedValuesHistogram>, Observable<List<CachedValuesHistogram>>>() {
        @Override
        public Observable<List<CachedValuesHistogram>> call(Observable<CachedValuesHistogram> windowOf2) {
            return windowOf2.toList();
        }
    };

    /** Number of histograms created up front by the static initializer. */
    static int POOL_SIZE = 1000;

    /** Pool of reusable histograms shared by every instance of this class. */
    static ConcurrentLinkedQueue<Histogram> HISTOGRAM_POOL = new ConcurrentLinkedQueue<Histogram>();

    static {
        // Pre-populate the pool so early callers of getNewHistogram() do not need to allocate.
        for (int i = 0; i < POOL_SIZE; ++i) {
            HISTOGRAM_POOL.add(new Histogram(HISTOGRAM_SIGNIFICANT_DIGITS));
        }
    }

    /** Subscription feeding {@link #rollingDistribution}; {@code null} while caching is not started. */
    private final AtomicReference<Subscription> rollingDistributionSubscription = new AtomicReference<Subscription>(null);

    /** Holds the most recently emitted distribution; seeded with an empty histogram. */
    private final BehaviorSubject<CachedValuesHistogram> rollingDistribution = BehaviorSubject.create(CachedValuesHistogram.backedBy(getNewHistogram()));

    /** Shared stream of rolling distributions returned by {@link #observe()}. */
    private final Observable<CachedValuesHistogram> rollingDistributionStream;

    /**
     * Builds the (deferred, shared) rolling-distribution pipeline on top of {@code stream}.
     *
     * @param stream            source of events
     * @param numBuckets        number of bucket histograms merged into each emitted distribution
     * @param bucketSizeInMs    duration of one bucket, in milliseconds
     * @param addValuesToBucket accumulator that folds one event into a bucket's histogram
     */
    protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs, final Func2<Histogram, Event, Histogram> addValuesToBucket) {
        // The stream is primed with numBuckets empty histograms so the first sliding window is
        // already full when the first real bucket arrives.
        final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
        for (int i = 0; i < numBuckets; ++i) {
            emptyDistributionsToStart.add(getNewHistogram());
        }

        // Folds all events of one time bucket into a single histogram taken from the pool.
        final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
            @Override
            public Observable<Histogram> call(Observable<Event> bucket) {
                return bucket.reduce(getNewHistogram(), addValuesToBucket);
            }
        };

        this.rollingDistributionStream = Observable.defer(new Func0<Observable<CachedValuesHistogram>>() {
            @Override
            public Observable<CachedValuesHistogram> call() {
                return stream.observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // events grouped into time buckets
                        .flatMap(reduceBucketToSingleDistribution)     // one histogram per bucket
                        .startWith(emptyDistributionsToStart)          // prepend numBuckets empty histograms
                        .window(numBuckets, 1)                         // sliding window over bucket histograms
                        .flatMap(reduceWindowToSingleDistribution)     // one merged histogram per window
                        .map(cacheHistogramValues);
            }
        }).share();
    }

    /**
     * Returns the shared stream of rolling distributions.
     *
     * @return observable emitting one {@link CachedValuesHistogram} per merged window
     */
    public Observable<CachedValuesHistogram> observe() {
        return this.rollingDistributionStream;
    }

    /**
     * Returns the mean of the latest cached distribution, starting caching if necessary.
     *
     * @return the latest mean, or {@code 0} when no distribution is available
     */
    public int getLatestMean() {
        CachedValuesHistogram latest = this.getLatest();
        if (latest != null) {
            return latest.getMean();
        }
        return 0;
    }

    /**
     * Returns the value at {@code percentile} of the latest cached distribution, starting caching
     * if necessary.
     *
     * @param percentile percentile to look up, passed through to
     *                   {@link CachedValuesHistogram#getValueAtPercentile(double)}
     * @return the value at that percentile, or {@code 0} when no distribution is available
     */
    public int getLatestPercentile(double percentile) {
        CachedValuesHistogram latest = this.getLatest();
        if (latest != null) {
            return latest.getValueAtPercentile(percentile);
        }
        return 0;
    }

    /**
     * Subscribes the internal {@link BehaviorSubject} to {@link #observe()} if no subscription is
     * currently recorded.
     *
     * <p>The subscription is published with a compare-and-set; a caller that loses the race
     * unsubscribes its own candidate. The winning caller additionally attaches a consumer that,
     * for each pair of consecutive distributions, returns the older one's histogram to the pool.
     */
    public void startCachingStreamValuesIfUnstarted() {
        if (this.rollingDistributionSubscription.get() == null) {
            Subscription candidateSubscription = this.observe().subscribe(this.rollingDistribution);
            if (this.rollingDistributionSubscription.compareAndSet(null, candidateSubscription)) {
                this.rollingDistribution
                        .window(2, 1)
                        .flatMap(convertToList)
                        .doOnNext(releaseOlderOfTwoDistributions)
                        .unsafeSubscribe(Subscribers.empty());
            } else {
                candidateSubscription.unsubscribe();
            }
        }
    }

    /**
     * Returns the most recent distribution held by the internal subject, starting caching first
     * if it has not been started.
     *
     * @return the latest distribution, or {@code null} when the subject holds no value
     */
    CachedValuesHistogram getLatest() {
        this.startCachingStreamValuesIfUnstarted();
        if (this.rollingDistribution.hasValue()) {
            return this.rollingDistribution.getValue();
        }
        return null;
    }

    /**
     * Cancels the caching subscription, if one is recorded, and clears the stored reference so
     * that {@link #startCachingStreamValuesIfUnstarted()} can subscribe again later.
     */
    public void unsubscribe() {
        Subscription s = this.rollingDistributionSubscription.get();
        if (s != null) {
            s.unsubscribe();
            // Only clear the slot if it still holds the subscription we just cancelled.
            this.rollingDistributionSubscription.compareAndSet(s, null);
        }
    }

    /** Takes a histogram from the pool, or allocates a new one when the pool is empty. */
    private static Histogram getNewHistogram() {
        Histogram histogram = HISTOGRAM_POOL.poll();
        if (histogram == null) {
            histogram = new Histogram(HISTOGRAM_SIGNIFICANT_DIGITS);
        }
        return histogram;
    }

    /** Resets {@code histogram} and returns it to the pool for reuse. */
    private static void releaseHistogram(Histogram histogram) {
        histogram.reset();
        HISTOGRAM_POOL.offer(histogram);
    }
}

