chore: code improvement for mcp_client and mcp_tools_manage_service (#22645)
This commit is contained in:
@@ -29,7 +29,7 @@ from libs.login import login_required
|
|||||||
from services.plugin.oauth_service import OAuthProxyService
|
from services.plugin.oauth_service import OAuthProxyService
|
||||||
from services.tools.api_tools_manage_service import ApiToolManageService
|
from services.tools.api_tools_manage_service import ApiToolManageService
|
||||||
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
|
from services.tools.builtin_tools_manage_service import BuiltinToolManageService
|
||||||
from services.tools.mcp_tools_mange_service import MCPToolManageService
|
from services.tools.mcp_tools_manage_service import MCPToolManageService
|
||||||
from services.tools.tool_labels_service import ToolLabelsService
|
from services.tools.tool_labels_service import ToolLabelsService
|
||||||
from services.tools.tools_manage_service import ToolCommonService
|
from services.tools.tools_manage_service import ToolCommonService
|
||||||
from services.tools.tools_transform_service import ToolTransformService
|
from services.tools.tools_transform_service import ToolTransformService
|
||||||
|
@@ -8,7 +8,7 @@ from core.mcp.types import (
|
|||||||
OAuthTokens,
|
OAuthTokens,
|
||||||
)
|
)
|
||||||
from models.tools import MCPToolProvider
|
from models.tools import MCPToolProvider
|
||||||
from services.tools.mcp_tools_mange_service import MCPToolManageService
|
from services.tools.mcp_tools_manage_service import MCPToolManageService
|
||||||
|
|
||||||
LATEST_PROTOCOL_VERSION = "1.0"
|
LATEST_PROTOCOL_VERSION = "1.0"
|
||||||
|
|
||||||
|
@@ -68,15 +68,17 @@ class MCPClient:
|
|||||||
}
|
}
|
||||||
|
|
||||||
parsed_url = urlparse(self.server_url)
|
parsed_url = urlparse(self.server_url)
|
||||||
path = parsed_url.path
|
path = parsed_url.path or ""
|
||||||
method_name = path.rstrip("/").split("/")[-1] if path else ""
|
method_name = path.removesuffix("/").lower()
|
||||||
try:
|
if method_name in connection_methods:
|
||||||
client_factory = connection_methods[method_name]
|
client_factory = connection_methods[method_name]
|
||||||
self.connect_server(client_factory, method_name)
|
self.connect_server(client_factory, method_name)
|
||||||
except KeyError:
|
else:
|
||||||
try:
|
try:
|
||||||
|
logger.debug(f"Not supported method {method_name} found in URL path, trying default 'mcp' method.")
|
||||||
self.connect_server(sse_client, "sse")
|
self.connect_server(sse_client, "sse")
|
||||||
except MCPConnectionError:
|
except MCPConnectionError:
|
||||||
|
logger.debug("MCP connection failed with 'sse', falling back to 'mcp' method.")
|
||||||
self.connect_server(streamablehttp_client, "mcp")
|
self.connect_server(streamablehttp_client, "mcp")
|
||||||
|
|
||||||
def connect_server(
|
def connect_server(
|
||||||
@@ -91,7 +93,7 @@ class MCPClient:
|
|||||||
else {}
|
else {}
|
||||||
)
|
)
|
||||||
self._streams_context = client_factory(url=self.server_url, headers=headers)
|
self._streams_context = client_factory(url=self.server_url, headers=headers)
|
||||||
if self._streams_context is None:
|
if not self._streams_context:
|
||||||
raise MCPConnectionError("Failed to create connection context")
|
raise MCPConnectionError("Failed to create connection context")
|
||||||
|
|
||||||
# Use exit_stack to manage context managers properly
|
# Use exit_stack to manage context managers properly
|
||||||
@@ -141,10 +143,11 @@ class MCPClient:
|
|||||||
try:
|
try:
|
||||||
# ExitStack will handle proper cleanup of all managed context managers
|
# ExitStack will handle proper cleanup of all managed context managers
|
||||||
self.exit_stack.close()
|
self.exit_stack.close()
|
||||||
|
except Exception as e:
|
||||||
|
logging.exception("Error during cleanup")
|
||||||
|
raise ValueError(f"Error during cleanup: {e}")
|
||||||
|
finally:
|
||||||
self._session = None
|
self._session = None
|
||||||
self._session_context = None
|
self._session_context = None
|
||||||
self._streams_context = None
|
self._streams_context = None
|
||||||
self._initialized = False
|
self._initialized = False
|
||||||
except Exception as e:
|
|
||||||
logging.exception("Error during cleanup")
|
|
||||||
raise ValueError(f"Error during cleanup: {e}")
|
|
||||||
|
@@ -21,7 +21,7 @@ from core.tools.plugin_tool.tool import PluginTool
|
|||||||
from core.tools.utils.uuid_utils import is_valid_uuid
|
from core.tools.utils.uuid_utils import is_valid_uuid
|
||||||
from core.tools.workflow_as_tool.provider import WorkflowToolProviderController
|
from core.tools.workflow_as_tool.provider import WorkflowToolProviderController
|
||||||
from core.workflow.entities.variable_pool import VariablePool
|
from core.workflow.entities.variable_pool import VariablePool
|
||||||
from services.tools.mcp_tools_mange_service import MCPToolManageService
|
from services.tools.mcp_tools_manage_service import MCPToolManageService
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from core.workflow.nodes.tool.entities import ToolEntity
|
from core.workflow.nodes.tool.entities import ToolEntity
|
||||||
|
@@ -70,16 +70,15 @@ class MCPToolManageService:
|
|||||||
MCPToolProvider.server_url_hash == server_url_hash,
|
MCPToolProvider.server_url_hash == server_url_hash,
|
||||||
MCPToolProvider.server_identifier == server_identifier,
|
MCPToolProvider.server_identifier == server_identifier,
|
||||||
),
|
),
|
||||||
MCPToolProvider.tenant_id == tenant_id,
|
|
||||||
)
|
)
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
if existing_provider:
|
if existing_provider:
|
||||||
if existing_provider.name == name:
|
if existing_provider.name == name:
|
||||||
raise ValueError(f"MCP tool {name} already exists")
|
raise ValueError(f"MCP tool {name} already exists")
|
||||||
elif existing_provider.server_url_hash == server_url_hash:
|
if existing_provider.server_url_hash == server_url_hash:
|
||||||
raise ValueError(f"MCP tool {server_url} already exists")
|
raise ValueError(f"MCP tool {server_url} already exists")
|
||||||
elif existing_provider.server_identifier == server_identifier:
|
if existing_provider.server_identifier == server_identifier:
|
||||||
raise ValueError(f"MCP tool {server_identifier} already exists")
|
raise ValueError(f"MCP tool {server_identifier} already exists")
|
||||||
encrypted_server_url = encrypter.encrypt_token(tenant_id, server_url)
|
encrypted_server_url = encrypter.encrypt_token(tenant_id, server_url)
|
||||||
mcp_tool = MCPToolProvider(
|
mcp_tool = MCPToolProvider(
|
||||||
@@ -111,15 +110,14 @@ class MCPToolManageService:
|
|||||||
]
|
]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def list_mcp_tool_from_remote_server(cls, tenant_id: str, provider_id: str):
|
def list_mcp_tool_from_remote_server(cls, tenant_id: str, provider_id: str) -> ToolProviderApiEntity:
|
||||||
mcp_provider = cls.get_mcp_provider_by_provider_id(provider_id, tenant_id)
|
mcp_provider = cls.get_mcp_provider_by_provider_id(provider_id, tenant_id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with MCPClient(
|
with MCPClient(
|
||||||
mcp_provider.decrypted_server_url, provider_id, tenant_id, authed=mcp_provider.authed, for_list=True
|
mcp_provider.decrypted_server_url, provider_id, tenant_id, authed=mcp_provider.authed, for_list=True
|
||||||
) as mcp_client:
|
) as mcp_client:
|
||||||
tools = mcp_client.list_tools()
|
tools = mcp_client.list_tools()
|
||||||
except MCPAuthError as e:
|
except MCPAuthError:
|
||||||
raise ValueError("Please auth the tool first")
|
raise ValueError("Please auth the tool first")
|
||||||
except MCPError as e:
|
except MCPError as e:
|
||||||
raise ValueError(f"Failed to connect to MCP server: {e}")
|
raise ValueError(f"Failed to connect to MCP server: {e}")
|
||||||
@@ -184,12 +182,11 @@ class MCPToolManageService:
|
|||||||
error_msg = str(e.orig)
|
error_msg = str(e.orig)
|
||||||
if "unique_mcp_provider_name" in error_msg:
|
if "unique_mcp_provider_name" in error_msg:
|
||||||
raise ValueError(f"MCP tool {name} already exists")
|
raise ValueError(f"MCP tool {name} already exists")
|
||||||
elif "unique_mcp_provider_server_url" in error_msg:
|
if "unique_mcp_provider_server_url" in error_msg:
|
||||||
raise ValueError(f"MCP tool {server_url} already exists")
|
raise ValueError(f"MCP tool {server_url} already exists")
|
||||||
elif "unique_mcp_provider_server_identifier" in error_msg:
|
if "unique_mcp_provider_server_identifier" in error_msg:
|
||||||
raise ValueError(f"MCP tool {server_identifier} already exists")
|
raise ValueError(f"MCP tool {server_identifier} already exists")
|
||||||
else:
|
raise
|
||||||
raise
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def update_mcp_provider_credentials(
|
def update_mcp_provider_credentials(
|
Reference in New Issue
Block a user