diff --git a/5003-project new.pdf b/5003-project new.pdf new file mode 100644 index 0000000..57d1825 Binary files /dev/null and b/5003-project new.pdf differ diff --git a/model_pipeline_andy.ipynb b/model_pipeline_andy.ipynb new file mode 100644 index 0000000..4b6cd94 --- /dev/null +++ b/model_pipeline_andy.ipynb @@ -0,0 +1,1515 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.sql import SQLContext\n", + "from pyspark.sql.types import *\n", + "from pyspark.sql import Row\n", + "from pyspark.mllib.regression import LabeledPoint\n", + "from pyspark.sql.functions import udf\n", + "from pyspark.mllib.linalg import Vectors\n", + "from pyspark.ml.classification import LogisticRegression\n", + "from pyspark.ml.param import Param, Params\n", + "from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel\n", + "from pyspark.mllib.stat import Statistics\n", + "from pyspark.ml.feature import OneHotEncoder, StringIndexer\n", + "from pyspark.ml.feature import VectorAssembler\n", + "from IPython.display import display\n", + "from ipywidgets import interact\n", + "from pyspark.sql.functions import *\n", + "from pyspark.sql.types import IntegerType\n", + "from operator import add\n", + "import sys\n", + "import numpy as np\n", + "import pandas as pd\n", + "import time\n", + "import datetime\n", + "from pyspark.mllib.tree import RandomForest, RandomForestModel\n", + "\n", + "from pyspark import SparkContext\n", + "from pyspark.sql import SparkSession\n", + "\n", + "sc = SparkContext(\"local\",'app')\n", + "spark = SparkSession.builder.appName('name').config('spark.sql.shuffle.partitions',10).getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 1. 
load data\n", + "## 1.1 big trainset" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- DAY_OF_WEEK: integer (nullable = true)\n", + " |-- AIRLINE: string (nullable = true)\n", + " |-- ORIGIN_AIRPORT: string (nullable = true)\n", + " |-- DESTINATION_AIRPORT: string (nullable = true)\n", + " |-- DEPARTURE_TIME: integer (nullable = true)\n", + " |-- DEPARTURE_DELAY: integer (nullable = true)\n", + " |-- SCHEDULED_TIME: integer (nullable = true)\n", + " |-- ELAPSED_TIME: integer (nullable = true)\n", + " |-- DISTANCE: integer (nullable = true)\n", + " |-- SCHEDULED_ARRIVAL: integer (nullable = true)\n", + " |-- ARRIVAL_TIME: integer (nullable = true)\n", + " |-- ARRIVAL_DELAY: integer (nullable = true)\n", + " |-- NEW_SCHEDULED_DEPARTURE: integer (nullable = true)\n", + " |-- NEW_DAY: integer (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "data=spark.read.csv('train_test/train_flight.csv',header=True,inferSchema=True)\n", + "data.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1.2 small_trainset" + ] + }, + { + "cell_type": "code", + "execution_count": 55, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "[u'MQ',\n", + " u'NK',\n", + " u'OO',\n", + " u'B6',\n", + " u'F9',\n", + " u'US',\n", + " u'WN',\n", + " u'VX',\n", + " u'AA',\n", + " u'DL',\n", + " u'UA',\n", + " u'EV',\n", + " u'AS',\n", + " u'HA']" + ] + }, + "execution_count": 55, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "airlines=data.select('AIRLINE').distinct().rdd.map(lambda row:row['AIRLINE']).collect()\n", + "airlines" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
DAY_OF_WEEKAIRLINEORIGIN_AIRPORTDESTINATION_AIRPORTDEPARTURE_TIMEDEPARTURE_DELAYSCHEDULED_TIMEELAPSED_TIMEDISTANCESCHEDULED_ARRIVALARRIVAL_TIMEARRIVAL_DELAYNEW_SCHEDULED_DEPARTURENEW_DAY
01AAABQCLT28-122051871449605535-3040201
11AAABQCLT31-92051851449605536-2940215
21AAABQCLT31-92051941449605545-2040243
31AAABQCLT34-62051971449605551-1440208
41AAABQCLT36-42052131449605609440229
\n", + "
" + ], + "text/plain": [ + " DAY_OF_WEEK AIRLINE ORIGIN_AIRPORT DESTINATION_AIRPORT DEPARTURE_TIME \\\n", + "0 1 AA ABQ CLT 28 \n", + "1 1 AA ABQ CLT 31 \n", + "2 1 AA ABQ CLT 31 \n", + "3 1 AA ABQ CLT 34 \n", + "4 1 AA ABQ CLT 36 \n", + "\n", + " DEPARTURE_DELAY SCHEDULED_TIME ELAPSED_TIME DISTANCE SCHEDULED_ARRIVAL \\\n", + "0 -12 205 187 1449 605 \n", + "1 -9 205 185 1449 605 \n", + "2 -9 205 194 1449 605 \n", + "3 -6 205 197 1449 605 \n", + "4 -4 205 213 1449 605 \n", + "\n", + " ARRIVAL_TIME ARRIVAL_DELAY NEW_SCHEDULED_DEPARTURE NEW_DAY \n", + "0 535 -30 40 201 \n", + "1 536 -29 40 215 \n", + "2 545 -20 40 243 \n", + "3 551 -14 40 208 \n", + "4 609 4 40 229 " + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "data_df=data.toPandas()\n", + "data_df.head()" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
DAY_OF_WEEKDEPARTURE_TIMESCHEDULED_TIMEELAPSED_TIMEDISTANCESCHEDULED_ARRIVALARRIVAL_TIMEARRIVAL_DELAYNEW_SCHEDULED_DEPARTURENEW_DAYDEPARTURE_DELAYsquare_schedulesquare_dayschedule_by_day
count479982.000000479982.000000479982.000000479982.000000479982.000000479982.000000479982.000000479982.000000479982.000000479982.000000479982.0000004.799820e+05479982.000000479982.000000
mean3.9471981299.554883172.461207167.0992891047.9948791502.5471061489.613533-3.427633791.441200198.4021881.9342857.146360e+0549968.049387156880.277765
std1.990313499.97615877.90531477.405218636.651491510.987688518.13596819.024725297.080736102.97884912.8990764.849673e+0540496.916305105326.948777
min1.0000001.00000042.00000034.00000083.0000001.0000001.000000-87.0000005.0000001.000000-68.0000002.500000e+011.00000010.000000
25%2.000000854.000000114.000000110.000000599.0000001112.0000001104.000000-15.000000535.000000115.000000-5.0000002.862250e+0513225.00000074730.000000
50%4.0000001301.000000163.000000157.000000980.0000001525.0000001518.000000-7.000000780.000000207.000000-3.0000006.084000e+0542849.000000138990.000000
75%6.0000001719.000000210.000000205.0000001345.0000001925.0000001922.0000005.0000001035.000000270.0000003.0000001.071225e+0672900.000000224790.000000
max7.0000002400.000000523.000000630.0000003784.0000002359.0000002400.000000225.0000001439.000000365.00000059.0000002.070721e+06133225.000000525235.000000
\n", + "
" + ], + "text/plain": [ + " DAY_OF_WEEK DEPARTURE_TIME SCHEDULED_TIME ELAPSED_TIME \\\n", + "count 479982.000000 479982.000000 479982.000000 479982.000000 \n", + "mean 3.947198 1299.554883 172.461207 167.099289 \n", + "std 1.990313 499.976158 77.905314 77.405218 \n", + "min 1.000000 1.000000 42.000000 34.000000 \n", + "25% 2.000000 854.000000 114.000000 110.000000 \n", + "50% 4.000000 1301.000000 163.000000 157.000000 \n", + "75% 6.000000 1719.000000 210.000000 205.000000 \n", + "max 7.000000 2400.000000 523.000000 630.000000 \n", + "\n", + " DISTANCE SCHEDULED_ARRIVAL ARRIVAL_TIME ARRIVAL_DELAY \\\n", + "count 479982.000000 479982.000000 479982.000000 479982.000000 \n", + "mean 1047.994879 1502.547106 1489.613533 -3.427633 \n", + "std 636.651491 510.987688 518.135968 19.024725 \n", + "min 83.000000 1.000000 1.000000 -87.000000 \n", + "25% 599.000000 1112.000000 1104.000000 -15.000000 \n", + "50% 980.000000 1525.000000 1518.000000 -7.000000 \n", + "75% 1345.000000 1925.000000 1922.000000 5.000000 \n", + "max 3784.000000 2359.000000 2400.000000 225.000000 \n", + "\n", + " NEW_SCHEDULED_DEPARTURE NEW_DAY DEPARTURE_DELAY \\\n", + "count 479982.000000 479982.000000 479982.000000 \n", + "mean 791.441200 198.402188 1.934285 \n", + "std 297.080736 102.978849 12.899076 \n", + "min 5.000000 1.000000 -68.000000 \n", + "25% 535.000000 115.000000 -5.000000 \n", + "50% 780.000000 207.000000 -3.000000 \n", + "75% 1035.000000 270.000000 3.000000 \n", + "max 1439.000000 365.000000 59.000000 \n", + "\n", + " square_schedule square_day schedule_by_day \n", + "count 4.799820e+05 479982.000000 479982.000000 \n", + "mean 7.146360e+05 49968.049387 156880.277765 \n", + "std 4.849673e+05 40496.916305 105326.948777 \n", + "min 2.500000e+01 1.000000 10.000000 \n", + "25% 2.862250e+05 13225.000000 74730.000000 \n", + "50% 6.084000e+05 42849.000000 138990.000000 \n", + "75% 1.071225e+06 72900.000000 224790.000000 \n", + "max 2.070721e+06 133225.000000 525235.000000 " + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "data_df.describe()" + ] + }, + { + "cell_type": "code", + "execution_count": 82, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(505811,) (505811, 100)\n" + ] + } + ], + "source": [ + "%matplotlib inline\n", + "data_small=data_df[['DEPARTURE_DELAY','NEW_DAY',\"NEW_SCHEDULED_DEPARTURE\"]]\n", + "data_join=data_small.join(pd.get_dummies(data_df['ORIGIN_AIRPORT'],prefix='AIRPORT'))\n", + "feature_col=list(data_join.columns)\n", + "feature_col.remove('DEPARTURE_DELAY')\n", + "labels=data_join['DEPARTURE_DELAY'].as_matrix()\n", + "X=data_join[feature_col].as_matrix()\n", + "print labels.shape,X.shape" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from sklearn.preprocessing import StandardScaler\n", + "from sklearn.preprocessing import PolynomialFeatures\n", + "from sklearn.metrics import mean_squared_error\n", + "scaler = StandardScaler()\n", + "X_train=scaler.fit_transform(X)\n", + "\n", + "poly = PolynomialFeatures(degree = 2)\n", + "from sklearn.linear_model import LinearRegression\n", + "regr = LinearRegression()\n", + "X_ = poly.fit_transform(X_train)\n", + "regr.fit(X_, labels)\n", + "result = regr.predict(X_)\n", + "score =mean_squared_error(result, labels)\n", + "print(\"Mean squared error = \", score)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": 
true + }, + "outputs": [], + "source": [ + "print X_.shape" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1.3 choose one dataset" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "4185551\n", + "root\n", + " |-- DAY_OF_WEEK: integer (nullable = true)\n", + " |-- AIRLINE: string (nullable = true)\n", + " |-- ORIGIN_AIRPORT: string (nullable = true)\n", + " |-- DESTINATION_AIRPORT: string (nullable = true)\n", + " |-- DEPARTURE_TIME: integer (nullable = true)\n", + " |-- DEPARTURE_DELAY: integer (nullable = true)\n", + " |-- SCHEDULED_TIME: integer (nullable = true)\n", + " |-- ELAPSED_TIME: integer (nullable = true)\n", + " |-- DISTANCE: integer (nullable = true)\n", + " |-- SCHEDULED_ARRIVAL: integer (nullable = true)\n", + " |-- ARRIVAL_TIME: integer (nullable = true)\n", + " |-- ARRIVAL_DELAY: integer (nullable = true)\n", + " |-- NEW_SCHEDULED_DEPARTURE: integer (nullable = true)\n", + " |-- NEW_DAY: integer (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "#dataset=data_small\n", + "dataset_unprocess1=data\n", + "print(dataset_unprocess1.count())\n", + "dataset_unprocess1.printSchema()\n", + "#dataset.select(\"SCHEDULED_TIME\",'schedule_departure').show()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "122036" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# just try airport clustering\n", + "dataset_unprocess2=dataset_unprocess1.filter(dataset_unprocess1['AIRLINE']=='AS')\n", + "dataset_unprocess2=dataset_unprocess2.filter(dataset_unprocess2['DEPARTURE_DELAY']<60)\n", + "dataset_unprocess2.count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 0.1 change label to classification" + ] + }, + { + "cell_type": "raw", + "metadata": {}, + "source": [ + "from pyspark.sql.functions import UserDefinedFunction\n", + "from pyspark.sql.types import IntegerType\n", + "name = 'DEPARTURE_DELAY'\n", + "\n", + "def tran_label(element):\n", + " if element < 0:\n", + " return 0\n", + " else:\n", + " return 1\n", + " \n", + "udf = UserDefinedFunction(lambda x: tran_label(x), IntegerType())\n", + "\n", + "new_data=dataset.select('*',udf(dataset['DEPARTURE_DELAY']).alias('class_labels'))\n", + "dataset=new_data.drop('DEPARTURE_DELAY')\n", + "dataset=dataset.withColumnRenamed('class_labels','DEPARTURE_DELAY')\n", + "dataset.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 0.2 change label to doubletype\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- DAY_OF_WEEK: integer (nullable = true)\n", + " |-- AIRLINE: string (nullable = true)\n", + " |-- ORIGIN_AIRPORT: string (nullable = true)\n", + " |-- DESTINATION_AIRPORT: string (nullable = true)\n", + " |-- DEPARTURE_TIME: integer (nullable = true)\n", + " |-- SCHEDULED_TIME: integer (nullable = true)\n", + " |-- ELAPSED_TIME: integer (nullable = true)\n", + " |-- DISTANCE: integer (nullable = true)\n", + " |-- SCHEDULED_ARRIVAL: integer (nullable = true)\n", + " |-- ARRIVAL_TIME: integer (nullable = true)\n", + " |-- ARRIVAL_DELAY: integer (nullable = true)\n", + " |-- NEW_SCHEDULED_DEPARTURE: integer (nullable = true)\n", + " |-- NEW_DAY: integer (nullable = 
true)\n", + " |-- DEPARTURE_DELAY: double (nullable = true)\n", + "\n", + "root\n", + " |-- DAY_OF_WEEK: integer (nullable = true)\n", + " |-- AIRLINE: string (nullable = true)\n", + " |-- ORIGIN_AIRPORT: string (nullable = true)\n", + " |-- DESTINATION_AIRPORT: string (nullable = true)\n", + " |-- DEPARTURE_TIME: integer (nullable = true)\n", + " |-- SCHEDULED_TIME: integer (nullable = true)\n", + " |-- ELAPSED_TIME: integer (nullable = true)\n", + " |-- DISTANCE: integer (nullable = true)\n", + " |-- SCHEDULED_ARRIVAL: integer (nullable = true)\n", + " |-- ARRIVAL_TIME: integer (nullable = true)\n", + " |-- ARRIVAL_DELAY: integer (nullable = true)\n", + " |-- NEW_SCHEDULED_DEPARTURE: integer (nullable = true)\n", + " |-- NEW_DAY: integer (nullable = true)\n", + " |-- DEPARTURE_DELAY: double (nullable = true)\n", + " |-- square_schedule: integer (nullable = true)\n", + " |-- square_day: integer (nullable = true)\n", + " |-- schedule_by_day: integer (nullable = true)\n", + "\n" + ] + } + ], + "source": [ + "from pyspark.sql.functions import UserDefinedFunction\n", + "from pyspark.sql.types import DoubleType,IntegerType\n", + "\n", + "dataset=dataset_unprocess2\n", + "\n", + "udf = UserDefinedFunction(lambda x: x*1.0, DoubleType())\n", + "new_data=dataset.select('*',udf(dataset['DEPARTURE_DELAY']).alias('double_labels'))\n", + "dataset=new_data.drop('DEPARTURE_DELAY')\n", + "dataset=dataset.withColumnRenamed('double_labels','DEPARTURE_DELAY')\n", + "dataset.printSchema()\n", + "# create a new feature square_schedule\n", + "udf = UserDefinedFunction(lambda x: x*x, IntegerType())\n", + "new_data=dataset.select('*',udf(dataset['NEW_SCHEDULED_DEPARTURE']).alias('square_schedule'))\n", + "dataset=new_data\n", + "\n", + "#create a new feature square_day\n", + "udf = UserDefinedFunction(lambda x: x*x, IntegerType())\n", + "new_data=dataset.select('*',udf(dataset['NEW_DAY']).alias('square_day'))\n", + "dataset=new_data\n", + "\n", + "# create a new feature schedule_by_day\n", + "new_data=dataset.select('*',(dataset['NEW_DAY']*dataset['NEW_SCHEDULED_DEPARTURE']).alias('schedule_by_day'))\n", + "dataset=new_data\n", + "dataset.printSchema()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 2 feature transformation pipeline\n", + "## 2.1 feature selection (can be updated)" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# dataset=data\n", + "categoricalColumns = [] # to add\n", + "numericCols = ['NEW_SCHEDULED_DEPARTURE',\"square_day\",\"square_schedule\",\"NEW_DAY\",\"DISTANCE\",\"DAY_OF_WEEK\"] # to add\n", + "# all_features=categoricalColumns+numericCols" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2.2 transform and onehot" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "root\n", + " |-- label: double (nullable = true)\n", + " |-- features: vector (nullable = true)\n", + "\n", + "+-----+-------------------------------------------+\n", + "|label|features |\n", + "+-----+-------------------------------------------+\n", + "|-18.0|[980.0,49284.0,960400.0,222.0,1180.0,1.0] |\n", + "|-14.0|[980.0,52441.0,960400.0,229.0,1180.0,1.0] |\n", + "|-12.0|[980.0,37636.0,960400.0,194.0,1180.0,1.0] |\n", + "|-12.0|[980.0,40401.0,960400.0,201.0,1180.0,1.0] |\n", + "|-11.0|[980.0,43264.0,960400.0,208.0,1180.0,1.0] |\n", + "|-8.0 
|[980.0,32400.0,960400.0,180.0,1180.0,1.0] |\n", + "|-8.0 |[980.0,46225.0,960400.0,215.0,1180.0,1.0] |\n", + "|-5.0 |[980.0,34969.0,960400.0,187.0,1180.0,1.0] |\n", + "|-8.0 |[1000.0,59049.0,1000000.0,243.0,1180.0,1.0]|\n", + "|-6.0 |[1000.0,55696.0,1000000.0,236.0,1180.0,1.0]|\n", + "|-12.0|[495.0,32400.0,245025.0,180.0,253.0,1.0] |\n", + "|-9.0 |[495.0,55696.0,245025.0,236.0,253.0,1.0] |\n", + "|-8.0 |[495.0,40401.0,245025.0,201.0,253.0,1.0] |\n", + "|-2.0 |[495.0,46225.0,245025.0,215.0,253.0,1.0] |\n", + "|0.0 |[495.0,59049.0,245025.0,243.0,253.0,1.0] |\n", + "|0.0 |[495.0,29929.0,245025.0,173.0,253.0,1.0] |\n", + "|2.0 |[495.0,49284.0,245025.0,222.0,253.0,1.0] |\n", + "|17.0 |[495.0,52441.0,245025.0,229.0,253.0,1.0] |\n", + "|-12.0|[1035.0,29929.0,1071225.0,173.0,253.0,1.0] |\n", + "|-7.0 |[1035.0,59049.0,1071225.0,243.0,253.0,1.0] |\n", + "+-----+-------------------------------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "from pyspark.sql.types import *\n", + "from pyspark.ml import Pipeline\n", + "from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler\n", + "cols=dataset.columns\n", + "\n", + "stages = [] \n", + "feature_names=[]\n", + "for categoricalCol in categoricalColumns:\n", + " stringIndexer = StringIndexer(inputCol=categoricalCol, \n", + " outputCol=categoricalCol+\"Index\")\n", + " encoder = OneHotEncoder(inputCol=categoricalCol+\"Index\", \n", + " outputCol=categoricalCol+\"classVec\")\n", + " stages += [stringIndexer, encoder]\n", + "\n", + "assemblerInputs = map(lambda c: c + \"classVec\", categoricalColumns) + numericCols\n", + "assembler = VectorAssembler(inputCols=assemblerInputs, outputCol=\"Features\")\n", + "stages += [assembler]\n", + "pipeline = Pipeline(stages=stages)\n", + "pipelineModel = pipeline.fit(dataset)\n", + "dataset_transformed = pipelineModel.transform(dataset)\n", + "pipelineModel.write().overwrite().save(\"test_pipeline\")\n", + "from pyspark.ml.feature import PolynomialExpansion\n", + "from pyspark.mllib.linalg import Vectors\n", + "\n", + "px = PolynomialExpansion(degree=1, inputCol=\"Features\", outputCol=\"features\")\n", + "polyDF = px.transform(dataset_transformed)\n", + "\n", + "selectedcols = ['DEPARTURE_DELAY', \"features\"] \n", + "dataset_transformed = polyDF.select(selectedcols)\n", + "dataset_transformed=dataset_transformed.select('*').withColumnRenamed('DEPARTURE_DELAY','label')\n", + "dataset_transformed.printSchema()\n", + "dataset_transformed.select('*').show(truncate=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "30388" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "testDataset=dataset_transformed\n", + "testDataset.count()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2.3 sample and split dataset into trainingData and testData" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "85622\n", + "36414\n" + ] + } + ], + "source": [ + "#dataset_used,left_behind=dataset_transformed.randomSplit((0.3,0.7),1)\n", + "trainingData,testData=dataset_transformed.randomSplit((0.7,0.3),1)\n", + "print trainingData.count()\n", + "print testData.count()" + ] + }, + { 
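Section 2.2 above persists the fitted feature pipeline with `pipelineModel.write().overwrite().save("test_pipeline")`. A minimal sketch of reloading and reusing it, assuming a DataFrame with the same input columns as `dataset` (the `test_pipeline` path comes from that save call):

```python
from pyspark.ml import PipelineModel

# Reload the fitted StringIndexer/OneHotEncoder/VectorAssembler stages.
reloaded = PipelineModel.load("test_pipeline")

# Apply the identical transformations to new rows with the training schema.
new_transformed = reloaded.transform(dataset)
new_transformed.select("Features").show(5, truncate=False)
```

This is the same save/load mechanism that `model_train.py` (further down in this diff) relies on when it writes one `<AIRLINE>_pipeline` per carrier for `test_prediction.py` to load.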
+ "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2.7 feature importances" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['NEW_SCHEDULED_DEPARTURE', 'square_day', 'square_schedule', 'NEW_DAY', 'DISTANCE', 'DAY_OF_WEEK']\n" + ] + }, + { + "data": { + "text/plain": [ + "SparseVector(6, {0: 0.4367, 1: 0.0583, 2: 0.2718, 3: 0.1039, 4: 0.1193, 5: 0.0101})" + ] + }, + "execution_count": 27, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from pyspark.ml.regression import RandomForestRegressor\n", + "rf= RandomForestRegressor(numTrees=10, maxDepth=3, seed=42)\n", + "model = rf.fit(dataset_transformed)\n", + "print(assemblerInputs)\n", + "model.featureImportances" + ] + }, + { + "cell_type": "code", + "execution_count": 58, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Feature Name:NEW_SCHEDULED_DEPARTURE\t importances:0.436664050418\n", + "Feature Name:square_day\t importances:0.0583377762333\n", + "Feature Name:square_schedule\t importances:0.271783558573\n", + "Feature Name:NEW_DAY\t importances:0.103858173085\n", + "Feature Name:DISTANCE\t importances:0.11925619668\n", + "Feature Name:DAY_OF_WEEK\t importances:0.0101002450105\n" + ] + } + ], + "source": [ + "for i in range(len(assemblerInputs)):\n", + " print \"Feature Name:\"+str(assemblerInputs[i])+\"\\t importances:\"+str(model.featureImportances[i])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 4 machine learning model(by pyspark.ml.)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 4.1 train model (RandomForest)" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.ml import Pipeline\n", + "from pyspark.ml.regression import RandomForestRegressor\n", + "from pyspark.ml.tuning import CrossValidator, ParamGridBuilder\n", + "from pyspark.ml.evaluation import RegressionEvaluator\n", + "numFolds =3\n", + "\n", + "rf = RandomForestRegressor(labelCol=\"label\", featuresCol=\"features\") \n", + "paramGrid = ParamGridBuilder()\\\n", + " .addGrid(rf.numTrees,[50]) \\\n", + " .build()\n", + "crossval = CrossValidator(\n", + " estimator=rf,\n", + " estimatorParamMaps=paramGrid,\n", + " evaluator=RegressionEvaluator(),\n", + " numFolds=numFolds)\n", + "model = crossval.fit(trainingData)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4.2 linear family model" + ] + }, + { + "cell_type": "code", + "execution_count": 27, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.ml.regression import IsotonicRegression\n", + "from pyspark.ml.regression import LinearRegression\n", + "from pyspark.ml.regression import GeneralizedLinearRegression\n", + "# Isotonic\n", + "model = IsotonicRegression(labelCol=\"label\", featuresCol=\"features\").fit(trainingData)\n", + "\n", + "#linear regression\n", + "#lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=1,labelCol=\"label\", featuresCol=\"features\")\n", + "#model = lr.fit(trainingData)\n", + "\n", + "# G LR, gaussian,\n", + "#glr = GeneralizedLinearRegression(family=\"Tweedie\", maxIter=10, regParam=0.3,labelCol=\"label\", featuresCol=\"features\")\n", + "#model = glr.fit(trainingData)\n", + "#model.write().overwrite().save('name'+'_model')" + ] + }, + { + "cell_type": 
"markdown", + "metadata": {}, + "source": [ + "## 4.2 performance" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "=====================================================================\n", + "TrainingData count: 85622\n", + "TestData count: 36414\n", + "=====================================================================\n", + "Training data MSE = 365.319181986\n", + "Training data RMSE = 19.1133247235\n", + "Training data R-squared = -1.49562158888\n", + "Training data MAE = 15.4465324333\n", + "=====================================================================\n", + "Validation data MSE = 372.668039765\n", + "Validation data RMSE = 19.3046118781\n", + "Validation data R-squared = -1.53560656357\n", + "Validation data MAE = 15.5763717252\n", + "=====================================================================\n", + "\n" + ] + } + ], + "source": [ + "from pyspark.mllib.evaluation import RegressionMetrics\n", + "cvModel=model\n", + "trainPredictionsAndLabels = cvModel.transform(trainingData).select(\"label\", \"prediction\").rdd\n", + "validPredictionsAndLabels = cvModel.transform(testData).select(\"label\", \"prediction\").rdd\n", + "trainRegressionMetrics = RegressionMetrics(trainPredictionsAndLabels)\n", + "validRegressionMetrics = RegressionMetrics(validPredictionsAndLabels)\n", + "\n", + "bestModel = cvModel\n", + "#featureImportances = bestModel.featureImportances.toArray()\n", + "#print (featureImportances)\n", + "\n", + "output = str(\"\\n=====================================================================\\n\" +\n", + " \"TrainingData count: {0}\\n\".format(trainingData.count()) +\n", + " \"TestData count: {0}\\n\".format(testData.count()) +\n", + " \"=====================================================================\\n\" +\n", + " \"Training data MSE = {}\\n\".format(trainRegressionMetrics.meanSquaredError) +\n", + " \"Training data RMSE = {}\\n\".format(trainRegressionMetrics.rootMeanSquaredError) +\n", + " \"Training data R-squared = {}\\n\".format(trainRegressionMetrics.r2) +\n", + " \"Training data MAE = {}\\n\".format(trainRegressionMetrics.meanAbsoluteError) +\n", + " #\"Training data Explained variance = {}\\n\".format(trainRegressionMetrics.explainedVariance) +\n", + " \"=====================================================================\\n\" +\n", + " \"Validation data MSE = {0}\\n\".format(validRegressionMetrics.meanSquaredError) +\n", + " \"Validation data RMSE = {0}\\n\".format(validRegressionMetrics.rootMeanSquaredError) +\n", + " \"Validation data R-squared = {0}\\n\".format(validRegressionMetrics.r2) +\n", + " \"Validation data MAE = {0}\\n\".format(validRegressionMetrics.meanAbsoluteError) +\n", + " #\"Validation data Explained variance = {0}\\n\".format(validRegressionMetrics.explainedVariance) +\n", + " # \"=====================================================================\\n\" +\n", + " #\"CV params explained: {}\\n\".format(cvModel.explainParams()) +\n", + " # \"RandomForest params explained: {}\\n\".format(bestModel.explainParams()) +\n", + " #\"RandomForest features importances:\\n {0}\\n\".format(\"\\n\".join(map(lambda z: \"{0} = {1}\".format(str(z[0]),str(z[1])), zip(featureCols, featureImportances)))) +\n", + "\"=====================================================================\\n\")\n", + "print(output)" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": {}, + "outputs": [ + { + "name": 
"stdout", + "output_type": "stream", + "text": [ + "\n", + "=====================================================================\n", + "testing data MSE = 364.903810715\n", + "testing data RMSE = 19.10245562\n", + "testing data R-squared = -1.48519198925\n", + "testing data MAE = 15.4240818744\n", + "=====================================================================\n", + "\n" + ] + } + ], + "source": [ + "#trainPredictionsAndLabels = cvModel.transform(trainingData).select(\"label\", \"prediction\").rdd\n", + "testPredictionsAndLabels = cvModel.transform(testDataset).select(\"label\", \"prediction\").rdd\n", + "\n", + "testRegressionMetrics = RegressionMetrics(testPredictionsAndLabels)\n", + "output = str(\"\\n=====================================================================\\n\" +\n", + " \"testing data MSE = {0}\\n\".format(testRegressionMetrics.meanSquaredError) +\n", + " \"testing data RMSE = {0}\\n\".format(testRegressionMetrics.rootMeanSquaredError) +\n", + " \"testing data R-squared = {0}\\n\".format(testRegressionMetrics.r2) +\n", + " \"testing data MAE = {0}\\n\".format(testRegressionMetrics.meanAbsoluteError) +\n", + "\"=====================================================================\\n\")\n", + "print(output)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# 3. ML model (by MLlib)\n", + "## 3.1 generate RDD" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# change into RDD\n", + "from pyspark.ml.linalg import Vector as MLVector, Vectors as MLVectors\n", + "from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors\n", + "from pyspark.ml import linalg as ml_linalg\n", + "\n", + "def as_mllib(v):\n", + " if isinstance(v, ml_linalg.SparseVector):\n", + " return MLLibVectors.sparse(v.size, v.indices, v.values)\n", + " elif isinstance(v, ml_linalg.DenseVector):\n", + " return MLLibVectors.dense(v.toArray())\n", + " else:\n", + " raise TypeError(\"Unsupported type: {0}\".format(type(v)))\n", + " \n", + "airlineRDD=dataset_transformed.rdd.map(lambda row: LabeledPoint(row['label'],as_mllib(row['features'])))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3.2 split trainset and testset " + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Spliting dataset into train and test dtasets\n", + "airlineRDD.cache()\n", + "use_data,left_data=airlineRDD.randomSplit([0.2,0.8])\n", + "trainingData,testData=use_data.randomSplit([0.8,0.2])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3.3 use Random Forest classifier" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.mllib.tree import RandomForest, RandomForestModel\n", + "# Train a RandomForest model.\n", + "# Empty categoricalFeaturesInfo indicates all features are continuous.\n", + "# Note: Use larger numTrees in practice.\n", + "# Setting featureSubsetStrategy=\"auto\" lets the algorithm choose.\n", + "model = RandomForest.trainRegressor(trainingData, categoricalFeaturesInfo={},\n", + " numTrees=100, featureSubsetStrategy=\"auto\",\n", + " impurity='variance', maxDepth=10, maxBins=32)\n", + "# Evaluate model on test instances and compute test error\n", + "#predictions = model.predict(testData.map(lambda x: x.features))\n", + "#labelsAndPredictions = 
testData.map(lambda lp: lp.label).zip(predictions)\n", + "#testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\\\n", + "# float(testData.count())\n", + "#print('Test Mean Squared Error = ' + str(testMSE))" + ] + }, + { + "cell_type": "code", + "execution_count": 44, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "training performance===========\n", + "MSE = 154.176016496\n", + "RMSE = 12.4167635274\n", + "R-squared = -13.1173920111\n", + "MAE = 8.37304414901\n", + "testing===============\n", + "MSE = 153.607064852\n", + "RMSE = 12.393831726\n", + "R-squared = -12.9510277354\n", + "MAE = 8.43209812783\n" + ] + } + ], + "source": [ + "from pyspark.mllib.evaluation import RegressionMetrics\n", + "print('training performance===========')\n", + "predictions = model.predict(trainingData.map(lambda x: x.features)).map(lambda element:float(element))\n", + "labelsAndPredictions = trainingData.map(lambda lp: lp.label).zip(predictions)\n", + "# Instantiate metrics object\n", + "metrics = RegressionMetrics(labelsAndPredictions)\n", + "# Squared Error\n", + "print(\"MSE = %s\" % metrics.meanSquaredError)\n", + "print(\"RMSE = %s\" % metrics.rootMeanSquaredError)\n", + " # R-squared\n", + "print(\"R-squared = %s\" % metrics.r2)\n", + "# Mean absolute error\n", + "print(\"MAE = %s\" % metrics.meanAbsoluteError)\n", + "\n", + "print(\"testing===============\")\n", + "predictions = model.predict(testData.map(lambda x: x.features)).map(lambda element:float(element))\n", + "labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)\n", + "# Instantiate metrics object\n", + "metrics = RegressionMetrics(labelsAndPredictions)\n", + "# Squared Error\n", + "print(\"MSE = %s\" % metrics.meanSquaredError)\n", + "print(\"RMSE = %s\" % metrics.rootMeanSquaredError)\n", + " # R-squared\n", + "print(\"R-squared = %s\" % metrics.r2)\n", + "# Mean absolute error\n", + "print(\"MAE = %s\" % metrics.meanAbsoluteError)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3.4 use GBDT" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel\n", + "model = GradientBoostedTrees.trainRegressor(trainingData,\n", + " categoricalFeaturesInfo={}, numIterations=200)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3.6 Ridge Regression" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from pyspark.mllib.regression import LabeledPoint,RidgeRegressionWithSGD\n", + "# Build the model\n", + "model = RidgeRegressionWithSGD.train(trainingData, iterations=100, step=0.000001)" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Mean Squared Error = nan\n" + ] + } + ], + "source": [ + "# Evaluate the model on training data\n", + "valuesAndPreds = trainingData.map(lambda p: (p.label, model.predict(p.features)))\n", + "MSE = valuesAndPreds \\\n", + " .map(lambda vp: (vp[0] - vp[1])**2) \\\n", + " .reduce(lambda x, y: x + y) / valuesAndPreds.count()\n", + "print(\"Mean Squared Error = \" + str(MSE))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + }, + { 
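The `Mean Squared Error = nan` above is the usual symptom of SGD diverging on unscaled features: `square_schedule` reaches about 2.07e6 (see the describe table in section 1.2) while `DAY_OF_WEEK` tops out at 7, so the gradient overflows even with a tiny step. A sketch of one remedy, standardizing the LabeledPoint RDD before training; `trainingData` is the RDD from section 3.2, and `step=0.1` is only an illustrative guess, not a tuned value:

```python
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.regression import LabeledPoint, RidgeRegressionWithSGD

# Fit per-feature mean/std on the training features.
features = trainingData.map(lambda lp: lp.features)
scaler = StandardScaler(withMean=True, withStd=True).fit(features)

# Rebuild LabeledPoints with standardized features; zip() is safe here
# because both RDDs are maps over the same parent, so partitions align.
scaledTrain = trainingData.map(lambda lp: lp.label) \
    .zip(scaler.transform(features)) \
    .map(lambda pair: LabeledPoint(pair[0], pair[1]))

model = RidgeRegressionWithSGD.train(scaledTrain, iterations=100, step=0.1)
```

Note that `withMean=True` produces dense output, which is fine here because the assembled features are purely numeric.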
+ "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3.3 model training" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "/home/andy/spark-2.2.0-bin-hadoop2.7/python/pyspark/mllib/regression.py:281: UserWarning: Deprecated in 2.0.0. Use ml.regression.LinearRegression.\n", + " warnings.warn(\"Deprecated in 2.0.0. Use ml.regression.LinearRegression.\")\n" + ] + } + ], + "source": [ + "# train models\n", + "from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel\n", + "model = LinearRegressionWithSGD.train(trainRDD, iterations=100, step=0.0000001)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3.4 model evaluation" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Evaluate the model on training data\n", + "valuesAndPreds = testRDD.map(lambda p: (p.label, model.predict(p.features)))\n", + "MSE = valuesAndPreds \\\n", + " .map(lambda vp: (vp[0] - vp[1])**2) \\\n", + " .reduce(lambda x, y: x + y) / valuesAndPreds.count()\n", + "print(\"Mean Squared Error = \" + str(MSE))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3.5 save model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "# Save and load model\n", + "model.save(sc, \"model/pythonLinearRegressionWithSGDModel\")\n", + "\n", + "sameModel = LinearRegressionModel.load(sc, \"model/pythonLinearRegressionWithSGDModel\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 2", + "language": "python", + "name": "python2" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 2 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython2", + "version": "2.7.14" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/model_train.py b/model_train.py new file mode 100644 index 0000000..5919901 --- /dev/null +++ b/model_train.py @@ -0,0 +1,88 @@ + +# coding: utf-8 + +#import findspark +#findspark.init() + +from pyspark.sql import SQLContext +from pyspark.sql.types import * + +from pyspark.sql.functions import udf +from pyspark.ml.feature import OneHotEncoder, StringIndexer +from pyspark.ml.feature import VectorAssembler +from pyspark.sql.functions import * +from pyspark.sql.functions import udf +from pyspark.sql.types import IntegerType +from operator import add +import sys +import numpy as np +import pandas as pd +from 
pyspark.sql.functions import UserDefinedFunction +from pyspark.sql.types import DoubleType,IntegerType +from pyspark import SparkContext +from pyspark.sql import SparkSession +from pyspark.sql.types import * +from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler +from pyspark.ml import Pipeline +from pyspark.ml.evaluation import RegressionEvaluator +from pyspark.ml.regression import LinearRegression + +sc = SparkContext("local",'app') +spark = SparkSession.builder.appName('name').config('spark.sql.shuffle.partitions',10).getOrCreate() + +# # 1. load data + +data=spark.read.csv('./data/train_flight.csv',header=True,inferSchema=True) + + +dataset_unprocess1=data + +airlines=data.select('AIRLINE').distinct().rdd.map(lambda row:row['AIRLINE']).collect() + +for airline in airlines: + print "begin" + dataset_unprocess2=dataset_unprocess1.filter(dataset_unprocess1['AIRLINE']==airline) + dataset_unprocess2=dataset_unprocess2.filter(dataset_unprocess2['DEPARTURE_DELAY']<60) + + dataset=dataset_unprocess2 + + udf = UserDefinedFunction(lambda x: x*1.0, DoubleType()) + new_data=dataset.select('*',udf(dataset['DEPARTURE_DELAY']).alias('double_labels')) + dataset=new_data.drop('DEPARTURE_DELAY') + dataset=dataset.withColumnRenamed('double_labels','DEPARTURE_DELAY') + + categoricalColumns = ['ORIGIN_AIRPORT'] # to add + numericCols = ['NEW_SCHEDULED_DEPARTURE'] # to add + + cols=dataset.columns + + stages = [] + feature_names=[] + for categoricalCol in categoricalColumns: + stringIndexer = StringIndexer(inputCol=categoricalCol, + outputCol=categoricalCol+"Index") + encoder = OneHotEncoder(inputCol=categoricalCol+"Index", + outputCol=categoricalCol+"classVec") + stages += [stringIndexer, encoder] + + assemblerInputs = map(lambda c: c + "classVec", categoricalColumns) + numericCols + assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="Features") + stages += [assembler] + pipeline = Pipeline(stages=stages) + pipelineModel = pipeline.fit(dataset) + dataset_transformed = pipelineModel.transform(dataset) + + pipelineModel.write().overwrite().save(str(airline)+'_pipeline') + + selectedcols = ['DEPARTURE_DELAY', "features"] + dataset_transformed = dataset_transformed .select(selectedcols) + dataset_transformed=dataset_transformed.select('*').withColumnRenamed('DEPARTURE_DELAY','label') + + trainingData=dataset_transformed + + #linear regression + lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.5,labelCol="label", featuresCol="features") + model = lr.fit(trainingData) + model.write().overwrite().save(str(airline)+'_model') + print 'done' + diff --git a/test_prediction.py b/test_prediction.py new file mode 100644 index 0000000..f66e308 --- /dev/null +++ b/test_prediction.py @@ -0,0 +1,91 @@ + +# coding: utf-8 + +from pyspark.ml.regression import LinearRegression,LinearRegressionModel +from pyspark.sql import SQLContext +from pyspark.sql.types import * +from pyspark.ml import Pipeline,PipelineModel +from pyspark.sql import Row +from pyspark.sql.functions import udf +from pyspark.ml.param import Param, Params +from pyspark.sql.functions import * +from pyspark.sql.functions import udf +from pyspark.sql.types import IntegerType +import numpy as np +from pyspark.sql.types import StructType, StructField +from pyspark.sql.types import StringType, IntegerType +from pyspark import SparkContext +from pyspark.sql import SparkSession +import pickle + +models={} + +def init_model(): + all_airlines = ['AA', 'AS','B6',"F9","DL", 'EV', 'HA', 'MQ', 'NK', 'OO', "UA", "US", 
"VX", "WN"] + for airline in all_airlines: + pipeline = PipelineModel.load('model/'+str(airline) + "_pipeline") + models[str(airline) + "_pipeline"] = pipeline + model = LinearRegressionModel.load('model/'+str(airline) + "_model") + models[str(airline) + "_model"] = model + airport_information=pickle.load(open("model/airport_information.pkl",'r')) + models["airport"]=airport_information + print "load finish" + +def prediction(input): + input=input.split(",") + hours = (int)(input[1]) + minutes = (int)(input[2]) + times = hours * 60 + minutes + airport = input[0] + result={} + for airline in all_airlines: + schema = StructType([ + StructField("ORIGIN_AIRPORT", StringType(), nullable=False), + StructField("AIRLINE", StringType(), nullable=True), + StructField("DEPARTURE_DELAY", DoubleType(), nullable=True), + StructField("NEW_SCHEDULED_DEPARTURE", IntegerType(), nullable=True)]) + data = [] + for minute in range(-5, 5, 1): + data.append((airport, airline, 0.0, times + minute)) + df = spark.createDataFrame(data, schema) + pipeline = models[str(airline + "_pipeline")] + if airport not in models["airport"][airline]: + continue + data_transformed = pipeline.transform(df) + + data_transformed = data_transformed.withColumnRenamed('Features', 'features') + selectedcols = ['DEPARTURE_DELAY', "features"] + dataset_transformed = data_transformed.select(selectedcols) + dataset_transformed = dataset_transformed.select('*').withColumnRenamed('DEPARTURE_DELAY', 'label') + + model=models[str(airline+"_model")] + temp_result = model.transform(dataset_transformed).select('prediction').rdd.map( + lambda element: element['prediction']).collect() + result[airline]=np.array(temp_result).mean() + print airline,result[airline] + + return result + +if __name__ == "__main__": + + sc = SparkContext("local",'app') + spark = SparkSession.builder.appName('name').config('spark.sql.shuffle.partitions',10).getOrCreate() + + all_airlines=['AA','AS',"B6","F9","DL",'EV','HA','MQ','NK','OO',"UA","US","VX","WN"] + + init_model() + + all_airport=[] + for airline in all_airlines: + all_airport.extend(models['airport'][airline]) + all_airport=set(all_airport) + input="BNA,4,20" #origin_airport+hours+minutes + + for airport in all_airport: + input=airport+","+'21'+",20" + print input + result=prediction(input) + #lines=[origin_airport, airline, schedule_departure_hout,schedule_minute] + print(sorted(result.items(),cmp=lambda x, y: cmp(x[1], y[1]))) + +