diff --git a/api/controllers/inner_api/plugin/plugin.py b/api/controllers/inner_api/plugin/plugin.py index f3a1bd8fa..41063b35a 100644 --- a/api/controllers/inner_api/plugin/plugin.py +++ b/api/controllers/inner_api/plugin/plugin.py @@ -29,7 +29,7 @@ from core.plugin.entities.request import ( RequestRequestUploadFile, ) from core.tools.entities.tool_entities import ToolProviderType -from libs.helper import compact_generate_response +from libs.helper import length_prefixed_response from models.account import Account, Tenant from models.model import EndUser @@ -44,7 +44,7 @@ class PluginInvokeLLMApi(Resource): response = PluginModelBackwardsInvocation.invoke_llm(user_model.id, tenant_model, payload) return PluginModelBackwardsInvocation.convert_to_event_stream(response) - return compact_generate_response(generator()) + return length_prefixed_response(0xF, generator()) class PluginInvokeTextEmbeddingApi(Resource): @@ -101,7 +101,7 @@ class PluginInvokeTTSApi(Resource): ) return PluginModelBackwardsInvocation.convert_to_event_stream(response) - return compact_generate_response(generator()) + return length_prefixed_response(0xF, generator()) class PluginInvokeSpeech2TextApi(Resource): @@ -162,7 +162,7 @@ class PluginInvokeToolApi(Resource): ), ) - return compact_generate_response(generator()) + return length_prefixed_response(0xF, generator()) class PluginInvokeParameterExtractorNodeApi(Resource): @@ -228,7 +228,7 @@ class PluginInvokeAppApi(Resource): files=payload.files, ) - return compact_generate_response(PluginAppBackwardsInvocation.convert_to_event_stream(response)) + return length_prefixed_response(0xF, PluginAppBackwardsInvocation.convert_to_event_stream(response)) class PluginInvokeEncryptApi(Resource): diff --git a/api/core/plugin/backwards_invocation/base.py b/api/core/plugin/backwards_invocation/base.py index 3214e0746..2a5f85757 100644 --- a/api/core/plugin/backwards_invocation/base.py +++ b/api/core/plugin/backwards_invocation/base.py @@ -11,14 +11,12 @@ class BaseBackwardsInvocation: try: for chunk in response: if isinstance(chunk, BaseModel | dict): - yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode() + b"\n\n" - elif isinstance(chunk, str): - yield f"event: {chunk}\n\n".encode() + yield BaseBackwardsInvocationResponse(data=chunk).model_dump_json().encode() except Exception as e: error_message = BaseBackwardsInvocationResponse(error=str(e)).model_dump_json() - yield f"{error_message}\n\n".encode() + yield error_message.encode() else: - yield BaseBackwardsInvocationResponse(data=response).model_dump_json().encode() + b"\n\n" + yield BaseBackwardsInvocationResponse(data=response).model_dump_json().encode() T = TypeVar("T", bound=dict | Mapping | str | bool | int | BaseModel) diff --git a/api/libs/helper.py b/api/libs/helper.py index e78a782fb..070e575db 100644 --- a/api/libs/helper.py +++ b/api/libs/helper.py @@ -3,6 +3,7 @@ import logging import re import secrets import string +import struct import subprocess import time import uuid @@ -14,6 +15,7 @@ from zoneinfo import available_timezones from flask import Response, stream_with_context from flask_restful import fields +from pydantic import BaseModel from configs import dify_config from core.app.features.rate_limiting.rate_limit import RateLimitGenerator @@ -206,6 +208,60 @@ def compact_generate_response(response: Union[Mapping, Generator, RateLimitGener return Response(stream_with_context(generate()), status=200, mimetype="text/event-stream") +def length_prefixed_response(magic_number: int, response: Union[Mapping, Generator, RateLimitGenerator]) -> Response: + """ + This function is used to return a response with a length prefix. + Magic number is a one byte number that indicates the type of the response. + + For a compatibility with latest plugin daemon https://github.com/langgenius/dify-plugin-daemon/pull/341 + Avoid using line-based response, it leads a memory issue. + + We uses following format: + | Field | Size | Description | + |---------------|----------|---------------------------------| + | Magic Number | 1 byte | Magic number identifier | + | Reserved | 1 byte | Reserved field | + | Header Length | 2 bytes | Header length (usually 0xa) | + | Data Length | 4 bytes | Length of the data | + | Reserved | 6 bytes | Reserved fields | + | Data | Variable | Actual data content | + + | Reserved Fields | Header | Data | + |-----------------|----------|----------| + | 4 bytes total | Variable | Variable | + + all data is in little endian + """ + + def pack_response_with_length_prefix(response: bytes) -> bytes: + header_length = 0xA + data_length = len(response) + # | Magic Number 1byte | Reserved 1byte | Header Length 2bytes | Data Length 4bytes | Reserved 6bytes | Data + return struct.pack(" Generator: + for chunk in response: + if isinstance(chunk, str): + yield pack_response_with_length_prefix(chunk.encode("utf-8")) + else: + yield pack_response_with_length_prefix(chunk) + + return Response(stream_with_context(generate()), status=200, mimetype="text/event-stream") + + class TokenManager: @classmethod def generate_token(