-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathlib.rs
228 lines (194 loc) · 6.63 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
//! DataFusion User Defined Functions (UDFs/UDAFs) for IOx.
#![warn(missing_docs)]
#![allow(unreachable_pub)]
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
use datafusion::{
execution::FunctionRegistry,
prelude::{lit, Expr, SessionContext},
};
use group_by::WindowDuration;
use window::EncodedWindowDuration;
pub mod coalesce_struct;
/// Grouping by structs
pub mod group_by;
/// Regular Expressions
mod regex;
/// Selector Functions
pub mod selectors;
/// Sleep function.
mod sleep;
/// window_bounds expressions
mod window;
pub mod gapfill;
/// Function registry
mod registry;
mod to_timestamp;
pub use crate::regex::clean_non_meta_escapes;
pub use crate::regex::REGEX_MATCH_UDF_NAME;
pub use crate::regex::REGEX_NOT_MATCH_UDF_NAME;
pub use crate::sleep::SLEEP_UDF_NAME;
/// Return an [`Expr`] that invokes an InfluxRPC compatible regex match to
/// determine which values satisfy the pattern. Equivalent to:
///
/// ```text
/// col =~ /pattern/
/// ```
///
/// `pattern` is passed to the UDF as a literal argument alongside `input`.
///
/// # Panics
///
/// Panics if the UDF named [`REGEX_MATCH_UDF_NAME`] is not present in
/// [`registry()`].
pub fn regex_match_expr(input: Expr, pattern: String) -> Expr {
    registry()
        .udf(regex::REGEX_MATCH_UDF_NAME)
        .expect("RegexMatch function not registered")
        .call(vec![input, lit(pattern)])
}
/// Build an [`Expr`] that applies an InfluxRPC compatible regex match and
/// determines which values do *not* satisfy `pattern`. This is the
/// expression-level counterpart of:
///
/// ```text
/// col !~ /pattern/
/// ```
///
/// # Panics
///
/// Panics if the UDF named [`REGEX_NOT_MATCH_UDF_NAME`] cannot be found in
/// [`registry()`].
pub fn regex_not_match_expr(input: Expr, pattern: String) -> Expr {
    let not_match_udf = registry()
        .udf(regex::REGEX_NOT_MATCH_UDF_NAME)
        .expect("NotRegexMatch function not registered");
    let call_args = vec![input, lit(pattern)];
    not_match_udf.call(call_args)
}
/// Build a DataFusion [`Expr`] that calls `window_bounds` on `time_arg`.
///
/// `every` and `offset` are each converted to an [`EncodedWindowDuration`].
/// Each one is then passed as three literal arguments (`ty`, `field1`,
/// `field2`), so the UDF receives seven arguments in total:
/// `time_arg, every.., offset..`.
///
/// # Panics
///
/// Panics if the `window_bounds` UDF is not present in [`registry()`].
pub fn make_window_bound_expr(
    time_arg: Expr,
    every: WindowDuration,
    offset: WindowDuration,
) -> Expr {
    let every: EncodedWindowDuration = every.into();
    let offset: EncodedWindowDuration = offset.into();

    let window_bounds_udf = registry()
        .udf(window::WINDOW_BOUNDS_UDF_NAME)
        .expect("WindowBounds function not registered");

    // Argument order matters: the time column first, then the three encoded
    // parts of `every`, then the three encoded parts of `offset`.
    let mut call_args = Vec::with_capacity(7);
    call_args.push(time_arg);
    for encoded in [every, offset] {
        call_args.push(lit(encoded.ty));
        call_args.push(lit(encoded.field1));
        call_args.push(lit(encoded.field2));
    }

    window_bounds_udf.call(call_args)
}
/// Return an [`FunctionRegistry`] with the implementations of IOx UDFs and the datafusion-provided UDFs
pub fn registry() -> &'static dyn FunctionRegistry {
registry::instance()
}
/// Return an [`FunctionRegistry`] with the implementations of IOx UDFs only
fn registry_iox_udfs() -> &'static dyn FunctionRegistry {
registry::instance_iox()
}
/// Register every IOx scalar UDF with `ctx` so it can be invoked from SQL.
///
/// Only the IOx-specific functions are registered. The DataFusion-provided
/// functions are not re-registered.
///
/// # Panics
///
/// Panics if a UDF name listed by the IOx registry cannot be looked up in
/// that same registry. That would indicate an internally inconsistent
/// registry, not a caller error.
pub fn register_iox_scalar_functions(ctx: &SessionContext) {
    let registry = registry_iox_udfs();
    for name in registry.udfs() {
        // `name` was produced by this same registry, so a lookup failure is a
        // broken invariant. Report which UDF and why instead of a bare unwrap.
        let udf = registry
            .udf(&name)
            .unwrap_or_else(|e| panic!("IOx UDF '{name}' is listed but not retrievable: {e}"));
        ctx.register_udf(udf.as_ref().clone());
    }
}
#[cfg(test)]
mod test {
    use arrow::{
        array::{ArrayRef, StringArray, TimestampNanosecondArray},
        record_batch::RecordBatch,
    };
    use datafusion::{assert_batches_eq, prelude::col};
    use schema::TIME_DATA_TIMEZONE;
    use std::sync::Arc;

    use super::*;

    /// Build a `SessionContext` with a single table `t` whose only column,
    /// `data`, holds `["Foo", "Bar", "FooBar"]`.
    fn string_table_ctx() -> SessionContext {
        let batch = RecordBatch::try_from_iter(vec![(
            "data",
            Arc::new(StringArray::from(vec!["Foo", "Bar", "FooBar"])) as ArrayRef,
        )])
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_batch("t", batch).unwrap();
        ctx
    }

    /// Plumbing test to validate the registry is connected. The functions are
    /// tested more thoroughly in their own modules.
    #[tokio::test]
    async fn test_regex_match_expr() {
        let ctx = string_table_ctx();
        let result = ctx
            .table("t")
            .await
            .unwrap()
            .filter(regex_match_expr(col("data"), "Foo".into()))
            .unwrap()
            .collect()
            .await
            .unwrap();
        let expected = vec![
            "+--------+",
            "| data   |",
            "+--------+",
            "| Foo    |",
            "| FooBar |",
            "+--------+",
        ];
        assert_batches_eq!(&expected, &result);
    }

    /// Plumbing test to validate the registry is connected. The functions are
    /// tested more thoroughly in their own modules.
    #[tokio::test]
    async fn test_regex_not_match_expr() {
        let ctx = string_table_ctx();
        let result = ctx
            .table("t")
            .await
            .unwrap()
            .filter(regex_not_match_expr(col("data"), "Foo".into()))
            .unwrap()
            .collect()
            .await
            .unwrap();
        let expected = vec!["+------+", "| data |", "+------+", "| Bar  |", "+------+"];
        assert_batches_eq!(&expected, &result);
    }

    /// Plumbing test to validate the registry is connected. The functions are
    /// tested more thoroughly in their own modules.
    #[tokio::test]
    async fn test_make_window_bound_expr() {
        let batch = RecordBatch::try_from_iter(vec![(
            "time",
            Arc::new(
                TimestampNanosecondArray::from(vec![Some(1000), Some(2000)])
                    .with_timezone_opt(TIME_DATA_TIMEZONE()),
            ) as ArrayRef,
        )])
        .unwrap();
        // Named after the `make_window_bound_expr` parameters they feed.
        let every = WindowDuration::Fixed { nanoseconds: 100 };
        let offset = WindowDuration::Fixed { nanoseconds: 200 };
        let ctx = SessionContext::new();
        ctx.register_batch("t", batch).unwrap();
        let result = ctx
            .table("t")
            .await
            .unwrap()
            .select(vec![
                col("time"),
                make_window_bound_expr(col("time"), every, offset).alias("bound"),
            ])
            .unwrap()
            .collect()
            .await
            .unwrap();
        let expected = vec![
            "+-----------------------------+--------------------------------+",
            "| time                        | bound                          |",
            "+-----------------------------+--------------------------------+",
            "| 1970-01-01T00:00:00.000001Z | 1970-01-01T00:00:00.000001100Z |",
            "| 1970-01-01T00:00:00.000002Z | 1970-01-01T00:00:00.000002100Z |",
            "+-----------------------------+--------------------------------+",
        ];
        assert_batches_eq!(&expected, &result);
    }
}