package com.solacesystems.jcsmp.protocol.nio.impl;

import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.protocol.nio.Notification;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/solacesystems/jcsmp/protocol/nio/impl/ProducerNotificationDispatcher.class */
/**
 * Hands producer {@link Notification}s from enqueuing threads to a dedicated
 * daemon thread that invokes {@link Notification#handleNotification()} on each
 * one in queue order.
 *
 * <p>Notifications are buffered in a bounded {@link ArrayBlockingQueue} whose
 * capacity is read from the global properties at construction time. Instances
 * are obtained through {@link #create(String)}, which also starts the service
 * thread.
 */
public class ProducerNotificationDispatcher implements Runnable {
    private static final Log log = LogFactory.getLog(ProducerNotificationDispatcher.class);

    /** Bounded hand-off queue between enqueuing threads and the service thread. */
    private final ArrayBlockingQueue<Notification> _dispatcherQ;

    /** Daemon thread that runs {@link #run()}; started by {@link #create(String)}. */
    private final Thread serviceThread;

    /**
     * Builds the queue and the (not yet started) daemon service thread.
     *
     * @param str identifier embedded in the service thread's name
     */
    private ProducerNotificationDispatcher(String str) {
        this._dispatcherQ = new ArrayBlockingQueue<>(JCSMPFactory.onlyInstance().getGlobalProperties().getProducerDispatcherQueueSize());
        this.serviceThread = new Thread(this);
        this.serviceThread.setName(String.format("Context_%s_ProducerDispatcher", str));
        this.serviceThread.setDaemon(true);
    }

    /**
     * Creates a dispatcher and starts its service thread.
     *
     * <p>The thread is started here rather than in the constructor so that it
     * never observes a partially constructed instance.
     *
     * @param str identifier embedded in the service thread's name
     * @return a dispatcher whose service thread is already running
     */
    public static ProducerNotificationDispatcher create(String str) {
        ProducerNotificationDispatcher dispatcher = new ProducerNotificationDispatcher(str);
        dispatcher.serviceThread.start();
        return dispatcher;
    }

    /**
     * Appends a notification to the queue, blocking while the queue is full.
     *
     * <p>A warning is logged when the queue has no remaining capacity at the
     * time of the call. If the calling thread is interrupted while waiting, the
     * notification is not enqueued and the thread's interrupt status is
     * restored so the caller can observe it.
     *
     * @param notification the notification to dispatch
     */
    public synchronized void enqueueNotification(Notification notification) {
        if (isFull() && log.isWarnEnabled()) {
            log.warn(String.format("ProducerNotificationDispatcher queue (size=%s) low space warning.", Integer.valueOf(this._dispatcherQ.size())));
        }
        try {
            this._dispatcherQ.put(notification);
        } catch (InterruptedException e) {
            if (log.isDebugEnabled()) {
                log.debug(Thread.currentThread().getName() + " is interrupted");
            }
            // put() cleared the interrupt flag when it threw; set it again so
            // the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@code true} when the queue has no remaining capacity
     */
    public boolean isFull() {
        return this._dispatcherQ.remainingCapacity() == 0;
    }

    /**
     * @return the number of notifications currently queued
     */
    public int getSize() {
        return this._dispatcherQ.size();
    }

    /**
     * @return the number of notifications that can be added before the queue is full
     */
    public int getRemainingCapacity() {
        return this._dispatcherQ.remainingCapacity();
    }

    /**
     * Cancels every queued notification that uses the given handler.
     *
     * <p>Matching notifications stay in the queue; only
     * {@link Notification#cancel()} is called on them. A {@code null} handler
     * makes this a no-op.
     *
     * @param jCSMPStreamingPublishEventHandler handler whose pending notifications are cancelled
     * @param list not used by this implementation; retained for interface compatibility
     */
    public synchronized void purgeNotifications(JCSMPStreamingPublishEventHandler jCSMPStreamingPublishEventHandler, List<Notification> list) {
        if (jCSMPStreamingPublishEventHandler == null) {
            return;
        }
        // ArrayBlockingQueue rejects null elements, so no per-element null check is needed.
        for (Notification queued : this._dispatcherQ) {
            if (queued.usesHandler(jCSMPStreamingPublishEventHandler)) {
                queued.cancel();
            }
        }
    }

    /**
     * Takes notifications off the queue and handles them one at a time.
     *
     * <p>The loop ends when the value returned by
     * {@link Notification#handleNotification()} has its lowest bit set
     * (presumably a stop signal; confirm against {@code Notification}). A
     * failure inside a handler is logged and dispatching continues with the
     * next notification.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for a notification or while a handler is running
     */
    private void eventLoop() throws InterruptedException {
        while (true) {
            // Taken outside the try so an interrupt while waiting reaches run().
            Notification notification = this._dispatcherQ.take();
            try {
                if ((notification.handleNotification() & 1) != 0) {
                    return;
                }
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    // Interruption is a shutdown request, not a handler failure.
                    throw (InterruptedException) th;
                }
                log.warn("Exception occurred in message producer notification handler", th);
            }
        }
    }

    /**
     * Service thread body: runs the event loop until it finishes or the thread
     * is interrupted, logging start and exit at debug level.
     */
    @Override // java.lang.Runnable
    public void run() {
        if (log.isDebugEnabled()) {
            log.debug("Producer dispatcher thread starts");
        }
        try {
            eventLoop();
        } catch (InterruptedException e) {
            // The thread is about to exit, so the interrupt flag is not restored.
            if (log.isDebugEnabled()) {
                log.debug(Thread.currentThread().getName() + " is interrupted");
            }
        }
        if (log.isDebugEnabled()) {
            log.debug(String.format("Producer dispatcher thread [%s] exits", this.serviceThread.getName()));
        }
    }
}
