Skip to content
This repository has been archived by the owner on Feb 1, 2021. It is now read-only.

Spark step #134

Merged
merged 1 commit into from
Feb 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,27 @@ copy_step = Elasticity::S3DistCpStep.new(true)

```

### Adding a Spark Step

```ruby
spark_step = Elasticity::SparkStep.new('jar_location', 'main_class_fqcn')

# Specifying arguments relative to Spark
spark_step.spark_arguments = { 'driver-memory' => '2G' }
# Specifying arguments relative to your application
spark_step.app_arguments = { 'arg1' => 'value1' }
```

This will be equivalent to the following script:

```bash
spark-submit \
--driver-memory 2G \
--class main_class_fqcn \
jar_location \
--arg1 value1
```

## 7 - Upload Assets (optional)

This isn't part of ```JobFlow```; more of an aside. Elasticity provides a very basic means of uploading assets to S3 so that your EMR job has access to them. Most commonly this will be a set of resources to run the job (e.g. JAR files, streaming scripts, etc.) and a set of resources used by the job itself (e.g. a TSV file with a range of valid values, join tables, etc.).
Expand Down
1 change: 1 addition & 0 deletions lib/elasticity.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
require 'elasticity/streaming_step'
require 'elasticity/script_step'
require 'elasticity/s3distcp_step'
require 'elasticity/spark_step'

module Elasticity

Expand Down
44 changes: 44 additions & 0 deletions lib/elasticity/spark_step.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module Elasticity

class SparkStep

include Elasticity::JobFlowStep

attr_accessor :name
attr_accessor :main_class
attr_accessor :jar
attr_accessor :spark_arguments
attr_accessor :app_arguments
attr_accessor :action_on_failure

def initialize(jar, main_class)
@name = 'Elasticity Spark Step'
@main_class = main_class
@jar = jar
@spark_arguments = {}
@app_arguments = {}
@action_on_failure = 'TERMINATE_JOB_FLOW'
end

def to_aws_step(_)
args = %W(spark-submit --class #{@main_class})
spark_arguments.each do |arg, value|
args << "--#{arg}" << value
end
args.push(@jar)
app_arguments.each do |arg, value|
args << "--#{arg}" << value
end
{
:name => @name,
:action_on_failure => @action_on_failure,
:hadoop_jar_step => {
:jar => 'command-runner.jar',
:args => args
}
}
end

end

end
67 changes: 67 additions & 0 deletions spec/lib/elasticity/spark_step_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
describe Elasticity::SparkStep do

subject do
Elasticity::SparkStep.new('jar', 'class')
end

it { should be_a Elasticity::JobFlowStep }

describe '.initialize' do
it 'should set the fields appropriately' do
expect(subject.name).to eql('Elasticity Spark Step')
expect(subject.jar).to eql('jar')
expect(subject.main_class).to eql('class')
expect(subject.spark_arguments).to eql({})
expect(subject.app_arguments).to eql({})
expect(subject.action_on_failure).to eql('TERMINATE_JOB_FLOW')
end
end

describe '#to_aws_step' do

it { should respond_to(:to_aws_step).with(1).argument }

context 'when there are no arguments provided' do
let(:ss_with_no_args) { Elasticity::SparkStep.new('jar', 'class') }

it 'should convert to aws step format' do
ss_with_no_args.to_aws_step(Elasticity::JobFlow.new).should == {
:name => 'Elasticity Spark Step',
:action_on_failure => 'TERMINATE_JOB_FLOW',
:hadoop_jar_step => {
:jar => 'command-runner.jar',
:args => %w(spark-submit --class class jar)
}
}
end
end

context 'when there are arguments provided' do
let(:ss_with_args) do
Elasticity::SparkStep.new('jar', 'class').tap do |ss|
ss.spark_arguments = { 'key1' => 'value1' }
ss.app_arguments = { 'key2' => 'value2' }
end
end

it 'should convert to aws step format' do
ss_with_args.to_aws_step(Elasticity::JobFlow.new).should == {
:name => 'Elasticity Spark Step',
:action_on_failure => 'TERMINATE_JOB_FLOW',
:hadoop_jar_step => {
:jar => 'command-runner.jar',
:args => %w(spark-submit --class class --key1 value1 jar --key2 value2)
}
}
end
end

end

describe '.requires_installation?' do
it 'should not require installation' do
expect(Elasticity::SparkStep.requires_installation?).to be false
end
end

end