Skip to content

Commit b4902ac

Browse files
committed
more flexible results extraction
1 parent d970c09 commit b4902ac

12 files changed

+90
-67
lines changed

README.md

+13-13
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,39 @@ AioPool makes sure _no more_ and _no less_ (if possible) than `size` spawned cor
88

99
Read [code doctrings](../master/asyncio_pool/base_pool.py) for details.
1010

11-
> `AioPool(size=4, *, loop=None)`
11+
#### AioPool(size=4, *, loop=None)
1212

1313
Creates pool of `size` concurrent tasks. Supports async context manager interface.
1414

15-
> `spawn(coro, cb=None, ctx=None)`
15+
#### spawn(coro, cb=None, ctx=None)
1616

1717
Waits for pool space, then creates task for `coro` coroutine, returning future for it's result. Can spawn coroutine, created by `cb` with result of `coro` as first argument. `ctx` context is passed to callback as third positinal argument.
1818

19-
> `exec(coro, cb=None, ctx=None)`
19+
#### exec(coro, cb=None, ctx=None)
2020

2121
Waits for pool space, then creates task for `coro`, then waits for it to finish, then returns result of `coro` if no callback is provided, otherwise creates task for callback, waits for it and returns result of callback.
2222

23-
> `spawn_n(coro, cb=None, ctx=None)`
23+
#### spawn_n(coro, cb=None, ctx=None)
2424

2525
Creates waiting task for `coro`, returns future without waiting for pool space. Task is executed "in pool" when pool space is available.
2626

27-
> `join()`
27+
#### join()
2828

2929
Waits for all spawned (active and waiting) tasks to finish. Joining pool from coroutine, spawned by the same pool leads to *deadlock*.
3030

31-
> `cancel(*futures)`
31+
#### cancel(*futures)
3232

3333
Cancels spawned tasks (active and waiting), finding them by provided `futures`. If no futures provided -- cancels all spawned tasks.
3434

35-
> `map(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
35+
#### map(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)
3636

3737
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn`, waits for all of them to finish (including callbacks), returns results maintaining order of `iterable`.
3838

39-
> `map_n(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
39+
#### map_n(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)
4040

4141
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn_n`, returns futures for task results maintaining order of `iterable`.
4242

43-
> `itermap(fn, iterable, cb=None, ctx=None, *, flat=True, exc_as_result=True, timeout=None, yield_when=asyncio.ALL_COMPLETED)`
43+
#### itermap(fn, iterable, cb=None, ctx=None, *, flat=True, get_result=getres.flat, timeout=None, yield_when=asyncio.ALL_COMPLETED)
4444

4545
Spawns tasks with `map_n(fn, iterable, cb, ctx)`, then waits for results with `asyncio.wait` function, yielding ready results one by one if `flat` == True, otherwise yielding list of ready results.
4646

@@ -114,7 +114,6 @@ async def map_usage(todo=range(100)):
114114

115115

116116
async def itermap_usage(todo=range(1,11)):
117-
# Python 3.6+
118117
result = 0
119118
async with AioPool(size=10) as pool:
120119
# Combines spawn_n and iterwait, which is a wrapper for asyncio.wait,
@@ -153,9 +152,10 @@ async def callbacks_usage():
153152

154153
results = []
155154
for fut in futures:
156-
# there's a helper with this code:
157-
# from asyncio_pool import result_noraise
158-
# results.append(result_noraise(fut))
155+
# there are helpers for result extraction. `flat` one will do
156+
# exactly what's written below
157+
# from asyncio_pool import getres
158+
# results.append(getres.flat(fut))
159159
try:
160160
results.append(fut.result())
161161
except Exception as e:

asyncio_pool/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import sys
44

5-
from .utils import result_noraise
5+
from .results import getres
66
from .base_pool import BaseAioPool
77

88

asyncio_pool/base_pool.py

+15-8
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import traceback
55
import asyncio as aio
6-
from .utils import result_noraise
6+
from .results import getres
77

88

99
class BaseAioPool(object):
@@ -212,13 +212,20 @@ async def map_n(self, fn, iterable, cb=None, ctx=None):
212212
futures.append(fut)
213213
return futures
214214

215-
async def map(self, fn, iterable, cb=None, ctx=None, *, exc_as_result=True):
215+
async def map(self, fn, iterable, cb=None, ctx=None, *,
216+
get_result=getres.flat):
216217
'''Spawns coroutines, created with `fn` function for each item in
217218
`iterable`, waits for all of them to finish, crash or be cancelled,
218219
returning resuls.
219220
220-
If coroutine or callback crashes or is cancelled, with `exc_as_result`
221-
== True exceptions object will be returned, with == False -- just None.
221+
`get_result` is function, that accepts future as only positional
222+
argument, whose goal is to extract result from future. You can pass
223+
your own, or use inluded `getres` object, that has 3 extractors:
224+
`getres.dont` will return future untouched, `getres.flat` will return
225+
exception object if coroutine crashed or was cancelled, otherwise will
226+
return result of a coroutine (or of the callback), `getres.pair` will
227+
return tuple of (`result', 'exception` object) with None in place of
228+
missing item.
222229
223230
Read more about callbacks in `spawn` docstring.
224231
'''
@@ -228,10 +235,10 @@ async def map(self, fn, iterable, cb=None, ctx=None, *, exc_as_result=True):
228235
futures.append(fut)
229236

230237
await aio.wait(futures)
231-
return [result_noraise(fut, exc_as_result) for fut in futures]
238+
return [get_result(fut) for fut in futures]
232239

233240
async def itermap(self, fn, iterable, cb=None, ctx=None, *, flat=True,
234-
exc_as_result=True, timeout=None, yield_when=aio.ALL_COMPLETED):
241+
get_result=getres.flat, timeout=None, yield_when=aio.ALL_COMPLETED):
235242
'''Spawns coroutines created with `fn` for each item in `iterable`, then
236243
waits for results with `iterwait` (implementation specific). See docs
237244
for `map_n` and `iterwait` (in mixins for py3.5 and py3.6+).
@@ -256,7 +263,7 @@ def _cancel(self, *futures):
256263
cancelled = sum([1 for fut in tasks if fut.cancel()])
257264
return cancelled, _futures
258265

259-
async def cancel(self, *futures, exc_as_result=True):
266+
async def cancel(self, *futures, get_result=getres.flat):
260267
'''Cancels spawned or waiting tasks, found by their `futures`. If no
261268
`futures` are passed -- cancels all spwaned and waiting tasks.
262269
@@ -269,5 +276,5 @@ async def cancel(self, *futures, exc_as_result=True):
269276
cancelled, _futures = self._cancel(*futures)
270277
await aio.sleep(0) # let them actually cancel
271278
# need to collect them anyway, to supress warnings
272-
results = [result_noraise(fut, exc_as_result) for fut in _futures]
279+
results = [get_result(fut) for fut in _futures]
273280
return cancelled, results

asyncio_pool/mx_asyncgen.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
'''Mixin for BaseAioPool with async generator features, python3.6+'''
33

44
import asyncio as aio
5-
from .utils import result_noraise
5+
from .results import getres
66

77

8-
async def iterwait(futures, *, flat=True, exc_as_result=True,
8+
async def iterwait(futures, *, flat=True, get_result=getres.flat,
99
timeout=None, yield_when=aio.ALL_COMPLETED, loop=None):
1010
'''Wraps `asyncio.wait` into asynchronous generator, accessible with
1111
`async for` syntax. May be useful in conjunction with `spawn_n`.
@@ -23,22 +23,22 @@ async def iterwait(futures, *, flat=True, exc_as_result=True,
2323
return_when=yield_when, loop=loop)
2424
if flat:
2525
for fut in done:
26-
yield result_noraise(fut, exc_as_result)
26+
yield get_result(fut)
2727
else:
28-
yield [result_noraise(f, exc_as_result) for f in done]
28+
yield [get_result(fut) for fut in done]
2929

3030

3131
class MxAsyncGenPool(object):
3232
# Asynchronous generator wrapper for asyncio.wait.
3333

3434
async def itermap(self, fn, iterable, cb=None, ctx=None, *, flat=True,
35-
exc_as_result=True, timeout=None, yield_when=aio.ALL_COMPLETED):
35+
get_result=getres.flat, timeout=None,
36+
yield_when=aio.ALL_COMPLETED):
3637
'''Spawns coroutines created with `fn` for each item in `iterable`, then
3738
waits for results with `iterwait`. See docs for `map_n` and `iterwait`.
3839
'''
3940
futures = await self.map_n(fn, iterable, cb, ctx)
4041
generator = iterwait(futures, flat=flat, timeout=timeout,
41-
exc_as_result=exc_as_result, yield_when=yield_when,
42-
loop=self.loop)
42+
get_result=get_result, yield_when=yield_when, loop=self.loop)
4343
async for batch in generator:
4444
yield batch # TODO is it possible to return a generator?

asyncio_pool/mx_asynciter.py

+7-6
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,18 @@
44
import asyncio as aio
55
from collections import deque
66
from functools import partial
7-
from .utils import result_noraise
7+
from .results import getres
88

99

1010
class iterwait:
1111

12-
def __init__(self, futures, *, flat=True, exc_as_result=True,
12+
def __init__(self, futures, *, flat=True, get_result=getres.flat,
1313
timeout=None, yield_when=aio.ALL_COMPLETED, loop=None):
1414

1515
self.results = deque()
1616
self.flat = flat
1717
self._futures = futures
18-
self._extract = partial(result_noraise, exc_as_result=exc_as_result)
18+
self._getres = get_result
1919
self._wait = partial(aio.wait, timeout=timeout, loop=loop,
2020
return_when=yield_when)
2121

@@ -33,7 +33,7 @@ async def _wait_next(self):
3333
while True:
3434
done, self._futures = await self._wait(self._futures)
3535
if done:
36-
batch = [self._extract(fut) for fut in done]
36+
batch = [self._getres(fut) for fut in done]
3737
if self.flat:
3838
self.results.extend(batch)
3939
else:
@@ -44,13 +44,14 @@ async def _wait_next(self):
4444
class MxAsyncIterPool(object):
4545

4646
def itermap(self, fn, iterable, cb=None, ctx=None, *, flat=True,
47-
exc_as_result=True, timeout=None, yield_when=aio.ALL_COMPLETED):
47+
get_result=getres.flat, timeout=None,
48+
yield_when=aio.ALL_COMPLETED):
4849
'''Spawns coroutines created with `fn` for each item in `iterable`, then
4950
waits for results with `iterwait`. See docs for `map_n` and `iterwait`.
5051
'''
5152
mk_map = partial(self.map_n, fn, iterable, cb=cb, ctx=ctx)
5253
mk_waiter = partial(iterwait, flat=flat, loop=self.loop,
53-
exc_as_result=exc_as_result, timeout=timeout,
54+
get_result=get_result, timeout=timeout,
5455
yield_when=yield_when)
5556

5657
class _itermap:

asyncio_pool/results.py

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# coding: utf8
2+
3+
from functools import partial
4+
5+
6+
def result_noraise(future, flat=True):
7+
'''Extracts result from future, never raising an exception.
8+
9+
If `flat` is True -- returns result or exception instance (including
10+
CancelledError), if `flat` is False -- returns tuple of (`result`,
11+
`exception` object).
12+
13+
If traceback is needed -- just re-raise returned exception.'''
14+
try:
15+
res = future.result()
16+
return res if flat else (res, None)
17+
except Exception as exc:
18+
return exc if flat else (None, exc)
19+
20+
21+
class getres:
22+
dont = lambda fut: fut
23+
flat = partial(result_noraise, flat=True)
24+
pair = partial(result_noraise, flat=False)

asyncio_pool/utils.py

-10
This file was deleted.

docs/_readme_template.md

+9-9
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,39 @@ AioPool makes sure _no more_ and _no less_ (if possible) than `size` spawned cor
88

99
Read [code doctrings](../master/asyncio_pool/base_pool.py) for details.
1010

11-
> `AioPool(size=4, *, loop=None)`
11+
#### AioPool(size=4, *, loop=None)
1212

1313
Creates pool of `size` concurrent tasks. Supports async context manager interface.
1414

15-
> `spawn(coro, cb=None, ctx=None)`
15+
#### spawn(coro, cb=None, ctx=None)
1616

1717
Waits for pool space, then creates task for `coro` coroutine, returning future for it's result. Can spawn coroutine, created by `cb` with result of `coro` as first argument. `ctx` context is passed to callback as third positinal argument.
1818

19-
> `exec(coro, cb=None, ctx=None)`
19+
#### exec(coro, cb=None, ctx=None)
2020

2121
Waits for pool space, then creates task for `coro`, then waits for it to finish, then returns result of `coro` if no callback is provided, otherwise creates task for callback, waits for it and returns result of callback.
2222

23-
> `spawn_n(coro, cb=None, ctx=None)`
23+
#### spawn_n(coro, cb=None, ctx=None)
2424

2525
Creates waiting task for `coro`, returns future without waiting for pool space. Task is executed "in pool" when pool space is available.
2626

27-
> `join()`
27+
#### join()
2828

2929
Waits for all spawned (active and waiting) tasks to finish. Joining pool from coroutine, spawned by the same pool leads to *deadlock*.
3030

31-
> `cancel(*futures)`
31+
#### cancel(*futures)
3232

3333
Cancels spawned tasks (active and waiting), finding them by provided `futures`. If no futures provided -- cancels all spawned tasks.
3434

35-
> `map(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
35+
#### map(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)
3636

3737
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn`, waits for all of them to finish (including callbacks), returns results maintaining order of `iterable`.
3838

39-
> `map_n(fn, iterable, cb=None, ctx=None, *, exc_as_result=True)`
39+
#### map_n(fn, iterable, cb=None, ctx=None, *, get_result=getres.flat)
4040

4141
Spawns coroutines created by `fn` function for each item in `iterable` with `spawn_n`, returns futures for task results maintaining order of `iterable`.
4242

43-
> `itermap(fn, iterable, cb=None, ctx=None, *, flat=True, exc_as_result=True, timeout=None, yield_when=asyncio.ALL_COMPLETED)`
43+
#### itermap(fn, iterable, cb=None, ctx=None, *, flat=True, get_result=getres.flat, timeout=None, yield_when=asyncio.ALL_COMPLETED)
4444

4545
Spawns tasks with `map_n(fn, iterable, cb, ctx)`, then waits for results with `asyncio.wait` function, yielding ready results one by one if `flat` == True, otherwise yielding list of ready results.
4646

examples/_usage.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,10 @@ async def cb(res, err, ctx): # callback
104104

105105
results = []
106106
for fut in futures:
107-
# there's a helper with this code:
108-
# from asyncio_pool import result_noraise
109-
# results.append(result_noraise(fut))
107+
# there are helpers for result extraction. `flat` one will do
108+
# exactly what's written below
109+
# from asyncio_pool import getres
110+
# results.append(getres.flat(fut))
110111
try:
111112
results.append(fut.result())
112113
except Exception as e:

tests/loadtest.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import time
99
import argparse
1010
import asyncio as aio
11-
from asyncio_pool import AioPool, result_noraise
11+
from asyncio_pool import AioPool, getres
1212

1313

1414
async def loadtest_spawn(tasks, pool_size, duration):
@@ -18,7 +18,7 @@ async def loadtest_spawn(tasks, pool_size, duration):
1818
fut = await pool.spawn(aio.sleep(duration))
1919
futures.append(fut)
2020

21-
return [result_noraise(f) for f in futures]
21+
return [getres.flat(fut) for fut in futures]
2222

2323

2424
async def loadtest_spawn_n(tasks, pool_size, duration):
@@ -28,7 +28,7 @@ async def loadtest_spawn_n(tasks, pool_size, duration):
2828
fut = await pool.spawn_n(aio.sleep(duration))
2929
futures.append(fut)
3030

31-
return [result_noraise(f) for f in futures]
31+
return [getres.flat(f) for f in futures]
3232

3333

3434
async def loadtest_map(tasks, pool_size, duration):

tests/test_callbacks.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import sys
44
import pytest
55
import asyncio as aio
6-
from asyncio_pool import AioPool, result_noraise
6+
from asyncio_pool import AioPool, getres
77

88

99
pytestmark = pytest.mark.filterwarnings('ignore:coroutine')
@@ -34,7 +34,7 @@ async def test_spawn_n():
3434
fut = await pool.spawn_n(wrk(i), cb, ctx)
3535
futures.append(fut)
3636

37-
results = [result_noraise(f) for f in futures]
37+
results = [getres.flat(f) for f in futures]
3838
assert all(isinstance(e, ZeroDivisionError) for e in results[:2])
3939
assert sum(results[2:]) == 2 * (sum(todo) - 0 - 1)
4040

@@ -54,7 +54,7 @@ async def test_map_n():
5454
async with AioPool(size=3) as pool:
5555
futures = await pool.map_n(wrk, todo, cb)
5656

57-
results = [result_noraise(f) for f in futures]
57+
results = [getres.flat(f) for f in futures]
5858
assert 2 * sum(todo) == sum(results)
5959

6060

@@ -134,7 +134,7 @@ async def wrk(n):
134134
await aio.sleep(0)
135135
return n
136136

137-
_r = result_noraise
137+
_r = getres.flat
138138
_ctx = (123, 456, [1,2], {3,4})
139139
inst = CbCls()
140140

0 commit comments

Comments
 (0)