diff --git a/aios/autil/autil/ThreadPool.cpp b/aios/autil/autil/ThreadPool.cpp index e0830bbeed..0a2d5baf7e 100644 --- a/aios/autil/autil/ThreadPool.cpp +++ b/aios/autil/autil/ThreadPool.cpp @@ -197,7 +197,7 @@ ThreadPool::ERROR_TYPE ThreadPool::pushWorkItem(WorkItem *item, bool isBlocked) ScopedLock lock(_cond); if (!isBlocked && _queue->size() >= _queueSize) { - AUTIL_LOG(INFO, + AUTIL_LOG(DEBUG, "thread pool [%s] is full, queueSize [%lu], active thread [%d]", _threadName.c_str(), _queueSize, diff --git a/aios/ha3/ha3/search/query_executor/IndexPartitionReaderUtil.cpp b/aios/ha3/ha3/search/query_executor/IndexPartitionReaderUtil.cpp index e2ef31c04a..14c06a8f6b 100644 --- a/aios/ha3/ha3/search/query_executor/IndexPartitionReaderUtil.cpp +++ b/aios/ha3/ha3/search/query_executor/IndexPartitionReaderUtil.cpp @@ -53,7 +53,7 @@ IndexPartitionReaderWrapperPtr IndexPartitionReaderUtil::createIndexPartitionRea bool usePartial) { auto tabletReader = partitionReaderSnapshot->GetTabletReader(mainTableName); if (tabletReader) { - AUTIL_LOG(DEBUG, + AUTIL_LOG(INFO, "create index partition reader wrapper with tabletReader," " mainTableName[%s], usePartial[%d]", mainTableName.c_str(), @@ -64,16 +64,20 @@ IndexPartitionReaderWrapperPtr IndexPartitionReaderUtil::createIndexPartitionRea return make_shared(tabletReader); } } else { + AUTIL_LOG(INFO, "createIndexPartitionReaderWrapper for table 1 %s", mainTableName.c_str()); const auto &tableMainSubIdxMap = partitionReaderSnapshot->getTableMainSubIdxMap(); auto iter = tableMainSubIdxMap.find(mainTableName); if (iter == tableMainSubIdxMap.end()) { IndexPartitionReaderPtr mainIndexPartReader = partitionReaderSnapshot->GetIndexPartitionReader(mainTableName); if (mainIndexPartReader == NULL) { + AUTIL_LOG(INFO, "createIndexPartitionReaderWrapper for table 2 %s", mainTableName.c_str()); return IndexPartitionReaderWrapperPtr(); } + AUTIL_LOG(INFO, "createIndexPartitionReaderWrapper for table 3 %s", mainTableName.c_str()); return createIndexPartitionReaderWrapper(mainIndexPartReader, usePartial); } else { + AUTIL_LOG(INFO, "createIndexPartitionReaderWrapper for table 4 %s", mainTableName.c_str()); return createIndexPartitionReaderWrapper(&(iter->second.indexName2IdMap), &(iter->second.attrName2IdMap), &(iter->second.indexReaderVec), diff --git a/aios/navi/engine/ResourceInitContext.cpp b/aios/navi/engine/ResourceInitContext.cpp index 611ed7bd53..bb4074f678 100644 --- a/aios/navi/engine/ResourceInitContext.cpp +++ b/aios/navi/engine/ResourceInitContext.cpp @@ -73,6 +73,7 @@ ResourceInitContext::ResourceInitContext( , _namedDataMap(namedDataMap) , _requireKernelNode(requireKernelNode) { + NAVI_KERNEL_LOG(INFO, "ResourceInitContext _partId %d", partId); } ResourceInitContext::~ResourceInitContext() { diff --git a/aios/sql/iquan/cpp/common/catalog/LocationDef.h b/aios/sql/iquan/cpp/common/catalog/LocationDef.h index 9497fecfb6..5ac916352b 100644 --- a/aios/sql/iquan/cpp/common/catalog/LocationDef.h +++ b/aios/sql/iquan/cpp/common/catalog/LocationDef.h @@ -31,6 +31,7 @@ struct LocationSign { uint32_t partitionCnt = 0; std::string nodeName; std::string nodeType; + std::string tableName; }; class LocationDef : public autil::legacy::Jsonizable { @@ -38,6 +39,7 @@ class LocationDef : public autil::legacy::Jsonizable { void Jsonize(autil::legacy::Jsonizable::JsonWrapper &json) override { json.Jsonize("partition_cnt", sign.partitionCnt, sign.partitionCnt); json.Jsonize("node_name", sign.nodeName, sign.nodeName); + json.Jsonize("location_table_name",sign.tableName,sign.tableName); json.Jsonize("node_type", sign.nodeType, sign.nodeType); json.Jsonize("tables", tableIdentities, tableIdentities); json.Jsonize("equivalent_hash_fields", equilvalentHashFields, equilvalentHashFields); diff --git a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/client/common/json/catalog/IquanLocation.java b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/client/common/json/catalog/IquanLocation.java index 36eed02e60..7aab03da65 100644 --- a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/client/common/json/catalog/IquanLocation.java +++ b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/client/common/json/catalog/IquanLocation.java @@ -16,6 +16,8 @@ @Getter @NoArgsConstructor public class IquanLocation { + @JsonProperty(value = "location_table_name", required = false) + private String locationTableName; @JsonProperty(value = "node_name", required = true) private String nodeName; @JsonProperty(value = "partition_cnt", required = true) diff --git a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/api/schema/Location.java b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/api/schema/Location.java index 5a44865fca..4766998e6e 100644 --- a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/api/schema/Location.java +++ b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/api/schema/Location.java @@ -7,6 +7,7 @@ public class Location { private final String nodeName; private final int partitionCnt; + private String locationTableName; public static final Location DEFAULT_QRS = new Location( "qrs", 1); public static final Location UNKNOWN = new Location( "unknown", Integer.MAX_VALUE); @@ -19,6 +20,7 @@ public Location(String nodeName, int partitionCnt) { public Location(IquanLocation iquanLocation) { this.nodeName = iquanLocation.getNodeName(); this.partitionCnt = iquanLocation.getPartitionCnt(); + this.locationTableName=iquanLocation.getLocationTableName(); } public boolean isSingle() { diff --git a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanExchangeOp.java b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanExchangeOp.java index 7358fea60a..51f6e26b5c 100644 --- a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanExchangeOp.java +++ b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanExchangeOp.java @@ -66,6 +66,7 @@ public void explainInternal(final Map map, SqlExplainLevel level ((IquanOptContext) inputRelNode.getCluster().getPlanner().getContext()).getExecutor().getDefaultDbName()); } IquanRelOptUtils.addMapIfNotEmpty(map, ConstantDefine.NODE_NAME, ((IquanRelNode) inputRelNode).getLocation().getNodeName()); + IquanRelOptUtils.addMapIfNotEmpty(map, "location_table_name", ((IquanRelNode) inputRelNode).getLocation().getLocationTableName()); IquanRelOptUtils.addMapIfNotEmpty(map, ConstantDefine.TABLE_DISTRIBUTION, RelDistributionUtil.formatDistribution((IquanRelNode) inputRelNode, true)); map.put(ConstantDefine.OUTPUT_PRUNABLE, outputPrunable); diff --git a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanRelNode.java b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanRelNode.java index 7b489c58bd..b80cc9b2ca 100644 --- a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanRelNode.java +++ b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanRelNode.java @@ -120,6 +120,7 @@ static void explainIquanRelNode(RelNode relNode, final Map map, if (location != null) { Map locationMeta = new TreeMap<>(); locationMeta.put(ConstantDefine.NODE_NAME, location.getNodeName()); + locationMeta.put("location_table_name", location.getLocationTableName()); locationMeta.put(ConstantDefine.PARTITION_CNT, location.getPartitionCnt()); map.put(ConstantDefine.LOCATION, locationMeta); } diff --git a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanTableScanBase.java b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanTableScanBase.java index a8903ff15a..4f850de8c2 100644 --- a/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanTableScanBase.java +++ b/aios/sql/iquan/java/iquan_core/src/main/java/com/taobao/search/iquan/core/rel/ops/physical/IquanTableScanBase.java @@ -412,6 +412,7 @@ public void explainInternal(final Map map, SqlExplainLevel level if (isRemoteScan && !Objects.isNull(location)) { Map locationMeta = new TreeMap<>(); locationMeta.put(ConstantDefine.NODE_NAME, location.getNodeName()); + locationMeta.put("location_table_name", location.getLocationTableName()); locationMeta.put(ConstantDefine.PARTITION_CNT, location.getPartitionCnt()); IquanRelOptUtils.addMapIfNotEmpty(map, ConstantDefine.LOCATION, locationMeta); } diff --git a/aios/sql/misc/sql_alog.conf b/aios/sql/misc/sql_alog.conf index f25d425b51..df9e9892a1 100644 --- a/aios/sql/misc/sql_alog.conf +++ b/aios/sql/misc/sql_alog.conf @@ -45,7 +45,7 @@ alog.appender.KmonAppender.log_keep_count=10 alog.appender.KmonAppender.layout=PatternLayout alog.appender.KmonAppender.layout.LogPattern=[%%d] [%%l] [%%t,%%F -- %%f():%%n] [%%m] -alog.logger.multi_call=INFO, GigAppender +alog.logger.multi_call=DEBUG, GigAppender inherit.multi_call=false alog.appender.GigAppender=FileAppender alog.appender.GigAppender.fileName=logs/gig/gig.log diff --git a/aios/sql/ops/delayDp/kernel/DelayDpKernel.cpp b/aios/sql/ops/delayDp/kernel/DelayDpKernel.cpp index e6f15e0d43..4d99cbf91b 100644 --- a/aios/sql/ops/delayDp/kernel/DelayDpKernel.cpp +++ b/aios/sql/ops/delayDp/kernel/DelayDpKernel.cpp @@ -288,6 +288,10 @@ bool DelayDpKernel::partHasData(int partId) { if (info.partRows.empty()) { continue; } + SQL_LOG(WARN, "partHasData size %ld partId %d", info.partRows.size(), partId); + if (partId >= info.partRows.size()){ + continue; + } const auto &rows = info.partRows[partId]; if (rows.empty()) { return false; diff --git a/aios/sql/ops/planTransform/GraphTransform.cpp b/aios/sql/ops/planTransform/GraphTransform.cpp index 92efe2edc7..49b5518f44 100644 --- a/aios/sql/ops/planTransform/GraphTransform.cpp +++ b/aios/sql/ops/planTransform/GraphTransform.cpp @@ -336,6 +336,7 @@ const string GraphTransform::EMPTY_STRING = ""; GraphTransformEnv::GraphTransformEnv() { disableWatermark = autil::EnvUtil::getEnv("disableWatermark", disableWatermark); useQrsTimestamp = autil::EnvUtil::getEnv("useQrsTimestamp", useQrsTimestamp); + searchNewBiz = autil::EnvUtil::getEnv("searchNewBiz", true); } GraphTransformEnv::~GraphTransformEnv() {} @@ -562,6 +563,10 @@ std::string GraphTransform::getRemoteBizName(iquan::PlanOp &op) { return getQrsBizName(); } } + auto tableName = getJsonStringValue(op, "table_name"); + if (GraphTransformEnv::get().searchNewBiz && tableName.empty() == false) { + return name + "." + tableName + ".write"; + } return name + "." + isearch::DEFAULT_SQL_BIZ_NAME; } else { return _config.searcherBizName; @@ -804,7 +809,9 @@ plan::PlanNode *GraphTransform::linkExchangeNode(plan::ExchangeNode &node, plan: subRoot->setInlineMode(true); } auto &op = *(node.op); - subRoot->setBizName(getRemoteBizName(op)); + auto remoteBizName=getRemoteBizName(op); + SQL_LOG(DEBUG, "getRemoteBizName is %s",remoteBizName.c_str()); + subRoot->setBizName(remoteBizName); subRoot->setCurDist(getJsonSegment(op, "table_distribution")); auto remoteDist = getJsonSegment(op, "output_distribution"); if (remoteDist.empty()) { diff --git a/aios/sql/ops/planTransform/GraphTransform.h b/aios/sql/ops/planTransform/GraphTransform.h index 79ec02ae9f..266b52ea9d 100644 --- a/aios/sql/ops/planTransform/GraphTransform.h +++ b/aios/sql/ops/planTransform/GraphTransform.h @@ -211,6 +211,7 @@ struct GraphTransformEnv { public: bool disableWatermark = false; bool useQrsTimestamp = true; + bool searchNewBiz = true; }; class GraphTransform : public plan::NodeVisitor { diff --git a/aios/sql/ops/scan/AttributeExpressionCreatorR.cpp b/aios/sql/ops/scan/AttributeExpressionCreatorR.cpp index 08fb76c315..64daf25da9 100644 --- a/aios/sql/ops/scan/AttributeExpressionCreatorR.cpp +++ b/aios/sql/ops/scan/AttributeExpressionCreatorR.cpp @@ -76,6 +76,10 @@ bool AttributeExpressionCreatorR::initExpressionCreator() { _indexPartitionReaderWrapper = isearch::search::IndexPartitionReaderUtil::createIndexPartitionReaderWrapper( _scanR->partitionReaderSnapshot.get(), _scanInitParamR->tableName); + if(!_indexPartitionReaderWrapper){ + SQL_LOG(ERROR, "initExpressionCreator _indexPartitionReaderWrapper is null table is %s",_scanInitParamR->tableName.c_str() ); + return false; + } _indexPartitionReaderWrapper->setSessionPool(_queryMemPoolR->getPool().get()); if (!createExpressionCreator()) { return false; diff --git a/aios/sql/python/sql_config_loader.py b/aios/sql/python/sql_config_loader.py index 56f926e648..7620873367 100644 --- a/aios/sql/python/sql_config_loader.py +++ b/aios/sql/python/sql_config_loader.py @@ -89,10 +89,16 @@ def fill_local_config_path(param_map): def get_log_config(): return { - "file_appenders": [ + "file_appenders" : [ { "file_name": "logs/navi.log", - "log_level": "info", + "log_level": "TRACE3", + "bt_filters": [ + ] + }, + { + "file_name": "logs/navi_schduler.log", + "log_level": "SCHEDULE3", "bt_filters": [ ] } diff --git a/aios/sql/python/sql_config_loader_new.py b/aios/sql/python/sql_config_loader_new.py new file mode 100644 index 0000000000..3fe6d7a7de --- /dev/null +++ b/aios/sql/python/sql_config_loader_new.py @@ -0,0 +1,341 @@ +import json +import os +import logging +import entry_loader +import sys +import sql_envs +import sql_utils +import copy + +sys.path.append(os.path.dirname(os.path.realpath(__file__))) + +QRS_BIZ_NAME_PREFIX_KEY = "QrsBizNamePrefix" + + +def load(config_path, load_param): + param_map = json.loads(load_param) + fill_local_config_path(param_map) + # logging.info("param_map: " + json.dumps(param_map, indent = 4)) + biz_metas = param_map["biz_metas"] + service_info = param_map["service_info"] + zone_name = service_info["zone_name"] + custom_app_info = param_map["custom_app_info"] + module_list = [] + on_qrs = ((sql_envs.get_role_type() == "qrs") or sql_envs.get_enable_local_access()) + for biz_name, conf in biz_metas.items(): + config_path = conf["config_path"] + biz_type = get_biz_type(conf) + if "model_biz" == biz_type: + continue + load_zone_name = zone_name + if on_qrs: + load_zone_name = "" + module_list += get_module_array(config_path, load_zone_name) + biz_config, biz_flow_config_map, gig_biz_sub_configs = get_biz_config(biz_metas, param_map) + config = { + "version": 0, + "log_config": get_log_config(), + "engine_config": get_engine_config(), + "modules": module_list, + "biz_config": biz_config, + "resource_config": [ + ], + "resource_list": [ + "suez_navi.health_check_rpc_r", + "sql.sql_rpc.r", + ], + "sleep_before_update_us": 0 * 1000 * 1000, + } + gig_config = { + "name": "navi.buildin.gig_client", + "config": get_gig_config(service_info, gig_biz_sub_configs, biz_flow_config_map), + } + config["resource_config"].append(gig_config) + default_biz = "" + for biz_name, conf in biz_config.items(): + if ".default_sql" in biz_name: + default_biz = biz_name + health_check_part_info_map = {} + for biz_name, conf in biz_config.items(): + health_check_part_info_map[biz_name] = { + "part_count": conf["part_count"], + "part_ids": conf["part_ids"] + } + health_check_config = { + "name": "suez_navi.health_check_rpc_r", + "config": { + "kernel_name": "suez_navi.health_check_k", + "cm2_http_alias": [ + "/SearchService/cm2_status", + "/SearchService/vip_status", + "/status_check", + "/QrsService/cm2_status", + ], + "biz_part_info_map": health_check_part_info_map + } + } + config["default_biz"] = default_biz + config["resource_config"].append(health_check_config) + return config + + +def fill_local_config_path(param_map): + biz_metas = param_map["biz_metas"] + biz_local_config_paths = param_map["biz_local_config_paths"] + for biz_name, conf in biz_metas.items(): + remote_config_path = conf["config_path"] + conf["config_path"] = biz_local_config_paths.get(biz_name, remote_config_path) + + +def get_log_config(): + return { + "file_appenders" : [ + { + "file_name": "logs/navi.log", + "log_level": "TRACE3", + "bt_filters": [ + ] + }, + { + "file_name": "logs/navi_schduler.log", + "log_level": "SCHEDULE3", + "bt_filters": [ + ] + } + ] + } + + +def get_engine_config(): + navi_thread_num = int(os.environ.get('naviThreadNum', 0)) + navi_processing_size = int(os.environ.get('naviProcessingSize', 300)) + navi_queue_size = int(os.environ.get('naviQueueSize', 1000)) + + navi_extra_task_queue = {} + extra_task_queue = os.environ.get('naviExtraTaskQueue', '') + for task_queue in extra_task_queue.split(';'): + if task_queue == '': + continue + queue_list = task_queue.split('|') + if len(queue_list) != 4: + raise Exception("task_queue size must be 4, actual [{}]".format(queue_list)) + queue_config = { + "thread_num": int(queue_list[1]), + "queue_size": int(queue_list[2]), + "processing_size": int(queue_list[3]), + } + navi_extra_task_queue[queue_list[0]] = queue_config + return { + "builtin_task_queue": { + "thread_num": navi_thread_num, + "queue_size": navi_queue_size, + "processing_size": navi_processing_size, + }, + "extra_task_queue": navi_extra_task_queue, + } + + +def get_module_array(config_path, load_zone_name): + agg = sql_utils.get_agg_plugins(config_path) + tvf = sql_utils.get_tvf_plugins(config_path) + func = sql_utils.get_function_plugins(config_path, load_zone_name) + return list(set(agg + tvf + func)) + + +def get_biz_type(conf): + if "custom_biz_info" in conf: + if "biz_type" in conf["custom_biz_info"]: + return conf["custom_biz_info"]["biz_type"] + return "" + + +def get_biz_config(biz_metas, param_map): + table_part_info = param_map["table_part_info"] + service_info = param_map["service_info"] + part_count = service_info["part_count"] + part_id = service_info["part_id"] + zone_name = service_info["zone_name"] + is_qrs = (sql_envs.get_role_type() == "qrs") + biz_config_map = {} + flow_config_map = {} + biz_gig_config_list = [] + for biz_name, conf in biz_metas.items(): + biz_param_map = copy.deepcopy(param_map) + biz_name = get_real_biz_name(zone_name, is_qrs) + biz_param_map["biz_name"] = biz_name + config_path = conf["config_path"] + max_part_count, max_part_ids = get_table_max_part_count(is_qrs, config_path, zone_name, table_part_info) + navi_biz_conf = {} + biz_gig_config = {} + biz_flow_config_map = {} + default_script = os.path.join(biz_param_map["install_root"], "sql_default.py") + navi_biz_conf, biz_gig_config, biz_flow_config_map = entry_loader.load( + config_path, biz_param_map, default_script) + navi_biz_conf["config_path"] = config_path + navi_biz_conf["part_count"] = max_part_count + navi_biz_conf["part_ids"] = max_part_ids + biz_config_map[biz_name] = navi_biz_conf + if biz_flow_config_map.keys() & flow_config_map.keys(): + raise Exception("flow control config key conflict, biz map: [%s], all [%s]" % + (json.dumps(biz_flow_config_map, indent=4), json.dumps(flow_config_map, indent=4))) + flow_config_map.update(biz_flow_config_map) + biz_gig_config_list.append(biz_gig_config) + if not is_qrs: + for table, table_info in table_part_info.items(): + biz_param_map = copy.deepcopy(param_map) + biz_name = get_table_real_biz_name(zone_name, is_qrs, table) + biz_param_map["biz_name"] = biz_name + max_part_count=table_part_info[table]["part_count"] + max_part_ids=table_part_info[table]["part_ids"] + navi_biz_conf = {} + biz_gig_config = {} + biz_flow_config_map = {} + default_script = os.path.join(biz_param_map["install_root"], "sql_default.py") + navi_biz_conf, biz_gig_config, biz_flow_config_map = entry_loader.load( + config_path, biz_param_map, default_script) + navi_biz_conf["config_path"] = config_path + navi_biz_conf["part_count"] = max_part_count + navi_biz_conf["part_ids"] = max_part_ids + biz_config_map[biz_name] = navi_biz_conf + if biz_flow_config_map.keys() & flow_config_map.keys(): + raise Exception("flow control config key conflict, biz map: [%s], all [%s]" % + (json.dumps(biz_flow_config_map, indent=4), json.dumps(flow_config_map, indent=4))) + flow_config_map.update(biz_flow_config_map) + biz_gig_config_list.append(biz_gig_config) + return biz_config_map, flow_config_map, biz_gig_config_list + + +def get_table_max_part_count(is_qrs, config_path, zone_name, table_part_info): + if is_qrs: + return 1, [0] + depend_tables = sql_utils.get_depend_tables(config_path, zone_name) + + max_part_count = 0 + if 0 == len(depend_tables): + max_table_name = None + for table_name, part_info in table_part_info.items(): + part_count = part_info["part_count"] + if part_count > max_part_count: + max_part_count = part_count + max_table_name = table_name + else: + max_table_name = None + for table_name in depend_tables: + part_info = table_part_info[table_name] + part_count = part_info["part_count"] + if part_count > max_part_count: + max_part_count = part_count + max_table_name = table_name + + if max_table_name is None: + return 1, [0] + max_part_ids = table_part_info[max_table_name]["part_ids"] + return max_part_count, max_part_ids + + +def get_real_biz_name(zone_name, is_qrs): + if is_qrs: + prefix = os.environ.get(QRS_BIZ_NAME_PREFIX_KEY) + if prefix: + return f"{prefix}.qrs.default_sql" + else: + return "qrs.default_sql" + else: + return f"{zone_name}.default_sql" + +def get_table_real_biz_name(zone_name, is_qrs, table_name): + if is_qrs: + prefix = os.environ.get(QRS_BIZ_NAME_PREFIX_KEY) + if prefix: + return f"{prefix}.qrs.default_sql" + else: + return "qrs.default_sql" + else: + return zone_name + "."+table_name+".write" + +def get_cm2_config_array(service_info): + cm2_config = [] + if "sub_cm2_configs" in service_info: + if len(service_info["sub_cm2_configs"]) > 0: + cm2_config = service_info["sub_cm2_configs"] + + if len(cm2_config) == 0: + if "cm2_config" in service_info: + if len(service_info["cm2_config"]) > 0: + cm2_config.append(service_info["cm2_config"]) + return cm2_config + + +def add_cm2_config(cm2_config_map, zk_host, zk_path, clusters): + serverKey = zk_host + zk_path + if serverKey not in cm2_config_map: + cm2_config_map[serverKey] = {} + cm2_config_map[serverKey]["zk_host"] = zk_host + cm2_config_map[serverKey]["zk_path"] = zk_path + if "cm2_part" not in cm2_config_map[serverKey]: + cm2_config_map[serverKey]["cm2_part"] = clusters + else: + cm2_config_map[serverKey]["cm2_part"] += clusters + + +def get_gig_config(service_info, gig_biz_sub_configs, biz_flow_config_map): + cm2_config_array = get_cm2_config_array(service_info) + for biz_gig_config in gig_biz_sub_configs: + if "cm2" in biz_gig_config: + cm2_config_array.append(biz_gig_config["cm2"]) + if "cm2_configs" in biz_gig_config: + cm2_config_array += biz_gig_config["cm2_configs"] + if "istio" in biz_gig_config: + cm2_config_array.append(biz_gig_config["istio"]) + if "istio_configs" in biz_gig_config: + cm2_config_array += biz_gig_config["istio_configs"] + if "local" in biz_gig_config: + cm2_config_array.append({ + "local": biz_gig_config["local"] + }) + cm2_config_map = {} + istios_subs = [] + local_subs = [] + cm2_subs = [] + for config in cm2_config_array: + if "istio_host" in config: + istios_subs.append(config) + if "local" in config: + local_subs += config["local"] + if "cm2_server_cluster_name" in config: + zk_host = config["cm2_server_zookeeper_host"] + zk_path = config["cm2_server_leader_path"] + clusters = config["cm2_server_cluster_name"].split(",") + add_cm2_config(cm2_config_map, zk_host, zk_path, clusters) + if "zk_host" in config: + zk_host = config["zk_host"] + zk_path = config["zk_path"] + clusters = config["cm2_part"] + add_cm2_config(cm2_config_map, zk_host, zk_path, clusters) + cm2_subs = list(cm2_config_map.values()) + sub_config = {} + if len(cm2_subs) > 0: + sub_config["cm2_configs"] = cm2_subs + if len(local_subs) > 0: + sub_config["local"] = local_subs + if len(istios_subs) > 0: + sub_config["istio_configs"] = istios_subs + # sub_config["allow_empty_sub"] = False + return { + "init_config": { + "subscribe_config": sub_config, + "misc_config": { + }, + "connection_config": { + "grpc_stream": { + "thread_num": 20, + "queue_size": 1000, + }, + "arpc": { + "thread_num": 20, + "queue_size": 1000, + } + } + }, + "flow_config": biz_flow_config_map + } diff --git a/aios/sql/resource/Ha3TableInfoR.cpp b/aios/sql/resource/Ha3TableInfoR.cpp index 460a461edd..9c0fdc35a6 100644 --- a/aios/sql/resource/Ha3TableInfoR.cpp +++ b/aios/sql/resource/Ha3TableInfoR.cpp @@ -103,8 +103,12 @@ bool Ha3TableInfoR::getTableSchemas( NAVI_KERNEL_LOG(ERROR, "empty index app map"); return false; } - auto indexAppPtr = id2IndexAppMap.begin()->second; - indexAppPtr->GetTableSchemas(tableSchemas); + //auto indexAppPtr = id2IndexAppMap.begin()->second; + //indexAppPtr->GetTableSchemas(tableSchemas); + std::set tableNames; + for (const auto &pair : id2IndexAppMap) { + pair.second->GetTableSchemas(tableSchemas, tableNames); + } return true; } @@ -113,7 +117,7 @@ bool Ha3TableInfoR::generateMeta( const auto &joinRelations = _tableInfoR->getJoinRelations(); const std::string &dbName = _zoneName; iquan::CatalogDefs catalogDefs; - LocationSign locationSign = {_partCount, _zoneName, "searcher"}; + LocationSign locationSign = {_partCount, _zoneName, "searcher", ""}; catalogDefs.catalog(SQL_DEFAULT_CATALOG_NAME).location(locationSign); // merge IndexPartitionSchema from tablet @@ -125,6 +129,11 @@ bool Ha3TableInfoR::generateMeta( addInnerDocId(tableDef); } const auto &tableName = tableSchema->GetTableName(); + NAVI_KERNEL_LOG(INFO, + "add locationSign.tableName, zoneName [%s] tableName [%s]", + _zoneName.c_str(), + tableName.c_str()); + locationSign.tableName=tableName; if (!_ha3ClusterDefR->fillTableDef(_zoneName, tableName, tableDef, _tableSortDescMap)) { NAVI_KERNEL_LOG(ERROR, "fillTableDef from cluster def failed, zoneName [%s] tableName [%s]", diff --git a/aios/storage/indexlib/indexlib/partition/index_application.cpp b/aios/storage/indexlib/indexlib/partition/index_application.cpp index f79b3680e2..99caa890a3 100644 --- a/aios/storage/indexlib/indexlib/partition/index_application.cpp +++ b/aios/storage/indexlib/indexlib/partition/index_application.cpp @@ -555,15 +555,23 @@ void IndexApplication::GetIndexPartitionReaderInfos(vector>& tableSchemas) +void IndexApplication::GetTableSchemas(std::vector>& tableSchemas, std::set &tableNames) { vector snapshotIndexPartitionReaders; mReaderContainer.CreateSnapshot(snapshotIndexPartitionReaders); for (size_t i = 0; i < snapshotIndexPartitionReaders.size(); i++) { - tableSchemas.push_back(snapshotIndexPartitionReaders[i]->GetTabletSchema()); + auto schema = snapshotIndexPartitionReaders[i]->GetTabletSchema(); + if (tableNames.find(schema->GetTableName()) == tableNames.end()) { + tableSchemas.push_back(schema); + tableNames.insert(schema->GetTableName()); + } } for (const auto& tablet : _tablets) { - tableSchemas.push_back(tablet->GetTabletSchema()); + auto schema = tablet->GetTabletSchema(); + if (tableNames.find(schema->GetTableName()) == tableNames.end()) { + tableSchemas.push_back(schema); + tableNames.insert(schema->GetTableName()); + } } } diff --git a/aios/storage/indexlib/indexlib/partition/index_application.h b/aios/storage/indexlib/indexlib/partition/index_application.h index e45a8d4889..de7f5fae86 100644 --- a/aios/storage/indexlib/indexlib/partition/index_application.h +++ b/aios/storage/indexlib/indexlib/partition/index_application.h @@ -66,7 +66,7 @@ class IndexApplication bool Init(const std::vector& indexPartitions); PartitionReaderSnapshotPtr CreateSnapshot(); PartitionReaderSnapshotPtr CreateSnapshot(const std::string& leadingTableName, int64_t maxCacheTimeUs = 0); - void GetTableSchemas(std::vector>& tableSchemas); + void GetTableSchemas(std::vector>& tableSchemas, std::set &tableNames); std::shared_ptr GetTableSchema(const std::string& tableName) const; void GetTableLatestDataTimestamps(std::vector& dataTsInfos); diff --git a/aios/suez/sdk/TableWriter.cpp b/aios/suez/sdk/TableWriter.cpp index 68230affa1..fcf41e44d3 100644 --- a/aios/suez/sdk/TableWriter.cpp +++ b/aios/suez/sdk/TableWriter.cpp @@ -152,6 +152,7 @@ bool TableWriter::init(const build_service::proto::PartitionId &pid, _reporter = reporter; _walConfig = std::make_unique(walConfig); _walConfig->desc = _pid->range().ShortDebugString(); + _walConfig->range = std::make_pair(_pid->range().from(), _pid->range().to()); _swiftClientCreator = swiftClientCreator; auto resourceReader = std::make_shared(configPath); diff --git a/aios/suez/table/QueueRawDocumentReader.cpp b/aios/suez/table/QueueRawDocumentReader.cpp index b3f85a733b..4fdad1d04f 100644 --- a/aios/suez/table/QueueRawDocumentReader.cpp +++ b/aios/suez/table/QueueRawDocumentReader.cpp @@ -37,19 +37,22 @@ bool QueueRawDocumentReader::initialize(const ReaderInitParam ¶ms) { if (!RawDocumentReader::initialize(params)) { return false; } - if (!initQueue(params.kvMap)) { + if (!initQueue(params)) { return false; } AUTIL_LOG(INFO, "create queue raw document reader success, queue_name[%s]", _queueName.c_str()); return true; } -bool QueueRawDocumentReader::initQueue(const KeyValueMap &kvMap) { - _queueName = getValueFromKeyValueMap(kvMap, QUEUE_NAME); - if (_queueName.empty()) { +bool QueueRawDocumentReader::initQueue(const ReaderInitParam ¶ms) { + string queueName = getValueFromKeyValueMap(params.kvMap, QUEUE_NAME); + if (queueName.empty()) { AUTIL_LOG(ERROR, "queue name is empty for queue_wal."); return false; } + _queueName = GlobalQueueManager::generateQueueName(queueName, + params.range.from(), params.range.to()); + _docQueuePtr = GlobalQueueManager::getInstance()->createQueue(_queueName); if (_docQueuePtr == nullptr) { AUTIL_LOG(ERROR, "get queue failed, queue_name[%s]", _queueName.c_str()); diff --git a/aios/suez/table/QueueRawDocumentReader.h b/aios/suez/table/QueueRawDocumentReader.h index fa57f1edfe..14e72c2402 100644 --- a/aios/suez/table/QueueRawDocumentReader.h +++ b/aios/suez/table/QueueRawDocumentReader.h @@ -39,7 +39,7 @@ class QueueRawDocumentReader : public build_service::reader::RawDocumentReader { virtual bool isEof() const; private: - bool initQueue(const build_service::KeyValueMap &kvMap); + bool initQueue(const build_service::reader::ReaderInitParam ¶ms); private: const static std::string QUEUE_NAME; diff --git a/aios/suez/table/test/QueueRawDocumentReaderTest.cpp b/aios/suez/table/test/QueueRawDocumentReaderTest.cpp index 81b806ae86..a8c86d0d83 100644 --- a/aios/suez/table/test/QueueRawDocumentReaderTest.cpp +++ b/aios/suez/table/test/QueueRawDocumentReaderTest.cpp @@ -21,18 +21,21 @@ void QueueRawDocumentReaderTest::tearDown() {} TEST_F(QueueRawDocumentReaderTest, testInitQueue) { { - KeyValueMap kvMap; + ReaderInitParam params; QueueRawDocumentReader reader; - EXPECT_TRUE(!reader.initQueue(kvMap)); + EXPECT_TRUE(!reader.initQueue(params)); EXPECT_TRUE(reader._docQueuePtr == nullptr); } { - KeyValueMap kvMap; + ReaderInitParam params; + params.range.set_from(0); + params.range.set_to(32767); + auto& kvMap = params.kvMap; kvMap[QueueRawDocumentReader::QUEUE_NAME] = "test"; QueueRawDocumentReader reader; - EXPECT_TRUE(reader.initQueue(kvMap)); + EXPECT_TRUE(reader.initQueue(params)); EXPECT_TRUE(reader._docQueuePtr != nullptr); - EXPECT_EQ(string("test"), reader._queueName); + EXPECT_EQ(string("test_0_32767"), reader._queueName); } } @@ -45,10 +48,13 @@ TEST_F(QueueRawDocumentReaderTest, testReadDocStrFailed) { EXPECT_EQ(RawDocumentReader::ERROR_EXCEPTION, ec); } { - KeyValueMap kvMap; + ReaderInitParam params; + params.range.set_from(0); + params.range.set_to(32767); + auto& kvMap = params.kvMap; kvMap[QueueRawDocumentReader::QUEUE_NAME] = "test"; QueueRawDocumentReader reader; - EXPECT_TRUE(reader.initQueue(kvMap)); + EXPECT_TRUE(reader.initQueue(params)); string docStr; DocInfo docInfo; auto ec = reader.readDocStr(docStr, nullptr, docInfo); @@ -57,10 +63,13 @@ TEST_F(QueueRawDocumentReaderTest, testReadDocStrFailed) { } TEST_F(QueueRawDocumentReaderTest, testReadDocStrSuccess) { - KeyValueMap kvMap; + ReaderInitParam params; + params.range.set_from(0); + params.range.set_to(32767); + auto& kvMap = params.kvMap; kvMap[QueueRawDocumentReader::QUEUE_NAME] = "test1"; QueueRawDocumentReader reader; - EXPECT_TRUE(reader.initQueue(kvMap)); + EXPECT_TRUE(reader.initQueue(params)); reader._docQueuePtr->Push({0, string("abc")}); string docStr; DocInfo docInfo; diff --git a/aios/suez/table/wal/GlobalQueueManager.cpp b/aios/suez/table/wal/GlobalQueueManager.cpp index 1f43526e0c..3335e9a8fa 100644 --- a/aios/suez/table/wal/GlobalQueueManager.cpp +++ b/aios/suez/table/wal/GlobalQueueManager.cpp @@ -18,6 +18,7 @@ #include #include "autil/LockFreeQueue.h" +#include "autil/StringUtil.h" using namespace std; using namespace autil; @@ -52,6 +53,16 @@ void GlobalQueueManager::releaseQueue(const std::string &queueName) { } } -GlobalQueueManager *GlobalQueueManager::getInstance() { return &queueManager; } +GlobalQueueManager *GlobalQueueManager::getInstance() { + return &queueManager; +} + +string GlobalQueueManager::generateQueueName(const std::string &queueName, + uint32_t from, uint32_t to) +{ + string newName = queueName + "_" + StringUtil::toString(from) + + "_" + StringUtil::toString(to); + return newName; +} } // namespace suez diff --git a/aios/suez/table/wal/GlobalQueueManager.h b/aios/suez/table/wal/GlobalQueueManager.h index d45694a199..2cabbaf863 100644 --- a/aios/suez/table/wal/GlobalQueueManager.h +++ b/aios/suez/table/wal/GlobalQueueManager.h @@ -37,6 +37,9 @@ class GlobalQueueManager { public: static GlobalQueueManager *getInstance(); + static std::string generateQueueName(const std::string &queueName, + uint32_t from, uint32_t to); + RawDocQueue createQueue(const std::string &queueName); void releaseQueue(const std::string &queueName); diff --git a/aios/suez/table/wal/QueueWAL.cpp b/aios/suez/table/wal/QueueWAL.cpp index 8975effebd..59652b8e08 100644 --- a/aios/suez/table/wal/QueueWAL.cpp +++ b/aios/suez/table/wal/QueueWAL.cpp @@ -38,11 +38,13 @@ QueueWAL::~QueueWAL() {} bool QueueWAL::init(const WALConfig &config) { const auto &kvMap = config.sinkDescription; - _queueName = build_service::getValueFromKeyValueMap(kvMap, QUEUE_NAME); - if (_queueName.empty()) { + string queueName = build_service::getValueFromKeyValueMap(kvMap, QUEUE_NAME); + if (queueName.empty()) { AUTIL_LOG(ERROR, "queue name is empty for queue_wal."); return false; } + _queueName = GlobalQueueManager::generateQueueName(queueName, + config.range.first, config.range.second); _docQueuePtr = GlobalQueueManager::getInstance()->createQueue(_queueName); if (_docQueuePtr == nullptr) { AUTIL_LOG(ERROR, "get queue failed, queue_name[%s]", _queueName.c_str()); diff --git a/aios/suez/table/wal/WALConfig.h b/aios/suez/table/wal/WALConfig.h index 852e570451..4535e3816d 100644 --- a/aios/suez/table/wal/WALConfig.h +++ b/aios/suez/table/wal/WALConfig.h @@ -33,6 +33,7 @@ class WALConfig final : public autil::legacy::Jsonizable { std::map sinkDescription; int64_t timeoutMs = 2 * 1000; // 2s std::string desc; + std::pair range; // no need jsonize }; } // namespace suez diff --git a/aios/suez_navi/search/NaviSearchManager.cpp b/aios/suez_navi/search/NaviSearchManager.cpp index faa4b71d79..25928769da 100644 --- a/aios/suez_navi/search/NaviSearchManager.cpp +++ b/aios/suez_navi/search/NaviSearchManager.cpp @@ -135,6 +135,7 @@ suez::UPDATE_RESULT NaviSearchManager::update( AUTIL_LOG(ERROR, "navi is null, search manager might have stopped"); return suez::UR_ERROR; } + AUTIL_LOG(DEBUG, "NaviSearchManager::update loadParam is %s _configLoader %s ",loadParam.c_str(),_configLoader.c_str() ); if (!_navi->update(_configLoader, "", loadParam, rootResourceMap)) { AUTIL_LOG(ERROR, "navi update failed"); return suez::UR_ERROR; diff --git a/aios/suez_turing/suez/turing/expression/util/TableInfoConfigurator.cpp b/aios/suez_turing/suez/turing/expression/util/TableInfoConfigurator.cpp index febf3b9c8e..f071f1d4ad 100644 --- a/aios/suez_turing/suez/turing/expression/util/TableInfoConfigurator.cpp +++ b/aios/suez_turing/suez/turing/expression/util/TableInfoConfigurator.cpp @@ -118,7 +118,8 @@ TableInfoPtr TableInfoConfigurator::createFromSchema(const shared_ptr> tableSchemas; - indexApp->GetTableSchemas(tableSchemas); + std::set tableNames; + indexApp->GetTableSchemas(tableSchemas, tableNames); shared_ptr mainTableSchema; for (auto &schema : tableSchemas) { if (schema->GetTableName() == mainTable) { diff --git a/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.cpp b/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.cpp index 3ea3f41726..02333c7bcd 100644 --- a/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.cpp +++ b/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.cpp @@ -86,7 +86,7 @@ bool MultiTableWrapper::init(const suez::MultiTableReader &multiTableReader, for (const auto &iter : _id2IndexAppMap) { partIds += StringUtil::toString(iter.first) + ", "; } - AUTIL_LOG(INFO, "parition count [%lu], ids:[%s]", _id2IndexAppMap.size(), partIds.c_str()); + AUTIL_LOG(INFO, "parition count [%lu], ids:[%s] for table %s", _id2IndexAppMap.size(), partIds.c_str(), itemTableName.c_str()); return true; } @@ -142,6 +142,17 @@ void createMultiMap(const SingleTableReaderMapMap &singleTableReaderMapMap, AUTIL_LOG(WARN, "table [%s] partition size is 0.", tableName.c_str()); continue; } + for (const auto &entry : singleTableReaderMap) { + if (entry.first.index == idx) { + auto data = entry.second->get(); + if (data != nullptr) { + result[tableName] = data; + tableVersionMap[tableName] = entry.first.getFullVersion(); + } + break; + } + } + /* if (singleTableReaderMap.size() > idx) { auto readerIter = singleTableReaderMap.begin(); std::advance(readerIter, idx); @@ -160,6 +171,7 @@ void createMultiMap(const SingleTableReaderMapMap &singleTableReaderMapMap, } } } + */ } } @@ -182,7 +194,7 @@ bool MultiTableWrapper::createIndexApplications(const SingleTableReaderMapMap &s // compatiable old, to create single indexApplications return createSingleIndexApplication(singleTableReaderMapMap, joinRelationMap); } else { - return createMultiIndexApplication(singleTableReaderMapMap, joinRelationMap, partPos); + return createMultiIndexApplication(singleTableReaderMapMap, joinRelationMap, maxPartCount); } } @@ -210,8 +222,8 @@ bool MultiTableWrapper::createSingleIndexApplication(const SingleTableReaderMapM bool MultiTableWrapper::createMultiIndexApplication(const SingleTableReaderMapMap &singleTableReaderMapMap, const JoinRelationMap &joinRelationMap, - const vector &partPos) { - for (size_t i = 0; i < partPos.size(); i++) { + const int32_t maxPartCount) { + for (int32_t i = 0; i < maxPartCount; i++) { suez::IndexPartitionMap indexPartitions; TabletMap tablets; TableVersionMap tableVersionMap; @@ -228,7 +240,7 @@ bool MultiTableWrapper::createMultiIndexApplication(const SingleTableReaderMapMa return false; } genTableInfoMap(tableVersionMap, indexPartitions, tablets); - _id2IndexAppMap[partPos[i]] = indexApp; + _id2IndexAppMap[i] = indexApp; } return true; } @@ -352,28 +364,33 @@ void MultiTableWrapper::genTableInfoWithRel(const std::string &itemTableName) { void MultiTableWrapper::genTableInfoMap(const TableVersionMap &tableVersionMap, const suez::IndexPartitionMap &indexPartitions, const TabletMap &tabletMap) { - if (_tableInfoMapWithoutRel.empty()) { - for (const auto &partitionPair : indexPartitions) { - const auto &partition = partitionPair.second; - int32_t version = -1; - auto it = tableVersionMap.find(partitionPair.first); - if (it != tableVersionMap.end()) { - version = it->second; - } - _tableInfoMapWithoutRel[partitionPair.first] = - TableInfoConfigurator::createFromSchema(partition->GetSchema(), version); + + for (const auto &partitionPair : indexPartitions) { + if (_tableInfoMapWithoutRel.find(partitionPair.first) != _tableInfoMapWithoutRel.end()) { + continue; } + const auto &partition = partitionPair.second; + int32_t version = -1; + auto it = tableVersionMap.find(partitionPair.first); + if (it != tableVersionMap.end()) { + version = it->second; + } + _tableInfoMapWithoutRel[partitionPair.first] = + TableInfoConfigurator::createFromSchema(partition->GetSchema(), version); + } - for (const auto &tabletPair : tabletMap) { - const auto &tablet = tabletPair.second; - int32_t version = -1; - auto it = tableVersionMap.find(tabletPair.first); - if (it != tableVersionMap.end()) { - version = it->second; - } - _tableInfoMapWithoutRel[tabletPair.first] = - TableInfoConfigurator::createFromSchema(tablet->GetTabletSchema(), version); + for (const auto &tabletPair : tabletMap) { + if (_tableInfoMapWithoutRel.find(tabletPair.first) != _tableInfoMapWithoutRel.end()) { + continue; + } + const auto &tablet = tabletPair.second; + int32_t version = -1; + auto it = tableVersionMap.find(tabletPair.first); + if (it != tableVersionMap.end()) { + version = it->second; } + _tableInfoMapWithoutRel[tabletPair.first] = + TableInfoConfigurator::createFromSchema(tablet->GetTabletSchema(), version); } } diff --git a/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.h b/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.h index 5977d8a270..5c22b02bcd 100644 --- a/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.h +++ b/aios/suez_turing/suez/turing/search/base/MultiTableWrapper.h @@ -94,7 +94,7 @@ class MultiTableWrapper { const indexlib::partition::JoinRelationMap &joinRelationMap); bool createMultiIndexApplication(const suez::SingleTableReaderMapMap &singleTableReaderMapMap, const indexlib::partition::JoinRelationMap &joinRelationMap, - const std::vector &partPos); + const int32_t maxPartCount); indexlib::partition::IndexApplicationPtr createIndexApplication(const suez::IndexPartitionMap &indexPartitionMap, diff --git a/docs/havenask_docs/sql/guide/quickstart.md b/docs/havenask_docs/sql/guide/quickstart.md index 2c60138118..3649fb3513 100644 --- a/docs/havenask_docs/sql/guide/quickstart.md +++ b/docs/havenask_docs/sql/guide/quickstart.md @@ -83,4 +83,4 @@ cd ~/havenask bazel build //aios/tools/hape:hape_tar -c opt --copt -g --strip=always --config=havenask ## 编译havenask引擎 bazel build //aios/sql:ha_sql -c opt --copt -g --strip=always --config=havenask -``` \ No newline at end of file +```