diff --git a/algorithm/kapacity/metric/query.py b/algorithm/kapacity/metric/query.py index 0758ad2..43d2ddf 100644 --- a/algorithm/kapacity/metric/query.py +++ b/algorithm/kapacity/metric/query.py @@ -42,8 +42,12 @@ def fetch_metrics(addr, namespace, metric, scale_target, start, end): start=start, end=end) elif metric_type == 'Pods': - # TODO: support pods metric type - raise RuntimeError('UnsupportedMetricType') + return fetch_pod_metric_history(addr=addr, + namespace=namespace, + metric=metric, + scale_target=scale_target, + start=start, + end=end) elif metric_type == 'Object': return fetch_object_metric_history(addr=addr, namespace=namespace, @@ -71,19 +75,6 @@ def compute_history_range(history_len): return start, end -def fetch_replicas_metric_history(addr, namespace, metric, scale_target, start, end): - external = metric['external'] - metric_identifier = build_metric_identifier(external['metric']) - name, group_kind = get_obj_name_and_group_kind(scale_target) - workload_external = metric_pb.WorkloadExternalQuery(group_kind=group_kind, - namespace=namespace, - name=name, - metric=metric_identifier) - query = metric_pb.Query(type=metric_pb.WORKLOAD_EXTERNAL, - workload_external=workload_external) - return query_metrics(addr=addr, query=query, start=start, end=end) - - def fetch_resource_metric_history(addr, namespace, metric, scale_target, start, end): resource_name = metric['resource']['name'] name, group_kind = get_obj_name_and_group_kind(scale_target) @@ -113,6 +104,19 @@ def fetch_container_resource_metric_history(addr, namespace, metric, scale_targe return query_metrics(addr=addr, query=query, start=start, end=end) +def fetch_pod_metric_history(addr, namespace, metric, scale_target, start, end): + pods = metric['pods'] + metric_identifier = build_metric_identifier(pods['metric']) + name, group_kind = get_obj_name_and_group_kind(scale_target) + workload_external = metric_pb.WorkloadExternalQuery(group_kind=group_kind, + namespace=namespace, + name=name, + metric=metric_identifier) + query = metric_pb.Query(type=metric_pb.WORKLOAD_EXTERNAL, + workload_external=workload_external) + return query_metrics(addr=addr, query=query, start=start, end=end) + + def fetch_object_metric_history(addr, namespace, metric, start, end): obj = metric['object'] metric_identifier = build_metric_identifier(obj['metric']) @@ -167,19 +171,12 @@ def query_metrics(addr, query, start, end): def convert_metric_series_to_dataframe(series): - dataframe = None - for item in series: - array = [] - for point in item.points: - array.append([point.timestamp, point.value]) - df = pd.DataFrame(array, columns=['timestamp', 'value'], dtype=float) - df['timestamp'] = df['timestamp'].map(lambda x: x / 1000).astype('int64') - if dataframe is not None: - # TODO: consider if it's possible to have multiple series - pd.merge(dataframe, df, how='left', on='timestamp') - else: - dataframe = df - return dataframe + df_list = [] + for point in series[0].points: + df_list.append([point.timestamp, point.value]) + df = pd.DataFrame(df_list, columns=['timestamp', 'value'], dtype=float) + df['timestamp'] = df['timestamp'].map(lambda x: x / 1000).astype('int64') + return df def time_period_to_minutes(time_period): diff --git a/algorithm/kapacity/portrait/horizontal/predictive/main.py b/algorithm/kapacity/portrait/horizontal/predictive/main.py index d63b79e..a3c68f3 100644 --- a/algorithm/kapacity/portrait/horizontal/predictive/main.py +++ b/algorithm/kapacity/portrait/horizontal/predictive/main.py @@ -33,7 +33,6 @@ class EnvInfo: class MetricsContext: workload_identifier = None - resource_name = None resource_target = 0 resource_history = None replicas_history = None @@ -130,7 +129,7 @@ def predict_replicas(args, metric_ctx, pred_traffics): pred = estimator.estimate(history, pred_traffics, 'timestamp', - metric_ctx.resource_name, + 'resource', 'replicas', traffic_col, metric_ctx.resource_target, @@ -155,12 +154,10 @@ def merge_history_dict(history_dict): def resample_by_freq(old_df, freq, agg_funcs): - df = old_df.copy() - df = df.sort_values(by='timestamp', ascending=True) + df = old_df.sort_values(by='timestamp', ascending=True) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s') - df = df.resample(rule=freq, on='timestamp').agg(agg_funcs) - df = df.rename_axis('timestamp').reset_index() - df['timestamp'] = df['timestamp'].astype('int64') // 10 ** 9 + df = df.resample(rule=freq, on='timestamp').agg(agg_funcs).reset_index() + df['timestamp'] = df['timestamp'].astype('int64') // 1e9 return df @@ -185,18 +182,16 @@ def fetch_metrics_history(args, env, hp_cr): resource = metric['containerResource'] else: raise RuntimeError('MetricTypeError') - resource_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end) - metric_ctx.resource_name = resource['name'] metric_ctx.resource_target = compute_resource_target(env.namespace, resource, scale_target) - metric_ctx.resource_history = resource_history.rename(columns={'value': resource['name']}) + resource_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end) + metric_ctx.resource_history = resource_history.rename(columns={'value': 'resource'}) elif i == 1: - if metric_type != 'External': + if metric_type != 'Pods': raise RuntimeError('MetricTypeError') - replica_history = query.fetch_replicas_metric_history(env.metrics_server_addr, env.namespace, metric, - scale_target, start, end) + replica_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end) metric_ctx.replicas_history = replica_history.rename(columns={'value': 'replicas'}) else: - if metric_type != 'Object' and metric_type != 'External': + if metric_type != 'Pods' and metric_type != 'Object' and metric_type != 'External': raise RuntimeError('MetricTypeError') metric_name = metric['name'] traffic_history = query.fetch_metrics(env.metrics_server_addr, env.namespace, metric, scale_target, start, end) diff --git a/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py b/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py index 6170913..f4b0902 100644 --- a/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py +++ b/algorithm/kapacity/portrait/horizontal/predictive/replicas_estimator.py @@ -631,15 +631,15 @@ class EstimationException(Exception): pass -def estimate(data, - data_pred, - time_col, - resource_col, - replicas_col, - traffic_cols, - resource_target, - time_delta_hours, - test_dataset_size_in_seconds=86400): +def estimate(data: pd.DataFrame, + data_pred: pd.DataFrame, + time_col: str, + resource_col: str, + replicas_col: str, + traffic_cols: list[str], + resource_target: float, + time_delta_hours: int, + test_dataset_size_in_seconds: int = 86400) -> pd.DataFrame: logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s') logger = logging.getLogger() diff --git a/algorithm/kapacity/timeseries/forecasting/forecaster.py b/algorithm/kapacity/timeseries/forecasting/forecaster.py index 0c7b6d2..d986ed9 100644 --- a/algorithm/kapacity/timeseries/forecasting/forecaster.py +++ b/algorithm/kapacity/timeseries/forecasting/forecaster.py @@ -630,7 +630,7 @@ def fit(freq: str, context_length: int, learning_rate: float = 1e-3, epochs: int = 100, - batch_size: int = 1024, + batch_size: int = 32, num_workers: int = 0, model_path: str = './', df: pd.DataFrame = None,