0x09-c WebSocket Push: Real-time Notification
๐บ๐ธ English ย ย ย |ย ย ย ๐จ๐ณ ไธญๆ
๐บ๐ธ English
๐ฆ Code Changes: View Diff
Core Objective: Implement WebSocket real-time push so clients can receive order updates, trade notifications, and balance changes.
Background: From Polling to Push
Current Query Method (Polling):
Client Gateway
โ โ
โโโโ GET /orders โโโโโโโโโ>โ (Poll)
โ<โโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ ... seconds ... โ
โโโโ GET /orders โโโโโโโโโ>โ (Poll again)
โ<โโโโโโโโโโโโโโโโโโโโโโโโโโโค
Issues:
- โ High Latency
- โ Wasted Resources
- โ Poor Real-time experience
This Chapterโs Solution (Push):
Client Gateway Trading Core
โ โ โ
โโโ WS Connect โโโโโโโโโโโ>โ โ
โ<โโ Connected โโโโโโโโโโโโโค โ
โ โ โ
โ โ<โโ Order Filled โโโโโโโโค
โ<โโ push: order.update โโโโค โ
โ โ โ
โ โ<โโ Trade โโโโโโโโโโโโโโโค
โ<โโ push: trade โโโโโโโโโโโค โ
1. Push Event Types
1.1 Classification
| Event Type | Trigger | Recipient |
|---|---|---|
order.update | Status change (NEW/FILLED/CANCELED) | Order Owner |
trade | Trade execution | Buyer & Seller |
balance.update | Balance change | Account Owner |
1.2 Message Format
// Order Update
{
"type": "order.update",
"data": {
"order_id": 1001,
"symbol": "BTC_USDT",
"status": "FILLED",
"filled_qty": "0.001",
"avg_price": "85000.00",
"updated_at": 1734533790000
}
}
// Trade Notification
{
"type": "trade",
"data": {
"trade_id": 5001,
"order_id": 1001,
"symbol": "BTC_USDT",
"side": "BUY",
"role": "TAKER",
"traded_at": 1734533790000
}
}
// Balance Update
{
"type": "balance.update",
"data": {
"asset": "BTC",
"avail": "1.501000",
"frozen": "0.000000"
}
}
2. Architecture Design
2.1 Design Principles
Important
Data Consistency First: When a user receives a push, the database MUST already be updated.
Correct Flow: ME Match โ Settlement Persist โ Push โ User Query โ Data Exists โ
Incorrect Flow: ME Match โ Push โ User Query โ Data Not Found โ
2.2 System Architecture
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Multi-Thread Pipeline โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Thread 3: ME โโโถ trade_queue โโโถ Thread 4: Settlementโ
โ โโโโถ balance_update_queue โ
โ โ
โ Thread 4: Settlement โโโถ push_event_queue โโโถ WsService โ
โ โ โ
โ โโโโถ TDengine (persist) โ
โ โ
โ WsService (Gateway) โโโถ ConnectionManager โโโถ Clients โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Key Decisions:
- โ Settlement is the only push source.
- โ Push events generated ONLY after persistence success.
- โ WsService runs in the Gatewayโs tokio runtime.
2.3 Connection Management
ConnectionManager uses DashMap to handle concurrent connections, supporting multiple connections per user.
3. API Design
3.1 Endpoint
ws://host:port/ws
3.2 Connection Flow
- Connect.
- Send Auth:
{"type": "auth", "token": "..."}. - Receive Auth Success.
- Receive Push Events.
3.3 Heartbeat
Client sends {"type": "ping"} every 30s, Server responds {"type": "pong"}.
4. Implementation
4.1 Core Structures
PushEvent (Internal Queue):
#![allow(unused)]
fn main() {
pub enum PushEvent {
OrderUpdate { ... },
Trade { ... },
BalanceUpdate { ... },
}
}
TradeEvent Extension:
Added taker_filled_qty, maker_filled_qty etc., to TradeEvent to allow Settlement to determine order status (FILLED vs PARTIAL) without querying generic order state.
4.2 Implementation Plan
- Phase 1: Basic Connection (Manager, Handler, Gateway Integration).
- Phase 2: Push Integration (
push_event_queue,WsService, Settlement logic). - Phase 3: Refinement (Error handling, Performance tests).
5. Verification
5.1 Automated Tests
Run sh run_test.sh:
- Validates WS connection.
- Submits orders and verifies receiving
order_update,trade, andbalance_updateevents.
5.2 Manual Test
websocat "ws://localhost:8080/ws?user_id=1001"
# Send {"type": "ping"} -> Receive {"type": "pong"}
Summary
This chapter implements WebSocket real-time push.
Key Design Decisions:
- Settlement-first: Ensuring consistency.
- Single Source: All events originate from Settlement.
- Extended TradeEvent: Carrying adequate state for downstream consumers.
Next Chapter: 0x09-d K-Line Aggregation.
๐จ๐ณ ไธญๆ
๐ฆ ไปฃ็ ๅๆด: ๆฅ็ Diff
ๆฌ่ๆ ธๅฟ็ฎๆ ๏ผๅฎ็ฐ WebSocket ๅฎๆถๆจ้๏ผๅฎขๆท็ซฏๅฏๆฅๆถ่ฎขๅ็ถๆๆดๆฐใๆไบค้็ฅใไฝ้ขๅๅใ
่ๆฏ๏ผไป่ฝฎ่ฏขๅฐๆจ้
ๅฝๅ็ณป็ปๆฅ่ฏขๆนๅผ๏ผ่ฝฎ่ฏข๏ผ๏ผ
Client Gateway
โ โ
โโโโ GET /orders โโโโโโโโโ>โ (่ฝฎ่ฏข polling)
โ<โโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ ... ๆฐ็งๅ ... โ
โโโโ GET /orders โโโโโโโโโ>โ (ๅๆฌก่ฝฎ่ฏข)
โ<โโโโโโโโโโโโโโโโโโโโโโโโโโโค
้ฎ้ข๏ผ
- โ ๅปถ่ฟ้ซ
- โ ๆตช่ดน่ตๆบ
- โ ๅฎๆถๆงๅทฎ
ๆฌ็ซ ่งฃๅณๆนๆก๏ผๆจ้๏ผ๏ผ
Client Gateway Trading Core
โ โ โ
โโโ WS Connect โโโโโโโโโโโ>โ โ
โ<โโ Connected โโโโโโโโโโโโโค โ
โ โ โ
โ โ<โโ Order Filled โโโโโโโโค
โ<โโ push: order.update โโโโค โ
โ โ โ
โ โ<โโ Trade โโโโโโโโโโโโโโโค
โ<โโ push: trade โโโโโโโโโโโค โ
1. ๆจ้ไบไปถ็ฑปๅ
1.1 ไบไปถๅ็ฑป
| ไบไปถ็ฑปๅ | ่งฆๅๆถๆบ | ๆฅๆถ่ |
|---|---|---|
order.update | ่ฎขๅ็ถๆๅๅ | ่ฎขๅๆๆ่ |
trade | ๆไบคๅ็ | ๅๆน็จๆท |
balance.update | ไฝ้ขๅๅ | ่ดฆๆทๆๆ่ |
1.2 ๆถๆฏๆ ผๅผ
// ่ฎขๅๆดๆฐ
{
"type": "order.update",
"data": {
"order_id": 1001,
"symbol": "BTC_USDT",
"status": "FILLED",
"filled_qty": "0.001",
"avg_price": "85000.00",
"updated_at": 1734533790000
}
}
2. ๆถๆ่ฎพ่ฎก
2.1 ่ฎพ่ฎกๅๅ
Important
ๆฐๆฎไธ่ดๆงไผๅ : ็จๆทๆถๅฐๆจ้ๆถ๏ผๆฐๆฎๅบๅฟ ้กปๅทฒๆดๆฐใ
ๆญฃ็กฎๆต็จ: ME ๆไบค โ Settlement ๆไน ๅ โ ๆจ้ โ ็จๆทๆฅ่ฏข โ ๆฐๆฎๅทฒๅญๅจ โ
2.2 ็ณป็ปๆถๆ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
โ Multi-Thread Pipeline โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโค
โ โ
โ Thread 3: ME โโโถ trade_queue โโโถ Thread 4: Settlementโ
โ โโโโถ balance_update_queue โ
โ โ
โ Thread 4: Settlement โโโถ push_event_queue โโโถ WsService โ
โ โ โ
โ โโโโถ TDengine (persist) โ
โ โ
โ WsService (Gateway) โโโถ ConnectionManager โโโถ Clients โ
โ โ
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
ๅ ณ้ฎ่ฎพ่ฎก:
- โ Settlement ไฝไธบๅฏไธๆจ้ๆบ
- โ
ๆไน
ๅๆๅๅๆ็ๆ
PushEvent - โ WsService ่ฟ่กๅจ Gateway ็ tokio runtime
3. API ่ฎพ่ฎก
3.1 ็ซฏ็น
ws://host:port/ws
3.2 ่ฟๆฅๆต็จ
- Client ่ฟๆฅ
- ๅ้่ฎค่ฏ:
{"type": "auth", "token": "..."} - ๆฅๆถๆจ้
3.3 ๅฟ่ทณ
Client ๅ้ {"type": "ping"} (ๆฏ30็ง)๏ผServer ๅๅค {"type": "pong"}ใ
4. ๅฎ็ฐ็ป่
4.1 ๆ ธๅฟ็ปๆ
PushEvent (ๅ ้จ้ๅ): ๅฎไนไบไธ็งๆ ธๅฟไบไปถ็ปๆใ
TradeEvent ๆฉๅฑ: ๆฐๅขไบ taker_filled_qty ็ญๅญๆฎต๏ผๅ
่ฎธ Settlement ๅคๆญ่ฎขๅๆ็ป็ถๆใ
4.2 ๅฎ็ฐ่ฎกๅ
- Phase 1: ๅบ็ก่ฟๆฅ็ฎก็
- Phase 2: ๆจ้้ๆ (Settlement -> WsService)
- Phase 3: ๅฎๅไธ้ช่ฏ
5. ้ช่ฏ
5.1 ่ชๅจๅๆต่ฏ
่ฟ่ก sh run_test.sh๏ผ่ฆ็่ฟๆฅใไธๅใๆฅๆถๅ็ฑปๆจ้็ๅ
จๆต็จใ
5.2 ๆๅจๆต่ฏ
websocat "ws://localhost:8080/ws?user_id=1001"
ๆป็ป
ๆฌ็ซ ๅฎ็ฐไบ WebSocket ๅฎๆถๆจ้ใ
ๅ ณ้ฎ่ฎพ่ฎกๅณ็ญ:
- settlement-first: ็กฎไฟไธ่ดๆงใ
- ๅไธๆจ้ๆบ: ็ฎๅๆถๆใ
- TradeEvent ๆฉๅฑ: ๆบๅธฆ่ถณๅค็ถๆใ
ไธไธ็ซ (0x09-d) ๅฐๅฎ็ฐ K-Line ่ๅๆๅกใ