test: add comprehensive unit tests for auth service module (#22662)
This commit is contained in:
49
api/tests/unit_tests/services/auth/test_api_key_auth_base.py
Normal file
49
api/tests/unit_tests/services/auth/test_api_key_auth_base.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import pytest
|
||||
|
||||
from services.auth.api_key_auth_base import ApiKeyAuthBase
|
||||
|
||||
|
||||
class ConcreteApiKeyAuth(ApiKeyAuthBase):
|
||||
"""Concrete implementation for testing abstract base class"""
|
||||
|
||||
def validate_credentials(self):
|
||||
return True
|
||||
|
||||
|
||||
class TestApiKeyAuthBase:
|
||||
def test_should_store_credentials_on_init(self):
|
||||
"""Test that credentials are properly stored during initialization"""
|
||||
credentials = {"api_key": "test_key", "auth_type": "bearer"}
|
||||
auth = ConcreteApiKeyAuth(credentials)
|
||||
assert auth.credentials == credentials
|
||||
|
||||
def test_should_not_instantiate_abstract_class(self):
|
||||
"""Test that ApiKeyAuthBase cannot be instantiated directly"""
|
||||
credentials = {"api_key": "test_key"}
|
||||
|
||||
with pytest.raises(TypeError) as exc_info:
|
||||
ApiKeyAuthBase(credentials)
|
||||
|
||||
assert "Can't instantiate abstract class" in str(exc_info.value)
|
||||
assert "validate_credentials" in str(exc_info.value)
|
||||
|
||||
def test_should_allow_subclass_implementation(self):
|
||||
"""Test that subclasses can properly implement the abstract method"""
|
||||
credentials = {"api_key": "test_key", "auth_type": "bearer"}
|
||||
auth = ConcreteApiKeyAuth(credentials)
|
||||
|
||||
# Should not raise any exception
|
||||
result = auth.validate_credentials()
|
||||
assert result is True
|
||||
|
||||
def test_should_handle_empty_credentials(self):
|
||||
"""Test initialization with empty credentials"""
|
||||
credentials = {}
|
||||
auth = ConcreteApiKeyAuth(credentials)
|
||||
assert auth.credentials == {}
|
||||
|
||||
def test_should_handle_none_credentials(self):
|
||||
"""Test initialization with None credentials"""
|
||||
credentials = None
|
||||
auth = ConcreteApiKeyAuth(credentials)
|
||||
assert auth.credentials is None
|
@@ -0,0 +1,81 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from services.auth.api_key_auth_factory import ApiKeyAuthFactory
|
||||
from services.auth.auth_type import AuthType
|
||||
|
||||
|
||||
class TestApiKeyAuthFactory:
|
||||
"""Test cases for ApiKeyAuthFactory"""
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("provider", "auth_class_path"),
|
||||
[
|
||||
(AuthType.FIRECRAWL, "services.auth.firecrawl.firecrawl.FirecrawlAuth"),
|
||||
(AuthType.WATERCRAWL, "services.auth.watercrawl.watercrawl.WatercrawlAuth"),
|
||||
(AuthType.JINA, "services.auth.jina.jina.JinaAuth"),
|
||||
],
|
||||
)
|
||||
def test_get_apikey_auth_factory_valid_providers(self, provider, auth_class_path):
|
||||
"""Test getting auth factory for all valid providers"""
|
||||
with patch(auth_class_path) as mock_auth:
|
||||
auth_class = ApiKeyAuthFactory.get_apikey_auth_factory(provider)
|
||||
assert auth_class == mock_auth
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_provider",
|
||||
[
|
||||
"invalid_provider",
|
||||
"",
|
||||
None,
|
||||
123,
|
||||
"UNSUPPORTED",
|
||||
],
|
||||
)
|
||||
def test_get_apikey_auth_factory_invalid_providers(self, invalid_provider):
|
||||
"""Test getting auth factory with various invalid providers"""
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
ApiKeyAuthFactory.get_apikey_auth_factory(invalid_provider)
|
||||
assert str(exc_info.value) == "Invalid provider"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("credentials_return_value", "expected_result"),
|
||||
[
|
||||
(True, True),
|
||||
(False, False),
|
||||
],
|
||||
)
|
||||
@patch("services.auth.api_key_auth_factory.ApiKeyAuthFactory.get_apikey_auth_factory")
|
||||
def test_validate_credentials_delegates_to_auth_instance(
|
||||
self, mock_get_factory, credentials_return_value, expected_result
|
||||
):
|
||||
"""Test that validate_credentials delegates to auth instance correctly"""
|
||||
# Arrange
|
||||
mock_auth_instance = MagicMock()
|
||||
mock_auth_instance.validate_credentials.return_value = credentials_return_value
|
||||
mock_auth_class = MagicMock(return_value=mock_auth_instance)
|
||||
mock_get_factory.return_value = mock_auth_class
|
||||
|
||||
# Act
|
||||
factory = ApiKeyAuthFactory(AuthType.FIRECRAWL, {"api_key": "test_key"})
|
||||
result = factory.validate_credentials()
|
||||
|
||||
# Assert
|
||||
assert result is expected_result
|
||||
mock_auth_instance.validate_credentials.assert_called_once()
|
||||
|
||||
@patch("services.auth.api_key_auth_factory.ApiKeyAuthFactory.get_apikey_auth_factory")
|
||||
def test_validate_credentials_propagates_exceptions(self, mock_get_factory):
|
||||
"""Test that exceptions from auth instance are propagated"""
|
||||
# Arrange
|
||||
mock_auth_instance = MagicMock()
|
||||
mock_auth_instance.validate_credentials.side_effect = Exception("Authentication error")
|
||||
mock_auth_class = MagicMock(return_value=mock_auth_instance)
|
||||
mock_get_factory.return_value = mock_auth_class
|
||||
|
||||
# Act & Assert
|
||||
factory = ApiKeyAuthFactory(AuthType.FIRECRAWL, {"api_key": "test_key"})
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
factory.validate_credentials()
|
||||
assert str(exc_info.value) == "Authentication error"
|
155
api/tests/unit_tests/services/auth/test_jina_auth.py
Normal file
155
api/tests/unit_tests/services/auth/test_jina_auth.py
Normal file
@@ -0,0 +1,155 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from services.auth.jina.jina import JinaAuth
|
||||
|
||||
|
||||
class TestJinaAuth:
|
||||
def test_should_initialize_with_valid_bearer_credentials(self):
|
||||
"""Test successful initialization with valid bearer credentials"""
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
assert auth.api_key == "test_api_key_123"
|
||||
assert auth.credentials == credentials
|
||||
|
||||
def test_should_raise_error_for_invalid_auth_type(self):
|
||||
"""Test that non-bearer auth type raises ValueError"""
|
||||
credentials = {"auth_type": "basic", "config": {"api_key": "test_api_key_123"}}
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
JinaAuth(credentials)
|
||||
assert str(exc_info.value) == "Invalid auth type, Jina Reader auth type must be Bearer"
|
||||
|
||||
def test_should_raise_error_for_missing_api_key(self):
|
||||
"""Test that missing API key raises ValueError"""
|
||||
credentials = {"auth_type": "bearer", "config": {}}
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
JinaAuth(credentials)
|
||||
assert str(exc_info.value) == "No API key provided"
|
||||
|
||||
def test_should_raise_error_for_missing_config(self):
|
||||
"""Test that missing config section raises ValueError"""
|
||||
credentials = {"auth_type": "bearer"}
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
JinaAuth(credentials)
|
||||
assert str(exc_info.value) == "No API key provided"
|
||||
|
||||
@patch("services.auth.jina.jina.requests.post")
|
||||
def test_should_validate_valid_credentials_successfully(self, mock_post):
|
||||
"""Test successful credential validation"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 200
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
result = auth.validate_credentials()
|
||||
|
||||
assert result is True
|
||||
mock_post.assert_called_once_with(
|
||||
"https://r.jina.ai",
|
||||
headers={"Content-Type": "application/json", "Authorization": "Bearer test_api_key_123"},
|
||||
json={"url": "https://example.com"},
|
||||
)
|
||||
|
||||
@patch("services.auth.jina.jina.requests.post")
|
||||
def test_should_handle_http_402_error(self, mock_post):
|
||||
"""Test handling of 402 Payment Required error"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 402
|
||||
mock_response.json.return_value = {"error": "Payment required"}
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
auth.validate_credentials()
|
||||
assert str(exc_info.value) == "Failed to authorize. Status code: 402. Error: Payment required"
|
||||
|
||||
@patch("services.auth.jina.jina.requests.post")
|
||||
def test_should_handle_http_409_error(self, mock_post):
|
||||
"""Test handling of 409 Conflict error"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 409
|
||||
mock_response.json.return_value = {"error": "Conflict error"}
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
auth.validate_credentials()
|
||||
assert str(exc_info.value) == "Failed to authorize. Status code: 409. Error: Conflict error"
|
||||
|
||||
@patch("services.auth.jina.jina.requests.post")
|
||||
def test_should_handle_http_500_error(self, mock_post):
|
||||
"""Test handling of 500 Internal Server Error"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 500
|
||||
mock_response.json.return_value = {"error": "Internal server error"}
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
auth.validate_credentials()
|
||||
assert str(exc_info.value) == "Failed to authorize. Status code: 500. Error: Internal server error"
|
||||
|
||||
@patch("services.auth.jina.jina.requests.post")
|
||||
def test_should_handle_unexpected_error_with_text_response(self, mock_post):
|
||||
"""Test handling of unexpected errors with text response"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 403
|
||||
mock_response.text = '{"error": "Forbidden"}'
|
||||
mock_response.json.side_effect = Exception("Not JSON")
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
auth.validate_credentials()
|
||||
assert str(exc_info.value) == "Failed to authorize. Status code: 403. Error: Forbidden"
|
||||
|
||||
@patch("services.auth.jina.jina.requests.post")
|
||||
def test_should_handle_unexpected_error_without_text(self, mock_post):
|
||||
"""Test handling of unexpected errors without text response"""
|
||||
mock_response = MagicMock()
|
||||
mock_response.status_code = 404
|
||||
mock_response.text = ""
|
||||
mock_response.json.side_effect = Exception("Not JSON")
|
||||
mock_post.return_value = mock_response
|
||||
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
|
||||
with pytest.raises(Exception) as exc_info:
|
||||
auth.validate_credentials()
|
||||
assert str(exc_info.value) == "Unexpected error occurred while trying to authorize. Status code: 404"
|
||||
|
||||
@patch("services.auth.jina.jina.requests.post")
|
||||
def test_should_handle_network_errors(self, mock_post):
|
||||
"""Test handling of network connection errors"""
|
||||
mock_post.side_effect = requests.ConnectionError("Network error")
|
||||
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "test_api_key_123"}}
|
||||
auth = JinaAuth(credentials)
|
||||
|
||||
with pytest.raises(requests.ConnectionError):
|
||||
auth.validate_credentials()
|
||||
|
||||
def test_should_not_expose_api_key_in_error_messages(self):
|
||||
"""Test that API key is not exposed in error messages"""
|
||||
credentials = {"auth_type": "bearer", "config": {"api_key": "super_secret_key_12345"}}
|
||||
auth = JinaAuth(credentials)
|
||||
|
||||
# Verify API key is stored but not in any error message
|
||||
assert auth.api_key == "super_secret_key_12345"
|
||||
|
||||
# Test various error scenarios don't expose the key
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
JinaAuth({"auth_type": "basic", "config": {"api_key": "super_secret_key_12345"}})
|
||||
assert "super_secret_key_12345" not in str(exc_info.value)
|
Reference in New Issue
Block a user