WARNING: This is a technical, hands-on workshop. Hoping for a lecture? Go to a different talk.
I will not be lecturing on Pig. By diving right in and actually using Apache Pig to solve an interesting problem we'll see some of its peculiarities, how to work with it, and how to integrate it into a typical analytic workflow. The idea here is to use me, yourself, your neighbors, and possibly even Google to work through the following outline:
The question we want to answer is a simple (but interesting) one:
“How have domestic airports changed since 1990?”
- Get Data
- We will gather all our raw data sources together
- Explore Data
- head, cut, etc just to see what it looks like
- Load Data
- We will upload our data to the HDFS (Hadoop Distributed File System)
- Pig Setup
- An aside moment to ensure everyone is on the same page with Apache Pig
- Analyze Data (Pig!)
- flights and passengers degree distribution
- join with geo-location
- Get answers
- look at degree (passenger, seat, and flight) distributions as a function of time
Got it? Let’s take an hour and see how far we get. Go.
You will definitely need a laptop (or a desktop if you’re crazy enough to carry one in on your back like a sherpa), a unix terminal, and a text editor. You can follow along without these things but it’s going to be a lot less fun.
Before we can even begin to answer our question we’ve got to have the data. Fortunately all of the data we need (and more) can be found at Infochimps. The first data set of interest contains over 9000 airports, their airport codes, and most importantly, their geo-locations. It can be downloaded here. Here’s a quick sample of that dataset:
| airport_code | latitude | longitude | airport_name | city | country | country_abbv | gmt_offset | runway_length | runway_elevation |
|---|---|---|---|---|---|---|---|---|---|
| MSW | 15.67 | 39.3700 | Massawa International | Massawa | Eritrea | ER | +3.0 | 11471 | 194 |
| TES | 15.1166 | 36.6833 | Tessenei | Tessenei | Eritrea | ER | +3.0 | 6234 | 2018 |
| ASM | 15.2919 | 38.9105 | Yohannes IV | Asmara | Eritrea | ER | +3.0 | 9843 | 7661 |
| ASA | 13.0716 | 42.6449 | Assab International | Assab | Eritrea | ER | +3.0 | 7546 | 46 |
| NLK | -29.0416 | 167.9386 | Norfolk Island | Norfolk Island | Norfolk Island | NF | +11.5 | 6400 | 371 |
| URT | 9.1 | 99.3333 | Surat Thani | Surat Thani | Thailand | TH | -7.0 | 8202 | 19 |
| PHZ | 8.1666 | 98.2833 | Phi Phi Island | Phi Phi Island | Thailand | TH | -7.0 | | |
| PHS | 16.7833 | 100.2666 | Phitsanulok | Phitsanulok | Thailand | TH | -7.0 | 9843 | 145 |
| UTP | 12.6666 | 100.9833 | Utapao | Utapao | Thailand | TH | +7.0 | 11500 | 59 |
| UTH | 17.3863 | 102.7883 | Udon Thani | Udon Thani | Thailand | TH | +7.0 | 10000 | 579 |
The last data set we need is really just a giant network graph detailing the number of flights and passengers between domestic airports for almost 20 years. It’s awesome. This can be downloaded for free here for data day only. And here’s what that data set looks like:
| origin_airport | destin_airport | origin_city | destin_city | passengers | seats | flights | distance | month | origin_pop | destin_pop |
|---|---|---|---|---|---|---|---|---|---|---|
| MFR | RDM | Medford, OR | Bend, OR | 0 | 0 | 1 | 156 | 200810 | 200298 | 157730 |
| AMA | EKO | Amarillo, TX | Elko, NV | 124 | 124 | 1 | 858 | 199308 | 202960 | 40259 |
| TUS | EKO | Tucson, AZ | Elko, NV | 112 | 124 | 1 | 658 | 199308 | 711392 | 40259 |
| AMA | EKO | Amarillo, TX | Elko, NV | 115 | 124 | 1 | 858 | 199406 | 206315 | 41668 |
| ICT | EKO | Wichita, KS | Elko, NV | 100 | 124 | 1 | 1007 | 199607 | 552884 | 45034 |
| SPS | EKO | Wichita Falls, TX | Elko, NV | 122 | 124 | 1 | 1059 | 199603 | 147683 | 45034 |
Note: this data set normally lives on Infochimps here.
All the data here is small enough to explore on a single machine with standard unix command-line tools. Take a moment to do so. Recommended: `cat`, `head`, `cut`, `wc -l`, and `uniq -c`. This lets you really understand what it is that you're dealing with. Don't fly blind.
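For example, a few one-liners along these lines (assuming the files are named flight_edges.tsv and airport_locations.tsv, as below):
# how big are these files?
wc -l flight_edges.tsv airport_locations.tsv
# peek at the first few records
head flight_edges.tsv
# top origin airports by number of records (field 1 of the tab-separated file)
cut -f1 flight_edges.tsv | sort | uniq -c | sort -rn | head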
This step is (basically) optional. That is, if earlier in the day you set up Hadoop in distributed (or pseudo-distributed) mode, this step is for you. Otherwise, put your data files in a suitable working directory (say /data) and keep exploring.
Assuming your data files are called `flight_edges.tsv` and `airport_locations.tsv`, to upload them simply do:
hadoop fs -mkdir /data/domestic/airports
hadoop fs -put flight_edges.tsv /data/domestic/airports/
hadoop fs -put airport_locations.tsv /data/domestic/airports/
This will put your data files into a directory called `/data/domestic/airports/` on the HDFS. Now Pig can see them when run in Hadoop mode.
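A quick listing confirms the files landed where we expect:
hadoop fs -ls /data/domestic/airports/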
Now that we have the data on the HDFS (or locally), let's open up a Pig interactive shell, load the data into Pig, and dump out the first 10 lines or so. This will quickly make sure we've got all our configuration in order:
$: pig
grunt> flight_edges = LOAD '/data/domestic/airports/flight_edges.tsv' AS (origin_code:chararray, destin_code:chararray, origin_city:chararray, destin_city:chararray, passengers:int, seats:int, flights:int, distance:float, month:int, origin_pop:int, destin_pop:int);
grunt> ten_flight_edges = LIMIT flight_edges 10;
grunt> DUMP ten_flight_edges;
You should see Pig start spamming your screen with all sorts of messages. Ultimately you’ll see 10 lines of data on your screen.
Do the same for the airport locations.
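For reference, the airport locations load might look something like this (the field names follow the sample table above; gmt_offset is kept as a chararray since the sample values carry a leading '+'):
grunt> airport_locations = LOAD '/data/domestic/airports/airport_locations.tsv' AS (airport_code:chararray, latitude:float, longitude:float, airport_name:chararray, city:chararray, country:chararray, country_abbv:chararray, gmt_offset:chararray, runway_length:int, runway_elevation:int);
grunt> ten_airport_locations = LIMIT airport_locations 10;
grunt> DUMP ten_airport_locations;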
Notes: If Pig doesn't detect your Hadoop setup right away when it starts, `PIG_CLASSPATH` is probably to blame. Aim it at the directory containing `core-site.xml` and `mapred-site.xml`; this is typically `/etc/hadoop/conf`.
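For example (adjust the path to wherever your Hadoop config files actually live):
export PIG_CLASSPATH=/etc/hadoop/conf
pig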
Questions?
The next step (finally, some Pig!) is to actually analyze our data. Before we can do that we have to decide what question we want to answer:
How have domestic airports changed since 1990?
and break this into smaller, more specific, questions:
- What is the yearly passenger degree distribution? That is, for every airport and year, what is the sum of inbound + outbound passengers? What about seats available? What about flights?
- What airports or regions were most affected? the least?
- Anything else interesting here?
And then decide how to answer these smaller questions with Pig syntax (some will require making plots for full effect).
Before we can answer any of the above questions we have to notice that our data is broken down by month, not year. Let's fix that with a Pig projection:
-- Cut off the monthly portion; we'll sum everything over a whole year in the next step
year_data = FOREACH flight_edges {
year = (int)month/(int)100;
GENERATE
origin_code AS origin_code,
destin_code AS destin_code,
passengers AS passengers,
seats AS seats,
flights AS flights,
year AS year
;
};
Notice all the nifty things we can do (i.e. before the GENERATE part we could do a whole lot more, and we can rename and rearrange fields). Often you'll find yourself doing simple math during a projection, as we'll see next.
Next we are interested in the degree distributions for every year. Before we can do that we need to generate passengers, seats, and flights out for every airport as well as passengers, seats, and flights in for every airport. Pig allows us to say this very succinctly:
-- For every (airport, year) pair get passengers, seats, and flights out
edges_out = FOREACH year_data GENERATE
origin_code AS airport,
year AS year,
passengers AS passengers_out,
seats AS seats_out,
flights AS flights_out
;
-- For every (airport, year) pair get passengers, seats, and flights in
edges_in = FOREACH year_data GENERATE
destin_code AS airport,
year AS year,
passengers AS passengers_in,
seats AS seats_in,
flights AS flights_in
;
This part wasn't strictly necessary, but it reads well. It doesn't hurt us either, because the Pig optimizer rolls all contiguous projections up into a single Hadoop map task.
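If you want to see this for yourself, EXPLAIN prints Pig's logical, physical, and MapReduce plans; the map-reduce plan at the bottom should show these contiguous projections chained inside a single map stage:
grunt> EXPLAIN edges_out;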
Finally, we'll join the outbound data for a given year and airport together with the inbound data for the same year and airport, and sum the results:
-- group them together and sum
grouped_edges = COGROUP edges_in BY (airport,year), edges_out BY (airport,year);
degree_dist = FOREACH grouped_edges {
passenger_degree = SUM(edges_in.passengers_in) + SUM(edges_out.passengers_out);
seats_degree = SUM(edges_in.seats_in) + SUM(edges_out.seats_out);
flights_degree = SUM(edges_in.flights_in) + SUM(edges_out.flights_out);
GENERATE
FLATTEN(group) AS (airport, year),
passenger_degree AS passenger_degree,
seats_degree AS seats_degree,
flights_degree AS flights_degree
;
};
Here we're using the COGROUP operator to join the two relations (edges out and edges in) by their common key, (airport, year). This results in two big bags that must be flattened in the projection. During the projection we SUM the appropriate fields. One peculiarity of Pig is that the join key after a GROUP or COGROUP operation is called group; we have to use the FLATTEN operator on it to get airport and year back as plain fields.
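If the shape of the COGROUP output is hard to picture, DESCRIBE spells it out. Given the relations defined above you should see something roughly like:
grunt> DESCRIBE grouped_edges;
grouped_edges: {group: (airport: chararray, year: int), edges_in: {(airport: chararray, year: int, passengers_in: int, seats_in: int, flights_in: int)}, edges_out: {(airport: chararray, year: int, passengers_out: int, seats_out: int, flights_out: int)}}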
One final thing is to store the data out to disk:
STORE degree_dist INTO '$DEG_DIST';
and run it (save the code into a file called 'degree_distribution.pig', using the $FLIGHT_EDGES and $DEG_DIST parameters in the LOAD and STORE paths so the -p flags below can fill them in, or just run the one in the git repo):
# local mode
pig -x local -p FLIGHT_EDGES=/data/domestic/airports/flight_edges.tsv -p DEG_DIST=/data/domestic/airports/degree_distribution degree_distribution.pig
# hadoop mode
pig -p FLIGHT_EDGES=/data/domestic/airports/flight_edges.tsv -p DEG_DIST=/data/domestic/airports/degree_distribution degree_distribution.pig
Well, a simple degree distribution is nice and all, but what if we want to see which regions were most affected? We might want to plot our data on a map. To do that we'll have to join our degree distribution data with our geolocation data. Here's how to say that with Pig:
-- Load data (boring part)
deg_dist = LOAD '$DEG_DIST' AS (airport_code:chararray, year:int, passenger_degree:int, seats_degree:int, flights_degree:int);
airports = LOAD '$AIRPORTS' AS (airport_code:chararray, latitude:float, longitude:float); -- trailing fields are simply dropped
--
-- Join tables together with inner join on common field
with_geo = JOIN airports BY airport_code, deg_dist BY airport_code;
with_geo_flat = FOREACH with_geo GENERATE
airports::airport_code AS airport_code,
airports::latitude AS latitude,
airports::longitude AS longitude,
deg_dist::passenger_degree AS passenger_degree,
deg_dist::seats_degree AS seats_degree,
deg_dist::flights_degree AS flights_degree,
deg_dist::year AS year
;
-- Store into a flat file
STORE with_geo_flat INTO '$DEG_DIST_GEO';
Notice that we use the projection after the JOIN to pick out (and rename) the fields we want from the operation.
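If the double-colon names look odd, DESCRIBE the joined relation; every field is prefixed by the relation it came from, which is why the projection above has to say airports::airport_code, deg_dist::year, and so on. Roughly:
grunt> DESCRIBE with_geo;
with_geo: {airports::airport_code: chararray, airports::latitude: float, airports::longitude: float, deg_dist::airport_code: chararray, deg_dist::year: int, deg_dist::passenger_degree: int, deg_dist::seats_degree: int, deg_dist::flights_degree: int}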
run it:
# local mode:
pig -x local -p DEG_DIST=/data/domestic/airports/degree_distribution -p AIRPORTS=/data/domestic/airports/airport_locations.tsv -p DEG_DIST_GEO=/data/domestic/airports/deg_dist_with_geo join_with_geo.pig
# hadoop mode:
pig -p DEG_DIST=/data/domestic/airports/degree_distribution -p AIRPORTS=/data/domestic/airports/airport_locations.tsv -p DEG_DIST_GEO=/data/domestic/airports/deg_dist_with_geo join_with_geo.pig
You can go a bit further if you choose and think about some other things you might do with this data:
-- get the maximum flights in + out per year
by_year = GROUP degree_dist BY year;
big_airports = FOREACH by_year {
max_flights = MAX(degree_dist.flights_degree);
GENERATE
group AS year,
max_flights AS max_flights;
};
and so on. The Pig documentation here is a great overview. For more Pig code examples see sounder.
The last Hadoop step is to pull your result set down from the HDFS. Hadoop provides a nice convenience utility for doing this (this only applies if you used Hadoop):
# copy degree distribution
hadoop fs -copyToLocal /data/domestic/airports/degree_distribution data/deg_dist
cat data/deg_dist/part-* > data/yearly_degrees.tsv
# copy degree dist with geo
hadoop fs -copyToLocal /data/domestic/airports/deg_dist_with_geo data/deg_dist_with_geo
cat data/deg_dist_with_geo/part-* > data/yearly_degrees_with_geo.tsv
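Alternatively, hadoop fs -getmerge does the copy and the concatenation in one shot:
# same as copyToLocal + cat above
hadoop fs -getmerge /data/domestic/airports/degree_distribution data/yearly_degrees.tsv
hadoop fs -getmerge /data/domestic/airports/deg_dist_with_geo data/yearly_degrees_with_geo.tsv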
Now that your result data is local, the next logical step is to import it into some analytics software and explore it with tools you already know. See the code in the `plot` directory for making simple plots in R, and the `www` directory for a visualization of the degree distribution on a map using Protovis.