Creating an end-to-end observability platform
- Introduction to MLOps and how it helps with the entire ML project lifecycle
- Discuss the ride duration prediction use case, discuss the overall architecture, show the notebook with the solution, show the script with training it (do not implement it live - to save time, and refer to Lightweight MLOps Zoomcamp for details)
- Implement a simple Flask application for serving the model
- Log the predictions to a Kinesis stream and save them in a data lake (s3)
- Set up a batch job for pulling the data from S3 and analyzing it
- Analyze the data with Evidently, generate a simple visual report (Data Drift / Data Quality)
- Create a Prefect pipeline
- Automate data checks as part of the prediction pipeline. Design a custom test suite for data quality, data drift and prediction drift
- Model quality checks. Generate the report on model performance once you receive the ground truth
- Setting up slack/email alerts
- Wrapping up: summarizing key takeaways and reviewing recommended practices
- Knowledge of Python (see this article for a refresher)
- Basic knowledge of ML (see this course for a refresher)
- AWS account (see the instructions on how to create an account)
- Install Python 3.9 (e.g. with Anaconda or Miniconda)
- Install pipenv:
pip install -U pipenv
- Install the dependencies:
(cd train && pipenv install)
(cd serve && pipenv install)
(cd monitor && pipenv install)
Introduction to MLOps and how it helps with the entire ML project lifecycle
Discuss the ride duration prediction use case, discuss the overall architecture, show the notebook with the solution, show the script with training it (do not implement it live - to save time, and refer to Lightweight MLOps Zoomcamp for details)
cd train
Start Jupyter:
pipenv run jupyter notebook
If you have Anaconda, you can skip installing the packages and run the notebook:
jupyter notebook
Next, execute the notebook code to train and save the model.
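The notebook itself isn't reproduced here; as a rough sketch (the file path, the feature set, and the linear model are assumptions based on the ride duration use case, so check the actual notebook for the real code), the training step looks roughly like this:

import pickle

import pandas as pd
from sklearn.feature_extraction import DictVectorizer
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline

# load the training data (the path is an assumption)
df = pd.read_parquet('data/2022/01/2022-01-full.parquet')

features = ['PULocationID', 'DOLocationID', 'trip_distance']
target = 'duration'

# cast the location IDs to strings so the DictVectorizer one-hot encodes them
df[['PULocationID', 'DOLocationID']] = df[['PULocationID', 'DOLocationID']].astype(str)

# a simple pipeline: vectorize the feature dicts, fit a linear model
pipeline = make_pipeline(
    DictVectorizer(),
    LinearRegression(),
)
pipeline.fit(df[features].to_dict(orient='records'), df[target])

# save the model so the serving app can load it
with open('model.bin', 'wb') as f_out:
    pickle.dump(pipeline, f_out)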
Implement a simple Flask application for serving the model
cd serve
Run the serve_starter.py file:
pipenv run python serve_starter.py
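We don't implement serve_starter.py live either; a minimal sketch of what such a Flask app could look like, assuming the model was saved as model.bin in the previous step (file names and the helper function are illustrative):

import pickle

from flask import Flask, request, jsonify

# load the model trained in the previous step (the path is an assumption)
with open('model.bin', 'rb') as f_in:
    model = pickle.load(f_in)

app = Flask('duration-prediction')


def prepare_features(ride):
    # keep only the features the model was trained on
    return {
        'PULocationID': str(ride['PULocationID']),
        'DOLocationID': str(ride['DOLocationID']),
        'trip_distance': ride['trip_distance'],
    }


@app.route('/predict', methods=['POST'])
def predict():
    body = request.get_json()

    ride_id = body['ride_id']  # used later for logging
    ride = body['ride']

    features = prepare_features(ride)
    prediction = model.predict([features])[0]

    result = {
        'prediction': {
            'duration': float(prediction),
        }
    }
    return jsonify(result)


if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=9696)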
Send a request:
REQUEST='{
"ride_id": "XYZ10",
"ride": {
"PULocationID": 100,
"DOLocationID": 102,
"trip_distance": 30
}
}'
URL="http://localhost:9696/predict"
curl -X POST \
-d "${REQUEST}" \
-H "Content-Type: application/json" \
${URL}
Response:
{
"prediction": {
"duration": 20.77956787473484
}
}
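If you prefer Python over curl, the same request can be sent with the requests library (a sketch; the payload mirrors the curl example above):

import requests

ride_event = {
    "ride_id": "XYZ10",
    "ride": {
        "PULocationID": 100,
        "DOLocationID": 102,
        "trip_distance": 30,
    },
}

url = 'http://localhost:9696/predict'

response = requests.post(url, json=ride_event)
print(response.json())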
Log the predictions to a Kinesis stream and save them in a data lake (s3)
Now let's modify our serve_starter.py to add logging. We will log the predictions to a Kinesis stream, but you can use any other way of logging.
Create a Kinesis stream, e.g. duration_prediction_serve_logger.
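You can create the stream in the AWS console or with the CLI; if you prefer to script it, here is a boto3 sketch (a single shard is enough for this workshop):

import boto3

kinesis_client = boto3.client('kinesis')

# create the stream used for logging predictions
kinesis_client.create_stream(
    StreamName='duration_prediction_serve_logger',
    ShardCount=1,
)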
Add logging:
import json

import boto3

PREDICTIONS_STREAM_NAME = 'duration_prediction_serve_logger'
kinesis_client = boto3.client('kinesis')

# in the serve function
prediction_event = {
    'ride_id': ride_id,
    'ride': ride,
    'features': features,
    'prediction': result
}

print(f'logging {prediction_event} to {PREDICTIONS_STREAM_NAME}...')

kinesis_client.put_record(
    StreamName=PREDICTIONS_STREAM_NAME,
    Data=json.dumps(prediction_event) + "\n",
    PartitionKey=str(ride_id)
)
(Note the + "\n" at the end: it's important. Kinesis Data Firehose concatenates records without adding a delimiter, so the trailing newline keeps the files in S3 in valid JSON Lines format.)
Send a request:
REQUEST='{
"ride_id": "XYZ10",
"ride": {
"PULocationID": 100,
"DOLocationID": 102,
"trip_distance": 30
}
}'
URL="http://localhost:9696/predict"
curl -X POST \
-d "${REQUEST}" \
-H "Content-Type: application/json" \
${URL}
We can check the logs:
KINESIS_STREAM_OUTPUT='duration_prediction_serve_logger'
SHARD='shardId-000000000000'
SHARD_ITERATOR=$(aws kinesis \
    get-shard-iterator \
    --shard-id ${SHARD} \
    --shard-iterator-type TRIM_HORIZON \
    --stream-name ${KINESIS_STREAM_OUTPUT} \
    --query 'ShardIterator' \
    --output text \
)
RESULT=$(aws kinesis get-records --shard-iterator $SHARD_ITERATOR)
echo ${RESULT} | jq -r '.Records[0].Data' | base64 --decode
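The same check can be done from Python with boto3 (a sketch; it reads the oldest records from the first shard and decodes them):

import json

import boto3

kinesis_client = boto3.client('kinesis')

STREAM_NAME = 'duration_prediction_serve_logger'

# get an iterator pointing at the oldest record in the first shard
shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=STREAM_NAME,
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']

records = kinesis_client.get_records(ShardIterator=shard_iterator)['Records']

for record in records:
    # Data comes back as bytes; each record is one JSON line
    print(json.loads(record['Data']))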
Set up a batch job for pulling the data from S3 and analyzing it
- Create an S3 bucket "duration-prediction-serve-logs"
- Enable Kinesis Data Firehose delivery from the stream
- No data transformation (explore yourself)
- No data conversion (explore yourself)
- Destination: "s3://duration-prediction-serve-logs"
- Look at the files in the bucket
We can't wait that long for real traffic, so we simulated it and put the data in the monitor/data folder. To generate it, run the prepare-files.ipynb notebook.
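In a real setup, the batch job would pull the Firehose output from the bucket instead of using local files; a minimal boto3 sketch (the prefix and the local folder are assumptions; by default Firehose partitions objects by date):

import os

import boto3

s3_client = boto3.client('s3')

BUCKET = 'duration-prediction-serve-logs'
PREFIX = '2023/01/02/'  # the exact prefix depends on your Firehose configuration

response = s3_client.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX)

os.makedirs('data/raw', exist_ok=True)

for obj in response.get('Contents', []):
    key = obj['Key']
    local_path = os.path.join('data/raw', os.path.basename(key))
    # download each log file for local analysis
    s3_client.download_file(BUCKET, key, local_path)
    print(f'downloaded {key} -> {local_path}')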
Analyze the data with Evidently, generate a simple visual report (Data Drift / Data Quality)
Our virtual environment already has Evidently installed. But if you're using your own environment, run
pip install evidently
Let's use it to generate a simple visual report.
First, load the reference data (the data we used for training):
df_reference = pd.read_parquet('data/2022/01/2022-01-full.parquet')
Evidently is quite slow when analyzing large datasets, so we should take a sample:
# the sample size here is illustrative; adjust it to your needs
df_reference_sample = df_reference.sample(n=10_000, random_state=1)
Next, we load the "production" data. First, we load the trips:
year = 2023
month = 1
day = 2
trips_file = f'data/{year:04d}/{month:02d}/{year:04d}-{month:02d}-{day:02d}.parquet'
df_trips = pd.read_parquet(trips_file)
Second, load the logs:
logs_file = f'data/{year:04d}/{month:02d}/{year:04d}-{month:02d}-{day:02d}-predictions.jsonl'
df_logs = pd.read_json(logs_file, lines=True)
df_predictions = pd.DataFrame()
df_predictions['ride_id'] = df_logs['ride_id']
df_predictions['prediction'] = df_logs['prediction'].apply(lambda p: p['prediction']['duration'])
And merge them:
df = df_trips.merge(df_predictions, on='ride_id')
Now let's see if there's any drift. Import evidently:
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset
Build a simple drift report:
report = Report(metrics=[
    DataDriftPreset(columns=['PULocationID', 'DOLocationID', 'trip_distance']),
])
report.run(reference_data=df_reference_sample, current_data=df_trips)
report.show(mode='inline')
This preset uses the Jensen-Shannon distance to measure the discrepancies between the reference and the production data. While it says that drift is detected, we should be careful about this conclusion and check other months too.
We can tune it:
report = Report(metrics=[
    DataDriftPreset(
        columns=['PULocationID', 'DOLocationID', 'trip_distance'],
        cat_stattest='psi',
        cat_stattest_threshold=0.2,
        num_stattest='ks',
        num_stattest_threshold=0.2,
    ),
])
report.run(reference_data=df_reference_sample, current_data=df_trips)
report.show(mode='inline')
Save the report as HTML:
report.save_html(f'reports/report-{year:04d}-{month:02d}-{day:02d}.html')
We can also extract information from this report and use it for e.g. sending an alert:
report_metrics = report.as_dict()['metrics']
report_metrics = {d['metric']: d['result'] for d in report_metrics}

drift_report = report_metrics['DataDriftTable']

if drift_report['dataset_drift']:
    # send alert
    print('drift detected!')
We won't implement the logic for sending alerts here, but you can find a lot of examples online. Or use ChatGPT to help you.
You can generate these reports in your automatic pipelines and then send them e.g. over email.
Let's create this pipeline.
Now we'll use Prefect to orchestrate the report generation.
We will take the code we created and put it into a Python script.
See pipeline_sample.py for details.
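The script isn't reproduced here, but the idea is to wrap the loading and reporting steps into Prefect tasks and a flow; a sketch based on the code above (function names and defaults are illustrative):

import pandas as pd

from prefect import flow, task

from evidently.report import Report
from evidently.metric_preset import DataDriftPreset


@task
def load_reference_data(path: str) -> pd.DataFrame:
    return pd.read_parquet(path)


@task
def load_production_data(year: int, month: int, day: int) -> pd.DataFrame:
    trips_file = f'data/{year:04d}/{month:02d}/{year:04d}-{month:02d}-{day:02d}.parquet'
    return pd.read_parquet(trips_file)


@task
def generate_drift_report(df_reference, df_current, output_path: str) -> None:
    report = Report(metrics=[
        DataDriftPreset(columns=['PULocationID', 'DOLocationID', 'trip_distance']),
    ])
    report.run(reference_data=df_reference, current_data=df_current)
    report.save_html(output_path)


@flow
def monitoring_pipeline(year: int = 2023, month: int = 1, day: int = 2):
    df_reference = load_reference_data('data/2022/01/2022-01-full.parquet')
    df_current = load_production_data(year, month, day)
    generate_drift_report(
        df_reference,
        df_current,
        f'reports/report-{year:04d}-{month:02d}-{day:02d}.html',
    )


if __name__ == '__main__':
    monitoring_pipeline()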
Run prefect server:
pipenv run prefect config set PREFECT_UI_API_URL=http://127.0.0.1:4200/api
pipenv run prefect server start
Run the pipeline:
pipenv run prefect config set PREFECT_API_URL=http://127.0.0.1:4200/api
pipenv run python pipeline_sample.py
Automate data checks as part of the prediction pipeline. Design a custom test suite for data quality, data drift and prediction drift
Now we want to add data quality checks. We will start with simple integrity checks: data types, missing values and so on.
They are also done via reports:
from evidently.metrics import DatasetSummaryMetric, DatasetMissingValuesMetric
data_integrity_report = Report(metrics=[
    DatasetSummaryMetric(),
    DatasetMissingValuesMetric(),
])
data_integrity_report.run(reference_data=df_reference_sample, current_data=df_trips)
data_integrity_report.show(mode='inline')
In addition to reports, we can add tests. You can think of these tests as unit/integration tests for software. They pass or fail, and if they fail, we get an alert - something is wrong with the data, so we need to look at it.
from evidently.test_suite import TestSuite
from evidently.test_preset import DataStabilityTestPreset
data_stability = TestSuite(tests=[
    DataStabilityTestPreset(),
])
data_stability.run(reference_data=df_reference_sample, current_data=df_trips)
data_stability.show(mode='inline')
Let's tune the tests. The individual tests we use need to be imported first:
from evidently.tests import (
    TestNumberOfRows,
    TestNumberOfColumns,
    TestColumnsType,
    TestAllColumnsShareOfMissingValues,
    TestNumColumnsOutOfRangeValues,
    TestCatColumnsOutOfListValues,
    TestNumColumnsMeanInNSigmas,
)

data_stability = TestSuite(tests=[
    TestNumberOfRows(gte=1000, lte=20000),
    TestNumberOfColumns(),
    TestColumnsType(),
    TestAllColumnsShareOfMissingValues(),
    TestNumColumnsOutOfRangeValues(),
    TestCatColumnsOutOfListValues(
        columns=['PULocationID', 'DOLocationID', 'trip_distance']
    ),
    TestNumColumnsMeanInNSigmas(),
])
data_stability.run(reference_data=df_reference_sample, current_data=df_trips)
data_stability.show(mode='inline')
We can add this to our pipeline too:
test_results = data_stability.as_dict()['tests']

failed_tests = []

for test in test_results:
    status = test['status']
    if status == 'FAIL':
        failed_tests.append(test)

if len(failed_tests) > 0:
    print('tests failed:')
    print(failed_tests)
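Inside the Prefect pipeline you would typically fail the run (or trigger an alert) when checks fail; a sketch of a task doing that (the task name and error message are illustrative):

from prefect import task

from evidently.test_suite import TestSuite
from evidently.test_preset import DataStabilityTestPreset


@task
def run_data_checks(df_reference, df_current):
    data_stability = TestSuite(tests=[
        DataStabilityTestPreset(),
    ])
    data_stability.run(reference_data=df_reference, current_data=df_current)

    failed = [
        test for test in data_stability.as_dict()['tests']
        if test['status'] == 'FAIL'
    ]
    if failed:
        # failing the task makes the whole flow run fail, which is visible in the Prefect UI
        raise ValueError(f'{len(failed)} data check(s) failed')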
Model quality checks. Generate the report on model performance once you receive the ground truth
We also have labels, and they arrive with a delay: once a ride ends, we can compare the prediction with the actual duration and draw conclusions. If our model's performance drifts, we can notice it and react (e.g. by retraining the model).
First, we need to prepare the data a bit:
df_reference_sample = df_reference_sample.rename(columns={'duration': 'target'})
df = df.rename(columns={'duration': 'target'})
Now let's run the report:
from evidently.metric_preset import RegressionPreset

regression_performance_report = Report(metrics=[
    RegressionPreset(columns=['PULocationID', 'DOLocationID', 'trip_distance']),
])
regression_performance_report.run(reference_data=df_reference_sample, current_data=df)
regression_performance_report.show(mode='inline')
Note: for classification, you can use this preset:
from evidently.metric_preset import ClassificationPreset
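Usage is analogous; a short sketch, assuming hypothetical dataframes with target and prediction columns holding class labels:

from evidently.metric_preset import ClassificationPreset

# df_reference_class / df_current_class are hypothetical dataframes with
# 'target' and 'prediction' columns containing class labels
classification_report = Report(metrics=[
    ClassificationPreset(),
])

classification_report.run(reference_data=df_reference_class, current_data=df_current_class)
classification_report.show(mode='inline')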
Setting up slack/email alerts
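We won't build this live, but a simple option is a Slack incoming webhook; a sketch that plugs into the failed_tests list from the test suite above (the SLACK_WEBHOOK_URL environment variable name is an assumption):

import os

import requests


def send_slack_alert(message: str) -> None:
    # incoming webhook URL created in your Slack workspace
    webhook_url = os.environ['SLACK_WEBHOOK_URL']
    response = requests.post(webhook_url, json={'text': message})
    response.raise_for_status()


if failed_tests:
    send_slack_alert(f'data checks failed: {len(failed_tests)} test(s), see the report for details')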
Wrapping up: summarizing key takeaways and reviewing recommended practices