package org.apache.samza.util;

import java.util.HashMap;
import java.util.Map;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.coordinator.CoordinationConstants;
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.Delete;
import org.apache.samza.job.JobRunner$;
import org.apache.samza.metadatastore.MetadataStore;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.GenSet;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: CoordinatorStreamUtil.scala */
/* loaded from: input_file:org/apache/samza/util/CoordinatorStreamUtil$.class */
public final class CoordinatorStreamUtil$ implements Logging {
    public static CoordinatorStreamUtil$ MODULE$;
    private final String loggerName;
    private Logger logger;
    private final String startupLoggerName;
    private Logger startupLogger;
    private volatile byte bitmap$0;

    static {
        new CoordinatorStreamUtil$();
    }

    @Override // org.apache.samza.util.Logging
    public void startupLog(Function0<Object> function0) {
        startupLog(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0) {
        trace(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        trace(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0) {
        debug(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        debug(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0) {
        info(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        info(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0) {
        warn(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        warn(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0) {
        error(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        error(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        putMDC(function0, function02);
    }

    @Override // org.apache.samza.util.Logging
    public String getMDC(Function0<String> function0) {
        String mdc;
        mdc = getMDC(function0);
        return mdc;
    }

    @Override // org.apache.samza.util.Logging
    public void removeMDC(Function0<String> function0) {
        removeMDC(function0);
    }

    @Override // org.apache.samza.util.Logging
    public void clearMDC() {
        clearMDC();
    }

    @Override // org.apache.samza.util.Logging
    public String loggerName() {
        return this.loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.util.CoordinatorStreamUtil$] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    @Override // org.apache.samza.util.Logging
    public Logger logger() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? logger$lzycompute() : this.logger;
    }

    @Override // org.apache.samza.util.Logging
    public String startupLoggerName() {
        return this.startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.util.CoordinatorStreamUtil$] */
    private Logger startupLogger$lzycompute() {
        Logger startupLogger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                startupLogger = startupLogger();
                this.startupLogger = startupLogger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    @Override // org.apache.samza.util.Logging
    public Logger startupLogger() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? startupLogger$lzycompute() : this.startupLogger;
    }

    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    public MapConfig buildCoordinatorStreamConfig(Config config) {
        return new MapConfig(((CoordinatorStreamConfigFactory) Class.forName(new JobConfig(config).getCoordinatorStreamFactory()).newInstance()).buildCoordinatorStreamConfig(config));
    }

    public void createCoordinatorStream(Config config) {
        info(() -> {
            return "Creating coordinator stream";
        });
        SystemStream coordinatorSystemStream = getCoordinatorSystemStream(config);
        SystemAdmin admin = new SystemConfig(config).getSystemFactories().get(coordinatorSystemStream.getSystem()).getAdmin(coordinatorSystemStream.getSystem(), config, DiagnosticsUtil.class.getSimpleName());
        admin.start();
        createCoordinatorStream(coordinatorSystemStream, admin);
        admin.stop();
    }

    public void createCoordinatorStream(SystemStream systemStream, SystemAdmin systemAdmin) {
        String stream = systemStream.getStream();
        if (systemAdmin.createStream(StreamSpec.createCoordinatorStreamSpec(stream, systemStream.getSystem()))) {
            info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Created coordinator stream: %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{stream}));
            });
        } else {
            info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Coordinator stream: %s already exists.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{stream}));
            });
        }
    }

    public SystemStream getCoordinatorSystemStream(Config config) {
        JobConfig jobConfig = new JobConfig(config);
        String coordinatorSystemName = jobConfig.getCoordinatorSystemName();
        Tuple2<String, String> jobNameAndId = getJobNameAndId(jobConfig);
        if (jobNameAndId == null) {
            throw new MatchError(jobNameAndId);
        }
        Tuple2 tuple2 = new Tuple2((String) jobNameAndId._1(), (String) jobNameAndId._2());
        return new SystemStream(coordinatorSystemName, getCoordinatorStreamName((String) tuple2._1(), (String) tuple2._2()));
    }

    public SystemFactory getCoordinatorSystemFactory(Config config) {
        String coordinatorSystemName = new JobConfig(config).getCoordinatorSystemName();
        return (SystemFactory) ReflectionUtil.getObj((String) ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(new SystemConfig(config).getSystemFactory(coordinatorSystemName)).toOption().getOrElse(() -> {
            throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(23).append("Missing configuration: ").append(SystemConfig.SYSTEM_FACTORY_FORMAT).toString())).format(Predef$.MODULE$.genericWrapArray(new Object[]{coordinatorSystemName})));
        }), SystemFactory.class);
    }

    public String getCoordinatorStreamName(String str, String str2) {
        return new StringOps(Predef$.MODULE$.augmentString("__samza_coordinator_%s_%s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str.replaceAll("_", "-"), str2.replaceAll("_", "-")}));
    }

    private Tuple2<String, String> getJobNameAndId(JobConfig jobConfig) {
        return new Tuple2<>(ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(jobConfig.getName()).toOption().getOrElse(() -> {
            throw new ConfigException("Missing required config: job.name");
        }), jobConfig.getJobId());
    }

    public Config readLaunchConfigFromCoordinatorStream(Config config, MetadataStore metadataStore) {
        if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
            return new MapConfig();
        }
        return new MapConfig((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(readConfigFromCoordinatorStream(metadataStore)).asScala()).filterKeys(str -> {
            return BoxesRunTime.boxToBoolean(JobConfig.isAutosizingConfig(str));
        })).asJava());
    }

    public Config readConfigFromCoordinatorStream(MetadataStore metadataStore) {
        Map<String, byte[]> all = new NamespaceAwareCoordinatorStreamStore(metadataStore, "set-config").all();
        HashMap hashMap = new HashMap();
        ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(all).asScala()).withFilter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$readConfigFromCoordinatorStream$1(tuple2));
        }).foreach(tuple22 -> {
            Object put;
            if (tuple22 != null) {
                String str = (String) tuple22._1();
                byte[] bArr = (byte[]) tuple22._2();
                if (str != null && bArr != null) {
                    if (bArr == null) {
                        MODULE$.warn(() -> {
                            return new StringOps(Predef$.MODULE$.augmentString("Value for key: %s in config is null. Ignoring it.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str}));
                        });
                        put = BoxedUnit.UNIT;
                    } else {
                        String m96fromBytes = new CoordinatorStreamValueSerde("set-config").m96fromBytes(bArr);
                        if (m96fromBytes == null) {
                            MODULE$.warn(() -> {
                                return new StringOps(Predef$.MODULE$.augmentString("Value for key: %s in config is decoded to be null. Ignoring it.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str}));
                            });
                            put = BoxedUnit.UNIT;
                        } else {
                            put = hashMap.put(str, m96fromBytes);
                        }
                    }
                    return put;
                }
            }
            throw new MatchError(tuple22);
        });
        return new MapConfig(hashMap);
    }

    public void writeConfigToCoordinatorStream(MetadataStore metadataStore, Config config) {
        NamespaceAwareCoordinatorStreamStore namespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, "set-config");
        CoordinatorStreamValueSerde coordinatorStreamValueSerde = new CoordinatorStreamValueSerde("set-config");
        config.entrySet().forEach(entry -> {
            String str = (String) entry.getKey();
            String str2 = (String) entry.getValue();
            if (str2 == null) {
                MODULE$.warn(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Value for key: %s in config is null. Ignoring it.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str}));
                });
                return;
            }
            byte[] bytes = coordinatorStreamValueSerde.toBytes(str2);
            if (bytes == null) {
                MODULE$.warn(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Deserialized value for key: %s in config is null. Ignoring it.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str}));
                });
            } else {
                namespaceAwareCoordinatorStreamStore.put(str, bytes);
            }
        });
    }

    public void writeConfigToCoordinatorStream(Config config, boolean z) {
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("config: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{config}));
        });
        CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
        CoordinatorStreamSystemProducer coordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
        createCoordinatorStream(config);
        if (z) {
            info(() -> {
                return "Storing config in coordinator stream.";
            });
            coordinatorStreamSystemProducer.register(JobRunner$.MODULE$.SOURCE());
            coordinatorStreamSystemProducer.start();
            coordinatorStreamSystemProducer.writeConfig(JobRunner$.MODULE$.SOURCE(), config);
        }
        info(() -> {
            return "Loading old config from coordinator stream.";
        });
        coordinatorStreamSystemConsumer.register();
        coordinatorStreamSystemConsumer.start();
        coordinatorStreamSystemConsumer.bootstrap();
        coordinatorStreamSystemConsumer.stop();
        Config config2 = coordinatorStreamSystemConsumer.getConfig();
        if (z) {
            ObjectRef create = ObjectRef.create(((TraversableOnce) JavaConverters$.MODULE$.asScalaSetConverter(config2.keySet()).asScala()).toSet().diff((GenSet) JavaConverters$.MODULE$.asScalaSetConverter(config.keySet()).asScala()));
            JobConfig jobConfig = new JobConfig(config);
            if (jobConfig.getAutosizingEnabled()) {
                create.elem = (Set) ((Set) create.elem).filter(str -> {
                    return BoxesRunTime.boxToBoolean($anonfun$writeConfigToCoordinatorStream$7(str));
                });
            }
            if (jobConfig.getApplicationMasterHighAvailabilityEnabled()) {
                create.elem = (Set) ((Set) create.elem).filter(str2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$writeConfigToCoordinatorStream$8(str2));
                });
            }
            info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Deleting old configs that are no longer defined: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{(Set) create.elem}));
            });
            ((Set) create.elem).foreach(str3 -> {
                $anonfun$writeConfigToCoordinatorStream$10(coordinatorStreamSystemProducer, str3);
                return BoxedUnit.UNIT;
            });
        }
        coordinatorStreamSystemProducer.stop();
    }

    public boolean writeConfigToCoordinatorStream$default$2() {
        return true;
    }

    public static final /* synthetic */ boolean $anonfun$readConfigFromCoordinatorStream$1(Tuple2 tuple2) {
        boolean z;
        if (tuple2 != null) {
            String str = (String) tuple2._1();
            byte[] bArr = (byte[]) tuple2._2();
            if (str != null && bArr != null) {
                z = true;
                return z;
            }
        }
        z = false;
        return z;
    }

    public static final /* synthetic */ boolean $anonfun$writeConfigToCoordinatorStream$7(String str) {
        return !JobConfig.isAutosizingConfig(str);
    }

    public static final /* synthetic */ boolean $anonfun$writeConfigToCoordinatorStream$8(String str) {
        return !str.equals(CoordinationConstants.YARN_COORDINATOR_URL);
    }

    public static final /* synthetic */ void $anonfun$writeConfigToCoordinatorStream$10(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer, String str) {
        coordinatorStreamSystemProducer.send(new Delete(JobRunner$.MODULE$.SOURCE(), str, "set-config"));
    }

    private CoordinatorStreamUtil$() {
        MODULE$ = this;
        Logging.$init$(this);
    }
}
