QuickFIX/J is a full featured messaging engine for the FIX protocol. - This is the official project repository.
QuickFIX/J uses Apache MINA for non-blocking I/O. The threading model for message processing is controlled by the EventHandlingStrategy interface (quickfix.mina.EventHandlingStrategy), with two concrete implementations:
SingleThreadedEventHandlingStrategy — one thread processes messages for all sessions (SocketAcceptor, SocketInitiator)ThreadPerSessionEventHandlingStrategy — one thread per session processes messages (ThreadedSocketAcceptor, ThreadedSocketInitiator)Both strategies co-exist with the timer thread, which is always present and always calls Session.next() (no-arg) on a 1-second schedule, regardless of which event-handling strategy is in use.
| Connector class | Event handling strategy | Thread name(s) |
|---|---|---|
SocketAcceptor |
SingleThreadedEventHandlingStrategy |
QFJ Message Processor |
SocketInitiator |
SingleThreadedEventHandlingStrategy |
QFJ Message Processor |
ThreadedSocketAcceptor |
ThreadPerSessionEventHandlingStrategy |
QF/J Session dispatcher: <sessionID> |
ThreadedSocketInitiator |
ThreadPerSessionEventHandlingStrategy |
QF/J Session dispatcher: <sessionID> |
SingleThreadedEventHandlingStrategy)Class: quickfix.mina.SingleThreadedEventHandlingStrategy
Source: quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java
BlockingQueue<SessionMessageEvent> holds events from all sessions.QFJ Message Processor (a daemon thread) drains the queue and calls session.next(message) for each event via SessionMessageEvent.processMessage().blockInThread(), which creates a ThreadAdapter wrapping the block() loop.onMessage() wraps incoming messages into a SessionMessageEvent and puts them on the shared queue.block() loop polls the queue with a timeout (THREAD_WAIT_FOR_MESSAGE_MS) so it can periodically check the isStopped flag.getQueueSize(SessionID) method returns the total queue size (single queue for all sessions — there is no per-session view).Key point for application developers: Because all sessions share a single processing thread, a slow fromApp() callback will delay processing for all other sessions.
ThreadPerSessionEventHandlingStrategy)Class: quickfix.mina.ThreadPerSessionEventHandlingStrategy
Source: quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java
ConcurrentHashMap<SessionID, MessageDispatchingThread> maps each session to its own dispatcher thread.onMessage() call for a given session, a new MessageDispatchingThread is created and started via startDispatcherThread().MessageDispatchingThread has its own BlockingQueue<Message> (or watermark-tracked queue) and loops calling session.next(message).QF/J Session dispatcher: <BeginString>:<SenderCompID>/<TargetCompID>Executor can be customised via setExecutor(). The default is DedicatedThreadExecutor, which creates a plain new Thread(command, name).start().stopDispatcherThreads() enqueues END_OF_STREAM to every dispatcher, sets stopping=true, and waits (polling every 100 ms) until all dispatchers report isStopped.dispatchers map.Key point for application developers: Since each session has its own thread, a slow fromApp() for one session does not block others. However, your Application implementation must be thread-safe if it shares state across sessions.
Session.next()This is a critical part of the threading model that is orthogonal to the message-processing strategies above.
Class: quickfix.mina.SessionConnector
Source: quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java
QFJ Timer ThreadA single ScheduledExecutorService (a shared static instance using a QFTimerThreadFactory) runs a SessionTimerTask at a fixed rate of every 1000 ms.
// SessionConnector.java
private static class QFTimerThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "QFJ Timer");
thread.setDaemon(true);
return thread;
}
}
The timer is started by startSessionTimer():
protected void startSessionTimer() {
if (checkSessionTimerRunning()) {
return;
}
Runnable timerTask = new SessionTimerTask();
if (shortLivedExecutor != null) {
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
}
sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L,
TimeUnit.MILLISECONDS);
}
Only one timer is ever started per connector. If startSessionTimer() is called again while the timer is still running (e.g. during createDynamicSession()), the existing timer is reused.
SessionTimerTask Iterates All Sessions and Calls Session.next()private class SessionTimerTask implements Runnable {
@Override
public void run() {
try {
for (Session session : sessions.values()) {
try {
session.next();
} catch (IOException e) {
LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e);
}
}
} catch (Throwable e) {
log.error("Error during timer processing", e);
}
}
}
Even though each session may have its own dispatcher thread (in the thread-per-session model), the timer thread also calls session.next() directly on every session. This is independent of which EventHandlingStrategy is in use.
Session.next() (No-arg) Do?Session.next() is called from the timer, not from user code. Its Javadoc states:
Called from the timer-related code in the acceptor/initiator implementations. This is not typically called from application code.
Its responsibilities (from Session.java):
hasResponder() is false).HeartBtInt == 0: returns (no heartbeat management).DisableHeartBeatCheck=Y).generateTestRequest("TEST")).generateHeartbeat()).The full flow:
QFJ Timer thread (every 1 second)
└─► SessionTimerTask.run()
└─► for each Session in sessions.values():
└─► Session.next()
├─ check enabled
├─ check session schedule / reset
├─ check hasResponder()
├─ check logon state (send Logon if initiator)
├─ check logout timeout
└─ heartbeat management
├─ isTimedOut() → disconnect
├─ isTestRequestNeeded() → send TestRequest
└─ isHeartBeatNeeded() → send Heartbeat
Session.next(Message) — Called by DispatchersThe Session.next(Message message) overload is what MessageDispatchingThread and SessionMessageEvent call with an actual FIX message. This processes the received message (validates, dispatches to fromAdmin / fromApp, handles sequence numbers, etc.). This is distinct from the no-arg Session.next() used by the timer.
┌─────────────────────────────────────────────────────────────────────────┐
│ MINA I/O Threads │
│ (NIO selector threads, named "NioProcessor-N") │
│ Receive raw bytes → decode FIX message → call EventHandlingStrategy │
└──────────────────────────────┬──────────────────────────────────────────┘
│ onMessage(session, message)
┌───────────────────┴────────────────────┐
│ │
SingleThreaded ThreadPerSession
────────────── ────────────────
One shared queue Per-session queue
One "QFJ Message Processor" One "QF/J Session dispatcher:
thread calls <sessionID>" thread per session
session.next(msg) calls session.next(msg)
Both strategies co-exist with the Timer Thread:
┌─────────────────────────────────────────────────────────────────────────┐
│ QFJ Timer Thread (daemon) │
│ ScheduledExecutorService fires every 1000ms │
│ SessionTimerTask iterates ALL sessions → calls Session.next() │
│ (handles heartbeats, logon, session schedule, timeouts) │
└─────────────────────────────────────────────────────────────────────────┘
Both strategies support configurable queue capacity:
new SingleThreadedEventHandlingStrategy(connector, queueCapacity) — bounded LinkedBlockingQueue. Producers block when full (back-pressure).new SingleThreadedEventHandlingStrategy(connector, lowerWatermark, upperWatermark) — uses QueueTrackers.newMultiSessionWatermarkTracker(...). Flow control is applied per-session within the shared queue.ThreadPerSessionEventHandlingStrategy, with newSingleSessionWatermarkTracker per session.Executor InjectionBoth strategies accept a custom java.util.concurrent.Executor via setExecutor(executor), called during start() from the connector. This allows integration with application-managed thread pools (e.g. virtual threads in Java 21+):
// Example: use virtual threads for session dispatchers (Java 21+)
ThreadedSocketAcceptor acceptor = new ThreadedSocketAcceptor(...);
acceptor.start(); // internally calls eventHandlingStrategy.setExecutor(longLivedExecutor)
The longLivedExecutor is provided by SessionConnector and can be customised. If no executor is set, DedicatedThreadExecutor creates a plain new Thread(...) per session/strategy.
SocketAcceptor / SocketInitiator (single-threaded): The Application callbacks (fromApp, fromAdmin, etc.) are called from the single QFJ Message Processor thread. No concurrent calls to the same session. However, Session.next() (timer) runs concurrently from the QFJ Timer thread — it does not call application callbacks but it does send messages on the wire.ThreadedSocketAcceptor / ThreadedSocketInitiator (thread-per-session): Each session has its own dispatcher thread. Callbacks for different sessions may execute concurrently. Your Application implementation must be thread-safe if it shares state across sessions.QFJ Timer thread runs concurrently with message-processing threads and calls Session.next() (no-arg), which may send heartbeats or disconnect. Session internally synchronizes on this to protect shared state.