Skip to content

Commit

Permalink
Implement load task from Google Cloud Storage
Browse files Browse the repository at this point in the history
  • Loading branch information
hakobera committed May 31, 2016
1 parent 70bedb3 commit 96f5aaf
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 9 deletions.
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ end

#### Usage

Copy `test.src_table` to `test.dest_table`.

```rb
task :task1, type: :bigquery_copy do
param_set :src_dataset_id, 'test'
Expand All @@ -77,14 +79,32 @@ task :task1, type: :bigquery_copy do
end
```

### Tumugi::Plugin::BigqueryLoadTask

`Tumugi::Plugin::BigqueryLoadTask` is task to load structured data from GCS into BigQuery.

#### Usage

Load `gs://test_bucket/load_data.csv` into `dest_project:dest_dataset.dest_table`

```rb
task :task1, type: :bigquery_load do
param_set :bucket, 'test_bucket'
param_set :key, 'load_data.csv'
param_set :project_id, 'dest_project'
param_set :datset_id, 'dest_dataset'
param_set :table_id, 'dest_table'
end
```

### Config Section

tumugi-plugin-bigquery provide config section named "bigquery" which can specified BigQuery autenticaion info.

#### Authenticate by client_email and private_key

```rb
Tumugi.config do |config|
Tumugi.configure do |config|
config.section("bigquery") do |section|
section.project_id = "xxx"
section.client_email = "[email protected]"
Expand Down
24 changes: 24 additions & 0 deletions examples/load.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
task :task1, type: :bigquery_load do
requires :task2
param_set :bucket, 'tumugi-plugin-bigquery'
param_set :key, 'test.csv'
param_set :dataset_id, -> { input.dataset_id }
param_set :table_id, 'load_test'
param_set :skip_leading_rows, 1
param_set :schema, [
{
name: 'row_number',
type: 'INTEGER',
mode: 'NULLABLE'
},
{
name: 'value',
type: 'INTEGER',
mode: 'NULLABLE'
},
]
end

task :task2, type: :bigquery_dataset do
param_set :dataset_id, 'test'
end
6 changes: 6 additions & 0 deletions examples/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
row_number,value
1,1
2,2
3,3
4,4
5,5
23 changes: 16 additions & 7 deletions lib/tumugi/plugin/bigquery/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,28 +185,37 @@ def query(sql, mode: :truncate,
end

def load(dataset_id, table_id, source_uris=nil,
schema: nil, delimiter: ",", field_delimiter: delimiter, mode: :append,
allow_jagged_rows: false, max_bad_records: 0,
schema: nil,
field_delimiter: ",",
mode: :append,
allow_jagged_rows: false,
max_bad_records: 0,
ignore_unknown_values: false,
allow_quoted_newlines: false,
quote: '"', skip_leading_rows: 0,
quote: '"',
skip_leading_rows: 0,
source_format: "CSV",
project_id: nil,
job_id: nil,
file: nil, wait: nil,
dry_run: false,
&blk)
@client.load(dataset_id, table_id, source_uris=source_uris,
schema: schema, delimiter: delimiter, field_delimiter: field_delimiter, mode: mode,
allow_jagged_rows: allow_jagged_rows, max_bad_records: max_bad_records,
schema: schema,
field_delimiter: field_delimiter,
mode: mode,
allow_jagged_rows: allow_jagged_rows,
max_bad_records: max_bad_records,
ignore_unknown_values: ignore_unknown_values,
allow_quoted_newlines: allow_quoted_newlines,
quote: quote, skip_leading_rows: skip_leading_rows,
quote: quote,
skip_leading_rows: skip_leading_rows,
source_format: source_format,
project_id: project_id || @project_id,
job_project_id: project_id || @project_id,
job_id: job_id,
file: file, wait: wait,
file: file,
wait: wait,
dry_run: dry_run,
&blk)
rescue Kura::ApiError => e
Expand Down
65 changes: 65 additions & 0 deletions lib/tumugi/plugin/task/bigquery_load.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
require 'tumugi'
require_relative '../target/bigquery_table'

module Tumugi
module Plugin
class BigqueryLoadTask < Tumugi::Task
Tumugi::Plugin.register_task('bigquery_load', self)

param :bucket, type: :string, required: true
param :key, type: :string, required: true
param :project_id, type: :string
param :dataset_id, type: :string, required: true
param :table_id, type: :string, required: true

param :schema # type: :array
param :field_delimiter, type: :string, default: ','
param :mode, type: :string, default: 'append' # truncate, empty
param :allow_jagged_rows, type: :bool, default: false
param :max_bad_records, type: :integer, default: 0
param :ignore_unknown_values, type: :bool, default: false
param :allow_quoted_newlines, type: :bool, default: false
param :quote, type: :string, default: '"'
param :skip_leading_rows, type: :interger, default: 0
param :source_format, type: :string, default: 'CSV' # NEWLINE_DELIMITED_JSON, AVRO
param :wait, type: :integer, default: 60

def output
opts = { dataset_id: dataset_id, table_id: table_id }
opts[:project_id] = project_id if project_id
Tumugi::Plugin::BigqueryTableTarget.new(opts)
end

def run
if mode != 'append'
raise Tumugi::ParameterError.new("Parameter 'schema' is required when 'mode' is 'truncate' or 'empty'") if schema.nil?
end

object_id = key
unless object_id.start_with?('/')
object_id = "/#{key}"
end
source_uri = "gs://#{bucket}#{object_id}"
log "Source: #{source_uri}"
log "Destination: #{output}"

bq_client = output.client
opts = {
schema: schema,
field_delimiter: field_delimiter,
mode: mode.to_sym,
allow_jagged_rows: allow_jagged_rows,
max_bad_records: max_bad_records,
ignore_unknown_values: ignore_unknown_values,
allow_quoted_newlines: allow_quoted_newlines,
quote: quote,
skip_leading_rows: skip_leading_rows,
source_format: source_format,
project_id: _output.project_id,
wait: wait
}
bq_client.load(_output.dataset_id, _output.table_id, source_uri, opts)
end
end
end
end
2 changes: 1 addition & 1 deletion test/plugin/bigquery/client_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class Tumugi::Plugin::Bigquery::ClientTest < Test::Unit::TestCase
test "#load" do
any_instance_of(Kura::Client) do |klass|
mock(klass).load(TEST_DATASETS[0], 'test', 'gs://tumugi-plugin-bigquery/test.csv',
schema: nil, delimiter: ",", field_delimiter: ",", mode: :append,
schema: nil, field_delimiter: ",", mode: :append,
allow_jagged_rows: false, max_bad_records: 0,
ignore_unknown_values: false,
allow_quoted_newlines: false,
Expand Down
80 changes: 80 additions & 0 deletions test/plugin/task/bigquery_load_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
require_relative '../../test_helper'
require 'tumugi/plugin/task/bigquery_load'

class Tumugi::Plugin::BigqueryLoadTaskTest < Test::Unit::TestCase
include Tumugi::Plugin::BigqueryTestHelper

setup do
@klass = Class.new(Tumugi::Plugin::BigqueryLoadTask)
@klass.param_set :bucket, 'tumugi-plugin-bigquery'
@klass.param_set :key, 'test.csv'
@klass.param_set :dataset_id, Tumugi::Plugin::BigqueryTestHelper::TEST_DATASETS[0]
@klass.param_set :table_id, 'load_test'
@klass.param_set :skip_leading_rows, 1
@klass.param_set :schema, [
{
name: 'row_number',
type: 'INTEGER',
mode: 'NULLABLE'
},
{
name: 'value',
type: 'INTEGER',
mode: 'NULLABLE'
},
]
end

sub_test_case "parameters" do
test "should set correctly" do
task = @klass.new
assert_equal('tumugi-plugin-bigquery', task.bucket)
assert_equal('test.csv', task.key)
assert_equal(nil, task.project_id)
assert_equal(TEST_DATASETS[0], task.dataset_id)
assert_equal('load_test', task.table_id)
assert_equal(60, task.wait)
end

data({
"bucket" => [:bucket],
"key" => [:key],
"dataset_id" => [:dataset_id],
"table_id" => [:table_id],
})
test "raise error when required parameter is not set" do |params|
params.each do |param|
@klass.param_set(param, nil)
end
assert_raise(Tumugi::ParameterError) do
@klass.new
end
end
end

test "#output" do
task = @klass.new
output = task.output
assert_true(output.is_a? Tumugi::Plugin::BigqueryTableTarget)
assert_equal(ENV['PROJECT_ID'], output.project_id)
assert_equal(Tumugi::Plugin::BigqueryTestHelper::TEST_DATASETS[0], output.dataset_id)
assert_equal('load_test', output.table_id)
end

test "#run" do
task = @klass.new
output = task.output
task.run
result = output.client.list_tabledata(task.dataset_id, task.table_id, project_id: task.project_id)
assert_equal(5, result[:total_rows])

expected = [
{"row_number"=>"1", "value"=>"1"},
{"row_number"=>"2", "value"=>"2"},
{"row_number"=>"3", "value"=>"3"},
{"row_number"=>"4", "value"=>"4"},
{"row_number"=>"5", "value"=>"5"}
]
assert_equal(expected, result[:rows])
end
end

0 comments on commit 96f5aaf

Please sign in to comment.