package org.apache.samza.container;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URL;
import java.util.Base64;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.samza.SamzaException;
import org.apache.samza.application.ApplicationUtil;
import org.apache.samza.checkpoint.CheckpointListener;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.checkpoint.OffsetManager$;
import org.apache.samza.checkpoint.OffsetManagerMetrics;
import org.apache.samza.clustermanager.StandbyTaskUtil;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.ShellCommandConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.disk.DiskQuotaPolicy;
import org.apache.samza.container.disk.DiskQuotaPolicyFactory;
import org.apache.samza.container.disk.DiskSpaceMonitor;
import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory;
import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor;
import org.apache.samza.container.host.StatisticsMonitorImpl;
import org.apache.samza.container.host.SystemStatisticsMonitor;
import org.apache.samza.context.ApplicationContainerContext;
import org.apache.samza.context.ApplicationContainerContextFactory;
import org.apache.samza.context.ApplicationTaskContext;
import org.apache.samza.context.ApplicationTaskContextFactory;
import org.apache.samza.context.ContainerContextImpl;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.diagnostics.DiagnosticsManager;
import org.apache.samza.drain.DrainMonitor;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.JvmMetrics;
import org.apache.samza.metrics.MetricsHelper;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.metrics.MetricsReporter;
import org.apache.samza.serializers.IntermediateMessageSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
import org.apache.samza.serializers.SerdeManager;
import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.startpoint.StartpointManager;
import org.apache.samza.storage.ContainerStorageManager;
import org.apache.samza.storage.StateBackendFactory;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.storage.TaskStorageCommitManager;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.StreamMetadataCache$;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.system.SystemConsumers$;
import org.apache.samza.system.SystemConsumersMetrics;
import org.apache.samza.system.SystemConsumersMetrics$;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducers;
import org.apache.samza.system.SystemProducersMetrics;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.chooser.DefaultChooser;
import org.apache.samza.system.chooser.DefaultChooser$;
import org.apache.samza.system.chooser.MessageChooserFactory;
import org.apache.samza.table.TableManager;
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskExecutorFactory;
import org.apache.samza.task.TaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.ExponentialSleepStrategy$;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.HttpUtil$;
import org.apache.samza.util.Logging;
import org.apache.samza.util.MetricsReporterLoader;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.ScalaJavaUtil$JavaOptionals$;
import org.apache.samza.util.StreamUtil;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Throttleable;
import org.slf4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.MapLike;
import scala.collection.SetLike;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.runtime.BoxesRunTime;
import scala.util.Random$;

/* compiled from: SamzaContainer.scala */
/* loaded from: input_file:org/apache/samza/container/SamzaContainer$.class */
public final class SamzaContainer$ implements Logging {
    /** Module instance; presumably assigned by the constructor that the static initializer runs (constructor not shown here). */
    public static SamzaContainer$ MODULE$;
    private final int DEFAULT_READ_JOBMODEL_DELAY_MS;
    private final String DISK_POLL_INTERVAL_KEY;
    // Not final: written by org$apache$samza$util$Logging$_setter_$loggerName_$eq, and Java
    // does not allow a final field to be assigned outside a constructor or initializer.
    private String loggerName;
    /** Lazily created logger; valid once bit 0 of bitmap$0 is set. */
    private Logger logger;
    // Not final for the same reason as loggerName (written by the startupLoggerName setter).
    private String startupLoggerName;
    /** Lazily created startup logger; valid once bit 1 of bitmap$0 is set. */
    private Logger startupLogger;
    /** Initialization flags for the lazy fields: bit 0 = logger, bit 1 = startupLogger. */
    private volatile byte bitmap$0;

    // Creates the module object when this class is initialized. The constructor is not part of
    // this section; presumably it publishes the new instance through MODULE$ (TODO confirm).
    static {
        new SamzaContainer$();
    }

    /**
     * Forwards to the {@code startupLog} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code startupLog(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 supplier of the message object
     */
    @Override // org.apache.samza.util.Logging
    public void startupLog(Function0<Object> function0) {
        Logging.super.startupLog(function0);
    }

    /**
     * Forwards to the one-argument {@code trace} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code trace(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 supplier of the message object
     */
    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0) {
        Logging.super.trace(function0);
    }

    /**
     * Forwards to the two-argument {@code trace} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code trace(function0, function02)} call here resolves to this very
     * method and recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier
     * selects the interface implementation instead (assumes a default method on {@code Logging}).
     *
     * @param function0 supplier of the message object
     * @param function02 supplier of the associated throwable
     */
    @Override // org.apache.samza.util.Logging
    public void trace(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.trace(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code debug} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code debug(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 supplier of the message object
     */
    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0) {
        Logging.super.debug(function0);
    }

    /**
     * Forwards to the two-argument {@code debug} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code debug(function0, function02)} call here resolves to this very
     * method and recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier
     * selects the interface implementation instead (assumes a default method on {@code Logging}).
     *
     * @param function0 supplier of the message object
     * @param function02 supplier of the associated throwable
     */
    @Override // org.apache.samza.util.Logging
    public void debug(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.debug(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code info} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code info(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 supplier of the message object
     */
    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0) {
        Logging.super.info(function0);
    }

    /**
     * Forwards to the two-argument {@code info} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code info(function0, function02)} call here resolves to this very
     * method and recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier
     * selects the interface implementation instead (assumes a default method on {@code Logging}).
     *
     * @param function0 supplier of the message object
     * @param function02 supplier of the associated throwable
     */
    @Override // org.apache.samza.util.Logging
    public void info(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.info(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code warn} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code warn(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 supplier of the message object
     */
    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0) {
        Logging.super.warn(function0);
    }

    /**
     * Forwards to the two-argument {@code warn} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code warn(function0, function02)} call here resolves to this very
     * method and recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier
     * selects the interface implementation instead (assumes a default method on {@code Logging}).
     *
     * @param function0 supplier of the message object
     * @param function02 supplier of the associated throwable
     */
    @Override // org.apache.samza.util.Logging
    public void warn(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.warn(function0, function02);
    }

    /**
     * Forwards to the one-argument {@code error} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code error(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 supplier of the message object
     */
    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0) {
        Logging.super.error(function0);
    }

    /**
     * Forwards to the two-argument {@code error} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code error(function0, function02)} call here resolves to this very
     * method and recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier
     * selects the interface implementation instead (assumes a default method on {@code Logging}).
     *
     * @param function0 supplier of the message object
     * @param function02 supplier of the associated throwable
     */
    @Override // org.apache.samza.util.Logging
    public void error(Function0<Object> function0, Function0<Throwable> function02) {
        Logging.super.error(function0, function02);
    }

    /**
     * Forwards to the {@code putMDC} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code putMDC(function0, function02)} call here resolves to this very
     * method and recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier
     * selects the interface implementation instead (assumes a default method on {@code Logging}).
     *
     * @param function0 first string supplier (presumably the MDC key - confirm against Logging)
     * @param function02 second string supplier (presumably the MDC value - confirm against Logging)
     */
    @Override // org.apache.samza.util.Logging
    public void putMDC(Function0<String> function0, Function0<String> function02) {
        Logging.super.putMDC(function0, function02);
    }

    /**
     * Forwards to the {@code getMDC} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code getMDC(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 string supplier (presumably the MDC key - confirm against Logging)
     * @return whatever the inherited implementation returns for that supplier
     */
    @Override // org.apache.samza.util.Logging
    public String getMDC(Function0<String> function0) {
        return Logging.super.getMDC(function0);
    }

    /**
     * Forwards to the {@code removeMDC} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code removeMDC(function0)} call here resolves to this very method and
     * recurses until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the
     * interface implementation instead (assumes {@code Logging} provides it as a default method).
     *
     * @param function0 string supplier (presumably the MDC key - confirm against Logging)
     */
    @Override // org.apache.samza.util.Logging
    public void removeMDC(Function0<String> function0) {
        Logging.super.removeMDC(function0);
    }

    /**
     * Forwards to the {@code clearMDC} implementation inherited from {@code Logging}.
     *
     * <p>An unqualified {@code clearMDC()} call here resolves to this very method and recurses
     * until {@code StackOverflowError}; the {@code Logging.super} qualifier selects the interface
     * implementation instead (assumes {@code Logging} provides it as a default method).
     */
    @Override // org.apache.samza.util.Logging
    public void clearMDC() {
        Logging.super.clearMDC();
    }

    /**
     * Accessor for the {@code loggerName} field.
     *
     * @return the current value of {@code loggerName}
     */
    @Override // org.apache.samza.util.Logging
    public String loggerName() {
        return loggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.container.SamzaContainer$] */
    private Logger logger$lzycompute() {
        Logger logger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                logger = logger();
                this.logger = logger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.logger;
    }

    /**
     * Returns the lazily created logger.
     *
     * <p>Bit 0 of {@code bitmap$0} says whether the field has been initialized; when it is clear
     * the call is routed through {@code logger$lzycompute()}.
     *
     * @return the cached logger, or the result of {@code logger$lzycompute()} on first use
     */
    @Override // org.apache.samza.util.Logging
    public Logger logger() {
        boolean alreadyInitialized = ((byte) (this.bitmap$0 & 1)) != 0;
        if (alreadyInitialized) {
            return this.logger;
        }
        return logger$lzycompute();
    }

    /**
     * Accessor for the {@code startupLoggerName} field.
     *
     * @return the current value of {@code startupLoggerName}
     */
    @Override // org.apache.samza.util.Logging
    public String startupLoggerName() {
        return startupLoggerName;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [org.apache.samza.container.SamzaContainer$] */
    private Logger startupLogger$lzycompute() {
        Logger startupLogger;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                startupLogger = startupLogger();
                this.startupLogger = startupLogger;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.startupLogger;
    }

    /**
     * Returns the lazily created startup logger.
     *
     * <p>Bit 1 of {@code bitmap$0} says whether the field has been initialized; when it is clear
     * the call is routed through {@code startupLogger$lzycompute()}.
     *
     * @return the cached startup logger, or the result of {@code startupLogger$lzycompute()} on
     *     first use
     */
    @Override // org.apache.samza.util.Logging
    public Logger startupLogger() {
        boolean alreadyInitialized = ((byte) (this.bitmap$0 & 2)) != 0;
        if (alreadyInitialized) {
            return this.startupLogger;
        }
        return startupLogger$lzycompute();
    }

    /**
     * Stores {@code str} in the {@code loggerName} field.
     *
     * <p>The {@code _setter_} name is compiler-generated; presumably it is called from the
     * {@code Logging} trait's initializer rather than by application code (TODO confirm).
     *
     * @param str value to store
     */
    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$loggerName_$eq(String str) {
        this.loggerName = str;
    }

    /**
     * Stores {@code str} in the {@code startupLoggerName} field.
     *
     * <p>The {@code _setter_} name is compiler-generated; presumably it is called from the
     * {@code Logging} trait's initializer rather than by application code (TODO confirm).
     *
     * @param str value to store
     */
    @Override // org.apache.samza.util.Logging
    public void org$apache$samza$util$Logging$_setter_$startupLoggerName_$eq(String str) {
        this.startupLoggerName = str;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$9} in Scala name
     * encoding); presumably the default for the 9th {@code SamzaContainer} constructor parameter.
     *
     * @return always {@code null}
     */
    public DiskSpaceMonitor $lessinit$greater$default$9() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$10} in Scala name
     * encoding); presumably the default for the 10th {@code SamzaContainer} constructor parameter.
     *
     * @return always {@code null}
     */
    public SystemStatisticsMonitor $lessinit$greater$default$10() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$11} in Scala name
     * encoding); presumably the default for the 11th {@code SamzaContainer} constructor parameter.
     *
     * @return a new {@code OffsetManager} built from {@code OffsetManager}'s own six constructor
     *     defaults
     */
    public OffsetManager $lessinit$greater$default$11() {
        OffsetManager$ defaults = OffsetManager$.MODULE$;
        return new OffsetManager(
                defaults.$lessinit$greater$default$1(),
                defaults.$lessinit$greater$default$2(),
                defaults.$lessinit$greater$default$3(),
                defaults.$lessinit$greater$default$4(),
                defaults.$lessinit$greater$default$5(),
                defaults.$lessinit$greater$default$6());
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$12} in Scala name
     * encoding); presumably the default for the 12th {@code SamzaContainer} constructor parameter.
     *
     * @return always {@code null}
     */
    public LocalityManager $lessinit$greater$default$12() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$13} in Scala name
     * encoding); presumably the default for the 13th {@code SamzaContainer} constructor parameter.
     *
     * <p>NOTE(review): no {@code SecurityManager} import is visible in this file, so the return
     * type resolves either to a class in this package or to {@code java.lang.SecurityManager} -
     * confirm which one is intended.
     *
     * @return always {@code null}
     */
    public SecurityManager $lessinit$greater$default$13() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$14} in Scala name
     * encoding); presumably the default for the 14th {@code SamzaContainer} constructor parameter.
     *
     * @return a Scala immutable map built from an empty element list ({@code Nil})
     */
    public Map<String, MetricsReporter> $lessinit$greater$default$14() {
        return Predef$.MODULE$.Map().apply(Nil$.MODULE$);
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$15} in Scala name
     * encoding); presumably the default for the 15th {@code SamzaContainer} constructor parameter.
     *
     * @return always {@code null}
     */
    public JvmMetrics $lessinit$greater$default$15() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$16} in Scala name
     * encoding); presumably the default for the 16th {@code SamzaContainer} constructor parameter.
     *
     * @return always {@code null}
     */
    public ExecutorService $lessinit$greater$default$16() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$17} in Scala name
     * encoding); presumably the default for the 17th {@code SamzaContainer} constructor parameter.
     *
     * @return always {@code null}
     */
    public ExecutorService $lessinit$greater$default$17() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$18} in Scala name
     * encoding); presumably the default for the 18th {@code SamzaContainer} constructor parameter.
     *
     * <p>Every call creates a fresh executor; nothing in this method shuts it down, so that is
     * left to whoever receives the value.
     *
     * @return a new single-threaded scheduled executor
     */
    public ScheduledExecutorService $lessinit$greater$default$18() {
        return Executors.newSingleThreadScheduledExecutor();
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$23} in Scala name
     * encoding); presumably the default for the 23rd {@code SamzaContainer} constructor parameter.
     *
     * @return always {@code null}
     */
    public DrainMonitor $lessinit$greater$default$23() {
        return null;
    }

    /**
     * Compiler-generated default-argument getter ({@code <init>$default$24} in Scala name
     * encoding); presumably the default for the 24th {@code SamzaContainer} constructor parameter.
     *
     * @return the empty Scala {@code Option}
     */
    public Option<DiagnosticsManager> $lessinit$greater$default$24() {
        Option<DiagnosticsManager> noDiagnostics = Option$.MODULE$.empty();
        return noDiagnostics;
    }

    /**
     * Accessor for the {@code DEFAULT_READ_JOBMODEL_DELAY_MS} field.
     *
     * @return the field's value; {@code readJobModel$default$2()} uses it as the exclusive upper
     *     bound of a random draw
     */
    public int DEFAULT_READ_JOBMODEL_DELAY_MS() {
        return DEFAULT_READ_JOBMODEL_DELAY_MS;
    }

    /**
     * Accessor for the {@code DISK_POLL_INTERVAL_KEY} field.
     *
     * @return the field's value (presumably a configuration key name - confirm where it is set)
     */
    public String DISK_POLL_INTERVAL_KEY() {
        return DISK_POLL_INTERVAL_KEY;
    }

    /**
     * Reads the content served at {@code str} and deserializes it into a {@link JobModel}.
     *
     * <p>The URL is logged at info level, its content is fetched through {@code HttpUtil.read}
     * with that method's default second argument and a new {@code ExponentialSleepStrategy}, and
     * the result is parsed with the {@code SamzaObjectMapper} object mapper.
     *
     * @param str URL to read the job model from
     * @param i second constructor argument of the {@code ExponentialSleepStrategy} (presumably
     *     the initial retry delay in milliseconds - confirm against that class)
     * @return the deserialized job model
     */
    public JobModel readJobModel(String str, int i) {
        info(() -> new StringOps(Predef$.MODULE$.augmentString("Fetching configuration from: %s"))
                .format(Predef$.MODULE$.genericWrapArray(new Object[]{str})));
        return (JobModel) SamzaObjectMapper.getObjectMapper().readValue(
                HttpUtil$.MODULE$.read(
                        new URL(str),
                        HttpUtil$.MODULE$.read$default$2(),
                        new ExponentialSleepStrategy(
                                ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$1(),
                                i,
                                ExponentialSleepStrategy$.MODULE$.$lessinit$greater$default$3())),
                JobModel.class);
    }

    /**
     * Compiler-generated default for the second parameter of {@code readJobModel}.
     *
     * @return a random integer between 1 and {@code DEFAULT_READ_JOBMODEL_DELAY_MS()}, both
     *     inclusive
     */
    public int readJobModel$default$2() {
        int exclusiveUpperBound = DEFAULT_READ_JOBMODEL_DELAY_MS();
        int zeroBasedDraw = Random$.MODULE$.nextInt(exclusiveUpperBound);
        return zeroBasedDraw + 1;
    }

    /**
     * Chooses the base directory for non-logged stores.
     *
     * <p>Works on the {@code java.util.Optional} returned by the config directly. The previous
     * body converted it to a Scala {@code Option} held in a local typed {@code Some}, which does
     * not type-check and would fail on the empty case if forced with a cast; it also carried a
     * {@code MatchError} branch that cannot be reached because an {@code Option} is always either
     * {@code Some} or {@code None}.
     *
     * @param jobConfig job configuration consulted for a non-logged store path
     * @param file directory returned when the configuration supplies no path
     * @return a {@code File} for the configured path when one is present, otherwise {@code file}
     */
    public File getNonLoggedStorageBaseDir(JobConfig jobConfig, File file) {
        Optional<String> configuredPath = jobConfig.getNonLoggedStorePath();
        return configuredPath.map(File::new).orElse(file);
    }

    /**
     * Chooses the base directory for logged stores.
     *
     * <p>Resolution order:
     * <ol>
     *   <li>If the environment variable named by
     *       {@code ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR} is set, the result is
     *       {@code <env value><separator><job name>-<job id>}.</li>
     *   <li>Otherwise, if the configuration supplies a logged store path, that path is used.</li>
     *   <li>Otherwise a warning is logged and {@code file} is returned.</li>
     * </ol>
     *
     * <p>Works on {@code java.util.Optional} directly. The previous body held a Scala
     * {@code Option} in a local typed {@code Some} (which does not type-check), carried an
     * unreachable {@code MatchError} branch, read the environment variable twice and built the
     * config-based {@code File} even when the environment override made it unused.
     *
     * @param jobConfig job configuration consulted for the store path, job name and job id
     * @param file directory returned when neither the environment nor the configuration
     *     supplies a location
     * @return the resolved base directory
     * @throws ConfigException if the environment override is set but the job name is missing
     */
    public File getLoggedStorageBaseDir(JobConfig jobConfig, File file) {
        String envBaseDir = System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR);
        if (envBaseDir != null) {
            // The job name is only mandatory on this path; it is resolved before the job id.
            String jobName = jobConfig.getName()
                    .orElseThrow(() -> new ConfigException("Missing required config: job.name"));
            String jobId = jobConfig.getJobId();
            return new File(envBaseDir + File.separator + jobName + "-" + jobId);
        }

        Optional<String> configuredPath = jobConfig.getLoggedStorePath();
        if (configuredPath.isPresent()) {
            return new File(configuredPath.get());
        }

        warn(() -> {
            return "No override was provided for logged store base directory. This disables local state re-use on application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment variable in all machines running the Samza container or configure job.logged.store.base.dir for your application";
        });
        return file;
    }

    public SamzaContainer apply(String str, JobModel jobModel, Map<String, MetricsReporter> map, MetricsRegistryMap metricsRegistryMap, TaskFactory<?> taskFactory, JobContext jobContext, Option<ApplicationContainerContextFactory<ApplicationContainerContext>> option, Option<ApplicationTaskContextFactory<ApplicationTaskContext>> option2, Option<ExternalContext> option3, LocalityManager localityManager, StartpointManager startpointManager, Option<DiagnosticsManager> option4, DrainMonitor drainMonitor) {
        MapConfig config;
        if (StandbyTaskUtil.isStandbyContainer(str)) {
            HashMap hashMap = new HashMap((java.util.Map) jobContext.getConfig());
            hashMap.put(TaskConfig.INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, Boolean.FALSE.toString());
            config = new MapConfig(hashMap);
        } else {
            config = jobContext.getConfig();
        }
        MapConfig mapConfig = config;
        JobConfig jobConfig = new JobConfig(mapConfig);
        SystemConfig systemConfig = new SystemConfig(mapConfig);
        ContainerModel containerModel = (ContainerModel) jobModel.getContainers().get(str);
        String format = new StringOps(Predef$.MODULE$.augmentString("samza-container-%s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str}));
        String name = ManagementFactory.getRuntimeMXBean().getName();
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Setting up Samza container: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{format}));
        });
        startupLog(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Samza container PID: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{name}));
        });
        Predef$.MODULE$.println(new StringOps(Predef$.MODULE$.augmentString("Container PID: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{name})));
        startupLog(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Using configuration: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{mapConfig}));
        });
        startupLog(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Using container model: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{containerModel}));
        });
        final SamzaContainerMetrics samzaContainerMetrics = new SamzaContainerMetrics(format, metricsRegistryMap, SamzaContainerMetrics$.MODULE$.$lessinit$greater$default$3());
        SystemProducersMetrics systemProducersMetrics = new SystemProducersMetrics(metricsRegistryMap);
        SystemConsumersMetrics systemConsumersMetrics = new SystemConsumersMetrics(metricsRegistryMap, SystemConsumersMetrics$.MODULE$.$lessinit$greater$default$2());
        OffsetManagerMetrics offsetManagerMetrics = new OffsetManagerMetrics(metricsRegistryMap);
        MetricsConfig metricsConfig = new MetricsConfig(mapConfig);
        HighResolutionClock highResolutionClock = metricsConfig.getMetricsTimerEnabled() ? new HighResolutionClock() { // from class: org.apache.samza.container.SamzaContainer$$anon$1
            @Override // org.apache.samza.util.HighResolutionClock
            public long nanoTime() {
                return System.nanoTime();
            }
        } : new HighResolutionClock() { // from class: org.apache.samza.container.SamzaContainer$$anon$2
            @Override // org.apache.samza.util.HighResolutionClock
            public long nanoTime() {
                return 0L;
            }
        };
        Set set = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(containerModel.getTasks().values()).asScala()).flatMap(taskModel -> {
            return (scala.collection.mutable.Set) JavaConverters$.MODULE$.asScalaSetConverter(taskModel.getSystemStreamPartitions()).asScala();
        }, Iterable$.MODULE$.canBuildFrom())).toSet();
        StorageConfig storageConfig = new StorageConfig(mapConfig);
        Map map2 = ((TraversableOnce) ((TraversableLike) ((TraversableLike) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(storageConfig.getStoreNames()).asScala()).map(str2 -> {
            return new Tuple2(str2, JavaConverters$.MODULE$.asScalaBufferConverter(storageConfig.getSideInputs(str2)).asScala());
        }, Buffer$.MODULE$.canBuildFrom())).filter(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$apply$7(tuple2));
        })).map(tuple22 -> {
            if (tuple22 != null) {
                return new Tuple2((String) tuple22._1(), ((Buffer) tuple22._2()).map(str3 -> {
                    return StreamUtil.getSystemStreamFromNameOrId(mapConfig, str3);
                }, Buffer$.MODULE$.canBuildFrom()));
            }
            throw new MatchError(tuple22);
        }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        Set set2 = ((TraversableOnce) map2.values().flatMap(buffer -> {
            return buffer.toStream();
        }, Iterable$.MODULE$.canBuildFrom())).toSet();
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got side input store system streams: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{set2}));
        });
        Set<SystemStream> diff = ((Set) set.map(systemStreamPartition -> {
            return systemStreamPartition.getSystemStream();
        }, Set$.MODULE$.canBuildFrom())).toSet().diff(set2);
        Set set3 = ((Set) diff.map(systemStream -> {
            return systemStream.getSystem();
        }, Set$.MODULE$.canBuildFrom())).toSet();
        Buffer buffer2 = (Buffer) JavaConverters$.MODULE$.asScalaBufferConverter(systemConfig.getSystemNames()).asScala();
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system names: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{buffer2}));
        });
        StreamConfig streamConfig = new StreamConfig(mapConfig);
        Set set4 = (Set) buffer2.foldLeft(Predef$.MODULE$.Set().apply(Nil$.MODULE$), (set5, str3) -> {
            return set5.$plus$plus((GenTraversableOnce) JavaConverters$.MODULE$.asScalaSetConverter(streamConfig.getSerdeStreams(str3)).asScala());
        });
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got serde streams: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{set4}));
        });
        Map map3 = ((TraversableOnce) buffer2.map(str4 -> {
            return new Tuple2(str4, ReflectionUtil.getObj((String) ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(systemConfig.getSystemFactory(str4)).toOption().getOrElse(() -> {
                throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("A stream uses system %s, which is missing from the configuration.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str4})));
            }), SystemFactory.class));
        }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system factories: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map3.keys()}));
        });
        SystemAdmins systemAdmins = new SystemAdmins(mapConfig, getClass().getSimpleName());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system admins: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{systemAdmins.getSystemNames()}));
        });
        StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, StreamMetadataCache$.MODULE$.$lessinit$greater$default$2(), StreamMetadataCache$.MODULE$.$lessinit$greater$default$3());
        scala.collection.Map<SystemStream, SystemStreamMetadata> streamMetadata = streamMetadataCache.getStreamMetadata(diff, streamMetadataCache.getStreamMetadata$default$2());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got input stream metadata: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{streamMetadata}));
        });
        Map map4 = ((TraversableOnce) ((TraversableLike) set3.map(str5 -> {
            try {
                return new Tuple2(str5, ((SystemFactory) map3.apply(str5)).getConsumer(str5, mapConfig, samzaContainerMetrics.mo47registry(), MODULE$.getClass().getSimpleName()));
            } catch (Exception e) {
                MODULE$.error(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Failed to create a consumer for %s, so skipping.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str5}));
                }, () -> {
                    return e;
                });
                return new Tuple2(str5, (Object) null);
            }
        }, Set$.MODULE$.canBuildFrom())).filter(tuple23 -> {
            return BoxesRunTime.boxToBoolean($anonfun$apply$25(tuple23));
        })).toMap(Predef$.MODULE$.$conforms());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system consumers: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map4.keys()}));
        });
        Map map5 = (Map) ((TraversableLike) map3.map(tuple24 -> {
            Tuple2 tuple24;
            if (tuple24 == null) {
                throw new MatchError(tuple24);
            }
            String str6 = (String) tuple24._1();
            try {
                tuple24 = new Tuple2(str6, ((SystemFactory) tuple24._2()).getProducer(str6, mapConfig, samzaContainerMetrics.mo47registry(), MODULE$.getClass().getSimpleName()));
            } catch (Exception e) {
                MODULE$.error(() -> {
                    return new StringOps(Predef$.MODULE$.augmentString("Failed to create a producer for %s, so skipping.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str6}));
                }, () -> {
                    return e;
                });
                tuple24 = new Tuple2(str6, (Object) null);
            }
            return tuple24;
        }, Map$.MODULE$.canBuildFrom())).filter(tuple25 -> {
            return BoxesRunTime.boxToBoolean($anonfun$apply$30(tuple25));
        });
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system producers: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map5.keys()}));
        });
        SerializerConfig serializerConfig = new SerializerConfig(mapConfig);
        Map map6 = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(serializerConfig.getSerdeNames()).asScala()).map(str6 -> {
            return new Tuple2(str6, ((SerdeFactory) ReflectionUtil.getObj((String) ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(serializerConfig.getSerdeFactoryClass(str6)).toOption().getOrElse(() -> {
                return SerializerConfig.getPredefinedSerdeFactoryName(str6);
            }), SerdeFactory.class)).getSerde(str6, mapConfig));
        }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got serdes from factories: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map6.keys()}));
        });
        SerializableSerde serializableSerde = new SerializableSerde();
        scala.collection.mutable.Map map7 = (scala.collection.mutable.Map) ((TraversableLike) ((TraversableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(mapConfig.subset(new StringOps(Predef$.MODULE$.augmentString(SerializerConfig.SERIALIZER_PREFIX)).format(Predef$.MODULE$.genericWrapArray(new Object[]{DirIndex.ROOT_DIR_NAME})))).asScala()).filter(tuple26 -> {
            return BoxesRunTime.boxToBoolean($anonfun$apply$35(tuple26));
        })).flatMap(tuple27 -> {
            Iterable option2Iterable;
            if (tuple27 == null) {
                throw new MatchError(tuple27);
            }
            String str7 = (String) tuple27._1();
            String str8 = (String) tuple27._2();
            String replace = str7.replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX, DirIndex.ROOT_DIR_NAME);
            MODULE$.debug(() -> {
                return new StringBuilder(41).append("Trying to deserialize serde instance for ").append(replace).toString();
            });
            try {
                Serde fromBytes = serializableSerde.fromBytes(Base64.getDecoder().decode(str8));
                MODULE$.debug(() -> {
                    return new StringBuilder(34).append("Returning serialized instance for ").append(replace).toString();
                });
                option2Iterable = Option$.MODULE$.option2Iterable(new Some(new Tuple2(replace, fromBytes)));
            } catch (Exception e) {
                MODULE$.warn(() -> {
                    return new StringBuilder(43).append("Ignoring invalid serialized instance for ").append(replace).append(": ").append(str8).toString();
                }, () -> {
                    return e;
                });
                option2Iterable = Option$.MODULE$.option2Iterable(None$.MODULE$);
            }
            return option2Iterable;
        }, scala.collection.mutable.Map$.MODULE$.canBuildFrom());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got serdes from serialized instances: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map7.keys()}));
        });
        Map $plus$plus = map6.$plus$plus(map7);
        Function1 function1 = function12 -> {
            return ((TraversableOnce) ((TraversableLike) buffer2.filter(str7 -> {
                return BoxesRunTime.boxToBoolean($anonfun$apply$43(function12, str7));
            })).flatMap(str8 -> {
                String str8 = (String) ((Option) function12.apply(str8)).get();
                return Option$.MODULE$.option2Iterable(Option$.MODULE$.apply((Serde) $plus$plus.getOrElse(str8, () -> {
                    throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("buildSystemSerdeMap: No class defined for serde: %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str8})));
                })).filter(serde -> {
                    return BoxesRunTime.boxToBoolean($anonfun$apply$46(serde));
                }).map(serde2 -> {
                    return new Tuple2(str8, serde2);
                }));
            }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        };
        Function1 function13 = function14 -> {
            return ((TraversableOnce) ((TraversableLike) set4.$plus$plus(set).filter(systemStream2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$apply$49(function14, systemStream2));
            })).flatMap(systemStream3 -> {
                String str7 = (String) ((Optional) function14.apply(systemStream3)).get();
                return Option$.MODULE$.option2Iterable(Option$.MODULE$.apply((Serde) $plus$plus.getOrElse(str7, () -> {
                    throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("buildSystemStreamSerdeMap: No serde found for name: %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str7})));
                })).filter(serde -> {
                    return BoxesRunTime.boxToBoolean($anonfun$apply$52(serde));
                }).map(serde2 -> {
                    return new Tuple2(systemStream3, serde2);
                }));
            }, Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        };
        Map map8 = (Map) function1.apply(str7 -> {
            return ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(systemConfig.getSystemKeySerde(str7)).toOption();
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system key serdes: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map8}));
        });
        Map map9 = (Map) function1.apply(str8 -> {
            return ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(systemConfig.getSystemMsgSerde(str8)).toOption();
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system message serdes: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map9}));
        });
        Map map10 = (Map) function13.apply(systemStream2 -> {
            return streamConfig.getStreamKeySerde(systemStream2);
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system stream key serdes: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map10}));
        });
        Map map11 = (Map) function13.apply(systemStream3 -> {
            return streamConfig.getStreamMsgSerde(systemStream3);
        });
        debug(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got system stream message serdes: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map11}));
        });
        java.util.Map<String, SystemStream> storeChangelogs = storageConfig.getStoreChangelogs();
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got change log system streams: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{storeChangelogs}));
        });
        List list = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaSetConverter(streamConfig.getStreamIds()).asScala()).filter(str9 -> {
            return BoxesRunTime.boxToBoolean(streamConfig.getIsIntermediateStream(str9));
        })).toList();
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got intermediate streams: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{list}));
        });
        SerdeManager serdeManager = new SerdeManager($plus$plus, map8, map9, map10, map11, ((MapLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(storeChangelogs).asScala()).values().toSet(), ((TraversableOnce) list.flatMap(str10 -> {
            SystemStream streamIdToSystemStream = streamConfig.streamIdToSystemStream(str10);
            return Option$.MODULE$.option2Iterable(map10.get(streamIdToSystemStream).orElse(() -> {
                return map8.get(streamIdToSystemStream.getSystem());
            }).map(serde -> {
                return new Tuple2(streamIdToSystemStream, new StringSerde("UTF-8"));
            }));
        }, List$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()), ((TraversableOnce) list.flatMap(str11 -> {
            SystemStream streamIdToSystemStream = streamConfig.streamIdToSystemStream(str11);
            return Option$.MODULE$.option2Iterable(map11.get(streamIdToSystemStream).orElse(() -> {
                return map9.get(streamIdToSystemStream.getSystem());
            }).map(serde -> {
                return new Tuple2(streamIdToSystemStream, new IntermediateMessageSerde(serde));
            }));
        }, List$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms()));
        info(() -> {
            return "Setting up JVM metrics.";
        });
        JvmMetrics jvmMetrics = new JvmMetrics(samzaContainerMetrics.mo47registry());
        info(() -> {
            return "Setting up message chooser.";
        });
        TaskConfig taskConfig = new TaskConfig(mapConfig);
        DefaultChooser apply = DefaultChooser$.MODULE$.apply(streamMetadata, (MessageChooserFactory) ReflectionUtil.getObj(taskConfig.getMessageChooserClass(), MessageChooserFactory.class), mapConfig, samzaContainerMetrics.mo47registry(), systemAdmins);
        info(() -> {
            return "Setting up metrics reporters.";
        });
        Map $plus$plus2 = ((TraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(MetricsReporterLoader.getMetricsReporters(metricsConfig, format)).asScala()).toMap(Predef$.MODULE$.$conforms()).$plus$plus(map);
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got metrics reporters: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{$plus$plus2.keys()}));
        });
        Some option5 = ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(jobConfig.getSecurityManagerFactory()).toOption();
        SecurityManager securityManager = option5 instanceof Some ? ((SecurityManagerFactory) ReflectionUtil.getObj((String) option5.value(), SecurityManagerFactory.class)).getSecurityManager(mapConfig) : null;
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got security manager: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{securityManager}));
        });
        CheckpointManager orElse = taskConfig.getCheckpointManager(samzaContainerMetrics.mo47registry()).orElse(null);
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got checkpoint manager: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{orElse}));
        });
        scala.collection.Map<String, CheckpointListener> map12 = (Map) ((TraversableLike) map4.filter(tuple28 -> {
            return BoxesRunTime.boxToBoolean($anonfun$apply$77(tuple28));
        })).map(tuple29 -> {
            if (tuple29 != null) {
                return new Tuple2((String) tuple29._1(), (SystemConsumer) tuple29._2());
            }
            throw new MatchError(tuple29);
        }, Map$.MODULE$.canBuildFrom());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got checkpointListeners : %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map12}));
        });
        OffsetManager apply2 = OffsetManager$.MODULE$.apply(streamMetadata, mapConfig, orElse, startpointManager, systemAdmins, map12, offsetManagerMetrics);
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got offset manager: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{apply2}));
        });
        boolean dropDeserializationErrors = taskConfig.getDropDeserializationErrors();
        boolean dropSerializationErrors = taskConfig.getDropSerializationErrors();
        int pollIntervalMs = taskConfig.getPollIntervalMs();
        ApplicationConfig applicationConfig = new ApplicationConfig(mapConfig);
        SystemConsumers systemConsumers = new SystemConsumers(apply, map4, systemAdmins, serdeManager, systemConsumersMetrics, SystemConsumers$.MODULE$.$lessinit$greater$default$6(), dropDeserializationErrors, pollIntervalMs, () -> {
            return highResolutionClock.nanoTime();
        }, jobConfig.getElasticityFactor(), applicationConfig.getRunId());
        SystemProducers systemProducers = new SystemProducers(map5, serdeManager, systemProducersMetrics, dropSerializationErrors);
        Map map13 = ((TraversableOnce) ((TraversableLike) JavaConverters$.MODULE$.asScalaBufferConverter(storageConfig.getStoreNames()).asScala()).map(str12 -> {
            return new Tuple2(str12, ReflectionUtil.getObj((String) ScalaJavaUtil$JavaOptionals$.MODULE$.toRichOptional(storageConfig.getStorageFactoryClassName(str12)).toOption().getOrElse(() -> {
                throw new SamzaException(new StringOps(Predef$.MODULE$.augmentString("Missing storage factory for %s.")).format(Predef$.MODULE$.genericWrapArray(new Object[]{str12})));
            }), StorageEngineFactory.class));
        }, Buffer$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got storage engines: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{map13.keys()}));
        });
        int threadPoolSize = jobConfig.getThreadPoolSize();
        info(() -> {
            return new StringBuilder(22).append("Got thread pool size: ").append(threadPoolSize).toString();
        });
        samzaContainerMetrics.containerThreadPoolSize().set(BoxesRunTime.boxToLong(threadPoolSize));
        ExecutorService taskExecutor = threadPoolSize > 0 ? ((TaskExecutorFactory) ReflectionUtil.getObj(jobConfig.getTaskExecutorFactory(), TaskExecutorFactory.class)).getTaskExecutor(mapConfig) : null;
        TaskFactory finalizeTaskFactory = TaskFactoryUtil.finalizeTaskFactory(taskFactory, taskExecutor);
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(Math.min(Math.max(containerModel.getTasks().size() * 2, jobConfig.getCommitThreadPoolSize()), jobConfig.getCommitThreadPoolMaxSize()), new ThreadFactoryBuilder().setNameFormat("Samza Task Commit Thread-%d").setDaemon(true).build());
        Iterable iterable = (Iterable) JavaConverters$.MODULE$.collectionAsScalaIterableConverter(containerModel.getTasks().values()).asScala();
        ContainerContextImpl containerContextImpl = new ContainerContextImpl(containerModel, samzaContainerMetrics.mo47registry());
        Option map14 = option.map(applicationContainerContextFactory -> {
            return applicationContainerContextFactory.create((ExternalContext) option3.orNull(Predef$.MODULE$.$conforms()), jobContext, containerContextImpl);
        });
        HashSet hashSet = new HashSet();
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        Map map15 = ((TraversableOnce) iterable.map(taskModel2 -> {
            return new Tuple2(taskModel2.getTaskName(), new TaskInstanceMetrics(new StringOps(Predef$.MODULE$.augmentString("TaskName-%s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{taskModel2.getTaskName()})), TaskInstanceMetrics$.MODULE$.$lessinit$greater$default$2(), TaskInstanceMetrics$.MODULE$.$lessinit$greater$default$3()));
        }, Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        Map map16 = ((TraversableOnce) iterable.map(taskModel3 -> {
            return new Tuple2(taskModel3.getTaskName(), new TaskInstanceCollector(systemProducers, (TaskInstanceMetrics) map15.get(taskModel3.getTaskName()).get()));
        }, Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        File file = new File(System.getProperty("user.dir"), "state");
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got default storage engine base directory: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{file}));
        });
        File nonLoggedStorageBaseDir = getNonLoggedStorageBaseDir(jobConfig, file);
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got base directory for non logged data stores: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{nonLoggedStorageBaseDir}));
        });
        File loggedStorageBaseDir = getLoggedStorageBaseDir(jobConfig, file);
        info(() -> {
            return new StringOps(Predef$.MODULE$.augmentString("Got base directory for logged data stores: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{loggedStorageBaseDir}));
        });
        java.util.Set<String> backupFactories = storageConfig.getBackupFactories();
        java.util.Set<String> restoreFactories = storageConfig.getRestoreFactories();
        if (!backupFactories.containsAll(restoreFactories)) {
            backupFactories.removeAll(restoreFactories);
            throw new SamzaException(new StringBuilder(102).append("Restore state backend factories is not a subset of backup state backend factories, missing factories: ").append(backupFactories.toString()).toString());
        }
        scala.collection.mutable.Set set6 = (scala.collection.mutable.Set) ((SetLike) JavaConverters$.MODULE$.asScalaSetConverter(backupFactories).asScala()).map(str13 -> {
            return (StateBackendFactory) ReflectionUtil.getObj(str13, StateBackendFactory.class);
        }, scala.collection.mutable.Set$.MODULE$.canBuildFrom());
        ContainerStorageManager containerStorageManager = new ContainerStorageManager(orElse, containerModel, streamMetadataCache, systemAdmins, storeChangelogs, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map2.mapValues(buffer3 -> {
            return (java.util.Set) JavaConverters$.MODULE$.setAsJavaSetConverter(buffer3.toSet()).asJava();
        })).asJava(), (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map13).asJava(), (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map3).asJava(), (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter($plus$plus).asJava(), mapConfig, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map15).asJava(), samzaContainerMetrics, jobContext, containerContextImpl, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(((TraversableOnce) ((SetLike) JavaConverters$.MODULE$.asScalaSetConverter(restoreFactories).asScala()).map(str14 -> {
            return new Tuple2(str14, ReflectionUtil.getObj(str14, StateBackendFactory.class));
        }, scala.collection.mutable.Set$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms())).asJava(), (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(map16).asJava(), loggedStorageBaseDir, nonLoggedStorageBaseDir, serdeManager, SystemClock.instance());
        hashSet.addAll(containerStorageManager.getStoreDirectoryPaths());
        Map map17 = ((TraversableOnce) ((TraversableLike) iterable.filter(taskModel4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$apply$95(taskModel4));
        })).map(taskModel5 -> {
            Object createInstance;
            MODULE$.debug(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Setting up task instance: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{taskModel5}));
            });
            final TaskName taskName = taskModel5.getTaskName();
            if (finalizeTaskFactory instanceof AsyncStreamTaskFactory) {
                createInstance = ((AsyncStreamTaskFactory) finalizeTaskFactory).createInstance();
            } else {
                if (!(finalizeTaskFactory instanceof StreamTaskFactory)) {
                    throw new MatchError(finalizeTaskFactory);
                }
                createInstance = ((StreamTaskFactory) finalizeTaskFactory).createInstance();
            }
            Set set7 = ((TraversableOnce) JavaConverters$.MODULE$.asScalaSetConverter(taskModel5.getSystemStreamPartitions()).asScala()).toSet();
            MODULE$.info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Got task SSPs: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{set7}));
            });
            Set set8 = ((TraversableOnce) map2.mapValues(buffer4 -> {
                return (java.util.Set) JavaConverters$.MODULE$.setAsJavaSetConverter((scala.collection.Set) set7.filter(systemStreamPartition2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$apply$100(buffer4, systemStreamPartition2));
                })).asJava();
            }).values().flatMap(set9 -> {
                return (scala.collection.mutable.Set) JavaConverters$.MODULE$.asScalaSetConverter(set9).asScala();
            }, Iterable$.MODULE$.canBuildFrom())).toSet();
            MODULE$.info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Got task side input SSPs: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{set8}));
            });
            final HashMap hashMap2 = new HashMap();
            final java.util.Map<String, SystemAdmin> systemAdmins2 = systemAdmins.getSystemAdmins();
            ((Iterable) JavaConverters$.MODULE$.mutableSetAsJavaSetConverter(set6).asJava()).forEach(new Consumer<StateBackendFactory>(map15, taskName, jobContext, containerModel, taskModel5, systemAdmins2, newFixedThreadPool, mapConfig, loggedStorageBaseDir, nonLoggedStorageBaseDir, hashMap2) { // from class: org.apache.samza.container.SamzaContainer$$anon$3
                private final Map taskInstanceMetrics$1;
                private final TaskName taskName$1;
                private final JobContext jobContext$1;
                private final ContainerModel containerModel$1;
                private final TaskModel taskModel$1;
                private final java.util.Map systemAdminsMap$1;
                private final ExecutorService commitThreadPool$1;
                private final Config config$1;
                private final File loggedStorageBaseDir$1;
                private final File nonLoggedStorageBaseDir$1;
                private final HashMap taskBackupManagerMap$1;

                @Override // java.util.function.Consumer
                public Consumer<StateBackendFactory> andThen(Consumer<? super StateBackendFactory> consumer) {
                    return super.andThen(consumer);
                }

                @Override // java.util.function.Consumer
                public void accept(StateBackendFactory stateBackendFactory) {
                    this.taskBackupManagerMap$1.put(stateBackendFactory.getClass().getName(), stateBackendFactory.getBackupManager(this.jobContext$1, this.containerModel$1, this.taskModel$1, this.systemAdminsMap$1, this.commitThreadPool$1, (this.taskInstanceMetrics$1.contains(this.taskName$1) && this.taskInstanceMetrics$1.get(this.taskName$1).isDefined()) ? ((TaskInstanceMetrics) this.taskInstanceMetrics$1.get(this.taskName$1).get()).mo47registry() : new MetricsRegistryMap(), this.config$1, SystemClock.instance(), this.loggedStorageBaseDir$1, this.nonLoggedStorageBaseDir$1));
                }

                {
                    this.taskInstanceMetrics$1 = map15;
                    this.taskName$1 = taskName;
                    this.jobContext$1 = jobContext;
                    this.containerModel$1 = containerModel;
                    this.taskModel$1 = taskModel5;
                    this.systemAdminsMap$1 = systemAdmins2;
                    this.commitThreadPool$1 = newFixedThreadPool;
                    this.config$1 = mapConfig;
                    this.loggedStorageBaseDir$1 = loggedStorageBaseDir;
                    this.nonLoggedStorageBaseDir$1 = nonLoggedStorageBaseDir;
                    this.taskBackupManagerMap$1 = hashMap2;
                }
            });
            TaskStorageCommitManager taskStorageCommitManager = new TaskStorageCommitManager(taskName, hashMap2, containerStorageManager, storeChangelogs, taskModel5.getChangelogPartition(), orElse, mapConfig, newFixedThreadPool, new StorageManagerUtil(), loggedStorageBaseDir, (TaskInstanceMetrics) map15.get(taskName).get());
            TableManager tableManager = new TableManager(mapConfig);
            MODULE$.info(() -> {
                return "Got table manager";
            });
            return new Tuple2(taskName, createTaskInstance$1(createInstance, taskModel5, map15, taskName, systemAdmins, systemConsumers, map16, apply2, taskStorageCommitManager, containerStorageManager, tableManager, set7, set8, taskConfig, jobModel, streamMetadataCache, streamMetadata, newSingleThreadScheduledExecutor, newFixedThreadPool, jobContext, containerContextImpl, map14, option2, option3, jobConfig));
        }, Iterable$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
        final Runnable createRunLoop = RunLoopFactory.createRunLoop(map17, systemConsumers, taskExecutor, mapConfig.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1L)), samzaContainerMetrics, taskConfig, highResolutionClock, jobConfig.getElasticityFactor(), applicationConfig.getRunId(), ApplicationUtil.isHighLevelApiJob(mapConfig));
        StatisticsMonitorImpl statisticsMonitorImpl = new StatisticsMonitorImpl();
        statisticsMonitorImpl.registerListener(new SamzaContainerMonitorListener(mapConfig, samzaContainerMetrics, taskExecutor));
        final long j = mapConfig.getLong("container.disk.quota.bytes", Long.MAX_VALUE);
        samzaContainerMetrics.diskQuotaBytes().set(BoxesRunTime.boxToLong(j));
        final DiskQuotaPolicy create = ((DiskQuotaPolicyFactory) ReflectionUtil.getObj(mapConfig.get("container.disk.quota.policy.factory", NoThrottlingDiskQuotaPolicyFactory.class.getName()), DiskQuotaPolicyFactory.class)).create(mapConfig);
        PollingScanDiskSpaceMonitor pollingScanDiskSpaceMonitor = null;
        int i = mapConfig.getInt(DISK_POLL_INTERVAL_KEY(), 0);
        if (i != 0) {
            pollingScanDiskSpaceMonitor = new PollingScanDiskSpaceMonitor(hashSet, i);
            pollingScanDiskSpaceMonitor.registerListener(new DiskSpaceMonitor.Listener(create, j, createRunLoop, samzaContainerMetrics) { // from class: org.apache.samza.container.SamzaContainer$$anon$4
                private final DiskQuotaPolicy diskQuotaPolicy$1;
                private final long diskQuotaBytes$1;
                private final Runnable runLoop$1;
                private final SamzaContainerMetrics samzaContainerMetrics$1;

                @Override // org.apache.samza.container.disk.DiskSpaceMonitor.Listener
                public void onUpdate(long j2) {
                    ((Throttleable) this.runLoop$1).setWorkFactor(this.diskQuotaPolicy$1.apply(1.0d - (j2 / this.diskQuotaBytes$1)));
                    this.samzaContainerMetrics$1.executorWorkFactor().set(BoxesRunTime.boxToDouble(((Throttleable) this.runLoop$1).getWorkFactor()));
                    this.samzaContainerMetrics$1.diskUsageBytes().set(BoxesRunTime.boxToLong(j2));
                }

                {
                    this.diskQuotaPolicy$1 = create;
                    this.diskQuotaBytes$1 = j;
                    this.runLoop$1 = createRunLoop;
                    this.samzaContainerMetrics$1 = samzaContainerMetrics;
                }
            });
            info(() -> {
                return new StringOps(Predef$.MODULE$.augmentString("Initialized disk space monitor watch paths to: %s")).format(Predef$.MODULE$.genericWrapArray(new Object[]{hashSet}));
            });
        } else {
            info(() -> {
                return new StringBuilder(59).append("Disk quotas disabled because polling interval is not set (").append(MODULE$.DISK_POLL_INTERVAL_KEY()).append(")").toString();
            });
        }
        info(() -> {
            return "Samza container setup complete.";
        });
        return new SamzaContainer(mapConfig, map17, map15, createRunLoop, systemAdmins, systemConsumers, systemProducers, samzaContainerMetrics, pollingScanDiskSpaceMonitor, statisticsMonitorImpl, apply2, localityManager, securityManager, $plus$plus2, jvmMetrics, taskExecutor, newFixedThreadPool, newSingleThreadScheduledExecutor, containerContextImpl, map14, option3, containerStorageManager, drainMonitor, option4);
    }

    /**
     * Returns the map obtained by applying the {@code Predef.Map()} factory to {@code Nil$.MODULE$}.
     *
     * NOTE(review): the {@code apply$default$3} name suggests this is the compiler-generated default
     * value for the third parameter of {@code apply} (the extra metrics reporters) — confirm against
     * the full {@code apply} signature, which is not visible here.
     */
    public Map<String, MetricsReporter> apply$default$3() {
        return Predef$.MODULE$.Map().apply(Nil$.MODULE$);
    }

    /**
     * Always returns {@code null}.
     *
     * NOTE(review): presumably the compiler-generated default for the tenth parameter of
     * {@code apply} (a {@link LocalityManager}) — confirm against the full signature.
     */
    public LocalityManager apply$default$10() {
        return null;
    }

    /**
     * Always returns {@code null}.
     *
     * NOTE(review): presumably the compiler-generated default for the eleventh parameter of
     * {@code apply} (a {@link StartpointManager}) — confirm against the full signature.
     */
    public StartpointManager apply$default$11() {
        return null;
    }

    /**
     * Returns the result of {@code Option$.MODULE$.empty()}, i.e. an absent diagnostics manager.
     *
     * NOTE(review): presumably the compiler-generated default for the twelfth parameter of
     * {@code apply} — confirm against the full signature.
     */
    public Option<DiagnosticsManager> apply$default$12() {
        return Option$.MODULE$.empty();
    }

    /**
     * Always returns {@code null}.
     *
     * NOTE(review): presumably the compiler-generated default for the thirteenth parameter of
     * {@code apply} (a {@link DrainMonitor}) — confirm against the full signature.
     */
    public DrainMonitor apply$default$13() {
        return null;
    }

    /**
     * Predicate over a pair whose second element is a {@link Buffer}: true when that buffer has at
     * least one element.
     *
     * @param tuple2 the pair to inspect
     * @return whether the buffer held in the second slot is non-empty
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$apply$7(Tuple2 tuple2) {
        // Reject the unmatched (null) case up front.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        Buffer entries = (Buffer) tuple2._2();
        return entries.nonEmpty();
    }

    /**
     * Predicate over a pair: true when its second element is not {@code null}.
     *
     * @param tuple2 the pair to inspect; not null-checked here
     * @return whether the second element is non-null
     */
    public static final /* synthetic */ boolean $anonfun$apply$25(Tuple2 tuple2) {
        Object second = tuple2._2();
        return second != null;
    }

    /**
     * Predicate over a pair: true when its second element is not {@code null}.
     *
     * @param tuple2 the pair to inspect; not null-checked here
     * @return whether the second element is non-null
     */
    public static final /* synthetic */ boolean $anonfun$apply$30(Tuple2 tuple2) {
        if (tuple2._2() == null) {
            return false;
        }
        return true;
    }

    /**
     * Predicate over a pair keyed by a string: true when the key ends with
     * {@link SerializerConfig#SERIALIZED_INSTANCE_SUFFIX}.
     *
     * @param tuple2 the pair whose first element is the key
     * @return whether the key carries the serialized-instance suffix
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$apply$35(Tuple2 tuple2) {
        // Reject the unmatched (null) case up front.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String key = (String) tuple2._1();
        return key.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX);
    }

    /**
     * Applies {@code function1} to {@code str} and reports whether the resulting {@link Option}
     * is defined.
     *
     * @param function1 lookup returning an {@code Option} for the given name
     * @param str the name passed to the lookup
     * @return whether the lookup produced a defined value
     */
    public static final /* synthetic */ boolean $anonfun$apply$43(Function1 function1, String str) {
        Option lookedUp = (Option) function1.apply(str);
        return lookedUp.isDefined();
    }

    /**
     * Predicate that accepts every serde except instances of {@link NoOpSerde}.
     *
     * @param serde the serde to test
     * @return {@code false} for a {@code NoOpSerde}, {@code true} otherwise (including {@code null})
     */
    public static final /* synthetic */ boolean $anonfun$apply$46(Serde serde) {
        if (serde instanceof NoOpSerde) {
            return false;
        }
        return true;
    }

    /**
     * Applies {@code function1} to {@code systemStream} and reports whether the resulting
     * {@link Optional} holds a value.
     *
     * @param function1 lookup returning an {@code Optional} for the given stream
     * @param systemStream the stream passed to the lookup
     * @return whether the lookup produced a present value
     */
    public static final /* synthetic */ boolean $anonfun$apply$49(Function1 function1, SystemStream systemStream) {
        Optional lookedUp = (Optional) function1.apply(systemStream);
        return lookedUp.isPresent();
    }

    /**
     * Predicate that accepts every serde except instances of {@link NoOpSerde}.
     *
     * @param serde the serde to test
     * @return {@code false} for a {@code NoOpSerde}, {@code true} otherwise (including {@code null})
     */
    public static final /* synthetic */ boolean $anonfun$apply$52(Serde serde) {
        boolean isNoOp = serde instanceof NoOpSerde;
        return !isNoOp;
    }

    /**
     * Predicate over a pair: true when its second element is a {@link CheckpointListener}.
     *
     * @param tuple2 the pair to inspect; not null-checked here
     * @return whether the second element implements {@code CheckpointListener}
     */
    public static final /* synthetic */ boolean $anonfun$apply$77(Tuple2 tuple2) {
        Object candidate = tuple2._2();
        return candidate instanceof CheckpointListener;
    }

    /**
     * Predicate that selects task models whose mode is {@code TaskMode.Active}.
     *
     * @param taskModel the task model to test
     * @return whether the task's mode is {@code Active}
     */
    public static final /* synthetic */ boolean $anonfun$apply$95(TaskModel taskModel) {
        if (taskModel.getTaskMode() == TaskMode.Active) {
            return true;
        }
        return false;
    }

    /**
     * Reports whether {@code buffer} contains the system stream of the given partition.
     *
     * @param buffer the collection of system streams to search
     * @param systemStreamPartition the partition whose system stream is looked up
     * @return whether the partition's system stream is an element of {@code buffer}
     */
    public static final /* synthetic */ boolean $anonfun$apply$100(Buffer buffer, SystemStreamPartition systemStreamPartition) {
        final boolean streamListed = buffer.contains(systemStreamPartition.getSystemStream());
        return streamListed;
    }

    /**
     * Assembles a {@link TaskInstance} for one task from the container-wide collaborators.
     *
     * The task's metrics and collector are looked up by {@code taskName} in {@code map} and
     * {@code map2}; both lookups call {@code get()} on the resulting option and so fail if the task
     * has no entry. The SSP set handed to the task instance is {@code set} with every element of
     * {@code set2} removed, converted to a {@code java.util.Set}.
     *
     * @param obj the task object created by the task factory
     * @param map per-task metrics keyed by task name
     * @param map2 per-task collectors keyed by task name
     * @param set all SSPs of the task
     * @param set2 SSPs to exclude from {@code set}
     * @return the newly constructed task instance
     */
    private static final TaskInstance createTaskInstance$1(Object obj, TaskModel taskModel, Map map, TaskName taskName, SystemAdmins systemAdmins, SystemConsumers systemConsumers, Map map2, OffsetManager offsetManager, TaskStorageCommitManager taskStorageCommitManager, ContainerStorageManager containerStorageManager, TableManager tableManager, Set set, Set set2, TaskConfig taskConfig, JobModel jobModel, StreamMetadataCache streamMetadataCache, Map map3, ScheduledExecutorService scheduledExecutorService, ExecutorService executorService, JobContext jobContext, ContainerContextImpl containerContextImpl, Option option, Option option2, Option option3, JobConfig jobConfig) {
        // Per-task lookups, resolved in the same order the constructor arguments are evaluated.
        TaskInstanceMetrics metricsForTask = (TaskInstanceMetrics) map.get(taskName).get();
        TaskInstanceCollector collectorForTask = (TaskInstanceCollector) map2.get(taskName).get();
        java.util.Set remainingSsps = (java.util.Set) JavaConverters$.MODULE$.setAsJavaSetConverter(set.$minus$minus(set2)).asJava();

        return new TaskInstance(
            obj,
            taskModel,
            metricsForTask,
            systemAdmins,
            systemConsumers,
            collectorForTask,
            offsetManager,
            taskStorageCommitManager,
            containerStorageManager,
            tableManager,
            remainingSsps,
            TaskInstanceExceptionHandler$.MODULE$.apply((MetricsHelper) map.get(taskName).get(), taskConfig),
            jobModel,
            streamMetadataCache,
            map3,
            scheduledExecutorService,
            executorService,
            jobContext,
            containerContextImpl,
            option,
            option2,
            option3,
            jobConfig.getElasticityFactor());
    }

    /**
     * Private constructor: publishes this instance as {@code MODULE$}, runs the {@code Logging}
     * initializer on it, and sets the two constants held by this object.
     */
    private SamzaContainer$() {
        // Publish the instance first; code in this class reaches it through MODULE$.
        MODULE$ = this;
        Logging.$init$(this);
        this.DEFAULT_READ_JOBMODEL_DELAY_MS = 100;
        // Config key read via DISK_POLL_INTERVAL_KEY() when deciding whether to start the disk space monitor.
        this.DISK_POLL_INTERVAL_KEY = "container.disk.poll.interval.ms";
    }
}
