0x09-d K-Line Aggregation Service
Core Objective: Implement a real-time K-Line (candlestick) aggregation service that supports multiple intervals (1m, 5m, 15m, 30m, 1h, 1d).
Background: Market Data Aggregation
The exchange needs to provide standardized market data:
Trades                                    K-Line (OHLCV)
  │                                            │
  ├── Trade 1: price=30000, qty=0.1            │
  ├── Trade 2: price=30100, qty=0.2  ──▶  1-Min K-Line:
  ├── Trade 3: price=29900, qty=0.1            │  Open:   30000
  └── Trade 4: price=30050, qty=0.3            │  High:   30100
                                               │  Low:    29900
                                               │  Close:  30050
                                               │  Volume: 0.7
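The mapping in the diagram can be sketched in Rust as a fold over the trades of one window. This is only an illustration of the semantics; the `Trade` and `Bar` types and the fixed-point scaling (price scaled by 10^2, quantity by 10^6) are assumptions made for this example and are not taken from the codebase.

```rust
/// Hypothetical trade record; price is scaled by 10^2, qty by 10^6 (assumed).
#[derive(Clone, Copy, Debug)]
pub struct Trade {
    pub price: u64,
    pub qty: u64,
}

/// Hypothetical OHLCV bar produced from the trades of one window.
#[derive(Debug, PartialEq)]
pub struct Bar {
    pub open: u64,
    pub high: u64,
    pub low: u64,
    pub close: u64,
    pub volume: u64,
    pub quote_volume: u128, // sum of price * qty, scaled by 10^8; u128 avoids overflow
    pub trade_count: u32,
}

/// Folds trades (in execution order) into one bar.
/// Returns None when the window contains no trades.
pub fn aggregate(trades: &[Trade]) -> Option<Bar> {
    let first = trades.first()?;
    let mut bar = Bar {
        open: first.price,
        high: first.price,
        low: first.price,
        close: first.price,
        volume: 0,
        quote_volume: 0,
        trade_count: 0,
    };
    for t in trades {
        bar.high = bar.high.max(t.price);
        bar.low = bar.low.min(t.price);
        bar.close = t.price; // the last trade seen determines the close
        bar.volume += t.qty;
        bar.quote_volume += t.price as u128 * t.qty as u128;
        bar.trade_count += 1;
    }
    Some(bar)
}
```

Feeding the four trades from the diagram through `aggregate` yields open 30000, high 30100, low 29900, close 30050 and volume 0.7 once the scaling is undone.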
1. K-Line Data Structure
1.1 OHLCV
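/// One aggregated candlestick (OHLCV) for a single symbol and interval.
pub struct KLine {
    pub symbol_id: u32,
    pub interval: KLineInterval,
    pub open_time: u64,    // Window start, Unix timestamp (ms)
    pub close_time: u64,   // Window end, Unix timestamp (ms)
    pub open: u64,         // Opening price
    pub high: u64,         // Highest price
    pub low: u64,          // Lowest price
    pub close: u64,        // Closing price
    pub volume: u64,       // Base asset volume
    pub quote_volume: u64, // Quote asset volume (price * qty)
    pub trade_count: u32,  // Number of trades
}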
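The struct refers to a `KLineInterval` type that this section does not show. A minimal sketch of what it might look like follows, together with a helper that derives a window's `open_time` and `close_time` from a trade timestamp. The variant names and methods are hypothetical, and the helper assumes windows are aligned to the Unix epoch in UTC with an inclusive `close_time`, which matches the sample values in this chapter but may differ from how TDengine aligns day windows under a configured timezone.

```rust
/// Hypothetical definition of the interval type referenced by `KLine`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum KLineInterval {
    M1,
    M5,
    M15,
    M30,
    H1,
    D1,
}

impl KLineInterval {
    /// Interval label as used in the API ("1m", "5m", ...).
    pub fn as_str(self) -> &'static str {
        match self {
            KLineInterval::M1 => "1m",
            KLineInterval::M5 => "5m",
            KLineInterval::M15 => "15m",
            KLineInterval::M30 => "30m",
            KLineInterval::H1 => "1h",
            KLineInterval::D1 => "1d",
        }
    }

    /// Window length in milliseconds.
    pub fn duration_ms(self) -> u64 {
        const MINUTE: u64 = 60_000;
        match self {
            KLineInterval::M1 => MINUTE,
            KLineInterval::M5 => 5 * MINUTE,
            KLineInterval::M15 => 15 * MINUTE,
            KLineInterval::M30 => 30 * MINUTE,
            KLineInterval::H1 => 60 * MINUTE,
            KLineInterval::D1 => 24 * 60 * MINUTE,
        }
    }

    /// Returns (open_time, close_time) of the window containing `ts_ms`.
    /// close_time is the last millisecond of the window (inclusive).
    pub fn window(self, ts_ms: u64) -> (u64, u64) {
        let len = self.duration_ms();
        let open = ts_ms - ts_ms % len;
        (open, open + len - 1)
    }
}
```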
Warning
quote_volume overflow:
price * qty might overflow u64. Correct SQL:
SUM(CAST(price AS DOUBLE) * CAST(qty AS DOUBLE)) AS quote_volume
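To see why the warning matters, the sketch below multiplies two fixed-point values in Rust three ways: a checked u64 multiply, a floating-point multiply that mirrors the DOUBLE cast, and an exact u128 multiply. The 10^8 scaling factor used in the usage note is an assumption for illustration; the function names are hypothetical.

```rust
/// Exact product when it fits in u64; None signals overflow.
pub fn quote_volume_checked(price: u64, qty: u64) -> Option<u64> {
    price.checked_mul(qty)
}

/// Approximate product in floating point, mirroring the CAST(... AS DOUBLE) approach.
pub fn quote_volume_f64(price: u64, qty: u64) -> f64 {
    price as f64 * qty as f64
}

/// Exact product in a wider integer; cannot overflow for any pair of u64 inputs.
pub fn quote_volume_u128(price: u64, qty: u64) -> u128 {
    price as u128 * qty as u128
}
```

With a price of 30000 and a quantity of 100, both scaled by 10^8, the raw product is 3 × 10^22, which exceeds `u64::MAX` (about 1.8 × 10^19), so the checked variant returns `None`. The DOUBLE approach avoids the overflow at the cost of exactness, since a double carries 53 bits of mantissa.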
1.2 API Response Format
{
  "symbol": "BTC_USDT",
  "interval": "1m",
  "open_time": 1734533760000,
  "close_time": 1734533819999,
  "open": "30000.00",
  "high": "30100.00",
  "low": "29900.00",
  "close": "30050.00",
  "volume": "0.700000",
  "quote_volume": "21025.00",
  "trade_count": 4
}
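The response carries prices and volumes as decimal strings while the `KLine` struct stores them as `u64`. One plausible way to bridge the two is a fixed-point formatter like the one below. The helper name and the idea that each field has a known number of decimals (2 for prices, 6 for volume in the sample) are assumptions inferred from the example response, not confirmed details of the service.

```rust
/// Formats a fixed-point integer as a decimal string with `decimals` fractional digits.
/// Assumes `decimals` is at most 19 so that 10^decimals fits in u64.
pub fn format_scaled(value: u64, decimals: u32) -> String {
    if decimals == 0 {
        return value.to_string();
    }
    let scale = 10u64.pow(decimals);
    let whole = value / scale;
    let frac = value % scale;
    // Zero-pad the fractional part so that 5 with 2 decimals becomes "0.05".
    format!("{}.{:0width$}", whole, frac, width = decimals as usize)
}
```

For example, `format_scaled(3_000_000, 2)` gives `"30000.00"` and `format_scaled(700_000, 6)` gives `"0.700000"`.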
2. Architecture: TDengine Stream Computing
2.1 Core Concept
Leverage TDengine's built-in Stream Computing for automatic aggregation, so no manual aggregator needs to be implemented:
- Settlement writes to the trades table.
- TDengine automatically triggers stream computing.
- Results are written to the klines tables.
- The HTTP API queries the klines tables directly.
2.2 Data Flow
Settlement ──▶ trades table (TDengine)
                    │
                    │  TDengine Stream Computing (Auto)
                    │
                    ├─── kline_1m_stream ──► klines_1m table
                    ├─── kline_5m_stream ──► klines_5m table
                    └─── ...
                              │
      ┌───────────────────────┴────────────────────────┐
      ▼                                                ▼
  HTTP API                                      WebSocket Push
  GET /api/v1/klines                            kline.update (Optional)
2.3 TDengine Stream Example
CREATE STREAM IF NOT EXISTS kline_1m_stream
    INTO klines_1m SUBTABLE(CONCAT('kl_1m_', CAST(symbol_id AS NCHAR(10))))
    AS SELECT
        _wstart AS ts,
        FIRST(price) AS open,
        MAX(price) AS high,
        MIN(price) AS low,
        LAST(price) AS close,
        SUM(qty) AS volume,
        SUM(CAST(price AS DOUBLE) * CAST(qty AS DOUBLE)) AS quote_volume,
        COUNT(*) AS trade_count
    FROM trades
    PARTITION BY symbol_id
    INTERVAL(1m);
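Since every interval needs its own stream, the statement above lends itself to a small template. The sketch below generates one statement per interval in Rust. The function names are hypothetical, and the naming pattern for the other intervals (`kline_5m_stream`, `klines_5m`, `kl_5m_`) is extrapolated from the 1m example and the data flow diagram. Executing the statements against TDengine is left out on purpose, because the client API is not shown in this section.

```rust
/// Intervals named in this chapter; the labels double as TDengine INTERVAL units.
pub const INTERVALS: [&str; 6] = ["1m", "5m", "15m", "30m", "1h", "1d"];

/// Builds the CREATE STREAM statement for one interval, following the 1m example.
pub fn kline_stream_sql(interval: &str) -> String {
    format!(
        "CREATE STREAM IF NOT EXISTS kline_{iv}_stream \
         INTO klines_{iv} SUBTABLE(CONCAT('kl_{iv}_', CAST(symbol_id AS NCHAR(10)))) \
         AS SELECT \
         _wstart AS ts, \
         FIRST(price) AS open, \
         MAX(price) AS high, \
         MIN(price) AS low, \
         LAST(price) AS close, \
         SUM(qty) AS volume, \
         SUM(CAST(price AS DOUBLE) * CAST(qty AS DOUBLE)) AS quote_volume, \
         COUNT(*) AS trade_count \
         FROM trades \
         PARTITION BY symbol_id \
         INTERVAL({iv});",
        iv = interval
    )
}

/// Builds the statements for all supported intervals.
pub fn all_kline_stream_sql() -> Vec<String> {
    INTERVALS.iter().map(|iv| kline_stream_sql(iv)).collect()
}
```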
3. API Design
3.1 HTTP Endpoint
GET /api/v1/klines?symbol=BTC_USDT&interval=1m&limit=100
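Before the handler queries TDengine, it has to validate the three query parameters. A minimal sketch of that step follows, using only the standard library. The `KlineQuery` type, the default of 100 and the cap of 1000 for `limit` are assumptions chosen for illustration, and the parser does not perform URL decoding.

```rust
/// Validated query parameters for GET /api/v1/klines (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct KlineQuery {
    pub symbol: String,
    pub interval: String,
    pub limit: u32,
}

const SUPPORTED_INTERVALS: [&str; 6] = ["1m", "5m", "15m", "30m", "1h", "1d"];
const DEFAULT_LIMIT: u32 = 100; // assumed default
const MAX_LIMIT: u32 = 1000; // assumed upper bound

/// Parses a raw query string such as "symbol=BTC_USDT&interval=1m&limit=100".
pub fn parse_kline_query(query: &str) -> Result<KlineQuery, String> {
    let mut symbol: Option<String> = None;
    let mut interval: Option<String> = None;
    let mut limit = DEFAULT_LIMIT;

    for pair in query.split('&').filter(|p| !p.is_empty()) {
        let (key, value) = pair.split_once('=').unwrap_or((pair, ""));
        match key {
            "symbol" => symbol = Some(value.to_string()),
            "interval" => interval = Some(value.to_string()),
            "limit" => {
                limit = value
                    .parse::<u32>()
                    .map_err(|_| format!("invalid limit: {}", value))?;
            }
            _ => {} // unknown parameters are ignored
        }
    }

    let symbol = symbol.filter(|s| !s.is_empty()).ok_or("missing symbol")?;
    let interval = interval.ok_or("missing interval")?;
    if !SUPPORTED_INTERVALS.contains(&interval.as_str()) {
        return Err(format!("unsupported interval: {}", interval));
    }
    if limit == 0 || limit > MAX_LIMIT {
        return Err(format!("limit must be between 1 and {}", MAX_LIMIT));
    }
    Ok(KlineQuery { symbol, interval, limit })
}
```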
3.2 WebSocket Push
{
  "type": "kline.update",
  "data": {
    "symbol": "BTC_USDT",
    "interval": "1m",
    "open": "30000.00",
    "close": "30050.00",
    "is_final": false
  }
}
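The message above could be assembled as in the following sketch. The function is hypothetical, and so is the rule for `is_final`: it is assumed here to become true once the current time has passed the window's inclusive `close_time`. Inputs are assumed to contain no characters that need JSON escaping; a real implementation would more likely use a JSON library.

```rust
/// Builds a `kline.update` message. Prices are passed pre-formatted as strings.
/// `is_final` is true once `now_ms` is past the window's inclusive close time
/// (assumed semantics, not confirmed by this chapter).
pub fn kline_update_json(
    symbol: &str,
    interval: &str,
    open: &str,
    close: &str,
    close_time_ms: u64,
    now_ms: u64,
) -> String {
    let is_final = now_ms > close_time_ms;
    format!(
        r#"{{"type":"kline.update","data":{{"symbol":"{}","interval":"{}","open":"{}","close":"{}","is_final":{}}}}}"#,
        symbol, interval, open, close, is_final
    )
}
```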
4. Module Structure
src/
├── persistence/
│   ├── klines.rs      # Create Streams, Query K-Lines
│   ├── schema.rs      # Add klines Super Table
│   └── queries.rs     # Add query_klines()
├── gateway/
│   ├── handlers.rs    # Add get_klines
│   └── ...
Tip
No src/kline/ logic directory is needed: TDengine stream computing replaces the manual aggregation logic.
5. Implementation Plan
- Phase 1: Schema: Add the klines super table.
- Phase 2: Stream Computing: Implement create_kline_streams().
- Phase 3: HTTP API: Implement query_klines() and the API endpoint.
- Phase 4: Verification: E2E test.
6. Verification
6.1 E2E Test Scenarios
Script: ./scripts/test_kline_e2e.sh
- Check API connectivity.
- Record initial K-Line count.
- Create matched orders.
- Wait for Stream processing (5s).
- Query K-Line API and verify data structure.
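The final step, verifying the data structure, can include a few invariants that every well-formed K-Line should satisfy. The Rust sketch below lists some candidates. It is not taken from the test script; the `KLineCheck` type is hypothetical, and the checks assume epoch-aligned windows with an inclusive `close_time`.

```rust
/// Hypothetical subset of a K-Line as seen by a test (prices as fixed-point integers).
#[derive(Clone, Copy, Debug)]
pub struct KLineCheck {
    pub open_time: u64,
    pub close_time: u64,
    pub open: u64,
    pub high: u64,
    pub low: u64,
    pub close: u64,
    pub trade_count: u32,
}

/// Checks basic invariants of one K-Line for a window of `interval_ms` milliseconds.
pub fn validate_kline(k: &KLineCheck, interval_ms: u64) -> Result<(), String> {
    if interval_ms == 0 {
        return Err("interval must be positive".to_string());
    }
    if k.open_time % interval_ms != 0 {
        return Err("open_time is not aligned to the interval".to_string());
    }
    if k.close_time != k.open_time + interval_ms - 1 {
        return Err("close_time is not the last millisecond of the window".to_string());
    }
    if k.high < k.open.max(k.close) || k.low > k.open.min(k.close) {
        return Err("open/close fall outside the [low, high] range".to_string());
    }
    if k.trade_count == 0 {
        return Err("a K-Line must contain at least one trade".to_string());
    }
    Ok(())
}
```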
6.2 Binance Standard Alignment
Warning
P0 Fix: Ensure time fields align with the Binance standard (Unix time in milliseconds, as a number).
- open_time: 1734611580000 (was an ISO 8601 string)
- close_time: 1734611639999 (was missing)
Summary
This chapter implements a K-Line aggregation service that leverages TDengine's Stream Computing.
Key Concept:
K-Line is derived data: it is computed from trades in real time rather than stored as original source data.
Next Chapter: 0x09-e OrderBook Depth.