problem22
scala> val content = sc.textFile("/user/smartlin1/data/Content.txt")
17/10/11 17:31:11 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 336.8 KB, free 336.8 KB)
17/10/11 17:31:11 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.4 KB, free 365.2 KB)
17/10/11 17:31:11 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:36106 (size: 28.4 KB, free: 511.1 MB)
17/10/11 17:31:11 INFO SparkContext: Created broadcast 0 from textFile at <console>:27
content: org.apache.spark.rdd.RDD[String] = /user/smartlin1/data/Content.txt MapPartitionsRDD[1] at textFile at <console>:27
scala> val remove = sc.textFile("/user/smartlin1/data/Remove.txt")
17/10/11 17:33:25 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 336.8 KB, free 702.0 KB)
17/10/11 17:33:25 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.4 KB, free 730.4 KB)
17/10/11 17:33:25 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:36106 (size: 28.4 KB, free: 511.1 MB)
17/10/11 17:33:25 INFO SparkContext: Created broadcast 1 from textFile at <console>:27
remove: org.apache.spark.rdd.RDD[String] = /user/smartlin1/data/Remove.txt MapPartitionsRDD[3] at textFile at <console>:27
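// Optional sanity check (not in the original session): preview a few lines
// of each input to confirm the HDFS paths resolve as expected.
content.take(3).foreach(println)
remove.take(3).foreach(println)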
scala> val removeRDD = remove.flatMap(_.split(",")).map(_.trim)
removeRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at map at <console>:29
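// If Remove.txt could carry trailing commas or blank entries, dropping empty
// tokens keeps them out of the stop list. An optional refinement (removeClean
// is a hypothetical name, not part of the original session):
val removeClean = remove.flatMap(_.split(",")).map(_.trim).filter(_.nonEmpty)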
// Broadcast the remove list
scala> val broadcastRemove = sc.broadcast(removeRDD.collect().toList)
17/10/11 18:07:29 INFO FileInputFormat: Total input paths to process : 1
17/10/11 18:07:29 INFO SparkContext: Starting job: collect at <console>:31
17/10/11 18:07:29 INFO DAGScheduler: Got job 0 (collect at <console>:31) with 2 output partitions
17/10/11 18:07:29 INFO DAGScheduler: Final stage: ResultStage 0 (collect at <console>:31)
17/10/11 18:07:29 INFO DAGScheduler: Parents of final stage: List()
17/10/11 18:07:29 INFO DAGScheduler: Missing parents: List()
17/10/11 18:07:29 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at map at <console>:29), which has no missing parents
17/10/11 18:07:29 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 733.9 KB)
17/10/11 18:07:30 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2007.0 B, free 735.9 KB)
17/10/11 18:07:30 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:49985 (size: 2007.0 B, free: 511.1 MB)
17/10/11 18:07:30 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1008
17/10/11 18:07:30 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at map at <console>:29)
17/10/11 18:07:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
17/10/11 18:07:30 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2163 bytes)
17/10/11 18:07:30 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,ANY, 2163 bytes)
17/10/11 18:07:30 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
17/10/11 18:07:30 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
17/10/11 18:07:30 INFO HadoopRDD: Input split: hdfs://nn01.itversity.com:8020/user/smartlin1/data/Remove.txt:0+10
17/10/11 18:07:30 INFO HadoopRDD: Input split: hdfs://nn01.itversity.com:8020/user/smartlin1/data/Remove.txt:10+11
17/10/11 18:07:30 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
17/10/11 18:07:30 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
17/10/11 18:07:30 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
17/10/11 18:07:30 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
17/10/11 18:07:30 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
17/10/11 18:07:30 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2070 bytes result sent to driver
17/10/11 18:07:30 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 2044 bytes result sent to driver
17/10/11 18:07:30 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 178 ms on localhost (1/2)
17/10/11 18:07:30 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 145 ms on localhost (2/2)
17/10/11 18:07:30 INFO DAGScheduler: ResultStage 0 (collect at <console>:31) finished in 0.192 s
17/10/11 18:07:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/10/11 18:07:30 INFO DAGScheduler: Job 0 finished: collect at <console>:31, took 0.298007 s
17/10/11 18:07:30 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 336.0 B, free 736.2 KB)
17/10/11 18:07:30 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 220.0 B, free 736.4 KB)
17/10/11 18:07:30 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:49985 (size: 220.0 B, free: 511.1 MB)
17/10/11 18:07:30 INFO SparkContext: Created broadcast 3 from broadcast at <console>:31
broadcastRemove: org.apache.spark.broadcast.Broadcast[List[String]] = Broadcast(3)
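// Design note: collect() pulls the stop-word list to the driver, and
// sc.broadcast ships one read-only copy to each executor instead of
// serializing the list with every task. For a large stop list, a Set gives
// O(1) membership tests where a List is O(n). An alternative sketch
// (broadcastRemoveSet is a hypothetical name, not part of the original session):
val broadcastRemoveSet = sc.broadcast(removeRDD.collect().toSet)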
scala> val words = content.flatMap(_.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at flatMap at <console>:29
scala> val filtered = words.filter(word => !broadcastRemove.value.contains(word))
filtered: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at filter at <console>:37
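// Note (an observation, not in the original session): content is split only
// on single spaces, so punctuation stays attached ("word," will not match
// "word" in the remove list) and the comparison is case-sensitive. A
// hypothetical normalization step, applied to words before filtering:
val normalized = words.map(_.toLowerCase.replaceAll("[^a-z0-9]", "")).filter(_.nonEmpty)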
scala> val pairRDD = filtered.map(word => (word, 1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:39
scala> val wordCount = pairRDD.reduceByKey(_+_)
17/10/11 18:16:06 INFO FileInputFormat: Total input paths to process : 1
wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:41
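// Quick inspection of the result (a hypothetical check, not in the original
// session): print the ten most frequent surviving words.
wordCount.sortBy(_._2, ascending = false).take(10).foreach(println)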
scala> wordCount.saveAsTextFile("/user/smartlin1/output/result_wordCount.txt")
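// Note: saveAsTextFile writes a directory (here named result_wordCount.txt)
// containing one part-NNNNN file per partition. To force a single output
// file, coalesce to one partition first (hypothetical output path):
wordCount.coalesce(1).saveAsTextFile("/user/smartlin1/output/result_wordCount_single")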