diff --git a/demo/predict-taxi-trip-duration/script/convert_data.py b/demo/predict-taxi-trip-duration/script/convert_data.py
index 0486e81d148..7808dadc689 100644
--- a/demo/predict-taxi-trip-duration/script/convert_data.py
+++ b/demo/predict-taxi-trip-duration/script/convert_data.py
@@ -14,16 +14,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Module to convert data from system stdin"""
 import sys
-import time, datetime
+import time
+
 i = 0
 for line in sys.stdin:
     if i == 0:
-        i+=1
+        i += 1
         print(line.strip())
         continue
     arr = line.strip().split(",")
     arr[2] = str(int(time.mktime(time.strptime(arr[2], "%Y-%m-%d %H:%M:%S"))) * 1000)
     arr[3] = str(int(time.mktime(time.strptime(arr[3], "%Y-%m-%d %H:%M:%S"))) * 1000)
     print(",".join(arr))
-
diff --git a/demo/predict-taxi-trip-duration/script/predict.py b/demo/predict-taxi-trip-duration/script/predict.py
index d48e1dddba7..f5f7b52712e 100644
--- a/demo/predict-taxi-trip-duration/script/predict.py
+++ b/demo/predict-taxi-trip-duration/script/predict.py
@@ -13,25 +13,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+"""Module to send a predict request"""
 import requests
-import os
-import base64
-import random
-import time
-import hashlib
 
 url = "http://127.0.0.1:8887/predict"
-req ={"id":"id0376262",
-    "vendor_id":1,
-    "pickup_datetime":1467302350000,
-    "dropoff_datetime":1467304896000,
-    "passenger_count":2,
-    "pickup_longitude":-73.873093,
-    "pickup_latitude":40.774097,
-    "dropoff_longitude":-73.926704,
-    "dropoff_latitude":40.856739,
-    "store_and_fwd_flag":"N",
-    "trip_duration":1}
+req = {"id": "id0376262",
+       "vendor_id": 1,
+       "pickup_datetime": 1467302350000,
+       "dropoff_datetime": 1467304896000,
+       "passenger_count": 2,
+       "pickup_longitude": -73.873093,
+       "pickup_latitude": 40.774097,
+       "dropoff_longitude": -73.926704,
+       "dropoff_latitude": 40.856739,
+       "store_and_fwd_flag": "N",
+       "trip_duration": 1}
 r = requests.post(url, json=req)
 print(r.text)
diff --git a/demo/predict-taxi-trip-duration/script/predict_server.py b/demo/predict-taxi-trip-duration/script/predict_server.py
index a44e8746236..34d84eb8c95 100644
--- a/demo/predict-taxi-trip-duration/script/predict_server.py
+++ b/demo/predict-taxi-trip-duration/script/predict_server.py
@@ -14,52 +14,57 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""Module of predict server""" import numpy as np import tornado.web import tornado.ioloop import json import lightgbm as lgb -import sqlalchemy as db import requests import argparse bst = None table_schema = [ - ("id", "string"), - ("vendor_id", "int"), - ("pickup_datetime", "timestamp"), - ("dropoff_datetime", "timestamp"), - ("passenger_count", "int"), - ("pickup_longitude", "double"), - ("pickup_latitude", "double"), - ("dropoff_longitude", "double"), - ("dropoff_latitude", "double"), - ("store_and_fwd_flag", "string"), - ("trip_duration", "int"), + ("id", "string"), + ("vendor_id", "int"), + ("pickup_datetime", "timestamp"), + ("dropoff_datetime", "timestamp"), + ("passenger_count", "int"), + ("pickup_longitude", "double"), + ("pickup_latitude", "double"), + ("dropoff_longitude", "double"), + ("dropoff_latitude", "double"), + ("store_and_fwd_flag", "string"), + ("trip_duration", "int"), ] url = "" + def get_schema(): - dict_schema = {} + dict_schema_tmp = {} for i in table_schema: - dict_schema[i[0]] = i[1] - return dict_schema + dict_schema_tmp[i[0]] = i[1] + return dict_schema_tmp + dict_schema = get_schema() json_schema = json.dumps(dict_schema) + def build_feature(rs): - var_Y = [rs[0]] - var_X = [rs[1:12]] - return np.array(var_X) + var_x = [rs[1:12]] + return np.array(var_x) + class SchemaHandler(tornado.web.RequestHandler): def get(self): self.write(json_schema) + class PredictHandler(tornado.web.RequestHandler): + """Class of PredictHandler docstring.""" def post(self): row = json.loads(self.request.body) data = {} @@ -72,7 +77,8 @@ def post(self): row_data.append(row.get(i[0], 0)) else: row_data.append(None) - data["input"].append(row_data) + + data["input"].append(row_data) rs = requests.post(url, json=data) result = json.loads(rs.text) for r in result["data"]["data"]: @@ -81,12 +87,14 @@ def post(self): self.write(str(ins) + "\n") duration = bst.predict(ins) self.write("---------------predict trip_duration -------------\n") - self.write("%s s"%str(duration[0])) + self.write(f"{str(duration[0])} s") + class MainHandler(tornado.web.RequestHandler): def get(self): self.write("real time execute sparksql demo") + def make_app(): return tornado.web.Application([ (r"/", MainHandler), @@ -94,12 +102,13 @@ def make_app(): (r"/predict", PredictHandler), ]) + if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument("endpoint", help="specify the endpoint of apiserver") - parser.add_argument("model_path", help="specify the model path") + parser.add_argument("endpoint", help="specify the endpoint of apiserver") + parser.add_argument("model_path", help="specify the model path") args = parser.parse_args() - url = "http://%s/dbs/demo_db/deployments/demo" % args.endpoint + url = f"http://{args.endpoint}/dbs/demo_db/deployments/demo" bst = lgb.Booster(model_file=args.model_path) app = make_app() app.listen(8887) diff --git a/demo/predict-taxi-trip-duration/script/train.py b/demo/predict-taxi-trip-duration/script/train.py index 3952a8094fc..d81d8266ff7 100644 --- a/demo/predict-taxi-trip-duration/script/train.py +++ b/demo/predict-taxi-trip-duration/script/train.py @@ -14,30 +14,29 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+"""Module of train and save model""" import lightgbm as lgb import pandas as pd -from sklearn.metrics import mean_squared_error -from sklearn.model_selection import GridSearchCV from sklearn.model_selection import train_test_split import argparse import os parser = argparse.ArgumentParser() -parser.add_argument("feature_path", help="specify the feature path") -parser.add_argument("model_path", help="specify the model path") +parser.add_argument('feature_path', help='specify the feature path') +parser.add_argument('model_path', help='specify the model path') args = parser.parse_args() feature_path = args.feature_path # merge file if os.path.isdir(feature_path): path_list = os.listdir(feature_path) - new_file = "/tmp/merged_feature.csv" - with open(new_file, 'w') as wf: + new_file = '/tmp/merged_feature.csv' + with open(new_file, 'w', encoding='utf-8') as wf: has_write_header = False for filename in path_list: - if filename == "_SUCCESS" or filename.startswith('.'): + if filename == '_SUCCESS' or filename.startswith('.'): continue - with open(os.path.join(feature_path, filename), 'r') as f: + with open(os.path.join(feature_path, filename), 'r', encoding='utf-8') as f: first_line = True for line in f.readlines(): if first_line is True: @@ -50,7 +49,7 @@ feature_path = new_file # run batch sql and get instances -df = pd.read_csv(feature_path); +df = pd.read_csv(feature_path) train_set, predict_set = train_test_split(df, test_size=0.2) y_train = train_set['trip_duration'] x_train = train_set.drop(columns=['trip_duration']) @@ -83,4 +82,4 @@ early_stopping_rounds=5) gbm.save_model(args.model_path) -print("save model.txt done") +print('save model.txt done') diff --git a/demo/predict-taxi-trip-duration/test/import.py b/demo/predict-taxi-trip-duration/test/import.py index e88a4081a03..dbaabced81a 100644 --- a/demo/predict-taxi-trip-duration/test/import.py +++ b/demo/predict-taxi-trip-duration/test/import.py @@ -14,15 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-""" -""" +"""Module of insert data to table""" import sqlalchemy as db - -import sys import datetime -ddl=""" +ddl = """ create table t1( id string, vendor_id int, @@ -42,26 +39,31 @@ engine = db.create_engine('openmldb:///db_test?zk=127.0.0.1:2181&zkPath=/openmldb') connection = engine.connect() try: - connection.execute("create database db_test;"); + connection.execute('create database db_test;') + # pylint: disable=broad-except except Exception as e: print(e) try: - connection.execute(ddl); + connection.execute(ddl) + # pylint: disable=broad-except except Exception as e: print(e) + def insert_row(line): row = line.split(',') - row[2] = '%dl'%int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000) - row[3] = '%dl'%int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000) - insert = "insert into t1 values('%s', %s, %s, %s, %s, %s, %s, %s, %s, '%s', %s);"% tuple(row) + row[2] = f"{int(datetime.datetime.strptime(row[2], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)}l" + row[3] = f"{int(datetime.datetime.strptime(row[3], '%Y-%m-%d %H:%M:%S').timestamp() * 1000)}l" + insert = f"insert into t1 values('{row[0]}', {row[1]}, {row[2]}, {row[3]}, {row[4]}, {row[5]}, " \ + f"{row[6]}, {row[7]}, {row[8]}, '{row[9]}', {row[10]});" connection.execute(insert) -with open('data/taxi_tour_table_train_simple.csv', 'r') as fd: + +with open('data/taxi_tour_table_train_simple.csv', 'r', encoding='utf-8') as fd: idx = 0 - for line in fd: + for csv_line in fd: if idx == 0: idx = idx + 1 continue - insert_row(line.replace('\n', '')) + insert_row(csv_line.replace('\n', '')) idx = idx + 1 diff --git a/demo/talkingdata-adtracking-fraud-detection/predict.py b/demo/talkingdata-adtracking-fraud-detection/predict.py index 67a2dd24e18..00e3acfe1d7 100644 --- a/demo/talkingdata-adtracking-fraud-detection/predict.py +++ b/demo/talkingdata-adtracking-fraud-detection/predict.py @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +"""Module of request predict in talkingdata-adtracking-fraud-detection""" import requests url = "http://127.0.0.1:8881/predict" diff --git a/demo/talkingdata-adtracking-fraud-detection/predict_server.py b/demo/talkingdata-adtracking-fraud-detection/predict_server.py index 696508c80dd..d7609486cda 100644 --- a/demo/talkingdata-adtracking-fraud-detection/predict_server.py +++ b/demo/talkingdata-adtracking-fraud-detection/predict_server.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+"""Module of predict server""" import argparse import json import numpy as np @@ -23,9 +24,9 @@ import xgboost as xgb import logging -logging.basicConfig(encoding='utf-8', level=logging.INFO, format="%(asctime)s-%(name)s-%(levelname)s-%(message)s") +logging.basicConfig(encoding="utf-8", level=logging.INFO, format="%(asctime)s-%(name)s-%(levelname)s-%(message)s") -arg_keys = ["endpoint", 'database', 'deployment', 'model_path'] +arg_keys = ["endpoint", "database", "deployment", "model_path"] bst = xgb.Booster() # schema column type, ref hybridse::sdk::DataTypeName table_schema = [] @@ -60,18 +61,19 @@ def request_row_cvt(json_row): """ row_data = [] for col in table_schema: - row_data.append(json_row.get(col['name'], "") if col['type'] == "string" else json_row.get(col['name'], 0)) - logging.info('request row: %s', row_data) + row_data.append(json_row.get(col["name"], "") if col["type"] == "string" else json_row.get(col["name"], 0)) + logging.info("request row: %s", row_data) return row_data def get_result(response): result = json.loads(response.text) - logging.info('request result: %s', result) - return result['data']['data'] + logging.info("request result: %s", result) + return result["data"]["data"] class PredictHandler(tornado.web.RequestHandler): + """Class PredictHandler.""" def post(self): # only one row row = json.loads(self.request.body) @@ -86,11 +88,12 @@ def post(self): prediction = bst.predict(ins) self.write( "---------------predict whether is attributed -------------\n") - self.write("%s" % str(prediction[0])) - logging.info('feature: %s, prediction: %s', res, prediction) + self.write(f"{str(prediction[0])}") + logging.info("feature: %s, prediction: %s", res, prediction) class MainHandler(tornado.web.RequestHandler): + """Class of MainHandler.""" def get(self): self.write("real time fe request demo\n") @@ -100,45 +103,44 @@ def args_validator(update_info): def make_url(): - return "http://%s/dbs/%s/deployments/%s" % (global_args['endpoint'], - global_args['database'], - global_args['deployment']) + return f"http://{global_args['endpoint']}/dbs/{global_args['database']}/deployments/{global_args['deployment']}" def update_schema(): r = requests.get(url) rs = json.loads(r.text) global table_schema - table_schema = rs['data']['input_schema'] + table_schema = rs["data"]["input_schema"] def update_model(): - bst.load_model(fname=global_args['model_path']) + bst.load_model(fname=global_args["model_path"]) def update_all(): global url url = make_url() update_schema() - logging.info('url and schema updated') + logging.info("url and schema updated") update_model() - logging.info('model updated') + logging.info("model updated") class UpdateHandler(tornado.web.RequestHandler): + """Class of UpdateHandler.""" def post(self): update_info = json.loads(self.request.body) # must use the full names global global_args - logging.info('before update: %s', global_args) + logging.info("before update: %s", global_args) # just update if update_info is empty # if not, do restrict update if update_info and not args_validator(update_info): - msg = 'invalid arg in {}, valid candidates {}'.format(update_info, arg_keys) + msg = f"invalid arg in {update_info}, valid candidates {arg_keys}" self.write(msg) return global_args = global_args | update_info - logging.info('update: %s', global_args) + logging.info("update: %s", global_args) update_all() self.write("ok\n") @@ -162,10 +164,9 @@ def make_app(): default=True) args = parser.parse_args() - global global_args global_args = vars(args) print(global_args) - 
logging.info('init args: %s', global_args) + logging.info("init args: %s", global_args) if args.init: update_all() diff --git a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py index 421e0ea621f..ebd04e911a2 100644 --- a/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py +++ b/demo/talkingdata-adtracking-fraud-detection/train_and_serve.py @@ -1,37 +1,40 @@ +"""Module of training and request to predict server""" import gc import os import time import glob import requests # fmt:off +# pylint: disable=unused-import import openmldb import sqlalchemy as db import pandas as pd import xgboost as xgb + # fmt:on # openmldb cluster configs -zk = "127.0.0.1:2181" -zk_path = "/openmldb" +zk = '127.0.0.1:2181' +zk_path = '/openmldb' # db, deploy name and model_path will update to predict server. You only need to modify here. -db_name = "demo_db" -deploy_name = "demo" +db_name = 'demo_db' +deploy_name = 'demo' # save model to -model_path = "/tmp/model.json" +model_path = '/tmp/model.json' -table_name = "talkingdata" + str(int(time.time())) +table_name = 'talkingdata' + str(int(time.time())) # make sure that taskmanager can access the path -train_feature_dir = "/tmp/train_feature" +train_feature_dir = '/tmp/train_feature' -predict_server = "localhost:8881" +predict_server = 'localhost:8881' def column_string(col_tuple) -> str: return ' '.join(col_tuple) -def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objective='binary:logistic', metrics='auc', +def xgb_modelfit_nocv(params, dtrain, dvalid, objective='binary:logistic', metrics='auc', feval=None, num_boost_round=3000, early_stopping_rounds=20): xgb_params = { 'booster': 'gbtree', @@ -50,7 +53,7 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objec } xgb_params.update(params) - print("preparing validation datasets") + print('preparing validation datasets') evals_results = {} @@ -64,9 +67,9 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objec feval=feval) n_estimators = bst1.best_iteration - print("\nModel Report") - print("n_estimators : ", n_estimators) - print(metrics + ":", evals_results['eval'][metrics][n_estimators - 1]) + print('\nModel Report') + print('n_estimators : ', n_estimators) + print(metrics + ':', evals_results['eval'][metrics][n_estimators - 1]) return bst1 @@ -89,78 +92,76 @@ def xgb_modelfit_nocv(params, dtrain, dvalid, predictors, target='target', objec def cut_data(): data_path = 'data/' sample_cnt = 10000 # you can prepare sample data by yourself - print('Prepare train data, use {} rows, save it as train_sample.csv'.format(sample_cnt)) - train_df = pd.read_csv(data_path + "train.csv", nrows=sample_cnt, - dtype=dtypes, usecols=[c[0] for c in train_schema]) - assert len(train_df) == sample_cnt + print(f'Prepare train data, use {sample_cnt} rows, save it as train_sample.csv') + train_df_tmp = pd.read_csv(data_path + 'train.csv', nrows=sample_cnt, + dtype=dtypes, usecols=[c[0] for c in train_schema]) + assert len(train_df_tmp) == sample_cnt # take a portion from train sample data - train_df.to_csv("train_sample.csv", index=False) - del train_df + train_df_tmp.to_csv('train_sample.csv', index=False) + del train_df_tmp gc.collect() def nothrow_execute(sql): # only used for drop deployment, cuz 'if not exist' is not supported now try: - print("execute " + sql) - ok, rs = connection.execute(sql) + print('execute ' + sql) + _, rs = connection.execute(sql) print(rs) + 
# pylint: disable=broad-except except Exception as e: print(e) -print("Prepare openmldb, db {} table {}".format(db_name, table_name)) +print(f'Prepare openmldb, db {db_name} table {table_name}') # cut_data() engine = db.create_engine( - 'openmldb:///{}?zk={}&zkPath={}'.format(db_name, zk, zk_path)) + f'openmldb:///{db_name}?zk={zk}&zkPath={zk_path}') connection = engine.connect() -connection.execute("CREATE DATABASE IF NOT EXISTS {};".format(db_name)) +connection.execute(f'CREATE DATABASE IF NOT EXISTS {db_name};') schema_string = ','.join(list(map(column_string, train_schema))) -connection.execute("CREATE TABLE IF NOT EXISTS {}({});".format( - table_name, schema_string)) +connection.execute(f'CREATE TABLE IF NOT EXISTS {table_name}({schema_string});') -print("Load train_sample data to offline storage for training(hard copy)") -connection.execute("USE {}".format(db_name)) +print('Load train_sample data to offline storage for training(hard copy)') +connection.execute(f'USE {db_name}') connection.execute("SET @@execute_mode='offline';") # use sync offline job, to make sure `LOAD DATA` finished -connection.execute("SET @@sync_job=true;") -connection.execute("SET @@job_timeout=1200000;") +connection.execute('SET @@sync_job=true;') +connection.execute('SET @@job_timeout=1200000;') # use soft link after https://github.com/4paradigm/OpenMLDB/issues/1565 fixed -connection.execute("LOAD DATA INFILE 'file://{}' INTO TABLE {} OPTIONS(format='csv',header=true);".format( - os.path.abspath("train_sample.csv"), table_name)) +connection.execute(f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " + f"INTO TABLE {table_name} OPTIONS(format='csv',header=true);") print('Feature extraction') # the first column `is_attributed` is the label -sql_part = """ +sql_part = f""" select is_attributed, ip, app, device, os, channel, hour(click_time) as hour, day(click_time) as day, count(channel) over w1 as qty, count(channel) over w2 as ip_app_count, count(channel) over w3 as ip_app_os_count -from {} +from {table_name} window w1 as (partition by ip order by click_time ROWS_RANGE BETWEEN 1h PRECEDING AND CURRENT ROW), w2 as(partition by ip, app order by click_time ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), w3 as(partition by ip, app, os order by click_time ROWS_RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) -""".format(table_name) +""" # extraction will take time -connection.execute("SET @@job_timeout=1200000;") -connection.execute("{} INTO OUTFILE '{}' OPTIONS(mode='overwrite');".format( - sql_part, train_feature_dir)) +connection.execute('SET @@job_timeout=1200000;') +connection.execute(f"{sql_part} INTO OUTFILE '{train_feature_dir}' OPTIONS(mode='overwrite');") -print("Load features from feature dir {}".format(train_feature_dir)) +print(f'Load features from feature dir {train_feature_dir}') # train_feature_dir has multi csv files -train_df = pd.concat(map(lambda file: pd.read_csv(file), glob.glob( - os.path.join('', train_feature_dir + "/*.csv")))) -print("peek:") +train_df = pd.concat(map(pd.read_csv, glob.glob(os.path.join('', train_feature_dir + '/*.csv')))) +print('peek:') print(train_df.head()) len_train = len(train_df) train_row_cnt = int(len_train * 3 / 4) train_df = train_df[(len_train - train_row_cnt):len_train] val_df = train_df[:(len_train - train_row_cnt)] -print("train size: ", len(train_df)) -print("valid size: ", len(val_df)) +print('train size: ', len(train_df)) +print('valid size: ', len(val_df)) target = 'is_attributed' predictors = ['app', 'device', 'os', 
'channel', 'hour', @@ -168,7 +169,7 @@ def nothrow_execute(sql): gc.collect() -print("Training by xgb") +print('Training by xgb') params_xgb = { 'num_leaves': 7, # we should let it be smaller than 2^(max_depth) 'max_depth': 3, # -1 means no limit @@ -188,8 +189,6 @@ def nothrow_execute(sql): bst = xgb_modelfit_nocv(params_xgb, xgtrain, watchlist, - predictors, - target, objective='binary:logistic', metrics='auc', num_boost_round=300, @@ -199,25 +198,25 @@ def nothrow_execute(sql): del val_df gc.collect() -print("Save model.json to ", model_path) +print('Save model.json to ', model_path) bst.save_model(model_path) -print("Prepare online serving") +print('Prepare online serving') -print("Deploy sql") +print('Deploy sql') connection.execute("SET @@execute_mode='online';") -connection.execute("USE {}".format(db_name)) -nothrow_execute("DROP DEPLOYMENT {}".format(deploy_name)) -deploy_sql = """DEPLOY {} {}""".format(deploy_name, sql_part) +connection.execute(f'USE {db_name}') +nothrow_execute(f'DROP DEPLOYMENT {deploy_name}') +deploy_sql = f"""DEPLOY {deploy_name} {sql_part}""" print(deploy_sql) connection.execute(deploy_sql) -print("Import data to online") +print('Import data to online') # online feature extraction needs history data # set job_timeout bigger if the `LOAD DATA` job timeout connection.execute( - "LOAD DATA INFILE 'file://{}' INTO TABLE {}.{} OPTIONS(mode='append',format='csv',header=true);".format( - os.path.abspath("train_sample.csv"), db_name, table_name)) + f"LOAD DATA INFILE 'file://{os.path.abspath('train_sample.csv')}' " + f"INTO TABLE {db_name}.{table_name} OPTIONS(mode='append',format='csv',header=true);") -print("Update model to predict server") -infos = {"database": db_name, "deployment": deploy_name, "model_path": model_path} -requests.post("http://" + predict_server + "/update", json=infos) +print('Update model to predict server') +infos = {'database': db_name, 'deployment': deploy_name, 'model_path': model_path} +requests.post('http://' + predict_server + '/update', json=infos)
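
For reference, a minimal sketch of how the deployed talkingdata demo might be exercised once train_and_serve.py has posted the database, deployment and model path to the predict server's /update endpoint. The port (8881) and the /predict route come from predict.py and predict_server.py above; the request keys mirror the columns used by the deployed feature query, and the sample values are purely illustrative:

    import requests

    # illustrative request row; keys follow the talkingdata table columns
    # (ip, app, device, os, channel, click_time, is_attributed)
    req = {"ip": 1, "app": 2, "device": 3, "os": 4, "channel": 5,
           "click_time": 1467302350000, "is_attributed": 0}
    # predict_server.py fills any missing column with a default before
    # forwarding the row to the OpenMLDB deployment and scoring it with xgboost
    resp = requests.post("http://127.0.0.1:8881/predict", json=req)
    print(resp.text)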