diff --git a/bigquery/pom.xml b/bigquery/pom.xml index 3a0c8ab64a7..a101374d877 100644 --- a/bigquery/pom.xml +++ b/bigquery/pom.xml @@ -76,6 +76,10 @@ guava ${guava.version} + + org.apache.commons + commons-lang3 + diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java index f80a933b6e6..c23bd228e71 100644 --- a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java +++ b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java @@ -37,6 +37,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.common.base.Function; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +91,7 @@ public class BigQueryInterpreter extends Interpreter { static final String WAIT_TIME = "zeppelin.bigquery.wait_time"; static final String MAX_ROWS = "zeppelin.bigquery.max_no_of_rows"; static final String SQL_DIALECT = "zeppelin.bigquery.sql_dialect"; + static final String REGION = "zeppelin.bigquery.region"; private static String jobId = null; private static String projectId = null; @@ -227,6 +229,7 @@ private InterpreterResult executeSql(String sql) { long wTime = Long.parseLong(getProperty(WAIT_TIME)); long maxRows = Long.parseLong(getProperty(MAX_ROWS)); String sqlDialect = getProperty(SQL_DIALECT, "").toLowerCase(); + String region = getProperty(REGION, null); Boolean useLegacySql; switch (sqlDialect) { case "standardsql": @@ -241,7 +244,7 @@ private InterpreterResult executeSql(String sql) { } Iterator pages; try { - pages = run(sql, projId, wTime, maxRows, useLegacySql); + pages = run(sql, projId, wTime, maxRows, useLegacySql, region); } catch (IOException ex) { LOGGER.error(ex.getMessage()); return new InterpreterResult(Code.ERROR, ex.getMessage()); @@ -258,8 +261,9 @@ private InterpreterResult executeSql(String sql) { //Function to run the SQL on bigQuery service public static Iterator run(final String queryString, - final String projId, final long wTime, final long maxRows, Boolean useLegacySql) - throws IOException { + final String projId, final long wTime, final long maxRows, + Boolean useLegacySql, final String region) + throws IOException { try { LOGGER.info("Use legacy sql: {}", useLegacySql); QueryResponse query; @@ -275,6 +279,9 @@ public static Iterator run(final String queryString, GetQueryResults getRequest = service.jobs().getQueryResults( projectId, jobId); + if (StringUtils.isNotBlank(region)) { + getRequest = getRequest.setLocation(region); + } return getPages(getRequest); } catch (IOException ex) { throw ex; diff --git a/bigquery/src/main/resources/interpreter-setting.json b/bigquery/src/main/resources/interpreter-setting.json index 8023bed1522..989cc375d96 100644 --- a/bigquery/src/main/resources/interpreter-setting.json +++ b/bigquery/src/main/resources/interpreter-setting.json @@ -31,6 +31,13 @@ "defaultValue": "", "description": "BigQuery SQL dialect (standardSQL or legacySQL). If empty, query prefix like '#standardSQL' can be used.", "type": "string" + }, + "zeppelin.bigquery.region": { + "envName": null, + "propertyName": "zeppelin.bigquery.region", + "defaultValue": "", + "description": "Location of BigQuery dataset. Needed if it is a single-region dataset.", + "type": "string" } }, "editor": { diff --git a/docs/interpreter/bigquery.md b/docs/interpreter/bigquery.md index 1a585437f8a..da696a74f2e 100644 --- a/docs/interpreter/bigquery.md +++ b/docs/interpreter/bigquery.md @@ -53,6 +53,11 @@ limitations under the License. BigQuery SQL dialect (standardSQL or legacySQL). If empty, [query prefix](https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql#sql-prefix) like '#standardSQL' can be used. + + zeppelin.bigquery.region + + BigQuery dataset region (Needed for single region dataset) +