package com.netflix.atlas.eval.stream;

import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import org.apache.pekko.NotUsed;
import org.apache.pekko.http.scaladsl.model.HttpMethods$;
import org.apache.pekko.http.scaladsl.model.HttpRequest;
import org.apache.pekko.http.scaladsl.model.HttpRequest$;
import org.apache.pekko.http.scaladsl.model.HttpResponse;
import org.apache.pekko.http.scaladsl.model.MediaRange$;
import org.apache.pekko.http.scaladsl.model.MediaTypes$;
import org.apache.pekko.http.scaladsl.model.StatusCode;
import org.apache.pekko.http.scaladsl.model.StatusCodes;
import org.apache.pekko.http.scaladsl.model.StatusCodes$;
import org.apache.pekko.http.scaladsl.model.Uri$;
import org.apache.pekko.http.scaladsl.model.headers.Accept$;
import org.apache.pekko.http.scaladsl.model.headers.Content$minusEncoding$;
import org.apache.pekko.http.scaladsl.model.headers.HttpEncodings$;
import org.apache.pekko.stream.scaladsl.Compression$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.util.ByteString;
import scala.MatchError;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: HostSource.scala */
/* loaded from: input_file:com/netflix/atlas/eval/stream/HostSource$.class */
/**
 * Singleton holder ({@link #MODULE$}) for the {@code HostSource} object compiled from
 * HostSource.scala.
 *
 * <p>Builds a {@code Source} of {@code ByteString} by issuing HTTP GET requests for a URI
 * through a caller-supplied request flow and streaming the response body. Unsuccessful
 * attempts are logged and contribute an empty source.
 */
public final class HostSource$ implements StrictLogging {
    public static final HostSource$ MODULE$ = new HostSource$();
    private static Logger logger;

    static {
        // Runs after MODULE$ is assigned; the trait initializer installs the logger via the setter below.
        StrictLogging.$init$(MODULE$);
    }

    private HostSource$() {
    }

    /** Returns the logger installed by the {@code StrictLogging} initializer. */
    public Logger logger() {
        return logger;
    }

    /** Setter invoked by {@code StrictLogging.$init$} to store the logger. */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger2) {
        logger = logger2;
    }

    /**
     * Creates the byte stream for a URI.
     *
     * <p>Each element emitted by {@code EvaluationFlows.repeat(str, finiteDuration)} is turned
     * into one {@link #singleCall} and the resulting sources are concatenated.
     * NOTE(review): presumably {@code repeat} re-emits the URI using the given delay — confirm
     * against EvaluationFlows.
     *
     * @param str            URI to request
     * @param flow           flow that maps each request to a {@code Try} of its response
     * @param finiteDuration duration handed to {@code EvaluationFlows.repeat}
     * @return concatenation of the sources produced by the individual calls
     */
    public Source<ByteString, NotUsed> apply(String str, Flow<HttpRequest, Try<HttpResponse>, NotUsed> flow, FiniteDuration finiteDuration) {
        return EvaluationFlows$.MODULE$
            .repeat(str, finiteDuration)
            .flatMapConcat(uri -> MODULE$.singleCall(flow, uri));
    }

    /**
     * Supplies a duration of one second. The method name follows the Scala compiler's scheme
     * for the default value of the third parameter of {@code apply}.
     */
    public FiniteDuration apply$default$3() {
        return new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(1)).second();
    }

    /**
     * Performs one GET request for {@code str} through {@code flow} and converts the outcome
     * into a source of bytes.
     *
     * <ul>
     *   <li>Successful response with status OK: the (optionally gunzipped) entity bytes, passed
     *       through {@code EvaluationFlows.sseFraming()}, with a recovery handler and a
     *       termination watcher attached.</li>
     *   <li>Successful response with any other status: a warning is logged and an empty source
     *       is returned.</li>
     *   <li>Failed attempt: the exception is logged at warn level and an empty source is
     *       returned.</li>
     *   <li>Anything else, including a success wrapping {@code null}: {@code MatchError}.</li>
     * </ul>
     *
     * @param flow flow that maps the request to a {@code Try} of its response
     * @param str  URI to request
     * @return source of bytes for this single attempt
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Source<ByteString, Object> singleCall(Flow<HttpRequest, Try<HttpResponse>, NotUsed> flow, String str) {
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("subscribing to {}", str);
        }

        // GET request whose only header is Accept: text/event-stream; entity and protocol use the defaults.
        HttpRequest request = HttpRequest$.MODULE$.apply(
            HttpMethods$.MODULE$.GET(),
            Uri$.MODULE$.apply(str),
            new $colon.colon(Accept$.MODULE$.apply(MediaRange$.MODULE$.apply(MediaTypes$.MODULE$.text$divevent$minusstream()), Nil$.MODULE$), Nil$.MODULE$),
            HttpRequest$.MODULE$.apply$default$4(),
            HttpRequest$.MODULE$.apply$default$5());

        return Source$.MODULE$.single(request).via(flow).flatMapConcat(attempt -> {
            if (attempt instanceof Success) {
                HttpResponse response = (HttpResponse) ((Success) attempt).value();
                if (response != null) {
                    StatusCode status = response.status();
                    StatusCodes.Success ok = StatusCodes$.MODULE$.OK();
                    // Null-tolerant equality, mirroring Scala's == on references.
                    boolean statusIsOk = (status == null) ? ok == null : status.equals(ok);
                    if (statusIsOk) {
                        return MODULE$.unzipIfNeeded(response)
                            .via(EvaluationFlows$.MODULE$.sseFraming())
                            .recover(new HostSource$$anonfun$$nestedInanonfun$singleCall$1$1(str))
                            .watchTermination((mat, done) -> {
                                $anonfun$singleCall$2(str, mat, done);
                                return BoxedUnit.UNIT;
                            });
                    }
                    if (MODULE$.logger().underlying().isWarnEnabled()) {
                        MODULE$.logger().underlying().warn("subscription attempt failed with status {}", response.status());
                    }
                    return MODULE$.empty();
                }
                // A success wrapping null matches no case and falls through to the MatchError below.
            }

            if (attempt instanceof Failure) {
                Throwable cause = ((Failure) attempt).exception();
                if (MODULE$.logger().underlying().isWarnEnabled()) {
                    MODULE$.logger().underlying().warn("subscription attempt failed with exception", cause);
                }
                return MODULE$.empty();
            }

            throw new MatchError(attempt);
        });
    }

    /** Returns an empty source of bytes, used when an attempt yields no data. */
    private Source<ByteString, NotUsed> empty() {
        return Source$.MODULE$.empty();
    }

    /**
     * Returns the response entity bytes (size limit removed), piped through gunzip when the
     * response headers contain a {@code Content-Encoding} header equal to gzip.
     *
     * @param httpResponse response whose entity should be streamed
     * @return entity bytes, decompressed if the gzip header is present
     */
    private Source<ByteString, Object> unzipIfNeeded(HttpResponse httpResponse) {
        boolean gzipped = httpResponse.headers().contains(Content$minusEncoding$.MODULE$.apply(HttpEncodings$.MODULE$.gzip(), Nil$.MODULE$));
        Source<ByteString, Object> body = httpResponse.entity().withoutSizeLimit().dataBytes();
        if (!gzipped) {
            return body;
        }
        return body.via(Compression$.MODULE$.gunzip(Compression$.MODULE$.gunzip$default$1()));
    }

    /**
     * Logs the end of a connection at info level: a plain message when the termination result
     * is a success, and the message plus exception when it is a failure.
     *
     * @param str URI of the connection that ended
     * @param r5  completion result of the stream's termination future
     * @throws MatchError if {@code r5} is neither a {@code Success} nor a {@code Failure}
     */
    public static final /* synthetic */ void $anonfun$singleCall$3(String str, Try r5) {
        if (r5 instanceof Success) {
            if (MODULE$.logger().underlying().isInfoEnabled()) {
                MODULE$.logger().underlying().info("lost connection to {}", str);
            }
            return;
        }

        if (r5 instanceof Failure) {
            Throwable cause = ((Failure) r5).exception();
            if (MODULE$.logger().underlying().isInfoEnabled()) {
                MODULE$.logger().underlying().info("lost connection to " + str, cause);
            }
            return;
        }

        throw new MatchError(r5);
    }

    /**
     * Termination callback: once {@code future} completes, logs the outcome via
     * {@link #$anonfun$singleCall$3}. The callback runs on the global execution context.
     *
     * @param str    URI of the connection being watched
     * @param obj    materialized value of the upstream; not used
     * @param future future that completes when the stream terminates
     */
    public static final /* synthetic */ void $anonfun$singleCall$2(String str, Object obj, Future future) {
        future.onComplete(outcome -> {
            $anonfun$singleCall$3(str, outcome);
            return BoxedUnit.UNIT;
        }, ExecutionContext$Implicits$.MODULE$.global());
    }
}
