From 4964b7b14bd6fff5b1ecc0fb04cc137cebcee911 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 16 May 2024 01:08:01 +0100 Subject: [PATCH 01/22] opensearch improvements --- engine/clients/opensearch/config.py | 38 +++++++++++++++++++++++--- engine/clients/opensearch/configure.py | 23 ++++------------ engine/clients/opensearch/search.py | 18 ++---------- engine/clients/opensearch/upload.py | 18 ++---------- 4 files changed, 43 insertions(+), 54 deletions(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 57001884..85dc8fea 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -1,4 +1,34 @@ -OPENSEARCH_PORT = 9200 -OPENSEARCH_INDEX = "bench" -OPENSEARCH_USER = "opensearch" -OPENSEARCH_PASSWORD = "passwd" +from opensearchpy import NotFoundError, OpenSearch +import os + + +OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200)) +OPENSEARCH_INDEX = os.getenv("OPENSEARCH_INDEX", "bench") +OPENSEARCH_USER = os.getenv("OPENSEARCH_USER", "opensearch") +OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "passwd") +OPENSEARCH_TIMEOUT = int(os.getenv("OPENSEARCH_TIMEOUT", 300)) +OPENSEARCH_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_INDEX_TIMEOUT", 300)) + + +def get_opensearch_client(host, connection_params): + init_params = { + **{ + "verify_certs": False, + "request_timeout": OPENSEARCH_TIMEOUT, + "retry_on_timeout": True, + }, + **connection_params, + } + if host.startswith("http"): + url = "" + else: + url = "http://" + url += f"{host}:{OPENSEARCH_PORT}" + + client = OpenSearch( + f"http://{host}:{OPENSEARCH_PORT}", + basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD), + **init_params, + ) + assert client.ping() + return client diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index bd550917..d4541e1b 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -6,9 +6,8 @@ from engine.base_client.distances import Distance from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, - OPENSEARCH_PASSWORD, - OPENSEARCH_PORT, - OPENSEARCH_USER, + OPENSEARCH_INDEX_TIMEOUT, + get_opensearch_client, ) @@ -25,26 +24,14 @@ class OpenSearchConfigurator(BaseConfigurator): def __init__(self, host, collection_params: dict, connection_params: dict): super().__init__(host, collection_params, connection_params) - init_params = { - **{ - "verify_certs": False, - "request_timeout": 90, - "retry_on_timeout": True, - }, - **connection_params, - } - self.client = OpenSearch( - f"http://{host}:{OPENSEARCH_PORT}", - basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD), - **init_params, - ) + self.client = get_opensearch_client(host, connection_params) def clean(self): try: self.client.indices.delete( index=OPENSEARCH_INDEX, params={ - "timeout": 300, + "timeout": OPENSEARCH_INDEX_TIMEOUT, }, ) except NotFoundError: @@ -53,7 +40,7 @@ def clean(self): def recreate(self, dataset: Dataset, collection_params): if dataset.config.distance == Distance.DOT: raise IncompatibilityError - if dataset.config.vector_size > 1024: + if dataset.config.vector_size > 2048: raise IncompatibilityError self.client.indices.create( diff --git a/engine/clients/opensearch/search.py b/engine/clients/opensearch/search.py index a3e36058..f87362b0 100644 --- a/engine/clients/opensearch/search.py +++ b/engine/clients/opensearch/search.py @@ -8,9 +8,7 @@ from engine.base_client.search import BaseSearcher from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, - 
OPENSEARCH_PASSWORD, - OPENSEARCH_PORT, - OPENSEARCH_USER, + get_opensearch_client, ) from engine.clients.opensearch.parser import OpenSearchConditionParser @@ -31,19 +29,7 @@ def get_mp_start_method(cls): @classmethod def init_client(cls, host, distance, connection_params: dict, search_params: dict): - init_params = { - **{ - "verify_certs": False, - "request_timeout": 90, - "retry_on_timeout": True, - }, - **connection_params, - } - cls.client: OpenSearch = OpenSearch( - f"http://{host}:{OPENSEARCH_PORT}", - basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD), - **init_params, - ) + cls.client = get_opensearch_client(host, connection_params) cls.search_params = search_params @classmethod diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 0bc2427e..5677b760 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -8,9 +8,7 @@ from engine.base_client.upload import BaseUploader from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, - OPENSEARCH_PASSWORD, - OPENSEARCH_PORT, - OPENSEARCH_USER, + get_opensearch_client, ) @@ -29,19 +27,7 @@ def get_mp_start_method(cls): @classmethod def init_client(cls, host, distance, connection_params, upload_params): - init_params = { - **{ - "verify_certs": False, - "request_timeout": 90, - "retry_on_timeout": True, - }, - **connection_params, - } - cls.client = OpenSearch( - f"http://{host}:{OPENSEARCH_PORT}", - basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD), - **init_params, - ) + cls.client = get_opensearch_client(host, connection_params) cls.upload_params = upload_params @classmethod From fbe689e0cc0eab0fed1de610a8f568a75d9efca1 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 16 May 2024 01:28:11 +0100 Subject: [PATCH 02/22] Fixes per PR linter --- engine/clients/opensearch/config.py | 2 +- engine/clients/opensearch/search.py | 5 +---- engine/clients/opensearch/upload.py | 5 +---- 3 files changed, 3 insertions(+), 9 deletions(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 85dc8fea..85af5959 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -1,6 +1,6 @@ -from opensearchpy import NotFoundError, OpenSearch import os +from opensearchpy import OpenSearch OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200)) OPENSEARCH_INDEX = os.getenv("OPENSEARCH_INDEX", "bench") diff --git a/engine/clients/opensearch/search.py b/engine/clients/opensearch/search.py index f87362b0..63ac5321 100644 --- a/engine/clients/opensearch/search.py +++ b/engine/clients/opensearch/search.py @@ -6,10 +6,7 @@ from dataset_reader.base_reader import Query from engine.base_client.search import BaseSearcher -from engine.clients.opensearch.config import ( - OPENSEARCH_INDEX, - get_opensearch_client, -) +from engine.clients.opensearch.config import OPENSEARCH_INDEX, get_opensearch_client from engine.clients.opensearch.parser import OpenSearchConditionParser diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 5677b760..fb69b59e 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -6,10 +6,7 @@ from dataset_reader.base_reader import Record from engine.base_client.upload import BaseUploader -from engine.clients.opensearch.config import ( - OPENSEARCH_INDEX, - get_opensearch_client, -) +from engine.clients.opensearch.config import OPENSEARCH_INDEX, get_opensearch_client class ClosableOpenSearch(OpenSearch): From 
91d3dd0e6b57100faef28f55aadf0b2c707cb041 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 16 May 2024 01:43:55 +0100 Subject: [PATCH 03/22] Fixes per ruff linter --- engine/clients/opensearch/configure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index d4541e1b..1f4ff8d3 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -1,4 +1,4 @@ -from opensearchpy import NotFoundError, OpenSearch +from opensearchpy import NotFoundError from benchmark.dataset import Dataset from engine.base_client import IncompatibilityError From acb11f17d514b2186bff160f6665779a0fbb936b Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 12:11:15 +0100 Subject: [PATCH 04/22] Increase the vector limit to 16K given the latest docs --- engine/clients/opensearch/configure.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index 1f4ff8d3..6cc94f76 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -40,7 +40,9 @@ def clean(self): def recreate(self, dataset: Dataset, collection_params): if dataset.config.distance == Distance.DOT: raise IncompatibilityError - if dataset.config.vector_size > 2048: + # The knn_vector data type supports a vector of floats that can have a dimension count of up to 16,000 for the NMSLIB, Faiss, and Lucene engines, as set by the dimension mapping parameter. + # Source + if dataset.config.vector_size > 16000: raise IncompatibilityError self.client.indices.create( From e05c14517d1bdc76bcb2c098b566ed83c710cbb2 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 12:16:33 +0100 Subject: [PATCH 05/22] Removed the dotproduct incompatibility error given opensearch now supports it (supported for Lucene in OpenSearch version 2.13 and later) --- engine/clients/opensearch/configure.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index 6cc94f76..3ac2cc2d 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -15,6 +15,7 @@ class OpenSearchConfigurator(BaseConfigurator): DISTANCE_MAPPING = { Distance.L2: "l2", Distance.COSINE: "cosinesimil", + # innerproduct (supported for Lucene in OpenSearch version 2.13 and later) Distance.DOT: "innerproduct", } INDEX_TYPE_MAPPING = { @@ -38,8 +39,6 @@ def clean(self): pass def recreate(self, dataset: Dataset, collection_params): - if dataset.config.distance == Distance.DOT: - raise IncompatibilityError # The knn_vector data type supports a vector of floats that can have a dimension count of up to 16,000 for the NMSLIB, Faiss, and Lucene engines, as set by the dimension mapping parameter. 
# Source if dataset.config.vector_size > 16000: From c82c5e9cc98b5adee2cbc695d0ab56dea788d4e9 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 12:17:07 +0100 Subject: [PATCH 06/22] Added source for IncompatibilityError on vector size --- engine/clients/opensearch/configure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index 3ac2cc2d..605ac836 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -40,7 +40,7 @@ def clean(self): def recreate(self, dataset: Dataset, collection_params): # The knn_vector data type supports a vector of floats that can have a dimension count of up to 16,000 for the NMSLIB, Faiss, and Lucene engines, as set by the dimension mapping parameter. - # Source + # Source: https://opensearch.org/docs/latest/search-plugins/knn/approximate-knn/ if dataset.config.vector_size > 16000: raise IncompatibilityError From d98196d916e80e091023e22857db2be28de5ad4e Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 12:44:55 +0100 Subject: [PATCH 07/22] Only using basic_auth when we have opensearch login data (this allows for anonymous_auth feature of opensearch) --- engine/clients/opensearch/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 85af5959..795fe780 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -19,15 +19,15 @@ def get_opensearch_client(host, connection_params): }, **connection_params, } + if OPENSEARCH_USER != "" and OPENSEARCH_PASSWORD != "": + init_params["basic_auth"] = (OPENSEARCH_USER, OPENSEARCH_PASSWORD) if host.startswith("http"): url = "" else: url = "http://" url += f"{host}:{OPENSEARCH_PORT}" - client = OpenSearch( f"http://{host}:{OPENSEARCH_PORT}", - basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD), **init_params, ) assert client.ping() From 3fb15b90df9210f8548cba6bb4992982a19bf460 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 12:46:51 +0100 Subject: [PATCH 08/22] Only using basic_auth when we have opensearch login data (this allows for anonymous_auth feature of opensearch) --- engine/clients/opensearch/config.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 795fe780..fc607979 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -20,7 +20,10 @@ def get_opensearch_client(host, connection_params): **connection_params, } if OPENSEARCH_USER != "" and OPENSEARCH_PASSWORD != "": + print("Enabling basic auth on opensearch client") init_params["basic_auth"] = (OPENSEARCH_USER, OPENSEARCH_PASSWORD) + else: + print("Using anonymous auth on opensearch client") if host.startswith("http"): url = "" else: From 0555aa46aa69a2d231b68642dbbb0269d0dddb86 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 12:51:21 +0100 Subject: [PATCH 09/22] Detecting ssl features from url on opensearch client --- engine/clients/opensearch/config.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index fc607979..285b6b2d 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -24,6 +24,10 @@ def get_opensearch_client(host, connection_params): init_params["basic_auth"] = 
(OPENSEARCH_USER, OPENSEARCH_PASSWORD) else: print("Using anonymous auth on opensearch client") + if host.startswith("https"): + init_params["use_ssl"] = True + else: + init_params["use_ssl"] = False if host.startswith("http"): url = "" else: From fa3eb763e6156c2c495f275aa7e455ff3867f5b1 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 13:48:02 +0100 Subject: [PATCH 10/22] Fixed OpenSearch connection setup --- engine/clients/opensearch/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 285b6b2d..592fcefd 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -34,7 +34,7 @@ def get_opensearch_client(host, connection_params): url = "http://" url += f"{host}:{OPENSEARCH_PORT}" client = OpenSearch( - f"http://{host}:{OPENSEARCH_PORT}", + url, **init_params, ) assert client.ping() From 8ea777a4cbda41dc7f33def9aed51c0fec576b5e Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 14:10:38 +0100 Subject: [PATCH 11/22] Waiting for yellow status at least on opensearch post upload stage --- engine/clients/opensearch/config.py | 22 +++++++++++++++---- engine/clients/opensearch/upload.py | 34 ++++++++++++++++++++++------- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 592fcefd..a931e5cb 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -1,5 +1,5 @@ import os - +import time from opensearchpy import OpenSearch OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200)) @@ -16,14 +16,15 @@ def get_opensearch_client(host, connection_params): "verify_certs": False, "request_timeout": OPENSEARCH_TIMEOUT, "retry_on_timeout": True, + # don't show warnings about ssl certs verification + "ssl_show_warn": False, }, **connection_params, } + # Enabling basic auth on opensearch client + # If the user and password are empty we use anonymous auth on opensearch client if OPENSEARCH_USER != "" and OPENSEARCH_PASSWORD != "": - print("Enabling basic auth on opensearch client") init_params["basic_auth"] = (OPENSEARCH_USER, OPENSEARCH_PASSWORD) - else: - print("Using anonymous auth on opensearch client") if host.startswith("https"): init_params["use_ssl"] = True else: @@ -39,3 +40,16 @@ def get_opensearch_client(host, connection_params): ) assert client.ping() return client + + +def _wait_for_es_status(client, status="yellow"): + print(f"waiting for ES {status} status...") + for _ in range(100): + try: + client.cluster.health(wait_for_status=status) + return client + except ConnectionError: + time.sleep(0.1) + else: + # timeout + raise Exception("Elasticsearch failed to start.") diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index fb69b59e..1b5ec920 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -6,7 +6,12 @@ from dataset_reader.base_reader import Record from engine.base_client.upload import BaseUploader -from engine.clients.opensearch.config import OPENSEARCH_INDEX, get_opensearch_client +from engine.clients.opensearch.config import ( + OPENSEARCH_INDEX, + OPENSEARCH_INDEX_TIMEOUT, + get_opensearch_client, + _wait_for_es_status, +) class ClosableOpenSearch(OpenSearch): @@ -39,16 +44,29 @@ def upload_batch(cls, batch: List[Record]): index=OPENSEARCH_INDEX, body=operations, params={ - "timeout": 300, + "timeout": 
OPENSEARCH_INDEX_TIMEOUT, }, ) @classmethod def post_upload(cls, _distance): - cls.client.indices.forcemerge( - index=OPENSEARCH_INDEX, - params={ - "timeout": 300, - }, - ) + print("forcing the merge into 1 segment...") + tries = 30 + for i in range(tries + 1): + try: + cls.client.indices.forcemerge( + index=OPENSEARCH_INDEX, wait_for_completion=True + ) + except Exception as e: + if i < tries: # i is zero indexed + print( + "Received the following error during retry {}/{} while waiting for ES index to be ready... {}".format( + i, tries, e.__str__() + ) + ) + continue + else: + raise + _wait_for_es_status(cls.client) + break return {} From aec4967c17697ec5623304b2065b1a5332fba1a8 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 14:13:38 +0100 Subject: [PATCH 12/22] Fixes per PR pre-commit: isort --- engine/clients/opensearch/config.py | 1 + engine/clients/opensearch/upload.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index a931e5cb..570eb7a7 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -1,5 +1,6 @@ import os import time + from opensearchpy import OpenSearch OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200)) diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 1b5ec920..6123e30c 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -9,8 +9,8 @@ from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, OPENSEARCH_INDEX_TIMEOUT, - get_opensearch_client, _wait_for_es_status, + get_opensearch_client, ) From e8b5764bf00e29e83b8b24666d2a72a559c2d87b Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 14:23:47 +0100 Subject: [PATCH 13/22] Fixed forcemerge api usage on opensearch --- engine/clients/opensearch/upload.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 6123e30c..09dc3682 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -55,7 +55,10 @@ def post_upload(cls, _distance): for i in range(tries + 1): try: cls.client.indices.forcemerge( - index=OPENSEARCH_INDEX, wait_for_completion=True + index=OPENSEARCH_INDEX, + params={ + "timeout": OPENSEARCH_INDEX_TIMEOUT, + }, ) except Exception as e: if i < tries: # i is zero indexed From e5000006f2fd01f425edf6a2c44cca73b68ea3e1 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 14:46:41 +0100 Subject: [PATCH 14/22] Renamed references to ES --- engine/clients/opensearch/config.py | 5 ++--- engine/clients/opensearch/upload.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 570eb7a7..650e9a0a 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -1,6 +1,5 @@ import os import time - from opensearchpy import OpenSearch OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200)) @@ -44,7 +43,7 @@ def get_opensearch_client(host, connection_params): def _wait_for_es_status(client, status="yellow"): - print(f"waiting for ES {status} status...") + print(f"waiting for OpenSearch cluster health {status} status...") for _ in range(100): try: client.cluster.health(wait_for_status=status) @@ -53,4 +52,4 @@ def _wait_for_es_status(client, status="yellow"): time.sleep(0.1) else: # timeout - raise 
Exception("Elasticsearch failed to start.") + raise Exception("OpenSearch failed to start.") diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 09dc3682..1bd755c4 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -63,7 +63,7 @@ def post_upload(cls, _distance): except Exception as e: if i < tries: # i is zero indexed print( - "Received the following error during retry {}/{} while waiting for ES index to be ready... {}".format( + "Received the following error during retry {}/{} while waiting for OpenSearch index to be ready... {}".format( i, tries, e.__str__() ) ) From 4f5937aa5cb3e016fed7826b325911effa0dfe99 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 15:05:18 +0100 Subject: [PATCH 15/22] Added backoff strategy for search_one method on opensearch client --- engine/clients/opensearch/search.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/engine/clients/opensearch/search.py b/engine/clients/opensearch/search.py index 63ac5321..49c81918 100644 --- a/engine/clients/opensearch/search.py +++ b/engine/clients/opensearch/search.py @@ -2,11 +2,17 @@ import uuid from typing import List, Tuple +import backoff from opensearchpy import OpenSearch +from opensearchpy.exceptions import TransportError from dataset_reader.base_reader import Query from engine.base_client.search import BaseSearcher -from engine.clients.opensearch.config import OPENSEARCH_INDEX, get_opensearch_client +from engine.clients.opensearch.config import ( + OPENSEARCH_INDEX, + OPENSEARCH_TIMEOUT, + get_opensearch_client, +) from engine.clients.opensearch.parser import OpenSearchConditionParser @@ -29,6 +35,17 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic cls.client = get_opensearch_client(host, connection_params) cls.search_params = search_params + def _search_backoff_handler(details): + print( + f"Backing off OpenSearch query for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}" + ) + + @backoff.on_exception( + backoff.expo, + TransportError, + max_time=OPENSEARCH_TIMEOUT, + on_backoff=_search_backoff_handler, + ) @classmethod def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: opensearch_query = { From f83bc750065c35a064b891997358d2a21ffad501 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 15:07:36 +0100 Subject: [PATCH 16/22] Fixes per PR pre-commit: isort --- engine/clients/opensearch/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 650e9a0a..8f205d49 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -1,5 +1,6 @@ import os import time + from opensearchpy import OpenSearch OPENSEARCH_PORT = int(os.getenv("OPENSEARCH_PORT", 9200)) From abd86371c879dc5d202a56220349746c81218f28 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Fri, 17 May 2024 15:29:10 +0100 Subject: [PATCH 17/22] Added backoff strategy for search_one method on opensearch client --- engine/clients/opensearch/search.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/clients/opensearch/search.py b/engine/clients/opensearch/search.py index 49c81918..af909f40 100644 --- a/engine/clients/opensearch/search.py +++ b/engine/clients/opensearch/search.py @@ -40,13 +40,13 @@ def _search_backoff_handler(details): f"Backing off OpenSearch query for 
{details['wait']} seconds after {details['tries']} tries due to {details['exception']}" ) + @classmethod @backoff.on_exception( backoff.expo, TransportError, max_time=OPENSEARCH_TIMEOUT, on_backoff=_search_backoff_handler, ) - @classmethod def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: opensearch_query = { "knn": { @@ -73,7 +73,7 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: "size": top, }, params={ - "timeout": 60, + "timeout": OPENSEARCH_TIMEOUT, }, ) return [ From ae5b620897519680bbd15b11f94bbacaf7ace1e7 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Sun, 19 May 2024 09:40:29 +0100 Subject: [PATCH 18/22] Improved index and search performance based uppon docs recommendation --- engine/clients/opensearch/config.py | 2 +- engine/clients/opensearch/configure.py | 5 +++++ engine/clients/opensearch/upload.py | 11 +++++++++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 8f205d49..451cac7c 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -8,7 +8,7 @@ OPENSEARCH_USER = os.getenv("OPENSEARCH_USER", "opensearch") OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "passwd") OPENSEARCH_TIMEOUT = int(os.getenv("OPENSEARCH_TIMEOUT", 300)) -OPENSEARCH_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_INDEX_TIMEOUT", 300)) +OPENSEARCH_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_INDEX_TIMEOUT", 1200)) def get_opensearch_client(host, connection_params): diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index 605ac836..115780e1 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -44,12 +44,17 @@ def recreate(self, dataset: Dataset, collection_params): if dataset.config.vector_size > 16000: raise IncompatibilityError + # Followed the bellow link for tuning for ingestion and querying + # https://opensearch.org/docs/1.1/search-plugins/knn/performance-tuning/#indexing-performance-tuning self.client.indices.create( index=OPENSEARCH_INDEX, body={ "settings": { "index": { "knn": True, + "number_of_shards": 1, + "number_of_replicas": 0, + "refresh_interval": -1, # no refresh is required because we index all the data at once } }, "mappings": { diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 1bd755c4..de6f2587 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -56,6 +56,7 @@ def post_upload(cls, _distance): try: cls.client.indices.forcemerge( index=OPENSEARCH_INDEX, + max_num_segments=1, params={ "timeout": OPENSEARCH_INDEX_TIMEOUT, }, @@ -72,4 +73,14 @@ def post_upload(cls, _distance): raise _wait_for_es_status(cls.client) break + print( + "Updated the index settings back to the default and waiting for indexing to be completed." 
+ ) + # Update the index settings back to the default + refresh_interval = "1s" + response = cls.client.indices.put_settings( + index=OPENSEARCH_INDEX, + body={"index": {"refresh_interval": refresh_interval}}, + ) + _wait_for_es_status(cls.client) return {} From 5679a1907438aed3d25895202461792d219518f8 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Sun, 19 May 2024 09:44:33 +0100 Subject: [PATCH 19/22] Collecting index stats at end of ingestion --- engine/clients/opensearch/upload.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index de6f2587..2bde9c06 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -84,3 +84,11 @@ def post_upload(cls, _distance): ) _wait_for_es_status(cls.client) return {} + + def get_memory_usage(cls): + index_stats = cls.client.indices.stats(index=OPENSEARCH_INDEX) + size_in_bytes = index_stats["_all"]["primaries"]["store"]["size_in_bytes"] + return { + "size_in_bytes": size_in_bytes, + "index_info": index_stats, + } From 276892c5c2d6e1627eb3c84383a7445406f34e79 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Sun, 19 May 2024 18:31:07 +0100 Subject: [PATCH 20/22] Using backoff on opensearch ingestion --- engine/clients/opensearch/config.py | 6 +++++- engine/clients/opensearch/configure.py | 4 ++-- engine/clients/opensearch/upload.py | 20 +++++++++++++++++--- 3 files changed, 24 insertions(+), 6 deletions(-) diff --git a/engine/clients/opensearch/config.py b/engine/clients/opensearch/config.py index 451cac7c..a1bc5bfa 100644 --- a/engine/clients/opensearch/config.py +++ b/engine/clients/opensearch/config.py @@ -8,7 +8,11 @@ OPENSEARCH_USER = os.getenv("OPENSEARCH_USER", "opensearch") OPENSEARCH_PASSWORD = os.getenv("OPENSEARCH_PASSWORD", "passwd") OPENSEARCH_TIMEOUT = int(os.getenv("OPENSEARCH_TIMEOUT", 300)) -OPENSEARCH_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_INDEX_TIMEOUT", 1200)) +OPENSEARCH_BULK_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_BULK_INDEX_TIMEOUT", 3600)) +OPENSEARCH_FULL_INDEX_TIMEOUT = int(os.getenv("OPENSEARCH_FULL_INDEX_TIMEOUT", 3600)) +OPENSEARCH_DELETE_INDEX_TIMEOUT = int( + os.getenv("OPENSEARCH_DELETE_INDEX_TIMEOUT", 1200) +) def get_opensearch_client(host, connection_params): diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index 115780e1..0c0f342b 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -6,7 +6,7 @@ from engine.base_client.distances import Distance from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, - OPENSEARCH_INDEX_TIMEOUT, + OPENSEARCH_DELETE_INDEX_TIMEOUT, get_opensearch_client, ) @@ -32,7 +32,7 @@ def clean(self): self.client.indices.delete( index=OPENSEARCH_INDEX, params={ - "timeout": OPENSEARCH_INDEX_TIMEOUT, + "timeout": OPENSEARCH_DELETE_INDEX_TIMEOUT, }, ) except NotFoundError: diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 2bde9c06..3adb711c 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -2,13 +2,16 @@ import uuid from typing import List +import backoff from opensearchpy import OpenSearch +from opensearchpy.exceptions import TransportError from dataset_reader.base_reader import Record from engine.base_client.upload import BaseUploader from engine.clients.opensearch.config import ( OPENSEARCH_INDEX, - OPENSEARCH_INDEX_TIMEOUT, + OPENSEARCH_BULK_INDEX_TIMEOUT, + 
OPENSEARCH_FULL_INDEX_TIMEOUT, _wait_for_es_status, get_opensearch_client, ) @@ -32,7 +35,18 @@ def init_client(cls, host, distance, connection_params, upload_params): cls.client = get_opensearch_client(host, connection_params) cls.upload_params = upload_params + def _upload_backoff_handler(details): + print( + f"Backing off OpenSearch bulk upload for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}" + ) + @classmethod + @backoff.on_exception( + backoff.expo, + TransportError, + max_time=OPENSEARCH_FULL_INDEX_TIMEOUT, + on_backoff=_upload_backoff_handler, + ) def upload_batch(cls, batch: List[Record]): operations = [] for record in batch: @@ -44,7 +58,7 @@ def upload_batch(cls, batch: List[Record]): index=OPENSEARCH_INDEX, body=operations, params={ - "timeout": OPENSEARCH_INDEX_TIMEOUT, + "timeout": OPENSEARCH_BULK_INDEX_TIMEOUT, }, ) @@ -58,7 +72,7 @@ def post_upload(cls, _distance): index=OPENSEARCH_INDEX, max_num_segments=1, params={ - "timeout": OPENSEARCH_INDEX_TIMEOUT, + "timeout": OPENSEARCH_FULL_INDEX_TIMEOUT, }, ) except Exception as e: From 2c5476215ac5665e455cbb437acc451ec6445601 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 20 May 2024 06:46:48 +0100 Subject: [PATCH 21/22] Included single shard experiment for opensearch. Added backoff to post upload stage --- engine/clients/opensearch/configure.py | 20 +++-- engine/clients/opensearch/upload.py | 36 +++----- ...opensearch-single-node-default-index.json} | 0 .../opensearch-single-node-single-shard.json | 87 +++++++++++++++++++ 4 files changed, 113 insertions(+), 30 deletions(-) rename experiments/configurations/{opensearch-single-node.json => opensearch-single-node-default-index.json} (100%) create mode 100644 experiments/configurations/opensearch-single-node-single-shard.json diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index 0c0f342b..ad0902c7 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -44,18 +44,26 @@ def recreate(self, dataset: Dataset, collection_params): if dataset.config.vector_size > 16000: raise IncompatibilityError + index_settings = ( + { + "knn": True, + "number_of_replicas": 0, + "refresh_interval": -1, # no refresh is required because we index all the data at once + }, + ) + index_config = collection_params.get("index") + + # if we specify the number_of_shards on the config, enforce it. 
otherwise use the default + if "number_of_shards" in index_config: + index_settings["number_of_shards"] = 1 + # Followed the bellow link for tuning for ingestion and querying # https://opensearch.org/docs/1.1/search-plugins/knn/performance-tuning/#indexing-performance-tuning self.client.indices.create( index=OPENSEARCH_INDEX, body={ "settings": { - "index": { - "knn": True, - "number_of_shards": 1, - "number_of_replicas": 0, - "refresh_interval": -1, # no refresh is required because we index all the data at once - } + "index": index_settings, }, "mappings": { "properties": { diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index 3adb711c..de039933 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -40,6 +40,11 @@ def _upload_backoff_handler(details): f"Backing off OpenSearch bulk upload for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}" ) + def _index_backoff_handler(details): + print( + f"Backing off OpenSearch indexing for {details['wait']} seconds after {details['tries']} tries due to {details['exception']}" + ) + @classmethod @backoff.on_exception( backoff.expo, @@ -63,36 +68,19 @@ def upload_batch(cls, batch: List[Record]): ) @classmethod + @backoff.on_exception( + backoff.expo, + TransportError, + max_time=OPENSEARCH_FULL_INDEX_TIMEOUT, + on_backoff=_index_backoff_handler, + ) def post_upload(cls, _distance): - print("forcing the merge into 1 segment...") - tries = 30 - for i in range(tries + 1): - try: - cls.client.indices.forcemerge( - index=OPENSEARCH_INDEX, - max_num_segments=1, - params={ - "timeout": OPENSEARCH_FULL_INDEX_TIMEOUT, - }, - ) - except Exception as e: - if i < tries: # i is zero indexed - print( - "Received the following error during retry {}/{} while waiting for OpenSearch index to be ready... {}".format( - i, tries, e.__str__() - ) - ) - continue - else: - raise - _wait_for_es_status(cls.client) - break print( "Updated the index settings back to the default and waiting for indexing to be completed." 
) # Update the index settings back to the default refresh_interval = "1s" - response = cls.client.indices.put_settings( + cls.client.indices.put_settings( index=OPENSEARCH_INDEX, body={"index": {"refresh_interval": refresh_interval}}, ) diff --git a/experiments/configurations/opensearch-single-node.json b/experiments/configurations/opensearch-single-node-default-index.json similarity index 100% rename from experiments/configurations/opensearch-single-node.json rename to experiments/configurations/opensearch-single-node-default-index.json diff --git a/experiments/configurations/opensearch-single-node-single-shard.json b/experiments/configurations/opensearch-single-node-single-shard.json new file mode 100644 index 00000000..d1789dda --- /dev/null +++ b/experiments/configurations/opensearch-single-node-single-shard.json @@ -0,0 +1,87 @@ +[ + { + "name": "opensearch-default", + "engine": "opensearch", + "connection_params": { + "request_timeout": 10000 + }, + "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 16, "ef_construction": 100 } } }, +"search_params": [ + { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, + { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } + ], + "upload_params": { "parallel": 16 } + }, + { + "name": "opensearch-m-16-ef-128", + "engine": "opensearch", + "connection_params": { + "request_timeout": 10000 + }, + "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 16, "ef_construction": 128 } } }, + "search_params": [ + { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, + { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + "upload_params": { "parallel": 16 } + }, + { + "name": "opensearch-m-32-ef-128", + "engine": "opensearch", + "connection_params": { + "request_timeout": 10000 + }, + "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 32, "ef_construction": 128 } } }, + "search_params": [ + { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, + { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + "upload_params": { "parallel": 16 } + }, + { + "name": "opensearch-m-32-ef-256", + "engine": "opensearch", + "connection_params": { + "request_timeout": 10000 + }, + "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 32, "ef_construction": 256 } } }, + "search_params": [ + { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, + { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } 
}, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + "upload_params": { "parallel": 16 } + }, + { + "name": "opensearch-m-32-ef-512", + "engine": "opensearch", + "connection_params": { + "request_timeout": 10000 + }, + "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 32, "ef_construction": 512 } } }, + "search_params": [ + { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, + { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + "upload_params": { "parallel": 16 } + }, + { + "name": "opensearch-m-64-ef-256", + "engine": "opensearch", + "connection_params": { + "request_timeout": 10000 + }, + "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 64, "ef_construction": 256 } } }, + "search_params": [ + { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, + { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + "upload_params": { "parallel": 16 } + }, + { + "name": "opensearch-m-64-ef-512", + "engine": "opensearch", + "connection_params": { + "request_timeout": 10000 + }, + "collection_params": { "index": { "number_of_shards": 1 }, "method": { "parameters": { "m": 64, "ef_construction": 512 } } }, + "search_params": [ + { "parallel": 1, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 1, "config": { "knn.algo_param.ef_search": 512 } }, + { "parallel": 100, "config": { "knn.algo_param.ef_search": 128 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 256 } }, { "parallel": 100, "config": { "knn.algo_param.ef_search": 512 } } ], + "upload_params": { "parallel": 16 } + } +] From 7292e92a5118854398bea03a6d2532dfc81e99cb Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Mon, 20 May 2024 11:46:35 +0100 Subject: [PATCH 22/22] Fixes per PR pre-commit: isort --- engine/clients/opensearch/configure.py | 2 +- engine/clients/opensearch/upload.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/clients/opensearch/configure.py b/engine/clients/opensearch/configure.py index ad0902c7..7fd69d5e 100644 --- a/engine/clients/opensearch/configure.py +++ b/engine/clients/opensearch/configure.py @@ -5,8 +5,8 @@ from engine.base_client.configure import BaseConfigurator from engine.base_client.distances import Distance from engine.clients.opensearch.config import ( - OPENSEARCH_INDEX, OPENSEARCH_DELETE_INDEX_TIMEOUT, + OPENSEARCH_INDEX, get_opensearch_client, ) diff --git a/engine/clients/opensearch/upload.py b/engine/clients/opensearch/upload.py index de039933..ded5983e 100644 --- a/engine/clients/opensearch/upload.py +++ b/engine/clients/opensearch/upload.py @@ -9,9 +9,9 @@ from dataset_reader.base_reader import Record from engine.base_client.upload import BaseUploader from engine.clients.opensearch.config import ( - OPENSEARCH_INDEX, 
OPENSEARCH_BULK_INDEX_TIMEOUT, OPENSEARCH_FULL_INDEX_TIMEOUT, + OPENSEARCH_INDEX, _wait_for_es_status, get_opensearch_client, )
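
A minimal usage sketch of the shared client factory introduced in this series, assuming the module layout from PATCH 01; the host, credentials, and request_timeout shown here are placeholders for illustration, not values defined by these patches:

    import os

    # Placeholder values: an empty user/password pair exercises the anonymous-auth
    # path added in PATCH 07, and an "https://" host prefix enables use_ssl (PATCH 09).
    os.environ["OPENSEARCH_PORT"] = "9200"
    os.environ["OPENSEARCH_USER"] = ""
    os.environ["OPENSEARCH_PASSWORD"] = ""

    # config.py reads the environment at import time, so set the variables first.
    from engine.clients.opensearch.config import get_opensearch_client

    client = get_opensearch_client(
        "https://opensearch.example.local",  # placeholder host; bare hostnames fall back to http (PATCH 10)
        {"request_timeout": 120},            # connection_params, as passed from the experiment config
    )
    print(client.info())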