Skip to content

Commit

Permalink
Add BigqueryLoadTask
Browse files Browse the repository at this point in the history
  • Loading branch information
hakobera committed May 30, 2016
1 parent 4555dee commit 625216f
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 8 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ end

#### Usage

Copy `test.src_table` to `test.dest_table`.

```rb
task :task1, type: :bigquery_copy do
param_set :src_dataset_id, 'test'
Expand All @@ -77,14 +79,32 @@ task :task1, type: :bigquery_copy do
end
```

### Tumugi::Plugin::BigqueryLoadTask

`Tumugi::Plugin::BigqueryLoadTask` is a task to load structured data from GCS into BigQuery.

#### Usage

Load `gs://test_bucket/load_data.csv` into `dest_project:dest_dataset.dest_table`

```rb
task :task1, type: :bigquery_load do
param_set :bucket, 'test_bucket'
param_set :key, 'load_data.csv'
param_set :project_id, 'dest_project'
param_set :dataset_id, 'dest_dataset'
param_set :table_id, 'dest_table'
end
```

### Config Section

tumugi-plugin-bigquery provides a config section named "bigquery" which can specify BigQuery authentication info.

#### Authenticate by client_email and private_key

```rb
Tumugi.config do |config|
Tumugi.configure do |config|
config.section("bigquery") do |section|
section.project_id = "xxx"
section.client_email = "[email protected]"
Expand Down
23 changes: 16 additions & 7 deletions lib/tumugi/plugin/bigquery/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,28 +185,37 @@ def query(sql, mode: :truncate,
end

def load(dataset_id, table_id, source_uris=nil,
schema: nil, delimiter: ",", field_delimiter: delimiter, mode: :append,
allow_jagged_rows: false, max_bad_records: 0,
schema: nil,
field_delimiter: ",",
mode: :append,
allow_jagged_rows: false,
max_bad_records: 0,
ignore_unknown_values: false,
allow_quoted_newlines: false,
quote: '"', skip_leading_rows: 0,
quote: '"',
skip_leading_rows: 0,
source_format: "CSV",
project_id: nil,
job_id: nil,
file: nil, wait: nil,
dry_run: false,
&blk)
@client.load(dataset_id, table_id, source_uris=source_uris,
schema: schema, delimiter: delimiter, field_delimiter: field_delimiter, mode: mode,
allow_jagged_rows: allow_jagged_rows, max_bad_records: max_bad_records,
schema: schema,
field_delimiter: field_delimiter,
mode: mode,
allow_jagged_rows: allow_jagged_rows,
max_bad_records: max_bad_records,
ignore_unknown_values: ignore_unknown_values,
allow_quoted_newlines: allow_quoted_newlines,
quote: quote, skip_leading_rows: skip_leading_rows,
quote: quote,
skip_leading_rows: skip_leading_rows,
source_format: source_format,
project_id: project_id || @project_id,
job_project_id: project_id || @project_id,
job_id: job_id,
file: file, wait: wait,
file: file,
wait: wait,
dry_run: dry_run,
&blk)
rescue Kura::ApiError => e
Expand Down
62 changes: 62 additions & 0 deletions lib/tumugi/plugin/task/bigquery_load.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
require 'tumugi'
require_relative '../target/bigquery_table'

module Tumugi
  module Plugin
    # Task that loads structured data from Google Cloud Storage into a
    # BigQuery table. Registered under the plugin task type 'bigquery_load'.
    class BigqueryLoadTask < Tumugi::Task
      Tumugi::Plugin.register_task('bigquery_load', self)

      # Source object location on GCS: gs://<bucket>/<key>
      param :bucket, type: :string, required: true, auto_bind: false
      param :key, type: :string, required: true, auto_bind: false
      # Destination table: [<project_id>:]<dataset_id>.<table_id>
      # project_id falls back to the client's default project when omitted.
      param :project_id, type: :string, auto_bind: false
      param :dataset_id, type: :string, required: true, auto_bind: false
      param :table_id, type: :string, required: true, auto_bind: false

      param :schema, auto_bind: false # type: :array
      param :field_delimiter, type: :string, auto_bind: false, default: ','
      param :mode, type: :string, auto_bind: false, default: 'append' # append, truncate, empty
      param :allow_jagged_rows, type: :bool, auto_bind: false, default: false
      param :max_bad_records, type: :integer, auto_bind: false, default: 0
      param :ignore_unknown_values, type: :bool, auto_bind: false, default: false
      param :allow_quoted_newlines, type: :bool, auto_bind: false, default: false
      param :quote, type: :string, auto_bind: false, default: '"'
      # FIX: was `type: :interger` (typo), so the value was never validated
      # as an integer.
      param :skip_leading_rows, type: :integer, auto_bind: false, default: 0
      param :source_format, type: :string, auto_bind: false, default: 'CSV' # NEWLINE_DELIMITED_JSON, AVRO
      param :wait, type: :integer, auto_bind: false, default: 60

      # Target BigQuery table this task produces.
      def output
        opts = { dataset_id: dataset_id, table_id: table_id }
        opts[:project_id] = project_id if project_id
        Tumugi::Plugin::BigqueryTableTarget.new(opts)
      end

      # Builds the gs:// source URI and submits a BigQuery load job through
      # the target's client.
      #
      # @raise [Tumugi::ParameterError] when mode is 'truncate' or 'empty'
      #   and no schema is given
      def run
        if mode != 'append'
          raise Tumugi::ParameterError.new("Parameter 'schema' is required when 'mode' is 'truncate' or 'empty'") if schema.nil?
        end

        # FIX: the original `key = "/#{key}" unless key.start_with?('/')`
        # shadowed the `key` param: Ruby creates the local at parse time, so
        # both the condition and the interpolation saw a nil local and the
        # task crashed with NoMethodError. Use a distinct local instead.
        object_key = key.start_with?('/') ? key : "/#{key}"
        source_uri = "gs://#{bucket}#{object_key}"
        log "Source: #{source_uri}"
        log "Destination: #{output}"

        bq_client = output.client
        # NOTE(review): `_output` is presumably the framework-memoized result
        # of #output — confirm against Tumugi::Task.
        opts = {
          schema: schema,
          field_delimiter: field_delimiter,
          mode: mode.to_sym,
          allow_jagged_rows: allow_jagged_rows,
          max_bad_records: max_bad_records,
          ignore_unknown_values: ignore_unknown_values,
          allow_quoted_newlines: allow_quoted_newlines,
          quote: quote,
          skip_leading_rows: skip_leading_rows,
          source_format: source_format,
          project_id: _output.project_id,
          wait: wait
        }
        bq_client.load(_output.dataset_id, _output.table_id, source_uri, opts)
      end
    end
  end
end

0 comments on commit 625216f

Please sign in to comment.