Merge pull request #15 from tumugi/feature/extract-table
Implement extract table to google cloud storage feature
hakobera committed Jun 6, 2016
2 parents 27e1163 + 31bfa10 commit 042045c
Showing 7 changed files with 255 additions and 12 deletions.
20 changes: 15 additions & 5 deletions lib/tumugi/plugin/bigquery/client.rb
@@ -88,6 +88,12 @@ def tables(dataset_id, project_id: nil, limit: 1000, &blk)
    process_error(e)
  end

+ def table(dataset_id, table_id, project_id: nil)
+   @client.table(dataset_id, table_id, project_id: project_id || @project_id)
+ rescue Kura::ApiError => e
+   process_error(e)
+ end
+
  def table_exist?(dataset_id, table_id, project_id: nil)
    !@client.table(dataset_id, table_id, project_id: project_id || @project_id).nil?
  rescue Kura::ApiError => e
@@ -174,6 +180,7 @@ def query(sql, mode: :truncate,
            use_query_cache: true,
            user_defined_function_resources: nil,
            project_id: nil,
+           job_project_id: nil,
            job_id: nil,
            wait: nil,
            dry_run: false,
@@ -186,7 +193,7 @@ def query(sql, mode: :truncate,
      use_query_cache: use_query_cache,
      user_defined_function_resources: user_defined_function_resources,
      project_id: project_id || @project_id,
-     job_project_id: project_id || @project_id,
+     job_project_id: job_project_id || @project_id,
      job_id: job_id,
      wait: wait,
      dry_run: dry_run,
@@ -207,6 +214,7 @@ def load(dataset_id, table_id, source_uris=nil,
           skip_leading_rows: 0,
           source_format: "CSV",
           project_id: nil,
+          job_project_id: nil,
           job_id: nil,
           file: nil, wait: nil,
           dry_run: false,
@@ -223,7 +231,7 @@ def load(dataset_id, table_id, source_uris=nil,
      skip_leading_rows: skip_leading_rows,
      source_format: source_format,
      project_id: project_id || @project_id,
-     job_project_id: project_id || @project_id,
+     job_project_id: job_project_id || @project_id,
      job_id: job_id,
      file: file,
      wait: wait,
@@ -239,6 +247,7 @@ def extract(dataset_id, table_id, dest_uris,
              field_delimiter: ",",
              print_header: true,
              project_id: nil,
+             job_project_id: nil,
              job_id: nil,
              wait: nil,
              dry_run: false,
@@ -249,7 +258,7 @@ def extract(dataset_id, table_id, dest_uris,
      field_delimiter: field_delimiter,
      print_header: print_header,
      project_id: project_id || @project_id,
-     job_project_id: project_id || @project_id,
+     job_project_id: job_project_id || @project_id,
      job_id: job_id,
      wait: wait,
      dry_run: dry_run,
@@ -262,6 +271,7 @@ def copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id,
           mode: :truncate,
           src_project_id: nil,
           dest_project_id: nil,
+          job_project_id: dest_project_id,
           job_id: nil,
           wait: nil,
           dry_run: false,
@@ -270,7 +280,7 @@ def copy(src_dataset_id, src_table_id, dest_dataset_id, dest_table_id,
      mode: mode,
      src_project_id: src_project_id || @project_id,
      dest_project_id: dest_project_id || @project_id,
-     job_project_id: dest_project_id || @project_id,
+     job_project_id: job_project_id || @project_id,
      job_id: job_id,
      wait: wait,
      dry_run: dry_run,
@@ -300,7 +310,7 @@ def wait_job(job, timeout=60*10, project_id: nil, &blk)
    private

    def process_error(e)
-     raise Tumugi::Plugin::Bigquery::BigqueryError.new(e.reason, e.message)
+     raise Tumugi::Plugin::Bigquery::BigqueryError.new(e.message, e.reason)
    end
  end
end
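The common thread in this file is the new `job_project_id` keyword: `query`, `load`, `extract`, and `copy` previously pinned the job's project to the data's `project_id`, so a job could not be billed to a different project than the one owning the dataset. A minimal sketch of what the change enables, assuming the client accepts a config hash the way `BigqueryExportTask#config` passes one below; all project, dataset, and bucket names here are placeholders:

require 'tumugi/plugin/bigquery/client'

# The table lives in 'data-project', but the extract job should run
# (and be billed) in 'billing-project'.
client = Tumugi::Plugin::Bigquery::Client.new(project_id: 'data-project')

client.extract(
  'my_dataset', 'my_table', 'gs://my-bucket/export/part-*.csv',
  project_id:     'data-project',      # project that owns the table
  job_project_id: 'billing-project',   # project that runs the job; falls back to the client's project
  wait:           120
)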
111 changes: 111 additions & 0 deletions lib/tumugi/plugin/task/bigquery_export.rb
@@ -0,0 +1,111 @@
require 'json'
require 'tumugi'
require_relative '../target/bigquery_table'

module Tumugi
  module Plugin
    class BigqueryExportTask < Tumugi::Task
      Tumugi::Plugin.register_task('bigquery_export', self)

      param :project_id, type: :string
      param :job_project_id, type: :string
      param :dataset_id, type: :string, required: true
      param :table_id, type: :string, required: true

      param :compression, type: :string, default: 'NONE' # or 'GZIP'
      param :destination_format, type: :string, default: 'CSV' # or 'NEWLINE_DELIMITED_JSON', 'AVRO'

      # Only effective when destination_format == 'CSV'
      param :field_delimiter, type: :string, default: ','
      param :print_header, type: :bool, default: true

      param :page_size, type: :integer, default: 10000

      param :wait, type: :integer, default: 120

      def run
        unless output.is_a?(Tumugi::Plugin::FileSystemTarget)
          raise Tumugi::TumugiError.new("BigqueryExportTask#output must return an instance of Tumugi::Plugin::FileSystemTarget")
        end

        client = Tumugi::Plugin::Bigquery::Client.new(config)
        table = Tumugi::Plugin::Bigquery::Table.new(project_id: client.project_id, dataset_id: dataset_id, table_id: table_id)

        log "Source: #{table}"
        log "Destination: #{output}"

        if is_gcs?(output)
          export_to_gcs(client)
        else
          if destination_format.upcase == 'AVRO'
            raise Tumugi::TumugiError.new("destination_format='AVRO' is only supported when exporting to Google Cloud Storage")
          end
          if compression.upcase == 'GZIP'
            logger.warn("compression parameter is ignored; it is only supported when exporting to Google Cloud Storage")
          end
          export_to_file_system(client)
        end
      end

      private

      def is_gcs?(target)
        !target.to_s.match(/^gs:\/\/[^\/]+\/.+$/).nil?
      end

      def export_to_gcs(client)
        options = {
          compression: compression.upcase,
          destination_format: destination_format.upcase,
          field_delimiter: field_delimiter,
          print_header: print_header,
          project_id: client.project_id,
          job_project_id: job_project_id || client.project_id,
          wait: wait
        }
        client.extract(dataset_id, table_id, _output.to_s, options)
      end

      def export_to_file_system(client)
        schema = client.table(dataset_id, table_id, project_id: client.project_id).schema.fields
        field_names = schema.map { |f| f.respond_to?(:[]) ? (f["name"] || f[:name]) : f.name }
        start_index = 0
        page_token = nil
        options = {
          max_result: page_size,
          project_id: client.project_id,
        }

        _output.open('w') do |file|
          file.puts field_names.join(field_delimiter) if destination_format == 'CSV' && print_header
          begin
            table_data_list = client.list_tabledata(dataset_id, table_id, options.merge(start_index: start_index, page_token: page_token))
            start_index += page_size
            page_token = table_data_list[:next_token]
            table_data_list[:rows].each do |row|
              file.puts line(field_names, row, destination_format)
            end
          end while !page_token.nil?
        end
      end

      def line(field_names, row, format)
        case format
        when 'CSV'
          row.map { |v| v[1] }.join(field_delimiter)
        when 'NEWLINE_DELIMITED_JSON'
          JSON.generate(row.to_h)
        end
      end

      def config
        cfg = Tumugi.config.section('bigquery').to_h
        cfg[:project_id] = project_id unless project_id.nil?
        cfg
      end
    end
  end
end
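For orientation, here is how the new task might appear in a workflow, modeled on the tests below; the DSL shape and the bucket/project names are assumptions for illustration, not part of this commit:

require 'tumugi'
require 'tumugi/plugin/task/bigquery_export'
require 'tumugi/plugin/target/google_cloud_storage_file'

# Export samples.shakespeare as gzipped CSV. Because the output is a gs://
# target, the task runs a BigQuery extract job; a non-GCS output would
# instead stream rows through list_tabledata page by page.
task :export_shakespeare, type: :bigquery_export do
  project_id     'bigquery-public-data'   # project that owns the table
  job_project_id 'my-billing-project'     # placeholder billing project
  dataset_id     'samples'
  table_id       'shakespeare'
  compression    'GZIP'

  output do
    Tumugi::Plugin::GoogleCloudStorageFileTarget.new(
      bucket: 'my-bucket', key: 'export/shakespeare.csv.gz')
  end
end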
20 changes: 13 additions & 7 deletions lib/tumugi/plugin/task/bigquery_load.rb
@@ -35,12 +35,8 @@ def run
      raise Tumugi::ParameterError.new("Parameter 'schema' is required when 'mode' is 'truncate' or 'empty'") if schema.nil?
    end

-   object_id = key
-   unless object_id.start_with?('/')
-     object_id = "/#{key}"
-   end
-   source_uri = "gs://#{bucket}#{object_id}"
-   log "Source: #{source_uri}"
+   src_uri = "gs://#{bucket}#{normalize_path(key)}"
+   log "Source: #{src_uri}"
    log "Destination: #{output}"

    bq_client = output.client
@@ -58,7 +54,7 @@ def run
      project_id: _output.project_id,
      wait: wait
    }
-   bq_client.load(_output.dataset_id, _output.table_id, source_uri, opts)
+   bq_client.load(_output.dataset_id, _output.table_id, src_uri, opts)
  end

+ private
+
+ def normalize_path(path)
+   path.start_with?('/') ? path : "/#{path}"
+ end
  end
end
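The slash handling that was inlined in `run` is now a small helper, so the source URI is built in one expression. Its behavior, for illustration:

normalize_path('path/to/file.csv')   #=> "/path/to/file.csv"
normalize_path('/path/to/file.csv')  #=> "/path/to/file.csv"

# Either way, "gs://#{bucket}#{normalize_path(key)}" yields a well-formed
# gs://<bucket>/path/to/file.csv source URI for the load job.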
7 changes: 7 additions & 0 deletions test/plugin/bigquery/client_test.rb
@@ -89,6 +89,13 @@ class Tumugi::Plugin::Bigquery::ClientTest < Test::Unit::TestCase
    @client.tables(TEST_DATASETS[0])
  end

+ test "#table" do
+   any_instance_of(Kura::Client) do |klass|
+     mock(klass).table(TEST_DATASETS[0], 'test', project_id: credential[:project_id]) {}
+   end
+   @client.table(TEST_DATASETS[0], 'test')
+ end
+
  test "#table_exist?" do
    assert_true(@client.table_exist?(TEST_DATASETS[0], 'test'))
    assert_false(@client.table_exist?(TEST_DATASETS[0], 'not_found'))
101 changes: 101 additions & 0 deletions test/plugin/task/bigquery_export_test.rb
@@ -0,0 +1,101 @@
require_relative '../../test_helper'
require 'tumugi/plugin/task/bigquery_export'
require 'tumugi/plugin/target/google_cloud_storage_file'
require 'tumugi/plugin/target/local_file'
require 'zlib'

class Tumugi::Plugin::BigqueryExportTaskTest < Test::Unit::TestCase
  include Tumugi::Plugin::BigqueryTestHelper

  setup do
    @klass = Class.new(Tumugi::Plugin::BigqueryExportTask)
    @klass.param_set :project_id, 'bigquery-public-data'
    @klass.param_set :job_project_id, 'tumugi-plugin-bigquery'
    @klass.param_set :dataset_id, 'samples'
    @klass.param_set :table_id, 'shakespeare'
    @klass.param_set :compression, 'GZIP'
  end

  sub_test_case "parameters" do
    test "should set correctly" do
      task = @klass.new
      assert_equal('bigquery-public-data', task.project_id)
      assert_equal('tumugi-plugin-bigquery', task.job_project_id)
      assert_equal('samples', task.dataset_id)
      assert_equal('shakespeare', task.table_id)
      assert_equal('GZIP', task.compression)
      assert_equal(120, task.wait)
      assert_equal(10000, task.page_size)
    end

    data({
      "dataset_id" => [:dataset_id],
      "table_id" => [:table_id],
    })
    test "raise error when required parameter is not set" do |params|
      params.each do |param|
        @klass.param_set(param, nil)
      end
      assert_raise(Tumugi::ParameterError) do
        @klass.new
      end
    end
  end

  test "export to Google Cloud Storage" do
    task = @klass.new
    task.instance_eval do
      def output
        Tumugi::Plugin::GoogleCloudStorageFileTarget.new(bucket: 'tumugi-plugin-bigquery', key: 'export/test.csv.zip')
      end
    end
    output = task.output
    task.run
    output.open("r") do |f|
      count = 0
      header = ''
      in_row = ''
      Zlib::GzipReader.open(f) do |gz|
        while s = gz.gets
          header = s if count == 0
          count += 1
          in_row = s if s.start_with?("in,")
        end
      end
      assert_equal(164657, count)
      assert_equal("word,word_count,corpus,corpus_date\n", header)
      assert_equal("in,255,kinghenryviii,1612\n", in_row)
    end
  end

  test "export to local file" do
    task = @klass.new
    task.instance_eval do
      def output
        Tumugi::Plugin::LocalFileTarget.new('tmp/export.csv')
      end
    end
    output = task.output
    task.run
    output.open("r") do |f|
      count = 0
      header = ''
      in_row = ''
      while s = f.gets
        header = s if count == 0
        count += 1
        in_row = s if s.start_with?("in,")
      end
      assert_equal(164657, count)
      assert_equal("word,word_count,corpus,corpus_date\n", header)
      assert_equal("in,255,kinghenryviii,1612\n", in_row)
    end
  end
end
7 changes: 7 additions & 0 deletions test/test_helper.rb
@@ -9,6 +9,8 @@
  require 'tumugi'
  require 'kura'

+ Dir.mkdir('tmp') unless Dir.exist?('tmp')
+
  module Tumugi
    module Plugin
      module BigqueryTestHelper
@@ -55,4 +57,9 @@ def credential
      section.client_email = credential[:client_email]
      section.private_key = credential[:private_key]
    end

+   config.section('google_cloud_storage') do |section|
+     section.client_email = credential[:client_email]
+     section.private_key = credential[:private_key]
+   end
  end
1 change: 1 addition & 0 deletions tumugi-plugin-bigquery.gemspec
@@ -29,4 +29,5 @@ Gem::Specification.new do |spec|
  spec.add_development_dependency 'test-unit-rr'
  spec.add_development_dependency 'coveralls'
  spec.add_development_dependency 'github_changelog_generator'
+ spec.add_development_dependency 'tumugi-plugin-google_cloud_storage'
end
