@@ -72,15 +72,16 @@ def check_opensearch_domain(domain_name: str, kb_name: str) -> str | None: # ty
72
72
encryption_config = domain .get ("DomainStatus" , {}).get ("EncryptionAtRestOptions" , {})
73
73
if not encryption_config .get ("Enabled" , False ):
74
74
return f"{ kb_name } (OpenSearch domain encryption not enabled)"
75
- if not encryption_config .get ("KmsKeyId" ):
75
+ kms_key_id = encryption_config .get ("KmsKeyId" , "" )
76
+ if not kms_key_id or "aws/opensearch" in kms_key_id :
76
77
return f"{ kb_name } (OpenSearch domain not using CMK)"
77
78
except ClientError as e :
78
79
LOGGER .error (f"Error checking OpenSearch domain: { str (e )} " )
79
80
return f"{ kb_name } (error checking OpenSearch domain)"
80
81
return None
81
82
82
83
83
- def check_knowledge_base (kb_id : str , kb_name : str ) -> str | None : # type: ignore # noqa: CFQ004
84
+ def check_knowledge_base (kb_id : str , kb_name : str ) -> tuple [ bool , str | None ] : # type: ignore # noqa: CFQ004
84
85
"""Check a knowledge base's OpenSearch configuration.
85
86
86
87
Args:
@@ -91,40 +92,56 @@ def check_knowledge_base(kb_id: str, kb_name: str) -> str | None: # type: ignor
91
92
ClientError: If there is an error checking the knowledge base
92
93
93
94
Returns:
94
- str | None: Error message if non-compliant, None if compliant
95
+ tuple[bool, str | None]: (has_opensearch, error_message)
95
96
"""
96
97
try :
97
98
kb_details = bedrock_agent_client .get_knowledge_base (knowledgeBaseId = kb_id )
98
- vector_store = kb_details .get ("vectorStoreConfiguration" )
99
+ # Convert datetime objects to strings before JSON serialization
100
+ kb_details_serializable = json .loads (json .dumps (kb_details , default = str ))
101
+ LOGGER .info (f"Knowledge base details for { kb_name } : { json .dumps (kb_details_serializable )} " )
102
+
103
+ # Access the knowledgeBase key from the response
104
+ kb_data = kb_details .get ("knowledgeBase" , {})
105
+
106
+ # Check both possible locations for vector store config
107
+ vector_store = kb_data .get ("vectorStoreConfiguration" ) or kb_data .get ("storageConfiguration" , {})
108
+ LOGGER .info (f"Vector store config for { kb_name } : { json .dumps (vector_store )} " )
99
109
100
110
if not vector_store or not isinstance (vector_store , dict ):
101
- return None
111
+ LOGGER .info (f"No vector store configuration found for { kb_name } " )
112
+ return False , None
102
113
103
- if vector_store .get ("vectorStoreType" ) != "OPENSEARCH" :
104
- return None
114
+ vector_store_type = vector_store .get ("vectorStoreType" ) or vector_store .get ("type" )
115
+ LOGGER .info (f"Vector store type for { kb_name } : { vector_store_type } " )
116
+ if not vector_store_type or (vector_store_type .upper () != "OPENSEARCH" and vector_store_type .upper () != "OPENSEARCH_SERVERLESS" ):
117
+ LOGGER .info (f"Vector store type is not OpenSearch for { kb_name } " )
118
+ return False , None
105
119
106
120
opensearch_config = vector_store .get ("opensearchServerlessConfiguration" ) or vector_store .get ("opensearchConfiguration" )
121
+ LOGGER .info (f"OpenSearch config for { kb_name } : { json .dumps (opensearch_config )} " )
107
122
if not opensearch_config :
108
- return f"{ kb_name } (missing OpenSearch configuration)"
123
+ return True , f"{ kb_name } (missing OpenSearch configuration)"
109
124
110
125
if "collectionArn" in opensearch_config :
111
126
collection_id = opensearch_config ["collectionArn" ].split ("/" )[- 1 ]
112
- return check_opensearch_serverless (collection_id , kb_name )
127
+ LOGGER .info (f"Found OpenSearch Serverless collection { collection_id } for { kb_name } " )
128
+ return True , check_opensearch_serverless (collection_id , kb_name )
113
129
114
130
domain_endpoint = opensearch_config .get ("endpoint" , "" )
115
131
if not domain_endpoint :
116
- return f"{ kb_name } (missing OpenSearch domain endpoint)"
132
+ return True , f"{ kb_name } (missing OpenSearch domain endpoint)"
117
133
domain_name = domain_endpoint .split ("." )[0 ]
118
- return check_opensearch_domain (domain_name , kb_name )
134
+ LOGGER .info (f"Found OpenSearch domain { domain_name } for { kb_name } " )
135
+ return True , check_opensearch_domain (domain_name , kb_name )
119
136
120
137
except ClientError as e :
121
138
LOGGER .error (f"Error checking knowledge base { kb_id } : { str (e )} " )
122
139
if e .response ["Error" ]["Code" ] == "AccessDeniedException" :
123
- return f"{ kb_name } (access denied)"
140
+ return True , f"{ kb_name } (access denied)"
124
141
raise
125
142
126
143
127
- def evaluate_compliance (rule_parameters : dict ) -> tuple [str , str ]: # noqa: U100
144
+ def evaluate_compliance (rule_parameters : dict ) -> tuple [str , str ]: # noqa: U100, CFQ004
128
145
"""Evaluate if Bedrock Knowledge Base OpenSearch vector stores are encrypted with KMS CMK.
129
146
130
147
Args:
@@ -135,16 +152,20 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: U100
135
152
"""
136
153
try :
137
154
non_compliant_kbs = []
155
+ has_opensearch = False
138
156
paginator = bedrock_agent_client .get_paginator ("list_knowledge_bases" )
139
157
140
158
for page in paginator .paginate ():
141
159
for kb in page ["knowledgeBaseSummaries" ]:
142
160
kb_id = kb ["knowledgeBaseId" ]
143
161
kb_name = kb .get ("name" , kb_id )
144
- error = check_knowledge_base (kb_id , kb_name )
162
+ is_opensearch , error = check_knowledge_base (kb_id , kb_name )
163
+ has_opensearch = has_opensearch or is_opensearch
145
164
if error :
146
165
non_compliant_kbs .append (error )
147
166
167
+ if not has_opensearch :
168
+ return "COMPLIANT" , "No OpenSearch vector stores found in knowledge bases"
148
169
if non_compliant_kbs :
149
170
return "NON_COMPLIANT" , (
150
171
"The following knowledge bases have OpenSearch vector stores not encrypted with CMK: " + f"{ '; ' .join (non_compliant_kbs )} "
@@ -153,7 +174,7 @@ def evaluate_compliance(rule_parameters: dict) -> tuple[str, str]: # noqa: U100
153
174
154
175
except Exception as e :
155
176
LOGGER .error (f"Error evaluating Bedrock Knowledge Base OpenSearch encryption: { str (e )} " )
156
- return "ERROR " , f"Error evaluating compliance: { str (e )} "
177
+ return "INSUFFICIENT_DATA " , f"Error evaluating compliance: { str (e )} "
157
178
158
179
159
180
def lambda_handler (event : dict , context : Any ) -> None : # noqa: U100
0 commit comments