From 3d0f93625aa93c1562a014d445f42d7f121718af Mon Sep 17 00:00:00 2001 From: mango Date: Sun, 29 May 2022 22:28:48 +0800 Subject: [PATCH 1/6] fix: fix the error of pylint in demo directory Signed-off-by: mango --- .../script/convert_data.py | 2 +- .../script/predict.py | 5 - .../script/predict_server.py | 47 ++++---- .../script/train.py | 14 +-- .../predict-taxi-trip-duration/test/import.py | 22 ++-- .../predict_server.py | 16 ++- .../train_and_serve.py | 104 +++++++++--------- pylintrc | 2 + 8 files changed, 103 insertions(+), 109 deletions(-) diff --git a/demo/predict-taxi-trip-duration/script/convert_data.py b/demo/predict-taxi-trip-duration/script/convert_data.py index 0486e81d148..f905af04296 100644 --- a/demo/predict-taxi-trip-duration/script/convert_data.py +++ b/demo/predict-taxi-trip-duration/script/convert_data.py @@ -15,7 +15,7 @@ # limitations under the License. import sys -import time, datetime +import time i = 0 for line in sys.stdin: if i == 0: diff --git a/demo/predict-taxi-trip-duration/script/predict.py b/demo/predict-taxi-trip-duration/script/predict.py index d48e1dddba7..885ee1071e8 100644 --- a/demo/predict-taxi-trip-duration/script/predict.py +++ b/demo/predict-taxi-trip-duration/script/predict.py @@ -15,11 +15,6 @@ # limitations under the License. import requests -import os -import base64 -import random -import time -import hashlib url = "http://127.0.0.1:8887/predict" req ={"id":"id0376262", diff --git a/demo/predict-taxi-trip-duration/script/predict_server.py b/demo/predict-taxi-trip-duration/script/predict_server.py index a44e8746236..4be63ae4278 100644 --- a/demo/predict-taxi-trip-duration/script/predict_server.py +++ b/demo/predict-taxi-trip-duration/script/predict_server.py @@ -19,46 +19,49 @@ import tornado.ioloop import json import lightgbm as lgb -import sqlalchemy as db import requests import argparse bst = None table_schema = [ - ("id", "string"), - ("vendor_id", "int"), - ("pickup_datetime", "timestamp"), - ("dropoff_datetime", "timestamp"), - ("passenger_count", "int"), - ("pickup_longitude", "double"), - ("pickup_latitude", "double"), - ("dropoff_longitude", "double"), - ("dropoff_latitude", "double"), - ("store_and_fwd_flag", "string"), - ("trip_duration", "int"), + ("id", "string"), + ("vendor_id", "int"), + ("pickup_datetime", "timestamp"), + ("dropoff_datetime", "timestamp"), + ("passenger_count", "int"), + ("pickup_longitude", "double"), + ("pickup_latitude", "double"), + ("dropoff_longitude", "double"), + ("dropoff_latitude", "double"), + ("store_and_fwd_flag", "string"), + ("trip_duration", "int"), ] url = "" + def get_schema(): dict_schema = {} for i in table_schema: dict_schema[i[0]] = i[1] return dict_schema + dict_schema = get_schema() json_schema = json.dumps(dict_schema) + def build_feature(rs): - var_Y = [rs[0]] - var_X = [rs[1:12]] - return np.array(var_X) + var_x = [rs[1:12]] + return np.array(var_x) + class SchemaHandler(tornado.web.RequestHandler): def get(self): self.write(json_schema) + class PredictHandler(tornado.web.RequestHandler): def post(self): row = json.loads(self.request.body) @@ -72,7 +75,8 @@ def post(self): row_data.append(row.get(i[0], 0)) else: row_data.append(None) - data["input"].append(row_data) + + data["input"].append(row_data) rs = requests.post(url, json=data) result = json.loads(rs.text) for r in result["data"]["data"]: @@ -80,13 +84,15 @@ def post(self): self.write("----------------ins---------------\n") self.write(str(ins) + "\n") duration = bst.predict(ins) - self.write("---------------predict trip_duration 
-------------\n") - self.write("%s s"%str(duration[0])) + self.write(f"---------------predict trip_duration -------------\n") + self.write("%s s" % str(duration[0])) + class MainHandler(tornado.web.RequestHandler): def get(self): self.write("real time execute sparksql demo") + def make_app(): return tornado.web.Application([ (r"/", MainHandler), @@ -94,10 +100,11 @@ def make_app(): (r"/predict", PredictHandler), ]) + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("endpoint", help="specify the endpoint of apiserver") - parser.add_argument("model_path", help="specify the model path") + parser.add_argument("endpoint", help="specify the endpoint of apiserver") + parser.add_argument("model_path", help="specify the model path") args = parser.parse_args() url = "http://%s/dbs/demo_db/deployments/demo" % args.endpoint bst = lgb.Booster(model_file=args.model_path) diff --git a/demo/predict-taxi-trip-duration/script/train.py b/demo/predict-taxi-trip-duration/script/train.py index 3952a8094fc..c28ef8dfb7a 100644 --- a/demo/predict-taxi-trip-duration/script/train.py +++ b/demo/predict-taxi-trip-duration/script/train.py @@ -16,26 +16,24 @@ import lightgbm as lgb import pandas as pd -from sklearn.metrics import mean_squared_error -from sklearn.model_selection import GridSearchCV from sklearn.model_selection import train_test_split import argparse import os parser = argparse.ArgumentParser() -parser.add_argument("feature_path", help="specify the feature path") -parser.add_argument("model_path", help="specify the model path") +parser.add_argument('feature_path', help='specify the feature path') +parser.add_argument('model_path', help='specify the model path') args = parser.parse_args() feature_path = args.feature_path # merge file if os.path.isdir(feature_path): path_list = os.listdir(feature_path) - new_file = "/tmp/merged_feature.csv" + new_file = '/tmp/merged_feature.csv' with open(new_file, 'w') as wf: has_write_header = False for filename in path_list: - if filename == "_SUCCESS" or filename.startswith('.'): + if filename == '_SUCCESS' or filename.startswith('.'): continue with open(os.path.join(feature_path, filename), 'r') as f: first_line = True @@ -50,7 +48,7 @@ feature_path = new_file # run batch sql and get instances -df = pd.read_csv(feature_path); +df = pd.read_csv(feature_path) train_set, predict_set = train_test_split(df, test_size=0.2) y_train = train_set['trip_duration'] x_train = train_set.drop(columns=['trip_duration']) @@ -83,4 +81,4 @@ early_stopping_rounds=5) gbm.save_model(args.model_path) -print("save model.txt done") +print('save model.txt done') diff --git a/demo/predict-taxi-trip-duration/test/import.py b/demo/predict-taxi-trip-duration/test/import.py index e88a4081a03..35d14b1502a 100644 --- a/demo/predict-taxi-trip-duration/test/import.py +++ b/demo/predict-taxi-trip-duration/test/import.py @@ -14,15 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-""" -""" import sqlalchemy as db - -import sys import datetime -ddl=""" +ddl = """ create table t1( id string, vendor_id int, @@ -42,26 +38,28 @@ engine = db.create_engine('openmldb:///db_test?zk=127.0.0.1:2181&zkPath=/openmldb') connection = engine.connect() try: - connection.execute("create database db_test;"); + connection.execute('acreate database db_test;') except Exception as e: print(e) try: - connection.execute(ddl); + connection.execute(ddl) except Exception as e: print(e) + def insert_row(line): row = line.split(',') - row[2] = '%dl'%int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000) - row[3] = '%dl'%int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000) - insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);"% tuple(row) + row[2] = '%dl' % int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000) + row[3] = '%dl' % int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000) + insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);" % tuple(row) connection.execute(insert) + with open('data/taxi_tour_table_train_simple.csv', 'r') as fd: idx = 0 - for line in fd: + for csv_line in fd: if idx == 0: idx = idx + 1 continue - insert_row(line.replace('\n', '')) + insert_row(csv_line.replace('\n', '')) idx = idx + 1 diff --git a/demo/talkingdata-adtracking-fraud-detection/predict_server.py b/demo/talkingdata-adtracking-fraud-detection/predict_server.py index a101b06ea5d..dc36f18f24f 100644 --- a/demo/talkingdata-adtracking-fraud-detection/predict_server.py +++ b/demo/talkingdata-adtracking-fraud-detection/predict_server.py @@ -19,7 +19,6 @@ import tornado.ioloop import json import xgboost as xgb -import sqlalchemy as db import requests import argparse @@ -32,17 +31,17 @@ ("os", "int"), ("channel", "int"), ("click_time", "timestamp"), - ("is_attributed", 'int'), + ("is_attributed", "int"), ] url = "" def get_schema(): - dict_schema = {} + dict_schema_tmp = {} for i in table_schema: - dict_schema[i[0]] = i[1] - return dict_schema + dict_schema_tmp[i[0]] = i[1] + return dict_schema_tmp dict_schema = get_schema() @@ -52,9 +51,8 @@ def get_schema(): def build_feature(rs): - var_Y = [rs[-1]] - var_X = [rs[:-1]] - return np.array(var_X) + var_x = [rs[:-1]] + return np.array(var_x) class SchemaHandler(tornado.web.RequestHandler): @@ -75,7 +73,7 @@ def post(self): row_data.append(row.get(i[0], 0)) else: row_data.append(None) - print('receive request: ', row_data) + print("receive request: ", row_data) data["input"].append(row_data) rs = requests.post(url, json=data) result = json.loads(rs.text) diff --git a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py index 93e32e7067b..cca3f415aef 100644 --- a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py +++ b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py @@ -3,27 +3,27 @@ import time import glob # fmt:off -import openmldb import sqlalchemy as db import pandas as pd import xgboost as xgb + # fmt:on # openmldb cluster configs -zk = "127.0.0.1:2181" -zk_path = "/openmldb" -db_name = "demo_db" # db name, predict_server.py will use it, be careful. -table_name = "talkingdata" + str(int(time.time())) +zk = '127.0.0.1:2181' +zk_path = '/openmldb' +db_name = 'demo_db' # db name, predict_server.py will use it, be careful. 
+table_name = 'talkingdata' + str(int(time.time())) # make sure that taskmanager can access the path -train_feature_dir = "/tmp/train_feature" +train_feature_dir = '/tmp/train_feature' def column_string(col_tuple) -> str: return ' '.join(col_tuple) -def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objective='binary:logistic', metrics='auc', +def xgb_modelfit_nocv(params, dtrain, dvalid, objective='binary:logistic', metrics='auc', feval=None, num_boost_round=3000, early_stopping_rounds=20): xgb_params = { 'booster': 'gbtree', @@ -42,7 +42,7 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objec } xgb_params.update(params) - print("preparing validation datasets") + print('preparing validation datasets') evals_results = {} @@ -56,9 +56,9 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objec feval=feval) n_estimators = bst1.best_iteration - print("\nModel Report") - print("n_estimators : ", n_estimators) - print(metrics + ":", evals_results['eval'][metrics][n_estimators - 1]) + print('\nModel Report') + print('n_estimators : ', n_estimators) + print(metrics + ':', evals_results['eval'][metrics][n_estimators - 1]) return bst1 @@ -79,50 +79,48 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objec def cut_data(): - print('Prepare train data, use {} rows, save it as train_sample.csv'.format(sample_cnt)) data_path = 'data/' sample_cnt = 10000 # you can prepare sample data by yourself - train_df = pd.read_csv(data_path + "train.csv", nrows=sample_cnt, - dtype=dtypes, usecols=[c[0] for c in train_schema]) - len_train = len(train_df) - assert len_train == sample_cnt + print(f'Prepare train data, use {sample_cnt} rows, save it as train_sample.csv') + train_df_tmp = pd.read_csv(data_path + 'train.csv', nrows=sample_cnt, + dtype=dtypes, usecols=[c[0] for c in train_schema]) + len_train_tmp = len(train_df_tmp) + assert len_train_tmp == sample_cnt # take a portion from train sample data - train_df.to_csv("train_sample.csv", index=False) - del train_df + train_df.to_csv('train_sample.csv', index=False) + del train_df_tmp gc.collect() def nothrow_execute(sql): # only used for drop deployment, cuz 'if not exist' is not supported now try: - print("execute " + sql) - ok, rs = connection.execute(sql) + print('execute ' + sql) + _, rs = connection.execute(sql) print(rs) except Exception as e: print(e) -print("Prepare openmldb, db {} table {}".format(db_name, table_name)) +print(f'Prepare openmldb, db {db_name} table {table_name}') # cut_data() engine = db.create_engine( - 'openmldb:///{}?zk={}&zkPath={}'.format(db_name, zk, zk_path)) + f'openmldb:///{db_name}?zk={zk}&zkPath={zk_path}') connection = engine.connect() -connection.execute("CREATE DATABASE IF NOT EXISTS {};".format(db_name)) +connection.execute(f'CREATE DATABASE IF NOT EXISTS {db_name};') schema_string = ','.join(list(map(column_string, train_schema))) -connection.execute("CREATE TABLE IF NOT EXISTS {}({});".format( - table_name, schema_string)) +connection.execute(f'CREATE TABLE IF NOT EXISTS {table_name}({schema_string});') -print("Load train_sample data to offline storage for training(hard copy)") -connection.execute("USE {}".format(db_name)) +print('Load train_sample data to offline storage for training(hard copy)') +connection.execute(f'USE {db_name}') connection.execute("SET @@execute_mode='offline';") # use sync offline job, to make sure `LOAD DATA` finished -connection.execute("SET @@sync_job=true;") -connection.execute("SET 
@@job_timeout=1200000;") +connection.execute('SET @@sync_job=true;') +connection.execute('SET @@job_timeout=1200000;') # use soft link after https://github.com/4paradigm/OpenMLDB/issues/1565 fixed -connection.execute("LOAD DATA INFILE 'file://{}' INTO TABLE {} OPTIONS(format='csv',header=true);".format( - os.path.abspath("train_sample.csv"), table_name)) - +connection.execute(f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " + f"INTO TABLE {table_name} OPTIONS(format='csv',header=true);") print('Feature extraction') sql_part = """ @@ -137,23 +135,22 @@ def nothrow_execute(sql): w3 as(partition by ip, app, os order by click_time ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) """.format(table_name) # extraction will take time -connection.execute("SET @@job_timeout=1200000;") -connection.execute("{} INTO OUTFILE '{}' OPTIONS(mode='overwrite');".format( - sql_part, train_feature_dir)) +connection.execute('SET @@job_timeout=1200000;') +connection.execute(f"{sql_part} INTO OUTFILE '{train_feature_dir}' OPTIONS(mode='overwrite');") -print("Load features from feature dir {}".format(train_feature_dir)) +print(f'Load features from feature dir {train_feature_dir}') # train_feature_dir has multi csv files train_df = pd.concat(map(lambda file: pd.read_csv(file), glob.glob( - os.path.join('', train_feature_dir + "/*.csv")))) -print("peek:") + os.path.join('', train_feature_dir + '/*.csv')))) +print('peek:') print(train_df.head()) len_train = len(train_df) train_row_cnt = int(len_train * 3 / 4) train_df = train_df[(len_train - train_row_cnt):len_train] val_df = train_df[:(len_train - train_row_cnt)] -print("train size: ", len(train_df)) -print("valid size: ", len(val_df)) +print('train size: ', len(train_df)) +print('valid size: ', len(val_df)) target = 'is_attributed' predictors = ['app', 'device', 'os', 'channel', 'hour', @@ -162,7 +159,7 @@ def nothrow_execute(sql): gc.collect() -print("Training by xgb") +print('Training by xgb') params_xgb = { 'num_leaves': 7, # we should let it be smaller than 2^(max_depth) 'max_depth': 3, # -1 means no limit @@ -182,8 +179,6 @@ def nothrow_execute(sql): bst = xgb_modelfit_nocv(params_xgb, xgtrain, watchlist, - predictors, - target, objective='binary:logistic', metrics='auc', num_boost_round=300, @@ -193,22 +188,23 @@ def nothrow_execute(sql): del val_df gc.collect() -print("Save model.json") -bst.save_model("./model.json") +print('Save model.json') +bst.save_model('./model.json') -print("Prepare online serving") +print('Prepare online serving') -print("Deploy sql") +print('Deploy sql') # predict server needs this name -deploy_name = "demo" +deploy_name = 'demo' connection.execute("SET @@execute_mode='online';") -connection.execute("USE {}".format(db_name)) -nothrow_execute("DROP DEPLOYMENT {}".format(deploy_name)) -deploy_sql = """DEPLOY {} {}""".format(deploy_name, sql_part) +connection.execute(f'USE {db_name}') +nothrow_execute(f'DROP DEPLOYMENT {deploy_name}') +deploy_sql = f"""DEPLOY {deploy_name} {sql_part}""" print(deploy_sql) connection.execute(deploy_sql) -print("Import data to online") +print('Import data to online') # online feature extraction needs history data # set job_timeout bigger if the `LOAD DATA` job timeout -connection.execute("LOAD DATA INFILE 'file://{}' INTO TABLE {}.{} OPTIONS(mode='append',format='csv',header=true);".format( - os.path.abspath("train_sample.csv"), db_name, table_name)) +connection.execute( + f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " + f"INTO TABLE 
{db_name}.{table_name} OPTIONS(mode='append',format='csv',header=true);") diff --git a/pylintrc b/pylintrc index ffe8706163b..3821c133811 100644 --- a/pylintrc +++ b/pylintrc @@ -89,7 +89,9 @@ disable=abstract-method, long-suffix, map-builtin-not-iterating, misplaced-comparison-constant, + missing-class-docstring, missing-function-docstring, + missing-module-docstring, metaclass-assignment, next-method-called, next-method-defined, From 7ab0df994c3424642b4eae1ca21dd039e4fabda7 Mon Sep 17 00:00:00 2001 From: mango Date: Wed, 1 Jun 2022 13:21:36 +0800 Subject: [PATCH 2/6] fix some lint error Signed-off-by: mango --- .../script/convert_data.py | 4 ++-- .../script/predict.py | 22 +++++++++---------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/demo/predict-taxi-trip-duration/script/convert_data.py b/demo/predict-taxi-trip-duration/script/convert_data.py index 098f40f2599..7808dadc689 100644 --- a/demo/predict-taxi-trip-duration/script/convert_data.py +++ b/demo/predict-taxi-trip-duration/script/convert_data.py @@ -17,14 +17,14 @@ """Module of covert data from system stdin""" import sys import time + i = 0 for line in sys.stdin: if i == 0: - i+=1 + i += 1 print(line.strip()) continue arr = line.strip().split(",") arr[2] = str(int(time.mktime(time.strptime(arr[2], "%Y-%m-%d %H:%M:%S"))) * 1000) arr[3] = str(int(time.mktime(time.strptime(arr[3], "%Y-%m-%d %H:%M:%S"))) * 1000) print(",".join(arr)) - diff --git a/demo/predict-taxi-trip-duration/script/predict.py b/demo/predict-taxi-trip-duration/script/predict.py index 854e32cbbeb..f5f7b52712e 100644 --- a/demo/predict-taxi-trip-duration/script/predict.py +++ b/demo/predict-taxi-trip-duration/script/predict.py @@ -17,16 +17,16 @@ import requests url = "http://127.0.0.1:8887/predict" -req ={"id":"id0376262", - "vendor_id":1, - "pickup_datetime":1467302350000, - "dropoff_datetime":1467304896000, - "passenger_count":2, - "pickup_longitude":-73.873093, - "pickup_latitude":40.774097, - "dropoff_longitude":-73.926704, - "dropoff_latitude":40.856739, - "store_and_fwd_flag":"N", - "trip_duration":1} +req = {"id": "id0376262", + "vendor_id": 1, + "pickup_datetime": 1467302350000, + "dropoff_datetime": 1467304896000, + "passenger_count": 2, + "pickup_longitude": -73.873093, + "pickup_latitude": 40.774097, + "dropoff_longitude": -73.926704, + "dropoff_latitude": 40.856739, + "store_and_fwd_flag": "N", + "trip_duration": 1} r = requests.post(url, json=req) print(r.text) From 3d22146e32e536757d7c6362fcfc2a2258c34cda Mon Sep 17 00:00:00 2001 From: mango Date: Wed, 1 Jun 2022 15:27:16 +0800 Subject: [PATCH 3/6] fix the execution of catch error Signed-off-by: mango --- demo/predict-taxi-trip-duration/test/import.py | 4 ++-- .../talkingdata-adtracking-fraud-detection/train_and_serve.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/demo/predict-taxi-trip-duration/test/import.py b/demo/predict-taxi-trip-duration/test/import.py index dceda93a2e2..d8f294b27f7 100644 --- a/demo/predict-taxi-trip-duration/test/import.py +++ b/demo/predict-taxi-trip-duration/test/import.py @@ -40,11 +40,11 @@ connection = engine.connect() try: connection.execute('create database db_test;') -except db.exc.SQLAlchemyError as e: +except Exception as e: print(e) try: connection.execute(ddl) -except db.exc.SQLAlchemyError as e: +except Exception as e: print(e) diff --git a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py index 3c9c051721f..6b6bb0b1f12 100644 --- 
a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py +++ b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py @@ -106,7 +106,7 @@ def nothrow_execute(sql): print('execute ' + sql) _, rs = connection.execute(sql) print(rs) - except db.exc.SQLAlchemyError as e: + except Exception as e: print(e) From 3bdbb88399231b19033be497de81310694d49006 Mon Sep 17 00:00:00 2001 From: mango Date: Thu, 2 Jun 2022 15:40:08 +0800 Subject: [PATCH 4/6] solved broad-except lint error. Signed-off-by: mango --- demo/predict-taxi-trip-duration/test/import.py | 2 ++ demo/talkingdata-adtracking-fraud-detection/predict_server.py | 1 - .../talkingdata-adtracking-fraud-detection/train_and_serve.py | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/demo/predict-taxi-trip-duration/test/import.py b/demo/predict-taxi-trip-duration/test/import.py index d8f294b27f7..dbaabced81a 100644 --- a/demo/predict-taxi-trip-duration/test/import.py +++ b/demo/predict-taxi-trip-duration/test/import.py @@ -40,10 +40,12 @@ connection = engine.connect() try: connection.execute('create database db_test;') + # pylint: disable=broad-except except Exception as e: print(e) try: connection.execute(ddl) + # pylint: disable=broad-except except Exception as e: print(e) diff --git a/demo/talkingdata-adtracking-fraud-detection/predict_server.py b/demo/talkingdata-adtracking-fraud-detection/predict_server.py index b9e5278ae4c..0944ebf7877 100644 --- a/demo/talkingdata-adtracking-fraud-detection/predict_server.py +++ b/demo/talkingdata-adtracking-fraud-detection/predict_server.py @@ -164,7 +164,6 @@ def make_app(): default=True) args = parser.parse_args() - global global_args global_args = vars(args) print(global_args) logging.info("init args: %s", global_args) diff --git a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py index 6b6bb0b1f12..ed779d5637b 100644 --- a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py +++ b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py @@ -106,6 +106,7 @@ def nothrow_execute(sql): print('execute ' + sql) _, rs = connection.execute(sql) print(rs) + # pylint: disable=broad-except except Exception as e: print(e) @@ -149,8 +150,7 @@ def nothrow_execute(sql): print(f'Load features from feature dir {train_feature_dir}') # train_feature_dir has multi csv files -train_df = pd.concat(map(lambda file: pd.read_csv(file), glob.glob( - os.path.join('', train_feature_dir + '/*.csv')))) +train_df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', train_feature_dir + '/*.csv')))) print('peek:') print(train_df.head()) len_train = len(train_df) From 802ba475164e1d26b9dd33599edf4b3bfaa020fc Mon Sep 17 00:00:00 2001 From: mango Date: Mon, 6 Jun 2022 15:39:56 +0800 Subject: [PATCH 5/6] keep quote delimiter consistent Signed-off-by: mango --- .../predict_server.py | 4 +- .../train_and_serve.py | 162 +++++++++--------- 2 files changed, 83 insertions(+), 83 deletions(-) diff --git a/demo/talkingdata-adtracking-fraud-detection/predict_server.py b/demo/talkingdata-adtracking-fraud-detection/predict_server.py index 0944ebf7877..d7609486cda 100644 --- a/demo/talkingdata-adtracking-fraud-detection/predict_server.py +++ b/demo/talkingdata-adtracking-fraud-detection/predict_server.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-"""Module of docstring""" +"""Module of predict server""" import argparse import json import numpy as np @@ -73,7 +73,7 @@ def get_result(response): class PredictHandler(tornado.web.RequestHandler): - """Class PredictHandler docstring.""" + """Class PredictHandler.""" def post(self): # only one row row = json.loads(self.request.body) diff --git a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py index ed779d5637b..46e24fba408 100644 --- a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py +++ b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py @@ -1,4 +1,4 @@ -"""Module of docstring""" +"""Module of training and request to predict server""" import gc import os import time @@ -12,46 +12,46 @@ # fmt:on # openmldb cluster configs -zk = '127.0.0.1:2181' -zk_path = '/openmldb' +zk = "127.0.0.1:2181" +zk_path = "/openmldb" # db, deploy name and model_path will update to predict server. You only need to modify here. -db_name = 'demo_db' -deploy_name = 'demo' +db_name = "demo_db" +deploy_name = "demo" # save model to -model_path = '/tmp/model.json' +model_path = "/tmp/model.json" -table_name = 'talkingdata' + str(int(time.time())) +table_name = "talkingdata" + str(int(time.time())) # make sure that taskmanager can access the path -train_feature_dir = '/tmp/train_feature' +train_feature_dir = "/tmp/train_feature" -predict_server = 'localhost:8881' +predict_server = "localhost:8881" def column_string(col_tuple) -> str: - return ' '.join(col_tuple) + return " ".join(col_tuple) -def xgb_modelfit_nocv(params, dtrain, dvalid, objective='binary:logistic', metrics='auc', +def xgb_modelfit_nocv(params, dtrain, dvalid, objective="binary:logistic", metrics="auc", feval=None, num_boost_round=3000, early_stopping_rounds=20): xgb_params = { - 'booster': 'gbtree', - 'obj': objective, - 'eval_metric': metrics, - 'num_leaves': 31, # we should let it be smaller than 2^(max_depth) - 'max_depth': -1, # -1 means no limit - 'max_bin': 255, # Number of bucketed bin for feature values - 'subsample': 0.6, # Subsample ratio of the training instance. - 'colsample_bytree': 0.3, - 'min_child_weight': 5, - 'alpha': 0, # L1 regularization term on weights - 'lambda': 0, # L2 regularization term on weights - 'nthread': 8, - 'verbosity': 0, + "booster": "gbtree", + "obj": objective, + "eval_metric": metrics, + "num_leaves": 31, # we should let it be smaller than 2^(max_depth) + "max_depth": -1, # -1 means no limit + "max_bin": 255, # Number of bucketed bin for feature values + "subsample": 0.6, # Subsample ratio of the training instance. 
+ "colsample_bytree": 0.3, + "min_child_weight": 5, + "alpha": 0, # L1 regularization term on weights + "lambda": 0, # L2 regularization term on weights + "nthread": 8, + "verbosity": 0, } xgb_params.update(params) - print('preparing validation datasets') + print("preparing validation datasets") evals_results = {} @@ -65,37 +65,37 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, objective='binary:logistic', metri feval=feval) n_estimators = bst1.best_iteration - print('\nModel Report') - print('n_estimators : ', n_estimators) - print(metrics + ':', evals_results['eval'][metrics][n_estimators - 1]) + print("\nModel Report") + print("n_estimators : ", n_estimators) + print(metrics + ":", evals_results["eval"][metrics][n_estimators - 1]) return bst1 # use pandas extension types to support NA in integer column dtypes = { - 'ip': 'UInt32', - 'app': 'UInt16', - 'device': 'UInt16', - 'os': 'UInt16', - 'channel': 'UInt16', - 'is_attributed': 'UInt8', - 'click_id': 'UInt32' + "ip": "UInt32", + "app": "UInt16", + "device": "UInt16", + "os": "UInt16", + "channel": "UInt16", + "is_attributed": "UInt8", + "click_id": "UInt32" } -train_schema = [('ip', 'int'), ('app', 'int'), ('device', 'int'), - ('os', 'int'), ('channel', 'int'), ('click_time', 'timestamp'), ('is_attributed', 'int')] +train_schema = [("ip", "int"), ("app", "int"), ("device", "int"), + ("os", "int"), ("channel", "int"), ("click_time", "timestamp"), ("is_attributed", "int")] def cut_data(): - data_path = 'data/' + data_path = "data/" sample_cnt = 10000 # you can prepare sample data by yourself - print(f'Prepare train data, use {sample_cnt} rows, save it as train_sample.csv') - train_df_tmp = pd.read_csv(data_path + 'train.csv', nrows=sample_cnt, + print(f"Prepare train data, use {sample_cnt} rows, save it as train_sample.csv") + train_df_tmp = pd.read_csv(data_path + "train.csv", nrows=sample_cnt, dtype=dtypes, usecols=[c[0] for c in train_schema]) assert len(train_df_tmp) == sample_cnt # take a portion from train sample data - train_df_tmp.to_csv('train_sample.csv', index=False) + train_df_tmp.to_csv("train_sample.csv", index=False) del train_df_tmp gc.collect() @@ -103,7 +103,7 @@ def cut_data(): def nothrow_execute(sql): # only used for drop deployment, cuz 'if not exist' is not supported now try: - print('execute ' + sql) + print("execute " + sql) _, rs = connection.execute(sql) print(rs) # pylint: disable=broad-except @@ -111,27 +111,27 @@ def nothrow_execute(sql): print(e) -print(f'Prepare openmldb, db {db_name} table {table_name}') +print(f"Prepare openmldb, db {db_name} table {table_name}") # cut_data() engine = db.create_engine( - f'openmldb:///{db_name}?zk={zk}&zkPath={zk_path}') + f"openmldb:///{db_name}?zk={zk}&zkPath={zk_path}") connection = engine.connect() -connection.execute(f'CREATE DATABASE IF NOT EXISTS {db_name};') -schema_string = ','.join(list(map(column_string, train_schema))) -connection.execute(f'CREATE TABLE IF NOT EXISTS {table_name}({schema_string});') +connection.execute(f"CREATE DATABASE IF NOT EXISTS {db_name};") +schema_string = ",".join(list(map(column_string, train_schema))) +connection.execute(f"CREATE TABLE IF NOT EXISTS {table_name}({schema_string});") -print('Load train_sample data to offline storage for training(hard copy)') -connection.execute(f'USE {db_name}') +print("Load train_sample data to offline storage for training(hard copy)") +connection.execute(f"USE {db_name}") connection.execute("SET @@execute_mode='offline';") # use sync offline job, to make sure `LOAD DATA` finished 
-connection.execute('SET @@sync_job=true;') -connection.execute('SET @@job_timeout=1200000;') +connection.execute("SET @@sync_job=true;") +connection.execute("SET @@job_timeout=1200000;") # use soft link after https://github.com/4paradigm/OpenMLDB/issues/1565 fixed connection.execute(f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " f"INTO TABLE {table_name} OPTIONS(format='csv',header=true);") -print('Feature extraction') +print("Feature extraction") # the first column `is_attributed` is the label sql_part = f""" select is_attributed, ip, app, device, os, channel, hour(click_time) as hour, day(click_time) as day, @@ -145,50 +145,50 @@ def nothrow_execute(sql): w3 as(partition by ip, app, os order by click_time ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) """ # extraction will take time -connection.execute('SET @@job_timeout=1200000;') +connection.execute("SET @@job_timeout=1200000;") connection.execute(f"{sql_part} INTO OUTFILE '{train_feature_dir}' OPTIONS(mode='overwrite');") -print(f'Load features from feature dir {train_feature_dir}') +print(f"Load features from feature dir {train_feature_dir}") # train_feature_dir has multi csv files -train_df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', train_feature_dir + '/*.csv')))) -print('peek:') +train_df = pd.concat(map(pd.read_csv, glob.glob(os.path.join("", train_feature_dir + "/*.csv")))) +print("peek:") print(train_df.head()) len_train = len(train_df) train_row_cnt = int(len_train * 3 / 4) train_df = train_df[(len_train - train_row_cnt):len_train] val_df = train_df[:(len_train - train_row_cnt)] -print('train size: ', len(train_df)) -print('valid size: ', len(val_df)) +print("train size: ", len(train_df)) +print("valid size: ", len(val_df)) -target = 'is_attributed' -predictors = ['app', 'device', 'os', 'channel', 'hour', - 'day', 'qty', 'ip_app_count', 'ip_app_os_count'] +target = "is_attributed" +predictors = ["app", "device", "os", "channel", "hour", + "day", "qty", "ip_app_count", "ip_app_os_count"] gc.collect() -print('Training by xgb') +print("Training by xgb") params_xgb = { - 'num_leaves': 7, # we should let it be smaller than 2^(max_depth) - 'max_depth': 3, # -1 means no limit - 'min_child_samples': 100, - 'max_bin': 100, # Number of bucketed bin for feature values - 'subsample': 0.7, # Subsample ratio of the training instance. + "num_leaves": 7, # we should let it be smaller than 2^(max_depth) + "max_depth": 3, # -1 means no limit + "min_child_samples": 100, + "max_bin": 100, # Number of bucketed bin for feature values + "subsample": 0.7, # Subsample ratio of the training instance. # Subsample ratio of columns when constructing each tree. 
- 'colsample_bytree': 0.7, + "colsample_bytree": 0.7, # Minimum sum of instance weight(hessian) needed in a child(leaf) - 'min_child_weight': 0 + "min_child_weight": 0 } xgtrain = xgb.DMatrix(train_df[predictors].values, label=train_df[target].values) xgvalid = xgb.DMatrix(val_df[predictors].values, label=val_df[target].values) -watchlist = [(xgvalid, 'eval'), (xgtrain, 'train')] +watchlist = [(xgvalid, "eval"), (xgtrain, "train")] bst = xgb_modelfit_nocv(params_xgb, xgtrain, watchlist, - objective='binary:logistic', - metrics='auc', + objective="binary:logistic", + metrics="auc", num_boost_round=300, early_stopping_rounds=50) @@ -196,25 +196,25 @@ def nothrow_execute(sql): del val_df gc.collect() -print('Save model.json to ', model_path) +print("Save model.json to ", model_path) bst.save_model(model_path) -print('Prepare online serving') +print("Prepare online serving") -print('Deploy sql') +print("Deploy sql") connection.execute("SET @@execute_mode='online';") -connection.execute(f'USE {db_name}') -nothrow_execute(f'DROP DEPLOYMENT {deploy_name}') +connection.execute(f"USE {db_name}") +nothrow_execute(f"DROP DEPLOYMENT {deploy_name}") deploy_sql = f"""DEPLOY {deploy_name} {sql_part}""" print(deploy_sql) connection.execute(deploy_sql) -print('Import data to online') +print("Import data to online") # online feature extraction needs history data # set job_timeout bigger if the `LOAD DATA` job timeout connection.execute( f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " f"INTO TABLE {db_name}.{table_name} OPTIONS(mode='append',format='csv',header=true);") -print('Update model to predict server') -infos = {'database': db_name, 'deployment': deploy_name, 'model_path': model_path} -requests.post('http://' + predict_server + '/update', json=infos) +print("Update model to predict server") +infos = {"database": db_name, "deployment": deploy_name, "model_path": model_path} +requests.post("http://" + predict_server + "/update", json=infos) From 3d5f595fa9b3a6476b6e11f2d4bd90a17ff5d346 Mon Sep 17 00:00:00 2001 From: mango Date: Tue, 7 Jun 2022 17:14:48 +0800 Subject: [PATCH 6/6] revert openmldb import Signed-off-by: mango --- .../train_and_serve.py | 162 +++++++++--------- 1 file changed, 82 insertions(+), 80 deletions(-) diff --git a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py index 46e24fba408..ebd04e911a2 100644 --- a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py +++ b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py @@ -5,6 +5,8 @@ import glob import requests # fmt:off +# pylint: disable=unused-import +import openmldb import sqlalchemy as db import pandas as pd import xgboost as xgb @@ -12,46 +14,46 @@ # fmt:on # openmldb cluster configs -zk = "127.0.0.1:2181" -zk_path = "/openmldb" +zk = '127.0.0.1:2181' +zk_path = '/openmldb' # db, deploy name and model_path will update to predict server. You only need to modify here. 
-db_name = "demo_db" -deploy_name = "demo" +db_name = 'demo_db' +deploy_name = 'demo' # save model to -model_path = "/tmp/model.json" +model_path = '/tmp/model.json' -table_name = "talkingdata" + str(int(time.time())) +table_name = 'talkingdata' + str(int(time.time())) # make sure that taskmanager can access the path -train_feature_dir = "/tmp/train_feature" +train_feature_dir = '/tmp/train_feature' -predict_server = "localhost:8881" +predict_server = 'localhost:8881' def column_string(col_tuple) -> str: - return " ".join(col_tuple) + return ' '.join(col_tuple) -def xgb_modelfit_nocv(params, dtrain, dvalid, objective="binary:logistic", metrics="auc", +def xgb_modelfit_nocv(params, dtrain, dvalid, objective='binary:logistic', metrics='auc', feval=None, num_boost_round=3000, early_stopping_rounds=20): xgb_params = { - "booster": "gbtree", - "obj": objective, - "eval_metric": metrics, - "num_leaves": 31, # we should let it be smaller than 2^(max_depth) - "max_depth": -1, # -1 means no limit - "max_bin": 255, # Number of bucketed bin for feature values - "subsample": 0.6, # Subsample ratio of the training instance. - "colsample_bytree": 0.3, - "min_child_weight": 5, - "alpha": 0, # L1 regularization term on weights - "lambda": 0, # L2 regularization term on weights - "nthread": 8, - "verbosity": 0, + 'booster': 'gbtree', + 'obj': objective, + 'eval_metric': metrics, + 'num_leaves': 31, # we should let it be smaller than 2^(max_depth) + 'max_depth': -1, # -1 means no limit + 'max_bin': 255, # Number of bucketed bin for feature values + 'subsample': 0.6, # Subsample ratio of the training instance. + 'colsample_bytree': 0.3, + 'min_child_weight': 5, + 'alpha': 0, # L1 regularization term on weights + 'lambda': 0, # L2 regularization term on weights + 'nthread': 8, + 'verbosity': 0, } xgb_params.update(params) - print("preparing validation datasets") + print('preparing validation datasets') evals_results = {} @@ -65,37 +67,37 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, objective="binary:logistic", metri feval=feval) n_estimators = bst1.best_iteration - print("\nModel Report") - print("n_estimators : ", n_estimators) - print(metrics + ":", evals_results["eval"][metrics][n_estimators - 1]) + print('\nModel Report') + print('n_estimators : ', n_estimators) + print(metrics + ':', evals_results['eval'][metrics][n_estimators - 1]) return bst1 # use pandas extension types to support NA in integer column dtypes = { - "ip": "UInt32", - "app": "UInt16", - "device": "UInt16", - "os": "UInt16", - "channel": "UInt16", - "is_attributed": "UInt8", - "click_id": "UInt32" + 'ip': 'UInt32', + 'app': 'UInt16', + 'device': 'UInt16', + 'os': 'UInt16', + 'channel': 'UInt16', + 'is_attributed': 'UInt8', + 'click_id': 'UInt32' } -train_schema = [("ip", "int"), ("app", "int"), ("device", "int"), - ("os", "int"), ("channel", "int"), ("click_time", "timestamp"), ("is_attributed", "int")] +train_schema = [('ip', 'int'), ('app', 'int'), ('device', 'int'), + ('os', 'int'), ('channel', 'int'), ('click_time', 'timestamp'), ('is_attributed', 'int')] def cut_data(): - data_path = "data/" + data_path = 'data/' sample_cnt = 10000 # you can prepare sample data by yourself - print(f"Prepare train data, use {sample_cnt} rows, save it as train_sample.csv") - train_df_tmp = pd.read_csv(data_path + "train.csv", nrows=sample_cnt, + print(f'Prepare train data, use {sample_cnt} rows, save it as train_sample.csv') + train_df_tmp = pd.read_csv(data_path + 'train.csv', nrows=sample_cnt, dtype=dtypes, usecols=[c[0] for c in train_schema]) 
assert len(train_df_tmp) == sample_cnt # take a portion from train sample data - train_df_tmp.to_csv("train_sample.csv", index=False) + train_df_tmp.to_csv('train_sample.csv', index=False) del train_df_tmp gc.collect() @@ -103,7 +105,7 @@ def cut_data(): def nothrow_execute(sql): # only used for drop deployment, cuz 'if not exist' is not supported now try: - print("execute " + sql) + print('execute ' + sql) _, rs = connection.execute(sql) print(rs) # pylint: disable=broad-except @@ -111,27 +113,27 @@ def nothrow_execute(sql): print(e) -print(f"Prepare openmldb, db {db_name} table {table_name}") +print(f'Prepare openmldb, db {db_name} table {table_name}') # cut_data() engine = db.create_engine( - f"openmldb:///{db_name}?zk={zk}&zkPath={zk_path}") + f'openmldb:///{db_name}?zk={zk}&zkPath={zk_path}') connection = engine.connect() -connection.execute(f"CREATE DATABASE IF NOT EXISTS {db_name};") -schema_string = ",".join(list(map(column_string, train_schema))) -connection.execute(f"CREATE TABLE IF NOT EXISTS {table_name}({schema_string});") +connection.execute(f'CREATE DATABASE IF NOT EXISTS {db_name};') +schema_string = ','.join(list(map(column_string, train_schema))) +connection.execute(f'CREATE TABLE IF NOT EXISTS {table_name}({schema_string});') -print("Load train_sample data to offline storage for training(hard copy)") -connection.execute(f"USE {db_name}") +print('Load train_sample data to offline storage for training(hard copy)') +connection.execute(f'USE {db_name}') connection.execute("SET @@execute_mode='offline';") # use sync offline job, to make sure `LOAD DATA` finished -connection.execute("SET @@sync_job=true;") -connection.execute("SET @@job_timeout=1200000;") +connection.execute('SET @@sync_job=true;') +connection.execute('SET @@job_timeout=1200000;') # use soft link after https://github.com/4paradigm/OpenMLDB/issues/1565 fixed connection.execute(f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " f"INTO TABLE {table_name} OPTIONS(format='csv',header=true);") -print("Feature extraction") +print('Feature extraction') # the first column `is_attributed` is the label sql_part = f""" select is_attributed, ip, app, device, os, channel, hour(click_time) as hour, day(click_time) as day, @@ -145,50 +147,50 @@ def nothrow_execute(sql): w3 as(partition by ip, app, os order by click_time ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) """ # extraction will take time -connection.execute("SET @@job_timeout=1200000;") +connection.execute('SET @@job_timeout=1200000;') connection.execute(f"{sql_part} INTO OUTFILE '{train_feature_dir}' OPTIONS(mode='overwrite');") -print(f"Load features from feature dir {train_feature_dir}") +print(f'Load features from feature dir {train_feature_dir}') # train_feature_dir has multi csv files -train_df = pd.concat(map(pd.read_csv, glob.glob(os.path.join("", train_feature_dir + "/*.csv")))) -print("peek:") +train_df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', train_feature_dir + '/*.csv')))) +print('peek:') print(train_df.head()) len_train = len(train_df) train_row_cnt = int(len_train * 3 / 4) train_df = train_df[(len_train - train_row_cnt):len_train] val_df = train_df[:(len_train - train_row_cnt)] -print("train size: ", len(train_df)) -print("valid size: ", len(val_df)) +print('train size: ', len(train_df)) +print('valid size: ', len(val_df)) -target = "is_attributed" -predictors = ["app", "device", "os", "channel", "hour", - "day", "qty", "ip_app_count", "ip_app_os_count"] +target = 'is_attributed' +predictors = ['app', 
'device', 'os', 'channel', 'hour', + 'day', 'qty', 'ip_app_count', 'ip_app_os_count'] gc.collect() -print("Training by xgb") +print('Training by xgb') params_xgb = { - "num_leaves": 7, # we should let it be smaller than 2^(max_depth) - "max_depth": 3, # -1 means no limit - "min_child_samples": 100, - "max_bin": 100, # Number of bucketed bin for feature values - "subsample": 0.7, # Subsample ratio of the training instance. + 'num_leaves': 7, # we should let it be smaller than 2^(max_depth) + 'max_depth': 3, # -1 means no limit + 'min_child_samples': 100, + 'max_bin': 100, # Number of bucketed bin for feature values + 'subsample': 0.7, # Subsample ratio of the training instance. # Subsample ratio of columns when constructing each tree. - "colsample_bytree": 0.7, + 'colsample_bytree': 0.7, # Minimum sum of instance weight(hessian) needed in a child(leaf) - "min_child_weight": 0 + 'min_child_weight': 0 } xgtrain = xgb.DMatrix(train_df[predictors].values, label=train_df[target].values) xgvalid = xgb.DMatrix(val_df[predictors].values, label=val_df[target].values) -watchlist = [(xgvalid, "eval"), (xgtrain, "train")] +watchlist = [(xgvalid, 'eval'), (xgtrain, 'train')] bst = xgb_modelfit_nocv(params_xgb, xgtrain, watchlist, - objective="binary:logistic", - metrics="auc", + objective='binary:logistic', + metrics='auc', num_boost_round=300, early_stopping_rounds=50) @@ -196,25 +198,25 @@ def nothrow_execute(sql): del val_df gc.collect() -print("Save model.json to ", model_path) +print('Save model.json to ', model_path) bst.save_model(model_path) -print("Prepare online serving") +print('Prepare online serving') -print("Deploy sql") +print('Deploy sql') connection.execute("SET @@execute_mode='online';") -connection.execute(f"USE {db_name}") -nothrow_execute(f"DROP DEPLOYMENT {deploy_name}") +connection.execute(f'USE {db_name}') +nothrow_execute(f'DROP DEPLOYMENT {deploy_name}') deploy_sql = f"""DEPLOY {deploy_name} {sql_part}""" print(deploy_sql) connection.execute(deploy_sql) -print("Import data to online") +print('Import data to online') # online feature extraction needs history data # set job_timeout bigger if the `LOAD DATA` job timeout connection.execute( f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " f"INTO TABLE {db_name}.{table_name} OPTIONS(mode='append',format='csv',header=true);") -print("Update model to predict server") -infos = {"database": db_name, "deployment": deploy_name, "model_path": model_path} -requests.post("http://" + predict_server + "/update", json=infos) +print('Update model to predict server') +infos = {'database': db_name, 'deployment': deploy_name, 'model_path': model_path} +requests.post('http://' + predict_server + '/update', json=infos)
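
For reference, a short standalone Python sketch of the conventions this series converges on: a module docstring, single-quoted strings and f-strings, a local `# pylint: disable=unused-import` on the openmldb import kept in train_and_serve.py, and a narrowly scoped `# pylint: disable=broad-except` around statements that are allowed to fail (such as dropping a deployment that may not exist). This is an illustrative sketch under those assumptions, not code lifted verbatim from the demos; it assumes the openmldb package is installed.

"""Sketch of the pylint conventions applied across these patches."""
# pylint: disable=unused-import
import openmldb  # re-imported as in the final patch; presumably needed so the 'openmldb:///' SQLAlchemy dialect resolves


def nothrow_execute(connection, sql):
    """Execute a statement and log failures instead of raising.

    Broad catching is intentional here (e.g. DROP DEPLOYMENT on a deployment
    that may not exist), so the warning is suppressed next to the except
    clause rather than globally in pylintrc.
    """
    try:
        print(f'execute {sql}')
        result = connection.execute(sql)
        print(result)
    # pylint: disable=broad-except
    except Exception as error:
        print(error)

Suppressing broad-except locally keeps the global pylintrc disable list limited to the docstring checks added in the first patch (missing-class-docstring, missing-module-docstring), while the per-file docstrings added later satisfy the remaining checks.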