diff --git a/.gitignore b/.gitignore index e21c3643..8cb6a349 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,6 @@ NOTES.md results/* tools/custom/data.json + +.venv +*.DS_Store diff --git a/engine/clients/cassandra/README.md b/engine/clients/cassandra/README.md new file mode 100644 index 00000000..7cdf0ebb --- /dev/null +++ b/engine/clients/cassandra/README.md @@ -0,0 +1,55 @@ +# Apache Cassandra® + +## Setup +### Pre-requisites +Adjust the configuration file [`cassandra-single-node.json`](../../../experiments/configurations/cassandra-single-node.json) to your needs. The default configuration is set to use the default Apache Cassandra® settings. +### Start up the server +Run the following command to start the server (alternatively, run `docker compose up -d` to run in detached mode): +```bash +$ docker compose up + +[+] Running 1/1 + ✔ cassandra Pulled 1.4s +[+] Running 1/1 + ✔ Container cassandra-benchmark Recreated 0.1s +Attaching to cassandra-benchmark +cassandra-benchmark | CompileCommand: dontinline org/apache/cassandra/db/Columns$Serializer.deserializeLargeSubset(Lorg/apache/cassandra/io/util/DataInputPlus;Lorg/apache/cassandra/db/Columns;I)Lorg/apache/cassandra/db/Columns; bool dontinline = true +... +cassandra-benchmark | INFO [main] 2025-04-04 22:27:38,592 StorageService.java:957 - Cassandra version: 5.0.3 +cassandra-benchmark | INFO [main] 2025-04-04 22:27:38,592 StorageService.java:958 - Git SHA: b0226c8ea122c3e5ea8680efb0744d33924fd732 +cassandra-benchmark | INFO [main] 2025-04-04 22:27:38,592 StorageService.java:959 - CQL version: 3.4.7 +... +cassandra-benchmark | INFO [main] 2025-04-04 22:28:25,091 StorageService.java:3262 - Node /172.18.0.2:7000 state jump to NORMAL +``` +> [!TIP] +> Other helpful commands: +> - Run `docker exec -it cassandra-benchmark cqlsh` to access the Cassandra shell. +> - Run `docker compose logs --follow --tail 10` to view & follow the logs of the container running Cassandra. 
+ +### Start up the client benchmark +Run the following command to start the client benchmark using `glove-25-angular` dataset as an example: +```bash +% python3 -m run --engines cassandra-single-node --datasets glove-25-angular +``` +and you'll see the following output: +```bash +Running experiment: cassandra-single-node - glove-25-angular +Downloading http://ann-benchmarks.com/glove-25-angular.hdf5... +... +Experiment stage: Configure +Experiment stage: Upload +1183514it [mm:ss, <...>it/s] +Upload time: <...> +Index is not ready yet, sleeping for 30 seconds... +... +Total import time: <...> +Experiment stage: Search +10000it [mm:ss, <...>it/s] +10000it [mm:ss, <...>it/s] +Experiment stage: Done +Results saved to: /path/to/repository/results +... +``` + +> [!TIP] +> If you want to see the detailed results, set environment variable `DETAILED_RESULTS=1` before running the benchmark. \ No newline at end of file diff --git a/engine/clients/cassandra/__init__.py b/engine/clients/cassandra/__init__.py new file mode 100644 index 00000000..54b47b0f --- /dev/null +++ b/engine/clients/cassandra/__init__.py @@ -0,0 +1,5 @@ +from engine.clients.cassandra.configure import CassandraConfigurator +from engine.clients.cassandra.search import CassandraSearcher +from engine.clients.cassandra.upload import CassandraUploader + +__all__ = ["CassandraConfigurator", "CassandraSearcher", "CassandraUploader"] diff --git a/engine/clients/cassandra/config.py b/engine/clients/cassandra/config.py new file mode 100644 index 00000000..97db40ce --- /dev/null +++ b/engine/clients/cassandra/config.py @@ -0,0 +1,7 @@ +import os + +CASSANDRA_KEYSPACE = os.getenv("CASSANDRA_KEYSPACE", "benchmark") +CASSANDRA_TABLE = os.getenv("CASSANDRA_TABLE", "vectors") +ASTRA_API_ENDPOINT = os.getenv("ASTRA_API_ENDPOINT", None) +ASTRA_API_KEY = os.getenv("ASTRA_API_KEY", None) +ASTRA_SCB_PATH = os.getenv("ASTRA_SCB_PATH", None) diff --git a/engine/clients/cassandra/configure.py 
b/engine/clients/cassandra/configure.py new file mode 100644 index 00000000..30c6f734 --- /dev/null +++ b/engine/clients/cassandra/configure.py @@ -0,0 +1,102 @@ +from cassandra import ConsistencyLevel, ProtocolVersion +from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile +from cassandra.policies import ( + DCAwareRoundRobinPolicy, + ExponentialReconnectionPolicy, + TokenAwarePolicy, +) + +from benchmark.dataset import Dataset +from engine.base_client.configure import BaseConfigurator +from engine.base_client.distances import Distance +from engine.clients.cassandra.config import CASSANDRA_KEYSPACE, CASSANDRA_TABLE + + +class CassandraConfigurator(BaseConfigurator): + SPARSE_VECTOR_SUPPORT = False + DISTANCE_MAPPING = { + Distance.L2: "euclidean", + Distance.COSINE: "cosine", + Distance.DOT: "dot_product", + } + + def __init__(self, host, collection_params: dict, connection_params: dict): + super().__init__(host, collection_params, connection_params) + + # Set up execution profiles for consistency and performance + profile = ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), + consistency_level=ConsistencyLevel.LOCAL_QUORUM, + request_timeout=60, + ) + + # Initialize Cassandra cluster connection + self.cluster = Cluster( + contact_points=[host], + execution_profiles={EXEC_PROFILE_DEFAULT: profile}, + protocol_version=ProtocolVersion.V4, + reconnection_policy=ExponentialReconnectionPolicy( + base_delay=1, max_delay=60 + ), + **connection_params, + ) + self.session = self.cluster.connect() + + def clean(self): + """Drop the keyspace if it exists""" + self.session.execute(f"DROP KEYSPACE IF EXISTS {CASSANDRA_KEYSPACE}") + + def recreate(self, dataset: Dataset, collection_params): + """Create keyspace and table for vector search""" + # Create keyspace if not exists + self.session.execute( + f"""CREATE KEYSPACE IF NOT EXISTS {CASSANDRA_KEYSPACE} + WITH REPLICATION = {{ 'class': 'SimpleStrategy', 
'replication_factor': 1 }}""" + ) + + # Use the keyspace + self.session.execute(f"USE {CASSANDRA_KEYSPACE}") + + # Get the distance metric + distance_metric = self.DISTANCE_MAPPING.get(dataset.config.distance) + vector_size = dataset.config.vector_size + + # Create vector table + # Using a simple schema that supports vector similarity search + self.session.execute( + f"""CREATE TABLE IF NOT EXISTS {CASSANDRA_TABLE} ( + id int PRIMARY KEY, + embedding vector<float, {vector_size}>, + metadata map<text, text> + )""" + ) + + # Create vector index using the appropriate distance metric + self.session.execute( + f"""CREATE CUSTOM INDEX IF NOT EXISTS vector_index ON {CASSANDRA_TABLE}(embedding) + USING 'StorageAttachedIndex' + WITH OPTIONS = {{ 'similarity_function': '{distance_metric}' }}""" + ) + + # Add additional schema fields based on collection_params if needed + for field_name, field_type in dataset.config.schema.items(): + if field_type in ["keyword", "text"]: + # For text fields, we would typically add them to metadata + pass + elif field_type in ["int", "float"]: + # For numeric fields that need separate indexing + # In a real implementation, we might alter the table to add these columns + pass + + return collection_params + + def execution_params(self, distance, vector_size) -> dict: + """Return any execution parameters needed for the dataset""" + return {"normalize": distance == Distance.COSINE} + + def delete_client(self): + """Close the Cassandra connection""" + if hasattr(self, "session") and self.session: + self.session.shutdown() + if hasattr(self, "cluster") and self.cluster: + self.cluster.shutdown() diff --git a/engine/clients/cassandra/parser.py b/engine/clients/cassandra/parser.py new file mode 100644 index 00000000..29a32e5a --- /dev/null +++ b/engine/clients/cassandra/parser.py @@ -0,0 +1,92 @@ +from typing import Any, List, Optional + +from engine.base_client.parser import BaseConditionParser, FieldValue + + +class CassandraConditionParser(BaseConditionParser): + def 
build_condition( + self, and_subfilters: Optional[List[Any]], or_subfilters: Optional[List[Any]] + ) -> Optional[Any]: + """ + Build a CQL condition expression that combines AND and OR subfilters + """ + conditions = [] + + # Add AND conditions + if and_subfilters and len(and_subfilters) > 0: + and_conds = " AND ".join([f"({cond})" for cond in and_subfilters if cond]) + if and_conds: + conditions.append(f"({and_conds})") + + # Add OR conditions + if or_subfilters and len(or_subfilters) > 0: + or_conds = " OR ".join([f"({cond})" for cond in or_subfilters if cond]) + if or_conds: + conditions.append(f"({or_conds})") + + # Combine all conditions + if not conditions: + return None + + return " AND ".join(conditions) + + def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any: + """ + Build a CQL exact match condition for metadata fields + For Cassandra, we format metadata as a map with string values + """ + if isinstance(value, str): + return f"metadata['{field_name}'] = '{value}'" + else: + return f"metadata['{field_name}'] = '{str(value)}'" + + def build_range_filter( + self, + field_name: str, + lt: Optional[FieldValue], + gt: Optional[FieldValue], + lte: Optional[FieldValue], + gte: Optional[FieldValue], + ) -> Any: + """ + Build a CQL range filter condition + """ + conditions = [] + + if lt is not None: + if isinstance(lt, str): + conditions.append(f"metadata['{field_name}'] < '{lt}'") + else: + conditions.append(f"metadata['{field_name}'] < '{str(lt)}'") + + if gt is not None: + if isinstance(gt, str): + conditions.append(f"metadata['{field_name}'] > '{gt}'") + else: + conditions.append(f"metadata['{field_name}'] > '{str(gt)}'") + + if lte is not None: + if isinstance(lte, str): + conditions.append(f"metadata['{field_name}'] <= '{lte}'") + else: + conditions.append(f"metadata['{field_name}'] <= '{str(lte)}'") + + if gte is not None: + if isinstance(gte, str): + conditions.append(f"metadata['{field_name}'] >= '{gte}'") + else: + 
conditions.append(f"metadata['{field_name}'] >= '{str(gte)}'") + + return " AND ".join(conditions) + + def build_geo_filter( + self, field_name: str, lat: float, lon: float, radius: float + ) -> Any: + """ + Build a CQL geo filter condition + Note: Basic Cassandra doesn't have built-in geo filtering. + This is a simplified approach that won't actually work without extensions. + """ + # In a real implementation with a geo extension, we'd implement proper geo filtering + # For this benchmark, we'll return a placeholder condition that doesn't filter + return "1=1" # Always true condition as a placeholder diff --git a/engine/clients/cassandra/search.py b/engine/clients/cassandra/search.py new file mode 100644 index 00000000..427dde8c --- /dev/null +++ b/engine/clients/cassandra/search.py @@ -0,0 +1,126 @@ +import multiprocessing as mp +from typing import List, Tuple + +from cassandra import ConsistencyLevel, ProtocolVersion +from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile +from cassandra.policies import ( + DCAwareRoundRobinPolicy, + ExponentialReconnectionPolicy, + TokenAwarePolicy, +) + +from dataset_reader.base_reader import Query +from engine.base_client.distances import Distance +from engine.base_client.search import BaseSearcher +from engine.clients.cassandra.config import CASSANDRA_KEYSPACE, CASSANDRA_TABLE +from engine.clients.cassandra.parser import CassandraConditionParser + + +class CassandraSearcher(BaseSearcher): + search_params = {} + session = None + cluster = None + parser = CassandraConditionParser() + + @classmethod + def init_client(cls, host, distance, connection_params: dict, search_params: dict): + # Set up execution profiles for consistency and performance + profile = ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), + consistency_level=ConsistencyLevel.LOCAL_ONE, + request_timeout=60, + ) + + # Initialize Cassandra cluster connection + cls.cluster = Cluster( + 
contact_points=[host], + execution_profiles={EXEC_PROFILE_DEFAULT: profile}, + reconnection_policy=ExponentialReconnectionPolicy( + base_delay=1, max_delay=60 + ), + protocol_version=ProtocolVersion.V4, + **connection_params, + ) + cls.session = cls.cluster.connect(CASSANDRA_KEYSPACE) + cls.search_params = search_params + + # Update prepared statements with current search parameters + cls.update_prepared_statements(distance) + + @classmethod + def get_mp_start_method(cls): + return "fork" if "fork" in mp.get_all_start_methods() else "spawn" + + @classmethod + def update_prepared_statements(cls, distance): + """Create prepared statements for vector searches""" + # Prepare a vector similarity search query + limit = cls.search_params.get("top", 10) + + if distance == Distance.COSINE: + SIMILARITY_FUNC = "similarity_cosine" + elif distance == Distance.L2: + SIMILARITY_FUNC = "similarity_euclidean" + elif distance == Distance.DOT: + SIMILARITY_FUNC = "similarity_dot_product" + else: + raise ValueError(f"Unsupported distance metric: {distance}") + + cls.ann_search_stmt = cls.session.prepare( + f"""SELECT id, {SIMILARITY_FUNC}(embedding, ?) as distance + FROM {CASSANDRA_TABLE} + -- The 'ANN' clause is used for Approximate Nearest Neighbor (ANN) search. + -- It orders results based on proximity to the query vector using the specified similarity function. + -- Ensure that the Cassandra setup supports ANN queries, as this is not standard CQL. + ORDER BY embedding ANN OF ? + LIMIT {limit}""" + ) + + # Prepare a statement for filtered vector search + cls.filtered_search_query_template = f"""SELECT id, {SIMILARITY_FUNC}(embedding, ?) as distance + FROM {CASSANDRA_TABLE} + WHERE {{conditions}} + ORDER BY embedding ANN OF ? 
+ LIMIT {limit}""" + + @classmethod + def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: + """Execute a vector similarity search with optional filters""" + # Convert query vector to a format Cassandra can use + query_vector = ( + query.vector.tolist() if hasattr(query.vector, "tolist") else query.vector + ) + + # Generate filter conditions if metadata conditions exist + filter_conditions = cls.parser.parse(query.meta_conditions) + + try: + if filter_conditions: + # Use the filtered search query + query_with_conditions = cls.filtered_search_query_template.format( + conditions=filter_conditions + ) + results = cls.session.execute( + cls.session.prepare(query_with_conditions), + (query_vector, query_vector), + ) + else: + # Use the basic ANN search query + results = cls.session.execute( + cls.ann_search_stmt, (query_vector, query_vector) + ) + + # Extract and return results + return [(row.id, row.distance) for row in results] + + except Exception as ex: + print(f"Error during Cassandra vector search: {ex}") + raise ex + + @classmethod + def delete_client(cls): + """Close the Cassandra connection""" + if cls.session: + cls.session.shutdown() + if cls.cluster: + cls.cluster.shutdown() diff --git a/engine/clients/cassandra/upload.py b/engine/clients/cassandra/upload.py new file mode 100644 index 00000000..d529ce41 --- /dev/null +++ b/engine/clients/cassandra/upload.py @@ -0,0 +1,105 @@ +import time +from typing import List + +from cassandra import ConsistencyLevel, ProtocolVersion +from cassandra.cluster import EXEC_PROFILE_DEFAULT, Cluster, ExecutionProfile, ResultSet +from cassandra.policies import ( + DCAwareRoundRobinPolicy, + ExponentialReconnectionPolicy, + TokenAwarePolicy, +) + +from dataset_reader.base_reader import Record +from engine.base_client.upload import BaseUploader +from engine.clients.cassandra.config import CASSANDRA_KEYSPACE, CASSANDRA_TABLE + + +class CassandraUploader(BaseUploader): + client = None + upload_params = {} + + 
@classmethod + def init_client(cls, host, distance, connection_params, upload_params): + # Set up execution profiles for consistency and performance + profile = ExecutionProfile( + load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy()), + consistency_level=ConsistencyLevel.LOCAL_QUORUM, + request_timeout=60, + ) + + # Initialize Cassandra cluster connection + cls.cluster = Cluster( + contact_points=[host], + execution_profiles={EXEC_PROFILE_DEFAULT: profile}, + protocol_version=ProtocolVersion.V4, + reconnection_policy=ExponentialReconnectionPolicy( + base_delay=1, max_delay=60 + ), + **connection_params, + ) + cls.session = cls.cluster.connect(CASSANDRA_KEYSPACE) + cls.upload_params = upload_params + + # Prepare statements for faster uploads + cls.insert_stmt = cls.session.prepare( + f"""INSERT INTO {CASSANDRA_TABLE} (id, embedding, metadata) VALUES (?, ?, ?)""" + ) + + @classmethod + def upload_batch(cls, batch: List[Record]): + """Upload a batch of records to Cassandra""" + for point in batch: + # Convert metadata to a map format that Cassandra can store + metadata = {} + if point.metadata: + for key, value in point.metadata.items(): + # Convert all values to strings for simplicity + metadata[str(key)] = str(value) + + # Cassandra vector type requires a list of float values + vector = ( + point.vector.tolist() + if hasattr(point.vector, "tolist") + else point.vector + ) + + # Execute the prepared statement + cls.session.execute(cls.insert_stmt, (int(point.id), vector, metadata)) + + @classmethod + def check_index_status(cls) -> ResultSet: + """ + Check the status of the index + See https://docs.datastax.com/en/cql/cassandra-5.0/develop/indexing/sai/sai-monitor.html + """ + if cls.session is None: + raise RuntimeError("CQL session is not initialized") + return cls.session.execute( + f""" + SELECT is_queryable, is_building + FROM system_views.sai_column_indexes + WHERE keyspace_name='{CASSANDRA_KEYSPACE}' AND table_name='{CASSANDRA_TABLE}' AND 
index_name='vector_index'; + """ + ).one() + + @classmethod + def post_upload(cls, _distance): + """Post-upload operations, like waiting for indexing to complete""" + # Cassandra vector indexes are automatically built when data is inserted + # so the wait time must be very quick + while True: + result = cls.check_index_status() + idx_ready = result and result.is_queryable and not result.is_building + if idx_ready: + break + print("Index is not ready yet, sleeping for 30 seconds...") + time.sleep(30) + return {} + + @classmethod + def delete_client(cls): + """Close the Cassandra connection""" + if hasattr(cls, "session") and cls.session: + cls.session.shutdown() + if hasattr(cls, "cluster") and cls.cluster: + cls.cluster.shutdown() diff --git a/engine/clients/client_factory.py b/engine/clients/client_factory.py index a74df2ab..9f4535fd 100644 --- a/engine/clients/client_factory.py +++ b/engine/clients/client_factory.py @@ -7,6 +7,11 @@ BaseSearcher, BaseUploader, ) +from engine.clients.cassandra import ( + CassandraConfigurator, + CassandraSearcher, + CassandraUploader, +) from engine.clients.elasticsearch import ( ElasticConfigurator, ElasticSearcher, @@ -39,6 +44,7 @@ "opensearch": OpenSearchConfigurator, "redis": RedisConfigurator, "pgvector": PgVectorConfigurator, + "cassandra": CassandraConfigurator, } ENGINE_UPLOADERS = { @@ -49,6 +55,7 @@ "opensearch": OpenSearchUploader, "redis": RedisUploader, "pgvector": PgVectorUploader, + "cassandra": CassandraUploader, } ENGINE_SEARCHERS = { @@ -59,6 +66,7 @@ "opensearch": OpenSearchSearcher, "redis": RedisSearcher, "pgvector": PgVectorSearcher, + "cassandra": CassandraSearcher, } diff --git a/engine/clients/qdrant/configure.py b/engine/clients/qdrant/configure.py index de716ff9..fcb43ab5 100644 --- a/engine/clients/qdrant/configure.py +++ b/engine/clients/qdrant/configure.py @@ -4,7 +4,7 @@ from benchmark.dataset import Dataset from engine.base_client.configure import BaseConfigurator from engine.base_client.distances 
import Distance -from engine.clients.qdrant.config import QDRANT_COLLECTION_NAME, QDRANT_API_KEY +from engine.clients.qdrant.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME class QdrantConfigurator(BaseConfigurator): @@ -32,7 +32,9 @@ class QdrantConfigurator(BaseConfigurator): def __init__(self, host, collection_params: dict, connection_params: dict): super().__init__(host, collection_params, connection_params) - self.client = QdrantClient(url=host, api_key=QDRANT_API_KEY, **connection_params) + self.client = QdrantClient( + url=host, api_key=QDRANT_API_KEY, **connection_params + ) def clean(self): self.client.delete_collection(collection_name=QDRANT_COLLECTION_NAME) diff --git a/engine/clients/qdrant/search.py b/engine/clients/qdrant/search.py index b6b00908..b664db28 100644 --- a/engine/clients/qdrant/search.py +++ b/engine/clients/qdrant/search.py @@ -2,13 +2,12 @@ from typing import List, Tuple import httpx -from qdrant_client import QdrantClient +from qdrant_client import QdrantClient, models from qdrant_client._pydantic_compat import construct -from qdrant_client import models from dataset_reader.base_reader import Query from engine.base_client.search import BaseSearcher -from engine.clients.qdrant.config import QDRANT_COLLECTION_NAME, QDRANT_API_KEY +from engine.clients.qdrant.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME from engine.clients.qdrant.parser import QdrantConditionParser @@ -48,7 +47,6 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: values=query.sparse_vector.values, ) - prefetch = cls.search_params.get("prefetch") if prefetch: @@ -65,7 +63,9 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]: query=query_vector, query_filter=cls.parser.parse(query.meta_conditions), limit=top, - search_params=models.SearchParams(**cls.search_params.get("config", {})), + search_params=models.SearchParams( + **cls.search_params.get("config", {}) + ), with_payload=cls.search_params.get("with_payload", 
False), ) except Exception as ex: diff --git a/engine/clients/qdrant/upload.py b/engine/clients/qdrant/upload.py index 18dcedbb..ca55b55d 100644 --- a/engine/clients/qdrant/upload.py +++ b/engine/clients/qdrant/upload.py @@ -13,7 +13,7 @@ from dataset_reader.base_reader import Record from engine.base_client.upload import BaseUploader -from engine.clients.qdrant.config import QDRANT_COLLECTION_NAME, QDRANT_API_KEY +from engine.clients.qdrant.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME class QdrantUploader(BaseUploader): @@ -24,7 +24,9 @@ class QdrantUploader(BaseUploader): def init_client(cls, host, distance, connection_params, upload_params): os.environ["GRPC_ENABLE_FORK_SUPPORT"] = "true" os.environ["GRPC_POLL_STRATEGY"] = "epoll,poll" - cls.client = QdrantClient(url=host, prefer_grpc=True, api_key=QDRANT_API_KEY, **connection_params) + cls.client = QdrantClient( + url=host, prefer_grpc=True, api_key=QDRANT_API_KEY, **connection_params + ) cls.upload_params = upload_params @classmethod diff --git a/engine/servers/cassandra-single-node/docker-compose.yaml b/engine/servers/cassandra-single-node/docker-compose.yaml new file mode 100644 index 00000000..093d9b9f --- /dev/null +++ b/engine/servers/cassandra-single-node/docker-compose.yaml @@ -0,0 +1,29 @@ +version: '3.8' + +services: + cassandra: + image: cassandra:latest + container_name: cassandra-benchmark + ports: + - "9042:9042" # CQL native transport port + environment: + - CASSANDRA_CLUSTER_NAME=VectorBenchmark + - MAX_HEAP_SIZE=4G + - HEAP_NEWSIZE=800M + volumes: + - cassandra-data:/var/lib/cassandra + # Healthcheck to verify when Cassandra is ready + healthcheck: + test: ["CMD-SHELL", "cqlsh -e 'describe keyspaces' || exit 1"] + interval: 30s + timeout: 10s + retries: 10 + restart: unless-stopped + logging: + driver: "json-file" + options: + max-file: "1" + max-size: "10m" + +volumes: + cassandra-data: \ No newline at end of file diff --git a/experiments/configurations/cassandra-single-node.json 
b/experiments/configurations/cassandra-single-node.json new file mode 100644 index 00000000..036e48b4 --- /dev/null +++ b/experiments/configurations/cassandra-single-node.json @@ -0,0 +1,25 @@ +[ + { + "name": "cassandra-single-node", + "engine": "cassandra", + "host": "localhost", + "connection_params": { + "port": 9042, + "connect_timeout": 60 + }, + "collection_params": { + }, + "upload_params": { + "batch_size": 100, + "parallel": 4 + }, + "search_params": [ + { + "top": 10 + }, + { + "top": 100 + } + ] + } +] \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 1726ec93..3232cfb3 100644 --- a/poetry.lock +++ b/poetry.lock @@ -180,6 +180,54 @@ files = [ [package.extras] tzdata = ["tzdata"] +[[package]] +name = "cassandra-driver" +version = "3.29.2" +description = "DataStax Driver for Apache Cassandra" +optional = false +python-versions = "*" +groups = ["main"] +files = [ + {file = "cassandra-driver-3.29.2.tar.gz", hash = "sha256:c4310a7d0457f51a63fb019d8ef501588c491141362b53097fbc62fa06559b7c"}, + {file = "cassandra_driver-3.29.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:957208093ff2353230d0d83edf8c8e8582e4f2999d9a33292be6558fec943562"}, + {file = "cassandra_driver-3.29.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d70353b6d9d6e01e2b261efccfe90ce0aa6f416588e6e626ca2ed0aff6b540cf"}, + {file = "cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:06ad489e4df2cc7f41d3aca8bd8ddeb8071c4fb98240ed07f1dcd9b5180fd879"}, + {file = "cassandra_driver-3.29.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e7f1dfa33c3d93350057d6dc163bb92748b6e6a164c408c75cf2c59be0a203b7"}, + {file = "cassandra_driver-3.29.2-cp310-cp310-win32.whl", hash = "sha256:f9df1e6ae4201eb2eae899cb0649d46b3eb0843f075199b51360bc9d59679a31"}, + {file = "cassandra_driver-3.29.2-cp310-cp310-win_amd64.whl", hash = "sha256:c4a005bc0b4fd8b5716ad931e1cc788dbd45967b0bcbdc3dfde33c7f9fde40d4"}, + {file 
= "cassandra_driver-3.29.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:e31cee01a6fc8cf7f32e443fa0031bdc75eed46126831b7a807ab167b4dc1316"}, + {file = "cassandra_driver-3.29.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:52edc6d4bd7d07b10dc08b7f044dbc2ebe24ad7009c23a65e0916faed1a34065"}, + {file = "cassandra_driver-3.29.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:eb3a9f24fc84324d426a69dc35df66de550833072a4d9a4d63d72fda8fcaecb9"}, + {file = "cassandra_driver-3.29.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1e89de04809d02bb1d5d03c0946a7baaaf85e93d7e6414885b4ea2616efe9de0"}, + {file = "cassandra_driver-3.29.2-cp311-cp311-win32.whl", hash = "sha256:7104e5043e9cc98136d7fafe2418cbc448dacb4e1866fe38ff5be76f227437ef"}, + {file = "cassandra_driver-3.29.2-cp311-cp311-win_amd64.whl", hash = "sha256:69aa53f1bdb23487765faa92eef57366637878eafc412f46af999e722353b22f"}, + {file = "cassandra_driver-3.29.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:a1e994a82b2e6ab022c5aec24e03ad49fca5f3d47e566a145de34eb0e768473a"}, + {file = "cassandra_driver-3.29.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:2039201ae5d9b7c7ce0930af7138d2637ca16a4c7aaae2fbdd4355fbaf3003c5"}, + {file = "cassandra_driver-3.29.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8067fad22e76e250c3846507d804f90b53e943bba442fa1b26583bcac692aaf1"}, + {file = "cassandra_driver-3.29.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ee0ebe8eb4fb007d8001ffcd1c3828b74defeb01075d8a1f1116ae9c60f75541"}, + {file = "cassandra_driver-3.29.2-cp312-cp312-win32.whl", hash = "sha256:83dc9399cdabe482fd3095ca54ec227212d8c491b563a7276f6c100e30ee856c"}, + {file = "cassandra_driver-3.29.2-cp312-cp312-win_amd64.whl", hash = "sha256:6c74610f56a4c53863a5d44a2af9c6c3405da19d51966fabd85d7f927d5c6abc"}, + {file = "cassandra_driver-3.29.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = 
"sha256:c86b0a796ff67d66de7df5f85243832a4dc853217f6a3eade84694f6f4fae151"}, + {file = "cassandra_driver-3.29.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:c53700b0d1f8c1d777eaa9e9fb6d17839d9a83f27a61649e0cbaa15d9d3df34b"}, + {file = "cassandra_driver-3.29.2-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7d348c769aa6c37919e7d6247e8cf09c23d387b7834a340408bd7d611f174d80"}, + {file = "cassandra_driver-3.29.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8c496318e3c136cf12ab21e1598fee4b48ea1c71746ea8cc9d32e4dcd09cb93"}, + {file = "cassandra_driver-3.29.2-cp38-cp38-win32.whl", hash = "sha256:d180183451bec81c15e0441fa37a63dc52c6489e860e832cadd854373b423141"}, + {file = "cassandra_driver-3.29.2-cp38-cp38-win_amd64.whl", hash = "sha256:a66b20c421d8fb21f18bd0ac713de6f09c5c25b6ab3d6043c3779b9c012d7c98"}, + {file = "cassandra_driver-3.29.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:70d4d0dce373943308ad461a425fc70a23d0f524859367b8c6fc292400f39954"}, + {file = "cassandra_driver-3.29.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:b86427fab4d5a96e91ad82bb9338d4101ae4d3758ba96c356e0198da3de4d350"}, + {file = "cassandra_driver-3.29.2-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c25b42e1a99f377a933d79ae93ea27601e337a5abb7bb843a0e951cf1b3836f7"}, + {file = "cassandra_driver-3.29.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9e36437288d6cd6f6c74b8ee5997692126e24adc2da3d031dc11c7dfea8bc220"}, + {file = "cassandra_driver-3.29.2-cp39-cp39-win32.whl", hash = "sha256:e967c1341a651f03bdc466f3835d72d3c0a0648b562035e6d780fa0b796c02f6"}, + {file = "cassandra_driver-3.29.2-cp39-cp39-win_amd64.whl", hash = "sha256:c5a9aab2367e8aad48ae853847a5a8985749ac5f102676de2c119b33fef13b42"}, +] + +[package.dependencies] +geomet = ">=0.1,<0.3" + +[package.extras] +cle = ["cryptography (>=35.0)"] +graph = ["gremlinpython (==3.4.6)"] + [[package]] name = "certifi" version = 
"2025.4.26" diff --git a/pyproject.toml b/pyproject.toml index ee84d34a..80718300 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ opensearch-py = "^2.3.2" tqdm = "^4.66.1" psycopg = {extras = ["binary"], version = "^3.1.17"} pgvector = "^0.2.4" +cassandra-driver = ">=3.29.2" [tool.poetry.group.dev.dependencies] pre-commit = "^2.20.0"