# stata_to_db_batch_read.R
library("pacman")
p_load(readstata13,
glue,
dplyr,
purrr,
duckdb,
here)
# Pull in the project's shared helper functions
source(here("functions", "00_common_functions.R"))

# Folder references for the raw data and the database directory.
# NOTE(review): folder_reference_factory() is not defined in this script;
# presumably it comes from the sourced helpers and returns a function that
# maps a file name to a path inside the given folder -- confirm.
rawdata <- folder_reference_factory("/share/raw_siab")
dbfolder <- folder_reference_factory("/share/duckdb/")

# Set up the database up front: open a writable connection once ...
con <- dbConnect(
  duckdb(),
  dbdir = dbfolder("siab.duckdb"),
  read_only = FALSE
)
# ... and close it again straight away
dbDisconnect(con, shutdown = TRUE)
# Convert the base data to a DuckDB database
#' Load a Stata SIAB file into the DuckDB table "orig" in batches
#'
#' Reads only the `persnr` column first and derives row ranges of roughly
#' `.batch_size` rows whose boundaries fall between two `persnr` values, so
#' that all rows of one `persnr` land in the same batch. Each range is then
#' read from the Stata file, tagged with its batch label in column `pn_batch`,
#' and appended to table "orig" in the database file `dbfolder("siab.duckdb")`.
#'
#' Relies on `dbfolder()` being defined at script level. Rows are appended,
#' so running this twice against the same database duplicates them.
#'
#' @param .siab_file Path of the Stata file, passed on to `read.dta13()`.
#' @param .batch_size Approximate number of rows per batch; a single positive
#'   number.
#' @return Invisibly, the data frame of batch row ranges with columns `r`
#'   (batch label), `max_r` (last row of the batch) and `min_r` (first row).
convert_to_duckdb <- function(.siab_file, .batch_size) {
  stopifnot(
    is.numeric(.batch_size),
    length(.batch_size) == 1L,
    !is.na(.batch_size),
    .batch_size > 0
  )

  cat("Generate batches from persnr column of siab \n")

  persnr_data <- read.dta13(
    .siab_file,
    convert.factors = FALSE,
    select.cols = "persnr"
  )

  # The row ranges below are cumulative per-persnr counts taken in sorted
  # persnr order, so they only line up with row positions in the file when the
  # file itself is sorted by persnr.
  if (isTRUE(is.unsorted(persnr_data$persnr))) {
    warning(
      "`persnr` does not appear to be sorted in ", .siab_file,
      "; batch boundaries may split a persnr across batches.",
      call. = FALSE
    )
  }

  # One row per batch: `a` is the running row total after each persnr, `r` the
  # batch label that total falls into. A batch therefore always ends on the
  # last row of some persnr. (Labels can have gaps if a single persnr has more
  # rows than `.batch_size`.)
  read_rows <- persnr_data %>%
    count(persnr) %>%
    mutate(
      a = cumsum(n),
      r = floor(a / .batch_size) + 1
    ) %>%
    group_by(r) %>%
    summarise(max_r = max(a)) %>%
    # Each batch starts one row after the previous batch ended; the first
    # batch starts at row 1.
    mutate(min_r = lag(max_r, default = 0L) + 1L)

  # Drop the persnr column before collecting so its memory can be reclaimed
  rm(persnr_data)
  invisible(gc())

  # Nothing to upload for a file without rows (avoids max() of an empty vector)
  if (nrow(read_rows) == 0L) {
    cat(" -> No rows found, nothing to upload \n")
    return(invisible(read_rows))
  }

  num_batches <- max(read_rows$r)
  glue(" -> Read data in {num_batches} batches \n") %>% cat("\n")

  # Read rows min_r..max_r of the Stata file and append them to table "orig".
  # Argument names match the columns of `read_rows` so pwalk() can map them.
  convert_batch_to_duckdb <- function(r, min_r, max_r) {
    glue("Uploading batch {r}/{num_batches} to DuckDB\n") %>% cat("\n")

    batch_data <- read.dta13(
      file = .siab_file,
      convert.factors = FALSE,
      convert.dates = TRUE,
      select.rows = c(min_r, max_r)
    ) %>%
      mutate(pn_batch = r)

    # Open a fresh connection per batch and make sure it is closed again even
    # when the write fails
    con <- dbConnect(
      duckdb::duckdb(),
      dbdir = dbfolder("siab.duckdb"),
      read_only = FALSE
    )
    tryCatch(
      dbWriteTable(con, "orig", batch_data, append = TRUE),
      finally = dbDisconnect(con, shutdown = TRUE)
    )

    # Release the batch before collecting; otherwise gc() cannot free it
    rm(batch_data)
    invisible(gc())
  }

  # Convert all batches
  pwalk(read_rows, convert_batch_to_duckdb)

  invisible(read_rows)
}
# Load the SIAB SUF into the DuckDB database,
# using a batch size of 3,000,000 rows
rawdata("siab_r_7514_v1.dta") %>%
  convert_to_duckdb(.batch_size = 3000000)