Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ZEPPELIN-6080] Add support for bq single region dataset query #4815

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,13 @@ public class BigQueryInterpreter extends Interpreter {
static final String WAIT_TIME = "zeppelin.bigquery.wait_time";
static final String MAX_ROWS = "zeppelin.bigquery.max_no_of_rows";
static final String SQL_DIALECT = "zeppelin.bigquery.sql_dialect";
static final String REGION = "zeppelin.bigquery.region";

meharanjan318 marked this conversation as resolved.
Show resolved Hide resolved

private static String jobId = null;
private static String projectId = null;
private static String location = null;
meharanjan318 marked this conversation as resolved.
Show resolved Hide resolved


private static final List NO_COMPLETION = new ArrayList<>();
private Exception exceptionOnConnect;
Expand Down Expand Up @@ -227,6 +231,7 @@ private InterpreterResult executeSql(String sql) {
long wTime = Long.parseLong(getProperty(WAIT_TIME));
long maxRows = Long.parseLong(getProperty(MAX_ROWS));
String sqlDialect = getProperty(SQL_DIALECT, "").toLowerCase();
String region = getProperty(REGION, null);
Boolean useLegacySql;
switch (sqlDialect) {
case "standardsql":
Expand All @@ -241,7 +246,7 @@ private InterpreterResult executeSql(String sql) {
}
Iterator<GetQueryResultsResponse> pages;
try {
pages = run(sql, projId, wTime, maxRows, useLegacySql);
pages = run(sql, projId, wTime, maxRows, useLegacySql, region);
} catch (IOException ex) {
LOGGER.error(ex.getMessage());
return new InterpreterResult(Code.ERROR, ex.getMessage());
Expand All @@ -258,8 +263,9 @@ private InterpreterResult executeSql(String sql) {

//Function to run the SQL on bigQuery service
public static Iterator<GetQueryResultsResponse> run(final String queryString,
meharanjan318 marked this conversation as resolved.
Show resolved Hide resolved
final String projId, final long wTime, final long maxRows, Boolean useLegacySql)
throws IOException {
final String projId, final long wTime, final long maxRows,
Boolean useLegacySql, final String region)
throws IOException {
try {
LOGGER.info("Use legacy sql: {}", useLegacySql);
QueryResponse query;
Expand All @@ -275,6 +281,7 @@ public static Iterator<GetQueryResultsResponse> run(final String queryString,
GetQueryResults getRequest = service.jobs().getQueryResults(
projectId,
jobId);
if (region != null) getRequest = getRequest.setLocation(region);
meharanjan318 marked this conversation as resolved.
Show resolved Hide resolved
return getPages(getRequest);
} catch (IOException ex) {
throw ex;
Expand Down
7 changes: 7 additions & 0 deletions bigquery/src/main/resources/interpreter-setting.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@
"defaultValue": "",
"description": "BigQuery SQL dialect (standardSQL or legacySQL). If empty, query prefix like '#standardSQL' can be used.",
"type": "string"
},
"zeppelin.bigquery.region": {
"envName": null,
"propertyName": "zeppelin.bigquery.region",
"defaultValue": "",
"description": "Location of BigQuery dataset. Needed if it is a single-region dataset.",
"type": "string"
}
},
"editor": {
Expand Down