@@ -1,80 +1,44 @@
 import argparse
 import asyncio
 import hashlib
-import importlib
 import json
 import shutil
 import subprocess
 import sys
+import os
 
 import traceback
-import timeit
 import uuid
 
 import logging
 
-import math
 from functools import partial
 
 from aiohttp import web
 from os import path
 
-from lambda_local.main import load_lib
-from lambda_local.context import Context
 
-SOURCES_DIR = "/src"
+SOURCES_DIR = "/var/task"
 SOURCES_REQUIREMENTS_NAME = path.join(SOURCES_DIR, "requirements.txt")
 PACKAGES_DIR = "/packages"
 PACKAGES_REQUIREMENTS_PATH = path.join(PACKAGES_DIR, "requirements.txt")
+LAMBDA_USER_ID = 496
 
 logger = logging.getLogger('lambda')
 
 
+def demote(user_uid, user_gid):
+    os.setgid(user_gid)
+    os.setuid(user_uid)
+
+
 def jsonify(data, status_code=200):
     return web.Response(
         text=json.dumps(data),
         headers={'Content-Type': 'application/json'},
         status=status_code)
 
 
-import os
-import types
-import importlib
-
-
-def reload_package(package):
-    assert(hasattr(package, "__package__"))
-    fn = package.__file__
-    fn_dir = os.path.dirname(fn) + os.sep
-    module_visit = {fn}
-    del fn
-
-    def reload_recursive_ex(module):
-        importlib.reload(module)
-
-        for module_child in vars(module).values():
-            if isinstance(module_child, types.ModuleType):
-                fn_child = getattr(module_child, "__file__", None)
-                if (fn_child is not None) and fn_child.startswith(fn_dir):
-                    if fn_child not in module_visit:
-                        # print("reloading:", fn_child, "from", module)
-                        module_visit.add(fn_child)
-                        reload_recursive_ex(module_child)
-
-    return reload_recursive_ex(package)
-
-
-def load(directory, module, handler_path):
-    file_path = path.join(directory, module)
-    file_directory = path.dirname(file_path)
-    sys.path.append(file_directory)
-    function_file, function_name = path.splitext(handler_path)
-    mod = importlib.import_module(function_file)
-    reload_package(mod)
-    func = getattr(mod, function_name.strip('.'))
-    return func
-
-
 async def parse_request(args, request):
     data = await request.json()
     arn_string = data.get('arn', '')
@@ -86,53 +50,30 @@ async def parse_request(args, request):
     if isinstance(event, str):
         event = json.loads(event)
 
-    context = Context(args.timeout, arn_string, version)
-    func = load(args.directory, module, handler)
-    return func, event, context
+    return handler, event
 
 
-async def execute(func, event, context):
-    loop = asyncio.get_event_loop()
-    try:
-        future = loop.run_in_executor(None, func, event, context)
-        result = await asyncio.wait_for(future, context.timeout)
-    except asyncio.TimeoutError as e:
-        result = e
-    except Exception:
-        err = sys.exc_info()
-        result = json.dumps({
-            "errorMessage": str(err[1]),
-            "stackTrace": traceback.format_tb(err[2]),
-            "errorType": err[0].__name__
-        }, indent=4, separators=(',', ': '))
-    return result
-
-
-async def async_execute(request_id, func, event, context):
-    logger.info("Event: {}".format(event))
-    logger.info("START RequestId: {}".format(request_id))
-
-    start_time = timeit.default_timer()
-    result = await execute(func, event, context)
-    end_time = timeit.default_timer()
-
-    logger.info("END RequestId: {}".format(request_id))
-
-    output_func = logger.error if type(result) is Exception else logger.info
-    output_func(f"RESULT:\n{result}")
-    duration = (end_time - start_time) * 1000
-    billed_duration = math.ceil(duration / 100) * 100
-    logger.info(
-        f"REPORT RequestId: {request_id}\t"
-        f"Duration: {duration:.2f} ms\t"
-        f"Billed Duration: {billed_duration:.2f} ms")
-    return result
+async def async_execute(handler, event):
+    process = await asyncio.create_subprocess_exec(
+        "python", "bootstrap.py", handler, json.dumps(event),
+        cwd='/var/runtime/awslambda/',
+        env={**os.environ, 'PYTHONPATH': PACKAGES_DIR},
+        preexec_fn=partial(demote, LAMBDA_USER_ID, LAMBDA_USER_ID),
+        stdin=asyncio.subprocess.PIPE,
+        stdout=asyncio.subprocess.PIPE,
+        stderr=asyncio.subprocess.PIPE,
+    )
+    stdout, stderr = await process.communicate()
+    if stdout:
+        stdout = json.loads(stdout.decode('utf-8'))
+    if stderr:
+        stderr = stderr.decode('utf-8')
+    return {'stdout': stdout, 'stderr': stderr}
 
 
 async def run_lambda(args, request):
-    func, event, context = await parse_request(args, request)
-    request_id = uuid.uuid4()
-    result = await async_execute(request_id, func, event, context)
+    handler, event = await parse_request(args, request)
+    result = await async_execute(handler, event)
     return jsonify(data=result)
 
 
@@ -162,7 +103,6 @@ def install_requirements(args, force=False):
         shutil.copy(requirements_path, previous_requirements_path)
     else:
         logger.info("Requirements not changed, skipping update...")
-    load_lib(PACKAGES_DIR)
 
 
 def init_logging(args):