Flow.run() should be able to extract a subgraph of the overall pipeline when given a reference to a task output, plus an additional specification of where in the graph the actual input data should be inserted, and which tables flowing through the training pipeline should be saved so they can simply be loaded as constant input data when running the PROD pipeline subgraph.
usage syntax hypothesis
This could be the top-level wiring code for a close-to-reality data pipeline:
```python
class CalibrationState:
    def __init__(self):
        self.feature_parameters = dag.LazyDict()
        self.parameter_tbls = dag.LazyDict()


def get_pipeline(attrs):
    with Flow("typical_pipeline") as flow:
        with Stage("01_raw_input"):
            raw_tbls = read_input_data()
        with Stage("02_early_cleaning"):
            clean_tbls = clean(raw_tbls)
        with Stage("03_economic_representation"):
            train_test_set = mark_train_test_set(clean_tbls, **attrs["train_test_set"])
            # tbls = dict with keys: insuree, contract, provider, case, position
            tbls = economic_representation(clean_tbls, train_test_set)
        with Stage("04_features"):
            feature_tbls = features(tbls, train_test_set)
            # calibration_state holds dictionaries of parameters and parameter tables
            # which are computed during training based on the train set but already
            # applied to rows of the test set. For deployment, the calibration_state
            # will be loaded and injected as constant input into the pipeline
            # subgraph that runs in production.
            calibration_state = CalibrationState()
            feature_tbls2 = calibrated_features(tbls, feature_tbls, train_test_set, calibration_state)
            feature_tbls.update(feature_tbls2)  # will be executed lazily in consumer tasks
        with Stage("05_model_training"):
            run_id = get_run_id()
            input_data_train, target_train, encoding_parameters = model_encoding(
                tbls, feature_tbls, train_test_set, run_id, train=True
            )
            model, model_run_id, output_train = model_train(input_data_train, target_train, run_id)
        with Stage("06_model_evaluation"):
            run_id = get_run_id(run_id)  # get a run id if only evaluation is running
            document_link(run_id, model_run_id)
            input_data_test, target_test, _ = model_encoding(
                tbls, feature_tbls, train_test_set, run_id, encoding_parameters=encoding_parameters
            )
            output_test = model_predict(model, input_data_test, run_id)
            evaluation_result = evaluate_model(output_test, target_test, run_id)
            document_evaluation(evaluation_result, run_id)

    prod_in_out_spec = InOutSpecification(
        input=clean_tbls,
        ignore=[train_test_set, run_id],
        const=[
            model,
            model_run_id,
            calibration_state.feature_parameters,
            calibration_state.parameter_tbls,
            encoding_parameters,
        ],
        output=output_test,
    )
    return flow, prod_in_out_spec
```
The InOutSpecification object here would be nothing more than a dictionary holding references from within the graph. Those references may be tables, dictionaries of tables, or anything a pipedag task may produce or consume.
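A minimal sketch of what such a container could look like, in plain Python with no pipedag involvement (field names taken from the example above; nothing here is existing pipedag API):

```python
from dataclasses import dataclass, field

@dataclass
class InOutSpecification:
    # References into the task graph; their concrete type is whatever a
    # pipedag task produces or consumes (tables, dicts of tables, ...).
    input: object                                # where real input data is injected
    output: object                               # desired subgraph output
    ignore: list = field(default_factory=list)   # fed as None in production
    const: list = field(default_factory=list)    # saved in training, loaded in prod
```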
If you simply want to run the training pipeline, you run this code as:
```python
# pure training
flow, _ = get_pipeline(cfg.attrs)
result = flow.run(config=cfg)
assert result.successful
```
If you would like to prepare a production release, you might do the following:
```python
# prepare release
with StageLockContext():  # keep cache locked until outputs are read back
    flow, prod_in_out_spec = get_pipeline(cfg.attrs)
    result = flow.run(config=cfg)
    assert result.successful
    const_inputs = result.get(prod_in_out_spec.const, input_type=pl.DataFrame)
    write_const_inputs(const_inputs, const_input_directory)
```
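write_const_inputs() is not spelled out above; a minimal sketch, assuming the constant inputs arrive as a flat dict of polars DataFrames keyed by reference name (the exact return shape of result.get() over a list of references is an assumption):

```python
from pathlib import Path

import polars as pl


def write_const_inputs(const_inputs: dict[str, pl.DataFrame], directory: str):
    # Persist each constant input as one parquet file per reference so the
    # production run can load them back without executing the training tasks.
    path = Path(directory)
    path.mkdir(parents=True, exist_ok=True)
    for name, df in const_inputs.items():
        df.write_parquet(path / f"{name}.parquet")
```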
It would be nice if this were enough to run a pipeline subgraph in production:
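For example (a purely hypothetical sketch; the inputs/const_inputs/ignore/outputs parameters on Flow.run() and the read_const_inputs()/load_prod_input_data() helpers are assumptions, none of this exists in pipedag today):

```python
# run subgraph in production (hypothetical API)
flow, prod_in_out_spec = get_pipeline(cfg.attrs)
const_inputs = read_const_inputs(const_input_directory)  # hypothetical helper
result = flow.run(
    config=cfg,
    inputs={prod_in_out_spec.input: load_prod_input_data()},  # hypothetical loader
    const_inputs=const_inputs,        # assumed Flow.run() extension
    ignore=prod_in_out_spec.ignore,   # these references are fed as None
    outputs=[prod_in_out_spec.output],
)
assert result.successful
```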
Please note the two different approaches to solving the somewhat recursive behavior where some functions calibrate parameters on training data (or even produce constant tables) and apply this calibrated information to both training and test set: a) model_encoding() is called twice, and b) calibrated_features() is called only once with a calibration_state parameter, which is empty unless it is saved as a constant input for production.
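A toy illustration, independent of pipedag, of these two patterns; all functions here are simplified stand-ins for the tasks above:

```python
def model_encoding(data, train=False, encoding_parameters=None):
    if train:
        # a) fit encoding parameters on the train set ...
        encoding_parameters = {"mean": sum(data) / len(data)}
    # ... and apply them to whichever set is passed in (train or test)
    encoded = [x - encoding_parameters["mean"] for x in data]
    return encoded, encoding_parameters


def calibrated_features(data, calibration_state):
    if not calibration_state:
        # b) state is empty during training, so calibrate now; in production
        # the saved constant input arrives pre-filled and this branch is skipped
        calibration_state["scale"] = max(data)
    return [x / calibration_state["scale"] for x in data]


train, test = [1.0, 2.0, 3.0], [4.0, 5.0]
_, params = model_encoding(train, train=True)                       # first call: fit
encoded_test, _ = model_encoding(test, encoding_parameters=params)  # second call: apply
state = {}                                     # single call; state filled as side effect
train_features = calibrated_features(train, state)
```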
Flow.run() needs to traverse the task graph from the desired output back toward the source nodes until it reaches task inputs that are mentioned in the inputs dictionary. If a reference should be ignored in production, we feed None into the tasks reading this reference.
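A sketch of that traversal, under the assumption that every graph reference knows its producing task and every task lists its upstream references (hypothetical attributes, not pipedag internals):

```python
def extract_subgraph(output_ref, inputs, ignore):
    """Collect all tasks required to compute `output_ref`, cutting the graph
    at references that are provided via `inputs` or listed in `ignore`."""
    subgraph, stack = set(), [output_ref.producing_task]
    while stack:
        task = stack.pop()
        if task in subgraph:
            continue
        subgraph.add(task)
        for ref in task.input_refs:
            if ref in inputs or ref in ignore:
                continue  # boundary: real/constant data or None is fed in here
            stack.append(ref.producing_task)
    return subgraph
```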