-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy pathsql_api.rs
167 lines (136 loc) · 5.34 KB
/
sql_api.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use datafusion::prelude::*;
use tempfile::TempDir;
#[tokio::test]
async fn unsupported_ddl_returns_error() {
// Verify SessionContext::with_sql_options errors appropriately
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
// disallow ddl
let options = SQLOptions::new().with_allow_ddl(false);
let sql = "CREATE VIEW test_view AS SELECT * FROM test";
let df = ctx.sql_with_options(sql, options).await;
assert_eq!(
df.unwrap_err().strip_backtrace(),
"Error during planning: DDL not supported: CreateView"
);
// allow ddl
let options = options.with_allow_ddl(true);
ctx.sql_with_options(sql, options).await.unwrap();
}
#[tokio::test]
async fn unsupported_dml_returns_error() {
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let options = SQLOptions::new().with_allow_dml(false);
let sql = "INSERT INTO test VALUES (1)";
let df = ctx.sql_with_options(sql, options).await;
assert_eq!(
df.unwrap_err().strip_backtrace(),
"Error during planning: DML not supported: Insert Into"
);
let options = options.with_allow_dml(true);
ctx.sql_with_options(sql, options).await.unwrap();
}
#[tokio::test]
async fn dml_output_schema() {
use arrow::datatypes::Schema;
use arrow::datatypes::{DataType, Field};
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let sql = "INSERT INTO test VALUES (1)";
let df = ctx.sql(sql).await.unwrap();
let count_schema = Schema::new(vec![Field::new("count", DataType::UInt64, false)]);
assert_eq!(Schema::from(df.schema()), count_schema);
}
#[tokio::test]
async fn unsupported_copy_returns_error() {
let tmpdir = TempDir::new().unwrap();
let tmpfile = tmpdir.path().join("foo.parquet");
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let options = SQLOptions::new().with_allow_dml(false);
let sql = format!(
"COPY (values(1)) TO '{}' STORED AS parquet",
tmpfile.to_string_lossy()
);
let df = ctx.sql_with_options(&sql, options).await;
assert_eq!(
df.unwrap_err().strip_backtrace(),
"Error during planning: DML not supported: COPY"
);
let options = options.with_allow_dml(true);
ctx.sql_with_options(&sql, options).await.unwrap();
}
#[tokio::test]
async fn unsupported_statement_returns_error() {
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let options = SQLOptions::new().with_allow_statements(false);
let sql = "set datafusion.execution.batch_size = 5";
let df = ctx.sql_with_options(sql, options).await;
assert_eq!(
df.unwrap_err().strip_backtrace(),
"Error during planning: Statement not supported: SetVariable"
);
let options = options.with_allow_statements(true);
ctx.sql_with_options(sql, options).await.unwrap();
}
#[tokio::test]
async fn empty_statement_returns_error() {
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let state = ctx.state();
// Give it an empty string which contains no statements
let plan_res = state.create_logical_plan("").await;
assert_eq!(
plan_res.unwrap_err().strip_backtrace(),
"Error during planning: No SQL statements were provided in the query string"
);
}
#[tokio::test]
async fn multiple_statements_returns_error() {
let ctx = SessionContext::new();
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let state = ctx.state();
// Give it a string that contains multiple statements
let plan_res = state
.create_logical_plan(
"INSERT INTO test (x) VALUES (1); INSERT INTO test (x) VALUES (2)",
)
.await;
assert_eq!(
plan_res.unwrap_err().strip_backtrace(),
"This feature is not implemented: The context currently only supports a single SQL statement"
);
}
#[tokio::test]
async fn ddl_can_not_be_planned_by_session_state() {
let ctx = SessionContext::new();
// make a table via SQL
ctx.sql("CREATE TABLE test (x int)").await.unwrap();
let state = ctx.state();
// can not create a logical plan for catalog DDL
let sql = "DROP TABLE test";
let plan = state.create_logical_plan(sql).await.unwrap();
let physical_plan = state.create_physical_plan(&plan).await;
assert_eq!(
physical_plan.unwrap_err().strip_backtrace(),
"This feature is not implemented: Unsupported logical plan: DropTable"
);
}