Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

0x09-b Settlement Persistence: TDengine Integration

🇺🇸 English    |    🇨🇳 中文

🇺🇸 English

📦 Code Changes: View Diff

Core Objective: Persist trade data to TDengine and implement Order Query & History APIs.


Background: From Memory to Persistence

In Gateway Phase 1 (0x09-a), we completed:

  • ✅ HTTP API (create_order, cancel_order)
  • ✅ Order Validation
  • ✅ Ring Buffer Integration
  • Data Persistence ← This Chapter

Current System Issue:

┌─────────────────────────────────────────────────────────────────┐
│                    Trading Core (In-Memory)                      │
│                                                                  │
│    Orders → Match → Trades → Settle → Balance Update             │
│       ↓         ↓           ↓                                   │
│      ❌         ❌           ❌    ← Data LOST on restart!       │
└─────────────────────────────────────────────────────────────────┘

This Chapter’s Solution:

┌─────────────────────────────────────────────────────────────────┐
│                    Trading Core                                  │
│                                                                  │
│    Orders → Match → Trades → Settle → Balance Update             │
│       ↓         ↓           ↓                                   │
│    ┌─────────────────────────────────────────────────┐          │
│    │              TDengine (Persistence)              │          │
│    │    orders | trades | balances                   │          │
│    └─────────────────────────────────────────────────┘          │
└─────────────────────────────────────────────────────────────────┘

1. Why TDengine?

Detailed comparison: Database Selection Analysis

Core Advantages

FeatureTDenginePostgreSQL
Write Speed1M/sec10k/sec
Time-SeriesNative SupportIndex Optimization Needed
Storage1/101x
Real-time AnalyticsBuilt-in StreamExternal Tools Needed
Rust Client✅ Official taostokio-postgres

2. Schema Design

2.1 Super Table Architecture

TDengine uses the Super Table concept:

┌─────────────────────────────────────────────────────────┐
│              Super Table: orders                         │
│    (Unified schema, auto-create sub-table per symbol)    │
├─────────────────┬─────────────────┬────────────────────┤
│ orders_1        │ orders_2        │ orders_N           │
│ (BTC_USDT)      │ (ETH_USDT)      │ (...)              │
└─────────────────┴─────────────────┴────────────────────┘

2.2 DDL Definitions

-- Database Setup
CREATE DATABASE IF NOT EXISTS trading 
    KEEP 365d              -- Retain data for 1 year
    DURATION 10d           -- Partition every 10 days
    BUFFER 256             -- 256MB Write Buffer
    WAL_LEVEL 2            -- WAL Persistence Level
    PRECISION 'us';        -- Microsecond Precision

USE trading;

-- Orders Super Table
CREATE STABLE IF NOT EXISTS orders (
    ts TIMESTAMP,               -- Timestamp (PK)
    order_id BIGINT UNSIGNED,
    user_id BIGINT UNSIGNED,
    side TINYINT UNSIGNED,      -- 0=BUY, 1=SELL
    order_type TINYINT UNSIGNED,-- 0=LIMIT, 1=MARKET
    price BIGINT UNSIGNED,      -- Integer representation
    qty BIGINT UNSIGNED,
    filled_qty BIGINT UNSIGNED,
    status TINYINT UNSIGNED,
    cid NCHAR(64)               -- Client Order ID
) TAGS (
    symbol_id INT UNSIGNED      -- Partition Key
);

-- Trades Super Table
CREATE STABLE IF NOT EXISTS trades (
    ts TIMESTAMP,
    trade_id BIGINT UNSIGNED,
    order_id BIGINT UNSIGNED,
    user_id BIGINT UNSIGNED,
    side TINYINT UNSIGNED,
    price BIGINT UNSIGNED,
    qty BIGINT UNSIGNED,
    fee BIGINT UNSIGNED,
    role TINYINT UNSIGNED       -- 0=MAKER, 1=TAKER
) TAGS (
    symbol_id INT UNSIGNED
);

-- Balances Super Table
CREATE STABLE IF NOT EXISTS balances (
    ts TIMESTAMP,
    avail BIGINT UNSIGNED,
    frozen BIGINT UNSIGNED,
    lock_version BIGINT UNSIGNED,
    settle_version BIGINT UNSIGNED
) TAGS (
    user_id BIGINT UNSIGNED,
    asset_id INT UNSIGNED
);

2.3 Status Enums

#![allow(unused)]
fn main() {
// New Enum
pub enum TradeRole {
    Maker = 0,
    Taker = 1,
}
}

3. API Design

3.1 Query Endpoints

EndpointMethodDescription
/api/v1/order/{order_id}GETQuery single order
/api/v1/ordersGETQuery order list
/api/v1/tradesGETQuery trade history
/api/v1/balancesGETQuery user balances

3.2 Request/Response Format

GET /api/v1/order/{order_id}:

{
    "code": 0,
    "msg": "ok",
    "data": {
        "order_id": 1001,
        "symbol": "BTC_USDT",
        "status": "PARTIALLY_FILLED",
        "filled_qty": "0.0005",
        "created_at": 1734533784000
    }
}

GET /api/v1/balances:

{
    "code": 0,
    "msg": "ok",
    "data": {
        "balances": [
             { "asset": "BTC", "avail": "1.50000000", "frozen": "0.10000000" }
        ]
    }
}

4. Implementation Architecture

4.1 Module Structure

src/
├── persistence/
│   ├── mod.rs              // Entry
│   ├── tdengine.rs         // Connection Manager
│   ├── orders.rs           // Order Persistence
│   ├── trades.rs           // Trade Persistence
│   └── balances.rs         // Balance Persistence

4.2 Data Flow

┌─────────────────────────────────────────────────────────────────┐
│                      Settlement Thread                           │
│                                                                  │
│    trade_queue.pop() ──┬── Update In-Memory Balance              │
│                        │                                         │
│                        └── Write to TDengine                     │
│                             ├── INSERT trades                    │
│                             ├── INSERT order_events              │
│                             └── INSERT balances (Snapshot)       │
└─────────────────────────────────────────────────────────────────┘

4.3 Batch Write Optimization

#![allow(unused)]
fn main() {
// Batch write to reduce I/O overhead
const BATCH_SIZE: usize = 1000;

async fn flush_trades(trades: Vec<Trade>) {
    let mut sql = String::from("INSERT INTO ");
    // Construct bulk insert SQL...
    client.exec(&sql).await;
}
}

5. Implementation Plan

Phase 1: Basic Persistence (This Chapter)

  • TDengine Connection
  • Schema Initialization
  • Trade/Order/Balance Writes

Phase 2: Query APIs

  • Implement GET Endpoints

Phase 3: Optimization

  • Batch Writes
  • Connection Pool
  • Redis Cache

6. Verification Plan

6.1 Integration Test

# 1. Start TDengine
docker run -d -p 6030:6030 -p 6041:6041 tdengine/tdengine:latest

# 2. Run Gateway
cargo run --release -- --gateway --port 8080

# 3. Submit Order
curl -X POST http://localhost:8080/api/v1/create_order ...

# 4. Query Order (Verify Persistence)
curl http://localhost:8080/api/v1/order/1

Summary

This chapter implements Settlement Persistence.

Core Philosophy:

Persistence is a side-channel operation, not blocking the main trading flow. The Settlement thread writes to TDengine asynchronously.




🇨🇳 中文

📦 代码变更: 查看 Diff

本节核心目标:将成交数据持久化到 TDengine,实现订单查询和历史记录 API。


背景:从内存到持久化

在 Gateway Phase 1 (0x09-a) 中,我们完成了:

  • ✅ HTTP API (create_order, cancel_order)
  • ✅ 订单验证和转换
  • ✅ Ring Buffer 队列集成
  • 数据持久化 ← 本章

当前系统的问题:

┌─────────────────────────────────────────────────────────────────┐
│                    Trading Core (内存中)                         │
│                                                                  │
│    Orders → 匹配 → Trades → 结算 → 余额更新                      │
│       ↓         ↓           ↓                                   │
│      ❌         ❌           ❌    ← 重启后数据丢失!              │
└─────────────────────────────────────────────────────────────────┘

本章解决方案:

┌─────────────────────────────────────────────────────────────────┐
│                    Trading Core                                  │
│                                                                  │
│    Orders → 匹配 → Trades → 结算 → 余额更新                      │
│       ↓         ↓           ↓                                   │
│    ┌─────────────────────────────────────────────────┐          │
│    │              TDengine (持久化)                   │          │
│    │    orders | trades | balances                   │          │
│    └─────────────────────────────────────────────────┘          │
└─────────────────────────────────────────────────────────────────┘

1. 为什么选择 TDengine

详细对比见: 数据库选型分析

核心优势

特性TDenginePostgreSQL
写入速度100万/秒1万/秒
时序查询原生支持需要索引优化
存储空间1/101x
实时分析内置流计算需要额外工具
Rust 客户端✅ 官方 taostokio-postgres

2. Schema 设计

2.1 Super Table 架构

TDengine 使用 Super Table 概念:

┌─────────────────────────────────────────────────────────┐
│              Super Table: orders                         │
│    (统一 schema,自动按 symbol_id 创建子表)               │
├─────────────────┬─────────────────┬────────────────────┤
│ orders_1        │ orders_2        │ orders_N           │
│ (BTC_USDT)      │ (ETH_USDT)      │ (...)              │
└─────────────────┴─────────────────┴────────────────────┘

2.2 DDL 定义

-- Database Setup
CREATE DATABASE IF NOT EXISTS trading 
    KEEP 365d              -- 数据保留 1 年
    DURATION 10d           -- 每 10 天一个分区
    BUFFER 256             -- 写缓冲 256MB
    WAL_LEVEL 2            -- WAL 持久化级别
    PRECISION 'us';        -- 微秒精度

USE trading;

-- Orders Super Table
CREATE STABLE IF NOT EXISTS orders (
    ts TIMESTAMP,               -- 订单时间戳 (主键)
    order_id BIGINT UNSIGNED,   -- 订单 ID
    user_id BIGINT UNSIGNED,    -- 用户 ID
    side TINYINT UNSIGNED,      -- 0=BUY, 1=SELL
    order_type TINYINT UNSIGNED,-- 0=LIMIT, 1=MARKET
    price BIGINT UNSIGNED,      -- 价格 (整数)
    qty BIGINT UNSIGNED,        -- 原始数量
    filled_qty BIGINT UNSIGNED, -- 已成交数量
    status TINYINT UNSIGNED,    -- 订单状态
    cid NCHAR(64)               -- 客户端订单 ID
) TAGS (
    symbol_id INT UNSIGNED      -- 交易对 ID (分区键)
);

-- Trades Super Table
CREATE STABLE IF NOT EXISTS trades (
    ts TIMESTAMP,               -- 成交时间戳
    trade_id BIGINT UNSIGNED,   -- 成交 ID
    order_id BIGINT UNSIGNED,   -- 订单 ID
    user_id BIGINT UNSIGNED,    -- 用户 ID
    side TINYINT UNSIGNED,      -- 0=BUY, 1=SELL
    price BIGINT UNSIGNED,      -- 成交价格
    qty BIGINT UNSIGNED,        -- 成交数量
    fee BIGINT UNSIGNED,        -- 手续费
    role TINYINT UNSIGNED       -- 0=MAKER, 1=TAKER
) TAGS (
    symbol_id INT UNSIGNED
);

-- Balances Super Table
CREATE STABLE IF NOT EXISTS balances (
    ts TIMESTAMP,               -- 快照时间
    avail BIGINT UNSIGNED,      -- 可用余额
    frozen BIGINT UNSIGNED,     -- 冻结余额
    lock_version BIGINT UNSIGNED,   -- 锁定版本
    settle_version BIGINT UNSIGNED  -- 结算版本
) TAGS (
    user_id BIGINT UNSIGNED,    -- 用户 ID
    asset_id INT UNSIGNED       -- 资产 ID
);

2.3 状态枚举

#![allow(unused)]
fn main() {
// 新增
pub enum TradeRole {
    Maker = 0,
    Taker = 1,
}
}

3. API 设计

3.1 查询端点

端点方法描述
/api/v1/order/{order_id}GET查询单个订单
/api/v1/ordersGET查询订单列表
/api/v1/tradesGET查询成交历史
/api/v1/balancesGET查询用户余额

3.2 请求/响应格式

GET /api/v1/order/{order_id}:

{
    "code": 0,
    "msg": "ok",
    "data": {
        "order_id": 1001,
        "symbol": "BTC_USDT",
        "status": "PARTIALLY_FILLED",
        "filled_qty": "0.0005",
        "created_at": 1734533784000
    }
}

4. 实现架构

4.1 模块结构

src/
├── persistence/
│   ├── mod.rs              // 模块入口
│   ├── tdengine.rs         // TDengine 连接管理
│   ├── orders.rs           // 订单持久化
│   ├── trades.rs           // 成交持久化
│   └── balances.rs         // 余额持久化

4.2 数据流

┌─────────────────────────────────────────────────────────────────┐
│                      Settlement 线程                             │
│                                                                  │
│    trade_queue.pop() ──┬── 更新内存余额                          │
│                        │                                         │
│                        └── 写入 TDengine                         │
│                             ├── INSERT trades                    │
│                             ├── INSERT order_events              │
│                             └── INSERT balances (快照)           │
└─────────────────────────────────────────────────────────────────┘

4.3 批量写入优化

#![allow(unused)]
fn main() {
// 批量写入,减少 I/O 开销
const BATCH_SIZE: usize = 1000;

async fn flush_trades(trades: Vec<Trade>) {
    let mut sql = String::from("INSERT INTO ");
    // ... 构建批量插入 SQL
    client.exec(&sql).await;
}
}

5. 实现计划

Phase 1: 基础持久化 (本次)

  • TDengine 连接管理
  • Schema 初始化
  • 成交/订单/余额写入

Phase 2: 查询接口

  • 实现 GET 端点

Phase 3: 优化

  • 批量写入
  • 连接池
  • Redis 缓存

6. 验证计划

6.1 集成测试

# 1. 启动 TDengine
docker run -d -p 6030:6030 -p 6041:6041 tdengine/tdengine:latest

# 2. 运行 Gateway
cargo run --release -- --gateway --port 8080

# 3. 提交订单
curl -X POST http://localhost:8080/api/v1/create_order ...

# 4. 查询订单 (验证持久化)
curl http://localhost:8080/api/v1/order/1

Summary

本章实现 Settlement Persistence:

核心理念

持久化是旁路操作,不阻塞主交易流程。Trading Core 保持高性能,Settlement 线程异步写入 TDengine。

下一章 (0x09-c) 将实现 WebSocket 实时推送。