Receive data asynchronously over a connection (Socket, Context or Stream).
A signalling version of the function takes a 'conditionVariable' as an additional argument and signals it when the async receive is complete.
recv_aio(
con,
mode = c("serial", "character", "complex", "double", "integer", "logical", "numeric",
"raw", "string"),
timeout = NULL,
n = 65536L
)
recv_aio_signal(
con,
cv,
mode = c("serial", "character", "complex", "double", "integer", "logical", "numeric",
"raw", "string"),
timeout = NULL,
n = 65536L
)
a Socket, Context or Stream.
[default 'serial'] mode of vector to be received - one of 'serial', 'character', 'complex', 'double', 'integer', 'logical', 'numeric', 'raw', or 'string'. The default 'serial' means a serialised R object, for the other modes, the raw vector received will be converted into the respective mode. 'string' is a faster alternative to 'character' for receiving a length 1 character string. For Streams, 'serial' is not an option and the default is 'character'. Alternatively, specify an integer position in the vector of choices e.g. 1L for 'serial', 2L for 'character' etc.
[default NULL] integer value in milliseconds or NULL, which applies a socket-specific default, usually the same as no timeout.
[default 65536L] applicable to Streams only, the maximum number of bytes to receive. Can be an over-estimate, but note that a buffer of this size is reserved.
For the signalling version: a 'conditionVariable' to signal when the async receive is complete.
A 'recvAio' (object of class 'recvAio') (invisibly).
Async receive is always non-blocking and returns a 'recvAio' immediately.
For a 'recvAio', the received message is available at $data
. An
'unresolved' logical NA is returned if the async operation is yet to
complete.
To wait for the async operation to complete and retrieve the received
message, use call_aio
on the returned 'recvAio' object.
Alternatively, to stop the async operation, use stop_aio
.
In case of an error, an integer 'errorValue' is returned (to be
distiguishable from an integer message value). This can be checked using
is_error_value
.
If an error occurred in unserialization or conversion of the message data to the specified mode, a raw vector will be returned instead to allow recovery (accompanied by a warning).
For the signalling version: when the receive is complete, the supplied 'conditionVariable' is signalled by incrementing its value by 1. This happens asynchronously and independently of the R execution thread.
s1 <- socket("pair", listen = "inproc://nanonext")
s2 <- socket("pair", dial = "inproc://nanonext")
res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
msg <- recv_aio(s2, timeout = 100)
msg
#> < recvAio >
#> - $data for message data
msg$data
#> a b
#> 1 1 2
res <- send_aio(s1, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
msg <- recv_aio(s2, mode = "double", timeout = 100)
msg
#> < recvAio >
#> - $data for message data
msg$data
#> [1] 1.1 2.2 3.3
res <- send_aio(s1, "example message", mode = "raw", timeout = 100)
msg <- recv_aio(s2, mode = "character", timeout = 100)
call_aio(msg)
msg$data
#> [1] "example message"
close(s1)
close(s2)
# Signalling a condition variable
s1 <- socket("pair", listen = "tcp://127.0.0.1:6546")
cv <- cv()
msg <- recv_aio_signal(s1, timeout = 100, cv = cv)
until(cv, 10L)
msg$data
#> 'unresolved' logi NA
close(s1)
# in another process in parallel
s2 <- socket("pair", dial = "tcp://127.0.0.1:6546")
res <- send_aio(s2, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
close(s2)