Home
Miron Kursa edited this page 2 years ago

Rethinker tutorial

First, we have to load the package and open a connection to the RethinkDB server. Here we assume that the server runs on the local host and listens on the default port (28015); if not, appropriate parameters can be passed to the openConnection() function.

library(rethinker)
cn<-openConnection()

openConnection() returns a connection handle, which is used to issue queries; thus, we save it into the variable cn.

print(cn)
## 
##  Opened connection to RethinkDB @ localhost:28015.
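If the server runs elsewhere, it may be convenient to keep its address outside of the script. The following is a minimal sketch. It assumes that openConnection() takes `host` and `port` arguments (check ?openConnection for the exact signature). The environment variables RETHINKDB_HOST and RETHINKDB_PORT and both helper functions are hypothetical and not part of rethinker.

```r
# Hypothetical helper: reads the server address from the (hypothetical)
# environment variables RETHINKDB_HOST and RETHINKDB_PORT, falling back to
# localhost:28015, the defaults seen in the output above.
rethinkParams <- function() {
  list(
    host = Sys.getenv("RETHINKDB_HOST", unset = "localhost"),
    port = as.integer(Sys.getenv("RETHINKDB_PORT", unset = "28015"))
  )
}

# Hypothetical wrapper: opens a connection with these parameters;
# assumes openConnection() accepts `host` and `port` arguments.
openConfiguredConnection <- function(params = rethinkParams()) {
  rethinker::openConnection(host = params$host, port = params$port)
}
```

With the variables unset, cn<-openConfiguredConnection() should behave like the plain openConnection() call above.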

Now, we will create a throw-away database and table, so as not to mess with any important data. To this end, we need to formulate a ReQL query; similarly to other RethinkDB drivers, rethinker builds queries from native language structures, here chains of R function calls. For now, we will discard the results.

r()$dbCreate("rethinker_test")$run(cn)->ignore

Next, we need a throw-away table; to this end, we first select our rethinker_test database with $db(), and then call $tableCreate() on it.

r()$db("rethinker_test")$tableCreate("A")$run(cn)->ignore

Time for an actual query; let's check how many documents are in A. There should be none, as the table was just created.

r()$db("rethinker_test")$table("A")$count()$run(cn)
## [1] 0

Whenever a query fails, the error is re-thrown as an R error; moreover, the connection gets terminated, so a new one has to be opened.

print(attr(try(
 r()$db("doesNotExist")$table("xxx")$count()$run(cn)
),"condition"))
## <simpleError in fetchResponseRaw(x, token): Error: Database `doesNotExist` does not exist.>
print(cn)
## 
##  Lost connection to RethinkDB @ localhost:28015.
cn<-openConnection()
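Because a failed query takes the connection down with it, scripts may want to reconnect automatically. Below is a minimal sketch using a hypothetical helper, runOrReconnect(), which is not part of rethinker. If the query fails, it reports the error as a warning and returns a fresh connection alongside the (empty) result. By default it reopens with openConnection()'s defaults. The `reopen` argument lets one pass a different opener.

```r
# Hypothetical helper (not part of rethinker): runs a query and, if it fails,
# reports the error as a warning and returns a freshly opened connection,
# since the failed one has been terminated. `reopen` is any function
# returning a new connection.
runOrReconnect <- function(query, cn, reopen = rethinker::openConnection) {
  tryCatch(
    list(result = query$run(cn), cn = cn),
    error = function(e) {
      warning("Query failed: ", conditionMessage(e), call. = FALSE)
      list(result = NULL, cn = reopen())
    }
  )
}
```

It would be used as out<-runOrReconnect(r()$db("doesNotExist")$table("xxx")$count(),cn) followed by cn<-out$cn, so that the variable always holds a live connection.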

Because $db()$table() is tedious to write, there is a shortcut: the database and table names can be given directly to r().

r("rethinker_test","A")$count()$run(cn)
## [1] 0

Time to insert a document; RethinkDB stores JSON, but rethinker allows one to use native R objects instead.

r("rethinker_test","A")$insert(
 list(
  id='a',
  string='abc',
  number=7,
  array=1:10,
  #JSON arrays may have different types
  another_array=list(1,'two',3),
  object=list(three=3,array=letters[1:3],sub=list(i_am="a nested object"))
 )
)$run(cn)->ans

The conversion follows rjson conventions (rjson is actually used for this purpose), i.e. unnamed lists and vectors turn into arrays, named lists into objects, and 1-element vectors into scalars.
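To preview what the server will receive, one can call rjson's toJSON() directly. The short sketch below assumes rjson is installed, which it should be, since rethinker depends on it. It also shows the one catch of these conventions: a 1-element vector meant to be an array collapses into a scalar, so it has to be wrapped in an unnamed list.

```r
library(rjson)

# Named list -> JSON object; 1-element vectors -> scalars
toJSON(list(id = "a", number = 7))
# Vector longer than 1 -> JSON array
toJSON(list(array = 1:3))
# Unnamed list -> JSON array, elements may mix types
toJSON(list(1, "two", 3))
# A 1-element vector collapses into a scalar...
toJSON(list(array = 5))
# ...so wrap it in an unnamed list to keep it an array
toJSON(list(array = list(5)))
```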

As the answer to the insert query, we get a standard RethinkDB object describing the changes the query caused:

ans
## $deleted
## [1] 0
## 
## $errors
## [1] 0
## 
## $inserted
## [1] 1
## 
## $replaced
## [1] 0
## 
## $skipped
## [1] 0
## 
## $unchanged
## [1] 0

Here we see that one new document was inserted. Let's retrieve it with $get().

r("rethinker_test","A")$get("a")$run(cn)->ans
str(ans)
## List of 6
##  $ another_array:List of 3
##   ..$ : num 1
##   ..$ : chr "two"
##   ..$ : num 3
##  $ array        : num [1:10] 1 2 3 4 5 6 7 8 9 10
##  $ id           : chr "a"
##  $ number       : num 7
##  $ object       :List of 3
##   ..$ array: chr [1:3] "a" "b" "c"
##   ..$ sub  :List of 1
##   .. ..$ i_am: chr "a nested object"
##   ..$ three: num 3
##  $ string       : chr "abc"

Note that the object keys have been sorted; unlike R lists, JSON objects do not preserve the order of their elements, and neither does RethinkDB. It is also worth noting that $get() only works with the primary key, by default the id field, which has to be unique and is randomly generated by the server when not specified.
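Because of that, comparing a document built in R with the one read back requires ignoring key order. Here is a minimal sketch with a hypothetical helper, sortKeys(), which is not part of rethinker. It recursively orders the elements of named lists by key. It uses C-locale ordering (method = "radix"), which we assume matches the order shown in the output above.

```r
# Hypothetical helper: recursively sorts named list elements by name,
# leaving unnamed lists (JSON arrays) and atomic vectors in their order.
sortKeys <- function(x) {
  if (!is.list(x)) return(x)
  x <- lapply(x, sortKeys)
  if (!is.null(names(x))) x <- x[order(names(x), method = "radix")]
  x
}
```

Let doc stand for the list passed to $insert() earlier. Then all.equal(sortKeys(doc), ans) should report no differences. all.equal() is used rather than identical() because, e.g., the integer vector 1:10 comes back as doubles.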

We can store several objects at once (note that unlist() is only used to prettify the query output)...

unlist(r("rethinker_test","A")$insert(
 list(
  list(id="b",number=17),
  list(id="c",number=-3)
 )
)$run(cn))
##   deleted    errors  inserted  replaced   skipped unchanged 
##         0         0         2         0         0         0

... or pass JSON strings, which may be faster; to this end, we use the $json() function.

unlist(r("rethinker_test","A")$insert(
 r()$json('{"id":"d","number":49}')
)$run(cn))
##   deleted    errors  inserted  replaced   skipped unchanged 
##         0         0         1         0         0         0

Note that every ReQL term must be chained to a ReQL root; thus we had to create one with r() inside $insert().
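Both bulk-insert styles above are easy to feed from common data sources. Below is a minimal sketch with two hypothetical helpers, neither part of rethinker. rowsToDocs() turns a data frame into the list of named lists accepted by $insert(). ndjsonToArray() joins newline-delimited JSON documents into one JSON array string for r()$json().

```r
# Hypothetical helper: turns a data frame into a list of per-row named lists,
# the "list of objects" shape passed to $insert() above. Factor columns
# should be converted to character beforehand.
rowsToDocs <- function(df) {
  lapply(seq_len(nrow(df)), function(i) lapply(df, function(col) col[[i]]))
}

# Hypothetical helper: joins newline-delimited JSON documents (one per line)
# into a single JSON array string, suitable for r()$json().
ndjsonToArray <- function(lines) {
  lines <- lines[nzchar(trimws(lines))]  # drop blank lines
  paste0("[", paste(lines, collapse = ","), "]")
}
```

They would be used as r("rethinker_test","A")$insert(rowsToDocs(df))$run(cn) or r("rethinker_test","A")$insert(r()$json(ndjsonToArray(readLines("docs.ndjson"))))$run(cn). Here df and the file docs.ndjson are placeholders.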

Let's now update one of the documents.

unlist(r("rethinker_test","A")$get("c")$update(
 list(
  number=21
 )
)$run(cn))
##   deleted    errors  inserted  replaced   skipped unchanged 
##         0         0         0         1         0         0

Note that this time the summary reports a replaced document rather than an inserted one.

To drill down into objects, the JavaScript driver uses () and the Python driver uses []; in rethinker, one has to use the $bracket() function.

r("rethinker_test","A")$get("c")$bracket("number")$run(cn)
## [1] 21
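Deeper paths simply take more $bracket() calls. For example, r("rethinker_test","A")$get("a")$bracket("object")$bracket("sub")$bracket("i_am")$run(cn) should return "a nested object". A hypothetical convenience wrapper, pluck() (not part of rethinker), can chain such calls from a list of keys. It only builds the query; running it is still up to $run().

```r
# Hypothetical helper: chains one $bracket() call per key, so
# pluck(q, "object", "sub", "i_am") equals
# q$bracket("object")$bracket("sub")$bracket("i_am").
# Keys are kept in a list so that numeric array indices keep their type.
pluck <- function(query, ...) {
  for (key in list(...)) query <- query$bracket(key)
  query
}
```

The lookup above would then read pluck(r("rethinker_test","A")$get("a"), "object", "sub", "i_am")$run(cn).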

One can also pass R functions to be run remotely on the documents stored in the database; they may only contain ReQL terms, though, because they are executed by the server rather than by R. Let's try filtering.

r("rethinker_test","A")$filter(
 function(x) r()$and(
  x$bracket('number')$lt(30),
  x$bracket('number')$gt(0)
 )
)$run(cn)->ans
print(ans)
## 
##  Active RethinkDB cursor;
##  3 response(s) cached, no more on the server.

Because the query returned a sequence of documents, we got a cursor object, which represents a (potentially very long, even infinite) sequence. When one knows the sequence is reasonably short, the cursorToList() function can be used to download it whole and return it as a list.

str(cursorToList(ans))
## List of 3
##  $ :List of 2
##   ..$ id    : chr "b"
##   ..$ number: num 17
##  $ :List of 6
##   ..$ another_array:List of 3
##   .. ..$ : num 1
##   .. ..$ : chr "two"
##   .. ..$ : num 3
##   ..$ array        : num [1:10] 1 2 3 4 5 6 7 8 9 10
##   ..$ id           : chr "a"
##   ..$ number       : num 7
##   ..$ object       :List of 3
##   .. ..$ array: chr [1:3] "a" "b" "c"
##   .. ..$ sub  :List of 1
##   .. .. ..$ i_am: chr "a nested object"
##   .. ..$ three: num 3
##   ..$ string       : chr "abc"
##  $ :List of 2
##   ..$ id    : chr "c"
##   ..$ number: num 21

Alternatively, one can use the cursorNext() function to pull data from a cursor iteratively, and isCursorEmpty() to check whether there is any more data.
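Combining the two gives a streaming loop that never holds the whole sequence in memory. The following is a minimal sketch with a hypothetical helper, forEachDocument(), which is not part of rethinker. It assumes that cursorNext(cursor) returns a single document per call, and it releases the cursor if stopped early.

```r
# Hypothetical helper (not part of rethinker): applies f() to the documents
# of a cursor one at a time, assuming cursorNext() returns a single document
# per call. Stops after maxDocs documents and releases a non-empty cursor.
forEachDocument <- function(cursor, f, maxDocs = Inf) {
  n <- 0
  while (n < maxDocs && !isCursorEmpty(cursor)) {
    f(cursorNext(cursor))
    n <- n + 1
  }
  # Only a cursor that still holds data needs closing
  if (!isCursorEmpty(cursor)) close(cursor)
  invisible(n)
}
```

For instance, forEachDocument(r("rethinker_test","A")$run(cn), function(d) print(d$id)) should print the id of every document in A.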

When a cursor still holds some data but is no longer needed, it should be released with the close() function.

close(ans)
## 
##  Empty RethinkDB cursor.

This way, the server can release the resources associated with it. On the other hand, closing the connection also releases all cursors associated with it, and empty cursors do not need closing at all.

Some ReQL commands accept named optional parameters; these are passed as named R function arguments. Here we use $insert() with conflict="update", to make it update the already existing document c rather than throw an error, and with return_changes=TRUE, to enrich the query result with the actual changes, i.e. the old and new values of the affected document.

r("rethinker_test","A")$insert(
 list(
  id="c",
  number=49
 ),
 conflict="update",
 return_changes=TRUE #Not returnChanges
)$run(cn)
## $changes
## $changes[[1]]
## $changes[[1]]$new_val
## $changes[[1]]$new_val$id
## [1] "c"
## 
## $changes[[1]]$new_val$number
## [1] 49
## 
## 
## $changes[[1]]$old_val
## $changes[[1]]$old_val$id
## [1] "c"
## 
## $changes[[1]]$old_val$number
## [1] 21
## 
## 
## 
## 
## $deleted
## [1] 0
## 
## $errors
## [1] 0
## 
## $inserted
## [1] 0
## 
## $replaced
## [1] 1
## 
## $skipped
## [1] 0
## 
## $unchanged
## [1] 0

The catch here are multi-word parameter names; they have to be given in snake_case (like in the Python driver) rather than in camelCase (like in the JavaScript driver, and like term names in rethinker).
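When porting queries from JavaScript examples, the option names can be translated mechanically. Here is a minimal sketch with a hypothetical helper, toSnakeCase(), which is not part of rethinker.

```r
# Hypothetical helper: turns camelCase option names (JavaScript driver style)
# into the snake_case names rethinker expects, e.g. returnChanges -> return_changes.
toSnakeCase <- function(x) tolower(gsub("([a-z0-9])([A-Z])", "\\1_\\2", x))

# Options as they would appear in a JavaScript example
opts <- list(conflict = "update", returnChanges = TRUE)
names(opts) <- toSnakeCase(names(opts))
str(opts)
```

The renamed list could then be spliced into a query with do.call(), e.g. do.call(r("rethinker_test","A")$insert, c(list(doc), opts)), where doc is a placeholder for the document. This assumes that $insert here is an ordinary R function, which we have not verified for every term.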

Apart from cursors, one can use asynchronous queries; the idea is that query results are passed to a callback function, similarly to how lapply() works. This is especially important because it allows one to use RethinkDB's change feeds. Still, R has no event loop; thus rethinker exports the drainConnection() function, which blocks the R session and processes the results of asynchronous queries, even from many concurrent queries and from queries started within callbacks. Obviously, no asynchronous callback is actually executed before drainConnection() is called. The callback is expected to return a logical value; when it returns FALSE, the asynchronous query it handles is terminated. This is also how drainConnection() knows when to return: namely, once there are no more active queries on the given connection.

{
r("rethinker_test","A")$runAsync(cn,
 function(x){
  message("Some element in rethinker_test/A:")
  print(x)
  #This is for demonstration so we won't wait for more
  message("Terminating this query.")
  return(FALSE)
 }
)
message("Query was executed but nothing yet happened.")
message("Now, drainConnection is called.")
drainConnection(cn)
message("Drain connection exits.")
}
## Query was executed but nothing yet happened.
## Now, drainConnection is called.
## Some element in rethinker_test/A:
## $id
## [1] "d"
## 
## $number
## [1] 49
## Terminating this query.
## Drain connection exits.
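A change feed never ends on its own, so its callback needs a stopping rule. Otherwise drainConnection() would block forever. Below is a minimal sketch of a callback factory, stopAfter(), a hypothetical helper that is not part of rethinker. It builds a callback that processes `n` values and then returns FALSE to terminate its query.

```r
# Hypothetical helper (not part of rethinker): builds a runAsync() callback
# that hands every incoming value to `handle` and returns FALSE, i.e. asks
# rethinker to terminate the query, once `n` values have been processed.
stopAfter <- function(n, handle = print) {
  seen <- 0
  function(x) {
    handle(x)
    seen <<- seen + 1
    seen < n
  }
}
```

For example, r("rethinker_test","A")$changes()$runAsync(cn,stopAfter(3)) followed by drainConnection(cn) should block until three changes to A have been printed. Those changes could be made, say, from another R session.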

This concludes this brief tutorial; for more information, it is best to consult the RethinkDB documentation and the rethinker manual. Bug reports, feature requests and pull requests are welcome on GitHub.

Finally, we have to clean up by dropping the test database.

r()$dbDrop("rethinker_test")$run(cn)