-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathaio.R
362 lines (342 loc) · 11.6 KB
/
aio.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
# Copyright (C) 2022-2024 Hibiki AI Limited <info@hibiki-ai.com>
#
# This file is part of nanonext.
#
# nanonext is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# nanonext is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# nanonext. If not, see <https://www.gnu.org/licenses/>.
# nanonext - Core - Aio Functions ----------------------------------------------
# send_aio/recv_aio ------------------------------------------------------------
#' Send Async
#'
#' Send data asynchronously over a connection (Socket, Context or Stream).
#'
#' @inheritParams send
#' @param timeout [default NULL] integer value in milliseconds or NULL, which
#' applies a socket-specific default, usually the same as no timeout.
#'
#' @return A 'sendAio' (object of class 'sendAio') (invisibly).
#'
#' @details Async send is always non-blocking and returns a 'sendAio'
#' immediately.
#'
#' For a 'sendAio', the send result is available at \code{$result}. An
#' 'unresolved' logical NA is returned if the async operation is yet to
#' complete. The resolved value will be zero on success, or else an integer
#' error code.
#'
#' To wait for and check the result of the send operation, use
#' \code{\link{call_aio}} on the returned 'sendAio' object.
#'
#' Alternatively, to stop the async operation, use \code{\link{stop_aio}}.
#'
#' @inheritSection send Send Modes
#'
#' @examples
#' pub <- socket("pub", dial = "inproc://nanonext")
#'
#' res <- send_aio(pub, data.frame(a = 1, b = 2), timeout = 100)
#' res
#' res$result
#'
#' res <- send_aio(pub, "example message", mode = "raw", timeout = 100)
#' call_aio(res)$result
#'
#' close(pub)
#'
#' @export
#'
send_aio <- function(con,
                     data,
                     mode = c("serial", "raw", "next"),
                     timeout = NULL) {
  # The native call's result is deliberately assigned back to `data`:
  #  * an assignment as the final expression makes the return value invisible;
  #  * it rebinds `data` in this frame, and this same frame is handed to the
  #    native routine via environment().
  # NOTE(review): presumably the native side relies on that binding - do not
  # rename `data` or drop the assignment without checking the C source.
  data <- .Call(rnng_send_aio, con, data, mode, timeout, environment())
}
#' Receive Async
#'
#' Receive data asynchronously over a connection (Socket, Context or Stream).
#'
#' @inheritParams recv
#' @inheritParams send_aio
#'
#' @return A 'recvAio' (object of class 'recvAio') (invisibly).
#'
#' @details Async receive is always non-blocking and returns a 'recvAio'
#' immediately.
#'
#' For a 'recvAio', the received message is available at \code{$data}. An
#' 'unresolved' logical NA is returned if the async operation is yet to
#' complete.
#'
#' To wait for the async operation to complete and retrieve the received
#' message, use \code{\link{call_aio}} on the returned 'recvAio' object.
#'
#' Alternatively, to stop the async operation, use \code{\link{stop_aio}}.
#'
#' In case of an error, an integer 'errorValue' is returned (to be
#' distinguishable from an integer message value). This can be checked using
#' \code{\link{is_error_value}}.
#'
#' If an error occurred in unserialization or conversion of the message data
#' to the specified mode, a raw vector will be returned instead to allow
#' recovery (accompanied by a warning).
#'
#' @examples
#' s1 <- socket("pair", listen = "inproc://nanonext")
#' s2 <- socket("pair", dial = "inproc://nanonext")
#'
#' res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
#' msg <- recv_aio(s2, timeout = 100)
#' msg
#' msg$data
#'
#' res <- send_aio(s1, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
#' msg <- recv_aio(s2, mode = "double", timeout = 100)
#' msg
#' msg$data
#'
#' res <- send_aio(s1, "example message", mode = "raw", timeout = 100)
#' msg <- recv_aio(s2, mode = "character", timeout = 100)
#' call_aio(msg)
#' msg$data
#'
#' close(s1)
#' close(s2)
#'
#' @export
#'
recv_aio <- function(con,
                     mode = c("serial", "character", "complex", "double",
                              "integer", "logical", "numeric", "raw", "string"),
                     timeout = NULL,
                     n = 65536L) {
  # The result must stay bound to the name `data` in this frame: the completion
  # callback below looks `data` up lexically, and the frame itself is passed to
  # the native routine via environment(). Assigning as the final expression
  # also makes the return value invisible.
  data <- .Call(
    rnng_recv_aio, con, mode, timeout, n, environment(),
    function() {
      # Run the user-registered callback, if one has been attached to the Aio
      # under the name `callback` (e.g. by as.promise()); otherwise do nothing.
      handler <- data$callback
      if (is.null(handler)) {
        return(invisible(NULL))
      }
      handler(data)
    }
  )
}
#' Receive Async and Signal a Condition
#'
#' A signalling version of the function takes a 'conditionVariable' as an
#' additional argument and signals it when the async receive is complete.
#'
#' @param cv \strong{For the signalling version}: a 'conditionVariable' to
#' signal when the async receive is complete.
#'
#' @details \strong{For the signalling version}: when the receive is complete,
#' the supplied 'conditionVariable' is signalled by incrementing its value
#' by 1. This happens asynchronously and independently of the R execution
#' thread.
#'
#' @examples
#' # Signalling a condition variable
#'
#' s1 <- socket("pair", listen = "tcp://127.0.0.1:6546")
#' cv <- cv()
#' msg <- recv_aio_signal(s1, timeout = 100, cv = cv)
#' until(cv, 10L)
#' msg$data
#' close(s1)
#'
#' # in another process in parallel
#' s2 <- socket("pair", dial = "tcp://127.0.0.1:6546")
#' res <- send_aio(s2, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
#' close(s2)
#'
#' @rdname recv_aio
#' @export
#'
recv_aio_signal <- function(con,
                            cv,
                            mode = c("serial", "character", "complex", "double",
                                     "integer", "logical", "numeric", "raw", "string"),
                            timeout = NULL,
                            n = 65536L) {
  # Same shape as recv_aio(), with the condition variable `cv` forwarded to the
  # native routine. The result must stay bound to the name `data` in this
  # frame: the completion callback below looks `data` up lexically, and the
  # frame itself is passed to the native routine via environment(). Assigning
  # as the final expression also makes the return value invisible.
  data <- .Call(
    rnng_recv_aio_signal, con, cv, mode, timeout, n, environment(),
    function() {
      # Run the user-registered callback, if one has been attached to the Aio
      # under the name `callback`; otherwise do nothing.
      handler <- data$callback
      if (is.null(handler)) {
        return(invisible(NULL))
      }
      handler(data)
    }
  )
}
#' @exportS3Method promises::is.promising
is.promising.recvAio <- function(x) {
  # S3 method for promises::is.promising(): unconditionally reports TRUE.
  # The argument exists only for dispatch and is never evaluated.
  TRUE
}
#' @exportS3Method promises::as.promise
as.promise.recvAio <- function(x) {
  # Reuse the promise stored on the Aio by an earlier call, if there is one.
  # This is not just an optimization but essential for correct behavior when
  # as.promise is called multiple times, because only one `callback` can exist
  # on the recvAio object at a time.
  cached <- x$promise
  if (!is.null(cached)) {
    return(cached)
  }

  # Promise settled from the Aio's completion callback: the function stored
  # under `callback` on the Aio reads the message and resolves or rejects.
  pending <- promises::promise(function(resolve, reject) {
    on_complete <- function(...) {
      # WARNING: x$data is heavily side-effecty!
      outcome <- x$data
      if (is_error_value(outcome)) {
        reject(simpleError(nng_error(outcome)))
      } else {
        resolve(outcome)
      }
    }
    assign("callback", on_complete, x)
  })

  # Check whether a value is already available; if so, hand back a promise
  # that is settled up front instead of the callback-driven one.
  # WARNING: x$data is heavily side-effecty!
  current <- x$data
  settled <- if (inherits(current, "unresolvedValue")) {
    pending
  } else if (is_error_value(current)) {
    promises::promise_reject(simpleError(nng_error(current)))
  } else {
    promises::promise_resolve(current)
  }

  # Save for next time (see the note at the top of this function).
  assign("promise", settled, x)
  settled
}
# Core aio functions -----------------------------------------------------------
#' Call the Value of an Asynchronous Aio Operation
#'
#' \code{call_aio} retrieves the value of an asynchronous Aio operation, waiting
#' for the operation to complete if still in progress.
#'
#' @param aio an Aio (object of class 'sendAio', 'recvAio' or 'ncurlAio').
#'
#' @return The passed object (invisibly).
#'
#' @details For a 'recvAio', the received value may be retrieved at \code{$data}.
#'
#' For a 'sendAio', the send result may be retrieved at \code{$result}. This
#' will be zero on success, or else an integer error code.
#'
#' To access the values directly, use for example on a 'recvAio' \code{x}:
#' \code{call_aio(x)$data}.
#'
#' For a 'recvAio', if an error occurred in unserialization or conversion of
#' the message data to the specified mode, a raw vector will be returned
#' instead to allow recovery (accompanied by a warning).
#'
#' Once the value has been successfully retrieved, the Aio is deallocated
#' and only the value is stored in the Aio object.
#'
#' Note this function operates silently and does not error even if 'aio' is
#' not an active Aio, always returning invisibly the passed object.
#'
#' @section Alternatively:
#'
#' Aio values may be accessed directly at \code{$result} for a 'sendAio',
#' and \code{$data} for a 'recvAio'. If the Aio operation is yet to complete,
#' an 'unresolved' logical NA will be returned. Once complete, the resolved
#' value will be returned instead.
#'
#' \code{\link{unresolved}} may also be used, which returns TRUE only if an
#' Aio or Aio value has yet to resolve and FALSE otherwise. This is suitable
#' for use in control flow statements such as \code{while} or \code{if}.
#'
#' @examples
#' s1 <- socket("pair", listen = "inproc://nanonext")
#' s2 <- socket("pair", dial = "inproc://nanonext")
#'
#' res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
#' res
#' call_aio(res)
#' res$result
#'
#' msg <- recv_aio(s2, timeout = 100)
#' msg
#' call_aio_(msg)$data
#'
#' close(s1)
#' close(s2)
#'
#' @export
#'
call_aio <- function(aio) {
  # Delegate to the native routine and hand its result back invisibly.
  called <- .Call(rnng_aio_call, aio)
  invisible(called)
}
#' Call the Value of an Asynchronous Aio Operation
#'
#' \code{call_aio_} is a variant that allows user interrupts, suitable for
#' interactive use.
#'
#' @rdname call_aio
#' @export
#'
call_aio_ <- function(aio) {
  # Variant of call_aio() that goes through a different native entry point
  # (rnng_wait_thread_create); the result is returned invisibly.
  called <- .Call(rnng_wait_thread_create, aio)
  invisible(called)
}
#' Stop Asynchronous Aio Operation
#'
#' Stop an asynchronous Aio operation.
#'
#' @inheritParams call_aio
#'
#' @return Invisible NULL.
#'
#' @details Stops the asynchronous I/O operation associated with 'aio' by
#' aborting, and then waits for it to complete or to be completely aborted.
#' The Aio is then deallocated and no further operations may be performed on
#' it.
#'
#' Note this function operates silently and does not error even if 'aio' is
#' not an active Aio, always returning invisible NULL.
#'
#' @export
#'
stop_aio <- function(aio) {
  # Delegate to the native stop routine; its result is returned invisibly.
  stopped <- .Call(rnng_aio_stop, aio)
  invisible(stopped)
}
#' Query if an Aio is Unresolved
#'
#' Query whether an Aio or Aio value remains unresolved. Unlike
#' \code{\link{call_aio}}, this function does not wait for completion.
#'
#' @param aio an Aio (object of class 'sendAio' or 'recvAio'), or Aio value
#' stored in \code{$result} or \code{$data} as the case may be.
#'
#' @return Logical TRUE if 'aio' is an unresolved Aio or Aio value, or FALSE
#' otherwise.
#'
#' @details Suitable for use in control flow statements such as \code{while} or
#' \code{if}.
#'
#' Note: querying resolution may cause a previously unresolved Aio to resolve.
#'
#' @examples
#' s1 <- socket("pair", listen = "inproc://nanonext")
#' aio <- send_aio(s1, "test", timeout = 100)
#'
#' while (unresolved(aio)) {
#' # do stuff before checking resolution again
#' cat("unresolved\n")
#' msleep(20)
#' }
#'
#' unresolved(aio)
#'
#' close(s1)
#'
#' @export
#'
unresolved <- function(aio) {
  # Thin wrapper: the native routine's result is returned visibly, so the
  # call can be used directly as an `if` / `while` condition.
  .Call(rnng_unresolved, aio)
}
#' Technical Utility: Query if an Aio is Unresolved
#'
#' Query whether an Aio remains unresolved. This is an experimental technical
#' utility version of \code{\link{unresolved}} not intended for ordinary
#' use. Provides a method of querying the busy status of an Aio without
#' altering its state in any way i.e. not attempting to retrieve the result
#' or message.
#'
#' @param aio an Aio (object of class 'sendAio' or 'recvAio').
#'
#' @return Logical TRUE if 'aio' is an unresolved Aio, or FALSE otherwise.
#'
#' @details \code{.unresolved()} is not intended to be used for 'recvAio'
#' returned by a signalling function, in which case \code{\link{unresolved}}
#' must be used in all cases.
#'
#' @export
#'
.unresolved <- function(aio) {
  # Thin wrapper over a separate native entry point (rnng_unresolved2) from
  # the one used by unresolved(); the result is returned visibly.
  .Call(rnng_unresolved2, aio)
}