0x08-f Ring Buffer Pipeline Implementation
๐บ๐ธ English ย ย ย |ย ย ย ๐จ๐ณ ไธญๆ
๐บ๐ธ English
๐ฆ Code Changes: View Diff
Goal: Connect services using Ring Buffers to implement a true Pipeline architecture.
Part 1: Single-Thread Pipeline
1.1 Background
Legacy Execution (Synchronous Serial):
for order in orders:
1. ubscore.process_order(order) # WAL + Lock
2. engine.process_order(order) # Match
3. ubscore.settle_trade(trade) # Settle
4. ledger.write(event) # Persist
Problem: No pipeline parallelism, latency accumulates.
1.2 Single-Thread Pipeline Architecture
Decouple services using Ring Buffers, but polling within a single thread loop:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Single-Thread Pipeline (Round-Robin) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Stage 1: Ingestion โ order_queue โ
โ Stage 2: UBSCore Pre-Trade โ valid_order_queue โ
โ Stage 3: Matching Engine โ trade_queue โ
โ Stage 4: Settlement โ (Ledger) โ
โ โ
โ All Stages executed in a round-robin loop โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Core Data Structures:
#![allow(unused)]
fn main() {
pub struct PipelineQueues {
pub order_queue: Arc<ArrayQueue<SequencedOrder>>,
pub valid_order_queue: Arc<ArrayQueue<ValidOrder>>,
pub trade_queue: Arc<ArrayQueue<TradeEvent>>,
}
}
Execution Loop:
#![allow(unused)]
fn main() {
loop {
// UBSCore: order_queue โ valid_order_queue
if let Some(order) = queues.order_queue.pop() {
// ...
}
// ME: valid_order_queue โ trade_queue
if let Some(valid_order) = queues.valid_order_queue.pop() {
// ...
}
// Settlement: trade_queue โ persist
if let Some(trade) = queues.trade_queue.pop() {
// ...
}
}
}
Part 2: Multi-Thread Pipeline
2.1 Architecture
Full Multi-Threaded Pipeline based on 0x08-a design:
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Multi-Thread Pipeline (Full) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Thread 1: Ingestion Thread 2: UBSCore Thread 3: ME โ
โ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โ
โ โ Read orders โ โ PRE-TRADE: โ โ Match Order โ โ
โ โ Assign SeqNum โโโโโโโโถโ - Write WAL โโโโโโโโถโ in OrderBook โ โ
โ โ โ โ โ - process_order() โ โข โ โ โ
โ โโโโโโโโโโโโโโโโโโโ โ - lock_balance() โ โ Generate โ โ
โ โ โ โ TradeEvents โ โ
โ โโโโโโโโโโโโฌโโโโโโโโโโโโ โโโโโโโโโโฌโโโโโโโโโ โ
โ โฒ โ โ
โ โ โ โ
โ โ โค balance_update_queue โ โฃ trade_queue โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโ โผ โ
โ โ POST-TRADE: โ โโโโโโโโโโโโโโโโโโโ โ
โ โ - settle_trade() โ โ Thread 4: โ โ
โ โ - spend_frozen() โโโโโโโโถโ Settlement โ โ
โ โ - deposit() โ โฅ โ โ โ
โ โ - Generate Balance โ โ Persist: โ โ
โ โ Update Events โ โ - Trade Events โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโ โ - Balance Eventsโ โ
โ โ - Ledger โ โ
โ โโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
2.2 Key Design Points
- ME Fan-out: ME sends
TradeEventin parallel to:trade_queueโ Settlement (Persist)balance_update_queueโ UBSCore (Balance Settle)
- UBSCore as Single Balance Entry: Handles Pre-Trade Lock, Post-Trade Settle, and Refunds.
- Settlement Consolidation: Consumes both Trade Events and Balance Events.
2.3 Data Types
BalanceUpdateRequest (ME โ UBSCore): Contains Trade Event and optional Price Improvement data.
BalanceEvent (UBSCore โ Settlement): The unified channel for ALL balance changes (Lock, Settle, Credit, Refund).
#![allow(unused)]
fn main() {
pub enum BalanceEventType {
Lock, // Pre-Trade
SpendFrozen, // Post-Trade
Credit, // Post-Trade
RefundFrozen, // Price Improvement
// ...
}
}
2.4 Implementation Status
| Component | Status |
|---|---|
| All Queues | โ Implemented |
| UBSCore BalanceEvent Gen | โ Implemented |
| Settlement Persistence | โ Implemented |
Verification & Performance (2025-12-17)
Correctness
E2E tests pass for both pipeline modes.
Performance Comparison
1.3M Orders (with 300k Cancel):
| Mode | Time | Throughput | Trades |
|---|---|---|---|
| UBSCore (Baseline) | 23.5s | 55k ops/s | 538,487 |
| Single-Thread Pipeline | 22.1s | 59k ops/s | 538,487 |
| Multi-Thread Pipeline | 29.1s | 45k ops/s | 489,804 |
- Issue: Multi-Thread mode is currently slower (-30%) on large datasets and skips cancel orders.
100k Orders (Place only):
| Mode | Time | Throughput | vs Baseline |
|---|---|---|---|
| UBSCore | 755ms | 132k ops/s | - |
| Single-Thread | 519ms | 193k ops/s | +46% |
| Multi-Thread | 391ms | 256k ops/s | +93% |
- Observation: Multi-threading shines on smaller, simpler datasets (+93%).
Analysis
Multi-threaded pipeline overhead (context switching, queue contention, event generation) outweighs benefits when per-order processing time is very low (due to optimizations). Also, missing Cancel logic reduces correctness.
Key Design Decisions
- Backpressure: Spin Wait (prioritize low latency).
- Shutdown: Graceful drain using Atomic Signals.
- Error Handling: Logging and metric counting; critical paths must succeed.
๐จ๐ณ ไธญๆ
๐ฆ ไปฃ็ ๅๆด: ๆฅ็ Diff
็ฎๆ ๏ผไฝฟ็จ Ring Buffer ไธฒๆฅไธๅๆๅก๏ผๅฎ็ฐ็ๆญฃ็ Pipeline ๆถๆ
Part 1: ๅ็บฟ็จ Pipeline
1.1 ่ๆฏ
ๅๅงๆง่กๆจกๅผ (ๅๆญฅไธฒ่ก):
for order in orders:
1. ubscore.process_order(order) # WAL + Lock
2. engine.process_order(order) # Match
3. ubscore.settle_trade(trade) # Settle
4. ledger.write(event) # Persist
้ฎ้ข๏ผๆฒกๆ Pipeline ๅนถ่ก๏ผๅปถ่ฟ็ดฏๅ
1.2 ๅ็บฟ็จ Pipeline ๆถๆ
ไฝฟ็จ Ring Buffer ่งฃ่ฆๅๆๅก๏ผไฝไปๅจๅ็บฟ็จไธญ่ฝฎ่ฏขๆง่ก๏ผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Single-Thread Pipeline (Round-Robin) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Stage 1: Ingestion โ order_queue โ
โ Stage 2: UBSCore Pre-Trade โ valid_order_queue โ
โ Stage 3: Matching Engine โ trade_queue โ
โ Stage 4: Settlement โ (Ledger) โ
โ โ
โ ๆๆ Stage ๅจๅไธไธช while ๅพช็ฏไธญ่ฝฎ่ฏขๆง่ก โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
ๆ ธๅฟๆฐๆฎ็ปๆ:
#![allow(unused)]
fn main() {
pub struct PipelineQueues {
pub order_queue: Arc<ArrayQueue<SequencedOrder>>,
pub valid_order_queue: Arc<ArrayQueue<ValidOrder>>,
pub trade_queue: Arc<ArrayQueue<TradeEvent>>,
}
}
ๆง่กๆต็จ:
#![allow(unused)]
fn main() {
loop {
// UBSCore: order_queue โ valid_order_queue
if let Some(order) = queues.order_queue.pop() {
// ...
}
// ME: valid_order_queue โ trade_queue
if let Some(valid_order) = queues.valid_order_queue.pop() {
// ...
}
// Settlement: trade_queue โ persist
if let Some(trade) = queues.trade_queue.pop() {
// ...
}
}
}
Part 2: ๅค็บฟ็จ Pipeline
2.1 ๆถๆ
ๆ นๆฎ 0x08-a ๅๅง่ฎพ่ฎก๏ผๅฎๆด็ๅค็บฟ็จ Pipeline ๆฐๆฎๆตๅฆไธ๏ผ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Multi-Thread Pipeline (ๅฎๆด็) โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Thread 1: Ingestion Thread 2: UBSCore Thread 3: ME โ
โ โโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโโโโโโ โโโโโโโโโโโโโโโโโโโ โ
โ โ Read orders โ โ PRE-TRADE: โ โ Match Order โ โ
โ โ Assign SeqNum โโโโโโโโถโ - Write WAL โโโโโโโโถโ in OrderBook โ โ
โ โ โ โ โ - process_order() โ โข โ โ โ
โ โโโโโโโโโโโโโโโโโโโ โ - lock_balance() โ โ Generate โ โ
โ โ โ โ TradeEvents โ โ
โ โโโโโโโโโโโโฌโโโโโโโโโโโโ โโโโโโโโโโฌโโโโโโโโโ โ
โ โฒ โ โ
โ โ โ โ
โ โ โค balance_update_queue โ โฃ trade_queue โ
โ โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค โ
โ โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโ โผ โ
โ โ POST-TRADE: โ โโโโโโโโโโโโโโโโโโโ โ
โ โ - settle_trade() โ โ Thread 4: โ โ
โ โ - spend_frozen() โโโโโโโโถโ Settlement โ โ
โ โ - deposit() โ โฅ โ โ โ
โ โ - Generate Balance โ โ Persist: โ โ
โ โ Update Events โ โ - Trade Events โ โ
โ โโโโโโโโโโโโโโโโโโโโโโโโ โ - Balance Eventsโ โ
โ โ - Ledger โ โ
โ โโโโโโโโโโโโโโโโโโโ โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
2.2 ๅ ณ้ฎ่ฎพ่ฎก็น
- ME Fan-out: ME ๅฐ
TradeEventๅนถ่กๅ้ๅฐ๏ผtrade_queueโ Settlement (ๆไน ๅไบคๆ่ฎฐๅฝ)balance_update_queueโ UBSCore (ไฝ้ข็ป็ฎ)
- UBSCore ๆฏไฝ้ขๆไฝ็ๅฏไธๅ ฅๅฃ: ๅค็ Pre-Trade ้ๅฎใPost-Trade ็ป็ฎๅ้ๆฌพใ
- Settlement ่ๅ: ๅๆถๆถ่ดนไบคๆไบไปถๅไฝ้ขไบไปถใ
2.3 ๆฐๆฎ็ฑปๅ
BalanceUpdateRequest (ME โ UBSCore): ๅ ๅซๆไบคไบไปถๅๅฏ่ฝ็ไปทๆ ผๆนๅ(Price Improvement)ๆฐๆฎใ
BalanceEvent (UBSCore โ Settlement): ๆๆไฝ้ขๅๆด็็ปไธ้้ (Lock, Settle, Credit, Refund)ใ
#![allow(unused)]
fn main() {
pub enum BalanceEventType {
Lock, // Pre-Trade
SpendFrozen, // Post-Trade
Credit, // Post-Trade
RefundFrozen, // Price Improvement
// ...
}
}
2.4 ๅฎ็ฐ็ถๆ
| ็ปไปถ | ็ถๆ |
|---|---|
| ๆๆ้ๅ | โ ๅทฒๅฎ็ฐ |
| UBSCore BalanceEvent ็ๆ | โ ๅทฒๅฎ็ฐ |
| Settlement ๆไน ๅ | โ ๅทฒๅฎ็ฐ |
้ช่ฏไธๆง่ฝ (2025-12-17)
ๆญฃ็กฎๆง
E2E ๆต่ฏๅจไธค็งๆจกๅผไธๅ้่ฟใ
ๆง่ฝๅฏนๆฏ
1.3M ่ฎขๅ (ๅซ 30 ไธๆคๅ):
| ๆจกๅผ | ๆง่กๆถ้ด | ๅๅ้ | ๆไบคๆฐ |
|---|---|---|---|
| UBSCore (Baseline) | 23.5s | 55k ops/s | 538,487 |
| ๅ็บฟ็จ Pipeline | 22.1s | 59k ops/s | 538,487 |
| ๅค็บฟ็จ Pipeline | 29.1s | 45k ops/s | 489,804 |
- ้ฎ้ข: ๅค็บฟ็จๆจกๅผๅจๅคงๆฐๆฎ้ไธๅ่ๆดๆ ข (-30%)๏ผไธ็ฎๅ่ทณ่ฟไบๆคๅๅค็ใ
100k ่ฎขๅ (ไป Place):
| ๆจกๅผ | ๆถ้ด | ๅๅ้ | ๆๅ |
|---|---|---|---|
| UBSCore | 755ms | 132k ops/s | - |
| ๅ็บฟ็จ | 519ms | 193k ops/s | +46% |
| ๅค็บฟ็จ | 391ms | 256k ops/s | +93% |
- ่งๅฏ: ๅค็บฟ็จๅจ็ฎๅ็ๅฐๆฐๆฎ้ไธ่กจ็ฐๅบ่ฒ (+93%)ใ
ๅๆ
ๅจๅ็ฌๅค็ๆๅฟซ็ๆ ๅตไธ๏ผๅค็บฟ็จๅธฆๆฅ็ๅผ้๏ผไธไธๆๅๆขใ้ๅ็ซไบใไบไปถ็ๆ๏ผ่ถ ่ฟไบๅนถ่ก็ๆถ็ใๆญคๅค๏ผ็ผบๅคฑๆคๅ้ป่พ้ไฝไบๆญฃ็กฎๆงใ
ๅ ณ้ฎ่ฎพ่ฎกๅณ็ญ
- ่ๅ: ่ชๆ็ญๅพ (Spin Wait)๏ผไผๅ ไฝๅปถ่ฟใ
- ๅ ณ้ญ: ไฝฟ็จๅๅญไฟกๅทไผ้ ้ๅบใ
- ้่ฏฏๅค็: ๆฅๅฟ่ฎฐๅฝ๏ผๆ ธๅฟ่ทฏๅพๅฟ ้กปๆๅใ