feat: support bool type variable frontend (#24437)

Co-authored-by: QuantumGhost <obelisk.reg+git@gmail.com>
This commit is contained in:
Joel
2025-08-26 18:16:05 +08:00
committed by GitHub
parent b5c2756261
commit dac72b078d
126 changed files with 3832 additions and 512 deletions

View File

@@ -23,6 +23,7 @@ class TestSegmentTypeIsArrayType:
SegmentType.ARRAY_NUMBER,
SegmentType.ARRAY_OBJECT,
SegmentType.ARRAY_FILE,
SegmentType.ARRAY_BOOLEAN,
]
expected_non_array_types = [
SegmentType.INTEGER,
@@ -34,6 +35,7 @@ class TestSegmentTypeIsArrayType:
SegmentType.FILE,
SegmentType.NONE,
SegmentType.GROUP,
SegmentType.BOOLEAN,
]
for seg_type in expected_array_types:

View File

@@ -0,0 +1,729 @@
"""
Comprehensive unit tests for SegmentType.is_valid and SegmentType._validate_array methods.
This module provides thorough testing of the validation logic for all SegmentType values,
including edge cases, error conditions, and different ArrayValidation strategies.
"""
from dataclasses import dataclass
from typing import Any
import pytest
from core.file.enums import FileTransferMethod, FileType
from core.file.models import File
from core.variables.types import ArrayValidation, SegmentType
def create_test_file(
file_type: FileType = FileType.DOCUMENT,
transfer_method: FileTransferMethod = FileTransferMethod.LOCAL_FILE,
filename: str = "test.txt",
extension: str = ".txt",
mime_type: str = "text/plain",
size: int = 1024,
) -> File:
"""Factory function to create File objects for testing."""
return File(
tenant_id="test-tenant",
type=file_type,
transfer_method=transfer_method,
filename=filename,
extension=extension,
mime_type=mime_type,
size=size,
related_id="test-file-id" if transfer_method != FileTransferMethod.REMOTE_URL else None,
remote_url="https://example.com/file.txt" if transfer_method == FileTransferMethod.REMOTE_URL else None,
storage_key="test-storage-key",
)
@dataclass
class ValidationTestCase:
"""Test case data structure for validation tests."""
segment_type: SegmentType
value: Any
expected: bool
description: str
def get_id(self):
return self.description
@dataclass
class ArrayValidationTestCase:
"""Test case data structure for array validation tests."""
segment_type: SegmentType
value: Any
array_validation: ArrayValidation
expected: bool
description: str
def get_id(self):
return self.description
# Test data construction functions
def get_boolean_cases() -> list[ValidationTestCase]:
return [
# valid values
ValidationTestCase(SegmentType.BOOLEAN, True, True, "True boolean"),
ValidationTestCase(SegmentType.BOOLEAN, False, True, "False boolean"),
# Invalid values
ValidationTestCase(SegmentType.BOOLEAN, 1, False, "Integer 1 (not boolean)"),
ValidationTestCase(SegmentType.BOOLEAN, 0, False, "Integer 0 (not boolean)"),
ValidationTestCase(SegmentType.BOOLEAN, "true", False, "String 'true'"),
ValidationTestCase(SegmentType.BOOLEAN, "false", False, "String 'false'"),
ValidationTestCase(SegmentType.BOOLEAN, None, False, "None value"),
ValidationTestCase(SegmentType.BOOLEAN, [], False, "Empty list"),
ValidationTestCase(SegmentType.BOOLEAN, {}, False, "Empty dict"),
]
def get_number_cases() -> list[ValidationTestCase]:
"""Get test cases for valid number values."""
return [
# valid values
ValidationTestCase(SegmentType.NUMBER, 42, True, "Positive integer"),
ValidationTestCase(SegmentType.NUMBER, -42, True, "Negative integer"),
ValidationTestCase(SegmentType.NUMBER, 0, True, "Zero integer"),
ValidationTestCase(SegmentType.NUMBER, 3.14, True, "Positive float"),
ValidationTestCase(SegmentType.NUMBER, -3.14, True, "Negative float"),
ValidationTestCase(SegmentType.NUMBER, 0.0, True, "Zero float"),
ValidationTestCase(SegmentType.NUMBER, float("inf"), True, "Positive infinity"),
ValidationTestCase(SegmentType.NUMBER, float("-inf"), True, "Negative infinity"),
ValidationTestCase(SegmentType.NUMBER, float("nan"), True, "float(NaN)"),
# invalid number values
ValidationTestCase(SegmentType.NUMBER, "42", False, "String number"),
ValidationTestCase(SegmentType.NUMBER, None, False, "None value"),
ValidationTestCase(SegmentType.NUMBER, [], False, "Empty list"),
ValidationTestCase(SegmentType.NUMBER, {}, False, "Empty dict"),
ValidationTestCase(SegmentType.NUMBER, "3.14", False, "String float"),
]
def get_string_cases() -> list[ValidationTestCase]:
"""Get test cases for valid string values."""
return [
# valid values
ValidationTestCase(SegmentType.STRING, "", True, "Empty string"),
ValidationTestCase(SegmentType.STRING, "hello", True, "Simple string"),
ValidationTestCase(SegmentType.STRING, "🚀", True, "Unicode emoji"),
ValidationTestCase(SegmentType.STRING, "line1\nline2", True, "Multiline string"),
# invalid values
ValidationTestCase(SegmentType.STRING, 123, False, "Integer"),
ValidationTestCase(SegmentType.STRING, 3.14, False, "Float"),
ValidationTestCase(SegmentType.STRING, True, False, "Boolean"),
ValidationTestCase(SegmentType.STRING, None, False, "None value"),
ValidationTestCase(SegmentType.STRING, [], False, "Empty list"),
ValidationTestCase(SegmentType.STRING, {}, False, "Empty dict"),
]
def get_object_cases() -> list[ValidationTestCase]:
"""Get test cases for valid object values."""
return [
# valid cases
ValidationTestCase(SegmentType.OBJECT, {}, True, "Empty dict"),
ValidationTestCase(SegmentType.OBJECT, {"key": "value"}, True, "Simple dict"),
ValidationTestCase(SegmentType.OBJECT, {"a": 1, "b": 2}, True, "Dict with numbers"),
ValidationTestCase(SegmentType.OBJECT, {"nested": {"key": "value"}}, True, "Nested dict"),
ValidationTestCase(SegmentType.OBJECT, {"list": [1, 2, 3]}, True, "Dict with list"),
ValidationTestCase(SegmentType.OBJECT, {"mixed": [1, "two", {"three": 3}]}, True, "Complex dict"),
# invalid cases
ValidationTestCase(SegmentType.OBJECT, "not a dict", False, "String"),
ValidationTestCase(SegmentType.OBJECT, 123, False, "Integer"),
ValidationTestCase(SegmentType.OBJECT, 3.14, False, "Float"),
ValidationTestCase(SegmentType.OBJECT, True, False, "Boolean"),
ValidationTestCase(SegmentType.OBJECT, None, False, "None value"),
ValidationTestCase(SegmentType.OBJECT, [], False, "Empty list"),
ValidationTestCase(SegmentType.OBJECT, [1, 2, 3], False, "List with values"),
]
def get_secret_cases() -> list[ValidationTestCase]:
"""Get test cases for valid secret values."""
return [
# valid cases
ValidationTestCase(SegmentType.SECRET, "", True, "Empty secret"),
ValidationTestCase(SegmentType.SECRET, "secret", True, "Simple secret"),
ValidationTestCase(SegmentType.SECRET, "api_key_123", True, "API key format"),
ValidationTestCase(SegmentType.SECRET, "very_long_secret_key_with_special_chars!@#", True, "Complex secret"),
# invalid cases
ValidationTestCase(SegmentType.SECRET, 123, False, "Integer"),
ValidationTestCase(SegmentType.SECRET, 3.14, False, "Float"),
ValidationTestCase(SegmentType.SECRET, True, False, "Boolean"),
ValidationTestCase(SegmentType.SECRET, None, False, "None value"),
ValidationTestCase(SegmentType.SECRET, [], False, "Empty list"),
ValidationTestCase(SegmentType.SECRET, {}, False, "Empty dict"),
]
def get_file_cases() -> list[ValidationTestCase]:
"""Get test cases for valid file values."""
test_file = create_test_file()
image_file = create_test_file(
file_type=FileType.IMAGE, filename="image.jpg", extension=".jpg", mime_type="image/jpeg"
)
remote_file = create_test_file(
transfer_method=FileTransferMethod.REMOTE_URL, filename="remote.pdf", extension=".pdf"
)
return [
# valid cases
ValidationTestCase(SegmentType.FILE, test_file, True, "Document file"),
ValidationTestCase(SegmentType.FILE, image_file, True, "Image file"),
ValidationTestCase(SegmentType.FILE, remote_file, True, "Remote file"),
# invalid cases
ValidationTestCase(SegmentType.FILE, "not a file", False, "String"),
ValidationTestCase(SegmentType.FILE, 123, False, "Integer"),
ValidationTestCase(SegmentType.FILE, {"filename": "test.txt"}, False, "Dict resembling file"),
ValidationTestCase(SegmentType.FILE, None, False, "None value"),
ValidationTestCase(SegmentType.FILE, [], False, "Empty list"),
ValidationTestCase(SegmentType.FILE, True, False, "Boolean"),
]
def get_none_cases() -> list[ValidationTestCase]:
"""Get test cases for valid none values."""
return [
# valid cases
ValidationTestCase(SegmentType.NONE, None, True, "None value"),
# invalid cases
ValidationTestCase(SegmentType.NONE, "", False, "Empty string"),
ValidationTestCase(SegmentType.NONE, 0, False, "Zero integer"),
ValidationTestCase(SegmentType.NONE, 0.0, False, "Zero float"),
ValidationTestCase(SegmentType.NONE, False, False, "False boolean"),
ValidationTestCase(SegmentType.NONE, [], False, "Empty list"),
ValidationTestCase(SegmentType.NONE, {}, False, "Empty dict"),
ValidationTestCase(SegmentType.NONE, "null", False, "String 'null'"),
]
def get_array_any_validation_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_ANY validation."""
return [
ArrayValidationTestCase(
SegmentType.ARRAY_ANY,
[1, "string", 3.14, {"key": "value"}, True],
ArrayValidation.NONE,
True,
"Mixed types with NONE validation",
),
ArrayValidationTestCase(
SegmentType.ARRAY_ANY,
[1, "string", 3.14, {"key": "value"}, True],
ArrayValidation.FIRST,
True,
"Mixed types with FIRST validation",
),
ArrayValidationTestCase(
SegmentType.ARRAY_ANY,
[1, "string", 3.14, {"key": "value"}, True],
ArrayValidation.ALL,
True,
"Mixed types with ALL validation",
),
ArrayValidationTestCase(
SegmentType.ARRAY_ANY, [None, None, None], ArrayValidation.ALL, True, "All None values"
),
]
def get_array_string_validation_none_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_STRING validation with NONE strategy."""
return [
ArrayValidationTestCase(
SegmentType.ARRAY_STRING,
["hello", "world"],
ArrayValidation.NONE,
True,
"Valid strings with NONE validation",
),
ArrayValidationTestCase(
SegmentType.ARRAY_STRING,
[123, 456],
ArrayValidation.NONE,
True,
"Invalid elements with NONE validation",
),
ArrayValidationTestCase(
SegmentType.ARRAY_STRING,
["valid", 123, True],
ArrayValidation.NONE,
True,
"Mixed types with NONE validation",
),
]
def get_array_string_validation_first_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_STRING validation with FIRST strategy."""
return [
ArrayValidationTestCase(
SegmentType.ARRAY_STRING, ["hello", "world"], ArrayValidation.FIRST, True, "All valid strings"
),
ArrayValidationTestCase(
SegmentType.ARRAY_STRING,
["hello", 123, True],
ArrayValidation.FIRST,
True,
"First valid, others invalid",
),
ArrayValidationTestCase(
SegmentType.ARRAY_STRING,
[123, "hello", "world"],
ArrayValidation.FIRST,
False,
"First invalid, others valid",
),
ArrayValidationTestCase(SegmentType.ARRAY_STRING, [None, "hello"], ArrayValidation.FIRST, False, "First None"),
]
def get_array_string_validation_all_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_STRING validation with ALL strategy."""
return [
ArrayValidationTestCase(
SegmentType.ARRAY_STRING, ["hello", "world", "test"], ArrayValidation.ALL, True, "All valid strings"
),
ArrayValidationTestCase(
SegmentType.ARRAY_STRING, ["hello", 123, "world"], ArrayValidation.ALL, False, "One invalid element"
),
ArrayValidationTestCase(
SegmentType.ARRAY_STRING, [123, 456, 789], ArrayValidation.ALL, False, "All invalid elements"
),
ArrayValidationTestCase(
SegmentType.ARRAY_STRING, ["valid", None, "also_valid"], ArrayValidation.ALL, False, "Contains None"
),
]
def get_array_number_validation_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_NUMBER validation with different strategies."""
return [
# NONE strategy
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER, [1, 2.5, 3], ArrayValidation.NONE, True, "Valid numbers with NONE"
),
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER, ["not", "numbers"], ArrayValidation.NONE, True, "Invalid elements with NONE"
),
# FIRST strategy
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER, [42, "not a number"], ArrayValidation.FIRST, True, "First valid number"
),
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER, ["not a number", 42], ArrayValidation.FIRST, False, "First invalid"
),
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER, [3.14, 2.71, 1.41], ArrayValidation.FIRST, True, "All valid floats"
),
# ALL strategy
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER, [1, 2, 3, 4.5], ArrayValidation.ALL, True, "All valid numbers"
),
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER, [1, "invalid", 3], ArrayValidation.ALL, False, "One invalid element"
),
ArrayValidationTestCase(
SegmentType.ARRAY_NUMBER,
[float("inf"), float("-inf"), float("nan")],
ArrayValidation.ALL,
True,
"Special float values",
),
]
def get_array_object_validation_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_OBJECT validation with different strategies."""
return [
# NONE strategy
ArrayValidationTestCase(
SegmentType.ARRAY_OBJECT, [{}, {"key": "value"}], ArrayValidation.NONE, True, "Valid objects with NONE"
),
ArrayValidationTestCase(
SegmentType.ARRAY_OBJECT, ["not", "objects"], ArrayValidation.NONE, True, "Invalid elements with NONE"
),
# FIRST strategy
ArrayValidationTestCase(
SegmentType.ARRAY_OBJECT,
[{"valid": "object"}, "not an object"],
ArrayValidation.FIRST,
True,
"First valid object",
),
ArrayValidationTestCase(
SegmentType.ARRAY_OBJECT,
["not an object", {"valid": "object"}],
ArrayValidation.FIRST,
False,
"First invalid",
),
# ALL strategy
ArrayValidationTestCase(
SegmentType.ARRAY_OBJECT,
[{}, {"a": 1}, {"nested": {"key": "value"}}],
ArrayValidation.ALL,
True,
"All valid objects",
),
ArrayValidationTestCase(
SegmentType.ARRAY_OBJECT,
[{"valid": "object"}, "invalid", {"another": "object"}],
ArrayValidation.ALL,
False,
"One invalid element",
),
]
def get_array_file_validation_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_FILE validation with different strategies."""
file1 = create_test_file(filename="file1.txt")
file2 = create_test_file(filename="file2.txt")
return [
# NONE strategy
ArrayValidationTestCase(
SegmentType.ARRAY_FILE, [file1, file2], ArrayValidation.NONE, True, "Valid files with NONE"
),
ArrayValidationTestCase(
SegmentType.ARRAY_FILE, ["not", "files"], ArrayValidation.NONE, True, "Invalid elements with NONE"
),
# FIRST strategy
ArrayValidationTestCase(
SegmentType.ARRAY_FILE, [file1, "not a file"], ArrayValidation.FIRST, True, "First valid file"
),
ArrayValidationTestCase(
SegmentType.ARRAY_FILE, ["not a file", file1], ArrayValidation.FIRST, False, "First invalid"
),
# ALL strategy
ArrayValidationTestCase(SegmentType.ARRAY_FILE, [file1, file2], ArrayValidation.ALL, True, "All valid files"),
ArrayValidationTestCase(
SegmentType.ARRAY_FILE, [file1, "invalid", file2], ArrayValidation.ALL, False, "One invalid element"
),
]
def get_array_boolean_validation_cases() -> list[ArrayValidationTestCase]:
"""Get test cases for ARRAY_BOOLEAN validation with different strategies."""
return [
# NONE strategy
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN, [True, False, True], ArrayValidation.NONE, True, "Valid booleans with NONE"
),
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN, [1, 0, "true"], ArrayValidation.NONE, True, "Invalid elements with NONE"
),
# FIRST strategy
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN, [True, 1, 0], ArrayValidation.FIRST, True, "First valid boolean"
),
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN, [1, True, False], ArrayValidation.FIRST, False, "First invalid (integer 1)"
),
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN, [0, True, False], ArrayValidation.FIRST, False, "First invalid (integer 0)"
),
# ALL strategy
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN, [True, False, True, False], ArrayValidation.ALL, True, "All valid booleans"
),
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN, [True, 1, False], ArrayValidation.ALL, False, "One invalid element (integer)"
),
ArrayValidationTestCase(
SegmentType.ARRAY_BOOLEAN,
[True, "false", False],
ArrayValidation.ALL,
False,
"One invalid element (string)",
),
]
class TestSegmentTypeIsValid:
"""Test suite for SegmentType.is_valid method covering all non-array types."""
@pytest.mark.parametrize("case", get_boolean_cases(), ids=lambda case: case.description)
def test_boolean_validation(self, case):
assert case.segment_type.is_valid(case.value) == case.expected
@pytest.mark.parametrize("case", get_number_cases(), ids=lambda case: case.description)
def test_number_validation(self, case: ValidationTestCase):
assert case.segment_type.is_valid(case.value) == case.expected
@pytest.mark.parametrize("case", get_string_cases(), ids=lambda case: case.description)
def test_string_validation(self, case):
assert case.segment_type.is_valid(case.value) == case.expected
@pytest.mark.parametrize("case", get_object_cases(), ids=lambda case: case.description)
def test_object_validation(self, case):
assert case.segment_type.is_valid(case.value) == case.expected
@pytest.mark.parametrize("case", get_secret_cases(), ids=lambda case: case.description)
def test_secret_validation(self, case):
assert case.segment_type.is_valid(case.value) == case.expected
@pytest.mark.parametrize("case", get_file_cases(), ids=lambda case: case.description)
def test_file_validation(self, case):
assert case.segment_type.is_valid(case.value) == case.expected
@pytest.mark.parametrize("case", get_none_cases(), ids=lambda case: case.description)
def test_none_validation_valid_cases(self, case):
assert case.segment_type.is_valid(case.value) == case.expected
def test_unsupported_segment_type_raises_assertion_error(self):
"""Test that unsupported SegmentType values raise AssertionError."""
# GROUP is not handled in is_valid method
with pytest.raises(AssertionError, match="this statement should be unreachable"):
SegmentType.GROUP.is_valid("any value")
class TestSegmentTypeArrayValidation:
"""Test suite for SegmentType._validate_array method and array type validation."""
def test_array_validation_non_list_values(self):
"""Test that non-list values return False for all array types."""
array_types = [
SegmentType.ARRAY_ANY,
SegmentType.ARRAY_STRING,
SegmentType.ARRAY_NUMBER,
SegmentType.ARRAY_OBJECT,
SegmentType.ARRAY_FILE,
SegmentType.ARRAY_BOOLEAN,
]
non_list_values = [
"not a list",
123,
3.14,
True,
None,
{"key": "value"},
create_test_file(),
]
for array_type in array_types:
for value in non_list_values:
assert array_type.is_valid(value) is False, f"{array_type} should reject {type(value).__name__}"
def test_empty_array_validation(self):
"""Test that empty arrays are valid for all array types regardless of validation strategy."""
array_types = [
SegmentType.ARRAY_ANY,
SegmentType.ARRAY_STRING,
SegmentType.ARRAY_NUMBER,
SegmentType.ARRAY_OBJECT,
SegmentType.ARRAY_FILE,
SegmentType.ARRAY_BOOLEAN,
]
validation_strategies = [ArrayValidation.NONE, ArrayValidation.FIRST, ArrayValidation.ALL]
for array_type in array_types:
for strategy in validation_strategies:
assert array_type.is_valid([], strategy) is True, (
f"{array_type} should accept empty array with {strategy}"
)
@pytest.mark.parametrize("case", get_array_any_validation_cases(), ids=lambda case: case.description)
def test_array_any_validation(self, case):
"""Test ARRAY_ANY validation accepts any list regardless of content."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
@pytest.mark.parametrize("case", get_array_string_validation_none_cases(), ids=lambda case: case.description)
def test_array_string_validation_with_none_strategy(self, case):
"""Test ARRAY_STRING validation with NONE strategy (no element validation)."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
@pytest.mark.parametrize("case", get_array_string_validation_first_cases(), ids=lambda case: case.description)
def test_array_string_validation_with_first_strategy(self, case):
"""Test ARRAY_STRING validation with FIRST strategy (validate first element only)."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
@pytest.mark.parametrize("case", get_array_string_validation_all_cases(), ids=lambda case: case.description)
def test_array_string_validation_with_all_strategy(self, case):
"""Test ARRAY_STRING validation with ALL strategy (validate all elements)."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
@pytest.mark.parametrize("case", get_array_number_validation_cases(), ids=lambda case: case.description)
def test_array_number_validation_with_different_strategies(self, case):
"""Test ARRAY_NUMBER validation with different validation strategies."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
@pytest.mark.parametrize("case", get_array_object_validation_cases(), ids=lambda case: case.description)
def test_array_object_validation_with_different_strategies(self, case):
"""Test ARRAY_OBJECT validation with different validation strategies."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
@pytest.mark.parametrize("case", get_array_file_validation_cases(), ids=lambda case: case.description)
def test_array_file_validation_with_different_strategies(self, case):
"""Test ARRAY_FILE validation with different validation strategies."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
@pytest.mark.parametrize("case", get_array_boolean_validation_cases(), ids=lambda case: case.description)
def test_array_boolean_validation_with_different_strategies(self, case):
"""Test ARRAY_BOOLEAN validation with different validation strategies."""
assert case.segment_type.is_valid(case.value, case.array_validation) == case.expected
def test_default_array_validation_strategy(self):
"""Test that default array validation strategy is FIRST."""
# When no array_validation parameter is provided, it should default to FIRST
assert SegmentType.ARRAY_STRING.is_valid(["valid", 123]) is False # First element valid
assert SegmentType.ARRAY_STRING.is_valid([123, "valid"]) is False # First element invalid
assert SegmentType.ARRAY_NUMBER.is_valid([42, "invalid"]) is False # First element valid
assert SegmentType.ARRAY_NUMBER.is_valid(["invalid", 42]) is False # First element invalid
def test_array_validation_edge_cases(self):
"""Test edge cases for array validation."""
# Test with nested arrays (should be invalid for specific array types)
nested_array = [["nested", "array"], ["another", "nested"]]
assert SegmentType.ARRAY_STRING.is_valid(nested_array, ArrayValidation.FIRST) is False
assert SegmentType.ARRAY_STRING.is_valid(nested_array, ArrayValidation.ALL) is False
assert SegmentType.ARRAY_ANY.is_valid(nested_array, ArrayValidation.ALL) is True
# Test with very large arrays (performance consideration)
large_valid_array = ["string"] * 1000
large_mixed_array = ["string"] * 999 + [123] # Last element invalid
assert SegmentType.ARRAY_STRING.is_valid(large_valid_array, ArrayValidation.ALL) is True
assert SegmentType.ARRAY_STRING.is_valid(large_mixed_array, ArrayValidation.ALL) is False
assert SegmentType.ARRAY_STRING.is_valid(large_mixed_array, ArrayValidation.FIRST) is True
class TestSegmentTypeValidationIntegration:
"""Integration tests for SegmentType validation covering interactions between methods."""
def test_non_array_types_ignore_array_validation_parameter(self):
"""Test that non-array types ignore the array_validation parameter."""
non_array_types = [
SegmentType.STRING,
SegmentType.NUMBER,
SegmentType.BOOLEAN,
SegmentType.OBJECT,
SegmentType.SECRET,
SegmentType.FILE,
SegmentType.NONE,
]
for segment_type in non_array_types:
# Create appropriate valid value for each type
valid_value: Any
if segment_type == SegmentType.STRING:
valid_value = "test"
elif segment_type == SegmentType.NUMBER:
valid_value = 42
elif segment_type == SegmentType.BOOLEAN:
valid_value = True
elif segment_type == SegmentType.OBJECT:
valid_value = {"key": "value"}
elif segment_type == SegmentType.SECRET:
valid_value = "secret"
elif segment_type == SegmentType.FILE:
valid_value = create_test_file()
elif segment_type == SegmentType.NONE:
valid_value = None
else:
continue # Skip unsupported types
# All array validation strategies should give the same result
result_none = segment_type.is_valid(valid_value, ArrayValidation.NONE)
result_first = segment_type.is_valid(valid_value, ArrayValidation.FIRST)
result_all = segment_type.is_valid(valid_value, ArrayValidation.ALL)
assert result_none == result_first == result_all == True, (
f"{segment_type} should ignore array_validation parameter"
)
def test_comprehensive_type_coverage(self):
"""Test that all SegmentType enum values are covered in validation tests."""
all_segment_types = set(SegmentType)
# Types that should be handled by is_valid method
handled_types = {
# Non-array types
SegmentType.STRING,
SegmentType.NUMBER,
SegmentType.BOOLEAN,
SegmentType.OBJECT,
SegmentType.SECRET,
SegmentType.FILE,
SegmentType.NONE,
# Array types
SegmentType.ARRAY_ANY,
SegmentType.ARRAY_STRING,
SegmentType.ARRAY_NUMBER,
SegmentType.ARRAY_OBJECT,
SegmentType.ARRAY_FILE,
SegmentType.ARRAY_BOOLEAN,
}
# Types that are not handled by is_valid (should raise AssertionError)
unhandled_types = {
SegmentType.GROUP,
SegmentType.INTEGER, # Handled by NUMBER validation logic
SegmentType.FLOAT, # Handled by NUMBER validation logic
}
# Verify all types are accounted for
assert handled_types | unhandled_types == all_segment_types, "All SegmentType values should be categorized"
# Test that handled types work correctly
for segment_type in handled_types:
if segment_type.is_array_type():
# Test with empty array (should always be valid)
assert segment_type.is_valid([]) is True, f"{segment_type} should accept empty array"
else:
# Test with appropriate valid value
if segment_type == SegmentType.STRING:
assert segment_type.is_valid("test") is True
elif segment_type == SegmentType.NUMBER:
assert segment_type.is_valid(42) is True
elif segment_type == SegmentType.BOOLEAN:
assert segment_type.is_valid(True) is True
elif segment_type == SegmentType.OBJECT:
assert segment_type.is_valid({}) is True
elif segment_type == SegmentType.SECRET:
assert segment_type.is_valid("secret") is True
elif segment_type == SegmentType.FILE:
assert segment_type.is_valid(create_test_file()) is True
elif segment_type == SegmentType.NONE:
assert segment_type.is_valid(None) is True
def test_boolean_vs_integer_type_distinction(self):
"""Test the important distinction between boolean and integer types in validation."""
# This tests the comment in the code about bool being a subclass of int
# Boolean type should only accept actual booleans, not integers
assert SegmentType.BOOLEAN.is_valid(True) is True
assert SegmentType.BOOLEAN.is_valid(False) is True
assert SegmentType.BOOLEAN.is_valid(1) is False # Integer 1, not boolean
assert SegmentType.BOOLEAN.is_valid(0) is False # Integer 0, not boolean
# Number type should accept both integers and floats, including booleans (since bool is subclass of int)
assert SegmentType.NUMBER.is_valid(42) is True
assert SegmentType.NUMBER.is_valid(3.14) is True
assert SegmentType.NUMBER.is_valid(True) is True # bool is subclass of int
assert SegmentType.NUMBER.is_valid(False) is True # bool is subclass of int
def test_array_validation_recursive_behavior(self):
"""Test that array validation correctly handles recursive validation calls."""
# When validating array elements, _validate_array calls is_valid recursively
# with ArrayValidation.NONE to avoid infinite recursion
# Test nested validation doesn't cause issues
nested_arrays = [["inner", "array"], ["another", "inner"]]
# ARRAY_ANY should accept nested arrays
assert SegmentType.ARRAY_ANY.is_valid(nested_arrays, ArrayValidation.ALL) is True
# ARRAY_STRING should reject nested arrays (first element is not a string)
assert SegmentType.ARRAY_STRING.is_valid(nested_arrays, ArrayValidation.FIRST) is False
assert SegmentType.ARRAY_STRING.is_valid(nested_arrays, ArrayValidation.ALL) is False

View File

@@ -0,0 +1,27 @@
from core.variables.types import SegmentType
from core.workflow.nodes.parameter_extractor.entities import ParameterConfig
class TestParameterConfig:
def test_select_type(self):
data = {
"name": "yes_or_no",
"type": "select",
"options": ["yes", "no"],
"description": "a simple select made of `yes` and `no`",
"required": True,
}
pc = ParameterConfig.model_validate(data)
assert pc.type == SegmentType.STRING
assert pc.options == data["options"]
def test_validate_bool_type(self):
data = {
"name": "boolean",
"type": "bool",
"description": "a simple boolean parameter",
"required": True,
}
pc = ParameterConfig.model_validate(data)
assert pc.type == SegmentType.BOOLEAN

View File

@@ -0,0 +1,567 @@
"""
Test cases for ParameterExtractorNode._validate_result and _transform_result methods.
"""
from dataclasses import dataclass
from typing import Any
import pytest
from core.model_runtime.entities import LLMMode
from core.variables.types import SegmentType
from core.workflow.nodes.llm import ModelConfig, VisionConfig
from core.workflow.nodes.parameter_extractor.entities import ParameterConfig, ParameterExtractorNodeData
from core.workflow.nodes.parameter_extractor.exc import (
InvalidNumberOfParametersError,
InvalidSelectValueError,
InvalidValueTypeError,
RequiredParameterMissingError,
)
from core.workflow.nodes.parameter_extractor.parameter_extractor_node import ParameterExtractorNode
from factories.variable_factory import build_segment_with_type
@dataclass
class ValidTestCase:
"""Test case data for valid scenarios."""
name: str
parameters: list[ParameterConfig]
result: dict[str, Any]
def get_name(self) -> str:
return self.name
@dataclass
class ErrorTestCase:
"""Test case data for error scenarios."""
name: str
parameters: list[ParameterConfig]
result: dict[str, Any]
expected_exception: type[Exception]
expected_message: str
def get_name(self) -> str:
return self.name
@dataclass
class TransformTestCase:
"""Test case data for transformation scenarios."""
name: str
parameters: list[ParameterConfig]
input_result: dict[str, Any]
expected_result: dict[str, Any]
def get_name(self) -> str:
return self.name
class TestParameterExtractorNodeMethods:
"""Test helper class that provides access to the methods under test."""
def validate_result(self, data: ParameterExtractorNodeData, result: dict[str, Any]) -> dict[str, Any]:
"""Wrapper to call _validate_result method."""
node = ParameterExtractorNode.__new__(ParameterExtractorNode)
return node._validate_result(data=data, result=result)
def transform_result(self, data: ParameterExtractorNodeData, result: dict[str, Any]) -> dict[str, Any]:
"""Wrapper to call _transform_result method."""
node = ParameterExtractorNode.__new__(ParameterExtractorNode)
return node._transform_result(data=data, result=result)
class TestValidateResult:
"""Test cases for _validate_result method."""
@staticmethod
def get_valid_test_cases() -> list[ValidTestCase]:
"""Get test cases that should pass validation."""
return [
ValidTestCase(
name="single_string_parameter",
parameters=[ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True)],
result={"name": "John"},
),
ValidTestCase(
name="single_number_parameter_int",
parameters=[ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True)],
result={"age": 25},
),
ValidTestCase(
name="single_number_parameter_float",
parameters=[ParameterConfig(name="price", type=SegmentType.NUMBER, description="Price", required=True)],
result={"price": 19.99},
),
ValidTestCase(
name="single_bool_parameter_true",
parameters=[
ParameterConfig(name="active", type=SegmentType.BOOLEAN, description="Active", required=True)
],
result={"active": True},
),
ValidTestCase(
name="single_bool_parameter_true",
parameters=[
ParameterConfig(name="active", type=SegmentType.BOOLEAN, description="Active", required=True)
],
result={"active": True},
),
ValidTestCase(
name="single_bool_parameter_false",
parameters=[
ParameterConfig(name="active", type=SegmentType.BOOLEAN, description="Active", required=True)
],
result={"active": False},
),
ValidTestCase(
name="select_parameter_valid_option",
parameters=[
ParameterConfig(
name="status",
type="select", # pyright: ignore[reportArgumentType]
description="Status",
required=True,
options=["active", "inactive"],
)
],
result={"status": "active"},
),
ValidTestCase(
name="array_string_parameter",
parameters=[
ParameterConfig(name="tags", type=SegmentType.ARRAY_STRING, description="Tags", required=True)
],
result={"tags": ["tag1", "tag2", "tag3"]},
),
ValidTestCase(
name="array_number_parameter",
parameters=[
ParameterConfig(name="scores", type=SegmentType.ARRAY_NUMBER, description="Scores", required=True)
],
result={"scores": [85, 92.5, 78]},
),
ValidTestCase(
name="array_object_parameter",
parameters=[
ParameterConfig(name="items", type=SegmentType.ARRAY_OBJECT, description="Items", required=True)
],
result={"items": [{"name": "item1"}, {"name": "item2"}]},
),
ValidTestCase(
name="multiple_parameters",
parameters=[
ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True),
ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True),
ParameterConfig(name="active", type=SegmentType.BOOLEAN, description="Active", required=True),
],
result={"name": "John", "age": 25, "active": True},
),
ValidTestCase(
name="optional_parameter_present",
parameters=[
ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True),
ParameterConfig(name="nickname", type=SegmentType.STRING, description="Nickname", required=False),
],
result={"name": "John", "nickname": "Johnny"},
),
ValidTestCase(
name="empty_array_parameter",
parameters=[
ParameterConfig(name="tags", type=SegmentType.ARRAY_STRING, description="Tags", required=True)
],
result={"tags": []},
),
]
@staticmethod
def get_error_test_cases() -> list[ErrorTestCase]:
"""Get test cases that should raise exceptions."""
return [
ErrorTestCase(
name="invalid_number_of_parameters_too_few",
parameters=[
ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True),
ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True),
],
result={"name": "John"},
expected_exception=InvalidNumberOfParametersError,
expected_message="Invalid number of parameters",
),
ErrorTestCase(
name="invalid_number_of_parameters_too_many",
parameters=[ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True)],
result={"name": "John", "age": 25},
expected_exception=InvalidNumberOfParametersError,
expected_message="Invalid number of parameters",
),
ErrorTestCase(
name="invalid_string_value_none",
parameters=[
ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True),
],
result={"name": None}, # Parameter present but None value, will trigger type check first
expected_exception=InvalidValueTypeError,
expected_message="Invalid value for parameter name, expected segment type: string, actual_type: none",
),
ErrorTestCase(
name="invalid_select_value",
parameters=[
ParameterConfig(
name="status",
type="select", # type: ignore
description="Status",
required=True,
options=["active", "inactive"],
)
],
result={"status": "pending"},
expected_exception=InvalidSelectValueError,
expected_message="Invalid `select` value for parameter status",
),
ErrorTestCase(
name="invalid_number_value_string",
parameters=[ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True)],
result={"age": "twenty-five"},
expected_exception=InvalidValueTypeError,
expected_message="Invalid value for parameter age, expected segment type: number, actual_type: string",
),
ErrorTestCase(
name="invalid_bool_value_string",
parameters=[
ParameterConfig(name="active", type=SegmentType.BOOLEAN, description="Active", required=True)
],
result={"active": "yes"},
expected_exception=InvalidValueTypeError,
expected_message=(
"Invalid value for parameter active, expected segment type: boolean, actual_type: string"
),
),
ErrorTestCase(
name="invalid_string_value_number",
parameters=[
ParameterConfig(
name="description", type=SegmentType.STRING, description="Description", required=True
)
],
result={"description": 123},
expected_exception=InvalidValueTypeError,
expected_message=(
"Invalid value for parameter description, expected segment type: string, actual_type: integer"
),
),
ErrorTestCase(
name="invalid_array_value_not_list",
parameters=[
ParameterConfig(name="tags", type=SegmentType.ARRAY_STRING, description="Tags", required=True)
],
result={"tags": "tag1,tag2,tag3"},
expected_exception=InvalidValueTypeError,
expected_message=(
"Invalid value for parameter tags, expected segment type: array[string], actual_type: string"
),
),
ErrorTestCase(
name="invalid_array_number_wrong_element_type",
parameters=[
ParameterConfig(name="scores", type=SegmentType.ARRAY_NUMBER, description="Scores", required=True)
],
result={"scores": [85, "ninety-two", 78]},
expected_exception=InvalidValueTypeError,
expected_message=(
"Invalid value for parameter scores, expected segment type: array[number], actual_type: array[any]"
),
),
ErrorTestCase(
name="invalid_array_string_wrong_element_type",
parameters=[
ParameterConfig(name="tags", type=SegmentType.ARRAY_STRING, description="Tags", required=True)
],
result={"tags": ["tag1", 123, "tag3"]},
expected_exception=InvalidValueTypeError,
expected_message=(
"Invalid value for parameter tags, expected segment type: array[string], actual_type: array[any]"
),
),
ErrorTestCase(
name="invalid_array_object_wrong_element_type",
parameters=[
ParameterConfig(name="items", type=SegmentType.ARRAY_OBJECT, description="Items", required=True)
],
result={"items": [{"name": "item1"}, "item2"]},
expected_exception=InvalidValueTypeError,
expected_message=(
"Invalid value for parameter items, expected segment type: array[object], actual_type: array[any]"
),
),
ErrorTestCase(
name="required_parameter_missing",
parameters=[
ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True),
ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=False),
],
result={"age": 25, "other": "value"}, # Missing required 'name' parameter, but has correct count
expected_exception=RequiredParameterMissingError,
expected_message="Parameter name is required",
),
]
@pytest.mark.parametrize("test_case", get_valid_test_cases(), ids=ValidTestCase.get_name)
def test_validate_result_valid_cases(self, test_case):
"""Test _validate_result with valid inputs."""
helper = TestParameterExtractorNodeMethods()
node_data = ParameterExtractorNodeData(
title="Test Node",
model=ModelConfig(provider="openai", name="gpt-3.5-turbo", mode=LLMMode.CHAT, completion_params={}),
query=["test_query"],
parameters=test_case.parameters,
reasoning_mode="function_call",
vision=VisionConfig(),
)
result = helper.validate_result(data=node_data, result=test_case.result)
assert result == test_case.result, f"Failed for case: {test_case.name}"
@pytest.mark.parametrize("test_case", get_error_test_cases(), ids=ErrorTestCase.get_name)
def test_validate_result_error_cases(self, test_case):
"""Test _validate_result with invalid inputs that should raise exceptions."""
helper = TestParameterExtractorNodeMethods()
node_data = ParameterExtractorNodeData(
title="Test Node",
model=ModelConfig(provider="openai", name="gpt-3.5-turbo", mode=LLMMode.CHAT, completion_params={}),
query=["test_query"],
parameters=test_case.parameters,
reasoning_mode="function_call",
vision=VisionConfig(),
)
with pytest.raises(test_case.expected_exception) as exc_info:
helper.validate_result(data=node_data, result=test_case.result)
assert test_case.expected_message in str(exc_info.value), f"Failed for case: {test_case.name}"
class TestTransformResult:
"""Test cases for _transform_result method."""
@staticmethod
def get_transform_test_cases() -> list[TransformTestCase]:
"""Get test cases for result transformation."""
return [
# String parameter transformation
TransformTestCase(
name="string_parameter_present",
parameters=[ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True)],
input_result={"name": "John"},
expected_result={"name": "John"},
),
TransformTestCase(
name="string_parameter_missing",
parameters=[ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True)],
input_result={},
expected_result={"name": ""},
),
# Number parameter transformation
TransformTestCase(
name="number_parameter_int_present",
parameters=[ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True)],
input_result={"age": 25},
expected_result={"age": 25},
),
TransformTestCase(
name="number_parameter_float_present",
parameters=[ParameterConfig(name="price", type=SegmentType.NUMBER, description="Price", required=True)],
input_result={"price": 19.99},
expected_result={"price": 19.99},
),
TransformTestCase(
name="number_parameter_missing",
parameters=[ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True)],
input_result={},
expected_result={"age": 0},
),
# Bool parameter transformation
TransformTestCase(
name="bool_parameter_missing",
parameters=[
ParameterConfig(name="active", type=SegmentType.BOOLEAN, description="Active", required=True)
],
input_result={},
expected_result={"active": False},
),
# Select parameter transformation
TransformTestCase(
name="select_parameter_present",
parameters=[
ParameterConfig(
name="status",
type="select", # type: ignore
description="Status",
required=True,
options=["active", "inactive"],
)
],
input_result={"status": "active"},
expected_result={"status": "active"},
),
TransformTestCase(
name="select_parameter_missing",
parameters=[
ParameterConfig(
name="status",
type="select", # type: ignore
description="Status",
required=True,
options=["active", "inactive"],
)
],
input_result={},
expected_result={"status": ""},
),
# Array parameter transformation - present cases
TransformTestCase(
name="array_string_parameter_present",
parameters=[
ParameterConfig(name="tags", type=SegmentType.ARRAY_STRING, description="Tags", required=True)
],
input_result={"tags": ["tag1", "tag2"]},
expected_result={
"tags": build_segment_with_type(segment_type=SegmentType.ARRAY_STRING, value=["tag1", "tag2"])
},
),
TransformTestCase(
name="array_number_parameter_present",
parameters=[
ParameterConfig(name="scores", type=SegmentType.ARRAY_NUMBER, description="Scores", required=True)
],
input_result={"scores": [85, 92.5]},
expected_result={
"scores": build_segment_with_type(segment_type=SegmentType.ARRAY_NUMBER, value=[85, 92.5])
},
),
TransformTestCase(
name="array_number_parameter_with_string_conversion",
parameters=[
ParameterConfig(name="scores", type=SegmentType.ARRAY_NUMBER, description="Scores", required=True)
],
input_result={"scores": [85, "92.5", "78"]},
expected_result={
"scores": build_segment_with_type(segment_type=SegmentType.ARRAY_NUMBER, value=[85, 92.5, 78])
},
),
TransformTestCase(
name="array_object_parameter_present",
parameters=[
ParameterConfig(name="items", type=SegmentType.ARRAY_OBJECT, description="Items", required=True)
],
input_result={"items": [{"name": "item1"}, {"name": "item2"}]},
expected_result={
"items": build_segment_with_type(
segment_type=SegmentType.ARRAY_OBJECT, value=[{"name": "item1"}, {"name": "item2"}]
)
},
),
# Array parameter transformation - missing cases
TransformTestCase(
name="array_string_parameter_missing",
parameters=[
ParameterConfig(name="tags", type=SegmentType.ARRAY_STRING, description="Tags", required=True)
],
input_result={},
expected_result={"tags": build_segment_with_type(segment_type=SegmentType.ARRAY_STRING, value=[])},
),
TransformTestCase(
name="array_number_parameter_missing",
parameters=[
ParameterConfig(name="scores", type=SegmentType.ARRAY_NUMBER, description="Scores", required=True)
],
input_result={},
expected_result={"scores": build_segment_with_type(segment_type=SegmentType.ARRAY_NUMBER, value=[])},
),
TransformTestCase(
name="array_object_parameter_missing",
parameters=[
ParameterConfig(name="items", type=SegmentType.ARRAY_OBJECT, description="Items", required=True)
],
input_result={},
expected_result={"items": build_segment_with_type(segment_type=SegmentType.ARRAY_OBJECT, value=[])},
),
# Multiple parameters transformation
TransformTestCase(
name="multiple_parameters_mixed",
parameters=[
ParameterConfig(name="name", type=SegmentType.STRING, description="Name", required=True),
ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True),
ParameterConfig(name="active", type=SegmentType.BOOLEAN, description="Active", required=True),
ParameterConfig(name="tags", type=SegmentType.ARRAY_STRING, description="Tags", required=True),
],
input_result={"name": "John", "age": 25},
expected_result={
"name": "John",
"age": 25,
"active": False,
"tags": build_segment_with_type(segment_type=SegmentType.ARRAY_STRING, value=[]),
},
),
# Number parameter transformation with string conversion
TransformTestCase(
name="number_parameter_string_to_float",
parameters=[ParameterConfig(name="price", type=SegmentType.NUMBER, description="Price", required=True)],
input_result={"price": "19.99"},
expected_result={"price": 19.99}, # String not converted, falls back to default
),
TransformTestCase(
name="number_parameter_string_to_int",
parameters=[ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True)],
input_result={"age": "25"},
expected_result={"age": 25}, # String not converted, falls back to default
),
TransformTestCase(
name="number_parameter_invalid_string",
parameters=[ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True)],
input_result={"age": "invalid_number"},
expected_result={"age": 0}, # Invalid string conversion fails, falls back to default
),
TransformTestCase(
name="number_parameter_non_string_non_number",
parameters=[ParameterConfig(name="age", type=SegmentType.NUMBER, description="Age", required=True)],
input_result={"age": ["not_a_number"]}, # Non-string, non-number value
expected_result={"age": 0}, # Falls back to default
),
TransformTestCase(
name="array_number_parameter_with_invalid_string_conversion",
parameters=[
ParameterConfig(name="scores", type=SegmentType.ARRAY_NUMBER, description="Scores", required=True)
],
input_result={"scores": [85, "invalid", "78"]},
expected_result={
"scores": build_segment_with_type(
segment_type=SegmentType.ARRAY_NUMBER, value=[85, 78]
) # Invalid string skipped
},
),
]
@pytest.mark.parametrize("test_case", get_transform_test_cases(), ids=TransformTestCase.get_name)
def test_transform_result_cases(self, test_case):
"""Test _transform_result with various inputs."""
helper = TestParameterExtractorNodeMethods()
node_data = ParameterExtractorNodeData(
title="Test Node",
model=ModelConfig(provider="openai", name="gpt-3.5-turbo", mode=LLMMode.CHAT, completion_params={}),
query=["test_query"],
parameters=test_case.parameters,
reasoning_mode="function_call",
vision=VisionConfig(),
)
result = helper.transform_result(data=node_data, result=test_case.input_result)
assert result == test_case.expected_result, (
f"Failed for case: {test_case.name}. Expected: {test_case.expected_result}, Got: {result}"
)

View File

@@ -2,6 +2,8 @@ import time
import uuid
from unittest.mock import MagicMock, Mock
import pytest
from core.app.entities.app_invoke_entities import InvokeFrom
from core.file import File, FileTransferMethod, FileType
from core.variables import ArrayFileSegment
@@ -272,3 +274,220 @@ def test_array_file_contains_file_name():
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["result"] is True
def _get_test_conditions() -> list:
conditions = [
# Test boolean "is" operator
{"comparison_operator": "is", "variable_selector": ["start", "bool_true"], "value": "true"},
# Test boolean "is not" operator
{"comparison_operator": "is not", "variable_selector": ["start", "bool_false"], "value": "true"},
# Test boolean "=" operator
{"comparison_operator": "=", "variable_selector": ["start", "bool_true"], "value": "1"},
# Test boolean "≠" operator
{"comparison_operator": "", "variable_selector": ["start", "bool_false"], "value": "1"},
# Test boolean "not null" operator
{"comparison_operator": "not null", "variable_selector": ["start", "bool_true"]},
# Test boolean array "contains" operator
{"comparison_operator": "contains", "variable_selector": ["start", "bool_array"], "value": "true"},
# Test boolean "in" operator
{
"comparison_operator": "in",
"variable_selector": ["start", "bool_true"],
"value": ["true", "false"],
},
]
return [Condition.model_validate(i) for i in conditions]
def _get_condition_test_id(c: Condition):
return c.comparison_operator
@pytest.mark.parametrize("condition", _get_test_conditions(), ids=_get_condition_test_id)
def test_execute_if_else_boolean_conditions(condition: Condition):
"""Test IfElseNode with boolean conditions using various operators"""
graph_config = {"edges": [], "nodes": [{"data": {"type": "start"}, "id": "start"}]}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool with boolean values
pool = VariablePool(
system_variables=SystemVariable(files=[], user_id="aaa"),
)
pool.add(["start", "bool_true"], True)
pool.add(["start", "bool_false"], False)
pool.add(["start", "bool_array"], [True, False, True])
pool.add(["start", "mixed_array"], [True, "false", 1, 0])
node_data = {
"title": "Boolean Test",
"type": "if-else",
"logical_operator": "and",
"conditions": [condition.model_dump()],
}
node = IfElseNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
config={"id": "if-else", "data": node_data},
)
node.init_node_data(node_data)
# Mock db.session.close()
db.session.close = MagicMock()
# execute node
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["result"] is True
def test_execute_if_else_boolean_false_conditions():
"""Test IfElseNode with boolean conditions that should evaluate to false"""
graph_config = {"edges": [], "nodes": [{"data": {"type": "start"}, "id": "start"}]}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool with boolean values
pool = VariablePool(
system_variables=SystemVariable(files=[], user_id="aaa"),
)
pool.add(["start", "bool_true"], True)
pool.add(["start", "bool_false"], False)
pool.add(["start", "bool_array"], [True, False, True])
node_data = {
"title": "Boolean False Test",
"type": "if-else",
"logical_operator": "or",
"conditions": [
# Test boolean "is" operator (should be false)
{"comparison_operator": "is", "variable_selector": ["start", "bool_true"], "value": "false"},
# Test boolean "=" operator (should be false)
{"comparison_operator": "=", "variable_selector": ["start", "bool_false"], "value": "1"},
# Test boolean "not contains" operator (should be false)
{
"comparison_operator": "not contains",
"variable_selector": ["start", "bool_array"],
"value": "true",
},
],
}
node = IfElseNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
config={
"id": "if-else",
"data": node_data,
},
)
node.init_node_data(node_data)
# Mock db.session.close()
db.session.close = MagicMock()
# execute node
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["result"] is False
def test_execute_if_else_boolean_cases_structure():
"""Test IfElseNode with boolean conditions using the new cases structure"""
graph_config = {"edges": [], "nodes": [{"data": {"type": "start"}, "id": "start"}]}
graph = Graph.init(graph_config=graph_config)
init_params = GraphInitParams(
tenant_id="1",
app_id="1",
workflow_type=WorkflowType.WORKFLOW,
workflow_id="1",
graph_config=graph_config,
user_id="1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
call_depth=0,
)
# construct variable pool with boolean values
pool = VariablePool(
system_variables=SystemVariable(files=[], user_id="aaa"),
)
pool.add(["start", "bool_true"], True)
pool.add(["start", "bool_false"], False)
node_data = {
"title": "Boolean Cases Test",
"type": "if-else",
"cases": [
{
"case_id": "true",
"logical_operator": "and",
"conditions": [
{
"comparison_operator": "is",
"variable_selector": ["start", "bool_true"],
"value": "true",
},
{
"comparison_operator": "is not",
"variable_selector": ["start", "bool_false"],
"value": "true",
},
],
}
],
}
node = IfElseNode(
id=str(uuid.uuid4()),
graph_init_params=init_params,
graph=graph,
graph_runtime_state=GraphRuntimeState(variable_pool=pool, start_at=time.perf_counter()),
config={"id": "if-else", "data": node_data},
)
node.init_node_data(node_data)
# Mock db.session.close()
db.session.close = MagicMock()
# execute node
result = node._run()
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs is not None
assert result.outputs["result"] is True
assert result.outputs["selected_case_id"] == "true"

View File

@@ -11,7 +11,8 @@ from core.workflow.nodes.list_operator.entities import (
FilterCondition,
Limit,
ListOperatorNodeData,
OrderBy,
Order,
OrderByConfig,
)
from core.workflow.nodes.list_operator.exc import InvalidKeyError
from core.workflow.nodes.list_operator.node import ListOperatorNode, _get_file_extract_string_func
@@ -27,7 +28,7 @@ def list_operator_node():
FilterCondition(key="type", comparison_operator="in", value=[FileType.IMAGE, FileType.DOCUMENT])
],
),
"order_by": OrderBy(enabled=False, value="asc"),
"order_by": OrderByConfig(enabled=False, value=Order.ASC),
"limit": Limit(enabled=False, size=0),
"extract_by": ExtractConfig(enabled=False, serial="1"),
"title": "Test Title",

View File

@@ -24,16 +24,18 @@ from core.variables.segments import (
ArrayNumberSegment,
ArrayObjectSegment,
ArrayStringSegment,
BooleanSegment,
FileSegment,
FloatSegment,
IntegerSegment,
NoneSegment,
ObjectSegment,
Segment,
StringSegment,
)
from core.variables.types import SegmentType
from factories import variable_factory
from factories.variable_factory import TypeMismatchError, build_segment_with_type
from factories.variable_factory import TypeMismatchError, build_segment, build_segment_with_type
def test_string_variable():
@@ -139,6 +141,26 @@ def test_array_number_variable():
assert isinstance(variable.value[1], float)
def test_build_segment_scalar_values():
@dataclass
class TestCase:
value: Any
expected: Segment
description: str
cases = [
TestCase(
value=True,
expected=BooleanSegment(value=True),
description="build_segment with boolean should yield BooleanSegment",
)
]
for idx, c in enumerate(cases, 1):
seg = build_segment(c.value)
assert seg == c.expected, f"Test case {idx} failed: {c.description}"
def test_array_object_variable():
mapping = {
"id": str(uuid4()),
@@ -847,15 +869,22 @@ class TestBuildSegmentValueErrors:
f"but got: {error_message}"
)
def test_build_segment_boolean_type_note(self):
"""Note: Boolean values are actually handled as integers in Python, so they don't raise ValueError."""
# Boolean values in Python are subclasses of int, so they get processed as integers
# True becomes IntegerSegment(value=1) and False becomes IntegerSegment(value=0)
def test_build_segment_boolean_type(self):
"""Test that Boolean values are correctly handled as boolean type, not integers."""
# Boolean values should now be processed as BooleanSegment, not IntegerSegment
# This is because the bool check now comes before the int check in build_segment
true_segment = variable_factory.build_segment(True)
false_segment = variable_factory.build_segment(False)
# Verify they are processed as integers, not as errors
assert true_segment.value == 1, "Test case 1 (boolean_true): Expected True to be processed as integer 1"
assert false_segment.value == 0, "Test case 2 (boolean_false): Expected False to be processed as integer 0"
assert true_segment.value_type == SegmentType.INTEGER
assert false_segment.value_type == SegmentType.INTEGER
# Verify they are processed as booleans, not integers
assert true_segment.value is True, "Test case 1 (boolean_true): Expected True to be processed as boolean True"
assert false_segment.value is False, (
"Test case 2 (boolean_false): Expected False to be processed as boolean False"
)
assert true_segment.value_type == SegmentType.BOOLEAN
assert false_segment.value_type == SegmentType.BOOLEAN
# Test array of booleans
bool_array_segment = variable_factory.build_segment([True, False, True])
assert bool_array_segment.value_type == SegmentType.ARRAY_BOOLEAN
assert bool_array_segment.value == [True, False, True]