package com.hazelcast.jet.impl;

import com.hazelcast.cluster.Cluster;
import com.hazelcast.collection.IList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetCacheManager;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.JobAlreadyExistsException;
import com.hazelcast.jet.Observable;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JobNotFoundException;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.impl.observer.ObservableImpl;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.replicatedmap.ReplicatedMap;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.topic.ITopic;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/* loaded from: input_file:com/hazelcast/jet/impl/AbstractJetInstance.class */
public abstract class AbstractJetInstance implements JetInstance {
    private final HazelcastInstance hazelcastInstance;
    private final Map<String, Observable> observables = new ConcurrentHashMap();
    private final JetCacheManagerImpl cacheManager = new JetCacheManagerImpl(this);
    private final Supplier<JobRepository> jobRepository = Util.memoizeConcurrent(() -> {
        return new JobRepository(this);
    });

    public AbstractJetInstance(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public Job newJob(@Nonnull DAG dag, @Nonnull JobConfig jobConfig) {
        return newJobProxy(uploadResourcesAndAssignId(jobConfig), dag, jobConfig);
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public Job newJobIfAbsent(@Nonnull DAG dag, @Nonnull JobConfig jobConfig) {
        JobStatus status;
        if (jobConfig.getName() == null) {
            return newJob(dag, jobConfig);
        }
        while (true) {
            Job job = getJob(jobConfig.getName());
            if (job != null && (status = job.getStatus()) != JobStatus.FAILED && status != JobStatus.COMPLETED) {
                return job;
            }
            try {
                return newJob(dag, jobConfig);
            } catch (JobAlreadyExistsException e) {
                LoggingUtil.logFine(getLogger(), "Could not submit job with duplicate name: %s, ignoring", jobConfig.getName());
            }
        }
    }

    @Override // com.hazelcast.jet.JetInstance
    public Job getJob(long j) {
        try {
            Job newJobProxy = newJobProxy(j);
            newJobProxy.getStatus();
            return newJobProxy;
        } catch (Throwable th) {
            if (ExceptionUtil.peel(th) instanceof JobNotFoundException) {
                return null;
            }
            throw ExceptionUtil.rethrow(th);
        }
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public List<Job> getJobs(@Nonnull String str) {
        return Util.toList(getJobIdsByName(str), (v1) -> {
            return newJobProxy(v1);
        });
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public Cluster getCluster() {
        return getHazelcastInstance().getCluster();
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public String getName() {
        return this.hazelcastInstance.getName();
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public HazelcastInstance getHazelcastInstance() {
        return this.hazelcastInstance;
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public <K, V> IMap<K, V> getMap(@Nonnull String str) {
        return this.hazelcastInstance.getMap(str);
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public <K, V> ReplicatedMap<K, V> getReplicatedMap(@Nonnull String str) {
        return this.hazelcastInstance.getReplicatedMap(str);
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public <E> IList<E> getList(@Nonnull String str) {
        return this.hazelcastInstance.getList(str);
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public <T> ITopic<T> getReliableTopic(@Nonnull String str) {
        return this.hazelcastInstance.getReliableTopic(str);
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public JetCacheManager getCacheManager() {
        return this.cacheManager;
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public <T> Observable<T> getObservable(@Nonnull String str) {
        return this.observables.computeIfAbsent(str, str2 -> {
            return new ObservableImpl(str2, this.hazelcastInstance, this::onDestroy, getLogger());
        });
    }

    @Override // com.hazelcast.jet.JetInstance
    @Nonnull
    public Collection<Observable<?>> getObservables() {
        return (Collection) this.hazelcastInstance.getDistributedObjects().stream().filter(distributedObject -> {
            return distributedObject.getServiceName().equals(RingbufferService.SERVICE_NAME);
        }).filter(distributedObject2 -> {
            return distributedObject2.getName().startsWith(ObservableImpl.JET_OBSERVABLE_NAME_PREFIX);
        }).map(distributedObject3 -> {
            return distributedObject3.getName().substring(ObservableImpl.JET_OBSERVABLE_NAME_PREFIX.length());
        }).map(this::getObservable).collect(Collectors.toList());
    }

    @Override // com.hazelcast.jet.JetInstance
    public void shutdown() {
        this.observables.values().forEach((v0) -> {
            v0.destroy();
        });
        this.hazelcastInstance.shutdown();
    }

    private void onDestroy(Observable<?> observable) {
        this.observables.remove(observable.name());
    }

    public abstract boolean existsDistributedObject(@Nonnull String str, @Nonnull String str2);

    private long uploadResourcesAndAssignId(JobConfig jobConfig) {
        return this.jobRepository.get().uploadJobResources(jobConfig);
    }

    public abstract ILogger getLogger();

    public abstract Job newJobProxy(long j);

    public abstract Job newJobProxy(long j, DAG dag, JobConfig jobConfig);

    public abstract List<Long> getJobIdsByName(String str);
}
