
Commit f7c3163

Define pmapreduce as a parallel to mapreduce, and pmapbatch as a parallel to pmap (#9)

* pmapreduce parallels mapreduce
* Add pmapbatch
* fix docstrings
* Add tests for toptree

1 parent c0911f8 commit f7c3163

21 files changed: +2779 -4740 lines

Project.toml (+1 -7)

@@ -1,20 +1,14 @@
 name = "ParallelUtilities"
 uuid = "fad6cfc8-4f83-11e9-06cc-151124046ad0"
 authors = ["Jishnu Bhattacharya"]
-version = "0.7.7"
+version = "0.8.0"
 
 [deps]
 DataStructures = "864edb3b-99cc-5e75-8d2d-829cb0a9cfe8"
 Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
-OffsetArrays = "6fe1bfb0-de20-5000-8ca7-80f57d26f881"
-ProgressMeter = "92933f4c-e287-5a05-a399-4b506db050ca"
-Reexport = "189a3867-3050-52da-a836-e630ba90ab69"
 
 [compat]
 DataStructures = "0.17, 0.18"
-OffsetArrays = "1"
-ProgressMeter = "1.2"
-Reexport = "0.2, 1.0"
 julia = "1.2"
 
 [extras]

README.md (+16 -334)

@@ -7,354 +7,36 @@
 
 Parallel mapreduce and other helpful functions for HPC, meant primarily for embarrassingly parallel operations that often require one to split up a list of tasks into subsections that may be processed on individual cores.
 
+Note: This package deals with distributed (multi-core) parallelism, and at this moment it has not been tested alongside multi-threading.
+
 # Installation
 
 Install the package using
 
 ```julia
 pkg> add ParallelUtilities
-julia> using ParallelUtilities
-```
-# Quick start
-
-```julia
-julia> addprocs(2)
-2-element Array{Int64,1}:
- 2
- 3
-
-julia> @everywhere using ParallelUtilities
-
-julia> pmapreduce(x -> ones(2) .* myid(), x -> hcat(x...), 1:nworkers())
-2×2 Array{Float64,2}:
- 2.0 3.0
- 2.0 3.0
-
-julia> pmapreduce_commutative(x -> ones(2) .* myid(), sum, 1:nworkers())
-2-element Array{Float64,1}:
- 5.0
- 5.0
-
-julia> pmapsum(x -> ones(2) .* myid(), 1:nworkers())
-2-element Array{Float64,1}:
- 5.0
- 5.0
-```
-
-# Performance
-
-The `pmapreduce`-related functions are expected to be more performant than `@distributed` for loops. As an example, running the following on a Slurm cluster using 2 nodes with 28 cores on each leads to
-
-```julia
-julia> @time @distributed (+) for i=1:nworkers()
-           ones(10_000, 1_000)
-       end;
- 22.355047 seconds (7.05 M allocations: 8.451 GiB, 6.73% gc time)
-
-julia> @time pmapsum(x -> ones(10_000, 1_000), 1:nworkers());
-  2.672838 seconds (52.83 k allocations: 78.295 MiB, 0.53% gc time)
-```
-
-The difference becomes more apparent as larger data needs to be communicated across workers. This is because `ParallelUtilities.pmapreduce*` perform local reductions on each node before communicating across nodes.
-
-# Usage
-
-The package splits up a collection of ranges into subparts of roughly equal length, so that all the cores are approximately equally loaded. This is best understood using an example: let's say that we have a function `f` that is defined as
-
-```julia
-julia> @everywhere begin
-           f(x,y,z) = x+y+z
-       end
-```
-
-where each parameter takes values in a range, and we would like to sample the entire parameter space. As an example, we choose the ranges to be
-
-```julia
-julia> xrange, yrange, zrange = 1:3, 2:4, 3:6 # ranges should be strictly increasing
-```
-
-There are a total of 36 possible `(x,y,z)` combinations given these ranges. Let's say that we would like to split the evaluation of the function over 10 processors. We describe the simple way to evaluate this and then explain how this is achieved.
-
-The set of parameters may be split up using the function `ProductSplit`. In this example each of the 10 processors receives a chunk as listed below
-
-```julia
-julia> [collect(ProductSplit((xrange,yrange,zrange),10,i)) for i=1:10]
-10-element Array{Array{Tuple{Int64,Int64,Int64},1},1}:
- [(1, 2, 3), (2, 2, 3), (3, 2, 3), (1, 3, 3)]
- [(2, 3, 3), (3, 3, 3), (1, 4, 3), (2, 4, 3)]
- [(3, 4, 3), (1, 2, 4), (2, 2, 4), (3, 2, 4)]
- [(1, 3, 4), (2, 3, 4), (3, 3, 4), (1, 4, 4)]
- [(2, 4, 4), (3, 4, 4), (1, 2, 5), (2, 2, 5)]
- [(3, 2, 5), (1, 3, 5), (2, 3, 5), (3, 3, 5)]
- [(1, 4, 5), (2, 4, 5), (3, 4, 5)]
- [(1, 2, 6), (2, 2, 6), (3, 2, 6)]
- [(1, 3, 6), (2, 3, 6), (3, 3, 6)]
- [(1, 4, 6), (2, 4, 6), (3, 4, 6)]
-```
-
-The first six processors receive 4 tuples of parameters each and the final four receive 3 each. This is the splitting used by the various functions described next.
-
-## pmap-related functions
-
-The package provides versions of `pmap` with an optional reduction. These differ from the one provided by `Distributed` in a few key aspects: firstly, the iterator product of the argument is what is passed to the function and not the arguments elementwise, so the i-th task will be `Iterators.product(args...)[i]` and not `[x[i] for x in args]`. Specifically, the second set of parameters in the example above will be `(2,2,3)` and not `(2,3,4)`.
-
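To see this ordering concretely, the second parameter set may be read off the iterator product directly (a minimal check using the ranges from the example above):

```julia
julia> collect(Iterators.product(1:3, 2:4, 3:6))[2] # the first index varies fastest
(2, 2, 3)
```
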
-Secondly, the iterator is passed to the function in batches and not elementwise, and it is left to the function to iterate over the collection. Thirdly, the tasks are passed on to processors sorted by rank, so the first task is passed to the first processor and the last to the last active worker. The tasks are also approximately evenly distributed across processors. The exported function `pmapbatch_elementwise` passes the elements to the function one-by-one as splatted tuples. This produces the same result as `pmap` when a single range is passed as the argument.
-
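The last claim may be checked against `pmap` directly; a small sketch, assuming workers have been added and the package is loaded on all of them:

```julia
# expected to agree, per the statement above
Tuple(pmapbatch_elementwise(x -> x^2, 1:8)) == Tuple(pmap(x -> x^2, 1:8))
```
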
-### pmapbatch and pmapbatch_elementwise
-
-As an example, we demonstrate how to evaluate the function `f` for the ranges of parameters listed above:
-
-```julia
-julia> p = pmapbatch_elementwise(f, (xrange,yrange,zrange));
-
-julia> Tuple(p)
-(6, 7, 8, 7, 8, 9, 8, 9, 10, 7, 8, 9, 8, 9, 10, 9, 10, 11, 8, 9, 10, 9, 10, 11, 10, 11, 12, 9, 10, 11, 10, 11, 12, 11, 12, 13)
-```
-
-There is also a function `pmapbatch` that deals with batches of parameters that are passed to each processor, and `pmapbatch_elementwise` calls this function under the hood to process the parameters one by one. We may use this directly as well if we need the entire batch for some reason (e.g. reading values off a disk, which needs to be done once for the entire set and not for every parameter). As an example, we demonstrate how to obtain the same result as above using `pmapbatch`:
-
-```julia
-julia> p = pmapbatch(x->[f(i...) for i in x], (xrange,yrange,zrange));
-
-julia> Tuple(p)
-(6, 7, 8, 7, 8, 9, 8, 9, 10, 7, 8, 9, 8, 9, 10, 9, 10, 11, 8, 9, 10, 9, 10, 11, 10, 11, 12, 9, 10, 11, 10, 11, 12, 11, 12, 13)
-```
-
-### pmapsum and pmapreduce
-
-Often a parallel execution is followed by a reduction (e.g. a sum over the results). A reduction may be commutative (in which case the order of results does not matter), or non-commutative (in which case the order does matter). Two exported functions carry out these tasks: `pmapreduce_commutative` and `pmapreduce`, where the former does not preserve ordering and the latter does. For convenience, the package also provides the function `pmapsum` that chooses `sum` as the reduction operator. The map-reduce operation is similar in many ways to the distributed `for` loop provided by julia, but the main difference is that the reduction operation is not binary for the functions in this package (e.g. we need `sum` and not `(+)` to add the results). There is also the difference as above that the function gets the parameters in batches, with functions having the suffix `_elementwise` taking parameters individually as splatted `Tuple`s. The function `pmapreduce` does not process parameters elementwise at this point, although this might be implemented in the future.
-
-As an example, to sum up a list of numbers in parallel we may call
-```julia
-julia> pmapsum_elementwise(identity, 1:1000)
-500500
+julia> using ParallelUtilities
 ```
 
-Here the mapped function is taken to be `identity`, which just returns its argument. To sum the squares of the numbers in a list we may use
-
-```julia
-julia> pmapsum_elementwise(x -> x^2, 1:1000)
-333833500
-```
-
-We may choose an arbitrary reduction operator in the functions `pmapreduce` and `pmapreduce_commutative`, and the elementwise function `pmapreduce_commutative_elementwise`. The reductions are carried out as a binary tree across all workers.
-
-```julia
-# Compute 1^2 * 2^2 * 3^2 in parallel
-julia> pmapreduce_commutative_elementwise(x -> x^2, prod, 1:3)
-36
-```
-
-The function `pmapreduce` sorts the results obtained from each processor, so it is useful for concatenations.
-
-```julia
-julia> workers()
-2-element Array{Int64,1}:
- 2
- 3
-
-# The signature is pmapreduce(fmap, freduce, range_or_tuple_of_ranges)
-julia> pmapreduce(x -> ones(2).*myid(), x -> hcat(x...), 1:nworkers())
-2×2 Array{Float64,2}:
- 2.0 3.0
- 2.0 3.0
-```
-
-The function `pmapreduce` produces the same result as `pmapreduce_commutative` if the reduction operator is commutative (i.e. the order of results received from the child workers does not matter).
-
-The function `pmapsum` sets the reduction function to `sum`.
-
-```julia
-julia> sum(workers())
-5
-
-# We compute ones(2).*sum(workers()) in parallel
-julia> pmapsum(x -> ones(2).*myid(), 1:nworkers())
-2-element Array{Float64,1}:
- 5.0
- 5.0
-```
-
-It is possible to specify the return types of the map and reduce operations in these functions. To specify the return types use the following variants:
-
-```julia
-# Signature is pmapreduce(fmap, Tmap, freduce, Treduce, range_or_tuple_of_ranges)
-julia> pmapreduce(x -> ones(2).*myid(), Vector{Float64}, x -> hcat(x...), Matrix{Float64}, 1:nworkers())
-2×2 Array{Float64,2}:
- 2.0 3.0
- 2.0 3.0
-
-# Signature is pmapsum(fmap, Tmap, range_or_tuple_of_ranges)
-julia> pmapsum(x -> ones(2).*myid(), Vector{Float64}, 1:nworkers())
-2-element Array{Float64,1}:
- 5.0
- 5.0
-```
-
-Specifying the types would lead to a type coercion if possible, or an error if a conversion is not possible. This might help in asserting the correctness of the result obtained. For example:
-
-```julia
-# The result is converted from Vector{Float64} to Vector{Int}.
-# Conversion works as the numbers are integers
-julia> pmapsum(x -> ones(2).*myid(), Vector{Int}, 1:nworkers())
-2-element Array{Int64,1}:
- 5
- 5
-
-# Conversion fails here as the numbers aren't integers
-julia> pmapsum(x -> rand(2), Vector{Int}, 1:nworkers())
-ERROR: On worker 2:
-InexactError: Int64(0.7742577217010362)
-```
-
-### Progress bar
-
-The progress of the map-reduce operation might be tracked by setting the keyword argument `showprogress` to true. This might be useful in case certain workers have a heavier load than others.
-
-```julia
-# Running on 8 workers, artificially induce load using sleep
-julia> pmapreduce(x -> (sleep(myid()); myid()), x -> hcat(x...), 1:nworkers(), showprogress=true)
-Progress in pmapreduce : 100%|██████████████████████████████████████████████████| Time: 0:00:09
-1×8 Array{Int64,2}:
- 2 3 4 5 6 7 8 9
-
-julia> pmapreduce(x -> (sleep(myid()); myid()), x -> hcat(x...), 1:nworkers(), showprogress=true, progressdesc="Progress : ")
-Progress : 100%|████████████████████████████████████████████████████████████████| Time: 0:00:09
-1×8 Array{Int64,2}:
- 2 3 4 5 6 7 8 9
-```
-
-Note that this does not track the progress of the individual maps; it merely tracks how many are completed. The progress of the individual maps may be tracked by explicitly passing a `RemoteChannel` to the mapping function and pushing the progress status to it from the workers.
-
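One possible sketch of this idea (not part of the package API; the channel setup and the printing task below are purely illustrative):

```julia
# Illustrative sketch: each map call reports its worker id over a RemoteChannel
julia> progress = RemoteChannel(() -> Channel{Int}(nworkers()));

julia> @async for _ in 1:nworkers()
           println("worker ", take!(progress), " finished its batch")
       end;

julia> pmapreduce(x -> (sleep(myid()); put!(progress, myid()); myid()), x -> hcat(x...), 1:nworkers());
```
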
-### Why two mapreduce functions?
-
-The two separate functions `pmapreduce` and `pmapreduce_commutative` exist for historical reasons. They use different binary tree structures for reduction. The commutative one might be removed in the future in favour of `pmapreduce`.
-
-## ProductSplit
-
-In the above examples we have talked about the tasks being distributed approximately equally among the workers without going into details about the distribution, which is what we describe here. The package provides an iterator `ProductSplit` that lists the ranges of parameters that would be passed on to each core. This may equivalently be achieved using an
-
-```Iterators.Take{Iterators.Drop{Iterators.ProductIterator}}```
-
-with appropriately chosen parameters, and in many ways a `ProductSplit` behaves similarly. However a `ProductSplit` supports several extra features such as `O(1)` indexing, which eliminates the need to actually iterate over it in many scenarios.
-
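As an illustration, the fourth chunk listed earlier may be reproduced with such a combination of iterators (a sketch; the drop and take counts of 12 and 4 are worked out by hand for this particular splitting):

```julia
# drop the 12 tuples handled by ranks 1 to 3, then take the 4 assigned to rank 4
julia> itr = Iterators.take(Iterators.drop(Iterators.product(1:3, 2:4, 3:6), 12), 4);

julia> collect(itr) # same tuples as collect(ProductSplit((1:3, 2:4, 3:6), 10, 4))
4-element Array{Tuple{Int64,Int64,Int64},1}:
 (1, 3, 4)
 (2, 3, 4)
 (3, 3, 4)
 (1, 4, 4)
```
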
-The signature of the constructor is
-
-```julia
-ProductSplit(tuple_of_ranges, number_of_processors, processor_rank)
-```
-
-where `processor_rank` takes values in `1:number_of_processors`. Note that this is different from MPI where the rank starts from 0. For example, we check the tasks that are passed on to processor number 4:
-
-```julia
-julia> ps = ProductSplit((xrange, yrange, zrange), 10, 4);
-
-julia> collect(ps)
-4-element Array{Tuple{Int64,Int64,Int64},1}:
- (1, 3, 4)
- (2, 3, 4)
- (3, 3, 4)
- (1, 4, 4)
-```
-
-where the object loops over values of `(x,y,z)`, and the values are sorted in reverse lexicographic order (the last index increases the slowest while the first index increases the fastest). The ranges roll over as expected. The tasks are evenly distributed with the remainders being split among the first few processors. In this example the first six processors receive 4 tasks each and the last four receive 3 each. We can see this by evaluating the length of the `ProductSplit` iterator on each processor
-
-```julia
-julia> Tuple(length(ProductSplit((xrange,yrange,zrange), 10, i)) for i=1:10)
-(4, 4, 4, 4, 4, 4, 3, 3, 3, 3)
-```
-
-### Indexing
-
-The iterator supports fast indexing
-```julia
-julia> ps[3]
-(3, 3, 4)
-
-julia> @btime $ps[3]
-  9.493 ns (0 allocations: 0 bytes)
-(3, 3, 4)
-```
-
-This is useful if we have a large number of parameters to analyze on each processor.
-
-```julia
-julia> xrange_long,yrange_long,zrange_long = 1:3000,1:3000,1:3000
-(1:3000, 1:3000, 1:3000)
-
-julia> params_long = (xrange_long,yrange_long,zrange_long);
-
-julia> ps_long = ProductSplit(params_long, 10, 4)
-2700000000-element ProductSplit((1:3000, 1:3000, 1:3000), 10, 4)
-[(1, 1, 901), ... , (3000, 3000, 1200)]
-
-# Evaluate length using random ranges to avoid compiler optimizations
-julia> @btime length(p) setup = (n = rand(3000:4000); p = ProductSplit((1:n,1:n,1:n), 200, 2));
-  2.674 ns (0 allocations: 0 bytes)
-
-julia> @btime $ps_long[1000000] # also fast, does not iterate
-  32.530 ns (0 allocations: 0 bytes)
-(1000, 334, 901)
-
-julia> @btime first($ps_long)
-  31.854 ns (0 allocations: 0 bytes)
-(1, 1, 901)
-
-julia> @btime last($ps_long)
-  31.603 ns (0 allocations: 0 bytes)
-(3000, 3000, 1200)
-```
+# Quick start
 
-We may evaluate whether or not a value exists in the list, and find its index, in `O(1)` time.
+Just replace `mapreduce` by `pmapreduce` in your code and things should work the same.
 
 ```julia
-julia> val = (3,3,4)
-(3, 3, 4)
-
-julia> val in ps
-true
-
-julia> localindex(ps, val)
-3
-
-julia> val = (10,2,901);
+julia> @everywhere f(x) = (sleep(1); x^2); # some expensive calculation
 
-julia> @btime $val in $ps_long
-  50.183 ns (0 allocations: 0 bytes)
-true
+julia> nworkers()
+2
 
-julia> @btime localindex($ps_long, $val)
-  104.513 ns (0 allocations: 0 bytes)
-3010
-```
-
-Another useful function is `whichproc`, which returns the rank of the processor that a specific set of parameters will be on, given the total number of processors. This is also computed using a binary search.
-
-```julia
-julia> whichproc(params_long, val, 10)
-4
+julia> @time mapreduce(f, +, 1:10) # Serial
+ 10.021436 seconds (40 allocations: 1.250 KiB)
+385
 
-julia> @btime whichproc($params_long, $val, 10);
-  353.706 ns (0 allocations: 0 bytes)
+julia> @time pmapreduce(f, +, 1:10) # Parallel
+  5.137051 seconds (863 allocations: 39.531 KiB)
+385
 ```
 
-### Extrema
-
-We may compute the ranges of each variable on any processor in `O(1)` time.
-
-```julia
-julia> extrema(ps, dim = 2) # extrema of the second parameter on this processor
-(3, 4)
-
-julia> Tuple(extrema(ps, dim = i) for i in 1:3)
-((1, 3), (3, 4), (4, 4))
-
-# Minimum and maximum work similarly
-
-julia> (minimum(ps, dim = 2), maximum(ps, dim = 2))
-(3, 4)
+# Usage
 
-julia> @btime extrema($ps_long, dim=2)
-  52.813 ns (0 allocations: 0 bytes)
-(1, 3000)
-```
+See [the documentation](https://jishnub.github.io/ParallelUtilities.jl/stable) for examples and the API.
