Implement export table to any FileSystemTarget
hakobera committed Jun 6, 2016
1 parent 4d67207 commit 31bfa10
Showing 6 changed files with 125 additions and 43 deletions.
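This commit decouples BigqueryExportTask from Google Cloud Storage: output may now return any Tumugi::Plugin::FileSystemTarget, and non-GCS targets are filled by paging the table's rows through the BigQuery API instead of running an extract job. A minimal sketch of the new usage, assuming the subclass/param_set style exercised in the tests below (the class name and output path are hypothetical):

    require 'tumugi/plugin/task/bigquery_export'
    require 'tumugi/plugin/target/local_file'

    # Hypothetical task; mirrors the pattern used in bigquery_export_test.rb.
    class ExportShakespeare < Tumugi::Plugin::BigqueryExportTask
      param_set :project_id, 'bigquery-public-data'
      param_set :dataset_id, 'samples'
      param_set :table_id,   'shakespeare'

      # Any FileSystemTarget works here now; before this commit the task could
      # only write to a gs://bucket/key location derived from bucket/key params.
      def output
        Tumugi::Plugin::LocalFileTarget.new('tmp/shakespeare.csv')
      end
    end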
8 changes: 7 additions & 1 deletion lib/tumugi/plugin/bigquery/client.rb
@@ -88,6 +88,12 @@ def tables(dataset_id, project_id: nil, limit: 1000, &blk)
process_error(e)
end

def table(dataset_id, table_id, project_id: nil)
@client.table(dataset_id, table_id, project_id: project_id || @project_id)
rescue Kura::ApiError => e
process_error(e)
end

def table_exist?(dataset_id, table_id, project_id: nil)
!@client.table(dataset_id, table_id, project_id: project_id || @project_id).nil?
rescue Kura::ApiError => e
@@ -304,7 +310,7 @@ def wait_job(job, timeout=60*10, project_id: nil, &blk)
private

def process_error(e)
raise Tumugi::Plugin::Bigquery::BigqueryError.new(e.reason, e.message)
raise Tumugi::Plugin::Bigquery::BigqueryError.new(e.message, e.reason)
end
end
end
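
Two client-side changes feed the new export path: a #table accessor (used below to read the table schema before a file-system export), and process_error now passes e.message before e.reason -- assuming BigqueryError.new(message, reason) is the constructor's intended argument order. A hedged sketch of the accessor (table and field names taken from the tests):

    require 'tumugi/plugin/bigquery/client'

    client = Tumugi::Plugin::Bigquery::Client.new
    table  = client.table('samples', 'shakespeare', project_id: 'bigquery-public-data')
    # Schema fields are assumed to respond to #name here; the export task also
    # tolerates Hash-shaped fields, as its field_names mapping shows.
    field_names = table.schema.fields.map(&:name) unless table.nil?
    # => ["word", "word_count", "corpus", "corpus_date"]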
95 changes: 69 additions & 26 deletions lib/tumugi/plugin/task/bigquery_export.rb
@@ -1,15 +1,14 @@
require 'json'
require 'tumugi'
require 'tumugi/plugin/target/google_cloud_storage_file'
require_relative '../target/bigquery_table'

module Tumugi
module Plugin
class BigqueryExportTask < Tumugi::Task
Tumugi::Plugin.register_task('bigquery_export', self)

param :bucket, type: :string, required: true
param :key, type: :string, required: true
param :project_id, type: :string
param :job_project_id, type: :string
param :dataset_id, type: :string, required: true
param :table_id, type: :string, required: true

@@ -20,48 +19,92 @@ class BigqueryExportTask < Tumugi::Task
param :field_delimiter, type: :string, default: ','
param :print_header, type: :bool, default: true

param :wait, type: :integer, default: 120
param :page_size, type: :integer, default: 10000

def output
Tumugi::Plugin::GoogleCloudStorageFileTarget.new(bucket: bucket, key: key)
end
param :wait, type: :integer, default: 120

def run
proj_id = project_id || client.project_id
table = Tumugi::Plugin::Bigquery::Table.new(project_id: proj_id, dataset_id: dataset_id, table_id: table_id)
dest_uri = normalize_uri(key)
unless output.is_a?(Tumugi::Plugin::FileSystemTarget)
raise Tumgi::TumguiError.new("BigqueryExportTask#output must be return a instance of Tumugi::Plugin::FileSystemTarget")
end

client = Tumugi::Plugin::Bigquery::Client.new(config)
table = Tumugi::Plugin::Bigquery::Table.new(project_id: client.project_id, dataset_id: dataset_id, table_id: table_id)
job_project_id = client.project_id if job_project_id.nil?

log "Source: #{table}"
log "Destination: #{output}"

opts = {
compression: compression,
destination_format: destination_format,
if is_gcs?(output)
export_to_gcs(client)
else
if destination_format.upcase == 'AVRO'
raise Tumugi::TumugiError.new("destination_format='AVRO' is only supported when exporting to Google Cloud Storage")
end
if compression.upcase == 'GZIP'
logger.warn("compression parameter is ignored; it is only supported when exporting to Google Cloud Storage")
end
export_to_file_system(client)
end
end

private

def is_gcs?(target)
not target.to_s.match(/^gs:\/\/[^\/]+\/.+$/).nil?
end

def export_to_gcs(client)
options = {
compression: compression.upcase,
destination_format: destination_format.upcase,
field_delimiter: field_delimiter,
print_header: print_header,
project_id: proj_id,
job_project_id: client.project_id,
project_id: client.project_id,
job_project_id: job_project_id || client.project_id,
wait: wait
}
client.extract(dataset_id, table_id, dest_uri, opts)
client.extract(dataset_id, table_id, _output.to_s, options)
end

private
def export_to_file_system(client)
schema = client.table(dataset_id, table_id, project_id: client.project_id).schema.fields
field_names = schema.map{|f| f.respond_to?(:[]) ? (f["name"] || f[:name]) : f.name }
start_index = 0
page_token = nil
options = {
max_result: page_size,
project_id: client.project_id,
}

def client
@client ||= Tumugi::Plugin::Bigquery::Client.new
_output.open('w') do |file|
file.puts field_names.join(field_delimiter) if destination_format == 'CSV' && print_header
begin
table_data_list = client.list_tabledata(dataset_id, table_id, options.merge(start_index: start_index, page_token: page_token))
start_index += page_size
page_token = table_data_list[:next_token]
table_data_list[:rows].each do |row|
file.puts line(field_names, row, destination_format)
end
end until page_token.nil?
end
end

def normalize_path(path)
unless path.start_with?('/')
"/#{path}"
else
path
def line(field_names, row, format)
case format
when 'CSV'
row.map{|v| v[1]}.join(field_delimiter)
when 'NEWLINE_DELIMITED_JSON'
JSON.generate(row.to_h)
end
end

def normalize_uri(path)
"gs://#{bucket}#{normalize_path(path)}"
def config
cfg = Tumugi.config.section('bigquery').to_h
unless project_id.nil?
cfg[:project_id] = project_id
end
cfg
end
end
end
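
The run dispatch hinges on the rendered output path: anything matching gs://bucket/key still goes through a BigQuery extract job (which is why AVRO and GZIP stay GCS-only), while everything else is pulled page by page via list_tabledata and formatted locally. A quick sketch of both halves (row values are illustrative, copied from the test data):

    require 'json'

    GCS_URI = /^gs:\/\/[^\/]+\/.+$/  # same pattern as is_gcs? above

    !'gs://tumugi-plugin-bigquery/export/test.csv.zip'.match(GCS_URI).nil?  # => true,  extract job
    !'tmp/export.csv'.match(GCS_URI).nil?                                   # => false, paged download

    # How one fetched row becomes an output line (mirrors #line above):
    row = { 'word' => 'in', 'word_count' => 255, 'corpus' => 'kinghenryviii', 'corpus_date' => 1612 }
    row.map { |v| v[1] }.join(',')  # CSV => "in,255,kinghenryviii,1612"
    JSON.generate(row.to_h)         # NEWLINE_DELIMITED_JSON => {"word":"in","word_count":255,...}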
7 changes: 7 additions & 0 deletions test/plugin/bigquery/client_test.rb
@@ -89,6 +89,13 @@ class Tumugi::Plugin::Bigquery::ClientTest < Test::Unit::TestCase
@client.tables(TEST_DATASETS[0])
end

test "#table" do
any_instance_of(Kura::Client) do |klass|
mock(klass).table(TEST_DATASETS[0], 'test', project_id: credential[:project_id]) {}
end
@client.table(TEST_DATASETS[0], 'test')
end

test "#table_exist?" do
assert_true(@client.table_exist?(TEST_DATASETS[0], 'test'))
assert_false(@client.table_exist?(TEST_DATASETS[0], 'not_found'))
53 changes: 39 additions & 14 deletions test/plugin/task/bigquery_export_test.rb
@@ -1,14 +1,15 @@
require_relative '../../test_helper'
require 'tumugi/plugin/task/bigquery_export'
require 'tumugi/plugin/target/google_cloud_storage_file'
require 'tumugi/plugin/target/local_file'

class Tumugi::Plugin::BigqueryExportTaskTest < Test::Unit::TestCase
include Tumugi::Plugin::BigqueryTestHelper

setup do
@klass = Class.new(Tumugi::Plugin::BigqueryExportTask)
@klass.param_set :bucket, 'tumugi-plugin-bigquery'
@klass.param_set :key, 'export/test.csv.zip'
@klass.param_set :project_id, 'bigquery-public-data'
@klass.param_set :job_project_id, 'tumugi-plugin-bigquery'
@klass.param_set :dataset_id, 'samples'
@klass.param_set :table_id, 'shakespeare'
@klass.param_set :compression, 'GZIP'
@@ -17,18 +18,16 @@ class Tumugi::Plugin::BigqueryExportTaskTest < Test::Unit::TestCase
sub_test_case "parameters" do
test "should set correctly" do
task = @klass.new
assert_equal('tumugi-plugin-bigquery', task.bucket)
assert_equal('export/test.csv.zip', task.key)
assert_equal('bigquery-public-data', task.project_id)
assert_equal('tumugi-plugin-bigquery', task.job_project_id)
assert_equal('samples', task.dataset_id)
assert_equal('shakespeare', task.table_id)
assert_equal('GZIP', task.compression)
assert_equal(120, task.wait)
assert_equal(10000, task.page_size)
end

data({
"bucket" => [:bucket],
"key" => [:key],
"dataset_id" => [:dataset_id],
"table_id" => [:table_id],
})
@@ -42,15 +41,13 @@ class Tumugi::Plugin::BigqueryExportTaskTest < Test::Unit::TestCase
end
end

test "#output" do
task = @klass.new
output = task.output
assert_true(output.is_a? Tumugi::Plugin::GoogleCloudStorageFileTarget)
assert_equal('gs://tumugi-plugin-bigquery/export/test.csv.zip', output.path)
end

test "#run" do
test "export to Google Cloud Storage" do
task = @klass.new
task.instance_eval do
def output
Tumugi::Plugin::GoogleCloudStorageFileTarget.new(bucket: 'tumugi-plugin-bigquery', key: 'export/test.csv.zip')
end
end
output = task.output
task.run
output.open("r") do |f|
@@ -73,4 +70,32 @@
assert_equal("in,255,kinghenryviii,1612\n", in_row)
end
end

test "export to local file" do
task = @klass.new
task.instance_eval do
def output
Tumugi::Plugin::LocalFileTarget.new('tmp/export.csv')
end
end
output = task.output
task.run
output.open("r") do |f|
count = 0
header = ''
in_row = ''
while s = f.gets
if count == 0
header = s
end
count += 1
if s.start_with?("in,")
in_row = s
end
end
assert_equal(164657, count)
assert_equal("word,word_count,corpus,corpus_date\n", header)
assert_equal("in,255,kinghenryviii,1612\n", in_row)
end
end
end
3 changes: 2 additions & 1 deletion test/test_helper.rb
@@ -9,6 +9,8 @@
require 'tumugi'
require 'kura'

Dir.mkdir('tmp') unless Dir.exist?('tmp')

module Tumugi
module Plugin
module BigqueryTestHelper
@@ -57,7 +59,6 @@ def credential
end

config.section('google_cloud_storage') do |section|
section.project_id = credential[:project_id]
section.client_email = credential[:client_email]
section.private_key = credential[:private_key]
end
2 changes: 1 addition & 1 deletion tumugi-plugin-bigquery.gemspec
@@ -22,12 +22,12 @@ Gem::Specification.new do |spec|

spec.add_runtime_dependency "tumugi", ">= 0.5.1"
spec.add_runtime_dependency "kura", "~> 0.2.17"
spec.add_runtime_dependency "tumugi-plugin-google_cloud_storage", "~> 0.1.0"

spec.add_development_dependency 'bundler', '~> 1.11'
spec.add_development_dependency 'rake', '~> 10.0'
spec.add_development_dependency 'test-unit', '~> 3.1'
spec.add_development_dependency 'test-unit-rr'
spec.add_development_dependency 'coveralls'
spec.add_development_dependency 'github_changelog_generator'
spec.add_development_dependency 'tumugi-plugin-google_cloud_storage'
end
