From eee576355b06a3cf2ba7cbaada4a4c49d2341448 Mon Sep 17 00:00:00 2001 From: rhochman Date: Wed, 30 Jul 2025 10:12:16 +0700 Subject: [PATCH] Fix: Support for Elasticsearch Cloud Connector (#23017) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: crazywoola <100913391+crazywoola@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../middleware/vdb/elasticsearch_config.py | 52 ++++- .../vdb/elasticsearch/elasticsearch_vector.py | 180 +++++++++++++++--- .../vdb/elasticsearch/test_elasticsearch.py | 4 +- docker/.env.example | 11 ++ docker/docker-compose.yaml | 8 + 5 files changed, 221 insertions(+), 34 deletions(-) diff --git a/api/configs/middleware/vdb/elasticsearch_config.py b/api/configs/middleware/vdb/elasticsearch_config.py index df8182985..8c4b333d4 100644 --- a/api/configs/middleware/vdb/elasticsearch_config.py +++ b/api/configs/middleware/vdb/elasticsearch_config.py @@ -1,12 +1,13 @@ from typing import Optional -from pydantic import Field, PositiveInt +from pydantic import Field, PositiveInt, model_validator from pydantic_settings import BaseSettings class ElasticsearchConfig(BaseSettings): """ - Configuration settings for Elasticsearch + Configuration settings for both self-managed and Elastic Cloud deployments. + Can load from environment variables or .env files. """ ELASTICSEARCH_HOST: Optional[str] = Field( @@ -28,3 +29,50 @@ class ElasticsearchConfig(BaseSettings): description="Password for authenticating with Elasticsearch (default is 'elastic')", default="elastic", ) + + # Elastic Cloud (optional) + ELASTICSEARCH_USE_CLOUD: Optional[bool] = Field( + description="Set to True to use Elastic Cloud instead of self-hosted Elasticsearch", default=False + ) + ELASTICSEARCH_CLOUD_URL: Optional[str] = Field( + description="Full URL for Elastic Cloud deployment (e.g., 'https://example.es.region.aws.found.io:443')", + default=None, + ) + ELASTICSEARCH_API_KEY: Optional[str] = Field( + description="API key for authenticating with Elastic Cloud", default=None + ) + + # Common options + ELASTICSEARCH_CA_CERTS: Optional[str] = Field( + description="Path to CA certificate file for SSL verification", default=None + ) + ELASTICSEARCH_VERIFY_CERTS: bool = Field( + description="Whether to verify SSL certificates (default is False)", default=False + ) + ELASTICSEARCH_REQUEST_TIMEOUT: int = Field( + description="Request timeout in milliseconds (default is 100000)", default=100000 + ) + ELASTICSEARCH_RETRY_ON_TIMEOUT: bool = Field( + description="Whether to retry requests on timeout (default is True)", default=True + ) + ELASTICSEARCH_MAX_RETRIES: int = Field( + description="Maximum number of retry attempts (default is 10000)", default=10000 + ) + + @model_validator(mode="after") + def validate_elasticsearch_config(self): + """Validate Elasticsearch configuration based on deployment type.""" + if self.ELASTICSEARCH_USE_CLOUD: + if not self.ELASTICSEARCH_CLOUD_URL: + raise ValueError("ELASTICSEARCH_CLOUD_URL is required when using Elastic Cloud") + if not self.ELASTICSEARCH_API_KEY: + raise ValueError("ELASTICSEARCH_API_KEY is required when using Elastic Cloud") + else: + if not self.ELASTICSEARCH_HOST: + raise ValueError("ELASTICSEARCH_HOST is required for self-hosted Elasticsearch") + if not self.ELASTICSEARCH_USERNAME: + raise ValueError("ELASTICSEARCH_USERNAME is required for self-hosted Elasticsearch") + if not self.ELASTICSEARCH_PASSWORD: + raise ValueError("ELASTICSEARCH_PASSWORD is required for self-hosted Elasticsearch") + + return self diff --git a/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py b/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py index 832485b23..9dea050dc 100644 --- a/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py +++ b/api/core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py @@ -22,22 +22,50 @@ logger = logging.getLogger(__name__) class ElasticSearchConfig(BaseModel): - host: str - port: int - username: str - password: str + # Regular Elasticsearch config + host: Optional[str] = None + port: Optional[int] = None + username: Optional[str] = None + password: Optional[str] = None + + # Elastic Cloud specific config + cloud_url: Optional[str] = None # Cloud URL for Elasticsearch Cloud + api_key: Optional[str] = None + + # Common config + use_cloud: bool = False + ca_certs: Optional[str] = None + verify_certs: bool = False + request_timeout: int = 100000 + retry_on_timeout: bool = True + max_retries: int = 10000 @model_validator(mode="before") @classmethod def validate_config(cls, values: dict) -> dict: - if not values["host"]: - raise ValueError("config HOST is required") - if not values["port"]: - raise ValueError("config PORT is required") - if not values["username"]: - raise ValueError("config USERNAME is required") - if not values["password"]: - raise ValueError("config PASSWORD is required") + use_cloud = values.get("use_cloud", False) + cloud_url = values.get("cloud_url") + + if use_cloud: + # Cloud configuration validation - requires cloud_url and api_key + if not cloud_url: + raise ValueError("cloud_url is required for Elastic Cloud") + + api_key = values.get("api_key") + if not api_key: + raise ValueError("api_key is required for Elastic Cloud") + + else: + # Regular Elasticsearch validation + if not values.get("host"): + raise ValueError("config HOST is required for regular Elasticsearch") + if not values.get("port"): + raise ValueError("config PORT is required for regular Elasticsearch") + if not values.get("username"): + raise ValueError("config USERNAME is required for regular Elasticsearch") + if not values.get("password"): + raise ValueError("config PASSWORD is required for regular Elasticsearch") + return values @@ -50,21 +78,69 @@ class ElasticSearchVector(BaseVector): self._attributes = attributes def _init_client(self, config: ElasticSearchConfig) -> Elasticsearch: + """ + Initialize Elasticsearch client for both regular Elasticsearch and Elastic Cloud. + """ try: - parsed_url = urlparse(config.host) - if parsed_url.scheme in {"http", "https"}: - hosts = f"{config.host}:{config.port}" + # Check if using Elastic Cloud + client_config: dict[str, Any] + if config.use_cloud and config.cloud_url: + client_config = { + "request_timeout": config.request_timeout, + "retry_on_timeout": config.retry_on_timeout, + "max_retries": config.max_retries, + "verify_certs": config.verify_certs, + } + + # Parse cloud URL and configure hosts + parsed_url = urlparse(config.cloud_url) + host = f"{parsed_url.scheme}://{parsed_url.hostname}" + if parsed_url.port: + host += f":{parsed_url.port}" + + client_config["hosts"] = [host] + + # API key authentication for cloud + client_config["api_key"] = config.api_key + + # SSL settings + if config.ca_certs: + client_config["ca_certs"] = config.ca_certs + else: - hosts = f"http://{config.host}:{config.port}" - client = Elasticsearch( - hosts=hosts, - basic_auth=(config.username, config.password), - request_timeout=100000, - retry_on_timeout=True, - max_retries=10000, - ) - except requests.exceptions.ConnectionError: - raise ConnectionError("Vector database connection error") + # Regular Elasticsearch configuration + parsed_url = urlparse(config.host or "") + if parsed_url.scheme in {"http", "https"}: + hosts = f"{config.host}:{config.port}" + use_https = parsed_url.scheme == "https" + else: + hosts = f"http://{config.host}:{config.port}" + use_https = False + + client_config = { + "hosts": [hosts], + "basic_auth": (config.username, config.password), + "request_timeout": config.request_timeout, + "retry_on_timeout": config.retry_on_timeout, + "max_retries": config.max_retries, + } + + # Only add SSL settings if using HTTPS + if use_https: + client_config["verify_certs"] = config.verify_certs + if config.ca_certs: + client_config["ca_certs"] = config.ca_certs + + client = Elasticsearch(**client_config) + + # Test connection + if not client.ping(): + raise ConnectionError("Failed to connect to Elasticsearch") + + except requests.exceptions.ConnectionError as e: + raise ConnectionError(f"Vector database connection error: {str(e)}") + except Exception as e: + raise ConnectionError(f"Elasticsearch client initialization failed: {str(e)}") return client @@ -209,7 +285,11 @@ class ElasticSearchVector(BaseVector): }, } } + self._client.indices.create(index=self._collection_name, mappings=mappings) + logger.info("Created index %s with dimension %s", self._collection_name, dim) + else: + logger.info("Collection %s already exists.", self._collection_name) redis_client.set(collection_exist_cache_key, 1, ex=3600) @@ -225,13 +305,51 @@ class ElasticSearchVectorFactory(AbstractVectorFactory): dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.ELASTICSEARCH, collection_name)) config = current_app.config + + # Check if ELASTICSEARCH_USE_CLOUD is explicitly set to false (boolean) + use_cloud_env = config.get("ELASTICSEARCH_USE_CLOUD", False) + + if use_cloud_env is False: + # Use regular Elasticsearch with config values + config_dict = { + "use_cloud": False, + "host": config.get("ELASTICSEARCH_HOST", "elasticsearch"), + "port": config.get("ELASTICSEARCH_PORT", 9200), + "username": config.get("ELASTICSEARCH_USERNAME", "elastic"), + "password": config.get("ELASTICSEARCH_PASSWORD", "elastic"), + } + else: + # Check for cloud configuration + cloud_url = config.get("ELASTICSEARCH_CLOUD_URL") + if cloud_url: + config_dict = { + "use_cloud": True, + "cloud_url": cloud_url, + "api_key": config.get("ELASTICSEARCH_API_KEY"), + } + else: + # Fallback to regular Elasticsearch + config_dict = { + "use_cloud": False, + "host": config.get("ELASTICSEARCH_HOST", "localhost"), + "port": config.get("ELASTICSEARCH_PORT", 9200), + "username": config.get("ELASTICSEARCH_USERNAME", "elastic"), + "password": config.get("ELASTICSEARCH_PASSWORD", ""), + } + + # Common configuration + config_dict.update( + { + "ca_certs": str(config.get("ELASTICSEARCH_CA_CERTS")) if config.get("ELASTICSEARCH_CA_CERTS") else None, + "verify_certs": bool(config.get("ELASTICSEARCH_VERIFY_CERTS", False)), + "request_timeout": int(config.get("ELASTICSEARCH_REQUEST_TIMEOUT", 100000)), + "retry_on_timeout": bool(config.get("ELASTICSEARCH_RETRY_ON_TIMEOUT", True)), + "max_retries": int(config.get("ELASTICSEARCH_MAX_RETRIES", 10000)), + } + ) + return ElasticSearchVector( index_name=collection_name, - config=ElasticSearchConfig( - host=config.get("ELASTICSEARCH_HOST", "localhost"), - port=config.get("ELASTICSEARCH_PORT", 9200), - username=config.get("ELASTICSEARCH_USERNAME", ""), - password=config.get("ELASTICSEARCH_PASSWORD", ""), - ), + config=ElasticSearchConfig(**config_dict), attributes=[], ) diff --git a/api/tests/integration_tests/vdb/elasticsearch/test_elasticsearch.py b/api/tests/integration_tests/vdb/elasticsearch/test_elasticsearch.py index 2a0c1bb03..a5ff5b9e8 100644 --- a/api/tests/integration_tests/vdb/elasticsearch/test_elasticsearch.py +++ b/api/tests/integration_tests/vdb/elasticsearch/test_elasticsearch.py @@ -11,7 +11,9 @@ class ElasticSearchVectorTest(AbstractVectorTest): self.attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"] self.vector = ElasticSearchVector( index_name=self.collection_name.lower(), - config=ElasticSearchConfig(host="http://localhost", port="9200", username="elastic", password="elastic"), + config=ElasticSearchConfig( + use_cloud=False, host="http://localhost", port="9200", username="elastic", password="elastic" + ), attributes=self.attributes, ) diff --git a/docker/.env.example b/docker/.env.example index 9d15ba53d..7ecdf899f 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -583,6 +583,17 @@ ELASTICSEARCH_USERNAME=elastic ELASTICSEARCH_PASSWORD=elastic KIBANA_PORT=5601 +# Using ElasticSearch Cloud Serverless, or not. +ELASTICSEARCH_USE_CLOUD=false +ELASTICSEARCH_CLOUD_URL=YOUR-ELASTICSEARCH_CLOUD_URL +ELASTICSEARCH_API_KEY=YOUR-ELASTICSEARCH_API_KEY + +ELASTICSEARCH_VERIFY_CERTS=False +ELASTICSEARCH_CA_CERTS= +ELASTICSEARCH_REQUEST_TIMEOUT=100000 +ELASTICSEARCH_RETRY_ON_TIMEOUT=True +ELASTICSEARCH_MAX_RETRIES=10 + # baidu vector configurations, only available when VECTOR_STORE is `baidu` BAIDU_VECTOR_DB_ENDPOINT=http://127.0.0.1:5287 BAIDU_VECTOR_DB_CONNECTION_TIMEOUT_MS=30000 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 9e0f78eb0..ae83aa758 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -261,6 +261,14 @@ x-shared-env: &shared-api-worker-env ELASTICSEARCH_USERNAME: ${ELASTICSEARCH_USERNAME:-elastic} ELASTICSEARCH_PASSWORD: ${ELASTICSEARCH_PASSWORD:-elastic} KIBANA_PORT: ${KIBANA_PORT:-5601} + ELASTICSEARCH_USE_CLOUD: ${ELASTICSEARCH_USE_CLOUD:-false} + ELASTICSEARCH_CLOUD_URL: ${ELASTICSEARCH_CLOUD_URL:-YOUR-ELASTICSEARCH_CLOUD_URL} + ELASTICSEARCH_API_KEY: ${ELASTICSEARCH_API_KEY:-YOUR-ELASTICSEARCH_API_KEY} + ELASTICSEARCH_VERIFY_CERTS: ${ELASTICSEARCH_VERIFY_CERTS:-False} + ELASTICSEARCH_CA_CERTS: ${ELASTICSEARCH_CA_CERTS:-} + ELASTICSEARCH_REQUEST_TIMEOUT: ${ELASTICSEARCH_REQUEST_TIMEOUT:-100000} + ELASTICSEARCH_RETRY_ON_TIMEOUT: ${ELASTICSEARCH_RETRY_ON_TIMEOUT:-True} + ELASTICSEARCH_MAX_RETRIES: ${ELASTICSEARCH_MAX_RETRIES:-10} BAIDU_VECTOR_DB_ENDPOINT: ${BAIDU_VECTOR_DB_ENDPOINT:-http://127.0.0.1:5287} BAIDU_VECTOR_DB_CONNECTION_TIMEOUT_MS: ${BAIDU_VECTOR_DB_CONNECTION_TIMEOUT_MS:-30000} BAIDU_VECTOR_DB_ACCOUNT: ${BAIDU_VECTOR_DB_ACCOUNT:-root}