package quickfix.mina;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import quickfix.LogUtil;
import quickfix.Message;
import quickfix.Session;
import quickfix.SessionID;

/* loaded from: input_file:WEB-INF/lib/quickfixj-core-1.3.1.jar:quickfix/mina/ThreadPerSessionEventHandlingStrategy.class */
public class ThreadPerSessionEventHandlingStrategy implements EventHandlingStrategy {
    private final Map<SessionID, MessageDispatchingThread> dispatchers = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/quickfixj-core-1.3.1.jar:quickfix/mina/ThreadPerSessionEventHandlingStrategy$MessageDispatchingThread.class */
    public class MessageDispatchingThread extends Thread {
        private final Session quickfixSession;
        private final BlockingQueue<Message> messages;

        public MessageDispatchingThread(Session session) {
            super("QF/J Session dispatcher: " + session.getSessionID());
            this.messages = new LinkedBlockingQueue();
            this.quickfixSession = session;
        }

        public void enqueue(Message message) {
            try {
                this.messages.put(message);
            } catch (InterruptedException e) {
                this.quickfixSession.getLog().onEvent(e.getMessage());
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Message nextMessage = ThreadPerSessionEventHandlingStrategy.this.getNextMessage(this.messages);
                    if (this.quickfixSession.hasResponder()) {
                        this.quickfixSession.next(nextMessage);
                    }
                } catch (InterruptedException e) {
                    return;
                } catch (Throwable th) {
                    LogUtil.logThrowable(this.quickfixSession.getSessionID(), "Error during message processing", th);
                }
            }
        }
    }

    @Override // quickfix.mina.EventHandlingStrategy
    public void onMessage(Session session, Message message) {
        MessageDispatchingThread messageDispatchingThread = this.dispatchers.get(session.getSessionID());
        if (messageDispatchingThread == null) {
            messageDispatchingThread = new MessageDispatchingThread(session);
            this.dispatchers.put(session.getSessionID(), messageDispatchingThread);
            startDispatcherThread(messageDispatchingThread);
        }
        messageDispatchingThread.enqueue(message);
    }

    protected void startDispatcherThread(MessageDispatchingThread messageDispatchingThread) {
        messageDispatchingThread.start();
    }

    BlockingQueue<Message> getMessages(SessionID sessionID) {
        return getDispatcher(sessionID).messages;
    }

    MessageDispatchingThread getDispatcher(SessionID sessionID) {
        return this.dispatchers.get(sessionID);
    }

    Message getNextMessage(BlockingQueue<Message> blockingQueue) throws InterruptedException {
        return blockingQueue.take();
    }
}
