Receive data asynchronously over a connection (Socket, Context or Stream).

A signalling version of the function takes a 'conditionVariable' as an additional argument and signals it when the async receive is complete.

recv_aio(
  con,
  mode = c("serial", "character", "complex", "double", "integer", "logical", "numeric",
    "raw"),
  timeout = NULL,
  keep.raw = FALSE,
  n = 65536L
)

recv_aio_signal(
  con,
  mode = c("serial", "character", "complex", "double", "integer", "logical", "numeric",
    "raw"),
  timeout = NULL,
  keep.raw = FALSE,
  n = 65536L,
  cv
)

Arguments

con

a Socket, Context or Stream.

mode

[default 'serial'] mode of vector to be received - one of 'serial', 'character', 'complex', 'double', 'integer', 'logical', 'numeric', or 'raw'. The default 'serial' means a serialised R object; for the other modes, the raw vector received is converted into the respective mode. For Streams, 'serial' is not an option and the default is 'character'. Alternatively, for performance, specify an integer position in the vector of choices, e.g. 1L for 'serial', 2L for 'character' etc.
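As a minimal sketch of the integer shortcut described above, the following receives a character string by passing 2L instead of 'character'. The inproc URL and the variable names are chosen for illustration only.

```r
library(nanonext)

# Illustrative in-process pair of sockets (URL is an assumption)
s1 <- socket("pair", listen = "inproc://mode-demo")
s2 <- socket("pair", dial = "inproc://mode-demo")

# Send the string as raw bytes rather than as a serialised R object
res <- send_aio(s1, "hello", mode = "raw", timeout = 100)

# 2L is the position of 'character' in the vector of choices
msg <- recv_aio(s2, mode = 2L, timeout = 100)
call_aio(msg)            # wait for the receive to complete
received <- msg$data

close(s1)
close(s2)
```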

timeout

[default NULL] integer value in milliseconds or NULL, which applies a socket-specific default, usually the same as no timeout.

keep.raw

[default FALSE] logical flag whether to keep and return the received raw vector along with the converted data. Supplying a non-logical value will error.

n

[default 65536L] applicable to Streams only, the maximum number of bytes to receive. Can be an over-estimate, but note that a buffer of this size is reserved.

cv

For the signalling version: a 'conditionVariable' that should be signalled when the async receive is complete.

Value

An object of class 'recvAio' (invisibly).

Details

Async receive is always non-blocking and returns a 'recvAio' immediately.

For a 'recvAio', the received message is available at $data, and the raw message at $raw (if kept). An 'unresolved' logical NA is returned if the async operation is yet to complete.

To wait for the async operation to complete and retrieve the received message, use call_aio on the returned 'recvAio' object.

Alternatively, to stop the async operation, use stop_aio.
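The workflow described above might look like the following sketch: check whether the receive is still pending with unresolved(), block on it with call_aio, and cancel a receive that is no longer needed with stop_aio. The inproc URL, timeouts and variable names are assumptions made for the example.

```r
library(nanonext)

s1 <- socket("pair", listen = "inproc://wait-demo")
s2 <- socket("pair", dial = "inproc://wait-demo")

# recv_aio() returns immediately; nothing has been sent yet,
# so the receive is still pending at this point
msg <- recv_aio(s2, timeout = 500)
pending <- unresolved(msg)

# Once a message is sent, call_aio() blocks until the receive resolves
res <- send_aio(s1, "ready", timeout = 500)
call_aio(msg)
received <- msg$data

# A receive that is no longer wanted can be cancelled instead of awaited
aio <- recv_aio(s2, timeout = 5000)
stop_aio(aio)

close(s1)
close(s2)
```

Polling with unresolved() suits a loop that has other work to do, whereas call_aio is simpler when the result is needed before continuing.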

In case of an error, an integer 'errorValue' is returned (to be distinguishable from an integer message value). This can be verified using is_error_value.
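A short sketch of checking for an 'errorValue': here the receive is expected to fail because nothing is ever sent before the timeout elapses. The URL and the 50 ms timeout are arbitrary choices for illustration.

```r
library(nanonext)

# A listening socket with no peer, so no message can arrive
s <- socket("pair", listen = "inproc://error-demo")

msg <- recv_aio(s, timeout = 50)
call_aio(msg)            # returns once the timeout has elapsed

# Test the result before treating $data as a message
failed <- is_error_value(msg$data)
if (failed) {
  print(msg$data)        # shows the error value rather than message data
}

close(s)
```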

If the raw message was successfully received but an error occurred in unserialisation or data conversion (for example if the incorrect mode was specified), the received raw vector will be stored at $data to allow for the data to be recovered.
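The recovery path described above can be sketched as follows, assuming doubles are sent as raw bytes but the receiver mistakenly leaves the mode at the default 'serial'. Unserialisation is then expected to fail (possibly with a warning), leaving the raw bytes at $data, from which the values can be rebuilt with base R's readBin. The URL and variable names are illustrative.

```r
library(nanonext)

s1 <- socket("pair", listen = "inproc://recover-demo")
s2 <- socket("pair", dial = "inproc://recover-demo")

# Sender transmits the raw bytes of a double vector
res <- send_aio(s1, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)

# Receiver uses the wrong mode: these bytes are not a serialised R object
msg <- recv_aio(s2, mode = "serial", timeout = 100)
call_aio(msg)

# If conversion failed, $data should hold the received raw vector
recovered <- NULL
if (is.raw(msg$data)) {
  # Each double occupies 8 bytes
  recovered <- readBin(msg$data, what = "double", n = length(msg$data) / 8)
}

close(s1)
close(s2)
```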

For the signalling version: when the receive is complete, the supplied 'conditionVariable' is signalled by incrementing its value by 1. This happens asynchronously and independently of the R execution thread.
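A minimal single-process sketch of the signalling version: the receive signals the condition variable on completion, and until() waits on it with an upper bound so the example cannot hang. The name cond, the URL and the timeouts are assumptions for illustration.

```r
library(nanonext)

s1 <- socket("pair", listen = "inproc://signal-demo")
s2 <- socket("pair", dial = "inproc://signal-demo")

cond <- cv()             # condition variable, named to avoid masking cv()
msg <- recv_aio_signal(s2, timeout = 500, cv = cond)

res <- send_aio(s1, "signalled", timeout = 500)

# Wait for the signal, giving up after at most 1000 ms
until(cond, 1000L)

call_aio(msg)            # returns promptly if the receive has completed
received <- msg$data

close(s1)
close(s2)
```

Because the condition variable is also signalled when the receive completes with an error such as a timeout, it is worth checking the result with is_error_value before using it.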

Examples

s1 <- socket("pair", listen = "inproc://nanonext")
s2 <- socket("pair", dial = "inproc://nanonext")

res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
msg <- recv_aio(s2, timeout = 100)
msg
#> < recvAio >
#>  - $data for message data
msg$data
#>   a b
#> 1 1 2

res <- send_aio(s1, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
msg <- recv_aio(s2, mode = "double", timeout = 100, keep.raw = TRUE)
msg
#> < recvAio >
#>  - $raw for raw message
#>  - $data for message data
msg$raw
#>  [1] 9a 99 99 99 99 99 f1 3f 9a 99 99 99 99 99 01 40 66 66 66 66 66 66 0a 40
msg$data
#> [1] 1.1 2.2 3.3

res <- send_aio(s1, "example message", mode = "raw", timeout = 100)
msg <- recv_aio(s2, mode = "character", timeout = 100, keep.raw = TRUE)
call_aio(msg)
msg$raw
#>  [1] 65 78 61 6d 70 6c 65 20 6d 65 73 73 61 67 65 00
msg$data
#> [1] "example message"

close(s1)
close(s2)

# Signalling a condition variable

s1 <- socket("pair", listen = "tcp://127.0.0.1:6546")
cv <- cv()
msg <- recv_aio_signal(s1, timeout = 100, cv = cv)
until(cv, 10L)
msg$data
#> 'unresolved' logi NA
close(s1)

# in another process, run in parallel while s1 above is still listening
s2 <- socket("pair", dial = "tcp://127.0.0.1:6546")
res <- send_aio(s2, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
close(s2)