Python实现流式数据转换:将code_a格式转换为code_b格式

ChatGPT4o 使用 ChatGPT 4o 回答

问题:

我正在对post的接口返回strem的形式进行代理, 使用python将<code_a>的 stream 形式的返回结构,转换为<code_b>的形式,需要考虑 stream 分批不是完整的 event 的场景 <code_a>event:data data:{"count":1,"message":"亲爱的编","messageSingle":"亲爱的编","end":false}

event:data data:{"count":2,"message":"亲爱的编程小勇士","messageSingle":"程小勇士","end":false}

event:done data:

</code_a> <code_b> data: {"id":"chatcmpl-534","object":"chat.completion.chunk","created":1722599889,"model":"qwen2","system_fingerprint":"fp_ollama","choices":[{"index":0,"delta":{"role":"assistant","content":"嘿"},"finish_reason":null}]}

data: {"id":"chatcmpl-534","object":"chat.completion.chunk","created":1722599889,"model":"qwen2","system_fingerprint":"fp_ollama","choices":[{"index":0,"delta":{"role":"assistant","content":","},"finish_reason":null}]}

data: {"id":"chatcmpl-534","object":"chat.completion.chunk","created":1722599889,"model":"qwen2","system_fingerprint":"fp_ollama","choices":[{"index":0,"delta":{"role":"assistant","content":"小"},"finish_reason":null}]}

data: [DONE]

</code_b>

回答:

要将 code_a 的流式返回结构转换为 code_b 的形式,可以使用 Python 进行流处理。我们需要考虑到流数据可能会分批到达,并且每个批次可能不是完整的事件。以下是一个示例代码,展示了如何实现这个转换:

import json
import time
import uuid

def generate_code_b_event(data, role="assistant"):
    return {
        "id": "chatcmpl-534",
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": "qwen2",
        "system_fingerprint": "fp_ollama",
        "choices": [
            {
                "index": 0,
                "delta": {
                    "role": role,
                    "content": data
                },
                "finish_reason": None
            }
        ]
    }

def process_stream(input_stream):
    buffer = ""
    for chunk in input_stream:
        buffer += chunk
        while "\n\n" in buffer:
            event, buffer = buffer.split("\n\n", 1)
            if event.startswith("event: done"):
                yield "data: [DONE]\n\n"
            elif event.startswith("event: data"):
                data_line = event.split("\n")[1]
                data_json = json.loads(data_line.split(":", 1)[1].strip())
                message = data_json.get("messageSingle", "")
                for char in message:
                    event_b = generate_code_b_event(char)
                    yield f"data: {json.dumps(event_b)}\n\n"

# 模拟输入流
input_stream = [
    "event:data\ndata:{\"count\":1,\"message\":\"亲爱的编\",\"messageSingle\":\"亲爱的编\",\"end\":false}\n\n",
    "event:data\ndata:{\"count\":2,\"message\":\"亲爱的编程小勇士\",\"messageSingle\":\"程小勇士\",\"end\":false}\n\n",
    "event:done\ndata:\n\n"
]

# 处理流并输出结果
for output in process_stream(input_stream):
    print(output, end="")

解释

  1. generate_code_b_event 函数:生成 code_b 格式的事件。
  2. process_stream 函数:处理输入流,将 code_a 格式的事件转换为 code_b 格式。
    • 使用 buffer 来存储未处理的部分,确保处理分批到达的数据。
    • 使用 split("\n\n", 1) 来分割事件,处理每个完整的事件。
    • 根据事件类型(datadone)生成相应的 code_b 格式输出。
  3. 模拟输入流input_stream 模拟了一个流式输入,包含多个事件。
  4. 处理流并输出结果:遍历 process_stream 的输出并打印结果。

这个示例代码展示了如何处理流式数据并进行格式转换,确保即使数据分批到达也能正确处理。