Skip to content

Commit 578d1da

Browse files
committed
fixes from flake8 and mypy refactoring; update api calls for failures
1 parent dba9d0e commit 578d1da

File tree

1 file changed

+29
-6
lines changed
  • aws_sra_examples/solutions/genai/bedrock_org/lambda/rules/sra_bedrock_check_kb_ingestion_encryption

1 file changed

+29
-6
lines changed

aws_sra_examples/solutions/genai/bedrock_org/lambda/rules/sra_bedrock_check_kb_ingestion_encryption/app.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
config_client = boto3.client("config", region_name=AWS_REGION)
3030

3131

32-
def check_data_sources(kb_id: str, kb_name: str) -> str | None: # type: ignore # noqa: CFQ004
33-
"""Check if a knowledge base's data sources are encrypted.
32+
def check_data_sources(kb_id: str, kb_name: str) -> str | None: # type: ignore # noqa: CFQ004, CCR001
33+
"""Check if a knowledge base's data sources are encrypted with KMS during ingestion.
3434
3535
Args:
3636
kb_id (str): Knowledge base ID
@@ -44,18 +44,41 @@ def check_data_sources(kb_id: str, kb_name: str) -> str | None: # type: ignore
4444
"""
4545
try:
4646
data_sources = bedrock_agent_client.list_data_sources(knowledgeBaseId=kb_id)
47+
LOGGER.info(f"Data sources: {data_sources}")
4748
if not isinstance(data_sources, dict):
4849
return f"{kb_name} (invalid data sources response)"
50+
4951
unencrypted_sources = []
5052
for source in data_sources.get("dataSourceSummaries", []):
53+
LOGGER.info(f"Source: {source}")
5154
if not isinstance(source, dict):
5255
continue
53-
encryption_config = source.get("serverSideEncryptionConfiguration", {})
54-
if not isinstance(encryption_config, dict) or not encryption_config.get("kmsKeyArn"):
55-
unencrypted_sources.append(source.get("name", source["dataSourceId"]))
56+
57+
# Get the detailed data source configuration
58+
try:
59+
source_details = bedrock_agent_client.get_data_source(
60+
knowledgeBaseId=kb_id,
61+
dataSourceId=source["dataSourceId"]
62+
)
63+
LOGGER.info(f"Source details: {source_details}")
64+
65+
# Check for KMS encryption configuration
66+
data_source = source_details.get("dataSource", {})
67+
encryption_config = data_source.get("serverSideEncryptionConfiguration", {})
68+
LOGGER.info(f"Encryption config: {encryption_config}")
69+
70+
# Check if KMS key is configured for encryption
71+
if not encryption_config.get("kmsKeyArn"):
72+
unencrypted_sources.append(source.get("name", source["dataSourceId"]))
73+
74+
except ClientError as e:
75+
LOGGER.error(f"Error getting data source details for {source.get('name', source['dataSourceId'])}: {str(e)}")
76+
if e.response["Error"]["Code"] == "AccessDeniedException":
77+
unencrypted_sources.append(f"{source.get('name', source['dataSourceId'])} (access denied)")
78+
continue
5679

5780
if unencrypted_sources:
58-
return f"{kb_name} (unencrypted sources: {', '.join(unencrypted_sources)})"
81+
return f"{kb_name} (sources without KMS encryption: {', '.join(unencrypted_sources)})"
5982
return None
6083
except ClientError as e:
6184
LOGGER.error(f"Error checking data sources for knowledge base {kb_name}: {str(e)}")

0 commit comments

Comments
 (0)