team_discussion_for_06182020.md

resources for discussion

Why XGBoost

drawing

Decision trees are weak learners: individually they are quite inaccurate, but they do slightly better when they work together.

drawing

The second tree must provide a positive contribution when combined with the first tree (a toy sketch of this additive idea follows).
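
A toy Scala sketch of the additive-ensemble idea, not XGBoost's actual training algorithm: each weak learner is assumed to have already been fit to the residual error of the trees before it, so it only adds value where the earlier trees are still wrong.

// Toy sketch of an additive ensemble (illustration only, not XGBoost's implementation).
// A "tree" here is just any weak predictor from a feature value to a score;
// the ensemble prediction is the learning-rate-weighted sum of all trees.
type Tree = Double => Double

def ensemblePredict(trees: Seq[Tree], learningRate: Double)(x: Double): Double =
  trees.foldLeft(0.0)((pred, tree) => pred + learningRate * tree(x))

// Hypothetical weak learners: the second one only corrects what the first got wrong.
val firstTree: Tree  = x => if (x < 5.0) 10.0 else 20.0
val secondTree: Tree = x => if (x < 2.0) -3.0 else 1.0

println(ensemblePredict(Seq(firstTree, secondTree), learningRate = 1.0)(1.0))  // 10.0 + (-3.0) = 7.0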

PCA

drawing

  • The dimensions of the original data are reasonable for human beings, but may not be friendly for a decision tree

  • Try to represent the data in a different coordinate system

  • Make the first dimension the principal component with the most variation, i.e., the direction that maximizes the spread (variance) of the projected data (see the sketch below)
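
A minimal Spark ML sketch of this idea, assuming a SparkSession named spark and toy 3-dimensional data: PCA re-expresses the points in a new coordinate system and keeps the two directions with the most variation.

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

// Toy 3-D points (hypothetical data, just for illustration)
val data = Seq(
  Vectors.dense(2.0, 0.0, 3.0),
  Vectors.dense(4.0, 1.0, 5.0),
  Vectors.dense(6.0, 2.0, 7.0),
  Vectors.dense(7.0, 3.0, 8.0)
).map(Tuple1.apply)
val pcaInput = spark.createDataFrame(data).toDF("features")

// Keep the 2 principal components with the most variation
val pcaModel = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2)
  .fit(pcaInput)

pcaModel.transform(pcaInput).select("pcaFeatures").show(false)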

Why Parquet

Goals of column-based data organization

  • Only load the data that is needed (see the sketch below)
  • Organize data storage based on access needs (query, filter)
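
A minimal Spark sketch of what that buys us, assuming a SparkSession named spark and a hypothetical events.parquet file with these columns: because Parquet is columnar, only the selected columns are read from disk, and the filter can be pushed down so row groups whose statistics rule it out are skipped.

import spark.implicits._

// Hypothetical input; only "page" and "numRequests" are ever read from disk.
val events = spark.read.parquet("/data/events.parquet")

events
  .select("page", "numRequests")   // column pruning: untouched columns are never loaded
  .where($"numRequests" > 100)     // predicate pushdown: row groups outside the range can be skipped
  .show()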

Sample data input
drawing

Flat data vs. Nested data

drawing

Writing Parquet with Scala

// assumes a SparkSession named spark with spark.implicits._ imported,
// and a transformRow helper defined elsewhere in the project
val flatDF = spark.read.option("delimiter", "\t")
                       .option("header", "true")
                       .csv(flatInput)
                       .rdd
                       .map(r => transformRow(r))
                       .toDF

flatDF.write.option("compression", "snappy")
            .parquet(flatOutput)

val nestedDF = spark.read.json(nestedInput)

nestedDF.write.option("compression", "snappy")
              .parquet(nestedOutput)

How to avoid reading unrelated columns?

drawing

By recording data column by column, we can compress it in different ways:
incrementally (record only the diffs), or with a dictionary.

More details can be found in the Parquet encodings documentation
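
A conceptual Scala sketch of dictionary encoding, not Parquet's actual implementation: a low-cardinality column is replaced by a small dictionary plus integer codes, which compress far better than the repeated strings.

// Conceptual dictionary encoding of a column (illustration only).
val column = Seq("en", "en", "de", "en", "fr", "de")

val dictionary = column.distinct                 // Seq("en", "de", "fr")
val codes      = column.map(dictionary.indexOf)  // Seq(0, 0, 1, 0, 2, 1)

// Decoding is a plain lookup, so the original column can always be rebuilt.
val decoded = codes.map(dictionary(_))
assert(decoded == column)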

How to save space

drawing

drawing

Parquet file internals

tree structure

image

Parquet file format

  • File metadata: schema, number of rows
  • Row group: when writing massive data, we don't write it all in one piece but chunk by chunk
  • Column chunk: each column is handled individually (if a whole column is not needed, it can be skipped directly)
  • Page header: size
  • Page: records metadata such as count, max, and min to support quick filtering

The power of partitioning

dataFrame.write
         .partitionBy("Year", "Month", "Day", "Hour")
         .parquet(outputFile)
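
A minimal sketch of why this helps on the read side, assuming the layout written above: the data lands in Year=.../Month=.../Day=.../Hour=... directories, so a filter on the partition columns only scans the matching directories (partition pruning).

import spark.implicits._

// Only the directories for 2020/06/18 are listed and read;
// every other Year/Month/Day partition is pruned away.
val oneDay = spark.read.parquet(outputFile)
  .where($"Year" === 2020 && $"Month" === 6 && $"Day" === 18)

oneDay.count()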

Understanding the case in the paper

https://github.com/CodeBear801/tech_summary/blob/master/tech-summary/papers/dremel.md#understanding-the-case-in-paper

Experiment

https://github.com/apache/parquet-mr

set up env

docker pull nathanhowell/parquet-tools:latest

Experiment via docker

wget https://github.com/apache/drill/blob/master/exec/java-exec/src/test/resources/lateraljoin/nested-customer.parquet

docker run -it --rm nathanhowell/parquet-tools:latest --help

docker run --rm -it -v /yourlocalpath:/test nathanhowell/parquet-tools:latest schema test/nested-customer.parquet
➜  tmp docker run --rm -it -v /Users/xunliu/Downloads/tmp:/test nathanhowell/parquet-tools:latest schema test/nested-customer.parquet
message root {
  optional binary _id (UTF8);
  optional binary c_address (UTF8);
  optional double c_id;
  optional binary c_name (UTF8);
  repeated group orders {
    repeated group items {
      optional binary i_name (UTF8);
      optional double i_number;
      optional binary i_supplier (UTF8);
    }
    optional double o_amount;
    optional double o_id;
    optional binary o_shop (UTF8);
  }
}
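
A small Spark sketch, assuming a SparkSession named spark, of reading this nested file back and touching only one nested field: thanks to the columnar layout, only the needed column chunks have to be read even though the schema is a tree.

val customers = spark.read.parquet("test/nested-customer.parquet")

customers.printSchema()                               // same tree structure as the output above
customers.select("c_name", "orders.o_amount").show()  // only these column chunks are read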

Why Spark DataFrame

Why Resilient Distributed Datasets

map reduce flow drawing

Let's say there are multiple stages of MapReduce:

  • how do we represent distributed data in a programming language?

  • what if an intermediate step fails, how do we recover?

  • how do we make it easy for programmers to write MapReduce programs?

  • say we first want to add 1 to every number and then keep only the odd numbers, can we optimize the calculation? (see the sketch after this list)
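
A minimal Spark sketch of that last question, assuming a SparkContext named sc: both steps are lazy transformations, so Spark pipelines the map and the filter into a single stage and never materializes an intermediate dataset.

val numbers = sc.parallelize(1 to 10)

val odds = numbers
  .map(_ + 1)           // transformation: nothing runs yet
  .filter(_ % 2 == 1)   // transformation: fused with the map above

odds.collect()           // action: triggers one pipelined pass over the data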

RDD stands for Resilient Distributed Dataset; an RDD is a collection of partitions of records.

The main challenge in designing RDDs is defining a programming interface 
that can provide fault tolerance efficiently. 

What is an RDD

Each RDD contains:
(1) a set of partitions, the atomic pieces of the dataset;
(2) a set of dependencies on parent RDDs, which describe the RDD's lineage;
(3) a function describing what computation to perform on the parent RDD;
(4) metadata about the partitioning scheme and where the data is placed.
For example, an RDD representing an HDFS file has one partition per data block and knows which nodes each block lives on.
The result of a map over that RDD keeps the same partitioning, with the map function applied to the parent's data.
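
A small sketch of what that lineage looks like in practice, assuming a SparkContext named sc and a hypothetical HDFS path: each RDD remembers its parent and the function applied to it, so a lost partition can be recomputed from the parent instead of being replicated.

val lines  = sc.textFile("hdfs:///data/pagecounts")  // one partition per HDFS block
val mapped = lines.map(_.toLowerCase)                 // same partitioning as its parent

// Prints the chain of dependencies (the lineage) used for fault recovery.
println(mapped.toDebugString)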

Example code

val rdd = sc.textFile("/mnt/wikipediapagecounts.gz")
val parsedRDD = rdd.flatMap { line =>
  line.split("""\s+""") match {
    // parse the request count so it can be summed later
    case Array(project, page, numRequests, _) => Some((project, page, numRequests.toLong))
    case _ => None
  }
}

// keep only English pages; count requests per page
parsedRDD.filter { case (project, page, numRequests) => project == "en" }
         .map { case (_, page, numRequests) => (page, numRequests) }
         .reduceByKey(_ + _)
         .take(100)
         .foreach { case (page, requests) => println(s"$page: $requests") }

Why not RDD?

drawing

Spark doesn't look inside lambda functions, so it has no idea what the data is or what type it has

drawing

DataFrame Sample

import spark.implicits._
import org.apache.spark.sql.functions.sum

// convert RDD -> DF with column names
val df = parsedRDD.toDF("project", "page", "numRequests")
// filter, groupBy, sum, and then agg()
df.filter($"project" === "en")
  .groupBy($"page")
  .agg(sum($"numRequests").as("count"))
  .limit(100)
  .show(100)
| project | page | numRequests |
| ------- | ---- | ----------- |
| en      | 23   | 45          |
| en      | 24   | 200         |

drawing

drawing

drawing

Experiment

More info