-
Notifications
You must be signed in to change notification settings - Fork 39
/
Copy patharray_stream.c
223 lines (184 loc) · 8.48 KB
/
array_stream.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#define R_NO_REMAP
#include <R.h>
#include <Rinternals.h>
#include "array.h"
#include "array_stream.h"
#include "nanoarrow.h"
#include "schema.h"
#include "util.h"
// Ideally user-supplied finalizers are written in such a way that they don't jump;
// however if they do it is likely that memory will leak. Here, we use
// R_tryCatchError to minimize the chances of that happening.
static SEXP run_finalizer_wrapper(void* data) {
SEXP finalizer_sym = PROTECT(Rf_install("array_stream_finalizer"));
SEXP finalizer_call = PROTECT(Rf_lang1(finalizer_sym));
Rf_eval(finalizer_call, (SEXP)data);
UNPROTECT(2);
return R_NilValue;
}
// Error handler paired with run_finalizer_wrapper() in R_tryCatchError().
// Deliberately best-effort: the error is reported to stderr and swallowed so
// that the C-level code which triggered the finalizer can continue normally.
//
// cond:  the R condition object (unused).
// hdata: handler data passed to R_tryCatchError() (unused; callers pass NULL).
// Returns R_NilValue.
static SEXP run_finalizer_error_handler(SEXP cond, void* hdata) {
  (void)cond;
  (void)hdata;
  // Terminate the line so the message does not run into subsequent console output
  REprintf("Error evaluating user-supplied array stream finalizer\n");
  return R_NilValue;
}
// Detaches the object stored in the protected slot of `array_stream_xptr` and,
// if it is tagged with class "nanoarrow_array_stream_finalizer", evaluates its
// `array_stream_finalizer()` inside R_tryCatchError() so that an R error in the
// user's code is reported rather than propagated.
static void run_user_array_stream_finalizer(SEXP array_stream_xptr) {
  SEXP finalizer_obj = PROTECT(R_ExternalPtrProtected(array_stream_xptr));

  // Clear the slot before evaluating anything: a second call through this
  // function then sees R_NilValue and does nothing.
  R_SetExternalPtrProtected(array_stream_xptr, R_NilValue);

  int is_user_finalizer = Rf_inherits(finalizer_obj, "nanoarrow_array_stream_finalizer");
  if (is_user_finalizer) {
    R_tryCatchError(&run_finalizer_wrapper, finalizer_obj,
                    &run_finalizer_error_handler, NULL);
  }

  UNPROTECT(1);
}
// Calls get_schema() on the stream held by `array_stream_xptr` and returns the
// result wrapped in a new schema external pointer. Raises an R error that
// includes the status code and the stream's last error message on failure.
SEXP nanoarrow_c_array_stream_get_schema(SEXP array_stream_xptr) {
  struct ArrowArrayStream* stream = nanoarrow_array_stream_from_xptr(array_stream_xptr);

  // The output is allocated as an owning external pointer before the call,
  // presumably so it is cleaned up by its finalizer if Rf_error() jumps below.
  SEXP out_xptr = PROTECT(nanoarrow_schema_owning_xptr());
  struct ArrowSchema* out = nanoarrow_output_schema_from_xptr(out_xptr);

  int status = ArrowArrayStreamGetSchema(stream, out, NULL);
  if (status) {
    Rf_error("array_stream->get_schema(): [%d] %s", status,
             ArrowArrayStreamGetLastError(stream));
  }

  UNPROTECT(1);
  return out_xptr;
}
// Calls get_next() on the stream held by `array_stream_xptr` and returns the
// produced array wrapped in a new array external pointer. Raises an R error
// that includes the status code and the stream's last error message on failure.
SEXP nanoarrow_c_array_stream_get_next(SEXP array_stream_xptr) {
  struct ArrowArrayStream* stream = nanoarrow_array_stream_from_xptr(array_stream_xptr);

  // Allocate the owning output container up front (presumably finalized by R
  // if Rf_error() jumps below).
  SEXP out_xptr = PROTECT(nanoarrow_array_owning_xptr());
  struct ArrowArray* out = nanoarrow_output_array_from_xptr(out_xptr);

  int status = ArrowArrayStreamGetNext(stream, out, NULL);
  if (status != NANOARROW_OK) {
    Rf_error("array_stream->get_next(): [%d] %s", status,
             ArrowArrayStreamGetLastError(stream));
  }

  UNPROTECT(1);
  return out_xptr;
}
// Builds a new ArrowArrayStream (returned as an external pointer) from a list
// of array external pointers `batches_sexp` and a schema `schema_xptr`.
// Each batch is exported into the stream in list order. When `validate_sexp`
// is TRUE, ArrowBasicArrayStreamValidate() is run and an R error is raised if
// it fails.
SEXP nanoarrow_c_basic_array_stream(SEXP batches_sexp, SEXP schema_xptr,
                                    SEXP validate_sexp) {
  int validate = LOGICAL(validate_sexp)[0];

  // Schema needs a copy here because ArrowBasicArrayStreamInit() takes ownership
  SEXP schema_copy_xptr = PROTECT(nanoarrow_schema_owning_xptr());
  struct ArrowSchema* schema_copy = nanoarrow_output_schema_from_xptr(schema_copy_xptr);
  schema_export(schema_xptr, schema_copy);

  SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
  struct ArrowArrayStream* array_stream =
      nanoarrow_output_array_stream_from_xptr(array_stream_xptr);

  int64_t n_arrays = Rf_xlength(batches_sexp);
  if (ArrowBasicArrayStreamInit(array_stream, schema_copy, n_arrays) != NANOARROW_OK) {
    Rf_error("Failed to initialize array stream");
  }

  for (int64_t i = 0; i < n_arrays; i++) {
    struct ArrowArray array;
    array_export(VECTOR_ELT(batches_sexp, i), &array);
    ArrowBasicArrayStreamSetArray(array_stream, i, &array);
  }

  if (validate) {
    // Zero-initialize so the message is a valid (empty) C string even if the
    // validator returns an error code without writing a message; otherwise
    // formatting it with %s would read an indeterminate buffer.
    struct ArrowError error = {0};
    if (ArrowBasicArrayStreamValidate(array_stream, &error) != NANOARROW_OK) {
      Rf_error("ArrowBasicArrayStreamValidate(): %s", ArrowErrorMessage(&error));
    }
  }

  UNPROTECT(2);
  return array_stream_xptr;
}
// Returns the sum of the `length` fields of every ArrowArray referenced by the
// external pointers in the list `list_of_array_xptr`, converted to an R length
// value by length_sexp_from_int64(). Assumes every element is a non-NULL array
// external pointer (not checked here).
SEXP nanoarrow_c_array_list_total_length(SEXP list_of_array_xptr) {
  const R_xlen_t n_chunks = Rf_xlength(list_of_array_xptr);
  int64_t total = 0;

  R_xlen_t chunk_index = 0;
  while (chunk_index < n_chunks) {
    SEXP chunk_xptr = VECTOR_ELT(list_of_array_xptr, chunk_index);
    const struct ArrowArray* array = (const struct ArrowArray*)R_ExternalPtrAddr(chunk_xptr);
    total += array->length;
    chunk_index++;
  }

  return length_sexp_from_int64(total);
}
// Implementation of an ArrowArrayStream that keeps a dependent object valid.
// This struct is the private_data of the wrapper stream built by
// array_stream_export(); every wrapper callback forwards to the parent stream.
struct WrapperArrayStreamData {
  // External pointer that owns the wrapped (parent) stream. Its protected slot
  // holds the dependent object. array_stream_export() passes it to
  // nanoarrow_preserve_sexp(), and the release callback passes it to
  // nanoarrow_release_sexp().
  SEXP parent_array_stream_xptr;
  // Address of the stream owned by parent_array_stream_xptr.
  struct ArrowArrayStream* parent_array_stream;
};
// Release callback for the wrapper stream created by array_stream_export().
// If private_data is set, this function:
//   1. releases the wrapped (parent) stream,
//   2. runs the user finalizer eagerly, but only on the main thread,
//   3. calls nanoarrow_release_sexp() on the parent external pointer
//      (presumably balancing nanoarrow_preserve_sexp() in array_stream_export()),
//   4. frees the wrapper's private data.
// In all cases it marks this stream as released by setting `release` to NULL.
// A NULL private_data is tolerated.
static void finalize_wrapper_array_stream(struct ArrowArrayStream* array_stream) {
  if (array_stream->private_data != NULL) {
    struct WrapperArrayStreamData* data =
        (struct WrapperArrayStreamData*)array_stream->private_data;

    // Run the parent array stream release callback
    data->parent_array_stream->release(data->parent_array_stream);

    // If safe to do so, attempt to do an eager evaluation of a release
    // callback that may have been registered. If it is not safe to do so,
    // garbage collection will run any finalizers that have been set
    // on the chain of environments leading up to the finalizer.
    // (R code may only be evaluated from R's main thread, hence the check.)
    if (nanoarrow_is_main_thread()) {
      run_user_array_stream_finalizer(data->parent_array_stream_xptr);
    }

    nanoarrow_release_sexp(data->parent_array_stream_xptr);
    ArrowFree(array_stream->private_data);
  }

  array_stream->release = NULL;
}
static const char* wrapper_array_stream_get_last_error(
struct ArrowArrayStream* array_stream) {
struct WrapperArrayStreamData* data =
(struct WrapperArrayStreamData*)array_stream->private_data;
return data->parent_array_stream->get_last_error(data->parent_array_stream);
}
static int wrapper_array_stream_get_schema(struct ArrowArrayStream* array_stream,
struct ArrowSchema* out) {
struct WrapperArrayStreamData* data =
(struct WrapperArrayStreamData*)array_stream->private_data;
return data->parent_array_stream->get_schema(data->parent_array_stream, out);
}
static int wrapper_array_stream_get_next(struct ArrowArrayStream* array_stream,
struct ArrowArray* out) {
struct WrapperArrayStreamData* data =
(struct WrapperArrayStreamData*)array_stream->private_data;
return data->parent_array_stream->get_next(data->parent_array_stream, out);
}
// Moves the stream held by `parent_array_stream_xptr` into `array_stream_copy`.
// If the external pointer has no dependent object in its protected slot, the
// stream is moved directly. Otherwise, the stream is moved into a new owning
// external pointer that keeps the dependent object, and `array_stream_copy`
// becomes a thin wrapper that forwards every callback to it. Releasing the
// wrapper releases the parent stream and runs any user finalizer.
//
// Every step that can raise an R error (R allocation, the allocation check) is
// done before the parent stream is moved. On failure the caller's stream is
// therefore left intact, and `array_stream_copy` is never left half-initialized.
void array_stream_export(SEXP parent_array_stream_xptr,
                         struct ArrowArrayStream* array_stream_copy) {
  struct ArrowArrayStream* parent_array_stream =
      nanoarrow_array_stream_from_xptr(parent_array_stream_xptr);

  // If there is no dependent object, don't bother with this wrapper
  SEXP dependent_sexp = R_ExternalPtrProtected(parent_array_stream_xptr);
  if (dependent_sexp == R_NilValue) {
    ArrowArrayStreamMove(parent_array_stream, array_stream_copy);
    return;
  }

  // Allocate a new external pointer for an array stream (for consistency:
  // we always move an array stream when exporting)
  SEXP parent_array_stream_xptr_new = PROTECT(nanoarrow_array_stream_owning_xptr());
  struct ArrowArrayStream* parent_array_stream_new =
      (struct ArrowArrayStream*)R_ExternalPtrAddr(parent_array_stream_xptr_new);

  // Allocate the wrapper state before moving anything. If check_trivial_alloc()
  // raises an error, nothing has been consumed yet.
  struct WrapperArrayStreamData* data =
      (struct WrapperArrayStreamData*)ArrowMalloc(sizeof(struct WrapperArrayStreamData));
  check_trivial_alloc(data, "struct WrapperArrayStreamData");

  // Point of no return: transfer the stream and the dependent object
  ArrowArrayStreamMove(parent_array_stream, parent_array_stream_new);
  R_SetExternalPtrProtected(parent_array_stream_xptr_new, dependent_sexp);

  data->parent_array_stream_xptr = parent_array_stream_xptr_new;
  data->parent_array_stream = parent_array_stream_new;

  array_stream_copy->get_last_error = &wrapper_array_stream_get_last_error;
  array_stream_copy->get_schema = &wrapper_array_stream_get_schema;
  array_stream_copy->get_next = &wrapper_array_stream_get_next;
  array_stream_copy->private_data = data;
  // Install release last so the stream only appears valid once fully populated
  array_stream_copy->release = &finalize_wrapper_array_stream;

  // Transfer responsibility for the stream_xptr to the C object
  nanoarrow_preserve_sexp(parent_array_stream_xptr_new);
  UNPROTECT(1);
}