package ca.uhn.fhir.jpa.subscription;

import ca.uhn.fhir.context.ConfigurationException;
import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.context.FhirVersionEnum;
import ca.uhn.fhir.jpa.dao.IFhirResourceDao;
import ca.uhn.fhir.jpa.dao.SearchParameterMap;
import ca.uhn.fhir.jpa.provider.ServletSubRequestDetails;
import ca.uhn.fhir.jpa.search.StaleSearchDeletingSvcImpl;
import ca.uhn.fhir.jpa.subscription.CanonicalSubscription;
import ca.uhn.fhir.jpa.util.JpaConstants;
import ca.uhn.fhir.jpa.util.StopWatch;
import ca.uhn.fhir.model.api.IQueryParameterOr;
import ca.uhn.fhir.model.api.IQueryParameterType;
import ca.uhn.fhir.model.dstu2.resource.Subscription;
import ca.uhn.fhir.rest.api.RestOperationTypeEnum;
import ca.uhn.fhir.rest.api.server.IBundleProvider;
import ca.uhn.fhir.rest.api.server.RequestDetails;
import ca.uhn.fhir.rest.param.TokenOrListParam;
import ca.uhn.fhir.rest.param.TokenParam;
import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
import ca.uhn.fhir.rest.server.exceptions.PreconditionFailedException;
import ca.uhn.fhir.rest.server.interceptor.ServerOperationInterceptorAdapter;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.hl7.fhir.exceptions.FHIRException;
import org.hl7.fhir.instance.model.api.IBaseReference;
import org.hl7.fhir.instance.model.api.IBaseResource;
import org.hl7.fhir.instance.model.api.IIdType;
import org.hl7.fhir.r4.model.EventDefinition;
import org.hl7.fhir.r4.model.Extension;
import org.hl7.fhir.r4.model.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallbackWithoutResult;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.transaction.support.TransactionTemplate;

/* loaded from: input_file:ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor.class */
public abstract class BaseSubscriptionInterceptor<S extends IBaseResource> extends ServerOperationInterceptorAdapter {
    // Element paths of the Subscription resource, exposed as package-visible constants.
    static final String SUBSCRIPTION_STATUS = "Subscription.status";
    static final String SUBSCRIPTION_TYPE = "Subscription.channel.type";
    static final String SUBSCRIPTION_CRITERIA = "Subscription.criteria";
    static final String SUBSCRIPTION_ENDPOINT = "Subscription.channel.endpoint";
    static final String SUBSCRIPTION_PAYLOAD = "Subscription.channel.payload";
    static final String SUBSCRIPTION_HEADER = "Subscription.channel.header";
    // Cap passed to setLoadSynchronousUpTo() when initSubscriptions() searches for subscriptions.
    private static final Integer MAX_SUBSCRIPTION_RESULTS = 1000;
    // Channel that receives resource-modified messages; created in start() unless one was set beforehand.
    private SubscribableChannel myProcessingChannel;
    // Delivery channel; created in start() unless one was set beforehand.
    private SubscribableChannel myDeliveryChannel;
    // Executor backing the processing channel; only assigned when start() creates that channel itself.
    private ExecutorService myProcessingExecutor;
    // Maximum pool size used for both executors built in start(); validated to be >= 1 by the setter.
    private int myExecutorThreadCount;
    // Lazily created in start(); invoked synchronously from submitResourceModified() and initSubscriptions().
    private SubscriptionActivatingSubscriber mySubscriptionActivatingSubscriber;
    // Handler subscribed to the processing channel; lazily created in registerSubscriptionCheckingSubscriber().
    private MessageHandler mySubscriptionCheckingSubscriber;
    // Registered subscriptions in canonical form, keyed by the id part of the Subscription's id.
    private ConcurrentHashMap<String, CanonicalSubscription> myIdToSubscription = new ConcurrentHashMap<>();
    // NOTE(review): instance-level logger despite the "our" prefix; it is never reassigned in this class.
    private Logger ourLog = LoggerFactory.getLogger(BaseSubscriptionInterceptor.class);
    // Executor backing the delivery channel; only assigned when start() creates that channel itself.
    private ThreadPoolExecutor myDeliveryExecutor;
    // Bounded work queues of the two executors above; null when the matching channel was supplied externally.
    private LinkedBlockingQueue<Runnable> myProcessingExecutorQueue;
    private LinkedBlockingQueue<Runnable> myDeliveryExecutorQueue;
    // DAO for the Subscription resource type; resolved from myResourceDaos in start().
    private IFhirResourceDao<?> mySubscriptionDao;

    // All resource DAOs; start() scans this list for the one handling "Subscription".
    @Autowired
    private List<IFhirResourceDao<?>> myResourceDaos;

    @Autowired
    private FhirContext myCtx;

    // Optional injection, but start() requires it to be non-null when the context version is R4.
    @Autowired(required = false)
    @Qualifier("myEventDefinitionDaoR4")
    private IFhirResourceDao<EventDefinition> myEventDefinitionDaoR4;

    // Used to wrap the initial initSubscriptions() call in a transaction and handed to the activating subscriber.
    @Autowired
    private PlatformTransactionManager myTxManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: ca.uhn.fhir.jpa.subscription.BaseSubscriptionInterceptor$5, reason: invalid class name */
    /* loaded from: input_file:ca/uhn/fhir/jpa/subscription/BaseSubscriptionInterceptor$5.class */
    /**
     * Compiler-generated lookup table (recovered by the decompiler) that maps
     * {@link FhirVersionEnum} ordinals to the dense case labels 1..3 used by the
     * switch in {@code canonicalize}. Slots for any other enum constant keep the
     * array default of 0, which matches no case label and so falls to {@code default}.
     */
    public static /* synthetic */ class AnonymousClass5 {
        // Indexed by FhirVersionEnum.ordinal(); value is the switch case label (0 = not mapped).
        static final /* synthetic */ int[] $SwitchMap$ca$uhn$fhir$context$FhirVersionEnum = new int[FhirVersionEnum.values().length];

        static {
            // Each assignment is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unmapped instead of failing class init.
            try {
                $SwitchMap$ca$uhn$fhir$context$FhirVersionEnum[FhirVersionEnum.DSTU2.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$ca$uhn$fhir$context$FhirVersionEnum[FhirVersionEnum.DSTU3.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // intentionally ignored: slot stays 0
            }
            try {
                $SwitchMap$ca$uhn$fhir$context$FhirVersionEnum[FhirVersionEnum.R4.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // intentionally ignored: slot stays 0
            }
        }
    }

    public BaseSubscriptionInterceptor() {
        setExecutorThreadCount(5);
    }

    /**
     * Converts a version-specific Subscription resource into a
     * {@link CanonicalSubscription}, choosing the converter from the FHIR version
     * of the configured context.
     *
     * @param s the subscription resource to convert
     * @return the canonical representation of {@code s}
     * @throws ConfigurationException if the context's version is not DSTU2, DSTU3 or R4
     */
    protected CanonicalSubscription canonicalize(S s) {
        FhirVersionEnum fhirVersion = myCtx.getVersion().getVersion();
        switch (fhirVersion) {
            case DSTU2:
                return canonicalizeDstu2(s);
            case DSTU3:
                return canonicalizeDstu3(s);
            case R4:
                return canonicalizeR4(s);
            default:
                throw new ConfigurationException("Subscription not supported for version: " + fhirVersion);
        }
    }

    /**
     * Builds the canonical form of a DSTU2 Subscription.
     *
     * <p>The DSTU2 resource exposes its status and channel type as plain code strings,
     * which are parsed into the R4 enums that {@link CanonicalSubscription} stores.
     * Both model classes are called {@code Subscription}, so they are referenced by
     * fully qualified name here to keep the two apart.
     *
     * @param iBaseResource the resource to convert; must be a DSTU2 Subscription
     * @return the populated canonical subscription
     * @throws InternalErrorException if a status or channel type code cannot be parsed
     * @throws ClassCastException if {@code iBaseResource} is not a DSTU2 Subscription
     */
    protected CanonicalSubscription canonicalizeDstu2(IBaseResource iBaseResource) {
        ca.uhn.fhir.model.dstu2.resource.Subscription subscription = (ca.uhn.fhir.model.dstu2.resource.Subscription) iBaseResource;
        CanonicalSubscription canonicalSubscription = new CanonicalSubscription();
        try {
            canonicalSubscription.setStatus(org.hl7.fhir.r4.model.Subscription.SubscriptionStatus.fromCode(subscription.getStatus()));
            canonicalSubscription.setBackingSubscription(this.myCtx, iBaseResource);
            canonicalSubscription.setChannelType(org.hl7.fhir.r4.model.Subscription.SubscriptionChannelType.fromCode(subscription.getChannel().getType()));
            canonicalSubscription.setCriteriaString(subscription.getCriteria());
            canonicalSubscription.setEndpointUrl(subscription.getChannel().getEndpoint());
            canonicalSubscription.setHeaders(subscription.getChannel().getHeader());
            canonicalSubscription.setIdElement(subscription.getIdElement());
            canonicalSubscription.setPayloadString(subscription.getChannel().getPayload());
            return canonicalSubscription;
        } catch (FHIRException e) {
            // fromCode() rejects unknown codes; surface that as a server-side error with the cause attached.
            throw new InternalErrorException(e);
        }
    }

    /**
     * Builds the canonical form of a DSTU3 Subscription.
     *
     * <p>Status and channel type are translated via their code strings. For EMAIL
     * channels the "from" address and subject template are additionally read from
     * extensions on the channel element.
     *
     * @param iBaseResource the resource to convert; must be a DSTU3 Subscription
     * @return the populated canonical subscription
     * @throws ConfigurationException if the e-mail extensions cannot be extracted
     * @throws InternalErrorException if any other step raises a {@link FHIRException}
     */
    protected CanonicalSubscription canonicalizeDstu3(IBaseResource iBaseResource) {
        org.hl7.fhir.dstu3.model.Subscription source = (org.hl7.fhir.dstu3.model.Subscription) iBaseResource;
        CanonicalSubscription result = new CanonicalSubscription();
        try {
            result.setStatus(Subscription.SubscriptionStatus.fromCode(source.getStatus().toCode()));
            result.setBackingSubscription(myCtx, iBaseResource);
            result.setChannelType(Subscription.SubscriptionChannelType.fromCode(source.getChannel().getType().toCode()));
            result.setCriteriaString(source.getCriteria());
            result.setEndpointUrl(source.getChannel().getEndpoint());
            result.setHeaders(source.getChannel().getHeader());
            result.setIdElement(source.getIdElement());
            result.setPayloadString(source.getChannel().getPayload());

            // Only e-mail channels carry the extra sender/subject extensions.
            if (result.getChannelType() != Subscription.SubscriptionChannelType.EMAIL) {
                return result;
            }
            try {
                String sender = source.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
                String subjectTemplate = source.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
                result.getEmailDetails().setFrom(sender);
                result.getEmailDetails().setSubjectTemplate(subjectTemplate);
            } catch (FHIRException extensionFailure) {
                throw new ConfigurationException("Failed to extract subscription extension(s): " + extensionFailure.getMessage(), extensionFailure);
            }
            return result;
        } catch (FHIRException conversionFailure) {
            throw new InternalErrorException(conversionFailure);
        }
    }

    /**
     * Builds the canonical form of an R4 Subscription.
     *
     * <p>For EMAIL channels the "from" address and subject template are read from
     * extensions on the channel element. If the resource carries at least one
     * {@code http://hl7.org/fhir/subscription/topics} extension, the first one must
     * reference an EventDefinition, which is loaded and attached as a trigger.
     *
     * @param iBaseResource the resource to convert; must be an R4 Subscription
     * @return the populated canonical subscription
     * @throws ConfigurationException if the e-mail extensions cannot be extracted
     * @throws PreconditionFailedException if the topic reference is not an EventDefinition
     */
    protected CanonicalSubscription canonicalizeR4(IBaseResource iBaseResource) {
        org.hl7.fhir.r4.model.Subscription source = (org.hl7.fhir.r4.model.Subscription) iBaseResource;
        CanonicalSubscription result = new CanonicalSubscription();
        result.setStatus(source.getStatus());
        result.setBackingSubscription(myCtx, iBaseResource);
        result.setChannelType(source.getChannel().getType());
        result.setCriteriaString(source.getCriteria());
        result.setEndpointUrl(source.getChannel().getEndpoint());
        result.setHeaders(source.getChannel().getHeader());
        result.setIdElement(source.getIdElement());
        result.setPayloadString(source.getChannel().getPayload());

        boolean emailChannel = result.getChannelType() == Subscription.SubscriptionChannelType.EMAIL;
        if (emailChannel) {
            try {
                String sender = source.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_EMAIL_FROM);
                String subjectTemplate = source.getChannel().getExtensionString(JpaConstants.EXT_SUBSCRIPTION_SUBJECT_TEMPLATE);
                result.getEmailDetails().setFrom(sender);
                result.getEmailDetails().setSubjectTemplate(subjectTemplate);
            } catch (FHIRException extensionFailure) {
                throw new ConfigurationException("Failed to extract subscription extension(s): " + extensionFailure.getMessage(), extensionFailure);
            }
        }

        List<?> topicExtensions = source.getExtensionsByUrl("http://hl7.org/fhir/subscription/topics");
        if (topicExtensions.isEmpty()) {
            return result;
        }
        // Only the first topic extension is considered.
        IBaseReference topicReference = ((Extension) topicExtensions.get(0)).getValueAsPrimitive();
        String referencedType = topicReference.getReferenceElement().getResourceType();
        if (!"EventDefinition".equals(referencedType)) {
            throw new PreconditionFailedException("Topic reference must be an EventDefinition");
        }
        result.addTrigger(new CanonicalSubscription.CanonicalEventDefinition(myEventDefinitionDaoR4.read(topicReference.getReferenceElement())));
        return result;
    }

    /**
     * Returns the channel type this interceptor is responsible for. Its code is used
     * by {@link #initSubscriptions()} as the "type" search criterion, and the value is
     * passed to the checking and activating subscribers when they are created.
     *
     * @return the channel type handled by the concrete subclass
     */
    public abstract Subscription.SubscriptionChannelType getChannelType();

    /**
     * @return the delivery channel, or {@code null} if none has been set and
     *         {@link #start()} has not created one yet
     */
    public SubscribableChannel getDeliveryChannel() {
        return myDeliveryChannel;
    }

    /**
     * Replaces the delivery channel. When a non-null channel is set before
     * {@link #start()} runs, start() does not build its own executor-backed channel.
     *
     * @param subscribableChannel the channel to use for deliveries
     */
    public void setDeliveryChannel(SubscribableChannel subscribableChannel) {
        myDeliveryChannel = subscribableChannel;
    }

    /**
     * Returns the number of tasks currently waiting in the processing and delivery
     * executor queues combined.
     *
     * <p>A queue only exists when {@link #start()} created the matching channel
     * itself. A queue that was never created (channel supplied externally, or
     * start() not yet run) counts as empty instead of causing a
     * {@link NullPointerException}.
     *
     * @return total number of queued tasks across both internal executors
     */
    public int getExecutorQueueSizeForUnitTests() {
        int queued = 0;
        if (this.myProcessingExecutorQueue != null) {
            queued += this.myProcessingExecutorQueue.size();
        }
        if (this.myDeliveryExecutorQueue != null) {
            queued += this.myDeliveryExecutorQueue.size();
        }
        return queued;
    }

    /**
     * @return the maximum pool size used for the executors that {@link #start()} creates
     */
    public int getExecutorThreadCount() {
        return myExecutorThreadCount;
    }

    /**
     * Sets the maximum pool size for the executors that {@link #start()} creates.
     *
     * @param i the thread count; must be at least 1
     * @throws IllegalArgumentException if {@code i} is smaller than 1
     */
    public void setExecutorThreadCount(int i) {
        final long smallestAllowed = 1L;
        final long largestAllowed = Integer.MAX_VALUE;
        Validate.inclusiveBetween(smallestAllowed, largestAllowed, (long) i);
        myExecutorThreadCount = i;
    }

    /**
     * @return a read-only view of the registered subscriptions, keyed by the id part
     *         of each subscription's id; the view reflects later registrations
     */
    public Map<String, CanonicalSubscription> getIdToSubscription() {
        Map<String, CanonicalSubscription> readOnlyView = Collections.unmodifiableMap(myIdToSubscription);
        return readOnlyView;
    }

    /**
     * @return the processing channel, or {@code null} if none has been set and
     *         {@link #start()} has not created one yet
     */
    public SubscribableChannel getProcessingChannel() {
        return myProcessingChannel;
    }

    /**
     * Replaces the processing channel. When a non-null channel is set before
     * {@link #start()} runs, start() does not build its own executor-backed channel.
     *
     * @param subscribableChannel the channel that receives resource-modified messages
     */
    public void setProcessingChannel(SubscribableChannel subscribableChannel) {
        myProcessingChannel = subscribableChannel;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * @return the DAO for Subscription resources as resolved by {@link #start()},
     *         or {@code null} before start() has run
     */
    public IFhirResourceDao<?> getSubscriptionDao() {
        return mySubscriptionDao;
    }

    /**
     * Returns a snapshot of all currently registered subscriptions.
     *
     * <p>The list is a fresh copy: changing it does not affect the registry, and later
     * registrations are not reflected in it.
     *
     * @return a new mutable list holding the registered canonical subscriptions
     */
    public List<CanonicalSubscription> getSubscriptions() {
        // Diamond instead of a raw ArrayList so the copy is type-checked.
        return new ArrayList<>(this.myIdToSubscription.values());
    }

    /**
     * Tells whether a subscription with the given id is currently registered.
     *
     * @param iIdType the subscription id; must not be null and must have a non-blank id part
     * @return {@code true} if a subscription is registered under the id part
     * @throws NullPointerException if {@code iIdType} or its id part is null
     * @throws IllegalArgumentException if the id part is blank
     */
    public boolean hasSubscription(IIdType iIdType) {
        Validate.notNull(iIdType);
        String idPart = Validate.notBlank(iIdType.getIdPart());
        return myIdToSubscription.containsKey(idPart);
    }

    /**
     * Synchronises the in-memory registry with the stored Subscription resources.
     *
     * <p>Searches for subscriptions of this interceptor's channel type whose status
     * is REQUESTED or ACTIVE (at most {@code MAX_SUBSCRIPTION_RESULTS}), passes each
     * one to the activating subscriber, and then drops every registry entry whose id
     * was not part of the search result. Runs on a fixed-delay schedule and once
     * from {@link #start()}.
     */
    @Scheduled(fixedDelay = StaleSearchDeletingSvcImpl.DEFAULT_CUTOFF_SLACK)
    public void initSubscriptions() {
        SearchParameterMap criteria = new SearchParameterMap();
        IQueryParameterType channelTypeToken = new TokenParam((String) null, getChannelType().toCode());
        criteria.add("type", channelTypeToken);
        IQueryParameterOr<?> statusTokens = (IQueryParameterOr<?>) new TokenOrListParam()
                .addOr(new TokenParam((String) null, Subscription.SubscriptionStatus.REQUESTED.toCode()))
                .addOr(new TokenParam((String) null, Subscription.SubscriptionStatus.ACTIVE.toCode()));
        criteria.add("status", statusTokens);
        criteria.setLoadSynchronousUpTo(MAX_SUBSCRIPTION_RESULTS);

        ServletSubRequestDetails subRequest = new ServletSubRequestDetails();
        subRequest.setSubRequest(true);

        IBundleProvider found = getSubscriptionDao().search(criteria, subRequest);
        if (found.size().intValue() >= MAX_SUBSCRIPTION_RESULTS.intValue()) {
            ourLog.error("Currently over " + MAX_SUBSCRIPTION_RESULTS + " subscriptions.  Some subscriptions have not been loaded.");
        }

        // Activate/register everything that was found, remembering the ids seen.
        List<IBaseResource> stored = found.getResources(0, found.size().intValue());
        HashSet<String> storedIds = new HashSet<>();
        for (IBaseResource subscription : stored) {
            storedIds.add(subscription.getIdElement().getIdPart());
            mySubscriptionActivatingSubscriber.activateAndRegisterSubscriptionIfRequired(subscription);
        }

        // Anything still registered but absent from the search result is stale.
        for (Enumeration<String> registeredIds = myIdToSubscription.keys(); registeredIds.hasMoreElements(); ) {
            String registeredId = registeredIds.nextElement();
            if (storedIds.contains(registeredId)) {
                continue;
            }
            ourLog.info("Unregistering Subscription/{} as it no longer exists", registeredId);
            myIdToSubscription.remove(registeredId);
        }
    }

    /**
     * Releases the resources acquired by {@link #start()}.
     *
     * <p>Unsubscribes the checking subscriber from the processing channel (if there
     * is one), lets the subclass detach its delivery subscriber, and finally shuts
     * down the executors this class created. Those executors use non-daemon threads,
     * so without the shutdown their idle core threads would outlive the bean.
     * Executors are shut down gracefully: already queued tasks still run.
     * Externally supplied channels are left untouched.
     */
    @PreDestroy
    public void preDestroy() {
        try {
            SubscribableChannel processingChannel = getProcessingChannel();
            // start() may never have completed, in which case there is nothing to unsubscribe from.
            if (processingChannel != null) {
                processingChannel.unsubscribe(this.mySubscriptionCheckingSubscriber);
            }
            unregisterDeliverySubscriber();
        } finally {
            // Both executors are only non-null when start() built the matching channel itself.
            if (this.myProcessingExecutor != null) {
                this.myProcessingExecutor.shutdown();
            }
            if (this.myDeliveryExecutor != null) {
                this.myDeliveryExecutor.shutdown();
            }
        }
    }

    /**
     * Hook invoked by {@link #start()} once both channels exist and the checking
     * subscriber has been registered. Presumably implementations attach their
     * delivery handler to the delivery channel here — confirm against subclasses.
     */
    protected abstract void registerDeliverySubscriber();

    /**
     * Registers (or replaces) a subscription in the in-memory registry under the id
     * part of the given id, stored in canonical form.
     *
     * @param iIdType the subscription id; must not be null and must have a non-blank id part
     * @param s the subscription resource; must not be null
     * @throws NullPointerException if an argument or the id part is null
     * @throws IllegalArgumentException if the id part is blank
     */
    public void registerSubscription(IIdType iIdType, S s) {
        Validate.notNull(iIdType);
        String idPart = Validate.notBlank(iIdType.getIdPart());
        Validate.notNull(s);
        CanonicalSubscription canonical = canonicalize(s);
        myIdToSubscription.put(idPart, canonical);
    }

    /**
     * Subscribes the checking subscriber to the processing channel, creating the
     * subscriber first if it does not exist yet.
     */
    protected void registerSubscriptionCheckingSubscriber() {
        MessageHandler checker = mySubscriptionCheckingSubscriber;
        if (checker == null) {
            checker = new SubscriptionCheckingSubscriber(getSubscriptionDao(), getChannelType(), this);
            mySubscriptionCheckingSubscriber = checker;
        }
        getProcessingChannel().subscribe(checker);
    }

    /**
     * Publishes a CREATE notification for a newly stored resource.
     *
     * @param requestDetails the originating request (not used here)
     * @param iBaseResource the created resource; supplies the message id and payload
     */
    public void resourceCreated(RequestDetails requestDetails, IBaseResource iBaseResource) {
        ResourceModifiedMessage created = new ResourceModifiedMessage();
        created.setId(iBaseResource.getIdElement());
        created.setOperationType(RestOperationTypeEnum.CREATE);
        created.setNewPayload(myCtx, iBaseResource);
        submitResourceModified(created);
    }

    /**
     * Publishes a DELETE notification. Only the id is carried; no payload is set.
     *
     * @param requestDetails the originating request (not used here)
     * @param iBaseResource the deleted resource; supplies the message id
     */
    public void resourceDeleted(RequestDetails requestDetails, IBaseResource iBaseResource) {
        ResourceModifiedMessage deleted = new ResourceModifiedMessage();
        deleted.setId(iBaseResource.getIdElement());
        deleted.setOperationType(RestOperationTypeEnum.DELETE);
        submitResourceModified(deleted);
    }

    /**
     * Publishes an UPDATE notification built from the second resource argument.
     *
     * @param requestDetails the originating request (not used here)
     * @param iBaseResource not used here (presumably the previous version — confirm against the interceptor API)
     * @param iBaseResource2 supplies the message id and payload
     */
    public void resourceUpdated(RequestDetails requestDetails, IBaseResource iBaseResource, IBaseResource iBaseResource2) {
        ResourceModifiedMessage updated = new ResourceModifiedMessage();
        updated.setId(iBaseResource2.getIdElement());
        updated.setOperationType(RestOperationTypeEnum.UPDATE);
        updated.setNewPayload(myCtx, iBaseResource2);
        submitResourceModified(updated);
    }

    /**
     * Sends a resource-modified message to the processing channel.
     *
     * <p>When transaction synchronization is active, the send is deferred until the
     * surrounding transaction has committed. When it is not active, registering a
     * synchronization would throw {@link IllegalStateException}, so the message is
     * sent immediately instead.
     *
     * @param resourceModifiedMessage the message to publish
     */
    protected void sendToProcessingChannel(final ResourceModifiedMessage resourceModifiedMessage) {
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            // No transaction to wait for: deliver right away rather than failing.
            deliverToProcessingChannel(resourceModifiedMessage);
            return;
        }
        this.ourLog.trace("Registering synchronization to send resource modified message to processing channel");
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                BaseSubscriptionInterceptor.this.deliverToProcessingChannel(resourceModifiedMessage);
            }
        });
    }

    /** Wraps the message in its JSON envelope and hands it to the processing channel. */
    private void deliverToProcessingChannel(ResourceModifiedMessage resourceModifiedMessage) {
        this.ourLog.trace("Sending resource modified message to processing channel");
        getProcessingChannel().send(new ResourceModifiedJsonMessage(resourceModifiedMessage));
    }

    /**
     * Overrides the injected FHIR context.
     *
     * @param fhirContext the context used for version detection and payload handling
     */
    public void setFhirContext(FhirContext fhirContext) {
        myCtx = fhirContext;
    }

    /**
     * Overrides the injected DAO list that {@link #start()} scans for the
     * Subscription DAO.
     *
     * @param list all resource DAOs
     */
    public void setResourceDaos(List<IFhirResourceDao<?>> list) {
        myResourceDaos = list;
    }

    /**
     * Overrides the injected transaction manager.
     *
     * @param platformTransactionManager the transaction manager to use
     */
    @VisibleForTesting
    public void setTxManager(PlatformTransactionManager platformTransactionManager) {
        myTxManager = platformTransactionManager;
    }

    /**
     * Initialises the interceptor after dependency injection.
     *
     * <ol>
     * <li>Resolves the DAO for the "Subscription" resource type and, for R4,
     * requires the EventDefinition DAO to be present.</li>
     * <li>Creates an executor-backed processing and/or delivery channel for each
     * channel that has not been supplied beforehand.</li>
     * <li>Creates the activating subscriber if needed and registers the checking
     * and delivery subscribers.</li>
     * <li>Loads the existing subscriptions inside a transaction.</li>
     * </ol>
     *
     * @throws NullPointerException if no Subscription DAO is found, or if the
     *         context is R4 and no EventDefinition DAO was injected
     */
    @PostConstruct
    public void start() {
        for (IFhirResourceDao<?> iFhirResourceDao : this.myResourceDaos) {
            if (this.myCtx.getResourceDefinition(iFhirResourceDao.getResourceType()).getName().equals("Subscription")) {
                this.mySubscriptionDao = iFhirResourceDao;
            }
        }
        Validate.notNull(this.mySubscriptionDao);
        if (this.myCtx.getVersion().getVersion() == FhirVersionEnum.R4) {
            Validate.notNull(this.myEventDefinitionDaoR4);
        }
        if (getProcessingChannel() == null) {
            this.myProcessingExecutorQueue = new LinkedBlockingQueue<>(1000);
            this.myProcessingExecutor = newChannelExecutor(this.myProcessingExecutorQueue, "subscription-proc-%d");
            setProcessingChannel(new ExecutorSubscribableChannel(this.myProcessingExecutor));
        }
        if (getDeliveryChannel() == null) {
            this.myDeliveryExecutorQueue = new LinkedBlockingQueue<>(1000);
            this.myDeliveryExecutor = newChannelExecutor(this.myDeliveryExecutorQueue, "subscription-delivery-%d");
            setDeliveryChannel(new ExecutorSubscribableChannel(this.myDeliveryExecutor));
        }
        if (this.mySubscriptionActivatingSubscriber == null) {
            this.mySubscriptionActivatingSubscriber = new SubscriptionActivatingSubscriber(getSubscriptionDao(), getChannelType(), this, this.myTxManager);
        }
        registerSubscriptionCheckingSubscriber();
        registerDeliverySubscriber();
        new TransactionTemplate(this.myTxManager).execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus transactionStatus) {
                BaseSubscriptionInterceptor.this.initSubscriptions();
            }
        });
    }

    /**
     * Builds a pool of 1 to {@link #getExecutorThreadCount()} non-daemon,
     * normal-priority threads that drains the given queue; when the queue is full
     * the submitting thread blocks (see {@link #newBlockingRejectionHandler}).
     */
    private ThreadPoolExecutor newChannelExecutor(LinkedBlockingQueue<Runnable> queue, String threadNamePattern) {
        BasicThreadFactory threadFactory = new BasicThreadFactory.Builder()
                .namingPattern(threadNamePattern)
                .daemon(false)
                .priority(Thread.NORM_PRIORITY)
                .build();
        return new ThreadPoolExecutor(1, getExecutorThreadCount(), 0L, TimeUnit.MILLISECONDS, queue, threadFactory, newBlockingRejectionHandler(queue));
    }

    /**
     * Creates a rejection handler that blocks the submitter until the queue has room.
     * If the wait is interrupted, the interrupt flag is restored and the task is
     * rejected with the interruption attached as the cause.
     */
    private RejectedExecutionHandler newBlockingRejectionHandler(final LinkedBlockingQueue<Runnable> queue) {
        return new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                BaseSubscriptionInterceptor.this.ourLog.info("Note: Executor queue is full ({} elements), waiting for a slot to become available!", Integer.valueOf(queue.size()));
                StopWatch stopWatch = new StopWatch();
                try {
                    queue.put(runnable);
                    BaseSubscriptionInterceptor.this.ourLog.info("Slot become available after {}ms", Long.valueOf(stopWatch.getMillis()));
                } catch (InterruptedException e) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RejectedExecutionException("Task " + runnable.toString() + " rejected from " + e.toString(), e);
                }
            }
        };
    }

    /**
     * Handles a resource-modified message: first lets the activating subscriber
     * process it synchronously, then forwards it to the processing channel.
     *
     * @param resourceModifiedMessage the message describing the change
     */
    protected void submitResourceModified(ResourceModifiedMessage resourceModifiedMessage) {
        mySubscriptionActivatingSubscriber.handleMessage(
                resourceModifiedMessage.getOperationType(),
                resourceModifiedMessage.getId(myCtx),
                resourceModifiedMessage.getNewPayload(myCtx));
        sendToProcessingChannel(resourceModifiedMessage);
    }

    /**
     * Hook invoked by {@link #preDestroy()} after the checking subscriber has been
     * unsubscribed. Presumably implementations detach their delivery handler from
     * the delivery channel here — confirm against subclasses.
     */
    protected abstract void unregisterDeliverySubscriber();

    /**
     * Removes the subscription registered under the id part of the given id, if any.
     *
     * @param iIdType the subscription id; must not be null and must have a non-blank id part
     * @throws NullPointerException if {@code iIdType} or its id part is null
     * @throws IllegalArgumentException if the id part is blank
     */
    public void unregisterSubscription(IIdType iIdType) {
        Validate.notNull(iIdType);
        String idPart = Validate.notBlank(iIdType.getIdPart());
        myIdToSubscription.remove(idPart);
    }
}
