实时全推1分钟K线接口 API 文档

1. 接口概述

本文档描述 实时全推1分钟K线接口 的使用规范,该接口基于 原生 TCP 协议,用于接收股票全量推送数据,支持实时获取股票行情相关信息,适用于需要批量、实时同步股票数据的业务场景。

2. 接口基础信息

参数名称 参数值 说明
接口协议 原生 TCP 协议 采用 TCP 可靠传输,保障数据完整性
服务地址 (host) test.chagubang.com 接口服务端域名,需确保客户端可正常访问
端口 (port) 48989 TCP 连接端口,固定不变
认证令牌 (token) mushuju 客户端身份认证凭证,必填,需原样发送
字节序 小端字节序 数据传输及解析需遵循小端字节序规则

3. 连接与数据接收流程

3.1 连接建立

客户端需使用标准 TCP 客户端工具/代码,与服务端指定 hostport 建立 TCP 连接,确保网络通畅,无防火墙拦截该端口的 TCP 通信。

3.2 身份认证

TCP 连接建立成功后,客户端需立即向服务端发送指定 token,完成身份认证。认证通过后,服务端将开始向客户端推送全量数据;认证失败则会断开 TCP 连接。

3.3 数据接收

认证通过后,客户端持续监听 TCP 连接的输入流,接收服务端推送的数据。数据传输遵循以下规则:每一条完整数据的前4个字节为该条数据的长度(按小端字节序解析),后续字节为实际业务数据。

4. 数据格式说明

4.1 数据传输格式

单条数据采用 JSON 格式字符串,编码格式默认 UTF-8;数据长度字段(前4字节)为无符号整数,小端字节序。

4.2 返回数据示例

{"open": 10.84, "close": 10.84, "high": 10.85, "low": 10.84, "volume": 1018, "amount": 1103600.0, "time": 1773982977000, "stock_code": "000001.SZ"}

4.3 字段解析

数据字段解析如下,共 8 个字段:

序号 字段名称 数据类型 说明
1open浮点数开盘价
2high浮点数最高价
3close浮点数收盘价
4low浮点数最低价
5amount浮点数成交额
6volume整数成交量
7time整数时间戳
8stock_code字符串股票代码

5. 注意事项

  • 字节序规范:务必按照小端字节序解析数据长度字段,否则会导致数据解析失败。
  • Token 使用:Token 为接口访问唯一凭证,需妥善保管,禁止泄露;若 Token 失效,需联系接口提供方更新。
  • 连接稳定性:若 TCP 连接断开,客户端需重新建立连接并发送 Token,方可恢复数据接收。
  • 数据处理:建议客户端对接收的数据进行校验(如字段数量、数据格式),避免因异常数据导致业务异常。
  • 编码规范:数据字符串默认采用 UTF-8 编码,解析时需统一编码格式,防止中文乱码。

6. 异常处理

异常场景 现象描述 处理建议
连接失败 无法与服务端建立 TCP 连接 检查网络状态,确认端口放行,确认 host 及 port 无误
认证失败 发送 token 后连接被断开 检查 token 是否正确,确认 token 未失效
数据解析失败 解析出的字段数量不符或数据格式错误 检查字节序解析方式、数据是否为合法的 JSON 格式、编码格式是否为 UTF-8
连接断开 正常接收数据过程中连接中断 客户端实现重连机制,重新建立连接并完成认证

7. 客户端接入示例 (Python)

为方便开发者快速接入,以下提供两份基于 Python 的客户端接入代码示例。开发者可根据实际业务量级选择合适的方案。

7.1 基础接收示例(无缓存)

适用场景: 适用于本地开发调试、数据连通性测试,或下游系统自行处理数据的轻量级场景。代码实现了标准的 TCP 建立、Token 鉴权以及基于小端字节序拆包的核心逻辑。

import socket
import struct
import threading
import queue
import time
import sys

# === 配置参数 ===
HOST = 'test.chagubang.com'
PORT = 48989
TOKEN = 'mushuju'
# 队列最大容量,防止消费者挂掉导致内存撑爆(OOM)
# 100000 条行情数据大约占用几十MB内存
MAX_QUEUE_SIZE = 100000
def optimized_recvall(sock, n):
    """优化后的 recvall:使用列表推导和 join,减少 bytearray 的内存重分配开销"""
    chunks = []
    bytes_recd = 0
    while bytes_recd < n:
        chunk = sock.recv(min(n - bytes_recd, 4096))
        if not chunk:
            return None
        chunks.append(chunk)
        bytes_recd += len(chunk)
    return b''.join(chunks)


def receive_message(sock):
    """读取单条完整消息"""
    raw_msglen = optimized_recvall(sock, 4)
    if not raw_msglen:
        return None
    msglen = struct.unpack('<I', raw_msglen)[0]
    return optimized_recvall(sock, msglen)


def network_receiver(sock, data_queue):
    """
    【生产者线程】
    职责:以最高速度从网卡读取数据,不进行任何耗时操作(如解码、打印),直接塞入队列。
    """
    print("Receiver thread started.")
    while True:
        try:
            message = receive_message(sock)
            if message is None:
                print("\n[Network] Connection closed by server")
                # 放入一个特殊信号告诉消费者停止
                data_queue.put(None)
                break

            # 使用 put_nowait。如果队列满了,说明处理速度严重落后于接收速度
            try:
                data_queue.put_nowait(message)
            except queue.Full:
                # 极端情况下:队列满了!
                # 为了不阻塞TCP,只能丢弃当前数据,并报警。
                # 在真实高频交易中,一旦发生这种情况说明系统性能出现瓶颈。
                print("\n[Warning] Queue is FULL! Dropping packets to prevent TCP blocking!", file=sys.stderr)
        except Exception as e:
            print(f"\n[Network] Error: {e}")
            data_queue.put(None)
            break
def data_processor(data_queue):
    """
    【消费者线程】
    职责:从队列中获取数据进行解码、业务处理。
    """
    print("Processor thread started.")
    count = 0
    start_time = time.time()

    while True:
        message = data_queue.get()  # 阻塞等待数据
        if message is None:  # 收到结束信号
            break
        try:
            decoded_message = message.decode('utf-8')
            # --- 在这里进行你的业务逻辑处理 ---
            # 例如:写入数据库,计算指标等
            # 不要每一根行情都 print,改为每隔 X 条打印一次统计信息
            count += 1
            if count % 1 == 0:
                elapsed = time.time() - start_time
                qps = 10000 / elapsed
                #print(f"[Stats] Processed {count} msgs, Queue size: {data_queue.qsize()}, Speed: {qps:.2f} msgs/sec")
                print(f"Sample data: {decoded_message}") # 打印截断的样本以供调试
                start_time = time.time()

        except UnicodeDecodeError:
            print(f"[Error] Received non-UTF-8 data length: {len(message)}")
        except Exception as e:
            print(f"[Error] Processing error: {e}")
def main():
    # 创建线程安全的队列
    data_queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        # 优化点 1:增大 TCP 接收缓冲区大小到 8MB (根据操作系统可能需要调整)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 8 * 1024 * 1024)
        except Exception as e:
            print(f"Warning: Could not set SO_RCVBUF: {e}")
        s.connect((HOST, PORT))
        print(f"Connected to {HOST}:{PORT}")
        # 发送鉴权
        s.sendall(TOKEN.encode('utf-8'))
        print(f"Sent: {TOKEN}")
        # 启动消费者线程
        processor = threading.Thread(target=data_processor, args=(data_queue,))
        processor.daemon = True  # 主线程退出时自动结束
        processor.start()
        # 启动生产者 (网络接收)
        # 这里直接使用主线程作为接收者,也可以单开线程
        network_receiver(s, data_queue)

if __name__ == "__main__":
    main()
	

7.2 高性能接收示例(配合 Redis 缓存)

适用场景: 适用于生产环境、海量高频数据推送的业务场景。该方案结合了 Redis 连接池Pipeline (管道) 批量提交技术,并配合后台定时任务,大幅降低了网络与存储 IO 开销,保障消费端不会因处理缓慢而产生背压。
import socket
import struct
import json
import redis
import schedule
import time
import threading
from datetime import datetime

# --- 1. Redis 连接池设置 ---
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PASSWORD = ''

try:
    # 创建Redis连接池
    pool = redis.ConnectionPool(
        host=REDIS_HOST,
        port=REDIS_PORT,
        password=REDIS_PASSWORD,
        decode_responses=True,   # 自动将响应从bytes解码为str
        health_check_interval=30 # 定期检查连接健康状况
    )
    redis_client = redis.Redis(connection_pool=pool)
    redis_client.ping()
    print("Successfully connected to Redis.")
except redis.exceptions.ConnectionError as e:
    print(f"Error connecting to Redis: {e}")
    exit()

# --- 线程间共享数据与安全锁 ---
pipeline_lock = threading.Lock()
redis_batch_context = {
    "pipe": redis_client.pipeline(),
    "count": 0
}

# --- 3. 定时提交任务 ---
def submit_pipe_job():
    """ 调度器定时调用的函数,用于提交当前 pipeline 中剩余的命令 """
    with pipeline_lock:
        if redis_batch_context["count"] > 0:
            current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            print(f"[{current_time}] Scheduled submission: Submitting batch of {redis_batch_context['count']} messages...")
            try:
                redis_batch_context["pipe"].execute()
                print("Scheduled batch submitted successfully.")
            except Exception as e:
                print(f"Error during scheduled pipe execution: {e}")
            finally:
                redis_batch_context["pipe"] = redis_client.pipeline()
                redis_batch_context["count"] = 0

def run_scheduler():
    """ 在后台线程中运行调度器 """
    # 设置每天 15:00:10 触发收尾提交
    schedule.every().day.at("15:00:10").do(submit_pipe_job)
    print("Scheduler configured to run every day at 15:00:10.")
    while True:
        schedule.run_pending()
        time.sleep(1)

# --- 核心接收函数 ---
def receive_message(sock):
    raw_msglen = recvall(sock, 4)
    if not raw_msglen:
        return None
    msglen = struct.unpack('<I', raw_msglen)[0]
    return recvall(sock, msglen)

def recvall(sock, n):
    data = bytearray()
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data.extend(packet)
    return data

def main():
    host = 'test.chagubang.com'
    port = 48989
    
    # 启动后台调度线程
    scheduler_thread = threading.Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((host, port))
            print(f"Connected to {host}:{port}")

            token = 'mushuju'
            s.sendall(token.encode('utf-8'))
            print(f"Sent token.")
            
            batch_size = 2000

            while True:
                message = receive_message(s)
                if message is None:
                    print("Connection closed by server")
                    break
                try:
                    decoded_message = message.decode('utf-8')

                    # --- 2. 使用 Redis Pipeline 批量提交 ---
                    try:
                        msg_dict = json.loads(decoded_message)
                        # 解析JSON提取股票代码作为 Redis Key
                        key = msg_dict.get("stock_code")
                        if key:
                            value = decoded_message
                            
                            with pipeline_lock:
                                redis_batch_context["pipe"].set(key, value)
                                redis_batch_context["count"] += 1
                                
                                # 每攒满 batch_size 条提交一次,减少网络IO
                                if redis_batch_context["count"] >= batch_size:
                                    try:
                                        redis_batch_context["pipe"].execute()
                                    except Exception as e:
                                        print(f"Error batch execution: {e}")
                                    finally:
                                        redis_batch_context["pipe"] = redis_client.pipeline()
                                        redis_batch_context["count"] = 0
                        else:
                            print(f"Warning: Message missing 'stock_code', skipping: '{decoded_message}'")
                    except json.JSONDecodeError:
                        print(f"Warning: Invalid JSON data, skipping: '{decoded_message}'")

                except UnicodeDecodeError:
                    print(f"Received non-UTF-8 data.")
    
    except KeyboardInterrupt:
        print("\nProgram interrupted by user.")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        print("Exiting program. Submitting remaining messages in pipeline...")
        submit_pipe_job()
        print("Program finished.")

if __name__ == "__main__":
    main()