From 5d33b1b75b5a5d06b86f1ab5914db485c7110ef6 Mon Sep 17 00:00:00 2001 From: Michael McGreevy Date: Fri, 7 Oct 2016 15:29:46 +1100 Subject: [PATCH] bigquery: Make Query more configurable and add a Run method. Change-Id: I05384196ef43489b3fd8d960e913bcb8c27bee44 Reviewed-on: https://code-review.googlesource.com/8154 Reviewed-by: Jonathan Amsterdam --- bigquery/bigquery.go | 33 ---- bigquery/copy.go | 7 +- bigquery/extract.go | 7 +- bigquery/integration_test.go | 9 +- bigquery/job.go | 15 ++ bigquery/legacy.go | 223 +++++++++++++++++++++- bigquery/load_test.go | 8 +- bigquery/query.go | 164 +++++++++++++++- bigquery/query_op.go | 161 ---------------- bigquery/query_test.go | 329 +++++++++++++++++++++++++++++--- bigquery/table.go | 32 +--- bigquery/utils_test.go | 2 +- examples/bigquery/query/main.go | 16 +- 13 files changed, 715 insertions(+), 291 deletions(-) delete mode 100644 bigquery/query_op.go diff --git a/bigquery/bigquery.go b/bigquery/bigquery.go index af98c584b17e..04c0715cf310 100644 --- a/bigquery/bigquery.go +++ b/bigquery/bigquery.go @@ -85,39 +85,6 @@ func (c *Client) Close() error { return nil } -// Query creates a query with string q. You may optionally set -// DefaultProjectID and DefaultDatasetID on the returned query before using it. -func (c *Client) Query(q string) *Query { - return &Query{Q: q, client: c} -} - -// Read submits a query for execution and returns the results via an Iterator. -// -// Read uses a temporary table to hold the results of the query job. -// -// For more control over how a query is performed, don't use this method but -// instead pass the Query as a Source to Client.Copy, and call Read on the -// resulting Job. -func (q *Query) Read(ctx context.Context, options ...ReadOption) (*Iterator, error) { - dest := &Table{} - job, err := q.client.Copy(ctx, dest, q, WriteTruncate) - if err != nil { - return nil, err - } - return job.Read(ctx, options...) 
-} - -// executeQuery submits a query for execution and returns the results via an Iterator. -func (c *Client) executeQuery(ctx context.Context, q *Query, options ...ReadOption) (*Iterator, error) { - dest := &Table{} - job, err := c.Copy(ctx, dest, q, WriteTruncate) - if err != nil { - return nil, err - } - - return c.Read(ctx, job, options...) -} - // Dataset creates a handle to a BigQuery dataset in the client's project. func (c *Client) Dataset(id string) *Dataset { return c.DatasetInProject(c.projectID, id) diff --git a/bigquery/copy.go b/bigquery/copy.go index 127f78964af3..e689ca8e7960 100644 --- a/bigquery/copy.go +++ b/bigquery/copy.go @@ -70,12 +70,7 @@ func (c *Copier) Run(ctx context.Context) (*Job, error) { }, } - if c.JobID != "" { - job.JobReference = &bq.JobReference{ - JobId: c.JobID, - ProjectId: c.c.projectID, - } - } + setJobRef(job, c.JobID, c.c.projectID) conf.DestinationTable = c.Dst.tableRefProto() for _, t := range c.Srcs { diff --git a/bigquery/extract.go b/bigquery/extract.go index 21638e9439a2..d743a4cf7f03 100644 --- a/bigquery/extract.go +++ b/bigquery/extract.go @@ -61,12 +61,7 @@ func (e *Extractor) Run(ctx context.Context) (*Job, error) { }, } - if e.JobID != "" { - job.JobReference = &bq.JobReference{ - JobId: e.JobID, - ProjectId: e.c.projectID, - } - } + setJobRef(job, e.JobID, e.c.projectID) e.Dst.customizeExtractDst(job.Configuration.Extract) e.Src.customizeExtractSrc(job.Configuration.Extract) diff --git a/bigquery/integration_test.go b/bigquery/integration_test.go index 8fc615b01d36..8aa42f4e1ca1 100644 --- a/bigquery/integration_test.go +++ b/bigquery/integration_test.go @@ -136,11 +136,10 @@ func TestIntegration(t *testing.T) { checkRead(table) // Query the table. - q := &Query{ - Q: "select name, num from t1", - DefaultProjectID: projID, - DefaultDatasetID: ds.id, - } + q := c.Query("select name, num from t1") + q.DefaultProjectID = projID + q.DefaultDatasetID = ds.id + checkRead(q) // Query the long way. 
diff --git a/bigquery/job.go b/bigquery/job.go index 1f1467f0dbd5..3580d8f98431 100644 --- a/bigquery/job.go +++ b/bigquery/job.go @@ -71,6 +71,21 @@ type JobStatus struct { Errors []*Error } +// setJobRef initializes job's JobReference if given a non-empty jobID. +// projectID must be non-empty. +func setJobRef(job *bq.Job, jobID, projectID string) { + if jobID == "" { + return + } + // We don't check whether projectID is empty; the server will return an + // error when it encounters the resulting JobReference. + + job.JobReference = &bq.JobReference{ + JobId: jobID, + ProjectId: projectID, + } +} + // jobOption is an Option which modifies a bq.Job proto. // This is used for configuring values that apply to all operations, such as setting a jobReference. type jobOption interface { diff --git a/bigquery/legacy.go b/bigquery/legacy.go index c72794898bd9..955c33452f0a 100644 --- a/bigquery/legacy.go +++ b/bigquery/legacy.go @@ -59,12 +59,30 @@ func (c *Client) Read(ctx context.Context, src ReadSource, options ...ReadOption case *Job: return src.Read(ctx, options...) case *Query: - // For compatibility, support Query values created by literal, rather - // than Client.Query. + // Query used not to contain a QueryConfig. By moving its + // top-level fields down into a QueryConfig field, we break + // code that uses a Query literal. If users make the minimal + // change to fix this (e.g. moving the "Q" field into a nested + // QueryConfig within the Query), they will end up with a Query + // that has no Client. It's preferable to make Read continue + // to work in this case too, at least until we delete Read + // completely. So we copy QueryConfig into a Query with an + // actual client. if src.client == nil { - src.client = c + src = &Query{ + client: c, + QueryConfig: src.QueryConfig, + Q: src.Q, + DefaultProjectID: src.DefaultProjectID, + DefaultDatasetID: src.DefaultDatasetID, + } } return src.Read(ctx, options...) 
+ case *QueryConfig: + // For compatibility, support QueryConfig values created by literal, rather + // than Client.Query. + q := &Query{client: c, QueryConfig: *src} + return q.Read(ctx, options...) case *Table: return src.Read(ctx, options...) } @@ -194,14 +212,42 @@ func (opt ignoreUnknownValues) customizeLoad(conf *bq.JobConfigurationLoad) { conf.IgnoreUnknownValues = true } +// CreateDisposition returns an Option that specifies the TableCreateDisposition to use. +// Deprecated: use the CreateDisposition field in Query, CopyConfig or LoadConfig instead. +func CreateDisposition(disp TableCreateDisposition) Option { return disp } + +func (opt TableCreateDisposition) implementsOption() {} + +func (opt TableCreateDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) { + conf.CreateDisposition = string(opt) +} + func (opt TableCreateDisposition) customizeLoad(conf *bq.JobConfigurationLoad) { conf.CreateDisposition = string(opt) } +func (opt TableCreateDisposition) customizeQuery(conf *bq.JobConfigurationQuery) { + conf.CreateDisposition = string(opt) +} + +// WriteDisposition returns an Option that specifies the TableWriteDisposition to use. +// Deprecated: use the WriteDisposition field in Query, CopyConfig or LoadConfig instead. 
+func WriteDisposition(disp TableWriteDisposition) Option { return disp } + +func (opt TableWriteDisposition) implementsOption() {} + +func (opt TableWriteDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) { + conf.WriteDisposition = string(opt) +} + func (opt TableWriteDisposition) customizeLoad(conf *bq.JobConfigurationLoad) { conf.WriteDisposition = string(opt) } +func (opt TableWriteDisposition) customizeQuery(conf *bq.JobConfigurationQuery) { + conf.WriteDisposition = string(opt) +} + type extractOption interface { customizeExtract(conf *bq.JobConfigurationExtract) } @@ -299,6 +345,9 @@ func (c *Client) Copy(ctx context.Context, dst Destination, src Source, options return c.cp(ctx, dst, src, options) case *Query: return c.query(ctx, dst, src, options) + case *QueryConfig: + q := &Query{QueryConfig: *src} + return c.query(ctx, dst, q, options) } case *GCSReference: if src, ok := src.(*Table); ok { @@ -322,3 +371,171 @@ type Destination interface { type ReadSource interface { implementsReadSource() } + +type queryOption interface { + customizeQuery(conf *bq.JobConfigurationQuery) +} + +// DisableQueryCache returns an Option that prevents results being fetched from the query cache. +// If this Option is not used, results are fetched from the cache if they are available. +// The query cache is a best-effort cache that is flushed whenever tables in the query are modified. +// Cached results are only available when TableID is unspecified in the query's destination Table. +// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching +// +// Deprecated: use Query.DisableQueryCache instead. 
+func DisableQueryCache() Option { return disableQueryCache{} } + +type disableQueryCache struct{} + +func (opt disableQueryCache) implementsOption() {} + +func (opt disableQueryCache) customizeQuery(conf *bq.JobConfigurationQuery) { + f := false + conf.UseQueryCache = &f +} + +// DisableFlattenedResults returns an Option that prevents results being flattened. +// If this Option is not used, results from nested and repeated fields are flattened. +// DisableFlattenedResults implies AllowLargeResults +// For more information, see https://cloud.google.com/bigquery/docs/data#nested +// Deprecated: use Query.DisableFlattenedResults instead. +func DisableFlattenedResults() Option { return disableFlattenedResults{} } + +type disableFlattenedResults struct{} + +func (opt disableFlattenedResults) implementsOption() {} + +func (opt disableFlattenedResults) customizeQuery(conf *bq.JobConfigurationQuery) { + f := false + conf.FlattenResults = &f + // DisableFlattenedResults implies AllowLargeResults + allowLargeResults{}.customizeQuery(conf) +} + +// AllowLargeResults returns an Option that allows the query to produce arbitrarily large result tables. +// The destination must be a table. +// When using this option, queries will take longer to execute, even if the result set is small. +// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults +// Deprecated: use Query.AllowLargeResults instead. +func AllowLargeResults() Option { return allowLargeResults{} } + +type allowLargeResults struct{} + +func (opt allowLargeResults) implementsOption() {} + +func (opt allowLargeResults) customizeQuery(conf *bq.JobConfigurationQuery) { + conf.AllowLargeResults = true +} + +// JobPriority returns an Option that causes a query to be scheduled with the specified priority. +// The default priority is InteractivePriority. 
+// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries +// Deprecated: use Query.Priority instead. +func JobPriority(priority string) Option { return jobPriority(priority) } + +type jobPriority string + +func (opt jobPriority) implementsOption() {} + +func (opt jobPriority) customizeQuery(conf *bq.JobConfigurationQuery) { + conf.Priority = string(opt) +} + +// MaxBillingTier returns an Option that sets the maximum billing tier for a Query. +// Queries that have resource usage beyond this tier will fail (without +// incurring a charge). If this Option is not used, the project default will be used. +// Deprecated: use Query.MaxBillingTier instead. +func MaxBillingTier(tier int) Option { return maxBillingTier(tier) } + +type maxBillingTier int + +func (opt maxBillingTier) implementsOption() {} + +func (opt maxBillingTier) customizeQuery(conf *bq.JobConfigurationQuery) { + tier := int64(opt) + conf.MaximumBillingTier = &tier +} + +// MaxBytesBilled returns an Option that limits the number of bytes billed for +// this job. Queries that would exceed this limit will fail (without incurring +// a charge). +// If this Option is not used, or bytes is < 1, the project default will be +// used. +// Deprecated: use Query.MaxBytesBilled instead. +func MaxBytesBilled(bytes int64) Option { return maxBytesBilled(bytes) } + +type maxBytesBilled int64 + +func (opt maxBytesBilled) implementsOption() {} + +func (opt maxBytesBilled) customizeQuery(conf *bq.JobConfigurationQuery) { + if opt >= 1 { + conf.MaximumBytesBilled = int64(opt) + } +} + +// QueryUseStandardSQL returns an Option that set the query to use standard SQL. +// The default setting is false (using legacy SQL). +// Deprecated: use Query.UseStandardSQL instead. 
+func QueryUseStandardSQL() Option { return queryUseStandardSQL{} } + +type queryUseStandardSQL struct{} + +func (opt queryUseStandardSQL) implementsOption() {} + +func (opt queryUseStandardSQL) customizeQuery(conf *bq.JobConfigurationQuery) { + conf.UseLegacySql = false + conf.ForceSendFields = append(conf.ForceSendFields, "UseLegacySql") +} + +func (c *Client) query(ctx context.Context, dst *Table, src *Query, options []Option) (*Job, error) { + job, options := initJobProto(c.projectID, options) + payload := &bq.JobConfigurationQuery{} + + dst.customizeQueryDst(payload) + + // QueryConfig now contains a Dst field. If it is set, it will override dst. + // This should not affect existing client code which does not set QueryConfig.Dst. + src.QueryConfig.customizeQuerySrc(payload) + + // For compatability, allow some legacy fields to be set directly on the query. + // TODO(jba): delete this code when deleting Client.Copy. + if src.Q != "" { + payload.Query = src.Q + } + if src.DefaultProjectID != "" || src.DefaultDatasetID != "" { + payload.DefaultDataset = &bq.DatasetReference{ + DatasetId: src.DefaultDatasetID, + ProjectId: src.DefaultProjectID, + } + } + // end of compatability code. + + for _, opt := range options { + o, ok := opt.(queryOption) + if !ok { + return nil, fmt.Errorf("option (%#v) not applicable to dst/src pair: dst: %T ; src: %T", opt, dst, src) + } + o.customizeQuery(payload) + } + + job.Configuration = &bq.JobConfiguration{ + Query: payload, + } + j, err := c.service.insertJob(ctx, job, c.projectID) + if err != nil { + return nil, err + } + j.isQuery = true + return j, nil +} + +// Read submits a query for execution and returns the results via an Iterator. +// Deprecated: Call Read on the Job returned by Query.Run instead. +func (q *Query) Read(ctx context.Context, options ...ReadOption) (*Iterator, error) { + job, err := q.Run(ctx) + if err != nil { + return nil, err + } + return job.Read(ctx, options...) 
+} diff --git a/bigquery/load_test.go b/bigquery/load_test.go index 0b46efc8e8d2..6f24d24393fb 100644 --- a/bigquery/load_test.go +++ b/bigquery/load_test.go @@ -278,14 +278,18 @@ func TestConfiguringLoader(t *testing.T) { want := defaultLoadJob() want.Configuration.Load.CreateDisposition = "CREATE_NEVER" want.Configuration.Load.WriteDisposition = "WRITE_TRUNCATE" + want.JobReference = &bq.JobReference{ + JobId: "ajob", + ProjectId: "project-id", + } loader := dst.LoaderFrom(src) loader.CreateDisposition = CreateNever loader.WriteDisposition = WriteTruncate + loader.JobID = "ajob" if _, err := loader.Run(context.Background()); err != nil { - t.Errorf("err calling Loader.Run: %v", err) - return + t.Fatalf("err calling Loader.Run: %v", err) } if !reflect.DeepEqual(s.Job, want) { t.Errorf("loading: got:\n%v\nwant:\n%v", s.Job, want) diff --git a/bigquery/query.go b/bigquery/query.go index 519dd232ce37..22c42d58fc7d 100644 --- a/bigquery/query.go +++ b/bigquery/query.go @@ -14,10 +14,21 @@ package bigquery -import bq "google.golang.org/api/bigquery/v2" +import ( + "golang.org/x/net/context" + bq "google.golang.org/api/bigquery/v2" +) + +// QueryConfig holds the configuration for a query job. +type QueryConfig struct { + // JobID is the ID to use for the query job. If this field is empty, a job ID + // will be automatically created. + JobID string + + // Dst is the table into which the results of the query will be written. + // If this field is nil, a temporary table will be created. + Dst *Table -// Query represents a query to be executed. Use Client.Query to create a query. -type Query struct { // The query to execute. See https://cloud.google.com/bigquery/query-reference for details. Q string @@ -30,14 +41,85 @@ type Query struct { // The map keys may be used as table names in the query string. TableDefinitions map[string]ExternalData + // CreateDisposition specifies the circumstances under which the destination table will be created. 
+ // The default is CreateIfNeeded. + CreateDisposition TableCreateDisposition + + // WriteDisposition specifies how existing data in the destination table is treated. + // The default is WriteAppend. + WriteDisposition TableWriteDisposition + + // DisableQueryCache prevents results being fetched from the query cache. + // If this field is false, results are fetched from the cache if they are available. + // The query cache is a best-effort cache that is flushed whenever tables in the query are modified. + // Cached results are only available when TableID is unspecified in the query's destination Table. + // For more information, see https://cloud.google.com/bigquery/querying-data#querycaching + DisableQueryCache bool + + // DisableFlattenedResults prevents results being flattened. + // If this field is false, results from nested and repeated fields are flattened. + // DisableFlattenedResults implies AllowLargeResults + // For more information, see https://cloud.google.com/bigquery/docs/data#nested + DisableFlattenedResults bool + + // AllowLargeResults allows the query to produce arbitrarily large result tables. + // The destination must be a table. + // When using this option, queries will take longer to execute, even if the result set is small. + // For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults + AllowLargeResults bool + + // Priority specifies the priority with which to schedule the query. + // The default priority is InteractivePriority. + // For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries + Priority QueryPriority + + // MaxBillingTier sets the maximum billing tier for a Query. + // Queries that have resource usage beyond this tier will fail (without + // incurring a charge). If this field is zero, the project default will be used. + MaxBillingTier int + + // MaxBytesBilled limits the number of bytes billed for + // this job.
Queries that would exceed this limit will fail (without incurring + // a charge). + // If this field is less than 1, the project default will be + // used. + MaxBytesBilled int64 + + // UseStandardSQL causes the query to use standard SQL. + // The default is false (using legacy SQL). + UseStandardSQL bool +} + +// QueryPriority specifies a priority with which a query is to be executed. +type QueryPriority string + +const ( + BatchPriority QueryPriority = "BATCH" + InteractivePriority QueryPriority = "INTERACTIVE" +) + +// A Query queries data from a BigQuery table. Use Client.Query to create a Query. +type Query struct { client *Client + QueryConfig + + // The query to execute. See https://cloud.google.com/bigquery/query-reference for details. + // Deprecated: use QueryConfig.Q instead. + Q string + + // DefaultProjectID and DefaultDatasetID specify the dataset to use for unqualified table names in the query. + // If DefaultProjectID is set, DefaultDatasetID must also be set. + DefaultProjectID string // Deprecated: use QueryConfig.DefaultProjectID instead. + DefaultDatasetID string // Deprecated: use QueryConfig.DefaultDatasetID instead.
} -func (q *Query) implementsSource() {} +func (q *QueryConfig) implementsSource() {} +func (q *QueryConfig) implementsReadSource() {} +func (q *Query) implementsSource() {} func (q *Query) implementsReadSource() {} -func (q *Query) customizeQuerySrc(conf *bq.JobConfigurationQuery) { +func (q *QueryConfig) customizeQuerySrc(conf *bq.JobConfigurationQuery) { conf.Query = q.Q if len(q.TableDefinitions) > 0 { @@ -53,4 +135,76 @@ func (q *Query) customizeQuerySrc(conf *bq.JobConfigurationQuery) { ProjectId: q.DefaultProjectID, } } + + if tier := int64(q.MaxBillingTier); tier > 0 { + conf.MaximumBillingTier = &tier + } + conf.CreateDisposition = string(q.CreateDisposition) + conf.WriteDisposition = string(q.WriteDisposition) + conf.AllowLargeResults = q.AllowLargeResults + conf.Priority = string(q.Priority) + + f := false + if q.DisableQueryCache { + conf.UseQueryCache = &f + } + if q.DisableFlattenedResults { + conf.FlattenResults = &f + // DisableFlattenResults implies AllowLargeResults. + conf.AllowLargeResults = true + } + if q.MaxBytesBilled >= 1 { + conf.MaximumBytesBilled = q.MaxBytesBilled + } + if q.UseStandardSQL { + conf.UseLegacySql = false + conf.ForceSendFields = append(conf.ForceSendFields, "UseLegacySql") + } + + if q.Dst != nil { + q.Dst.customizeQueryDst(conf) + } +} + +// Query creates a query with string q. +// The returned Query may optionally be further configured before its Run method is called. +func (c *Client) Query(q string) *Query { + return &Query{ + client: c, + QueryConfig: QueryConfig{Q: q}, + } +} + +// Run initiates a query job. +func (q *Query) Run(ctx context.Context) (*Job, error) { + job := &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{}, + }, + } + setJobRef(job, q.JobID, q.client.projectID) + + q.QueryConfig.customizeQuerySrc(job.Configuration.Query) + + // For compatability, allow some legacy fields to be set directly on the query. 
+ // Even though Query.Run is new, it is called by Query.Read, which may have non-empty deprecated fields. + // TODO(jba): delete this code when deleting Client.Copy. + conf := job.Configuration.Query + if q.Q != "" { + conf.Query = q.Q + } + if q.DefaultProjectID != "" || q.DefaultDatasetID != "" { + conf.DefaultDataset = &bq.DatasetReference{ + DatasetId: q.DefaultDatasetID, + ProjectId: q.DefaultProjectID, + } + } + // end of compatability code. + + j, err := q.client.service.insertJob(ctx, job, q.client.projectID) + if err != nil { + return nil, err + } + j.isQuery = true + return j, nil } diff --git a/bigquery/query_op.go b/bigquery/query_op.go deleted file mode 100644 index 0a7098827bce..000000000000 --- a/bigquery/query_op.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright 2015 Google Inc. All Rights Reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package bigquery - -import ( - "fmt" - - "golang.org/x/net/context" - bq "google.golang.org/api/bigquery/v2" -) - -type queryOption interface { - customizeQuery(conf *bq.JobConfigurationQuery) -} - -// DisableQueryCache returns an Option that prevents results being fetched from the query cache. -// If this Option is not used, results are fetched from the cache if they are available. -// The query cache is a best-effort cache that is flushed whenever tables in the query are modified. -// Cached results are only available when TableID is unspecified in the query's destination Table. 
-// For more information, see https://cloud.google.com/bigquery/querying-data#querycaching -func DisableQueryCache() Option { return disableQueryCache{} } - -type disableQueryCache struct{} - -func (opt disableQueryCache) implementsOption() {} - -func (opt disableQueryCache) customizeQuery(conf *bq.JobConfigurationQuery) { - f := false - conf.UseQueryCache = &f -} - -// DisableFlattenedResults returns an Option that prevents results being flattened. -// If this Option is not used, results from nested and repeated fields are flattened. -// DisableFlattenedResults implies AllowLargeResults -// For more information, see https://cloud.google.com/bigquery/docs/data#nested -func DisableFlattenedResults() Option { return disableFlattenedResults{} } - -type disableFlattenedResults struct{} - -func (opt disableFlattenedResults) implementsOption() {} - -func (opt disableFlattenedResults) customizeQuery(conf *bq.JobConfigurationQuery) { - f := false - conf.FlattenResults = &f - // DisableFlattenedResults implies AllowLargeResults - allowLargeResults{}.customizeQuery(conf) -} - -// AllowLargeResults returns an Option that allows the query to produce arbitrarily large result tables. -// The destination must be a table. -// When using this option, queries will take longer to execute, even if the result set is small. -// For additional limitations, see https://cloud.google.com/bigquery/querying-data#largequeryresults -func AllowLargeResults() Option { return allowLargeResults{} } - -type allowLargeResults struct{} - -func (opt allowLargeResults) implementsOption() {} - -func (opt allowLargeResults) customizeQuery(conf *bq.JobConfigurationQuery) { - conf.AllowLargeResults = true -} - -// JobPriority returns an Option that causes a query to be scheduled with the specified priority. -// The default priority is InteractivePriority. 
-// For more information, see https://cloud.google.com/bigquery/querying-data#batchqueries -func JobPriority(priority string) Option { return jobPriority(priority) } - -type jobPriority string - -func (opt jobPriority) implementsOption() {} - -func (opt jobPriority) customizeQuery(conf *bq.JobConfigurationQuery) { - conf.Priority = string(opt) -} - -const ( - BatchPriority = "BATCH" - InteractivePriority = "INTERACTIVE" -) - -// MaxBillingTier returns an Option that sets the maximum billing tier for a Query. -// Queries that have resource usage beyond this tier will fail (without -// incurring a charge). If this Option is not used, the project default will be used. -func MaxBillingTier(tier int) Option { return maxBillingTier(tier) } - -type maxBillingTier int - -func (opt maxBillingTier) implementsOption() {} - -func (opt maxBillingTier) customizeQuery(conf *bq.JobConfigurationQuery) { - tier := int64(opt) - conf.MaximumBillingTier = &tier -} - -// MaxBytesBilled returns an Option that limits the number of bytes billed for -// this job. Queries that would exceed this limit will fail (without incurring -// a charge). -// If this Option is not used, or bytes is < 1, the project default will be -// used. -func MaxBytesBilled(bytes int64) Option { return maxBytesBilled(bytes) } - -type maxBytesBilled int64 - -func (opt maxBytesBilled) implementsOption() {} - -func (opt maxBytesBilled) customizeQuery(conf *bq.JobConfigurationQuery) { - if opt >= 1 { - conf.MaximumBytesBilled = int64(opt) - } -} - -// QueryUseStandardSQL returns an Option that set the query to use standard SQL. -// The default setting is false (using legacy SQL). 
-func QueryUseStandardSQL() Option { return queryUseStandardSQL{} } - -type queryUseStandardSQL struct{} - -func (opt queryUseStandardSQL) implementsOption() {} - -func (opt queryUseStandardSQL) customizeQuery(conf *bq.JobConfigurationQuery) { - conf.UseLegacySql = false - conf.ForceSendFields = append(conf.ForceSendFields, "UseLegacySql") -} - -func (c *Client) query(ctx context.Context, dst *Table, src *Query, options []Option) (*Job, error) { - job, options := initJobProto(c.projectID, options) - payload := &bq.JobConfigurationQuery{} - - dst.customizeQueryDst(payload) - src.customizeQuerySrc(payload) - - for _, opt := range options { - o, ok := opt.(queryOption) - if !ok { - return nil, fmt.Errorf("option (%#v) not applicable to dst/src pair: dst: %T ; src: %T", opt, dst, src) - } - o.customizeQuery(payload) - } - - job.Configuration = &bq.JobConfiguration{ - Query: payload, - } - j, err := c.service.insertJob(ctx, job, c.projectID) - if err != nil { - return nil, err - } - j.isQuery = true - return j, nil -} diff --git a/bigquery/query_test.go b/bigquery/query_test.go index 32dcb7f5eb4c..af7f22547408 100644 --- a/bigquery/query_test.go +++ b/bigquery/query_test.go @@ -42,7 +42,7 @@ func defaultQueryJob() *bq.Job { } } -func TestQuery(t *testing.T) { +func TestQueryWithOptions(t *testing.T) { s := &testService{} c := &Client{ projectID: "project-id", @@ -50,35 +50,10 @@ func TestQuery(t *testing.T) { } testCases := []struct { dst *Table - src *Query + src *QueryConfig options []Option want *bq.Job }{ - { - dst: c.Dataset("dataset-id").Table("table-id"), - src: defaultQuery, - want: defaultQueryJob(), - }, - { - dst: c.Dataset("dataset-id").Table("table-id"), - src: &Query{ - Q: "query string", - }, - want: func() *bq.Job { - j := defaultQueryJob() - j.Configuration.Query.DefaultDataset = nil - return j - }(), - }, - { - dst: &Table{}, - src: defaultQuery, - want: func() *bq.Job { - j := defaultQueryJob() - j.Configuration.Query.DestinationTable = nil - return j 
- }(), - }, { dst: &Table{ ProjectID: "project-id", @@ -166,9 +141,57 @@ func TestQuery(t *testing.T) { return j }(), }, + } + + for _, tc := range testCases { + // Only the old-style Client.Copy method can take options. + if _, err := c.Copy(context.Background(), tc.dst, tc.src, tc.options...); err != nil { + t.Errorf("err calling query: %v", err) + continue + } + if !reflect.DeepEqual(s.Job, tc.want) { + t.Errorf("querying: got:\n%v\nwant:\n%v", s.Job, tc.want) + } + } +} + +func TestQuery(t *testing.T) { + c := &Client{ + projectID: "project-id", + } + testCases := []struct { + dst *Table + src *QueryConfig + want *bq.Job + }{ + { + dst: c.Dataset("dataset-id").Table("table-id"), + src: defaultQuery, + want: defaultQueryJob(), + }, + { + dst: c.Dataset("dataset-id").Table("table-id"), + src: &QueryConfig{ + Q: "query string", + }, + want: func() *bq.Job { + j := defaultQueryJob() + j.Configuration.Query.DefaultDataset = nil + return j + }(), + }, + { + dst: &Table{}, + src: defaultQuery, + want: func() *bq.Job { + j := defaultQueryJob() + j.Configuration.Query.DestinationTable = nil + return j + }(), + }, { dst: c.Dataset("dataset-id").Table("table-id"), - src: &Query{ + src: &QueryConfig{ Q: "query string", TableDefinitions: map[string]ExternalData{ "atable": &GCSReference{ @@ -220,7 +243,255 @@ func TestQuery(t *testing.T) { } for _, tc := range testCases { - if _, err := c.Copy(context.Background(), tc.dst, tc.src, tc.options...); err != nil { + // Old-style: Client.Copy. + s := &testService{} + c.service = s + if _, err := c.Copy(context.Background(), tc.dst, tc.src); err != nil { + t.Errorf("err calling query: %v", err) + continue + } + if !reflect.DeepEqual(s.Job, tc.want) { + t.Errorf("querying: got:\n%v\nwant:\n%v", s.Job, tc.want) + } + + // New-style: Client.Query.Run. 
+ s = &testService{} + c.service = s + query := c.Query("") + query.QueryConfig = *tc.src + query.Dst = tc.dst + if _, err := query.Run(context.Background()); err != nil { + t.Errorf("err calling query: %v", err) + continue + } + if !reflect.DeepEqual(s.Job, tc.want) { + t.Errorf("querying: got:\n%v\nwant:\n%v", s.Job, tc.want) + } + } +} + +func TestConfiguringQuery(t *testing.T) { + s := &testService{} + c := &Client{ + projectID: "project-id", + service: s, + } + + query := c.Query("q") + query.JobID = "ajob" + query.DefaultProjectID = "def-project-id" + query.DefaultDatasetID = "def-dataset-id" + // Note: Other configuration fields are tested in other tests above. + // A lot of that can be consolidated once Client.Copy is gone. + + want := &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "q", + DefaultDataset: &bq.DatasetReference{ + ProjectId: "def-project-id", + DatasetId: "def-dataset-id", + }, + }, + }, + JobReference: &bq.JobReference{ + JobId: "ajob", + ProjectId: "project-id", + }, + } + + if _, err := query.Run(context.Background()); err != nil { + t.Fatalf("err calling Query.Run: %v", err) + } + if !reflect.DeepEqual(s.Job, want) { + t.Errorf("querying: got:\n%v\nwant:\n%v", s.Job, want) + } +} + +func TestDeprecatedFields(t *testing.T) { + // TODO(jba): delete this test once the deprecated top-level Query fields (e.g. "Q") have been removed. TestConfiguringQuery will suffice then. + s := &testService{} + c := &Client{ + projectID: "project-id", + service: s, + } + + query := c.Query("original query") + query.QueryConfig.DefaultProjectID = "original project id" + query.QueryConfig.DefaultDatasetID = "original dataset id" + + // Set deprecated fields. Should override the one in QueryConfig. 
+ query.Q = "override query" + query.DefaultProjectID = "override project id" + query.DefaultDatasetID = "override dataset id" + + want := &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "override query", + DefaultDataset: &bq.DatasetReference{ + ProjectId: "override project id", + DatasetId: "override dataset id", + }, + }, + }, + } + + if _, err := query.Run(context.Background()); err != nil { + t.Fatalf("err calling Query.Run: %v", err) + } + if !reflect.DeepEqual(s.Job, want) { + t.Errorf("querying: got:\n%v\nwant:\n%v", s.Job, want) + } + + // Clear deprecated fields. The ones in QueryConfig should now be used. + query.Q = "" + query.DefaultProjectID = "" + query.DefaultDatasetID = "" + + want = &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "original query", + DefaultDataset: &bq.DatasetReference{ + ProjectId: "original project id", + DatasetId: "original dataset id", + }, + }, + }, + } + + if _, err := query.Run(context.Background()); err != nil { + t.Fatalf("err calling Query.Run: %v", err) + } + if !reflect.DeepEqual(s.Job, want) { + t.Errorf("querying: got:\n%v\nwant:\n%v", s.Job, want) + } + +} + +func TestBackwardsCompatabilityOfQuery(t *testing.T) { + // TODO(jba): delete this test once Queries can only be created via Client.Query. 
+ c := &Client{ + projectID: "project-id", + } + testCases := []struct { + src interface{} + want *bq.Job + }{ + { + src: &Query{ + Q: "query string", + DefaultProjectID: "def-project-id", + DefaultDatasetID: "def-dataset-id", + }, + want: &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "query string", + DefaultDataset: &bq.DatasetReference{ + ProjectId: "def-project-id", + DatasetId: "def-dataset-id", + }, + }, + }, + }, + }, + { + src: &QueryConfig{ + Q: "query string", + DefaultProjectID: "def-project-id", + DefaultDatasetID: "def-dataset-id", + }, + want: &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "query string", + DefaultDataset: &bq.DatasetReference{ + ProjectId: "def-project-id", + DatasetId: "def-dataset-id", + }, + }, + }, + }, + }, + { + src: &Query{ + QueryConfig: QueryConfig{ + Q: "query string", + DefaultProjectID: "def-project-id", + DefaultDatasetID: "def-dataset-id", + }, + }, + want: &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "query string", + DefaultDataset: &bq.DatasetReference{ + ProjectId: "def-project-id", + DatasetId: "def-dataset-id", + }, + }, + }, + }, + }, + { + src: func() *Query { + q := c.Query("query string") + q.DefaultProjectID = "def-project-id" + q.DefaultDatasetID = "def-dataset-id" + return q + }(), + want: &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "query string", + DefaultDataset: &bq.DatasetReference{ + ProjectId: "def-project-id", + DatasetId: "def-dataset-id", + }, + }, + }, + }, + }, + { + src: func() *Query { + q := c.Query("query string") + q.QueryConfig.DefaultProjectID = "def-project-id" + q.QueryConfig.DefaultDatasetID = "def-dataset-id" + return q + }(), + want: &bq.Job{ + Configuration: &bq.JobConfiguration{ + Query: &bq.JobConfigurationQuery{ + Query: "query string", + DefaultDataset: &bq.DatasetReference{ + ProjectId: 
"def-project-id", + DatasetId: "def-dataset-id", + }, + }, + }, + }, + }, + } + + dst := &Table{} + for _, tc := range testCases { + // Old-style: Client.Copy. + s := &testService{} + c.service = s + if _, err := c.Copy(context.Background(), dst, tc.src.(Source)); err != nil { + t.Errorf("err calling query: %v", err) + continue + } + if !reflect.DeepEqual(s.Job, tc.want) { + t.Errorf("querying: got:\n%v\nwant:\n%v", s.Job, tc.want) + } + + // Old-style Client.Read. + s = &testService{} + c.service = s + if _, err := c.Read(context.Background(), tc.src.(ReadSource)); err != nil { t.Errorf("err calling query: %v", err) continue } diff --git a/bigquery/table.go b/bigquery/table.go index 410a487542fa..31160d48bbec 100644 --- a/bigquery/table.go +++ b/bigquery/table.go @@ -78,18 +78,6 @@ const ( CreateNever TableCreateDisposition = "CREATE_NEVER" ) -func CreateDisposition(disp TableCreateDisposition) Option { return disp } - -func (opt TableCreateDisposition) implementsOption() {} - -func (opt TableCreateDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) { - conf.CreateDisposition = string(opt) -} - -func (opt TableCreateDisposition) customizeQuery(conf *bq.JobConfigurationQuery) { - conf.CreateDisposition = string(opt) -} - // TableWriteDisposition specifies how existing data in a destination table is treated. // Default is WriteAppend. type TableWriteDisposition string @@ -107,18 +95,6 @@ const ( WriteEmpty TableWriteDisposition = "WRITE_EMPTY" ) -func WriteDisposition(disp TableWriteDisposition) Option { return disp } - -func (opt TableWriteDisposition) implementsOption() {} - -func (opt TableWriteDisposition) customizeCopy(conf *bq.JobConfigurationTableCopy) { - conf.WriteDisposition = string(opt) -} - -func (opt TableWriteDisposition) customizeQuery(conf *bq.JobConfigurationQuery) { - conf.WriteDisposition = string(opt) -} - // TableType is the type of table. 
type TableType string @@ -364,13 +340,7 @@ func (l *Loader) Run(ctx context.Context) (*Job, error) { }, }, } - - if l.JobID != "" { - job.JobReference = &bq.JobReference{ - JobId: l.JobID, - ProjectId: l.c.projectID, - } - } + setJobRef(job, l.JobID, l.c.projectID) l.Src.customizeLoadSrc(job.Configuration.Load) l.Dst.customizeLoadDst(job.Configuration.Load) diff --git a/bigquery/utils_test.go b/bigquery/utils_test.go index 10fd6c398130..d29fbc727698 100644 --- a/bigquery/utils_test.go +++ b/bigquery/utils_test.go @@ -25,7 +25,7 @@ func defaultGCS() *GCSReference { } } -var defaultQuery = &Query{ +var defaultQuery = &QueryConfig{ Q: "query string", DefaultProjectID: "def-project-id", DefaultDatasetID: "def-dataset-id", diff --git a/examples/bigquery/query/main.go b/examples/bigquery/query/main.go index ef496a8d2279..4ddcf4af3be5 100644 --- a/examples/bigquery/query/main.go +++ b/examples/bigquery/query/main.go @@ -56,19 +56,17 @@ func main() { log.Fatalf("Creating bigquery client: %v", err) } - d := &bigquery.Table{} - if *dest != "" { - d = client.Dataset(*dataset).Table(*dest) - } + query := client.Query(*q) + query.DefaultProjectID = *project + query.DefaultDatasetID = *dataset + query.WriteDisposition = bigquery.WriteTruncate - query := &bigquery.Query{ - Q: *q, - DefaultProjectID: *project, - DefaultDatasetID: *dataset, + if *dest != "" { + query.Dst = client.Dataset(*dataset).Table(*dest) } // Query data. - job, err := client.Copy(ctx, d, query, bigquery.WriteTruncate) + job, err := query.Run(ctx) if err != nil { log.Fatalf("Querying: %v", err)