Skip to content

Commit 9514a3c

Browse files
committed
remove map arg to process, use map->step to convert map first. Put full describe result in process datafy as :desc and remove components
1 parent 0cb226c commit 9514a3c

File tree

3 files changed

+73
-54
lines changed

3 files changed

+73
-54
lines changed

Diff for: deps.edn

+1
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@
3333
:output-path "docs"
3434
:html {:namespace-list :flat}}}
3535
}}
36+

Diff for: src/main/clojure/clojure/core/async/flow.clj

+61-39
Original file line numberDiff line numberDiff line change
@@ -142,27 +142,24 @@
142142
[g [pid io-id :as coord] msgs] (g/inject g coord msgs))
143143

144144
(defn process
145-
"Given a function of four arities (0-3), aka the 'step-fn', or a map
146-
of functions corresponding thereto (described below), returns a
147-
launcher that creates a process compliant with the process
145+
"Given a function of four arities (0-3), aka the 'step-fn',
146+
returns a launcher that creates a process compliant with the process
148147
protocol (see the spi/ProcLauncher doc).
149148
150-
The possible arities/entries for the step-fn/map are
149+
The possible arities for the step-fn are
151150
152-
0 - :describe,
153-
1 - :init,
154-
2 - :transition
155-
3 - :transform.
151+
0 - 'describe', () -> description
152+
1 - 'init', (arg-map) -> initial-state
153+
2 - 'transition', (state transition) -> state'
154+
3 - 'transform', (state input msg) -> [state' output-map]
156155
157156
This is the core facility for defining the logic for processes via
158157
ordinary functions. Using a var holding a fn as the 'step-fn' is the
159158
preferred method for defining a proc, as it enables
160159
hot-code-reloading of the proc logic in a flow, and better names in
161-
datafy. You can use the map form to compose the proc logic from
162-
disparate functions or to leverage the optionality of some of the
163-
entry points.
160+
datafy.
164161
165-
arity 0, or :describe - required, () -> description
162+
arity 0 - 'describe', () -> description
166163
where description is a map with keys :params :ins and :outs, each of which
167164
in turn is a map of keyword to doc string, and :workload with
168165
possible values of :mixed :io :compute. All entries in the describe
@@ -182,14 +179,13 @@
182179
the proc. It will also be called by the impl in order to discover
183180
what channels are needed.
184181
185-
arity 1, or :init - optional, (arg-map) -> initial-state
182+
arity 1 - 'init', (arg-map) -> initial-state
186183
187-
init will be called once by the process to establish any initial
184+
The init arity will be called once by the process to establish any initial
188185
state. The arg-map will be a map of param->val, as supplied in the
189186
flow def. The key ::flow/pid will be added, mapped to the pid
190187
associated with the process (useful e.g. if the process wants to
191-
refer to itself in reply-to coordinates). init must be provided if
192-
'describe' returns :params.
188+
refer to itself in reply-to coordinates).
193189
194190
Optionally, a returned init state may contain the
195191
keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
@@ -200,34 +196,35 @@
200196
outside of it. Use :transition to coordinate the lifecycle of these
201197
external channels.
202198
203-
Optionally, _any_ returned state, whether from :init, :transition
204-
or :transform, may contain the key ::flow/input-filter, a predicate
199+
Optionally, _any_ returned state, whether from init, transition
200+
or transform, may contain the key ::flow/input-filter, a predicate
205201
of cid. Only inputs (including in-ports) satisfying the predicate
206202
will be part of the next channel read set. In the absence of this
207203
predicate all inputs are read.
208204
209-
arity 2, or :transition - optional, (state transition) -> state'
205+
arity 2 - 'transition', (state transition) -> state'
210206
211-
transition will be called when the process makes a state transition,
212-
transition being one of ::flow/resume, ::flow/pause or ::flow/stop
207+
The transition arity will be called when the process makes a state
208+
transition, transition being one of ::flow/resume, ::flow/pause
209+
or ::flow/stop
213210
214-
With this fn a process impl can track changes and coordinate
211+
With this a process impl can track changes and coordinate
215212
resources, especially cleaning up any resources on :stop, since the
216213
process will no longer be used following that. See the SPI for
217214
details. state' will be the state supplied to subsequent calls.
218215
219-
arity 3, or :transform - required, (state in-name msg) -> [state' output]
216+
arity 3 - 'transform', (state in-name msg) -> [state' output]
220217
where output is a map of outid->[msgs*]
221218
222-
The transform fn will be called every time a message arrives at any
219+
The transform arity will be called every time a message arrives at any
223220
of the inputs. Output can be sent to none, any or all of the :outs
224221
enumerated, and/or an input named by a [pid inid] tuple (e.g. for
225222
reply-to), and/or to the ::flow/report output. A step need not
226223
output at all (output or msgs can be empyt/nil), however an output _message_
227224
may never be nil (per core.async channels). state' will be the state
228225
supplied to subsequent calls.
229226
230-
process accepts an option map with keys:
227+
process also accepts an option map with keys:
231228
:workload - one of :mixed, :io or :compute
232229
:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
233230
will be used when getting the return from the future - see below
@@ -242,18 +239,38 @@
242239
243240
When :io is specified, transform should not do extensive computation.
244241
245-
When :compute is specified (only allowed for :transform), each call
246-
to transform will be run in a separate thread. The process loop will
247-
run in an :io context (since it no longer directly calls transform,
248-
all it does is I/O) and it will submit transform to the :compute
249-
executor then await (blocking, for compute-timeout-ms) the
250-
completion of the future returned by the executor. If the future
251-
times out it will be reported on ::flow/error.
242+
When :compute is specified, each call to transform will be run in a
243+
separate thread. The process loop will run in an :io context (since
244+
it no longer directly calls transform, all it does is I/O) and it
245+
will submit transform to the :compute executor then await (blocking,
246+
for compute-timeout-ms) the completion of the future returned by the
247+
executor. If the future times out it will be reported
248+
on ::flow/error.
252249
253250
When :compute is specified transform must not block!"
254-
([fn-or-map] (process fn-or-map nil))
255-
([fn-or-map {:keys [workload compute-timeout-ms] :as opts}]
256-
(impl/proc fn-or-map opts)))
251+
([step-fn] (process step-fn nil))
252+
([step-fn {:keys [workload compute-timeout-ms] :as opts}]
253+
(impl/proc step-fn opts)))
254+
255+
(defn map->step
256+
"given a map of functions corresponding to step fn arities (see
257+
'process'), returns a step fn suitable for passing to 'process'. You
258+
can use this map form to compose the proc logic from disparate
259+
functions or to leverage the optionality of some of the entry
260+
points.
261+
262+
The keys in the map are:
263+
:describe, arity 0 - required
264+
:init, arity 1 - optional, but should be provided if 'describe' returns :params.
265+
:transition, arity 2 - optional
266+
:transform, arity 3 - required"
267+
[{:keys [describe init transition transform]}]
268+
(assert (and describe transform) "must provide :describe and :transform")
269+
(fn
270+
([] (describe))
271+
([arg-map] (when init (init arg-map)))
272+
([state trans] (if transition (transition state trans) state))
273+
([state input msg] (transform state input msg))))
257274

258275
(defn lift*->step
259276
"given a fn f taking one arg and returning a collection of non-nil
@@ -263,15 +280,20 @@
263280
(fn
264281
([] {:ins {:in (str "the argument to " f)}
265282
:outs {:out (str "the return of " f)}})
266-
([_] nil)
267-
([_ _] nil)
268-
([_ _ msg] [nil {:out (f msg)}])))
283+
([arg-map] nil)
284+
([state transition] nil)
285+
([state input msg] [nil {:out (f msg)}])))
269286

270287
(defn lift1->step
271288
"like lift*->step except taking a fn returning one value, which when
272289
nil will yield no output."
273290
[f]
274-
(lift*->step #(when-some [m (f %)] (vector m))))
291+
(fn
292+
([] {:ins {:in (str "the argument to " f)}
293+
:outs {:out (str "the return of " f)}})
294+
([arg-map] nil)
295+
([state transition] nil)
296+
([state input msg] [nil (when-some [m (f msg)] {:out (vector m)})])))
275297

276298
(defn futurize
277299
"Takes a fn f and returns a fn that takes the same arguments as f

Diff for: src/main/clojure/clojure/core/async/flow/impl.clj

+11-15
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@
189189
(defn handle-transition
190190
"when transition, returns maybe new state"
191191
[transition status nstatus state]
192-
(if (and transition (not= status nstatus))
192+
(if (not= status nstatus)
193193
(transition state (case nstatus
194194
:exit ::flow/stop
195195
:running ::flow/resume
@@ -219,29 +219,25 @@
219219

220220
(defn proc
221221
"see lib ns for docs"
222-
[fm {:keys [workload compute-timeout-ms] :or {compute-timeout-ms 5000}}]
223-
(let [{:keys [describe init transition transform] :as impl}
224-
(if (map? fm) fm {:describe fm :init fm :transition fm :transform fm})
225-
{:keys [params ins] :as desc} (describe)
222+
[step {:keys [workload compute-timeout-ms] :or {compute-timeout-ms 5000}}]
223+
(let [{:keys [params ins] :as desc} (step)
226224
workload (or workload (:workload desc) :mixed)]
227-
(assert transform "must provide :transform")
228-
(assert (or (not params) init) "must have :init if :params")
225+
;;(assert (or (not params) init) "must have :init if :params")
229226
(reify
230227
clojure.core.protocols/Datafiable
231228
(datafy [_]
232229
(let [{:keys [params ins outs]} desc]
233-
(walk/postwalk datafy {:impl fm :params (-> params keys vec)
234-
:ins (-> ins keys vec) :outs (-> outs keys vec)})))
230+
(walk/postwalk datafy {:step step :desc desc})))
235231
spi/ProcLauncher
236232
(describe [_] desc)
237233
(start [_ {:keys [pid args ins outs resolver]}]
238234
(assert (or (not params) args) "must provide :args if :params")
239235
(let [transform (if (= workload :compute)
240-
#(.get ^Future ((futurize transform {:exec (spi/get-exec resolver :compute)}) %1 %2 %3)
236+
#(.get ^Future ((futurize step {:exec (spi/get-exec resolver :compute)}) %1 %2 %3)
241237
compute-timeout-ms TimeUnit/MILLISECONDS)
242-
transform)
238+
step)
243239
exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
244-
state (when init (init args))
240+
state (step args)
245241
ins (into (or ins {}) (::flow/in-ports state))
246242
outs (into (or outs {}) (::flow/out-ports state))
247243
io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
@@ -261,7 +257,7 @@
261257
(try
262258
(if (= status :paused)
263259
(let [nstatus (handle-command status (async/<!! control))
264-
nstate (handle-transition transition status nstatus state)]
260+
nstate (handle-transition step status nstatus state)]
265261
[nstatus nstate count read-ins])
266262
;;:running
267263
(let [ ;;TODO rotate/randomize after control per normal alts?
@@ -275,13 +271,13 @@
275271
cid (io-id c)]
276272
(if (= c control)
277273
(let [nstatus (handle-command status msg)
278-
nstate (handle-transition transition status nstatus state)]
274+
nstate (handle-transition step status nstatus state)]
279275
[nstatus nstate count read-ins])
280276
(try
281277
(let [[nstate outputs] (transform state cid msg)
282278
[nstatus nstate]
283279
(send-outputs status nstate outputs outs
284-
resolver control handle-command transition)]
280+
resolver control handle-command step)]
285281
[nstatus nstate (inc count) (if (some? msg)
286282
read-ins
287283
(dissoc read-ins cid))])

0 commit comments

Comments
 (0)