Hello World

吞风吻雨葬落日 欺山赶海踏雪径

0%

某网站 Chat2API

某网站是一个 AI 对话平台,提供了类似 ChatGPT 的对话能力。通过抓包分析,发现其 API 使用 gRPC-Web 协议进行通信,于是用 Python 实现了一个完整的客户端,并提供了 OpenAI 协议兼容的 HTTP 服务。

gRPC-Web 简介

gRPC-Web 是 gRPC 在浏览器端的实现,允许 Web 应用直接调用 gRPC 服务,无需通过 HTTP/JSON 中间层。

为什么用 gRPC-Web

相比传统的 REST API,gRPC-Web 有以下优势:

特性 REST API gRPC-Web
序列化 JSON (文本) Protobuf (二进制)
数据大小 较大 较小 (节省 30-50%)
解析速度
类型安全 强 (Protobuf Schema)
流式支持 SSE/WebSocket 原生支持

gRPC-Web 协议格式

gRPC-Web 的请求体采用特殊的二进制格式:

1
2
3
4
5
+-------------------+----------------------+----------------------+
| 压缩标志 (1字节) | 消息长度 (4字节) | 消息体 (N字节) |
| 0x00 = 未压缩 | 大端序整数 | JSON 或 Protobuf |
| 0x01 = 压缩 | | |
+-------------------+----------------------+----------------------+

示例: 假设消息体是 {"hello": "world"} (17 字节)

1
2
3
00 00 00 00 11 7B 22 68 65 6C 6C 6F 22 3A 20 22 77 6F 72 6C 64 22 7D
^^ ^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
未压缩 长度=17 {"hello": "world"}

gRPC-Web vs 原生 gRPC

方面 原生 gRPC gRPC-Web
传输协议 HTTP/2 HTTP/1.1 或 HTTP/2
浏览器支持 需要代理 原生支持
双向流 支持 仅服务端流
压缩 内置 需手动处理

抓包分析过程

第一步:观察网络请求

打开浏览器访问 https://www.xxx.com,登录后打开开发者工具 (F12):

  1. 切换到 Network (网络) 标签
  2. 在输入框发送一条消息
  3. 观察出现的网络请求

发现了几个关键请求:

1
2
POST /api/agent/capy.agent.v1.AgentService/CreateChatSession
POST /api/agent/capy.agent.v1.AgentService/ChatStream

从 URL 路径可以看出:

  • 使用了 gRPC 服务 (capy.agent.v1.AgentService)
  • 方法名采用 PascalCase (CreateChatSession, ChatStream)

第二步:分析请求头

查看 ChatStream 请求的 Request Headers:

1
2
3
4
5
6
7
8
9
POST /api/agent/capy.agent.v1.AgentService/ChatStream HTTP/1.1
Host: www.target.com
Content-Type: application/connect+json
connect-protocol-version: 1
oasis-appid: 10200
oasis-language: zh
oasis-platform: web
x-waf-client-type: fetch_sdk
Cookie: Oasis-Webid=xxx; Oasis-Token=xxx

关键发现:

  • Content-Type: application/connect+json - 这是 gRPC-Web 的 Content-Type
  • connect-protocol-version: 1 - Connect 协议版本标识
  • oasis-appid / oasis-language / oasis-platform - 业务自定义头

第三步:解析请求体

在 Chrome DevTools 中,请求体显示为二进制:

1
2
3
4
5
6
7
8
9
00000000: 00 00 00 00 5f 7b 22 6d  65 73 73 61 67 65 22 3a  ..._{"message":
00000010: 7b 22 63 68 61 74 53 65 73 73 69 6f 6e 49 64 22 {"chatSessionId"
00000020: 3a 22 78 78 78 22 2c 22 63 6f 6e 74 65 6e 74 22 :"xxx","content"
00000030: 3a 7b 22 75 73 65 72 4d 65 73 73 61 67 65 22 3a :{"userMessage":
00000040: 7b 22 71 61 22 3a 7b 22 63 6f 6e 74 65 6e 74 22 {"qa":{"content"
00000050: 3a 22 e4 bd a0 e5 a5 bd 5c 6e 22 7d 7d 7d 2c 22 :"你好\n"}}},"config"
00000060: 3a 7b 22 6d 6f 64 65 6c 22 3a 22 73 74 65 70 2d :{"model":"step-
00000070: 61 75 74 6f 22 2c 22 65 6e 61 62 6c 65 52 65 61 auto","enableRea
00000080: 73 6f 6e 69 6e 67 22 3a 74 72 75 65 7d 7d soning":true}}

解析结构:

  • 00 - 压缩标志 (未压缩)
  • 00 00 00 5f - 消息长度 (大端序,= 95 字节)
  • 后续是 JSON 内容

手动提取 JSON (跳过前 5 字节):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"message": {
"chatSessionId": "xxx",
"content": {
"userMessage": {
"qa": {
"content": "你好\n"
}
}
}
},
"config": {
"model": "step-auto",
"enableReasoning": true
}
}

第四步:分析响应体

响应类型是 text/event-stream,即 SSE (Server-Sent Events):

1
2
3
4
5
6
7
data: {"data":{"event":{"startEvent":{"messageId":"msg_xxx"}}}}

data: {"data":{"event":{"reasoningEvent":{"text":"用户问好..."}}}}

data: {"data":{"event":{"textEvent":{"text":"你好!我是..."}}}}

data: {"data":{"event":{"finishEvent":{}}}}

但实际上在 Network 面板看到的是原始字节:

1
2
3
4
00000000: 00 00 00 00 5f 7b 22 64  61 74 61 22 3a 7b 22 65  ..._{"data":{"e
00000010: 76 65 6e 74 22 3a 7b 22 73 74 61 72 74 45 76 65 vent":{"startEve
00000020: 6e 74 22 3a 7b 22 6d 65 73 73 61 67 65 49 64 22 nt":{"messageId"
00000030: 3a 22 6d 73 67 5f 78 78 78 22 7d 7d 7d 7d :"msg_xxx"}}}}

每个 SSE 事件都带有 5 字节前缀,格式与请求相同。

第五步:获取认证信息

在 Application > Cookies 中找到关键 Cookie:

Cookie 说明
Oasis-Token 访问令牌,格式:accessToken...refreshToken
Oasis-Webid 会话标识,UUID 格式

Oasis-Token 分析:

1
2
3
4
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...<refreshToken>
^ ^
| |
accessToken (JWT) 分隔符

JWT 解码后的 payload:

1
2
3
4
5
{
"sub": "user_123",
"exp": 1740000000,
"iat": 1739996400
}

API 端点总结

主要的 API 端点:

1
2
3
POST https://www.target.com/api/agent/capy.agent.v1.AgentService/CreateChatSession
POST https://www.target.com/api/agent/capy.agent.v1.AgentService/ChatStream
POST https://www.target.com/api/agent/capy.agent.v1.AgentService/ListChatSessions

请求格式

创建会话的请求体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
{
"message": {
"chatSessionId": "xxx",
"content": {
"userMessage": {
"qa": {
"content": "你好\n"
}
}
}
},
"config": {
"model": "step-auto",
"enableReasoning": true
}
}

响应格式

SSE 流式响应,每个事件带有长度前缀:

1
2
3
[5字节长度前缀]{"data":{"event":{"startEvent":{...}}}}
[5字节长度前缀]{"data":{"event":{"reasoningEvent":{"text":"..."}}}}
[5字节长度前缀]{"data":{"event":{"textEvent":{"text":"..."}}}}

Python 客户端实现

构建二进制请求体

这是实现的关键部分。我们需要按照 gRPC-Web 格式组装请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def _build_grpc_web_body(payload: dict) -> bytes:
"""
构建 gRPC-Web 格式的请求体

格式: [压缩标志(1字节)][消息长度(4字节大端序)][消息体]
"""
# 1. 将字典转为 JSON 字符串
json_str = json.dumps(payload, ensure_ascii=False)

# 2. 编码为 UTF-8 字节
json_bytes = json_str.encode('utf-8')

# 3. 计算消息长度
message_length = len(json_bytes)

# 4. 组装二进制数据
# bytes([0]) = 压缩标志 0x00 (未压缩)
# message_length.to_bytes(4, 'big') = 4字节大端序长度
data_bytes = bytes([0]) + message_length.to_bytes(4, 'big') + json_bytes

return data_bytes

调试示例:

1
2
3
4
5
6
7
8
9
10
11
# 测试代码
payload = {"hello": "world"}
result = _build_grpc_web_body(payload)

# 打印十六进制
print(result.hex())
# 输出: 000000000f7b2268656c6c6f223a22776f726c64227d
# 解析:
# 00 - 压缩标志 (未压缩)
# 0000000f - 长度 = 15
# 7b...7d - {"hello":"world"}

解析 SSE 响应流

响应流的解析比较复杂,需要处理多种事件类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def parse_sse_line(line: bytes) -> dict:
"""
解析 SSE 行,处理长度前缀

输入示例: b'\x00\x00\x00\x1a{"data":{"event":{...}}}'
"""
offset = 0

# 读取 5 字节前缀
if offset + 5 > len(line):
raise ValueError("Invalid line format")

# 检查压缩标志 (应该是 0x00)
if line[offset] != 0:
raise ValueError(f"Unsupported compression flag: {line[offset]}")
offset += 1

# 读取消息长度 (大端序)
msg_len = int.from_bytes(line[offset:offset+4], 'big')
offset += 4

# 读取 JSON 数据
if offset + msg_len > len(line):
raise ValueError("Message length mismatch")

json_bytes = line[offset:offset+msg_len]
json_str = json_bytes.decode('utf-8')

return json.loads(json_str)

事件类型映射:

事件类型 说明 示例数据
startEvent 对话开始 {"messageId": "msg_xxx"}
reasoningEvent AI 思考过程 {"text": "用户问好..."}
textEvent 正式回答 {"text": "你好!我是..."}
finishEvent 对话结束 {}
errorEvent 错误信息 {"code": 500, "message": "..."}

核心 API 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from dataclasses import dataclass
from typing import Iterator, Optional, Dict, Union, Literal

@dataclass
class StreamChunk:
"""流式响应块"""
type: Literal["start", "reasoning", "content", "end", "error"]
data: Union[str, Dict]
message_id: Optional[str] = None

class TargetAPI:
""" API 客户端"""

BASE_URL = "https://www.target.com/api/agent"
SERVICE = "capy.agent.v1.AgentService"

def __init__(self, oasis_token: str, oasis_webid: str, oasis_appid: str = "10200"):
self.oasis_token = oasis_token
self.oasis_webid = oasis_webid
self.oasis_appid = oasis_appid
self.session = requests.Session()
self._setup_headers()

def _setup_headers(self):
"""设置请求头"""
self.session.headers.update({
"User-Agent": "Mozilla/5.0 ...",
"connect-protocol-version": "1",
"oasis-appid": self.oasis_appid,
"oasis-language": "zh",
"oasis-platform": "web",
"x-waf-client-type": "fetch_sdk",
})
cookie = f"Oasis-Webid={self.oasis_webid}; Oasis-Token={self.oasis_token}"
self.session.headers["Cookie"] = cookie

流式对话实现

完整的流式对话处理流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def chat_stream(
self,
chat_session_id: str,
content: str,
model: str = "step-auto",
enable_reasoning: bool = True,
) -> Iterator[StreamChunk]:
"""
流式对话

处理流程:
1. 构建 gRPC-Web 格式请求体
2. 发送 POST 请求
3. 逐行解析 SSE 响应
4. 根据事件类型生成 StreamChunk
"""
url = f"{self.BASE_URL}/{self.SERVICE}/ChatStream"
headers = {**self.session.headers, "content-type": "application/connect+json"}

# 构建请求体
payload = {
"message": {
"chatSessionId": chat_session_id,
"content": {
"userMessage": {
"qa": {"content": content + "\n"}
}
}
},
"config": {
"model": model,
"enableReasoning": enable_reasoning
}
}

# 转换为 gRPC-Web 二进制格式
json_bytes = json.dumps(payload, ensure_ascii=False).encode('utf-8')
data_bytes = bytes([0]) + len(json_bytes).to_bytes(4, 'big') + json_bytes

# 发送请求并获取流式响应
response = self.session.post(url, headers=headers, data=data_bytes, stream=True)
response.raise_for_status()

# 逐行解析 SSE 响应
for line in response.iter_lines():
if not line:
continue

try:
# 跳过 5 字节前缀,解析 JSON
if len(line) > 5:
json_bytes = line[5:]
json_str = json_bytes.decode('utf-8')
data = json.loads(json_str)

# 根据事件类型分发处理
yield from self._process_event(data)
except (json.JSONDecodeError, UnicodeDecodeError):
continue

事件处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def _process_event(self, data: Dict) -> Iterator[StreamChunk]:
"""处理SSE事件数据,根据事件类型生成对应的 StreamChunk"""
event_data = data.get("data", {})
event = event_data.get("event", {})

# 处理开始事件
if "startEvent" in event:
start = event["startEvent"]
yield StreamChunk(
type="start",
data=start,
message_id=start.get("messageId")
)

# 处理推理事件 (AI 思考过程)
elif "reasoningEvent" in event:
text = event["reasoningEvent"].get("text", "")
if text:
yield StreamChunk(type="reasoning", data={"text": text})

# 处理消息事件 (正式回答)
elif "message" in event:
msg = event["message"]
text = msg.get("content", {}).get("assistantMessage", {}).get("qa", {}).get("content", "")
if text:
yield StreamChunk(type="content", data={"text": text}, message_id=msg.get("messageId"))

# 处理文本事件 (另一种回答格式)
elif "textEvent" in event:
text = event["textEvent"].get("text", "")
if text:
yield StreamChunk(type="content", data={"text": text})

# 处理结束事件
elif "doneEvent" in event or "messageDoneEvent" in event or "finishEvent" in event:
yield StreamChunk(type="end", data={})

Token 刷新机制

Oasis-Token 有过期时间,需要自动刷新。Token 格式为 accessToken...refreshToken

JWT 解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import base64
import json
import time

def _check_token_expired(self) -> bool:
"""
检查 Token 是否过期

JWT 格式: header.payload.signature
我们只需要解析 payload 部分
"""
try:
# Token 格式: accessToken...refreshToken
parts = self.oasis_token.split("...")
if len(parts) < 2:
return False

access_token = parts[0]

# JWT: header.payload.signature
# 取第二段 (payload)
payload_b64 = access_token.split(".")[1]

# 添加 Base64 padding
padding = 4 - len(payload_b64) % 4
payload_b64 += "=" * padding

# Base64 解码
payload_bytes = base64.b64decode(payload_b64)
payload = json.loads(payload_bytes.decode('utf-8'))

# 获取过期时间 (Unix timestamp)
exp = payload.get("exp", 0)

# 提前 60 秒刷新,避免临界点问题
current_time = int(time.time())
is_expired = exp < current_time + 60

if is_expired:
logger.warning(f"Token 即将过期: exp={exp}, current={current_time}")

return is_expired

except Exception as e:
logger.error(f"检查 Token 过期失败: {e}")
return False

JWT Payload 示例:

1
2
3
4
5
6
{
"sub": "user_123456",
"exp": 1740585600,
"iat": 1740582000,
"iss": "target.com"
}

Token 刷新请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def _refresh_token(self) -> None:
"""
刷新 Token

调用刷新接口,获取新的 accessToken 和 refreshToken
"""
try:
logger.info("开始刷新 Token...")

url = "https://www.target.com/passport/proto.api.passport.v1.PassportService/RefreshToken"
headers = {
**self.session.headers,
"content-type": "application/json",
}

# 刷新接口不需要请求体,Token 从 Cookie 中获取
response = requests.post(url, headers=headers, json={})
response.raise_for_status()

data = response.json()

# 解析新 Token
access_token = data.get("accessToken", {}).get("raw", "")
refresh_token = data.get("refreshToken", {}).get("raw", "")

if not access_token or not refresh_token:
raise ValueError("刷新 Token 响应格式错误")

# 拼接新 Token
new_token = f"{access_token}...{refresh_token}"

logger.info(f"Token 刷新成功")

# 更新 Token
self.oasis_token = new_token
self._setup_headers()

# 触发回调(如果有)
if self.on_token_refresh:
self.on_token_refresh(new_token)

except Exception as e:
logger.error(f"刷新 Token 失败: {e}")
raise

自动刷新机制

在每次 API 调用前检查 Token 是否过期:

1
2
3
4
5
6
7
8
9
10
11
12
13
def _ensure_token_valid(self) -> None:
"""
确保 Token 有效,如果即将过期则自动刷新
"""
if self._check_token_expired():
self._refresh_token()

def chat_stream(self, chat_session_id: str, content: str, model: str = "step-auto"):
"""流式对话 - 调用前检查 Token"""
self._ensure_token_valid() # <-- 自动检查并刷新

url = f"{self.BASE_URL}/{self.SERVICE}/ChatStream"
# ... 继续处理

OpenAI 兼容接口

为了方便集成,实现 OpenAI 协议兼容接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@dataclass
class OpenAIResponse:
id: str
object: str
created: int
model: str
choices: List

class OpenAICompatibleAPI:
def create_chat_completion(self, model, messages, stream=False):
"""OpenAI 兼容的聊天补全"""
chat_session_id = self._get_or_create_session()
content = messages[-1]["content"]

if stream:
return self._chat_completion_stream(chat_session_id, content, model)
else:
return self._chat_completion_sync(chat_session_id, content, model)

HTTP API Server

基于 FastAPI 提供 HTTP 服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI(title="Target API Server")

@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
"""OpenAI 兼容的聊天补全接口"""

if request.stream:
async def stream_generator():
for chunk in api_client.chat_stream(...):
# 转换为 OpenAI SSE 格式
yield f"data: {json.dumps(chunk)}\n\n"
yield "data: [DONE]\n\n"

return StreamingResponse(stream_generator(), media_type="text/event-stream")

# 非流式
response = openai_api.create_chat_completion(...)
return openai_api.to_openai_dict(response)

启动服务:

1
2
3
python target_api_server.py
# 或
uvicorn target_api_server:app --host 0.0.0.0 --port 8000

使用示例

直接使用客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from target_api import TargetAPI

api = TargetAPI(
oasis_token="your_token",
oasis_webid="your_webid",
)

# 创建会话
session = api.create_chat_session()

# 流式对话
for chunk in api.chat_stream(
chat_session_id=session.chat_session_id,
content="你好,请介绍一下自己",
model="step-auto",
enable_reasoning=True,
):
if chunk.type == "reasoning":
print(f"[思考] {chunk.data['text']}", end="", flush=True)
elif chunk.type == "content":
print(chunk.data['text'], end="", flush=True)
elif chunk.type == "end":
print()

使用 OpenAI SDK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from openai import OpenAI

client = OpenAI(
base_url="http://localhost:8000/v1",
api_key="unused",
)

response = client.chat.completions.create(
model="step-auto",
messages=[{"role": "user", "content": "你好"}],
stream=True,
)

for chunk in response:
content = chunk.choices[0].delta.content
if content:
print(content, end="", flush=True)

使用 curl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 非流式
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "step-auto",
"messages": [{"role": "user", "content": "你好"}]
}'

# 流式
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "step-auto",
"messages": [{"role": "user", "content": "你好"}],
"stream": true
}'

Reference