From 96f5aafbc51ca8cd74ccbe9aca3631237a028b75 Mon Sep 17 00:00:00 2001 From: Kazuyuki Honda Date: Mon, 30 May 2016 11:38:05 +0900 Subject: [PATCH] Implement load task from Google Cloud Storage --- README.md | 22 ++++++- examples/load.rb | 24 ++++++++ examples/test.csv | 6 ++ lib/tumugi/plugin/bigquery/client.rb | 23 ++++--- lib/tumugi/plugin/task/bigquery_load.rb | 65 ++++++++++++++++++++ test/plugin/bigquery/client_test.rb | 2 +- test/plugin/task/bigquery_load_test.rb | 80 +++++++++++++++++++++++++ 7 files changed, 213 insertions(+), 9 deletions(-) create mode 100644 examples/load.rb create mode 100644 examples/test.csv create mode 100644 lib/tumugi/plugin/task/bigquery_load.rb create mode 100644 test/plugin/task/bigquery_load_test.rb diff --git a/README.md b/README.md index 84db2c4..af09023 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,8 @@ end #### Usage +Copy `test.src_table` to `test.dest_table`. + ```rb task :task1, type: :bigquery_copy do param_set :src_dataset_id, 'test' @@ -77,6 +79,24 @@ task :task1, type: :bigquery_copy do end ``` +### Tumugi::Plugin::BigqueryLoadTask + +`Tumugi::Plugin::BigqueryLoadTask` is a task to load structured data from GCS into BigQuery. + +#### Usage + +Load `gs://test_bucket/load_data.csv` into `dest_project:dest_dataset.dest_table` + +```rb +task :task1, type: :bigquery_load do + param_set :bucket, 'test_bucket' + param_set :key, 'load_data.csv' + param_set :project_id, 'dest_project' + param_set :dataset_id, 'dest_dataset' + param_set :table_id, 'dest_table' +end +``` + ### Config Section tumugi-plugin-bigquery provide config section named "bigquery" which can specified BigQuery autenticaion info. 
@@ -84,7 +104,7 @@ tumugi-plugin-bigquery provide config section named "bigquery" which can specifi #### Authenticate by client_email and private_key ```rb -Tumugi.config do |config| +Tumugi.configure do |config| config.section("bigquery") do |section| section.project_id = "xxx" section.client_email = "yyy@yyy.iam.gserviceaccount.com" diff --git a/examples/load.rb b/examples/load.rb new file mode 100644 index 0000000..8fdf826 --- /dev/null +++ b/examples/load.rb @@ -0,0 +1,24 @@ +task :task1, type: :bigquery_load do + requires :task2 + param_set :bucket, 'tumugi-plugin-bigquery' + param_set :key, 'test.csv' + param_set :dataset_id, -> { input.dataset_id } + param_set :table_id, 'load_test' + param_set :skip_leading_rows, 1 + param_set :schema, [ + { + name: 'row_number', + type: 'INTEGER', + mode: 'NULLABLE' + }, + { + name: 'value', + type: 'INTEGER', + mode: 'NULLABLE' + }, + ] +end + +task :task2, type: :bigquery_dataset do + param_set :dataset_id, 'test' +end diff --git a/examples/test.csv b/examples/test.csv new file mode 100644 index 0000000..519a9c7 --- /dev/null +++ b/examples/test.csv @@ -0,0 +1,6 @@ +row_number,value +1,1 +2,2 +3,3 +4,4 +5,5 diff --git a/lib/tumugi/plugin/bigquery/client.rb b/lib/tumugi/plugin/bigquery/client.rb index 997d69a..c7bbd09 100644 --- a/lib/tumugi/plugin/bigquery/client.rb +++ b/lib/tumugi/plugin/bigquery/client.rb @@ -185,11 +185,15 @@ def query(sql, mode: :truncate, end def load(dataset_id, table_id, source_uris=nil, - schema: nil, delimiter: ",", field_delimiter: delimiter, mode: :append, - allow_jagged_rows: false, max_bad_records: 0, + schema: nil, + field_delimiter: ",", + mode: :append, + allow_jagged_rows: false, + max_bad_records: 0, ignore_unknown_values: false, allow_quoted_newlines: false, - quote: '"', skip_leading_rows: 0, + quote: '"', + skip_leading_rows: 0, source_format: "CSV", project_id: nil, job_id: nil, @@ -197,16 +201,21 @@ def load(dataset_id, table_id, source_uris=nil, dry_run: false, &blk) 
@client.load(dataset_id, table_id, source_uris=source_uris, - schema: schema, delimiter: delimiter, field_delimiter: field_delimiter, mode: mode, - allow_jagged_rows: allow_jagged_rows, max_bad_records: max_bad_records, + schema: schema, + field_delimiter: field_delimiter, + mode: mode, + allow_jagged_rows: allow_jagged_rows, + max_bad_records: max_bad_records, ignore_unknown_values: ignore_unknown_values, allow_quoted_newlines: allow_quoted_newlines, - quote: quote, skip_leading_rows: skip_leading_rows, + quote: quote, + skip_leading_rows: skip_leading_rows, source_format: source_format, project_id: project_id || @project_id, job_project_id: project_id || @project_id, job_id: job_id, - file: file, wait: wait, + file: file, + wait: wait, dry_run: dry_run, &blk) rescue Kura::ApiError => e diff --git a/lib/tumugi/plugin/task/bigquery_load.rb b/lib/tumugi/plugin/task/bigquery_load.rb new file mode 100644 index 0000000..e8f1f34 --- /dev/null +++ b/lib/tumugi/plugin/task/bigquery_load.rb @@ -0,0 +1,65 @@ +require 'tumugi' +require_relative '../target/bigquery_table' + +module Tumugi + module Plugin + class BigqueryLoadTask < Tumugi::Task + Tumugi::Plugin.register_task('bigquery_load', self) + + param :bucket, type: :string, required: true + param :key, type: :string, required: true + param :project_id, type: :string + param :dataset_id, type: :string, required: true + param :table_id, type: :string, required: true + + param :schema # type: :array + param :field_delimiter, type: :string, default: ',' + param :mode, type: :string, default: 'append' # truncate, empty + param :allow_jagged_rows, type: :bool, default: false + param :max_bad_records, type: :integer, default: 0 + param :ignore_unknown_values, type: :bool, default: false + param :allow_quoted_newlines, type: :bool, default: false + param :quote, type: :string, default: '"' + param :skip_leading_rows, type: :integer, default: 0 + param :source_format, type: :string, default: 'CSV' # NEWLINE_DELIMITED_JSON, AVRO 
+ param :wait, type: :integer, default: 60 + + def output + opts = { dataset_id: dataset_id, table_id: table_id } + opts[:project_id] = project_id if project_id + Tumugi::Plugin::BigqueryTableTarget.new(opts) + end + + def run + if mode != 'append' + raise Tumugi::ParameterError.new("Parameter 'schema' is required when 'mode' is 'truncate' or 'empty'") if schema.nil? + end + + object_id = key + unless object_id.start_with?('/') + object_id = "/#{key}" + end + source_uri = "gs://#{bucket}#{object_id}" + log "Source: #{source_uri}" + log "Destination: #{output}" + + bq_client = output.client + opts = { + schema: schema, + field_delimiter: field_delimiter, + mode: mode.to_sym, + allow_jagged_rows: allow_jagged_rows, + max_bad_records: max_bad_records, + ignore_unknown_values: ignore_unknown_values, + allow_quoted_newlines: allow_quoted_newlines, + quote: quote, + skip_leading_rows: skip_leading_rows, + source_format: source_format, + project_id: _output.project_id, + wait: wait + } + bq_client.load(_output.dataset_id, _output.table_id, source_uri, opts) + end + end + end +end diff --git a/test/plugin/bigquery/client_test.rb b/test/plugin/bigquery/client_test.rb index 7c25df9..4f84101 100644 --- a/test/plugin/bigquery/client_test.rb +++ b/test/plugin/bigquery/client_test.rb @@ -165,7 +165,7 @@ class Tumugi::Plugin::Bigquery::ClientTest < Test::Unit::TestCase test "#load" do any_instance_of(Kura::Client) do |klass| mock(klass).load(TEST_DATASETS[0], 'test', 'gs://tumugi-plugin-bigquery/test.csv', - schema: nil, delimiter: ",", field_delimiter: ",", mode: :append, + schema: nil, field_delimiter: ",", mode: :append, allow_jagged_rows: false, max_bad_records: 0, ignore_unknown_values: false, allow_quoted_newlines: false, diff --git a/test/plugin/task/bigquery_load_test.rb b/test/plugin/task/bigquery_load_test.rb new file mode 100644 index 0000000..1b06215 --- /dev/null +++ b/test/plugin/task/bigquery_load_test.rb @@ -0,0 +1,80 @@ +require_relative '../../test_helper' 
+require 'tumugi/plugin/task/bigquery_load' + +class Tumugi::Plugin::BigqueryLoadTaskTest < Test::Unit::TestCase + include Tumugi::Plugin::BigqueryTestHelper + + setup do + @klass = Class.new(Tumugi::Plugin::BigqueryLoadTask) + @klass.param_set :bucket, 'tumugi-plugin-bigquery' + @klass.param_set :key, 'test.csv' + @klass.param_set :dataset_id, Tumugi::Plugin::BigqueryTestHelper::TEST_DATASETS[0] + @klass.param_set :table_id, 'load_test' + @klass.param_set :skip_leading_rows, 1 + @klass.param_set :schema, [ + { + name: 'row_number', + type: 'INTEGER', + mode: 'NULLABLE' + }, + { + name: 'value', + type: 'INTEGER', + mode: 'NULLABLE' + }, + ] + end + + sub_test_case "parameters" do + test "should set correctly" do + task = @klass.new + assert_equal('tumugi-plugin-bigquery', task.bucket) + assert_equal('test.csv', task.key) + assert_equal(nil, task.project_id) + assert_equal(TEST_DATASETS[0], task.dataset_id) + assert_equal('load_test', task.table_id) + assert_equal(60, task.wait) + end + + data({ + "bucket" => [:bucket], + "key" => [:key], + "dataset_id" => [:dataset_id], + "table_id" => [:table_id], + }) + test "raise error when required parameter is not set" do |params| + params.each do |param| + @klass.param_set(param, nil) + end + assert_raise(Tumugi::ParameterError) do + @klass.new + end + end + end + + test "#output" do + task = @klass.new + output = task.output + assert_true(output.is_a? 
Tumugi::Plugin::BigqueryTableTarget) + assert_equal(ENV['PROJECT_ID'], output.project_id) + assert_equal(Tumugi::Plugin::BigqueryTestHelper::TEST_DATASETS[0], output.dataset_id) + assert_equal('load_test', output.table_id) + end + + test "#run" do + task = @klass.new + output = task.output + task.run + result = output.client.list_tabledata(task.dataset_id, task.table_id, project_id: task.project_id) + assert_equal(5, result[:total_rows]) + + expected = [ + {"row_number"=>"1", "value"=>"1"}, + {"row_number"=>"2", "value"=>"2"}, + {"row_number"=>"3", "value"=>"3"}, + {"row_number"=>"4", "value"=>"4"}, + {"row_number"=>"5", "value"=>"5"} + ] + assert_equal(expected, result[:rows]) + end +end