Use Postman to make an API call; Amazon API Gateway triggers a Lambda function; the function writes into the S3 bucket; then Snowpipe loads the data into Snowflake. To avoid activating Snowpipe for every API call and to optimize the process, we will use an Amazon Kinesis Data Stream to collect the data first, then use Kinesis Data Firehose to deliver it to S3 in batches (flushing when the buffer size or the time interval is reached, so a handful of small messages does not trigger a load each time).

Step 2: Create the Lambda function to read the data from API Gateway and put it into the Kinesis Data Stream


import json
import boto3

client = boto3.client('kinesis')

def lambda_handler(event, context):
    # Forward the request body received from API Gateway to the Kinesis Data Stream
    data = json.dumps(event['body'])
    client.put_record(StreamName="project3_kinesis_apigateway", Data=data, PartitionKey="1")
    print("Data Inserted")
- Create a simple HTTP API and integrate it with the Lambda function.

- Configure the route: POST method.




Get the API endpoint: https://fwpwl5qova.execute-api.us-east-1.amazonaws.com/dev/project3_lambda1_kinesis_apigateway


This second Lambda function will transform the data (decode it and put a delimiter between records) to make it easy to load into Snowflake. We will convert the binary data into string data, then append a \n so each record starts on a new line.
import base64

def lambda_handler(event, context):
    output = []  # collect the transformed records for this invocation
    for record in event['records']:
        # Firehose delivers the record data base64-encoded; decode it to a string
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)
        # Append a newline so each record lands on its own line in the S3 file
        row_w_newline = payload + "\n"
        # Re-encode to base64 (as a string) as Firehose expects in the response
        row_w_newline = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline
        })
    print('Processed {} records.'.format(len(event['records'])))
    return {'records': output}
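To see what the transform produces, run a small local test (a sketch, not part of the deployed function): base64-encode one sample record the way Firehose does, call the handler above, and decode the returned data to confirm the trailing newline.

# Local test sketch: assumes lambda_handler above is defined in the same module.
import base64
import json

sample_record = json.dumps({"Name": "wang", "Age": 4}).encode("utf-8")
sample_event = {
    "records": [
        {"recordId": "1", "data": base64.b64encode(sample_record).decode("utf-8")}
    ]
}
result = lambda_handler(sample_event, None)
print(base64.b64decode(result["records"][0]["data"]).decode("utf-8"))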
- Create a Kinesis Data Firehose delivery stream (source: the Kinesis Data Stream, destination: the S3 bucket).


- Enable "Transform source records with AWS Lambda" and select the transformation function created above (see the sketch below).
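The delivery stream can also be defined with boto3; the sketch below is illustrative (the stream name and the ARNs are placeholders for the resources and roles created earlier), and the buffering hints are what let Firehose batch records instead of writing to S3 on every API call.

import boto3

firehose = boto3.client("firehose")
firehose.create_delivery_stream(
    DeliveryStreamName="project3_firehose_to_s3",  # illustrative name
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "<ARN of project3_kinesis_apigateway>",
        "RoleARN": "<role Firehose assumes to read the stream>",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "<role Firehose assumes to write to S3>",
        "BucketARN": "arn:aws:s3:::gakas-project3-kinesis-apigateway",
        # Flush to S3 when either the buffer size or the time interval is reached
        "BufferingHints": {"SizeInMBs": 1, "IntervalInSeconds": 60},
        "ProcessingConfiguration": {
            "Enabled": True,
            "Processors": [
                {
                    "Type": "Lambda",
                    "Parameters": [
                        {"ParameterName": "LambdaArn",
                         "ParameterValue": "<ARN of the transform Lambda>"},
                    ],
                },
            ],
        },
    },
)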


Storage Integration Creation: Copy the IAM role ARN created for Snowflake and the S3 bucket location into the storage integration definition in the Snowflake console.
create warehouse s3_to_snowflake_wh;
use warehouse s3_to_snowflake_wh;
--Specify the role
use role ACCOUNTADMIN;
drop database if exists s3_to_snowflake;
--Database Creation
create database if not exists s3_to_snowflake;
--Specify the active/current database for the session.
use s3_to_snowflake;
--Storage Integration Creation
create or replace storage integration s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::200105849428:role/project3_snwoflake_role'
STORAGE_ALLOWED_LOCATIONS = ('s3://gakas-project3-kinesis-apigateway')
COMMENT = 'Testing Snowflake getting refresh or not';
--Describe the Integration Object
DESC INTEGRATION s3_int;
--External Stage Creation
create stage mystage
url = 's3://gakas-project3-kinesis-apigateway'
storage_integration = s3_int;
list @mystage;
--File Format Creation
create or replace file format my_json_format
type = json;
--Table Creation
create or replace external table s3_to_snowflake.PUBLIC.Person with location = @mystage file_format ='my_json_format';
show external tables;
select * from s3_to_snowflake.public.person;
--Query the table
select parse_json(VALUE):Age as Age , trim(parse_json(VALUE):Name,'"') as Name from s3_to_snowflake.PUBLIC.Person;
Copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values from the DESC INTEGRATION output and update the trust policy of the Snowflake role in IAM.
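The trust-policy update can also be scripted with boto3 instead of edited in the IAM console; in this sketch the principal ARN and external ID are placeholders for the values returned by DESC INTEGRATION s3_int.

import json
import boto3

iam = boto3.client("iam")
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": "<STORAGE_AWS_IAM_USER_ARN>"},
            "Action": "sts:AssumeRole",
            "Condition": {"StringEquals": {"sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"}},
        }
    ],
}
iam.update_assume_role_policy(
    RoleName="project3_snwoflake_role",  # the role referenced in the storage integration
    PolicyDocument=json.dumps(trust_policy),
)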

Create an event notification on the S3 bucket so Snowflake is notified of new files and refreshes the external table automatically.
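The notification can be created in the S3 console or with boto3 as sketched below; the queue ARN is a placeholder for the notification_channel value shown by SHOW EXTERNAL TABLES.

import boto3

s3 = boto3.client("s3")
s3.put_bucket_notification_configuration(
    Bucket="gakas-project3-kinesis-apigateway",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                # Placeholder: Snowflake's SQS queue for external table auto-refresh
                "QueueArn": "<notification_channel from SHOW EXTERNAL TABLES>",
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    },
)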



- Open Postman and pass a series of records (or use the script after the list):
{"Name": "wang", "Age":4}
{"Name": "ali", "Age":32}
{"Name": "li", "Age":54}
{"Name": "Moctar", "Age":44}
{"Name": "she", "Age":86}
{"Name": "Abdoul", "Age":22}
{"Name": "lie", "Age":34}
{"Name": "Cheng", "Age":55}
{"Name": "Karim", "Age":23}
{"Name": "ram", "Age":34}
{"Name": "li", "Age":23}
{"Name": "she", "Age":36}
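The same records can also be sent without Postman by looping over them with the requests library against the API endpoint from the earlier step (only the first few records are shown in this sketch).

import requests  # pip install requests

url = ("https://fwpwl5qova.execute-api.us-east-1.amazonaws.com"
       "/dev/project3_lambda1_kinesis_apigateway")
records = [
    {"Name": "wang", "Age": 4}, {"Name": "ali", "Age": 32},
    {"Name": "li", "Age": 54}, {"Name": "Moctar", "Age": 44},
]
for record in records:
    resp = requests.post(url, json=record)
    print(resp.status_code, record)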

- Check that the files arrive in the S3 bucket.

- Check the data in Snowflake.
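The loaded rows can also be verified from Python with the Snowflake connector (pip install snowflake-connector-python); the account and credentials below are placeholders.

import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<user>",
    password="<password>",
    warehouse="s3_to_snowflake_wh",
    database="s3_to_snowflake",
    schema="PUBLIC",
)
cur = conn.cursor()
cur.execute(
    "select parse_json(VALUE):Age as Age, trim(parse_json(VALUE):Name, '\"') as Name "
    "from s3_to_snowflake.PUBLIC.Person"
)
for row in cur.fetchall():
    print(row)
conn.close()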
