package org.apache.samza.storage.kv;

import java.io.File;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.StorageEngine;
import org.apache.samza.storage.StorageEngineFactory;
import org.apache.samza.storage.StoreProperties;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.util.HighResolutionClock;
import org.apache.samza.util.ScalaJavaUtil;

/* loaded from: input_file:org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.class */
/**
 * Base {@link StorageEngineFactory} for key-value stores.
 *
 * <p>Subclasses supply the raw byte-oriented store through {@link #getKVStore}; this class wraps
 * that store in a chain of decorators and hands the result to a {@code KeyValueStorageEngine}.
 * From innermost to outermost the chain is:
 * <ol>
 *   <li>the raw store returned by {@link #getKVStore};</li>
 *   <li>a {@code LoggedStore}, only when a {@link SystemStreamPartition} is supplied;</li>
 *   <li>serialization, optional caching and optional {@code LargeMessageSafeStore} wrapping,
 *       whose relative order depends on the store's large-message configuration;</li>
 *   <li>an {@code AccessLoggedStore}, only when access logging is enabled for the store;</li>
 *   <li>a {@code NullSafeKeyValueStore}.</li>
 * </ol>
 *
 * @param <K> key type of the resulting store
 * @param <V> value type of the resulting store
 */
public abstract class BaseKeyValueStorageEngineFactory<K, V> implements StorageEngineFactory<K, V> {
    /** Any configured factory other than this one marks the store as persisted to disk. */
    private static final String INMEMORY_KV_STORAGE_ENGINE_FACTORY = "org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory";
    /** Key, relative to {@code stores.<store>.}, of the write batch size. */
    private static final String WRITE_BATCH_SIZE = "write.batch.size";
    private static final int DEFAULT_WRITE_BATCH_SIZE = 500;
    /** Key, relative to {@code stores.<store>.}, of the object cache size. */
    private static final String OBJECT_CACHE_SIZE = "object.cache.size";
    /** Lower bound of the default cache size; the effective default is max(batch size, this). */
    private static final int DEFAULT_OBJECT_CACHE_SIZE = 1000;

    /**
     * Creates the innermost, byte-oriented store that all decorators are layered on.
     *
     * @param str store name
     * @param file presumably the store's directory; passed through unchanged from
     *     {@link #getStorageEngine} (confirm against implementations)
     * @param metricsRegistry registry passed through from {@link #getStorageEngine}
     * @param jobContext job context passed through from {@link #getStorageEngine}
     * @param containerContext container context passed through from {@link #getStorageEngine}
     * @param storeMode store mode passed through from {@link #getStorageEngine}
     * @return the raw key-value store
     */
    protected abstract KeyValueStore<byte[], byte[]> getKVStore(String str, File file, MetricsRegistry metricsRegistry, JobContext jobContext, ContainerContext containerContext, StorageEngineFactory.StoreMode storeMode);

    /**
     * Validates the store configuration, builds the decorated store chain and wraps it in a
     * {@code KeyValueStorageEngine}.
     *
     * @param str store name; selects the {@code stores.<str>.} configuration subset
     * @param file passed to {@link #getKVStore} and to the engine
     * @param serde key serde; must not be {@code null}
     * @param serde2 message (value) serde; must not be {@code null}
     * @param messageCollector collector handed to the logged store, the access-logged store and
     *     the engine
     * @param metricsRegistry registry used for all store metrics created here
     * @param systemStreamPartition when non-null the raw store is wrapped in a
     *     {@code LoggedStore} (presumably this is the changelog partition); may be {@code null}
     * @param jobContext source of the job configuration
     * @param containerContext passed to {@link #getKVStore}
     * @param storeMode passed to {@link #getKVStore}
     * @return the assembled storage engine
     * @throws SamzaException if no store factory is configured for the store, if a positive
     *     cache size is smaller than the batch size, or if either serde is {@code null}
     */
    public StorageEngine getStorageEngine(String str, File file, Serde<K> serde, Serde<V> serde2, MessageCollector messageCollector, MetricsRegistry metricsRegistry, SystemStreamPartition systemStreamPartition, JobContext jobContext, ContainerContext containerContext, StorageEngineFactory.StoreMode storeMode) {
        Config storeConfig = jobContext.getConfig().subset("stores." + str + ".", true);
        StorageConfig storageConfig = new StorageConfig(jobContext.getConfig());
        Optional<String> factoryClassName = storageConfig.getStorageFactoryClassName(str);
        StoreProperties.StorePropertiesBuilder storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder();
        if (!factoryClassName.isPresent() || StringUtils.isBlank(factoryClassName.get())) {
            throw new SamzaException(String.format("Store factory not defined for store %s. Cannot proceed with KV store creation!", str));
        }
        if (!factoryClassName.get().equals(INMEMORY_KV_STORAGE_ENGINE_FACTORY)) {
            storePropertiesBuilder.setPersistedToDisk(true);
        }
        storePropertiesBuilder.setIsDurable(!storageConfig.getStoreBackupFactories(str).isEmpty());

        int batchSize = storeConfig.getInt(WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
        int cacheSize = storeConfig.getInt(OBJECT_CACHE_SIZE, Math.max(batchSize, DEFAULT_OBJECT_CACHE_SIZE));
        // A non-positive cache size disables caching (see buildMaybeCachedStore), so only a
        // positive cache size is checked against the batch size.
        if (cacheSize > 0 && cacheSize < batchSize) {
            throw new SamzaException(String.format("cache.size for store %s cannot be less than batch.size as batched values reside in cache.", str));
        }
        if (serde == null) {
            throw new SamzaException(String.format("Must define a key serde when using key value storage for store %s.", str));
        }
        if (serde2 == null) {
            throw new SamzaException(String.format("Must define a message serde when using key value storage for store %s.", str));
        }

        KeyValueStore<byte[], byte[]> rawStore = getKVStore(str, file, metricsRegistry, jobContext, containerContext, storeMode);
        KeyValueStore<byte[], byte[]> maybeLoggedStore = buildMaybeLoggedStore(systemStreamPartition, str, metricsRegistry, storePropertiesBuilder, rawStore, messageCollector);
        // Applies the serialization, caching and large-message layers.
        KeyValueStore<K, V> typedStore = buildStoreWithLargeMessageHandling(str, metricsRegistry, maybeLoggedStore, storageConfig, cacheSize, batchSize, serde, serde2);
        KeyValueStore<K, V> maybeAccessLoggedStore = buildMaybeAccessLoggedStore(str, typedStore, messageCollector, systemStreamPartition, storageConfig, serde);
        KeyValueStore<K, V> nullSafeStore = new NullSafeKeyValueStore<>(maybeAccessLoggedStore);

        KeyValueStorageEngineMetrics engineMetrics = new KeyValueStorageEngineMetrics(str, metricsRegistry);
        HighResolutionClock clock = buildClock(jobContext.getConfig());
        // build() must come after buildMaybeLoggedStore, which may flag the store as logged.
        StoreProperties storeProperties = storePropertiesBuilder.build();
        return new KeyValueStorageEngine<>(str, file, storeProperties, nullSafeStore, rawStore, systemStreamPartition, messageCollector, engineMetrics, batchSize, ScalaJavaUtil.toScalaFunction(clock::nanoTime));
    }

    /**
     * Wraps {@code storeToWrap} in a {@code LoggedStore} when a partition is supplied, and in
     * that case also flags the store as logged on {@code storePropertiesBuilder}.
     *
     * @return {@code storeToWrap} itself when {@code changelogSSP} is {@code null}, otherwise
     *     the logged wrapper
     */
    private static KeyValueStore<byte[], byte[]> buildMaybeLoggedStore(SystemStreamPartition changelogSSP, String storeName, MetricsRegistry registry, StoreProperties.StorePropertiesBuilder storePropertiesBuilder, KeyValueStore<byte[], byte[]> storeToWrap, MessageCollector changelogCollector) {
        if (changelogSSP == null) {
            return storeToWrap;
        }
        LoggedStoreMetrics loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry);
        storePropertiesBuilder.setLoggedStore(true);
        return new LoggedStore<>(storeToWrap, changelogSSP, changelogCollector, loggedStoreMetrics);
    }

    /**
     * Adds the serialization layer plus the optional cache and large-message layers.
     *
     * <p>The layering order depends on configuration:
     * <ul>
     *   <li>large messages disallowed: bytes store, then cache, then
     *       {@code LargeMessageSafeStore}, then serialization, so the cache sits below the
     *       serialization layer and holds serialized bytes;</li>
     *   <li>otherwise: bytes store, then {@code LargeMessageSafeStore} (only when dropping
     *       large messages is enabled), then serialization, then cache, so the cache sits above
     *       the serialization layer and holds typed objects.</li>
     * </ul>
     *
     * @param cacheSize cache size; caching is skipped when not positive
     * @param batchSize write batch size handed to the cache
     * @return the typed store
     */
    private static <T, U> KeyValueStore<T, U> buildStoreWithLargeMessageHandling(String storeName, MetricsRegistry registry, KeyValueStore<byte[], byte[]> storeToWrap, StorageConfig storageConfig, int cacheSize, int batchSize, Serde<T> keySerde, Serde<U> msgSerde) {
        int maxMessageSize = storageConfig.getChangelogMaxMsgSizeBytes(storeName);
        if (storageConfig.getDisallowLargeMessages(storeName)) {
            KeyValueStore<byte[], byte[]> maybeCachedStore = buildMaybeCachedStore(storeName, registry, storeToWrap, cacheSize, batchSize);
            // NOTE(review): the boolean presumably selects "drop" (true) versus "reject" (false)
            // for oversized messages; confirm against LargeMessageSafeStore.
            KeyValueStore<byte[], byte[]> largeMessageSafeStore = new LargeMessageSafeStore(maybeCachedStore, storeName, false, maxMessageSize);
            return buildSerializedStore(storeName, registry, largeMessageSafeStore, keySerde, msgSerde);
        }

        KeyValueStore<byte[], byte[]> toBeSerializedStore;
        if (storageConfig.getDropLargeMessages(storeName)) {
            toBeSerializedStore = new LargeMessageSafeStore(storeToWrap, storeName, true, maxMessageSize);
        } else {
            toBeSerializedStore = storeToWrap;
        }
        KeyValueStore<T, U> serializedStore = buildSerializedStore(storeName, registry, toBeSerializedStore, keySerde, msgSerde);
        return buildMaybeCachedStore(storeName, registry, serializedStore, cacheSize, batchSize);
    }

    /**
     * Wraps {@code storeToWrap} in a {@code CachedStore} when {@code cacheSize} is positive.
     *
     * @return the cached wrapper, or {@code storeToWrap} unchanged when caching is disabled
     */
    private static <T, U> KeyValueStore<T, U> buildMaybeCachedStore(String storeName, MetricsRegistry registry, KeyValueStore<T, U> storeToWrap, int cacheSize, int batchSize) {
        if (cacheSize <= 0) {
            return storeToWrap;
        }
        CachedStoreMetrics cachedStoreMetrics = new CachedStoreMetrics(storeName, registry);
        return new CachedStore<>(storeToWrap, cacheSize, batchSize, cachedStoreMetrics);
    }

    /**
     * Wraps a byte-oriented store in a {@code SerializedKeyValueStore} using the given serdes.
     *
     * @return the typed view of {@code storeToWrap}
     */
    private static <T, U> KeyValueStore<T, U> buildSerializedStore(String storeName, MetricsRegistry registry, KeyValueStore<byte[], byte[]> storeToWrap, Serde<T> keySerde, Serde<U> msgSerde) {
        SerializedKeyValueStoreMetrics serializedMetrics = new SerializedKeyValueStoreMetrics(storeName, registry);
        return new SerializedKeyValueStore<>(storeToWrap, keySerde, msgSerde, serializedMetrics);
    }

    /**
     * Wraps {@code storeToWrap} in an {@code AccessLoggedStore} when access logging is enabled
     * for the store.
     *
     * @return the access-logged wrapper, or {@code storeToWrap} unchanged when disabled
     */
    private static <T, U> KeyValueStore<T, U> buildMaybeAccessLoggedStore(String storeName, KeyValueStore<T, U> storeToWrap, MessageCollector changelogCollector, SystemStreamPartition changelogSSP, StorageConfig storageConfig, Serde<T> keySerde) {
        if (!storageConfig.getAccessLogEnabled(storeName)) {
            return storeToWrap;
        }
        return new AccessLoggedStore<>(storeToWrap, changelogCollector, changelogSSP, storageConfig, storeName, keySerde);
    }

    /**
     * Chooses the clock handed to the storage engine.
     *
     * @return a clock backed by {@link System#nanoTime()} when the metrics timer is enabled,
     *     otherwise a clock that always reports {@code 0}
     */
    private static HighResolutionClock buildClock(Config config) {
        if (new MetricsConfig(config).getMetricsTimerEnabled()) {
            return System::nanoTime;
        }
        return () -> 0L;
    }
}
