forked from aws/amazon-sagemaker-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sklearn_abalone_featurizer.py
202 lines (162 loc) · 6.7 KB
/
sklearn_abalone_featurizer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
from __future__ import print_function
import argparse
import csv
import json
import os
import shutil
import sys
import time
from io import StringIO
import joblib
import numpy as np
import pandas as pd
from sagemaker_containers.beta.framework import (
content_types,
encoders,
env,
modules,
transformer,
worker,
)
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.impute import SimpleImputer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import Binarizer, OneHotEncoder, StandardScaler
# The input CSV files carry no header row, so the column names are declared here.
feature_columns_names = [
    "sex",  # M, F, and I (infant)
    "length",  # longest shell measurement
    "diameter",  # perpendicular to length
    "height",  # with meat in shell
    "whole_weight",  # whole abalone
    "shucked_weight",  # weight of meat
    "viscera_weight",  # gut weight (after bleeding)
    "shell_weight",  # after being dried
]

label_column = "rings"

# Dtype used when reading each feature column: "sex" is categorical, every
# other feature is a float64 measurement.
feature_columns_dtype = {
    column: ("category" if column == "sex" else "float64")
    for column in feature_columns_names
}

# Dtype of the label column; rings + 1.5 gives the age in years.
label_column_dtype = {label_column: "float64"}
def merge_two_dicts(x, y):
    """Return a new dict holding the entries of ``x`` overlaid with those of ``y``.

    Neither argument is modified. When a key appears in both, the value
    from ``y`` is the one kept.
    """
    merged = x.copy()  # shallow copy so the caller's dict is left untouched
    merged.update(y)  # in-place on the copy; update() itself returns None
    return merged
if __name__ == "__main__":
    # SageMaker-specific arguments. Each flag defaults to the value of the
    # corresponding environment variable.
    cli = argparse.ArgumentParser()
    for flag, env_var in (
        ("--output-data-dir", "SM_OUTPUT_DATA_DIR"),
        ("--model-dir", "SM_MODEL_DIR"),
        ("--train", "SM_CHANNEL_TRAIN"),
    ):
        cli.add_argument(flag, type=str, default=os.environ[env_var])
    args = cli.parse_args()

    # Collect every entry of the training channel directory.
    training_paths = [os.path.join(args.train, entry) for entry in os.listdir(args.train)]
    if not training_paths:
        no_files_message = (
            "There are no files in {}.\n"
            "This usually indicates that the channel ({}) was incorrectly specified,\n"
            "the data specification in S3 was incorrectly specified or the role specified\n"
            "does not have permission to access the data."
        )
        raise ValueError(no_files_message.format(args.train, "train"))

    # Read each headerless CSV with explicit names and dtypes, then stack the
    # pieces into a single dataframe.
    csv_columns = feature_columns_names + [label_column]
    csv_dtypes = merge_two_dicts(feature_columns_dtype, label_column_dtype)
    frames = []
    for path in training_paths:
        frames.append(pd.read_csv(path, header=None, names=csv_columns, dtype=csv_dtypes))
    concat_data = pd.concat(frames)

    # The label is not a feature: remove it before fitting. predict_fn puts
    # the label back in front of the featurized columns at inference time.
    concat_data.drop(label_column, axis=1, inplace=True)

    # Adapted from the scikit-learn example on preprocessing pipelines for
    # mixed column types:
    #
    # https://scikit-learn.org/stable/auto_examples/compose/plot_column_transformer_mixed_types.html
    #
    # Numeric features (median-imputed, then standardized):
    # - length: Longest shell measurement
    # - diameter: Diameter perpendicular to length
    # - height: Height with meat in shell
    # - whole_weight: Weight of whole abalone
    # - shucked_weight: Weight of meat
    # - viscera_weight: Gut weight (after bleeding)
    # - shell_weight: Weight after being dried
    # Categorical features (constant-imputed, then one-hot encoded):
    # - sex: categories encoded as strings {'M', 'F', 'I'} where 'I' is Infant
    scale_numeric = make_pipeline(SimpleImputer(strategy="median"), StandardScaler())
    encode_categorical = make_pipeline(
        SimpleImputer(strategy="constant", fill_value="missing"),
        OneHotEncoder(handle_unknown="ignore"),
    )

    # Columns are routed by dtype: "category" columns go to the one-hot
    # branch, everything else to the numeric branch.
    select_numeric = make_column_selector(dtype_exclude="category")
    select_categorical = make_column_selector(dtype_include="category")
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", scale_numeric, select_numeric),
            ("cat", encode_categorical, select_categorical),
        ]
    )
    preprocessor.fit(concat_data)

    # Persist the fitted preprocessor under the name that model_fn loads.
    model_path = os.path.join(args.model_dir, "model.joblib")
    joblib.dump(preprocessor, model_path)
    print("saved model!")
def input_fn(input_data, content_type):
    """Parse a CSV request payload into a DataFrame with named columns.

    Only "text/csv" input is accepted. Both labelled and unlabelled rows are
    supported; which of the two was sent is decided from the number of
    columns in the payload.

    Args:
        input_data: Headerless CSV text.
        content_type: Content type of the request payload.

    Returns:
        A pandas DataFrame whose columns are the feature names, followed by
        the label column when the payload is labelled.

    Raises:
        ValueError: If ``content_type`` is not "text/csv", or if the number
            of columns matches neither the labelled nor the unlabelled layout.
    """
    if content_type != "text/csv":
        raise ValueError("{} not supported by script!".format(content_type))

    # Read the raw input data as CSV.
    df = pd.read_csv(StringIO(input_data), header=None)

    n_columns = len(df.columns)
    n_features = len(feature_columns_names)
    if n_columns == n_features + 1:
        # Labelled example: the trailing column is the ring label.
        df.columns = feature_columns_names + [label_column]
    elif n_columns == n_features:
        # Unlabelled example: features only.
        df.columns = feature_columns_names
    else:
        # Fail here with a clear message instead of handing a frame with
        # positional column names to the preprocessor, which looks columns
        # up by name.
        raise ValueError(
            "Expected {} (unlabelled) or {} (labelled) CSV columns but got {}.".format(
                n_features, n_features + 1, n_columns
            )
        )
    return df
def output_fn(prediction, accept):
    """Serialize the featurized output for the requested ``accept`` type.

    The default accept/content-type between containers for serial inference
    is JSON. The response mimetype is set to ``accept`` so the next container
    can read the payload correctly.

    Args:
        prediction: Featurized rows. Must expose ``tolist()`` for JSON
            output; for CSV output it is passed to ``encoders.encode``.
        accept: Requested response content type, either "application/json"
            or "text/csv".

    Returns:
        A ``worker.Response`` carrying the serialized payload. For JSON the
        body has the form ``{"instances": [{"features": row}, ...]}``.

    Raises:
        RuntimeError: If ``accept`` is not one of the supported types.
    """
    if accept == "application/json":
        instances = [{"features": row} for row in prediction.tolist()]
        return worker.Response(json.dumps({"instances": instances}), mimetype=accept)

    if accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)

    # RuntimeError is the built-in exception; the previous name
    # "RuntimeException" does not exist in Python and produced a NameError.
    raise RuntimeError("{} accept type is not supported by this script.".format(accept))
def predict_fn(input_data, model):
    """Featurize the parsed input with the fitted preprocessor.

    This replaces the default predict_fn, which would call ``.predict()``;
    the model here is a preprocessor, so ``.transform()`` is used instead.

    The output columns are ordered as:
        label (only when the input contains the label column)
        the transformed features, one hot encoded or standardized
    """
    transformed = model.transform(input_data)

    # Unlabelled input: the transformed features are the whole answer.
    if label_column not in input_data:
        return transformed

    # Labelled input: put the label in front of the features as column 0.
    return np.insert(transformed, 0, input_data[label_column], axis=1)
def model_fn(model_dir):
    """Load and return the fitted preprocessor saved as ``model.joblib`` in ``model_dir``."""
    return joblib.load(os.path.join(model_dir, "model.joblib"))