quickfixj

QuickFIX/J is a full featured messaging engine for the FIX protocol. - This is the official project repository.

View the Project on GitHub quickfix-j/quickfixj

QuickFIX/J Threading Model

1. Overview

QuickFIX/J uses Apache MINA for non-blocking I/O. The threading model for message processing is controlled by the EventHandlingStrategy interface (quickfix.mina.EventHandlingStrategy), with two concrete implementations:

Both strategies co-exist with the timer thread, which is always present and always calls Session.next() (no-arg) on a 1-second schedule, regardless of which event-handling strategy is in use.


2. Connector Classes and Their Strategy

Connector class Event handling strategy Thread name(s)
SocketAcceptor SingleThreadedEventHandlingStrategy QFJ Message Processor
SocketInitiator SingleThreadedEventHandlingStrategy QFJ Message Processor
ThreadedSocketAcceptor ThreadPerSessionEventHandlingStrategy QF/J Session dispatcher: <sessionID>
ThreadedSocketInitiator ThreadPerSessionEventHandlingStrategy QF/J Session dispatcher: <sessionID>

3. Single-Threaded Model (SingleThreadedEventHandlingStrategy)

Class: quickfix.mina.SingleThreadedEventHandlingStrategy
Source: quickfixj-core/src/main/java/quickfix/mina/SingleThreadedEventHandlingStrategy.java

Key point for application developers: Because all sessions share a single processing thread, a slow fromApp() callback will delay processing for all other sessions.


4. Thread-per-Session Model (ThreadPerSessionEventHandlingStrategy)

Class: quickfix.mina.ThreadPerSessionEventHandlingStrategy
Source: quickfixj-core/src/main/java/quickfix/mina/ThreadPerSessionEventHandlingStrategy.java

Key point for application developers: Since each session has its own thread, a slow fromApp() for one session does not block others. However, your Application implementation must be thread-safe if it shares state across sessions.


5. The Timer Thread and Session.next()

This is a critical part of the threading model that is orthogonal to the message-processing strategies above.

Class: quickfix.mina.SessionConnector
Source: quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java

5.1 The QFJ Timer Thread

A single ScheduledExecutorService (a shared static instance using a QFTimerThreadFactory) runs a SessionTimerTask at a fixed rate of every 1000 ms.

// SessionConnector.java
private static class QFTimerThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable runnable) {
        Thread thread = new Thread(runnable, "QFJ Timer");
        thread.setDaemon(true);
        return thread;
    }
}

The timer is started by startSessionTimer():

protected void startSessionTimer() {
    if (checkSessionTimerRunning()) {
        return;
    }
    Runnable timerTask = new SessionTimerTask();
    if (shortLivedExecutor != null) {
        timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
    }
    sessionTimerFuture = SCHEDULED_EXECUTOR.scheduleAtFixedRate(timerTask, 0, 1000L,
            TimeUnit.MILLISECONDS);
}

Only one timer is ever started per connector. If startSessionTimer() is called again while the timer is still running (e.g. during createDynamicSession()), the existing timer is reused.

5.2 SessionTimerTask Iterates All Sessions and Calls Session.next()

private class SessionTimerTask implements Runnable {
    @Override
    public void run() {
        try {
            for (Session session : sessions.values()) {
                try {
                    session.next();
                } catch (IOException e) {
                    LogUtil.logThrowable(session.getLog(), "Error in session timer processing", e);
                }
            }
        } catch (Throwable e) {
            log.error("Error during timer processing", e);
        }
    }
}

Even though each session may have its own dispatcher thread (in the thread-per-session model), the timer thread also calls session.next() directly on every session. This is independent of which EventHandlingStrategy is in use.

5.3 What Does Session.next() (No-arg) Do?

Session.next() is called from the timer, not from user code. Its Javadoc states:

Called from the timer-related code in the acceptor/initiator implementations. This is not typically called from application code.

Its responsibilities (from Session.java):

  1. Checks if the session is enabled. If disabled and still logged on, it initiates a Logout.
  2. Checks session schedule. If outside the configured session time window, it may reset sequence numbers or disconnect. This check is throttled to once per second.
  3. Returns early if not connected (hasResponder() is false).
  4. Handles logon state: If logon has not been received, it may send a Logon (for initiators) or detect a logon timeout.
  5. Checks logout timeout if a logout has been sent.
  6. Heartbeat management:
    • If HeartBtInt == 0: returns (no heartbeat management).
    • If timed out waiting for a heartbeat: disconnects (unless DisableHeartBeatCheck=Y).
    • If a TestRequest is needed: sends a TestRequest (generateTestRequest("TEST")).
    • If a Heartbeat is needed: sends a Heartbeat (generateHeartbeat()).

The full flow:

QFJ Timer thread (every 1 second)
  └─► SessionTimerTask.run()
        └─► for each Session in sessions.values():
              └─► Session.next()
                    ├─ check enabled
                    ├─ check session schedule / reset
                    ├─ check hasResponder()
                    ├─ check logon state (send Logon if initiator)
                    ├─ check logout timeout
                    └─ heartbeat management
                         ├─ isTimedOut()          → disconnect
                         ├─ isTestRequestNeeded() → send TestRequest
                         └─ isHeartBeatNeeded()   → send Heartbeat

5.4 The Overloaded Session.next(Message) — Called by Dispatchers

The Session.next(Message message) overload is what MessageDispatchingThread and SessionMessageEvent call with an actual FIX message. This processes the received message (validates, dispatches to fromAdmin / fromApp, handles sequence numbers, etc.). This is distinct from the no-arg Session.next() used by the timer.


6. Thread Interaction Summary

┌─────────────────────────────────────────────────────────────────────────┐
│                         MINA I/O Threads                                │
│       (NIO selector threads, named "NioProcessor-N")                    │
│  Receive raw bytes → decode FIX message → call EventHandlingStrategy    │
└──────────────────────────────┬──────────────────────────────────────────┘
                               │ onMessage(session, message)
           ┌───────────────────┴────────────────────┐
           │                                        │
    SingleThreaded                          ThreadPerSession
    ──────────────                          ────────────────
  One shared queue                       Per-session queue
  One "QFJ Message Processor"            One "QF/J Session dispatcher:
  thread calls                           <sessionID>" thread per session
  session.next(msg)                      calls session.next(msg)

           Both strategies co-exist with the Timer Thread:

┌─────────────────────────────────────────────────────────────────────────┐
│                    QFJ Timer Thread (daemon)                            │
│  ScheduledExecutorService fires every 1000ms                            │
│  SessionTimerTask iterates ALL sessions → calls Session.next()          │
│  (handles heartbeats, logon, session schedule, timeouts)                │
└─────────────────────────────────────────────────────────────────────────┘

7. Queue Capacity and Back-Pressure

Both strategies support configurable queue capacity:


8. Custom Executor Injection

Both strategies accept a custom java.util.concurrent.Executor via setExecutor(executor), called during start() from the connector. This allows integration with application-managed thread pools (e.g. virtual threads in Java 21+):

// Example: use virtual threads for session dispatchers (Java 21+)
ThreadedSocketAcceptor acceptor = new ThreadedSocketAcceptor(...);
acceptor.start(); // internally calls eventHandlingStrategy.setExecutor(longLivedExecutor)

The longLivedExecutor is provided by SessionConnector and can be customised. If no executor is set, DedicatedThreadExecutor creates a plain new Thread(...) per session/strategy.


9. Thread Safety Implications for Application Developers