实时全推1分钟K线接口 API 文档
1. 接口概述
本文档描述 实时全推1分钟K线接口 的使用规范,该接口基于 原生 TCP 协议,用于接收股票全量推送数据,支持实时获取股票行情相关信息,适用于需要批量、实时同步股票数据的业务场景。
2. 接口基础信息
| 参数名称 | 参数值 | 说明 |
|---|---|---|
| 接口协议 | 原生 TCP 协议 | 采用 TCP 可靠传输,保障数据完整性 |
| 服务地址 (host) | test.chagubang.com | 接口服务端域名,需确保客户端可正常访问 |
| 端口 (port) | 48989 | TCP 连接端口,固定不变 |
| 认证令牌 (token) | mushuju | 客户端身份认证凭证,必填,需原样发送 |
| 字节序 | 小端字节序 | 数据传输及解析需遵循小端字节序规则 |
3. 连接与数据接收流程
3.1 连接建立
客户端需使用标准 TCP 客户端工具/代码,与服务端指定 host 和 port 建立 TCP 连接,确保网络通畅,无防火墙拦截该端口的 TCP 通信。
3.2 身份认证
TCP 连接建立成功后,客户端需立即向服务端发送指定 token,完成身份认证。认证通过后,服务端将开始向客户端推送全量数据;认证失败则会断开 TCP 连接。
3.3 数据接收
认证通过后,客户端持续监听 TCP 连接的输入流,接收服务端推送的数据。数据传输遵循以下规则:每一条完整数据的前4个字节为该条数据的长度(按小端字节序解析),后续字节为实际业务数据。
4. 数据格式说明
4.1 数据传输格式
单条数据采用 JSON 格式字符串,编码格式默认 UTF-8;数据长度字段(前4字节)为无符号整数,小端字节序。
4.2 返回数据示例
{"open": 10.84, "close": 10.84, "high": 10.85, "low": 10.84, "volume": 1018, "amount": 1103600.0, "time": 1773982977000, "stock_code": "000001.SZ"}
4.3 字段解析
数据字段解析如下,共 8 个字段:
| 序号 | 字段名称 | 数据类型 | 说明 |
|---|---|---|---|
| 1 | open | 浮点数 | 开盘价 |
| 2 | high | 浮点数 | 最高价 |
| 3 | close | 浮点数 | 收盘价 |
| 4 | low | 浮点数 | 最低价 |
| 5 | amount | 浮点数 | 成交额 |
| 6 | volume | 整数 | 成交量 |
| 7 | time | 整数 | 时间戳 |
| 8 | stock_code | 字符串 | 股票代码 |
5. 注意事项
- 字节序规范:务必按照小端字节序解析数据长度字段,否则会导致数据解析失败。
- Token 使用:Token 为接口访问唯一凭证,需妥善保管,禁止泄露;若 Token 失效,需联系接口提供方更新。
- 连接稳定性:若 TCP 连接断开,客户端需重新建立连接并发送 Token,方可恢复数据接收。
- 数据处理:建议客户端对接收的数据进行校验(如字段数量、数据格式),避免因异常数据导致业务异常。
- 编码规范:数据字符串默认采用 UTF-8 编码,解析时需统一编码格式,防止中文乱码。
6. 异常处理
| 异常场景 | 现象描述 | 处理建议 |
|---|---|---|
| 连接失败 | 无法与服务端建立 TCP 连接 | 检查网络状态,确认端口放行,确认 host 及 port 无误 |
| 认证失败 | 发送 token 后连接被断开 | 检查 token 是否正确,确认 token 未失效 |
| 数据解析失败 | 解析出的字段数量不符或数据格式错误 | 检查字节序解析方式、数据是否为合法的 JSON 格式、编码格式是否为 UTF-8 |
| 连接断开 | 正常接收数据过程中连接中断 | 客户端实现重连机制,重新建立连接并完成认证 |
7. 客户端接入示例 (Python)
为方便开发者快速接入,以下提供两份基于 Python 的客户端接入代码示例。开发者可根据实际业务量级选择合适的方案。
7.1 基础接收示例(无缓存)
适用场景: 适用于本地开发调试、数据连通性测试,或下游系统自行处理数据的轻量级场景。代码实现了标准的 TCP 建立、Token 鉴权以及基于小端字节序拆包的核心逻辑。
import socket
import struct
import threading
import queue
import time
import sys
# === 配置参数 ===
HOST = 'test.chagubang.com'
PORT = 48989
TOKEN = 'mushuju'
# 队列最大容量,防止消费者挂掉导致内存撑爆(OOM)
# 100000 条行情数据大约占用几十MB内存
MAX_QUEUE_SIZE = 100000
def optimized_recvall(sock, n):
"""优化后的 recvall:使用列表推导和 join,减少 bytearray 的内存重分配开销"""
chunks = []
bytes_recd = 0
while bytes_recd < n:
chunk = sock.recv(min(n - bytes_recd, 4096))
if not chunk:
return None
chunks.append(chunk)
bytes_recd += len(chunk)
return b''.join(chunks)
def receive_message(sock):
"""读取单条完整消息"""
raw_msglen = optimized_recvall(sock, 4)
if not raw_msglen:
return None
msglen = struct.unpack('<I', raw_msglen)[0]
return optimized_recvall(sock, msglen)
def network_receiver(sock, data_queue):
"""
【生产者线程】
职责:以最高速度从网卡读取数据,不进行任何耗时操作(如解码、打印),直接塞入队列。
"""
print("Receiver thread started.")
while True:
try:
message = receive_message(sock)
if message is None:
print("\n[Network] Connection closed by server")
# 放入一个特殊信号告诉消费者停止
data_queue.put(None)
break
# 使用 put_nowait。如果队列满了,说明处理速度严重落后于接收速度
try:
data_queue.put_nowait(message)
except queue.Full:
# 极端情况下:队列满了!
# 为了不阻塞TCP,只能丢弃当前数据,并报警。
# 在真实高频交易中,一旦发生这种情况说明系统性能出现瓶颈。
print("\n[Warning] Queue is FULL! Dropping packets to prevent TCP blocking!", file=sys.stderr)
except Exception as e:
print(f"\n[Network] Error: {e}")
data_queue.put(None)
break
def data_processor(data_queue):
"""
【消费者线程】
职责:从队列中获取数据进行解码、业务处理。
"""
print("Processor thread started.")
count = 0
start_time = time.time()
while True:
message = data_queue.get() # 阻塞等待数据
if message is None: # 收到结束信号
break
try:
decoded_message = message.decode('utf-8')
# --- 在这里进行你的业务逻辑处理 ---
# 例如:写入数据库,计算指标等
# 不要每一根行情都 print,改为每隔 X 条打印一次统计信息
count += 1
if count % 1 == 0:
elapsed = time.time() - start_time
qps = 10000 / elapsed
#print(f"[Stats] Processed {count} msgs, Queue size: {data_queue.qsize()}, Speed: {qps:.2f} msgs/sec")
print(f"Sample data: {decoded_message}") # 打印截断的样本以供调试
start_time = time.time()
except UnicodeDecodeError:
print(f"[Error] Received non-UTF-8 data length: {len(message)}")
except Exception as e:
print(f"[Error] Processing error: {e}")
def main():
# 创建线程安全的队列
data_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# 优化点 1:增大 TCP 接收缓冲区大小到 8MB (根据操作系统可能需要调整)
try:
s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8 * 1024 * 1024)
except Exception as e:
print(f"Warning: Could not set SO_RCVBUF: {e}")
s.connect((HOST, PORT))
print(f"Connected to {HOST}:{PORT}")
# 发送鉴权
s.sendall(TOKEN.encode('utf-8'))
print(f"Sent: {TOKEN}")
# 启动消费者线程
processor = threading.Thread(target=data_processor, args=(data_queue,))
processor.daemon = True # 主线程退出时自动结束
processor.start()
# 启动生产者 (网络接收)
# 这里直接使用主线程作为接收者,也可以单开线程
network_receiver(s, data_queue)
if __name__ == "__main__":
main()
7.2 高性能接收示例(配合 Redis 缓存)
适用场景: 适用于生产环境、海量高频数据推送的业务场景。该方案结合了 Redis 连接池 与 Pipeline (管道) 批量提交技术,并配合后台定时任务,大幅降低了网络与存储 IO 开销,保障消费端不会因处理缓慢而产生背压。
import socket
import struct
import json
import redis
import schedule
import time
import threading
from datetime import datetime
# --- 1. Redis 连接池设置 ---
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PASSWORD = ''
try:
# 创建Redis连接池
pool = redis.ConnectionPool(
host=REDIS_HOST,
port=REDIS_PORT,
password=REDIS_PASSWORD,
decode_responses=True, # 自动将响应从bytes解码为str
health_check_interval=30 # 定期检查连接健康状况
)
redis_client = redis.Redis(connection_pool=pool)
redis_client.ping()
print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
print(f"Error connecting to Redis: {e}")
exit()
# --- 线程间共享数据与安全锁 ---
pipeline_lock = threading.Lock()
redis_batch_context = {
"pipe": redis_client.pipeline(),
"count": 0
}
# --- 3. 定时提交任务 ---
def submit_pipe_job():
""" 调度器定时调用的函数,用于提交当前 pipeline 中剩余的命令 """
with pipeline_lock:
if redis_batch_context["count"] > 0:
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"[{current_time}] Scheduled submission: Submitting batch of {redis_batch_context['count']} messages...")
try:
redis_batch_context["pipe"].execute()
print("Scheduled batch submitted successfully.")
except Exception as e:
print(f"Error during scheduled pipe execution: {e}")
finally:
redis_batch_context["pipe"] = redis_client.pipeline()
redis_batch_context["count"] = 0
def run_scheduler():
""" 在后台线程中运行调度器 """
# 设置每天 15:00:10 触发收尾提交
schedule.every().day.at("15:00:10").do(submit_pipe_job)
print("Scheduler configured to run every day at 15:00:10.")
while True:
schedule.run_pending()
time.sleep(1)
# --- 核心接收函数 ---
def receive_message(sock):
raw_msglen = recvall(sock, 4)
if not raw_msglen:
return None
msglen = struct.unpack('<I', raw_msglen)[0]
return recvall(sock, msglen)
def recvall(sock, n):
data = bytearray()
while len(data) < n:
packet = sock.recv(n - len(data))
if not packet:
return None
data.extend(packet)
return data
def main():
host = 'test.chagubang.com'
port = 48989
# 启动后台调度线程
scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
scheduler_thread.start()
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
print(f"Connected to {host}:{port}")
token = 'mushuju'
s.sendall(token.encode('utf-8'))
print(f"Sent token.")
batch_size = 2000
while True:
message = receive_message(s)
if message is None:
print("Connection closed by server")
break
try:
decoded_message = message.decode('utf-8')
# --- 2. 使用 Redis Pipeline 批量提交 ---
try:
msg_dict = json.loads(decoded_message)
# 解析JSON提取股票代码作为 Redis Key
key = msg_dict.get("stock_code")
if key:
value = decoded_message
with pipeline_lock:
redis_batch_context["pipe"].set(key, value)
redis_batch_context["count"] += 1
# 每攒满 batch_size 条提交一次,减少网络IO
if redis_batch_context["count"] >= batch_size:
try:
redis_batch_context["pipe"].execute()
except Exception as e:
print(f"Error batch execution: {e}")
finally:
redis_batch_context["pipe"] = redis_client.pipeline()
redis_batch_context["count"] = 0
else:
print(f"Warning: Message missing 'stock_code', skipping: '{decoded_message}'")
except json.JSONDecodeError:
print(f"Warning: Invalid JSON data, skipping: '{decoded_message}'")
except UnicodeDecodeError:
print(f"Received non-UTF-8 data.")
except KeyboardInterrupt:
print("\nProgram interrupted by user.")
except Exception as e:
print(f"An unexpected error occurred: {e}")
finally:
print("Exiting program. Submitting remaining messages in pipeline...")
submit_pipe_job()
print("Program finished.")
if __name__ == "__main__":
main()