0x08-g Multi-Thread Pipeline Design

🇺🇸 English

📦 Code Changes: View Diff | Key File: pipeline_mt.rs

Overview

The Multi-Thread Pipeline distributes processing across 4 independent threads that communicate via lock-free queues, with the goal of high-throughput order processing.

Architecture

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Ingestion  │────▶│   UBSCore   │────▶│     ME      │────▶│ Settlement  │
│  (Thread 1) │     │  (Thread 2) │     │  (Thread 3) │     │  (Thread 4) │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
      │                   │ ▲                 │                   │
      │                   │ │                 │                   │
      ▼                   ▼ │                 ▼                   ▼
  order_queue ────▶ action_queue      balance_update_queue   trade_queue
                           │                                balance_event_queue
                           └──────────────────────────────────────┘

Thread Responsibilities

Thread     | Responsibility                                | Input queue(s)                    | Output
---------- | --------------------------------------------- | --------------------------------- | ---------------------------------
Ingestion  | Parse orders, assign SeqNum                   | orders (iterator)                 | order_queue
UBSCore    | Pre-Trade (WAL + Lock) + Post-Trade (Settle)  | order_queue, balance_update_queue | action_queue, balance_event_queue
ME         | Matching, cancel handling                     | action_queue                      | trade_queue, balance_update_queue
Settlement | Persist events (Trade, Balance)               | trade_queue, balance_event_queue  | ledgers
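To make the data flow in the table concrete, here is a toy, std-only model of the four threads. It is a sketch under loud assumptions: `std::sync::mpsc` channels stand in for the ArrayQueues, orders are bare `(user, amount)` pairs, the toy ME treats every accepted order as an immediate trade, and each balance update only produces a "settle" event. `run_toy_pipeline` is a hypothetical name. The part worth studying is how the ME → UBSCore feedback loop shuts down without deadlock: UBSCore closes action_queue once order_queue is exhausted, ME then exits and closes balance_update_queue, and only then does UBSCore stop.

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;

/// Toy four-thread pipeline; returns (trade events, balance events) seen by Settlement.
fn run_toy_pipeline(n_orders: u64, n_users: usize) -> (usize, usize) {
    let (order_tx, order_rx) = channel::<(usize, u64)>(); // order_queue: (user, amount)
    let (action_tx, action_rx) = channel::<(usize, u64)>(); // action_queue
    let (trade_tx, trade_rx) = channel::<u64>(); // trade_queue
    let (update_tx, update_rx) = channel::<(usize, u64)>(); // balance_update_queue
    let (event_tx, event_rx) = channel::<&'static str>(); // balance_event_queue

    // Thread 1: Ingestion. Dropping order_tx at the end closes order_queue.
    let ingestion = thread::spawn(move || {
        for seq in 0..n_orders {
            order_tx.send(((seq as usize) % n_users, 1)).unwrap();
        }
    });

    // Thread 2: UBSCore (Pre-Trade lock + Post-Trade settle).
    let ubscore = thread::spawn(move || {
        let mut available = vec![1_000_000u64; n_users]; // ample balance: no rejections
        let mut action_tx = Some(action_tx);
        let (mut orders_open, mut updates_open) = (true, true);
        while orders_open || updates_open {
            // Post-Trade first: drain all pending balance updates.
            loop {
                match update_rx.try_recv() {
                    Ok(_) => event_tx.send("settle").unwrap(),
                    Err(TryRecvError::Empty) => break,
                    Err(TryRecvError::Disconnected) => {
                        updates_open = false;
                        break;
                    }
                }
            }
            // Pre-Trade: lock funds for one new order and forward it to ME.
            if orders_open {
                match order_rx.try_recv() {
                    Ok((user, amount)) if available[user] >= amount => {
                        available[user] -= amount;
                        event_tx.send("lock").unwrap();
                        if let Some(tx) = &action_tx {
                            tx.send((user, amount)).unwrap();
                        }
                    }
                    Ok(_) => {} // rejected: insufficient balance
                    Err(TryRecvError::Empty) => thread::yield_now(),
                    Err(TryRecvError::Disconnected) => {
                        orders_open = false;
                        action_tx = None; // close action_queue so ME can finish
                    }
                }
            } else {
                thread::yield_now();
            }
        }
    });

    // Thread 3: ME. Toy: every accepted order trades immediately.
    let me = thread::spawn(move || {
        for (seq, (user, amount)) in action_rx.iter().enumerate() {
            trade_tx.send(seq as u64).unwrap();
            update_tx.send((user, amount)).unwrap();
        }
    });

    // Thread 4: Settlement. Runs until both input queues are closed and empty.
    let settlement = thread::spawn(move || {
        let (mut trades, mut events) = (0usize, 0usize);
        let (mut trades_open, mut events_open) = (true, true);
        while trades_open || events_open {
            let mut idle = true;
            match trade_rx.try_recv() {
                Ok(_) => { trades += 1; idle = false; }
                Err(TryRecvError::Disconnected) => trades_open = false,
                Err(TryRecvError::Empty) => {}
            }
            match event_rx.try_recv() {
                Ok(_) => { events += 1; idle = false; }
                Err(TryRecvError::Disconnected) => events_open = false,
                Err(TryRecvError::Empty) => {}
            }
            if idle {
                thread::yield_now();
            }
        }
        (trades, events)
    });

    ingestion.join().unwrap();
    ubscore.join().unwrap();
    me.join().unwrap();
    settlement.join().unwrap()
}
```

With ample balances every order yields one trade and two balance events (lock, then settle), so `run_toy_pipeline(1_000, 4)` returns `(1000, 2000)` regardless of thread interleaving.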

Queue Design

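The queues are crossbeam_queue::ArrayQueue instances: bounded, lock-free MPMC queues. In this pipeline each queue has exactly one producer thread and one consumer thread: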

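```rust
use std::sync::Arc;
use crossbeam_queue::ArrayQueue;

/// Queues shared by the four pipeline threads; each is created with 64K capacity.
pub struct MultiThreadQueues {
    pub order_queue: Arc<ArrayQueue<OrderAction>>,                   // Ingestion -> UBSCore
    pub action_queue: Arc<ArrayQueue<ValidAction>>,                  // UBSCore -> ME
    pub trade_queue: Arc<ArrayQueue<TradeEvent>>,                    // ME -> Settlement
    pub balance_update_queue: Arc<ArrayQueue<BalanceUpdateRequest>>, // ME -> UBSCore
    pub balance_event_queue: Arc<ArrayQueue<BalanceEvent>>,          // UBSCore -> Settlement
}
```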
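crossbeam's ArrayQueue is a general bounded multi-producer multi-consumer queue, but every queue in the table above has a single producer thread and a single consumer thread. The sketch below is a simpler single-producer single-consumer ring buffer (not crossbeam's algorithm) that shows what "bounded" and "lock-free" mean in practice: `push` never blocks and hands the value back when the queue is full, and `pop` returns `None` when it is empty. The names `spsc`, `Producer` and `Consumer` are hypothetical.

```rust
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Ring<T> {
    slots: Box<[UnsafeCell<MaybeUninit<T>>]>,
    head: AtomicUsize, // total items popped; written only by the consumer
    tail: AtomicUsize, // total items pushed; written only by the producer
}

// Each slot is touched by one side at a time; ownership of a slot is handed
// over through the Release/Acquire pairs on `head` and `tail`.
unsafe impl<T: Send> Sync for Ring<T> {}

/// The only handle that can push (not Clone, so there is exactly one producer).
pub struct Producer<T>(Arc<Ring<T>>);
/// The only handle that can pop (not Clone, so there is exactly one consumer).
pub struct Consumer<T>(Arc<Ring<T>>);

/// Creates a bounded SPSC queue with room for `capacity` items.
pub fn spsc<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
    assert!(capacity > 0);
    let slots: Box<[_]> = (0..capacity).map(|_| UnsafeCell::new(MaybeUninit::uninit())).collect();
    let ring = Arc::new(Ring { slots, head: AtomicUsize::new(0), tail: AtomicUsize::new(0) });
    (Producer(Arc::clone(&ring)), Consumer(ring))
}

impl<T> Producer<T> {
    /// Returns the value back as Err when the queue is full.
    pub fn push(&mut self, value: T) -> Result<(), T> {
        let r = &*self.0;
        let tail = r.tail.load(Ordering::Relaxed);
        let head = r.head.load(Ordering::Acquire); // consumer finished reading freed slots
        if tail - head == r.slots.len() {
            return Err(value);
        }
        unsafe {
            (*r.slots[tail % r.slots.len()].get()).write(value);
        }
        r.tail.store(tail + 1, Ordering::Release); // publish the written slot
        Ok(())
    }
}

impl<T> Consumer<T> {
    /// Returns None when the queue is empty.
    pub fn pop(&mut self) -> Option<T> {
        let r = &*self.0;
        let head = r.head.load(Ordering::Relaxed);
        let tail = r.tail.load(Ordering::Acquire); // see the producer's write
        if head == tail {
            return None;
        }
        let value = unsafe { (*r.slots[head % r.slots.len()].get()).assume_init_read() };
        r.head.store(head + 1, Ordering::Release); // hand the slot back to the producer
        Some(value)
    }
}

impl<T> Drop for Ring<T> {
    fn drop(&mut self) {
        // Drop any items that were pushed but never popped.
        let (head, tail) = (*self.head.get_mut(), *self.tail.get_mut());
        let cap = self.slots.len();
        for i in head..tail {
            unsafe { self.slots[i % cap].get_mut().assume_init_drop() };
        }
    }
}
```

Because `push` returns `Err(value)` when the queue is full, the producing thread must choose a backpressure policy, for example retrying after `std::hint::spin_loop()`.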

Cancel Handling

  1. Ingestion: Create OrderAction::Cancel { order_id, user_id }.
  2. UBSCore: Pass it straight to action_queue (no balance lock needed).
  3. ME: Remove the order from the OrderBook and send BalanceUpdateRequest::Cancel.
  4. UBSCore (Post-Trade): Process the unlock and generate BalanceEvent::Unlock.
  5. Settlement: Persist the BalanceEvent.
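Steps 3 and 4 carry the actual state changes, so they are sketched below with simplified, hypothetical types. Only `order_id` and `user_id` come from the steps above; the `unlock_amount` field, the `Balance` struct, the order-book map from `order_id` to locked amount, and the function names are assumptions for illustration, not the pipeline's real definitions.

```rust
use std::collections::HashMap;

// Hypothetical, simplified message types for the cancel path.
#[derive(Debug)]
enum OrderAction { Cancel { order_id: u64, user_id: u64 } }
#[derive(Debug)]
enum BalanceUpdateRequest { Cancel { order_id: u64, user_id: u64, unlock_amount: u64 } }
#[derive(Debug, PartialEq)]
enum BalanceEvent { Unlock { order_id: u64, user_id: u64, amount: u64 } }

#[derive(Debug, Default, PartialEq)]
struct Balance { available: u64, locked: u64 }

/// Step 3 (ME): remove the resting order and request an unlock of its locked funds.
/// `book` maps order_id -> locked amount (assumption). None if the order is gone.
fn me_cancel(book: &mut HashMap<u64, u64>, action: OrderAction) -> Option<BalanceUpdateRequest> {
    let OrderAction::Cancel { order_id, user_id } = action;
    let unlock_amount = book.remove(&order_id)?;
    Some(BalanceUpdateRequest::Cancel { order_id, user_id, unlock_amount })
}

/// Step 4 (UBSCore Post-Trade): move funds from locked back to available.
fn ubscore_unlock(balances: &mut HashMap<u64, Balance>, req: BalanceUpdateRequest) -> BalanceEvent {
    let BalanceUpdateRequest::Cancel { order_id, user_id, unlock_amount } = req;
    let b = balances.entry(user_id).or_default();
    b.locked -= unlock_amount; // assumes the lock taken at placement is still recorded
    b.available += unlock_amount;
    BalanceEvent::Unlock { order_id, user_id, amount: unlock_amount }
}
```

Steps 1, 2 and 5 reduce to constructing the OrderAction::Cancel, forwarding it unchanged, and appending the returned BalanceEvent to the ledger.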

Consistency Verification

Test Script

```bash
# Run the full comparison test (single-thread vs multi-thread)
./scripts/test_pipeline_compare.sh highbal

# Supported datasets:
#   100k    - 100k orders without cancel
#   cancel  - 1.3M orders with 30% cancel
#   highbal - 1.3M orders with 30% cancel, high balance (recommended)
```

Verification Results (1.3M orders, 30% cancel, high balance)

╔════════════════════════════════════════════════════════════════╗
║                    ✅ ALL TESTS PASSED                         ║
║  Multi-thread pipeline matches single-thread exactly!          ║
╚════════════════════════════════════════════════════════════════╝

Key Metrics

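Dataset      | Total     | Place     | Cancel  | Trades  | Result
------------ | --------- | --------- | ------- | ------- | --------
100k         | 100,000   | 100,000   | 0       | 47,886  | ✅ Match
1.3M HighBal | 1,300,000 | 1,000,000 | 300,000 | 667,567 | ✅ Match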
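The counts in the 1.3M row are consistent with the "30% cancel" share being measured against Place orders rather than against the total order count:

```latex
1{,}000{,}000 + 300{,}000 = 1{,}300{,}000, \qquad
\frac{300{,}000}{1{,}000{,}000} = 30\%, \qquad
\frac{300{,}000}{1{,}300{,}000} \approx 23\%
```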

Important Considerations

Balance Sufficiency

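Insufficient balances in the test data can cause some orders to be rejected. In the multi-thread pipeline, settle timing depends on thread scheduling: UBSCore may check a new order before or after it has applied the Post-Trade settlement of earlier trades, so the set of rejected orders can differ from the single-thread run and the results become non-deterministic. Solution: use the highbal dataset (1000 BTC + 100M USDT per user).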
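A toy illustration of that timing dependence, assuming a single user and a simplified UBSCore that only tracks available balance (the `Input` enum and `rejected_orders` are hypothetical): the same three inputs produce a rejection or not depending only on whether the unlock for order 1 is applied before order 2 is checked.

```rust
/// Inputs seen by a toy UBSCore for one user (hypothetical, simplified).
#[derive(Clone, Copy)]
enum Input {
    /// New order that must lock `amount` before it can reach ME.
    Place { order_id: u64, amount: u64 },
    /// Funds coming back from ME via balance_update_queue (e.g. a cancel).
    Unlock { amount: u64 },
}

/// Applies inputs in the given order and returns the ids of rejected orders.
fn rejected_orders(start_balance: u64, inputs: &[Input]) -> Vec<u64> {
    let mut available = start_balance;
    let mut rejected = Vec::new();
    for input in inputs {
        match *input {
            Input::Place { order_id, amount } => {
                if available >= amount {
                    available -= amount; // lock funds for the order
                } else {
                    rejected.push(order_id);
                }
            }
            Input::Unlock { amount } => available += amount,
        }
    }
    rejected
}
```

With a start balance of 100, order 1 locking 100, its unlock, and order 2 needing 100, the sequence `[place 1, unlock, place 2]` rejects nothing, while `[place 1, place 2, unlock]` rejects order 2. With a start balance of 1000 both orderings reject nothing, which is why the highbal dataset gives deterministic results.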

Shutdown Synchronization

Wait for queues to drain before signaling shutdown:

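```rust
// Busy-wait until every inter-thread queue is empty, then ask the threads to stop.
while !queues.all_empty() {
    std::hint::spin_loop(); // CPU hint for a spin-wait; does not yield to the OS scheduler
}
shutdown.request_shutdown();
```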
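Note that `all_empty()` only inspects the queues: a queue can be empty while a thread still holds an item it has popped but not yet forwarded. A common defensive pattern is for each consumer to re-drain its input after it observes the shutdown request. The sketch below shows that pattern for one producer and one consumer; the `Mutex<VecDeque>` stand-in for ArrayQueue and the names `consume` and `run` are hypothetical.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Stand-in for crossbeam_queue::ArrayQueue so the sketch only needs std.
type Queue<T> = Arc<Mutex<VecDeque<T>>>;

fn pop<T>(q: &Queue<T>) -> Option<T> {
    q.lock().unwrap().pop_front()
}

/// Consumer loop: exits only after seeing the shutdown flag AND finding the
/// queue empty on a final drain, so items pushed before the flag are not lost.
fn consume(q: Queue<u64>, shutdown: Arc<AtomicBool>) -> u64 {
    let mut sum = 0;
    loop {
        if let Some(v) = pop(&q) {
            sum += v;
        } else if shutdown.load(Ordering::Acquire) {
            // Everything pushed before the producer's Release store is visible now.
            while let Some(v) = pop(&q) {
                sum += v;
            }
            return sum;
        } else {
            std::hint::spin_loop();
        }
    }
}

/// Producer pushes 1..=n, then requests shutdown; returns the consumer's total.
fn run(n: u64) -> u64 {
    let q: Queue<u64> = Arc::new(Mutex::new(VecDeque::new()));
    let shutdown = Arc::new(AtomicBool::new(false));
    let consumer = {
        let (q, shutdown) = (Arc::clone(&q), Arc::clone(&shutdown));
        thread::spawn(move || consume(q, shutdown))
    };
    for v in 1..=n {
        q.lock().unwrap().push_back(v);
    }
    shutdown.store(true, Ordering::Release); // all pushes happen-before this store
    consumer.join().unwrap()
}
```

However the threads interleave, `run(n)` returns the full sum `n * (n + 1) / 2`, because the consumer never stops while items pushed before the shutdown request remain in the queue.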

Performance

Mode          | 100k orders | 1.3M orders
------------- | ----------- | -----------
Single-Thread | 350 ms      | 15.5 s
Multi-Thread  | 330 ms      | 15.6 s

Note: The multi-thread version includes the overhead of generating and persisting BalanceEvents and currently performs on par with the single-thread version. Future optimizations: batched I/O and reduced queue contention.
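Batched I/O is listed only as a future optimization. One common way to do it, sketched here under the assumption that Settlement writes one text line per event, is to take up to a fixed batch of events at once, write them all, and flush once per batch instead of once per event. In a real ledger the writer would typically be a file wrapped in `std::io::BufWriter`. `persist_batch`, `MAX_BATCH`, and the `trade,<id>` line format are hypothetical.

```rust
use std::collections::VecDeque;
use std::io::{self, Write};

const MAX_BATCH: usize = 256; // hypothetical batch size

/// Takes up to MAX_BATCH trade ids from `queue`, writes one line per id,
/// flushes once, and returns how many were written.
fn persist_batch<W: Write>(queue: &mut VecDeque<u64>, out: &mut W) -> io::Result<usize> {
    let n = queue.len().min(MAX_BATCH);
    if n == 0 {
        return Ok(0);
    }
    for trade_id in queue.drain(..n) {
        writeln!(out, "trade,{trade_id}")?;
    }
    out.flush()?; // one flush per batch instead of one per event
    Ok(n)
}
```

With 300 pending events, the first call writes 256 lines and the second writes the remaining 44.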

Queue Priority Strategy (Future)

Current Implementation: Prioritize draining balance_update_queue completely before processing order_queue.

Future: Weighted Round-Robin, which alternates between the two queues (for example, up to SETTLE_WEIGHT balance updates per new order) to improve responsiveness to new orders.

```rust
const SETTLE_WEIGHT: u32 = 3; // settle : order = 3 : 1
```
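A minimal sketch comparing both strategies, assuming UBSCore's two inputs are modeled as in-memory `VecDeque`s and that only the order in which items are served matters (the `Served` enum, `drain_first`, and `weighted_round_robin` are hypothetical). With a backlog of 6 balance updates and 2 orders, draining first serves the first order only after all 6 updates, while the 3 : 1 weighting serves it after 3.

```rust
use std::collections::VecDeque;

const SETTLE_WEIGHT: u32 = 3; // settle : order = 3 : 1

/// Which queue UBSCore served at each step (toy trace).
#[derive(Debug, PartialEq, Clone, Copy)]
enum Served { Settle, Order }

/// Current strategy: drain balance updates completely before each new order.
fn drain_first(settles: &mut VecDeque<u32>, orders: &mut VecDeque<u32>) -> Vec<Served> {
    let mut trace = Vec::new();
    while !settles.is_empty() || !orders.is_empty() {
        while settles.pop_front().is_some() {
            trace.push(Served::Settle);
        }
        if orders.pop_front().is_some() {
            trace.push(Served::Order);
        }
    }
    trace
}

/// Weighted round-robin: up to SETTLE_WEIGHT balance updates, then one order.
fn weighted_round_robin(settles: &mut VecDeque<u32>, orders: &mut VecDeque<u32>) -> Vec<Served> {
    let mut trace = Vec::new();
    while !settles.is_empty() || !orders.is_empty() {
        for _ in 0..SETTLE_WEIGHT {
            if settles.pop_front().is_none() {
                break;
            }
            trace.push(Served::Settle);
        }
        if orders.pop_front().is_some() {
            trace.push(Served::Order);
        }
    }
    trace
}
```

This static snapshot understates the difference: in the running pipeline new balance updates keep arriving while UBSCore works, so a drain-first loop can keep postponing the next order for as long as updates keep coming.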

File Structure

src/
├── pipeline.rs       # Shared types
├── pipeline_mt.rs    # Multi-thread impl
├── pipeline_runner.rs # Single-thread impl
└── main.rs



🇨🇳 Chinese

📦 Code Changes: View Diff | Key File: pipeline_mt.rs

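Overview

The Multi-Thread Pipeline distributes processing logic across 4 independent threads that communicate through lock-free queues to achieve high-throughput order processing.

Architecture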

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Ingestion  │────▶│   UBSCore   │────▶│     ME      │────▶│ Settlement  │
│  (Thread 1) │     │  (Thread 2) │     │  (Thread 3) │     │  (Thread 4) │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
      │                   │ ▲                 │                   │
      │                   │ │                 │                   │
      ▼                   ▼ │                 ▼                   ▼
  order_queue ────▶ action_queue      balance_update_queue   trade_queue
                           │                                balance_event_queue
                           └──────────────────────────────────────┘

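Thread Responsibilities

Thread     | Responsibility                                  | Input queue(s)                    | Output
---------- | ----------------------------------------------- | --------------------------------- | ---------------------------------
Ingestion  | Order parsing, sequence number assignment       | orders (iterator)                 | order_queue
UBSCore    | Pre-Trade (WAL + Lock) + Post-Trade (Settle)    | order_queue, balance_update_queue | action_queue, balance_event_queue
ME         | Order matching, cancel handling                 | action_queue                      | trade_queue, balance_update_queue
Settlement | Event persistence (TradeEvent, BalanceEvent)    | trade_queue, balance_event_queue  | ledger files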

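Queue Design

The lock-free queues are implemented with crossbeam_queue::ArrayQueue, a bounded MPMC queue; each queue here has a single producer thread and a single consumer thread: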

```rust
use std::sync::Arc;
use crossbeam_queue::ArrayQueue;

/// Queues shared by the four pipeline threads; each is created with 64K capacity.
pub struct MultiThreadQueues {
    pub order_queue: Arc<ArrayQueue<OrderAction>>,                   // Ingestion -> UBSCore
    pub action_queue: Arc<ArrayQueue<ValidAction>>,                  // UBSCore -> ME
    pub trade_queue: Arc<ArrayQueue<TradeEvent>>,                    // ME -> Settlement
    pub balance_update_queue: Arc<ArrayQueue<BalanceUpdateRequest>>, // ME -> UBSCore
    pub balance_event_queue: Arc<ArrayQueue<BalanceEvent>>,          // UBSCore -> Settlement
}
```

Cancel Order Handling

Cancel order flow:

  1. Ingestion: Create OrderAction::Cancel { order_id, user_id }.
  2. UBSCore: Pass it directly to action_queue (no balance lock needed).
  3. ME: Remove the order from the OrderBook and send BalanceUpdateRequest::Cancel.
  4. UBSCore (Post-Trade): Process the unlock and generate BalanceEvent::Unlock.
  5. Settlement: Persist the BalanceEvent.

Consistency Verification

Test Script

```bash
# Run the full comparison test
./scripts/test_pipeline_compare.sh highbal

# Supported datasets:
#   100k    - 100k orders without cancel
#   cancel  - 1.3M orders with 30% cancel
#   highbal - 1.3M orders with 30% cancel, high balance (recommended)
```

Verification Results (1.3M orders, 30% cancel, high balance)

╔════════════════════════════════════════════════════════════════╗
║                    ✅ ALL TESTS PASSED                         ║
║  Multi-thread pipeline matches single-thread exactly!          ║
╚════════════════════════════════════════════════════════════════╝

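Key Metrics

Dataset                          | Total orders | Place     | Cancel  | Trades  | Result
-------------------------------- | ------------ | --------- | ------- | ------- | --------------
100k (no cancel)                 | 100,000      | 100,000   | 0       | 47,886  | ✅ Exact match
1.3M + 30% cancel (high balance) | 1,300,000    | 1,000,000 | 300,000 | 667,567 | ✅ Exact match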

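Important Considerations

Balance Sufficiency

If users in the test data have insufficient balances, some orders may be rejected. In a concurrent environment, the settle timing differs, so these rejections may differ from the single-thread results.

Solution: Use the highbal dataset, which ensures every user has ample balance (1000 BTC + 100M USDT).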

Shutdown Synchronization

At shutdown, the multi-thread pipeline must make sure that all queues have been drained:

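```rust
// Busy-wait until every inter-thread queue is empty, then ask the threads to stop.
while !queues.all_empty() {
    std::hint::spin_loop(); // CPU hint for a spin-wait; does not yield to the OS scheduler
}
shutdown.request_shutdown();
```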

Performance

Mode          | 100k orders | 1.3M orders
------------- | ----------- | -----------
Single-Thread | 350 ms      | 15.5 s
Multi-Thread  | 330 ms      | 15.6 s

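Note: The current multi-thread version includes the overhead of generating and persisting BalanceEvents, and its performance is comparable to the single-thread version. Future optimization directions include batched I/O and reduced queue contention.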

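Queue Priority Strategy (Future)

Current implementation: Drain balance_update_queue completely before processing any new orders.

Future optimization: Weighted Round-Robin, which allows alternating between the queues to improve responsiveness.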

```rust
const SETTLE_WEIGHT: u32 = 3; // settle : order = 3 : 1
```

File Structure

src/
├── pipeline.rs        # Shared types: PipelineStats, MultiThreadQueues, ShutdownSignal
├── pipeline_mt.rs     # Multi-thread implementation: run_pipeline_multi_thread()
├── pipeline_runner.rs # Single-thread implementation: run_pipeline()
└── main.rs            # Mode selection: --pipeline / --pipeline-mt
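main.rs selects the mode with --pipeline or --pipeline-mt. A minimal sketch of that dispatch, using only the argument strings; the `Mode` enum, `parse_mode`, and the "last flag wins" rule are hypothetical, and the real main.rs would go on to call run_pipeline() or run_pipeline_multi_thread(), whose signatures are not shown here.

```rust
/// Which pipeline to run (hypothetical helper type).
#[derive(Debug, PartialEq)]
enum Mode {
    SingleThread,
    MultiThread,
}

/// Picks the pipeline mode from command-line arguments; None if neither flag is given.
/// If both flags appear, the last one wins.
fn parse_mode<I: IntoIterator<Item = String>>(args: I) -> Option<Mode> {
    let mut mode = None;
    for arg in args {
        match arg.as_str() {
            "--pipeline" => mode = Some(Mode::SingleThread),
            "--pipeline-mt" => mode = Some(Mode::MultiThread),
            _ => {} // other arguments are handled elsewhere
        }
    }
    mode
}
```

In a binary this would be called as `parse_mode(std::env::args().skip(1))`, skipping the program name.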