group-by processing with Rcpp and data.table¶
GForce optimization¶
data.table is a fantastic R package because of its speed, its compact syntax and lots of other things: fread, rolling joins, keys… One reason for its speed is the internal GForce optimization, which avoids calling a function for each group. By default, GForce (and the other optimizations) are enabled. In this example, the sum function is silently transformed into gsum, an internal data.table GForce version which avoids calling sum for each group.
> system.time(dt[, sum(a), by=id, verbose=T])
Detected that j uses these columns: a
Finding groups using forderv ... 0.234s elapsed (1.988s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(a)'
GForce optimized j to 'gsum(a)'
Making each group and running j (GForce TRUE) ... gforce initial population of grp took 0.021
gforce assign high and low took 0.037
This gsum took (narm=FALSE) ... gather took ... 0.024s
0.042s
gforce eval took 0.042
0.127s elapsed (1.200s cpu)
user system elapsed
3.200 0.676 0.374
If optimizations are turned off (datatable.optimize = 0), sum is called 1800000 times because dt has 1800000 groups. In this example, GForce is 15-20x faster than the base R sum function.
> options(datatable.optimize = 0)
> system.time(dt[, sum(a), by=id, verbose=T])
Detected that j uses these columns: a
Finding groups using forderv ... 0.233s elapsed (1.904s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.008s cpu)
All optimizations are turned off
Making each group and running j (GForce FALSE) ...
memcpy contiguous groups took 1.070s for 1800000 groups
eval(j) took 1.817s for 1800000 calls
4.869s elapsed (2.924s cpu)
user system elapsed
4.840 2.472 5.114
This example shows how important it is to avoid calling a function for each by group.
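The single-pass idea behind gsum can be illustrated outside R. Here is a minimal plain-C++ sketch (function and variable names are my own, not data.table's): one loop over rows sorted by id accumulates every group's sum without any per-group function call.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// One pass over (id, value) rows sorted by id: a new accumulator is
// opened each time the id changes, so no function is called per group.
// This is the idea behind data.table's gsum, not its actual code.
std::vector<std::pair<int, double>> grouped_sum(const std::vector<int>& id,
                                                const std::vector<double>& v) {
    std::vector<std::pair<int, double>> out;
    for (std::size_t i = 0; i < id.size(); ++i) {
        if (out.empty() || out.back().first != id[i])
            out.push_back({id[i], 0.0});  // a new group starts at row i
        out.back().second += v[i];
    }
    return out;
}
```

The cost model is the same as in the benchmark above: one pass, no per-group dispatch.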
na_fill function¶
I often work with date and state data. For a variable V1 and 2 ids, I have this kind of table:
id date V1
id1 2001-01-01 A
id1 2003-01-01 B
id1 2008-01-01 A
id1 2010-01-01 Z
id2 2004-01-01 A
id2 2011-01-01 Z
Sometimes there are missing values.
id date V1
id1 2001-01-01 A
id1 2003-01-01 B
id1 2004-01-01 .
id1 2005-01-01 .
id1 2008-01-01 A
id1 2010-01-01 Z
id2 2002-01-01 .
id2 2004-01-01 A
id2 2009-01-01 .
id2 2011-01-01 Z
In this case, missing values can be replaced by the last non-missing value. This can be done with a nafill-type function using the last observation carried forward (LOCF) option, like this:
id date V1
id1 2001-01-01 A
id1 2003-01-01 B
id1 2004-01-01 B
id1 2005-01-01 B
id1 2008-01-01 A
id1 2010-01-01 Z
id2 2002-01-01 .
id2 2004-01-01 A
id2 2009-01-01 A
id2 2011-01-01 Z
Three or four years ago, I couldn't find an R package with such a function. There is a na.fill function in the zoo package, but it focuses on numeric variables and performs interpolation.
This kind of function is not easy to write efficiently in R because each loop iteration depends on the previous one, but it's easy in C or C++. And there is a very impressive package which makes it possible to write C++ functions for R: Rcpp.
Writing a na_fill function with Rcpp is not difficult. Here is a version for LOCF:
#include <Rcpp.h>
using namespace Rcpp;
// [[Rcpp::export]]
StringVector Cna_fill_char(StringVector xx) {
StringVector x = clone(xx);
R_xlen_t n = x.size();
for(R_xlen_t i = 1; i<n; i++) {
if(StringVector::is_na(x[i]) && !StringVector::is_na(x[i-1])) {
x[i] = x[i-1];
}
}
return x;
}
The clone call is mandatory to avoid modifying xx in place. On big tables, however, it can be a good idea to skip this copy, so an inplace parameter is added with a line like:
StringVector x = inplace ? xx : clone(xx);
Another improvement is to implement NOCB (next observation carried backward). The final implementation adds a type parameter with 3 values:
1 : LOCF
2 : NOCB
3 : LOCF then NOCB
Source:
// [[Rcpp::export]]
StringVector Cna_fill_char(StringVector xx, unsigned int type = 1, bool inplace = false) {
StringVector x = inplace ? xx : clone(xx);
R_xlen_t n = x.size();
if (type & 0b01) {
for(R_xlen_t i = 1; i<n; i++) {
if(StringVector::is_na(x[i]) && !StringVector::is_na(x[i-1])) {
x[i] = x[i-1];
}
}
}
if (type & 0b10) {
for(R_xlen_t i = n - 1; i > 0; i--) {
if(StringVector::is_na(x[i-1]) && !StringVector::is_na(x[i])) {
x[i-1] = x[i];
}
}
}
return x;
}
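The bitmask logic can be exercised in plain C++ as well, which makes it easy to test outside R. In this sketch (names are illustrative), std::nullopt stands in for R's NA:

```cpp
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

// Plain-C++ mirror of Cna_fill_char. type is the same bitmask:
// 1 = LOCF, 2 = NOCB, 3 = LOCF then NOCB.
using OptStr = std::optional<std::string>;

std::vector<OptStr> na_fill_plain(std::vector<OptStr> x, unsigned int type = 1) {
    std::size_t n = x.size();
    if (n < 2) return x;           // nothing to carry
    if (type & 0b01)               // last observation carried forward
        for (std::size_t i = 1; i < n; ++i)
            if (!x[i] && x[i - 1]) x[i] = x[i - 1];
    if (type & 0b10)               // next observation carried backward
        for (std::size_t i = n - 1; i > 0; --i)
            if (!x[i - 1] && x[i]) x[i - 1] = x[i];
    return x;
}
```

With type = 3, a leading NA is filled by the NOCB pass after LOCF has run, matching the "LOCF then NOCB" semantics above.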
C++ (and C) is a typed language, so this version has to be duplicated for the other types, NumericVector and IntegerVector. Dispatch is then done with an R function like this:
na_fill <- function(x, type = 1, inplace = F) {
if (is.object(x)) {
stop("x is not a primitive type", call. = FALSE)
}
if (is.integer(x))
Cna_fill_int(x, type, inplace)
else if (is.double(x))
Cna_fill_num(x, type, inplace)
else if (is.character(x))
Cna_fill_char(x, type, inplace)
else
stop("Unimplemented type : ", typeof(x))
}
Real data: ~2 million ids and ~50 rows per id¶
Typical data tables I use are ~100M rows, with ~2 million ids and ~50 rows per id.
For example, let's create a data.table like this:
ngrp <- 1800000
nbygrp <- 50
dt <- data.table(
id=rep(1:ngrp, each=nbygrp),
a=sample(1:100, nbygrp * ngrp, replace=T),
b=sample(LETTERS, nbygrp * ngrp, replace=T),
c=sample((1:100 + 0.5), nbygrp * ngrp, replace=T),
d=sample(1:100, nbygrp * ngrp, replace=T),
e=sample(1:100, nbygrp * ngrp, replace=T)
)
dt[(1:.N %% 2 == 0), a:=NA]
dt[(1:.N %% 3 == 0), b:=NA]
dt[(1:.N %% 4 == 0), c:=NA]
dt[(1:.N %% 5 == 0), d:=NA]
dt[(1:.N %% 6 == 0), e:=NA]
Now we call na_fill
by id
for columns a
to e
:
> vars = c("a", "b", "c", "d", "e")
> outvars <- paste0("z", vars)
> system.time(dt[, (outvars) := lapply(.SD, na_fill), by="id", .SDcols = vars, verbose=T])
Finding groups using forderv ... 0.249s elapsed (2.120s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.013s elapsed (0.016s cpu)
lapply optimization changed j from 'lapply(.SD, na_fill)' to 'list(na_fill(a), na_fill(b), na_fill(c), na_fill(d), na_fill(e))'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
memcpy contiguous groups took 1.621s for 1800000 groups
eval(j) took 55.646s for 1800000 calls
00:01:02 elapsed (59.5s cpu)
user system elapsed
61.676 3.252 62.511
Not bad, but not very fast: with the same data and variable a, gsum takes 0.3 seconds. Why? The message eval(j) took 55.646s for 1800000 calls explains it: once again, a function call is a costly operation in R. What would be great is to use GForce with the Rcpp na_fill function, but I can't find a way. In my understanding, GForce is deeply internal to data.table, and GForce functions like gsum, gmean… are C-only functions, invisible from R.
Another solution is to provide the group information to the Rcpp na_fill function. That is not an easy task and it can be very slow, but data.table offers a way. In the verbose output, note this line:
Finding groups using forderv ... 0.249s elapsed (2.120s cpu)
What is this forderv function which finds the groups? It's not exported, but it is accessible with :::.
Let's have a look:
> data.table:::forderv(dt, by="id", retGrp = T)
integer(0)
attr(,"starts")
[1] 1 51 101 151 201 251 301 351 401 451 501 551 601
I will discuss integer(0) below. But the starts attribute is very interesting: it's an integer vector of the starting indices of each group. The first id (1) starts at row 1, the second id (2) starts at row 51, and so on.
But rows of which table? Here, returning integer(0) means that the table was already sorted, so the starts values are rows of the original data.table.
But what happens if the table is not sorted on the by variable? Let's have a look:
> data.table:::forderv(dt, by="a", retGrp = T)
[1] 2 4 6 8 10 12 14 16 18 20 22
attr(,"starts")
[1] 1 45000001 45449955 45898729 46349428 46798821
Now forderv returns an ordering vector, and starts gives positions within this ordering. The first group begins at position 1 and ends at position 45000001 - 1 = 45000000, and its first data.table rows are 2, 4, 6…
This group consists of all the rows with an NA value for variable a: every even row in the data.table is NA, and that's why the first indices of the forderv return vector are 2, 4, 6…
Of course, forderv is not easy to understand, and that's why it's not exported. But it was exactly what I needed to improve my na_fill function.
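The starts/ordering convention can be mimicked outside R. This sketch (names illustrative) takes forderv-style 1-based starts and an optional 1-based ordering vector (empty when the table was already sorted, like the integer(0) case above) and recovers each group's 0-based row indices:

```cpp
#include <cstddef>
#include <vector>

// Decode forderv-style output: `starts` holds the 1-based first position
// of each group within the ordering; `rows`, when non-empty, maps those
// positions back to the original (unsorted) table, also 1-based.
std::vector<std::vector<std::size_t>> decode_groups(
        const std::vector<int>& starts,
        const std::vector<int>& rows,
        std::size_t n) {
    std::vector<std::vector<std::size_t>> groups;
    for (std::size_t g = 0; g < starts.size(); ++g) {
        std::size_t first = starts[g] - 1;
        std::size_t last = (g + 1 < starts.size())
                               ? (std::size_t)(starts[g + 1] - 1)
                               : n;  // last group runs to the end
        std::vector<std::size_t> grp;
        for (std::size_t i = first; i < last; ++i)
            grp.push_back(rows.empty() ? i : (std::size_t)(rows[i] - 1));
        groups.push_back(grp);
    }
    return groups;
}
```

This is exactly the index arithmetic the Rcpp function below has to perform while filling.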
The idea is to pass the forderv return value, which contains all the useful information about the by groups, to the Rcpp function. In the source below, it's the rows parameter; the other parameters are the same as in the first na_fill function.
The tricky part is juggling between R indices, which are 1-based, and C++ indices, which are 0-based, and correctly using the rows and starts integer vectors to retrieve the data.table rows.
// [[Rcpp::export]]
IntegerVector Cna_fill_int_grp(IntegerVector x, IntegerVector rows, unsigned int opt = 1, bool inplace = false) {
R_xlen_t n = x.size();
R_xlen_t nrows = rows.size();
if (nrows != 0 && n != rows.size())
stop("x and rows must have the same length");
if (!rows.hasAttribute("starts"))
stop("rows must have 'starts' attribute");
IntegerVector grps = rows.attr("starts");
R_xlen_t ngrps = grps.size();
IntegerVector ret = inplace ? x : clone(x);
for(int g=0; g<ngrps; g++) {
// for each group, loop goes from f to <l
R_xlen_t f = grps[g] - 1; // start index of group g (C index = R index - 1)
R_xlen_t l = g == (ngrps - 1) ? n : grps[g + 1] - 1; // last index (n if last group)
if (opt & 0b01) {
for(R_xlen_t i = f + 1; i < l; i++) {
R_xlen_t r = nrows == 0 ? i : rows[i] - 1;
R_xlen_t r1 = nrows == 0 ? i - 1 : rows[i - 1] - 1;
if(IntegerVector::is_na(ret[r]) && !IntegerVector::is_na(ret[r1])) {
ret[r] = ret[r1];
}
}
}
if (opt & 0b10) {
for(R_xlen_t i = l - 1; i > f; i--) {
R_xlen_t r = nrows == 0 ? i : rows[i] - 1;
R_xlen_t r1 = nrows == 0 ? i - 1 : rows[i - 1] - 1;
if(IntegerVector::is_na(ret[r1]) && !IntegerVector::is_na(ret[r])) {
ret[r1] = ret[r];
}
}
}
}
return ret;
}
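This index arithmetic is easy to get wrong, so it helps to check it on a toy case. Here is a plain-C++ mirror of the LOCF branch above (illustrative names, an int sentinel standing in for NA_integer_, everything else identical), which can be verified against hand-computed expectations:

```cpp
#include <cstddef>
#include <vector>

const int NA_SENTINEL = -999;  // stands in for R's NA_integer_

// Mirror of the LOCF branch of Cna_fill_int_grp: `starts` and `rows`
// are 1-based as delivered by forderv; `rows` is empty when the table
// was already sorted on the by variable.
std::vector<int> locf_grp(std::vector<int> x, const std::vector<int>& rows,
                          const std::vector<int>& starts) {
    std::size_t n = x.size(), ngrps = starts.size();
    for (std::size_t g = 0; g < ngrps; ++g) {
        std::size_t f = starts[g] - 1;
        std::size_t l = (g == ngrps - 1) ? n : (std::size_t)(starts[g + 1] - 1);
        for (std::size_t i = f + 1; i < l; ++i) {
            std::size_t r  = rows.empty() ? i     : (std::size_t)(rows[i] - 1);
            std::size_t r1 = rows.empty() ? i - 1 : (std::size_t)(rows[i - 1] - 1);
            if (x[r] == NA_SENTINEL && x[r1] != NA_SENTINEL) x[r] = x[r1];
        }
    }
    return x;
}
```

Note that a leading NA in a group stays NA: values are never carried across group boundaries, which is the whole point of grouping.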
Timings¶
na_fill_grp with new variables¶
> vars = c("a", "b", "c", "d", "e")
> outvars <- paste0("z", vars)
> system.time(dt[, (outvars):= na_fill_grp(dt, var=vars, by="id")])
user system elapsed
3.832 1.556 3.426
na_fill inplace¶
> dt1 <- copy(dt)
> system.time(dt1[, (vars) := lapply(.SD, function(x) na_fill(x, inplace=T)), by="id", .SDcols = vars, verbose=T])
Finding groups using forderv ... 0.233s elapsed (1.872s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.012s cpu)
lapply optimization changed j from 'lapply(.SD, function(x) na_fill(x, inplace = T))' to 'list(..FUN1(a), ..FUN1(b), ..FUN1(c), ..FUN1(d), ..FUN1(e))'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
memcpy contiguous groups took 1.540s for 1800000 groups
eval(j) took 61.073s for 1800000 calls
00:01:06 elapsed (00:01:04 cpu)
user system elapsed
66.032 2.792 66.585
na_fill_grp inplace¶
> dt2 <- copy(dt)
> system.time(na_fill_grp(dt2, var=vars, by="id", inplace=T))
user system elapsed
2.612 0.288 1.090
> identical(dt1, dt2)
[1] TRUE
Using an Rcpp function with data.table-like by-group processing gives a significant improvement over na_fill (about 60x faster). It's relatively easy to adapt na_fill_grp to other operations, because the hardest part is the same for all functions: iterating through groups and rows.
Comparison with the new data.table nafill¶
The current data.table master branch implements a new nafill function. For now, character columns are not supported, which is essential for me (character support is on the roadmap). It also comes with a set version, setnafill, for in-place operations.
Let's benchmark the in-place versions on the non-character variables, first with the default number of threads (14 in my case) and then with only one thread to avoid forming an OpenMP team of threads:
> getDTthreads()
[1] 14
> vars2 = c("a", "c", "d", "e")
> dt0 <- copy(dt)
> system.time(dt0[, (vars2):=setnafill(.SD, type="locf", cols=vars2), .SDcols=vars2, by="id"])
user system elapsed
905.860 27.588 67.498
> dt1 <- copy(dt)
> system.time(dt1[, (vars2) := lapply(.SD, function(x) na_fill(x, inplace=T)), by="id", .SDcols = vars2])
user system elapsed
49.804 2.620 50.381
> dt2 <- copy(dt)
> system.time(na_fill_grp(dt2, var=vars2, by="id", inplace=T))
user system elapsed
2.168 0.328 0.690
> identical(dt0, dt1)
[1] TRUE
> identical(dt1, dt2)
[1] TRUE
> setDTthreads(1)
> getDTthreads()
[1] 1
>
> vars2 = c("a", "c", "d", "e")
> dt0 <- copy(dt)
> system.time(dt0[, (vars2):=setnafill(.SD, type="locf", cols=vars2), .SDcols=vars2, by="id"])
user system elapsed
35.124 23.620 58.733
> dt1 <- copy(dt)
> system.time(dt1[, (vars2) := lapply(.SD, function(x) na_fill(x, inplace=T)), by="id", .SDcols = vars2])
user system elapsed
50.492 2.436 52.918
> dt2 <- copy(dt)
> system.time(na_fill_grp(dt2, var=vars2, by="id", inplace=T))
user system elapsed
1.304 0.184 1.489
> identical(dt0, dt1)
[1] TRUE
> identical(dt1, dt2)
[1] TRUE
setnafill performance is close to the Rcpp na_fill version above, and it is not optimized for by processing. Jan Gorecki, a data.table developer, suggested setting threads to 1L: he's right, because nafill benefits from it (58s vs 67s). The Rcpp versions don't benefit from using only one thread.
Comparison with GForce¶
For example, here is a simple equivalent of gsum written with compact loops:
#include <Rcpp.h>
using namespace Rcpp;
// [[Rcpp::export]]
NumericVector Csum_grp(NumericVector x, IntegerVector rows) {
R_xlen_t n = x.size();
R_xlen_t nrows = rows.size();
if (nrows != 0 && n != rows.size())
stop("x and rows must have the same length");
if (!rows.hasAttribute("starts"))
stop("rows must have 'starts' attribute");
IntegerVector grps = rows.attr("starts");
R_xlen_t ngrps = grps.size();
NumericVector ret = NumericVector(n);
for(int g=0; g<ngrps; g++) {
double somme = 0;
for(R_xlen_t ii = grps[g] - 1, i = (nrows == 0 ? ii : rows[ii] - 1);
ii < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
ii++, i = (nrows == 0 ? ii : rows[ii] - 1)) {
somme += x[i];
}
for(R_xlen_t ii = grps[g] - 1, i = (nrows == 0 ? ii : rows[ii] - 1);
ii < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
ii++, i = (nrows == 0 ? ii : rows[ii] - 1)) {
ret[i] = somme;
}
}
return ret;
}
The above version uses multiple C ternary operators to check whether nrows == 0 (the case where the data.table is already sorted). Michael Chirico, another data.table developer, suggested shifting the nrows == 0 branching as early as possible, so you get more but smaller loops.
Here is a second version:
// [[Rcpp::export]]
NumericVector Csum_grp2(NumericVector x, IntegerVector rows) {
R_xlen_t n = x.size();
R_xlen_t nrows = rows.size();
if (nrows != 0 && n != rows.size())
stop("x and rows must have the same length");
if (!rows.hasAttribute("starts"))
stop("rows must have 'starts' attribute");
IntegerVector grps = rows.attr("starts");
R_xlen_t ngrps = grps.size();
NumericVector ret = NumericVector(n);
for(int g=0; g<ngrps; g++) {
double somme = 0;
if (nrows == 0) {
for(R_xlen_t i = grps[g] - 1;
i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
i++) {
somme += x[i];
}
for(R_xlen_t i = grps[g] - 1;
i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
i++) {
ret[i] = somme;
}
} else {
for(R_xlen_t i = grps[g] - 1;
i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
i++) {
somme += x[rows[i] - 1];
}
for(R_xlen_t i = grps[g] - 1;
i < (g == (ngrps - 1) ? n : grps[g + 1] - 1);
i++) {
ret[rows[i] - 1] = somme;
}
}
}
return ret;
}
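This branch-hoisting pattern generalizes: when a condition is loop-invariant, testing it once and writing two specialized loops removes a per-iteration branch from the hot path. A minimal sketch of the equivalence, stripped of the grouping (illustrative names):

```cpp
#include <cstddef>
#include <vector>

// Ternary inside the hot loop (Csum_grp style): the nrows == 0 test is
// re-evaluated on every iteration even though it never changes.
double sum_ternary(const std::vector<double>& x, const std::vector<int>& rows) {
    double s = 0;
    for (std::size_t i = 0; i < x.size(); ++i)
        s += x[rows.empty() ? i : (std::size_t)(rows[i] - 1)];
    return s;
}

// Invariant test hoisted out (Csum_grp2 style): two tighter loops.
double sum_hoisted(const std::vector<double>& x, const std::vector<int>& rows) {
    double s = 0;
    if (rows.empty())
        for (std::size_t i = 0; i < x.size(); ++i) s += x[i];
    else
        for (std::size_t i = 0; i < x.size(); ++i) s += x[rows[i] - 1];
    return s;
}
```

Both functions compute the same result; only the shape of the generated loop code differs.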
Csum_grp
and Csum_grp2
can be compared with gsum
with default
number of threads and one thread.
With sorted variable id
:
> setDTthreads()
> print(getDTthreads())
[1] 14
> system.time(dt[, sa := Csum_grp(c, group(.SD, "id"))])
user system elapsed
2.004 0.464 0.643
> system.time(dt[, sb := Csum_grp2(c, group(.SD, "id")), verbose=T])
Assigning to all 90000000 rows
RHS_list_of_columns == false
Direct plonk of unnamed RHS, no copy.
user system elapsed
1.944 0.424 0.589
> system.time(dt[, sc := sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.210s elapsed (1.652s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.012s elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
memcpy contiguous groups took 0.980s for 1800000 groups
eval(j) took 1.373s for 1800000 calls
4.827s elapsed (3.084s cpu)
user system elapsed
4.748 2.336 5.051
> system.time(dt[, sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.209s elapsed (1.684s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.012s elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
GForce optimized j to 'gsum(c)'
Making each group and running j (GForce TRUE) ... gforce initial population of grp took 0.020
gforce assign high and low took 0.060
This gsum took (narm=FALSE) ... gather took ... 0.037s
0.055s
gforce eval took 0.055
0.161s elapsed (1.476s cpu)
user system elapsed
3.172 0.828 0.384
> setDTthreads(1)
> print(getDTthreads())
[1] 1
> system.time(dt[, sa := Csum_grp(c, group(.SD, "id"))])
user system elapsed
1.116 0.296 1.415
> system.time(dt[, sb := Csum_grp2(c, group(.SD, "id")), verbose=T])
Assigning to all 90000000 rows
RHS_list_of_columns == false
Direct plonk of unnamed RHS, no copy.
user system elapsed
1.188 0.276 1.462
> system.time(dt[, sc := sum(c), by=id, verbose=T])
Detected that j uses these columns: sc,c
Finding groups using forderv ... 1.049s elapsed (0.896s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.011s elapsed (0.008s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
memcpy contiguous groups took 1.100s for 1800000 groups
eval(j) took 1.444s for 1800000 calls
4.957s elapsed (2.924s cpu)
user system elapsed
3.832 2.188 6.018
> system.time(dt[, sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.974s elapsed (0.860s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.008s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
GForce optimized j to 'gsum(c)'
Making each group and running j (GForce TRUE) ... gforce initial population of grp took 0.133
gforce assign high and low took 0.441
This gsum took (narm=FALSE) ... gather took ... 0.374s
0.533s
gforce eval took 0.533
1.131s elapsed (0.796s cpu)
user system elapsed
1.668 0.448 2.117
> print(dt[sa!=sb])
Empty data.table (0 rows and 9 cols): id,a,b,c,d,e...
> print(dt[sb!=sc])
Empty data.table (0 rows and 9 cols): id,a,b,c,d,e...
With unsorted variable b
and 26 groups:
> setDTthreads()
> print(getDTthreads())
[1] 14
> system.time(dt[, sa := Csum_grp(c, group(.SD, "b"))])
user system elapsed
3.000 0.540 2.119
> system.time(dt[, sb := Csum_grp2(c, group(.SD, "b")), verbose=T])
Assigning to all 90000000 rows
RHS_list_of_columns == false
Direct plonk of unnamed RHS, no copy.
user system elapsed
3.008 0.512 2.127
> system.time(dt[, sc := sum(c), by=b, verbose=T])
Detected that j uses these columns: sc,c
Finding groups using forderv ... 0.163s elapsed (1.136s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.000s elapsed (0.000s cpu)
Getting back original order ... 0.000s elapsed (0.000s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
Old mean optimization is on, left j unchanged.
Making each group and running j (GForce FALSE) ...
collecting discontiguous groups took 2.086s for 26 groups
eval(j) took 0.103s for 26 calls
2.599s elapsed (2.864s cpu)
user system elapsed
4.072 0.472 2.772
> system.time(dt[, sum(c), by=id, verbose=T])
Detected that j uses these columns: c
Finding groups using forderv ... 0.208s elapsed (1.664s cpu)
Finding group sizes from the positions (can be avoided to save RAM) ... 0.010s elapsed (0.012s cpu)
lapply optimization is on, j unchanged as 'sum(c)'
GForce optimized j to 'gsum(c)'
Making each group and running j (GForce TRUE) ... gforce initial population of grp took 0.021
gforce assign high and low took 0.060
This gsum took (narm=FALSE) ... gather took ... 0.037s
0.051s
gforce eval took 0.051
0.155s elapsed (1.428s cpu)
user system elapsed
3.108 0.820 0.376
Some comments about the results:
- When using := with a by statement, GForce is not used, so timings are better with the Rcpp version (1.5s vs 6.0s).
- Timings are close between Rcpp/forderv and GForce/gsum. Of course, there is no point in replacing the GForce functions, but it's good news: it gives a way to get GForce-level performance with the ease of use of customized functions written in Rcpp.
- The version with the nrows == 0 branching hoisted out is slightly faster, but it needs two distinct loops.
- When setting threads to 1, forderv takes more time (1.0s vs 0.2s) because it's no longer multithreaded, so the overall timings are about the same: for this kind of calculation, multithreading is not a decisive advantage.
- With a small number of groups, GForce is the fastest (0.4s vs 2.1s).
Many thanks to Jan Gorecki and Michael Chirico for their reviews of this article.