Skip to content

Commit

Permalink
bigquery: clean up implementation of LoadSource
Browse files Browse the repository at this point in the history
Have insertJob take a single struct that contains both a job and a
media (io.Reader).

Change-Id: I7443bcb1ec760ce8a97e10883e36c42f9c17181a
Reviewed-on: https://code-review.googlesource.com/8970
Reviewed-by: Michael McGreevy <[email protected]>
  • Loading branch information
jba committed Oct 31, 2016
1 parent f9c9ec4 commit f7f94a2
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 38 deletions.
2 changes: 1 addition & 1 deletion bigquery/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,5 @@ func (c *Copier) Run(ctx context.Context) (*Job, error) {
}
job := &bq.Job{Configuration: &bq.JobConfiguration{Copy: conf}}
setJobRef(job, c.JobID, c.c.projectID)
return c.c.service.insertJob(ctx, job, c.c.projectID, nil)
return c.c.service.insertJob(ctx, c.c.projectID, &insertJobConf{job: job})
}
2 changes: 1 addition & 1 deletion bigquery/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,5 @@ func (e *Extractor) Run(ctx context.Context) (*Job, error) {
conf.PrintHeader = &f
}

return e.c.service.insertJob(ctx, job, e.c.projectID, nil)
return e.c.service.insertJob(ctx, e.c.projectID, &insertJobConf{job: job})
}
7 changes: 3 additions & 4 deletions bigquery/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ func NewReaderSource(r io.Reader) *ReaderSource {
return &ReaderSource{r: r}
}

func (r *ReaderSource) populateLoadConfig(conf *bq.JobConfigurationLoad) {
r.FileConfig.populateLoadConfig(conf)
func (r *ReaderSource) populateInsertJobConfForLoad(conf *insertJobConf) {
conf.media = r.r
r.FileConfig.populateLoadConfig(conf.job.Configuration.Load)
}

func (r *ReaderSource) reader() io.Reader { return r.r }

// FileConfig contains configuration options that pertain to files, typically
// text files that require interpretation to be used as a BigQuery table. A
// file may live in Google Cloud Storage (see GCSReference), or it may be
Expand Down
14 changes: 4 additions & 10 deletions bigquery/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@

package bigquery

import (
"io"

bq "google.golang.org/api/bigquery/v2"
)
import bq "google.golang.org/api/bigquery/v2"

// GCSReference is a reference to one or more Google Cloud Storage objects, which together constitute
// an input or output to a BigQuery operation.
Expand Down Expand Up @@ -57,13 +53,11 @@ const (
Gzip Compression = "GZIP"
)

func (gcs *GCSReference) populateLoadConfig(conf *bq.JobConfigurationLoad) {
conf.SourceUris = gcs.uris
gcs.FileConfig.populateLoadConfig(conf)
func (gcs *GCSReference) populateInsertJobConfForLoad(conf *insertJobConf) {
conf.job.Configuration.Load.SourceUris = gcs.uris
gcs.FileConfig.populateLoadConfig(conf.job.Configuration.Load)
}

func (gcs *GCSReference) reader() io.Reader { return nil }

func (gcs *GCSReference) externalDataConfig() bq.ExternalDataConfiguration {
conf := bq.ExternalDataConfiguration{
Compression: string(gcs.Compression),
Expand Down
21 changes: 9 additions & 12 deletions bigquery/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package bigquery

import (
"io"

"golang.org/x/net/context"
bq "google.golang.org/api/bigquery/v2"
)
Expand Down Expand Up @@ -53,8 +51,7 @@ type Loader struct {
// This package defines two LoadSources: GCSReference, for Google Cloud Storage
// objects, and ReaderSource, for data read from an io.Reader.
type LoadSource interface {
populateLoadConfig(*bq.JobConfigurationLoad)
reader() io.Reader
populateInsertJobConfForLoad(conf *insertJobConf)
}

// LoaderFrom returns a Loader which can be used to load data into a BigQuery table.
Expand All @@ -71,19 +68,19 @@ func (t *Table) LoaderFrom(src LoadSource) *Loader {

// Run initiates a load job.
func (l *Loader) Run(ctx context.Context) (*Job, error) {
conf := &bq.JobConfigurationLoad{
CreateDisposition: string(l.CreateDisposition),
WriteDisposition: string(l.WriteDisposition),
}
l.Src.populateLoadConfig(conf)
job := &bq.Job{
Configuration: &bq.JobConfiguration{
Load: conf,
Load: &bq.JobConfigurationLoad{
CreateDisposition: string(l.CreateDisposition),
WriteDisposition: string(l.WriteDisposition),
},
},
}
conf := &insertJobConf{job: job}
l.Src.populateInsertJobConfForLoad(conf)
setJobRef(job, l.JobID, l.c.projectID)

conf.DestinationTable = l.Dst.tableRefProto()
job.Configuration.Load.DestinationTable = l.Dst.tableRefProto()

return l.c.service.insertJob(ctx, job, l.c.projectID, l.Src.reader())
return l.c.service.insertJob(ctx, l.c.projectID, conf)
}
2 changes: 1 addition & 1 deletion bigquery/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (q *Query) Run(ctx context.Context) (*Job, error) {
setJobRef(job, q.JobID, q.client.projectID)

q.QueryConfig.populateJobQueryConfig(job.Configuration.Query)
j, err := q.client.service.insertJob(ctx, job, q.client.projectID, nil)
j, err := q.client.service.insertJob(ctx, q.client.projectID, &insertJobConf{job: job})
if err != nil {
return nil, err
}
Expand Down
15 changes: 10 additions & 5 deletions bigquery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
// of the generated BigQuery API.
type service interface {
// Jobs
insertJob(ctx context.Context, job *bq.Job, projectId string, r io.Reader) (*Job, error)
insertJob(ctx context.Context, projectId string, conf *insertJobConf) (*Job, error)
getJobType(ctx context.Context, projectId, jobID string) (jobType, error)
jobCancel(ctx context.Context, projectId, jobID string) error
jobStatus(ctx context.Context, projectId, jobID string) (*JobStatus, error)
Expand Down Expand Up @@ -93,10 +93,15 @@ func getPages(token string, getPage func(token string) (nextToken string, err er
}
}

func (s *bigqueryService) insertJob(ctx context.Context, job *bq.Job, projectID string, r io.Reader) (*Job, error) {
call := s.s.Jobs.Insert(projectID, job).Context(ctx)
if r != nil {
call.Media(r)
type insertJobConf struct {
job *bq.Job
media io.Reader
}

func (s *bigqueryService) insertJob(ctx context.Context, projectID string, conf *insertJobConf) (*Job, error) {
call := s.s.Jobs.Insert(projectID, conf.job).Context(ctx)
if conf.media != nil {
call.Media(conf.media)
}
res, err := call.Do()
if err != nil {
Expand Down
6 changes: 2 additions & 4 deletions bigquery/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package bigquery

import (
"io"

"golang.org/x/net/context"
bq "google.golang.org/api/bigquery/v2"
)
Expand All @@ -39,8 +37,8 @@ type testService struct {
service
}

func (s *testService) insertJob(ctx context.Context, job *bq.Job, projectID string, r io.Reader) (*Job, error) {
s.Job = job
func (s *testService) insertJob(ctx context.Context, projectID string, conf *insertJobConf) (*Job, error) {
s.Job = conf.job
return &Job{}, nil
}

Expand Down

0 comments on commit f7f94a2

Please sign in to comment.