Keyboard shortcuts

Press โ† or โ†’ to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

0x09-c WebSocket Push: Real-time Notification

๐Ÿ‡บ๐Ÿ‡ธ English ย ย ย |ย ย ย  ๐Ÿ‡จ๐Ÿ‡ณ ไธญๆ–‡

๐Ÿ‡บ๐Ÿ‡ธ English

๐Ÿ“ฆ Code Changes: View Diff

Core Objective: Implement WebSocket real-time push so clients can receive order updates, trade notifications, and balance changes.


Background: From Polling to Push

Current Query Method (Polling):

Client                    Gateway
  โ”‚                          โ”‚
  โ”œโ”€โ”€โ”€ GET /orders โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚  (Poll)
  โ”‚<โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
  โ”‚       ... seconds ...      โ”‚
  โ”œโ”€โ”€โ”€ GET /orders โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚  (Poll again)
  โ”‚<โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค

Issues:

  • โŒ High Latency
  • โŒ Wasted Resources
  • โŒ Poor Real-time experience

This Chapterโ€™s Solution (Push):

Client                    Gateway                Trading Core
  โ”‚                          โ”‚                        โ”‚
  โ”œโ”€โ”€ WS Connect โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚                        โ”‚
  โ”‚<โ”€โ”€ Connected โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค                        โ”‚
  โ”‚                          โ”‚                        โ”‚
  โ”‚                          โ”‚<โ”€โ”€ Order Filled โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
  โ”‚<โ”€โ”€ push: order.update โ”€โ”€โ”€โ”ค                        โ”‚
  โ”‚                          โ”‚                        โ”‚
  โ”‚                          โ”‚<โ”€โ”€ Trade โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
  โ”‚<โ”€โ”€ push: trade โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค                        โ”‚

1. Push Event Types

1.1 Classification

Event TypeTriggerRecipient
order.updateStatus change (NEW/FILLED/CANCELED)Order Owner
tradeTrade executionBuyer & Seller
balance.updateBalance changeAccount Owner

1.2 Message Format

// Order Update
{
    "type": "order.update",
    "data": {
        "order_id": 1001,
        "symbol": "BTC_USDT",
        "status": "FILLED",
        "filled_qty": "0.001",
        "avg_price": "85000.00",
        "updated_at": 1734533790000
    }
}

// Trade Notification
{
    "type": "trade",
    "data": {
        "trade_id": 5001,
        "order_id": 1001,
        "symbol": "BTC_USDT",
        "side": "BUY",
        "role": "TAKER",
        "traded_at": 1734533790000
    }
}

// Balance Update
{
    "type": "balance.update",
    "data": {
        "asset": "BTC",
        "avail": "1.501000",
        "frozen": "0.000000"
    }
}

2. Architecture Design

2.1 Design Principles

Important

Data Consistency First: When a user receives a push, the database MUST already be updated.

Correct Flow: ME Match โ†’ Settlement Persist โ†’ Push โ†’ User Query โ†’ Data Exists โœ…

Incorrect Flow: ME Match โ†’ Push โ†’ User Query โ†’ Data Not Found โŒ

2.2 System Architecture

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Multi-Thread Pipeline                         โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                  โ”‚
โ”‚  Thread 3: ME         โ”€โ”€โ–ถ  trade_queue  โ”€โ”€โ–ถ  Thread 4: Settlementโ”‚
โ”‚                       โ””โ”€โ”€โ–ถ  balance_update_queue                โ”‚
โ”‚                                                                  โ”‚
โ”‚  Thread 4: Settlement โ”€โ”€โ–ถ  push_event_queue  โ”€โ”€โ–ถ  WsService     โ”‚
โ”‚                       โ”‚                                          โ”‚
โ”‚                       โ””โ”€โ”€โ–ถ  TDengine (persist)                   โ”‚
โ”‚                                                                  โ”‚
โ”‚  WsService (Gateway)  โ”€โ”€โ–ถ  ConnectionManager  โ”€โ”€โ–ถ  Clients      โ”‚
โ”‚                                                                  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

Key Decisions:

  • โœ… Settlement is the only push source.
  • โœ… Push events generated ONLY after persistence success.
  • โœ… WsService runs in the Gatewayโ€™s tokio runtime.

2.3 Connection Management

ConnectionManager uses DashMap to handle concurrent connections, supporting multiple connections per user.


3. API Design

3.1 Endpoint

ws://host:port/ws

3.2 Connection Flow

  1. Connect.
  2. Send Auth: {"type": "auth", "token": "..."}.
  3. Receive Auth Success.
  4. Receive Push Events.

3.3 Heartbeat

Client sends {"type": "ping"} every 30s, Server responds {"type": "pong"}.


4. Implementation

4.1 Core Structures

PushEvent (Internal Queue):

#![allow(unused)]
fn main() {
pub enum PushEvent {
    OrderUpdate { ... },
    Trade { ... },
    BalanceUpdate { ... },
}
}

TradeEvent Extension: Added taker_filled_qty, maker_filled_qty etc., to TradeEvent to allow Settlement to determine order status (FILLED vs PARTIAL) without querying generic order state.

4.2 Implementation Plan

  • Phase 1: Basic Connection (Manager, Handler, Gateway Integration).
  • Phase 2: Push Integration (push_event_queue, WsService, Settlement logic).
  • Phase 3: Refinement (Error handling, Performance tests).

5. Verification

5.1 Automated Tests

Run sh run_test.sh:

  • Validates WS connection.
  • Submits orders and verifies receiving order_update, trade, and balance_update events.

5.2 Manual Test

websocat "ws://localhost:8080/ws?user_id=1001"
# Send {"type": "ping"} -> Receive {"type": "pong"}

Summary

This chapter implements WebSocket real-time push.

Key Design Decisions:

  1. Settlement-first: Ensuring consistency.
  2. Single Source: All events originate from Settlement.
  3. Extended TradeEvent: Carrying adequate state for downstream consumers.

Next Chapter: 0x09-d K-Line Aggregation.




๐Ÿ‡จ๐Ÿ‡ณ ไธญๆ–‡

๐Ÿ“ฆ ไปฃ็ ๅ˜ๆ›ด: ๆŸฅ็œ‹ Diff

ๆœฌ่Š‚ๆ ธๅฟƒ็›ฎๆ ‡๏ผšๅฎž็Žฐ WebSocket ๅฎžๆ—ถๆŽจ้€๏ผŒๅฎขๆˆท็ซฏๅฏๆŽฅๆ”ถ่ฎขๅ•็Šถๆ€ๆ›ดๆ–ฐใ€ๆˆไบค้€š็Ÿฅใ€ไฝ™้ขๅ˜ๅŒ–ใ€‚


่ƒŒๆ™ฏ๏ผšไปŽ่ฝฎ่ฏขๅˆฐๆŽจ้€

ๅฝ“ๅ‰็ณป็ปŸๆŸฅ่ฏขๆ–นๅผ๏ผˆ่ฝฎ่ฏข๏ผ‰๏ผš

Client                    Gateway
  โ”‚                          โ”‚
  โ”œโ”€โ”€โ”€ GET /orders โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚  (่ฝฎ่ฏข polling)
  โ”‚<โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
  โ”‚       ... ๆ•ฐ็ง’ๅŽ ...       โ”‚
  โ”œโ”€โ”€โ”€ GET /orders โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚  (ๅ†ๆฌก่ฝฎ่ฏข)
  โ”‚<โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค

้—ฎ้ข˜๏ผš

  • โŒ ๅปถ่ฟŸ้ซ˜
  • โŒ ๆตช่ดน่ต„ๆบ
  • โŒ ๅฎžๆ—ถๆ€งๅทฎ

ๆœฌ็ซ ่งฃๅ†ณๆ–นๆกˆ๏ผˆๆŽจ้€๏ผ‰๏ผš

Client                    Gateway                Trading Core
  โ”‚                          โ”‚                        โ”‚
  โ”œโ”€โ”€ WS Connect โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€>โ”‚                        โ”‚
  โ”‚<โ”€โ”€ Connected โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค                        โ”‚
  โ”‚                          โ”‚                        โ”‚
  โ”‚                          โ”‚<โ”€โ”€ Order Filled โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
  โ”‚<โ”€โ”€ push: order.update โ”€โ”€โ”€โ”ค                        โ”‚
  โ”‚                          โ”‚                        โ”‚
  โ”‚                          โ”‚<โ”€โ”€ Trade โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
  โ”‚<โ”€โ”€ push: trade โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค                        โ”‚

1. ๆŽจ้€ไบ‹ไปถ็ฑปๅž‹

1.1 ไบ‹ไปถๅˆ†็ฑป

ไบ‹ไปถ็ฑปๅž‹่งฆๅ‘ๆ—ถๆœบๆŽฅๆ”ถ่€…
order.update่ฎขๅ•็Šถๆ€ๅ˜ๅŒ–่ฎขๅ•ๆ‰€ๆœ‰่€…
tradeๆˆไบคๅ‘็”ŸๅŒๆ–น็”จๆˆท
balance.updateไฝ™้ขๅ˜ๅŒ–่ดฆๆˆทๆ‰€ๆœ‰่€…

1.2 ๆถˆๆฏๆ ผๅผ

// ่ฎขๅ•ๆ›ดๆ–ฐ
{
    "type": "order.update",
    "data": {
        "order_id": 1001,
        "symbol": "BTC_USDT",
        "status": "FILLED",
        "filled_qty": "0.001",
        "avg_price": "85000.00",
        "updated_at": 1734533790000
    }
}

2. ๆžถๆž„่ฎพ่ฎก

2.1 ่ฎพ่ฎกๅŽŸๅˆ™

Important

ๆ•ฐๆฎไธ€่‡ดๆ€งไผ˜ๅ…ˆ: ็”จๆˆทๆ”ถๅˆฐๆŽจ้€ๆ—ถ๏ผŒๆ•ฐๆฎๅบ“ๅฟ…้กปๅทฒๆ›ดๆ–ฐใ€‚

ๆญฃ็กฎๆต็จ‹: ME ๆˆไบค โ†’ Settlement ๆŒไน…ๅŒ– โ†’ ๆŽจ้€ โ†’ ็”จๆˆทๆŸฅ่ฏข โ†’ ๆ•ฐๆฎๅทฒๅญ˜ๅœจ โœ…

2.2 ็ณป็ปŸๆžถๆž„

โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
โ”‚                    Multi-Thread Pipeline                         โ”‚
โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค
โ”‚                                                                  โ”‚
โ”‚  Thread 3: ME         โ”€โ”€โ–ถ  trade_queue  โ”€โ”€โ–ถ  Thread 4: Settlementโ”‚
โ”‚                       โ””โ”€โ”€โ–ถ  balance_update_queue                โ”‚
โ”‚                                                                  โ”‚
โ”‚  Thread 4: Settlement โ”€โ”€โ–ถ  push_event_queue  โ”€โ”€โ–ถ  WsService     โ”‚
โ”‚                       โ”‚                                          โ”‚
โ”‚                       โ””โ”€โ”€โ–ถ  TDengine (persist)                   โ”‚
โ”‚                                                                  โ”‚
โ”‚  WsService (Gateway)  โ”€โ”€โ–ถ  ConnectionManager  โ”€โ”€โ–ถ  Clients      โ”‚
โ”‚                                                                  โ”‚
โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜

ๅ…ณ้”ฎ่ฎพ่ฎก:

  • โœ… Settlement ไฝœไธบๅ”ฏไธ€ๆŽจ้€ๆบ
  • โœ… ๆŒไน…ๅŒ–ๆˆๅŠŸๅŽๆ‰็”Ÿๆˆ PushEvent
  • โœ… WsService ่ฟ่กŒๅœจ Gateway ็š„ tokio runtime

3. API ่ฎพ่ฎก

3.1 ็ซฏ็‚น

ws://host:port/ws

3.2 ่ฟžๆŽฅๆต็จ‹

  1. Client ่ฟžๆŽฅ
  2. ๅ‘้€่ฎค่ฏ: {"type": "auth", "token": "..."}
  3. ๆŽฅๆ”ถๆŽจ้€

3.3 ๅฟƒ่ทณ

Client ๅ‘้€ {"type": "ping"} (ๆฏ30็ง’)๏ผŒServer ๅ›žๅค {"type": "pong"}ใ€‚


4. ๅฎž็Žฐ็ป†่Š‚

4.1 ๆ ธๅฟƒ็ป“ๆž„

PushEvent (ๅ†…้ƒจ้˜Ÿๅˆ—): ๅฎšไน‰ไบ†ไธ‰็งๆ ธๅฟƒไบ‹ไปถ็ป“ๆž„ใ€‚

TradeEvent ๆ‰ฉๅฑ•: ๆ–ฐๅขžไบ† taker_filled_qty ็ญ‰ๅญ—ๆฎต๏ผŒๅ…่ฎธ Settlement ๅˆคๆ–ญ่ฎขๅ•ๆœ€็ปˆ็Šถๆ€ใ€‚

4.2 ๅฎž็Žฐ่ฎกๅˆ’

  • Phase 1: ๅŸบ็ก€่ฟžๆŽฅ็ฎก็†
  • Phase 2: ๆŽจ้€้›†ๆˆ (Settlement -> WsService)
  • Phase 3: ๅฎŒๅ–„ไธŽ้ชŒ่ฏ

5. ้ชŒ่ฏ

5.1 ่‡ชๅŠจๅŒ–ๆต‹่ฏ•

่ฟ่กŒ sh run_test.sh๏ผŒ่ฆ†็›–่ฟžๆŽฅใ€ไธ‹ๅ•ใ€ๆŽฅๆ”ถๅ„็ฑปๆŽจ้€็š„ๅ…จๆต็จ‹ใ€‚

5.2 ๆ‰‹ๅŠจๆต‹่ฏ•

websocat "ws://localhost:8080/ws?user_id=1001"

ๆ€ป็ป“

ๆœฌ็ซ ๅฎž็Žฐไบ† WebSocket ๅฎžๆ—ถๆŽจ้€ใ€‚

ๅ…ณ้”ฎ่ฎพ่ฎกๅ†ณ็ญ–:

  1. settlement-first: ็กฎไฟไธ€่‡ดๆ€งใ€‚
  2. ๅ•ไธ€ๆŽจ้€ๆบ: ็ฎ€ๅŒ–ๆžถๆž„ใ€‚
  3. TradeEvent ๆ‰ฉๅฑ•: ๆบๅธฆ่ถณๅคŸ็Šถๆ€ใ€‚

ไธ‹ไธ€็ซ  (0x09-d) ๅฐ†ๅฎž็Žฐ K-Line ่šๅˆๆœๅŠกใ€‚