
[Survey] Spark (Hadoop) Support via S3 API #595

Open

windkit opened this issue Jan 31, 2017 · 13 comments

windkit commented Jan 31, 2017

Description

Since LeoFS is good at handling small files (images, logs, ...), it may fill a gap left by HDFS, which does not handle small files well.

Environment

Spark 1.6.1 (Hadoop 2.6.2)

Extra Libraries

  • hadoop-aws-2.7.1.jar
  • aws-java-sdk-1.7.4.jar

Testing

./pyspark --jars /opt/spark-1.6.1-bin-hadoop2.6/lib/hadoop-aws-2.7.1.jar,/opt/spark-1.6.1-bin-hadoop2.6/lib/aws-java-sdk-1.7.4.jar --master yarn-client --num-executors 10
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "http://127.0.0.1:12345")
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "05236")
>>> sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "802562235")
>>>
>>> sc.textFile("s3a://test/test.txt").count()
17/01/31 16:15:12 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 222.4 KB, free 465.2 KB)
17/01/31 16:15:12 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 21.2 KB, free 486.4 KB)
17/01/31 16:15:12 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.11.21:53056 (size: 21.2 KB, free: 511.1 MB)
17/01/31 16:15:12 INFO SparkContext: Created broadcast 1 from textFile at NativeMethodAccessorImpl.java:-2
17/01/31 16:15:13 INFO FileInputFormat: Total input paths to process : 1
17/01/31 16:15:13 INFO SparkContext: Starting job: count at <stdin>:1
17/01/31 16:15:13 INFO DAGScheduler: Got job 0 (count at <stdin>:1) with 2 output partitions
17/01/31 16:15:13 INFO DAGScheduler: Final stage: ResultStage 0 (count at <stdin>:1)
17/01/31 16:15:13 INFO DAGScheduler: Parents of final stage: List()
17/01/31 16:15:13 INFO DAGScheduler: Missing parents: List()
17/01/31 16:15:13 INFO DAGScheduler: Submitting ResultStage 0 (PythonRDD[5] at count at <stdin>:1), which has no missing parents
17/01/31 16:15:13 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 5.3 KB, free 491.8 KB)
17/01/31 16:15:13 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 3.4 KB, free 495.2 KB)
17/01/31 16:15:13 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.11.21:53056 (size: 3.4 KB, free: 511.1 MB)
17/01/31 16:15:13 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
17/01/31 16:15:13 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (PythonRDD[5] at count at <stdin>:1)
17/01/31 16:15:13 INFO YarnScheduler: Adding task set 0.0 with 2 tasks
17/01/31 16:15:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, spark04, partition 0,PROCESS_LOCAL, 2247 bytes)
17/01/31 16:15:13 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, spark02, partition 1,PROCESS_LOCAL, 2247 bytes)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on spark02:46011 (size: 3.4 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark02:46011 (size: 21.2 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on spark04:45269 (size: 3.4 KB, free: 511.5 MB)
17/01/31 16:15:14 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on spark04:45269 (size: 21.2 KB, free: 511.5 MB)
17/01/31 16:15:16 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3292 ms on spark04 (1/2)
17/01/31 16:15:18 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 5347 ms on spark02 (2/2)
17/01/31 16:15:18 INFO DAGScheduler: ResultStage 0 (count at <stdin>:1) finished in 5.365 s
17/01/31 16:15:18 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/01/31 16:15:18 INFO DAGScheduler: Job 0 finished: count at <stdin>:1, took 5.413051 s
2993

windkit commented Jan 31, 2017

Write Performance Issue

Writing a file via s3a:// currently maps poorly onto the S3 API in terms of performance: a single simple write involves more than 3,000 requests to LeoFS.
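For reference, a write of roughly this shape (a hedged reconstruction; the exact job isn't recorded in this comment) produces the trace below:

>>> # Hedged reconstruction: a multi-partition write; each partition is
>>> # committed through the FileOutputCommitter's _temporary/ rename
>>> # sequence that shows up as the extra requests in the trace
>>> sc.parallelize(range(10000), 40).saveAsTextFile("s3a://test/test20.txt")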

[HEAD]  test    test/test20.txt 0       0       2017-01-31 16:58:34.134929 +0900        1485849514134990        404     2
[HEAD]  test    test/test20.txt/        0       0       2017-01-31 16:58:34.139119 +0900        1485849514139151        404     2
[BUCKET-GET]    test/   test20.txt/     0       0       2017-01-31 16:58:34.143923 +0900        1485849514143956        200     3
[HEAD]  test    test/test20.txt/_temporary/0    0       0       2017-01-31 16:58:34.148843 +0900        1485849514148889        404     2
[HEAD]  test    test/test20.txt/_temporary/0/   0       0       2017-01-31 16:58:34.153007 +0900        1485849514153042        404     2
[BUCKET-GET]    test/   test20.txt/_temporary/0/        0       0       2017-01-31 16:58:34.157585 +0900        1485849514157609        200     2
[HEAD]  test    test/test20.txt/_temporary/0    0       0       2017-01-31 16:58:34.161161 +0900        1485849514161191        404     2
[HEAD]  test    test/test20.txt/_temporary/0/   0       0       2017-01-31 16:58:34.165068 +0900        1485849514165102        404     2
[BUCKET-GET]    test/   test20.txt/_temporary/0/        0       0       2017-01-31 16:58:34.169758 +0900        1485849514169782        200     3
[HEAD]  test    test/test20.txt/_temporary      0       0       2017-01-31 16:58:34.173196 +0900        1485849514173232        404     2
[HEAD]  test    test/test20.txt/_temporary/     0       0       2017-01-31 16:58:34.176176 +0900        1485849514176220        404     1
[BUCKET-GET]    test/   test20.txt/_temporary/  0       0       2017-01-31 16:58:34.180714 +0900        1485849514180744        200     3
[HEAD]  test    test/test20.txt 0       0       2017-01-31 16:58:34.183919 +0900        1485849514183952        404     1
[HEAD]  test    test/test20.txt/        0       0       2017-01-31 16:58:34.187476 +0900        1485849514187502        404     2
[BUCKET-GET]    test/   test20.txt/     0       0       2017-01-31 16:58:34.191703 +0900        1485849514191733        200     2
[BUCKET-GET]    test/           0       0       2017-01-31 16:58:34.228879 +0900        1485849514228920        200     35
[PUT]   test    test/test20.txt/_temporary/0/   0       0       2017-01-31 16:58:34.233079 +0900        1485849514233105        200     2
[PUT]   test    test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000023_103/part-00023       0       0       2017-01-31 16:58:34.359964 +0900        1485849514360034     200      2
[PUT]   test    test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000004_84/part-00004        0       0       2017-01-31 16:58:34.360232 +0900        1485849514360266     200      2
[PUT]   test    test/test20.txt/_temporary/0/_temporary/attempt_201701311658_0002_m_000032_112/part-00032       0       0       2017-01-31 16:58:34.360844 +0900        1485849514360875     200      2
...

spark_write.access.log.gz


windkit commented Jan 31, 2017

Write Issue

From time to time, a write can fail.

Logs

Normal
spark_write.access.log.gz
spark_write.spark.log.gz

Error
spark_fail_write.access.log.gz
spark_fail_write.spark.log.gz


windkit commented Feb 1, 2017

Read Issue

Read requests are issued as Range GETs (since tasks may be partitioned across multiple workers), and the cache in leo_gateway is not used for them.
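A minimal sketch of why the requests arrive ranged (hypothetical file and partition count; assumes the fs.s3a settings from the first comment):

>>> # With 4 partitions, each task opens the object at its own split
>>> # offset; s3a turns that positioned read into a GET with a Range
>>> # header rather than a full-object read, bypassing the gateway cache
>>> rdd = sc.textFile("s3a://test/test.txt", 4)
>>> rdd.count()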


windkit commented Feb 9, 2017

Test with Spark 2.1.0 + Hadoop 3.0.0-alpha2

Testing with a newer version of Hadoop that includes performance improvements to its S3 support.

Logs

spark_210_hadoop_3a2_write.access.log.gz

The number of requests decreases from >3000 to ~600 for a single small write:

$ bin/pyspark
>>> sc.parallelize([1,2,3]).saveAsTextFile("s3a://test/testout.txt")


windkit commented Feb 9, 2017

Small File Testing

Data Set

~3,900 image files (total: ~170 MB)

Hadoop Setting

1x Name Node (+ Secondary Name Node)
3x Data Nodes

Read from Hadoop

>>> sc.textFile("hdfs://leofs-ubuntu1404-node08:9000/images/*").count()
1209364

Duration: 9 s

Metric     Min    25th percentile   Median   75th percentile   Max
Duration   5 ms   10 ms             41 ms    42 ms             0.1 s
GC Time    0 ms   0 ms              0 ms     0 ms              51 ms

Read from LeoFS

>>> sc.textFile("s3a://test/*").count()
1209364

Duration: 9 s

Metric     Min     25th percentile   Median   75th percentile   Max
Duration   10 ms   16 ms             41 ms    45 ms             0.1 s
GC Time    0 ms    0 ms              0 ms     0 ms              30 ms


windkit commented Mar 15, 2017

With a large data set (500 dirs x 1600 files), listing the objects took too long.
Known Issue: #548

This makes it quite difficult to work with the common usage pattern, e.g. sc.textFile("s3a://test_bucket/*")

mocchira commented Mar 15, 2017

@windkit Yes, it's unavoidable for sc.textFile("s3a://test_bucket/*") until #548 is fixed.
However, it could be mitigated by using sc.textFile("s3a://test_bucket/${DIR}/*") instead, since the prefix match can be handled efficiently on the local node with the LevelDB-backed metadata.
Isn't that possible in your use case?


windkit commented Mar 15, 2017

@mocchira Yes, that's how I am trying to work around the bottleneck; I will update here later.
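A minimal sketch of that per-directory workaround (hypothetical directory names; assumes the bucket layout test_bucket/<dir>/<file> and that the directory list is known up front):

>>> # enumerate the known prefixes instead of globbing the whole bucket
>>> dirs = ["dir%03d" % i for i in range(500)]
>>> # one RDD per directory keeps each listing to a single prefix match;
>>> # then union the per-directory RDDs into one logical data set
>>> rdds = [sc.textFile("s3a://test_bucket/%s/*" % d) for d in dirs]
>>> sc.union(rdds).count()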

@windkit
Copy link
Contributor Author

windkit commented Jul 3, 2017

Gateway logs (with COPY logging added at the info level)
spark_fail.access.txt
spark_fail.info.txt

Storage logs (with prefix-search logging added at the info level)
spark_fail.saccess.txt
spark_fail.sinfo.txt


windkit commented Jul 3, 2017

  • Investigate Write Problem
  • Performance Tuning
    • Range cache (Hadoop always uses Range GETs)


windkit commented Jul 5, 2017

Issue Summary

Directory deletion is done asynchronously; if the directory is re-created afterwards, the pending delete can incorrectly remove the new contents, similar to the bucket case (#150).
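A hedged repro sketch of the race (hypothetical paths; plain hadoop fs commands stand in for the committer's delete-then-recreate sequence):

$ hadoop fs -rm -r s3a://test/out            # kicks off an asynchronous prefix delete in LeoFS
$ hadoop fs -mkdir s3a://test/out            # re-create the directory right away
$ hadoop fs -put part-00000 s3a://test/out/  # may later vanish if the async delete runs after re-creation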

spark_fail_sinfo.txt
spark_fail_access.txt
spark_fail_info.txt
spark_fail_saccess.txt

Related PR

#782


windkit commented Aug 24, 2017

The issue is fixed with Spark 2.2.0 + Hadoop 2.7.3, and with Spark 1.6.1 + Hadoop 2.6.3, using
hadoop-aws-2.7.3.jar and aws-java-sdk-1.7.4.jar.


windkit commented Jan 11, 2018

With s3a:// available, it may be possible to use cp/distcp to copy files to/from Hadoop.

https://community.hortonworks.com/questions/7165/how-to-copy-hdfs-file-to-aws-s3-bucket-hadoop-dist.html
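For example, a hedged distcp invocation (paths are illustrative; assumes the fs.s3a endpoint and credentials are configured in core-site.xml the same way as in the pyspark tests above):

$ # copy the HDFS images directory into the LeoFS bucket over s3a
$ hadoop distcp hdfs://leofs-ubuntu1404-node08:9000/images s3a://test/images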

TODO

  • Test cp/distcp
