problem77
scala> val manager = sc.textFile("problem/EmployeeManager.csv")
manager: org.apache.spark.rdd.RDD[String] = problem/EmployeeManager.csv MapPartitionsRDD[1] at textFile at <console>:27
scala> val managerPairRDD = manager.map(x => (x.split(",")(0), x.split(",")(1)))
managerPairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29
scala> val name = sc.textFile("problem77/EmployeeName.csv")
name: org.apache.spark.rdd.RDD[String] = problem77/EmployeeName.csv MapPartitionsRDD[4] at textFile at <console>:27
scala> val manager = sc.textFile("problem77/EmployeeManager.csv")
manager: org.apache.spark.rdd.RDD[String] = problem77/EmployeeManager.csv MapPartitionsRDD[6] at textFile at <console>:27
scala> val managerPairRDD = manager.map(x => (x.split(",")(0), x.split(",")(1)))
managerPairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[7] at map at <console>:29
scala> val namePairRDD = name.map(x => (x.split(",")(0), x.split(",")(1)))
namePairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[8] at map at <console>:29
scala> val manager = sc.textFile("problem77/EmployeeSalary.csv")
manager: org.apache.spark.rdd.RDD[String] = problem77/EmployeeSalary.csv MapPartitionsRDD[10] at textFile at <console>:27
scala> val manager = sc.textFile("problem77/EmployeeManager.csv")
manager: org.apache.spark.rdd.RDD[String] = problem77/EmployeeManager.csv MapPartitionsRDD[12] at textFile at <console>:27
scala> val salary = sc.textFile("problem77/EmployeeSalary.csv")
salary: org.apache.spark.rdd.RDD[String] = problem77/EmployeeSalary.csv MapPartitionsRDD[14] at textFile at <console>:27
scala> val salaryPairRDD = salary.map(x => (x.split(",")(0), x.split(",")(1)))
salaryPairRDD: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[15] at map at <console>:29
scala> val joined = namePairRDD.join(salaryPairRDD).join(managerPairRDD)
joined: org.apache.spark.rdd.RDD[(String, ((String, String), String))] = MapPartitionsRDD[21] at join at <console>:39
scala> val joinedSortedData = joined.sortByKey()
joinedSortedData: org.apache.spark.rdd.RDD[(String, ((String, String), String))] = ShuffledRDD[24] at sortByKey at <console>:41
scala> val finalData = joinedSortedData.map(v=> (v._1, v._2._1._1, v._2._1._2, v._2._2))
finalData: org.apache.spark.rdd.RDD[(String, String, String, String)] = MapPartitionsRDD[25] at map at <console>:43
scala> finalData.saveAsTextFile("spark1/result.txt")