From d5e6e38c58f66e3e6e7801b1a06b0f624ae6d771 Mon Sep 17 00:00:00 2001
From: Yeuoly <45712896+Yeuoly@users.noreply.github.com>
Date: Tue, 26 Aug 2025 21:15:23 +0800
Subject: [PATCH] refactor: unify blob chunk merging logic for plugin tool and agent (#24575)

Co-authored-by: Claude
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
---
 api/core/plugin/impl/agent.py                 |   3 +-
 api/core/plugin/impl/tool.py                  |  57 +--
 api/core/plugin/utils/chunk_merger.py         |  92 ++++
 api/tests/unit_tests/core/plugin/__init__.py  |   0
 .../unit_tests/core/plugin/utils/__init__.py  |   0
 .../core/plugin/utils/test_chunk_merger.py    | 460 ++++++++++++++++++
 6 files changed, 556 insertions(+), 56 deletions(-)
 create mode 100644 api/core/plugin/utils/chunk_merger.py
 create mode 100644 api/tests/unit_tests/core/plugin/__init__.py
 create mode 100644 api/tests/unit_tests/core/plugin/utils/__init__.py
 create mode 100644 api/tests/unit_tests/core/plugin/utils/test_chunk_merger.py

diff --git a/api/core/plugin/impl/agent.py b/api/core/plugin/impl/agent.py
index 9575c57ac..3c994ce70 100644
--- a/api/core/plugin/impl/agent.py
+++ b/api/core/plugin/impl/agent.py
@@ -8,6 +8,7 @@ from core.plugin.entities.plugin_daemon import (
 )
 from core.plugin.entities.request import PluginInvokeContext
 from core.plugin.impl.base import BasePluginClient
+from core.plugin.utils.chunk_merger import merge_blob_chunks
 
 
 class PluginAgentClient(BasePluginClient):
@@ -113,4 +114,4 @@ class PluginAgentClient(BasePluginClient):
                 "Content-Type": "application/json",
             },
         )
-        return response
+        return merge_blob_chunks(response)
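For context, the effect of routing the agent response through `merge_blob_chunks` is easiest to see from the caller's side: the returned generator now yields complete `BLOB` messages where the daemon sent `BLOB_CHUNK` streams. A minimal consumption sketch, assuming `response` is the generator returned by the patched client; `save_file` and `handle` are illustrative placeholders, not names from this patch:

```python
from core.tools.entities.tool_entities import ToolInvokeMessage

# `response` stands in for the generator returned by the patched client;
# chunk reassembly has already happened inside merge_blob_chunks.
for message in response:
    if message.type == ToolInvokeMessage.MessageType.BLOB:
        assert isinstance(message.message, ToolInvokeMessage.BlobMessage)
        save_file(message.message.blob)  # illustrative sink, not a real helper
    else:
        handle(message)  # TEXT, JSON, etc. pass through unchanged
```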
diff --git a/api/core/plugin/impl/tool.py b/api/core/plugin/impl/tool.py
index 04225f95e..4c1558efc 100644
--- a/api/core/plugin/impl/tool.py
+++ b/api/core/plugin/impl/tool.py
@@ -6,6 +6,7 @@ from pydantic import BaseModel
 from core.plugin.entities.plugin import GenericProviderID, ToolProviderID
 from core.plugin.entities.plugin_daemon import PluginBasicBooleanResponse, PluginToolProviderEntity
 from core.plugin.impl.base import BasePluginClient
+from core.plugin.utils.chunk_merger import merge_blob_chunks
 from core.tools.entities.tool_entities import CredentialType, ToolInvokeMessage, ToolParameter
 
 
@@ -113,61 +114,7 @@ class PluginToolManager(BasePluginClient):
             },
         )
 
-        class FileChunk:
-            """
-            Only used for internal processing.
-            """
-
-            bytes_written: int
-            total_length: int
-            data: bytearray
-
-            def __init__(self, total_length: int):
-                self.bytes_written = 0
-                self.total_length = total_length
-                self.data = bytearray(total_length)
-
-        files: dict[str, FileChunk] = {}
-        for resp in response:
-            if resp.type == ToolInvokeMessage.MessageType.BLOB_CHUNK:
-                assert isinstance(resp.message, ToolInvokeMessage.BlobChunkMessage)
-                # Get blob chunk information
-                chunk_id = resp.message.id
-                total_length = resp.message.total_length
-                blob_data = resp.message.blob
-                is_end = resp.message.end
-
-                # Initialize buffer for this file if it doesn't exist
-                if chunk_id not in files:
-                    files[chunk_id] = FileChunk(total_length)
-
-                # If this is the final chunk, yield a complete blob message
-                if is_end:
-                    yield ToolInvokeMessage(
-                        type=ToolInvokeMessage.MessageType.BLOB,
-                        message=ToolInvokeMessage.BlobMessage(blob=files[chunk_id].data),
-                        meta=resp.meta,
-                    )
-                else:
-                    # Check if file is too large (30MB limit)
-                    if files[chunk_id].bytes_written + len(blob_data) > 30 * 1024 * 1024:
-                        # Delete the file if it's too large
-                        del files[chunk_id]
-                        # Skip yielding this message
-                        raise ValueError("File is too large which reached the limit of 30MB")
-
-                    # Check if single chunk is too large (8KB limit)
-                    if len(blob_data) > 8192:
-                        # Skip yielding this message
-                        raise ValueError("File chunk is too large which reached the limit of 8KB")
-
-                    # Append the blob data to the buffer
-                    files[chunk_id].data[
-                        files[chunk_id].bytes_written : files[chunk_id].bytes_written + len(blob_data)
-                    ] = blob_data
-                    files[chunk_id].bytes_written += len(blob_data)
-            else:
-                yield resp
+        return merge_blob_chunks(response)
 
     def validate_provider_credentials(
         self, tenant_id: str, user_id: str, provider: str, credentials: dict[str, Any]
diff --git a/api/core/plugin/utils/chunk_merger.py b/api/core/plugin/utils/chunk_merger.py
new file mode 100644
index 000000000..21ca2d8d3
--- /dev/null
+++ b/api/core/plugin/utils/chunk_merger.py
@@ -0,0 +1,92 @@
+from collections.abc import Generator
+from dataclasses import dataclass, field
+from typing import TypeVar, Union, cast
+
+from core.agent.entities import AgentInvokeMessage
+from core.tools.entities.tool_entities import ToolInvokeMessage
+
+MessageType = TypeVar("MessageType", bound=Union[ToolInvokeMessage, AgentInvokeMessage])
+
+
+@dataclass
+class FileChunk:
+    """
+    Buffer for accumulating file chunks during streaming.
+    """
+
+    total_length: int
+    bytes_written: int = field(default=0, init=False)
+    data: bytearray = field(init=False)
+
+    def __post_init__(self) -> None:
+        self.data = bytearray(self.total_length)
+
+
+def merge_blob_chunks(
+    response: Generator[MessageType, None, None],
+    max_file_size: int = 30 * 1024 * 1024,
+    max_chunk_size: int = 8192,
+) -> Generator[MessageType, None, None]:
+    """
+    Merge streaming blob chunks into complete blob messages.
+
+    This function processes a stream of plugin invoke messages, accumulating
+    BLOB_CHUNK messages by their ID until the final chunk is received,
+    then yielding a single complete BLOB message.
+
+    Args:
+        response: Generator yielding messages that may include blob chunks
+        max_file_size: Maximum allowed file size in bytes (default: 30MB)
+        max_chunk_size: Maximum allowed chunk size in bytes (default: 8KB)
+
+    Yields:
+        Messages from the response stream, with blob chunks merged into complete blobs
+
+    Raises:
+        ValueError: If file size exceeds max_file_size or chunk size exceeds max_chunk_size
+    """
+    files: dict[str, FileChunk] = {}
+
+    for resp in response:
+        if resp.type == ToolInvokeMessage.MessageType.BLOB_CHUNK:
+            assert isinstance(resp.message, ToolInvokeMessage.BlobChunkMessage)
+            # Get blob chunk information
+            chunk_id = resp.message.id
+            total_length = resp.message.total_length
+            blob_data = resp.message.blob
+            is_end = resp.message.end
+
+            # Initialize buffer for this file if it doesn't exist
+            if chunk_id not in files:
+                files[chunk_id] = FileChunk(total_length)
+
+            # Check if file is too large (before appending)
+            if files[chunk_id].bytes_written + len(blob_data) > max_file_size:
+                # Delete the file if it's too large
+                del files[chunk_id]
+                raise ValueError(f"File is too large: exceeded the limit of {max_file_size / 1024 / 1024}MB")
+
+            # Check if single chunk is too large
+            if len(blob_data) > max_chunk_size:
+                raise ValueError(f"File chunk is too large: exceeded the limit of {max_chunk_size / 1024}KB")
+
+            # Append the blob data to the buffer
+            files[chunk_id].data[files[chunk_id].bytes_written : files[chunk_id].bytes_written + len(blob_data)] = (
+                blob_data
+            )
+            files[chunk_id].bytes_written += len(blob_data)
+
+            # If this is the final chunk, yield a complete blob message
+            if is_end:
+                # Create the appropriate message type based on the response type
+                message_class = type(resp)
+                merged_message = message_class(
+                    type=ToolInvokeMessage.MessageType.BLOB,
+                    message=ToolInvokeMessage.BlobMessage(blob=files[chunk_id].data[: files[chunk_id].bytes_written]),
+                    meta=resp.meta,
+                )
+                yield cast(MessageType, merged_message)
+                # Clean up the buffer
+                del files[chunk_id]
+            else:
+                yield resp
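For orientation, a minimal, self-contained sketch of the merger in action; the `demo` generator and its payload are illustrative only, mirroring the message construction used in the tests below:

```python
from collections.abc import Generator

from core.plugin.utils.chunk_merger import merge_blob_chunks
from core.tools.entities.tool_entities import ToolInvokeMessage


def demo() -> Generator[ToolInvokeMessage, None, None]:
    # Two chunks of one logical file; only the last carries end=True.
    for seq, piece in enumerate((b"Hello", b"World")):
        yield ToolInvokeMessage(
            type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
            message=ToolInvokeMessage.BlobChunkMessage(
                id="file1", sequence=seq, total_length=10, blob=piece, end=(seq == 1)
            ),
        )


merged = list(merge_blob_chunks(demo()))
assert len(merged) == 1
assert merged[0].type == ToolInvokeMessage.MessageType.BLOB
assert isinstance(merged[0].message, ToolInvokeMessage.BlobMessage)
assert bytes(merged[0].message.blob) == b"HelloWorld"
```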
diff --git a/api/tests/unit_tests/core/plugin/__init__.py b/api/tests/unit_tests/core/plugin/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/api/tests/unit_tests/core/plugin/utils/__init__.py b/api/tests/unit_tests/core/plugin/utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/api/tests/unit_tests/core/plugin/utils/test_chunk_merger.py b/api/tests/unit_tests/core/plugin/utils/test_chunk_merger.py
new file mode 100644
index 000000000..e0eace0f2
--- /dev/null
+++ b/api/tests/unit_tests/core/plugin/utils/test_chunk_merger.py
@@ -0,0 +1,460 @@
+from collections.abc import Generator
+
+import pytest
+
+from core.agent.entities import AgentInvokeMessage
+from core.plugin.utils.chunk_merger import FileChunk, merge_blob_chunks
+from core.tools.entities.tool_entities import ToolInvokeMessage
+
+
+class TestChunkMerger:
+    def test_file_chunk_initialization(self):
+        """Test FileChunk initialization."""
+        chunk = FileChunk(1024)
+        assert chunk.bytes_written == 0
+        assert chunk.total_length == 1024
+        assert len(chunk.data) == 1024
+
+    def test_merge_blob_chunks_with_single_complete_chunk(self):
+        """Test merging chunks of a single file into one complete blob."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # First chunk (partial)
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=10, blob=b"Hello", end=False
+                ),
+            )
+            # Second chunk (final)
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=1, total_length=10, blob=b"World", end=True
+                ),
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert result[0].type == ToolInvokeMessage.MessageType.BLOB
+        assert isinstance(result[0].message, ToolInvokeMessage.BlobMessage)
+        # The buffer should contain the complete data
+        assert result[0].message.blob[:10] == b"HelloWorld"
+
+    def test_merge_blob_chunks_with_multiple_files(self):
+        """Test merging chunks from multiple files."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # File 1, chunk 1
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=4, blob=b"AB", end=False
+                ),
+            )
+            # File 2, chunk 1
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file2", sequence=0, total_length=4, blob=b"12", end=False
+                ),
+            )
+            # File 1, chunk 2 (final)
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=1, total_length=4, blob=b"CD", end=True
+                ),
+            )
+            # File 2, chunk 2 (final)
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file2", sequence=1, total_length=4, blob=b"34", end=True
+                ),
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 2
+        # Check that both files are properly merged
+        assert all(r.type == ToolInvokeMessage.MessageType.BLOB for r in result)
+
+    def test_merge_blob_chunks_passes_through_non_blob_messages(self):
+        """Test that non-blob messages pass through unchanged."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Text message
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.TEXT,
+                message=ToolInvokeMessage.TextMessage(text="Hello"),
+            )
+            # Blob chunk
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=5, blob=b"Test", end=True
+                ),
+            )
+            # Another text message
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.TEXT,
+                message=ToolInvokeMessage.TextMessage(text="World"),
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 3
+        assert result[0].type == ToolInvokeMessage.MessageType.TEXT
+        assert isinstance(result[0].message, ToolInvokeMessage.TextMessage)
+        assert result[0].message.text == "Hello"
+        assert result[1].type == ToolInvokeMessage.MessageType.BLOB
+        assert result[2].type == ToolInvokeMessage.MessageType.TEXT
+        assert isinstance(result[2].message, ToolInvokeMessage.TextMessage)
+        assert result[2].message.text == "World"
+
+    def test_merge_blob_chunks_file_too_large(self):
+        """Test that error is raised when file exceeds max size."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Send a chunk that would exceed the limit
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=100, blob=b"x" * 1024, end=False
+                ),
+            )
+
+        with pytest.raises(ValueError) as exc_info:
+            list(merge_blob_chunks(mock_generator(), max_file_size=1000))
+        assert "File is too large" in str(exc_info.value)
+
+    def test_merge_blob_chunks_chunk_too_large(self):
+        """Test that error is raised when chunk exceeds max chunk size."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Send a chunk that exceeds the max chunk size
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=10000, blob=b"x" * 9000, end=False
+                ),
+            )
+
+        with pytest.raises(ValueError) as exc_info:
+            list(merge_blob_chunks(mock_generator(), max_chunk_size=8192))
+        assert "File chunk is too large" in str(exc_info.value)
+
+    def test_merge_blob_chunks_with_agent_invoke_message(self):
+        """Test that merge_blob_chunks works with AgentInvokeMessage."""
+
+        def mock_generator() -> Generator[AgentInvokeMessage, None, None]:
+            # First chunk
+            yield AgentInvokeMessage(
+                type=AgentInvokeMessage.MessageType.BLOB_CHUNK,
+                message=AgentInvokeMessage.BlobChunkMessage(
+                    id="agent_file", sequence=0, total_length=8, blob=b"Agent", end=False
+                ),
+            )
+            # Final chunk
+            yield AgentInvokeMessage(
+                type=AgentInvokeMessage.MessageType.BLOB_CHUNK,
+                message=AgentInvokeMessage.BlobChunkMessage(
+                    id="agent_file", sequence=1, total_length=8, blob=b"Data", end=True
+                ),
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert isinstance(result[0], AgentInvokeMessage)
+        assert result[0].type == AgentInvokeMessage.MessageType.BLOB
+
+    def test_merge_blob_chunks_preserves_meta(self):
+        """Test that meta information is preserved in merged messages."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=4, blob=b"Test", end=True
+                ),
+                meta={"key": "value"},
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert result[0].meta == {"key": "value"}
+
+    def test_merge_blob_chunks_custom_limits(self):
+        """Test merge_blob_chunks with custom size limits."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # This should work with custom limits
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=500, blob=b"x" * 400, end=False
+                ),
+            )
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=1, total_length=500, blob=b"y" * 100, end=True
+                ),
+            )
+
+        # Should work with custom limits
+        result = list(merge_blob_chunks(mock_generator(), max_file_size=1000, max_chunk_size=500))
+        assert len(result) == 1
+
+        # Should fail with smaller file size limit
+        def mock_generator2() -> Generator[ToolInvokeMessage, None, None]:
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=500, blob=b"x" * 400, end=False
+                ),
+            )
+
+        with pytest.raises(ValueError):
+            list(merge_blob_chunks(mock_generator2(), max_file_size=300))
+
+    def test_merge_blob_chunks_data_integrity(self):
+        """Test that merged chunks exactly match the original data."""
+        # Create original data
+        original_data = b"This is a test message that will be split into chunks for testing purposes."
+        chunk_size = 20
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Split original data into chunks
+            chunks = []
+            for i in range(0, len(original_data), chunk_size):
+                chunk_data = original_data[i : i + chunk_size]
+                is_last = (i + chunk_size) >= len(original_data)
+                chunks.append((i // chunk_size, chunk_data, is_last))
+
+            # Yield chunks
+            for sequence, data, is_end in chunks:
+                yield ToolInvokeMessage(
+                    type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                    message=ToolInvokeMessage.BlobChunkMessage(
+                        id="test_file",
+                        sequence=sequence,
+                        total_length=len(original_data),
+                        blob=data,
+                        end=is_end,
+                    ),
+                )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert result[0].type == ToolInvokeMessage.MessageType.BLOB
+        assert isinstance(result[0].message, ToolInvokeMessage.BlobMessage)
+        # Verify the merged data exactly matches the original
+        assert result[0].message.blob == original_data
+
+    def test_merge_blob_chunks_empty_chunk(self):
+        """Test handling of empty chunks."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # First chunk with data
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=0, total_length=10, blob=b"Hello", end=False
+                ),
+            )
+            # Empty chunk in the middle
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=1, total_length=10, blob=b"", end=False
+                ),
+            )
+            # Final chunk with data
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="file1", sequence=2, total_length=10, blob=b"World", end=True
+                ),
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert result[0].type == ToolInvokeMessage.MessageType.BLOB
+        assert isinstance(result[0].message, ToolInvokeMessage.BlobMessage)
+        # The final blob should contain "Hello" followed by "World"
+        assert result[0].message.blob[:10] == b"HelloWorld"
+
+    def test_merge_blob_chunks_single_chunk_file(self):
+        """Test file that arrives as a single complete chunk."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Single chunk that is both first and last
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="single_chunk_file",
+                    sequence=0,
+                    total_length=11,
+                    blob=b"Single Data",
+                    end=True,
+                ),
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert result[0].type == ToolInvokeMessage.MessageType.BLOB
+        assert isinstance(result[0].message, ToolInvokeMessage.BlobMessage)
+        assert result[0].message.blob == b"Single Data"
+
+    def test_merge_blob_chunks_concurrent_files(self):
+        """Test that chunks from different files are properly separated."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Interleave chunks from three different files
+            files_data = {
+                "file1": b"First file content",
+                "file2": b"Second file data",
+                "file3": b"Third file",
+            }
+
+            # First chunk from each file
+            for file_id, data in files_data.items():
+                yield ToolInvokeMessage(
+                    type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                    message=ToolInvokeMessage.BlobChunkMessage(
+                        id=file_id,
+                        sequence=0,
+                        total_length=len(data),
+                        blob=data[:6],
+                        end=False,
+                    ),
+                )
+
+            # Second chunk from each file (final)
+            for file_id, data in files_data.items():
+                yield ToolInvokeMessage(
+                    type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                    message=ToolInvokeMessage.BlobChunkMessage(
+                        id=file_id,
+                        sequence=1,
+                        total_length=len(data),
+                        blob=data[6:],
+                        end=True,
+                    ),
+                )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 3
+
+        # Extract the blob data from results
+        blobs = set()
+        for r in result:
+            assert isinstance(r.message, ToolInvokeMessage.BlobMessage)
+            blobs.add(r.message.blob)
+        expected = {b"First file content", b"Second file data", b"Third file"}
+        assert blobs == expected
+
+    def test_merge_blob_chunks_exact_buffer_size(self):
+        """Test that data fitting exactly in buffer works correctly."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Create data that exactly fills the declared buffer
+            exact_data = b"X" * 100
+
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="exact_file",
+                    sequence=0,
+                    total_length=100,
+                    blob=exact_data[:50],
+                    end=False,
+                ),
+            )
+            yield ToolInvokeMessage(
+                type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                message=ToolInvokeMessage.BlobChunkMessage(
+                    id="exact_file",
+                    sequence=1,
+                    total_length=100,
+                    blob=exact_data[50:],
+                    end=True,
+                ),
+            )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert isinstance(result[0].message, ToolInvokeMessage.BlobMessage)
+        assert len(result[0].message.blob) == 100
+        assert result[0].message.blob == b"X" * 100
+
+    def test_merge_blob_chunks_large_file_simulation(self):
+        """Test handling of a large file split into many chunks."""
+
+        def mock_generator() -> Generator[ToolInvokeMessage, None, None]:
+            # Simulate a 1MB file split into 128 chunks of 8KB each
+            chunk_size = 8192
+            num_chunks = 128
+            total_size = chunk_size * num_chunks
+
+            for i in range(num_chunks):
+                # Create unique data for each chunk to verify ordering
+                chunk_data = bytes([i % 256]) * chunk_size
+                is_last = i == num_chunks - 1
+
+                yield ToolInvokeMessage(
+                    type=ToolInvokeMessage.MessageType.BLOB_CHUNK,
+                    message=ToolInvokeMessage.BlobChunkMessage(
+                        id="large_file",
+                        sequence=i,
+                        total_length=total_size,
+                        blob=chunk_data,
+                        end=is_last,
+                    ),
+                )
+
+        result = list(merge_blob_chunks(mock_generator()))
+        assert len(result) == 1
+        assert isinstance(result[0].message, ToolInvokeMessage.BlobMessage)
+        assert len(result[0].message.blob) == 1024 * 1024
+
+        # Verify the data pattern is correct
+        merged_data = result[0].message.blob
+        chunk_size = 8192
+        num_chunks = 128
+        for i in range(num_chunks):
+            chunk_start = i * chunk_size
+            chunk_end = chunk_start + chunk_size
+            expected_byte = i % 256
+            chunk = merged_data[chunk_start:chunk_end]
+            assert all(b == expected_byte for b in chunk), f"Chunk {i} has incorrect data"
+
+    def test_merge_blob_chunks_sequential_order_required(self):
+        """
+        Test note: The current implementation assumes chunks arrive in sequential order.
+        Out-of-order chunks would need additional logic to handle properly.
+        This test documents the expected behavior with sequential chunks.
+ """ + + def mock_generator() -> Generator[ToolInvokeMessage, None, None]: + # Chunks arriving in correct sequential order + data_parts = [b"First", b"Second", b"Third"] + total_length = sum(len(part) for part in data_parts) + + for i, part in enumerate(data_parts): + is_last = i == len(data_parts) - 1 + yield ToolInvokeMessage( + type=ToolInvokeMessage.MessageType.BLOB_CHUNK, + message=ToolInvokeMessage.BlobChunkMessage( + id="ordered_file", + sequence=i, + total_length=total_length, + blob=part, + end=is_last, + ), + ) + + result = list(merge_blob_chunks(mock_generator())) + assert len(result) == 1 + assert isinstance(result[0].message, ToolInvokeMessage.BlobMessage) + assert result[0].message.blob == b"FirstSecondThird"
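The docstring above records the one assumption the merger makes: chunks for a given id arrive in sequence order. If out-of-order delivery ever became possible, one approach would be to buffer pieces by sequence number and splice them on completion. A sketch only; `merge_out_of_order` is hypothetical, not part of this patch, and it assumes `total_length` is an exact byte count, which the tests above show the current protocol does not guarantee:

```python
from collections.abc import Generator

from core.tools.entities.tool_entities import ToolInvokeMessage


def merge_out_of_order(
    response: Generator[ToolInvokeMessage, None, None],
) -> Generator[ToolInvokeMessage, None, None]:
    """Hypothetical sketch: buffer chunks by sequence, emit once all bytes arrive."""
    parts: dict[str, dict[int, bytes]] = {}
    for resp in response:
        if resp.type != ToolInvokeMessage.MessageType.BLOB_CHUNK:
            yield resp
            continue
        assert isinstance(resp.message, ToolInvokeMessage.BlobChunkMessage)
        msg = resp.message
        buffered = parts.setdefault(msg.id, {})
        buffered[msg.sequence] = bytes(msg.blob)
        # Completion test relies on total_length being exact (an assumption).
        if sum(len(piece) for piece in buffered.values()) >= msg.total_length:
            # All bytes present: join the pieces in sequence order.
            blob = b"".join(buffered[seq] for seq in sorted(buffered))
            yield ToolInvokeMessage(
                type=ToolInvokeMessage.MessageType.BLOB,
                message=ToolInvokeMessage.BlobMessage(blob=blob),
                meta=resp.meta,
            )
            del parts[msg.id]
```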