diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 65d98815196..84ca579dd20 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -28,301 +28,430 @@ struct Settings * but we are not going to do it, because settings is used everywhere as static struct fields. */ -#define APPLY_FOR_SETTINGS(M) \ - M(SettingString, regions, "", "the region need to be read.") \ - M(SettingBool, resolve_locks, false, "tmt resolve locks.") \ - M(SettingBool, group_by_collation_sensitive, false, "do group by with collation info.") \ - M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \ - M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \ - M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \ - M(SettingUInt64, mpp_task_timeout, DEFAULT_MPP_TASK_TIMEOUT, "mpp task max endurable time.") \ - M(SettingUInt64, mpp_task_running_timeout, DEFAULT_MPP_TASK_RUNNING_TIMEOUT, "mpp task max time that running without any progress.") \ - M(SettingUInt64, mpp_task_waiting_timeout, DEFAULT_MPP_TASK_WAITING_TIMEOUT, "mpp task max time that waiting first data block from source input stream.") \ - M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.")\ - M(SettingUInt64, batch_commands_threads, 0, "Number of threads to use for handling batch commands concurrently. 0 means - same as 'max_threads'.") \ - M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \ - M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \ - M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \ - M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE, "The maximum block size for insertion, if we control the creation of blocks for insertion.") \ - M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.") \ - M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.") \ - M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.") \ - M(SettingUInt64, cop_pool_size, 0, "The number of threads to handle cop requests. By default, it is determined automatically.") \ - M(SettingUInt64, batch_cop_pool_size, 0, "The number of threads to handle batch cop requests. By default, it is determined automatically.") \ - M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.") \ - M(SettingUInt64, max_distributed_connections, DEFAULT_MAX_DISTRIBUTED_CONNECTIONS, "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \ - M(SettingUInt64, max_query_size, DEFAULT_MAX_QUERY_SIZE, "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)") \ - M(SettingUInt64, interactive_delay, DEFAULT_INTERACTIVE_DELAY, "The interval in microseconds to check if the request is cancelled, and to send progress info.") \ - M(SettingSeconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.") \ - M(SettingMilliseconds, connect_timeout_with_failover_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS, "Connection timeout for selecting first healthy replica.") \ - M(SettingSeconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "") \ - M(SettingSeconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "") \ - M(SettingMilliseconds, queue_max_wait_ms, DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, "The wait time in the request queue, if the number of concurrent requests exceeds the maximum.") \ - M(SettingUInt64, poll_interval, DBMS_DEFAULT_POLL_INTERVAL, "Block at the query wait loop on the server for the specified number of seconds.") \ - M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.") \ - M(SettingUInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.") \ - M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \ - M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \ - M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \ - M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.") \ - \ - M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS, "Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown.") \ - \ - M(SettingBool, distributed_directory_monitor_batch_inserts, false, "Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones.") \ - \ - M(SettingBool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.") \ - \ - M(SettingUInt64, replication_alter_partitions_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.") \ - M(SettingUInt64, replication_alter_columns_timeout, 60, "Wait for actions to change the table structure within the specified number of seconds. 0 - wait unlimited time.") \ - \ - M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM, "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.") \ - \ - M(SettingTotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE, "How to calculate TOTALS when HAVING is present, as well as when max_rows_to_group_by and group_by_overflow_mode = ‘any’ are present.") \ - M(SettingFloat, totals_auto_threshold, 0.5, "The threshold for totals_mode = 'auto'.") \ - \ - M(SettingBool, compile, false, "Whether query compilation is enabled.") \ - M(SettingUInt64, min_count_to_compile, 3, "The number of structurally identical queries before they are compiled.") \ - M(SettingUInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.") \ - M(SettingUInt64, group_by_two_level_threshold_bytes, 100000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.") \ - M(SettingBool, distributed_aggregation_memory_efficient, false, "Is the memory-saving mode of distributed aggregation enabled.") \ - M(SettingUInt64, aggregation_memory_efficient_merge_threads, 0, "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is consumed. 0 means - same as 'max_threads'.") \ - \ - M(SettingUInt64, max_parallel_replicas, 1, "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.") \ - M(SettingUInt64, parallel_replicas_count, 0, "") \ - M(SettingUInt64, parallel_replica_offset, 0, "") \ - \ - M(SettingBool, skip_unavailable_shards, false, "Silently skip unavailable shards.") \ - \ - M(SettingBool, distributed_group_by_no_merge, false, "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there are different keys on different shards.") \ - \ - M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), "If at least as many lines are read from one file, the reading can be parallelized.") \ - M(SettingUInt64, merge_tree_min_rows_for_seek, 0, "You can skip reading more than that number of rows at the price of one seek per file.") \ - M(SettingUInt64, merge_tree_coarse_index_granularity, 8, "If the index segment can contain the required keys, divide it into as many parts and recursively check them. ") \ - M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)") \ - \ - M(SettingBool, merge_tree_uniform_read_distribution, true, "Distribute read from MergeTree over threads evenly, ensuring stable average execution time of each thread within one read operation.") \ - \ - M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ") \ - \ - M(SettingUInt64, min_bytes_to_use_direct_io, 0, "The minimum number of bytes for input/output operations is bypassing the page cache. 0 - disabled.") \ - \ - M(SettingBool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.") \ - M(SettingBool, force_primary_key, 0, "Throw an exception if there is primary key in a table, and it is not used.") \ - \ - M(SettingUInt64, mark_cache_min_lifetime, 0, "If the maximum size of mark_cache is exceeded, delete only records older than mark_cache_min_lifetime seconds.") \ - \ - M(SettingFloat, max_streams_to_max_threads_ratio, 1, "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of threads, but for each source to dynamically select available work for itself.") \ - \ - M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4, "Allows you to select the method of data compression when writing.") \ - \ - M(SettingInt64, network_zstd_compression_level, 1, "Allows you to select the level of ZSTD compression.") \ - \ - M(SettingUInt64, priority, 0, "Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities.") \ - \ - M(SettingBool, log_queries, 0, "Log requests and write the log to the system table.") \ - \ - M(SettingUInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.") \ - \ - M(SettingDistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?") \ - \ - M(SettingUInt64, max_concurrent_queries_for_user, 0, "The maximum number of concurrent requests per user.") \ - \ - M(SettingBool, insert_deduplicate, true, "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be preformed") \ - \ - M(SettingUInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.") \ - M(SettingMilliseconds, insert_quorum_timeout, 600000, "") \ - M(SettingUInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.") \ - M(SettingUInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.") \ - M(SettingMilliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.") \ - M(SettingUInt64, read_backoff_max_throughput, 1048576, "Settings to reduce the number of threads in case of slow reads. Count events when the read bandwidth is less than that many bytes per second.") \ - M(SettingMilliseconds, read_backoff_min_interval_between_events_ms, 1000, "Settings to reduce the number of threads in case of slow reads. Do not pay attention to the event, if the previous one has passed less than a certain amount of time.") \ - M(SettingUInt64, read_backoff_min_events, 2, "Settings to reduce the number of threads in case of slow reads. The number of events after which the number of threads will be reduced.") \ - \ - M(SettingFloat, memory_tracker_fault_probability, 0., "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.") \ - \ - M(SettingBool, enable_http_compression, 0, "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.") \ - M(SettingInt64, http_zlib_compression_level, 3, "Compression level - used if the client on HTTP said that it understands data compressed by gzip or deflate.") \ - \ - M(SettingBool, http_native_compression_disable_checksumming_on_decompress, 0, "If you uncompress the POST data from the client compressed by the native format, do not check the checksum.") \ - \ - M(SettingString, count_distinct_implementation, "uniqExact", "What aggregate function to use for implementation of count(DISTINCT ...)") \ - \ - M(SettingBool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.") \ - \ - M(SettingBool, add_http_cors_header, false, "Write add http CORS header.") \ - \ - M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats).") \ - \ - M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \ - \ - M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.") \ - \ - M(SettingBool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.") \ - \ - M(SettingUInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.") \ - \ - M(SettingBool, use_client_time_zone, false, "Use client timezone for interpreting DateTime string values, instead of adopting server timezone.") \ - \ - M(SettingBool, send_progress_in_http_headers, false, "Send progress notifications using X-ClickHouse-Progress headers. Some clients do not support high amount of HTTP headers (Python requests in particular), so it is disabled by default.") \ - \ - M(SettingUInt64, http_headers_progress_interval_ms, 100, "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.") \ - \ - M(SettingBool, fsync_metadata, 1, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.") \ - \ - M(SettingUInt64, input_format_allow_errors_num, 0, "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \ - M(SettingFloat, input_format_allow_errors_ratio, 0, "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \ - \ - M(SettingBool, join_use_nulls, 0, "Use NULLs for non-joined rows of outer JOINs. If false, use default value of corresponding columns data type.") \ - \ - M(SettingUInt64, preferred_block_size_bytes, 1000000, "") \ - \ - M(SettingUInt64, max_replica_delay_for_distributed_queries, 300, "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified value (not inclusive). Zero means do not take delay into account.") \ - M(SettingBool, fallback_to_stale_replicas_for_distributed_queries, 1, "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is enabled, the query will be performed anyway, otherwise the error will be reported.") \ - M(SettingUInt64, preferred_max_column_in_block_size_bytes, 0, "Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size.") \ - \ - M(SettingBool, insert_distributed_sync, false, "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.") \ - M(SettingUInt64, insert_distributed_timeout, 0, "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no timeout.") \ - M(SettingInt64, distributed_ddl_task_timeout, 180, "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \ - M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, "Timeout for flushing data from streaming storages.") \ - M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \ - M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \ - M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \ - M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \ - M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \ - M(SettingBool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown") \ - M(SettingBool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \ - \ - M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \ - M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \ - \ - \ +#define APPLY_FOR_SETTINGS(M) \ + M(SettingString, regions, "", "the region need to be read.") \ + M(SettingBool, resolve_locks, false, "tmt resolve locks.") \ + M(SettingBool, group_by_collation_sensitive, false, "do group by with collation info.") \ + M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \ + M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \ + M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \ + M(SettingUInt64, mpp_task_timeout, DEFAULT_MPP_TASK_TIMEOUT, "mpp task max endurable time.") \ + M(SettingUInt64, mpp_task_running_timeout, DEFAULT_MPP_TASK_RUNNING_TIMEOUT, "mpp task max time that running without any progress.") \ + M(SettingUInt64, mpp_task_waiting_timeout, DEFAULT_MPP_TASK_WAITING_TIMEOUT, \ + "mpp task max time that waiting first data block from source input stream.") \ + M(SettingInt64, safe_point_update_interval_seconds, 1, "The interval in seconds to update safe point from PD.") \ + M(SettingUInt64, batch_commands_threads, 0, \ + "Number of threads to use for handling batch commands concurrently. 0 means - same as 'max_threads'.") \ + M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, \ + "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value " \ + "and no less than the volume of data for one mark.") \ + M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, \ + "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \ + M(SettingUInt64, max_block_size, DEFAULT_BLOCK_SIZE, "Maximum block size for reading") \ + M(SettingUInt64, max_insert_block_size, DEFAULT_INSERT_BLOCK_SIZE, \ + "The maximum block size for insertion, if we control the creation of blocks for insertion.") \ + M(SettingUInt64, min_insert_block_size_rows, DEFAULT_INSERT_BLOCK_SIZE, \ + "Squash blocks passed to INSERT query to specified size in rows, if blocks are not big enough.") \ + M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), \ + "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.") \ + M(SettingMaxThreads, max_threads, 0, \ + "The maximum number of threads to execute the request. By default, it is determined automatically.") \ + M(SettingUInt64, cop_pool_size, 0, "The number of threads to handle cop requests. By default, it is determined automatically.") \ + M(SettingUInt64, batch_cop_pool_size, 0, \ + "The number of threads to handle batch cop requests. By default, it is determined automatically.") \ + M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.") \ + M(SettingUInt64, max_distributed_connections, DEFAULT_MAX_DISTRIBUTED_CONNECTIONS, \ + "The maximum number of connections for distributed processing of one query (should be greater than max_threads).") \ + M(SettingUInt64, max_query_size, DEFAULT_MAX_QUERY_SIZE, \ + "Which part of the query can be read into RAM for parsing (the remaining data for INSERT, if any, is read later)") \ + M(SettingUInt64, interactive_delay, DEFAULT_INTERACTIVE_DELAY, \ + "The interval in microseconds to check if the request is cancelled, and to send progress info.") \ + M(SettingSeconds, connect_timeout, DBMS_DEFAULT_CONNECT_TIMEOUT_SEC, "Connection timeout if there are no replicas.") \ + M(SettingMilliseconds, connect_timeout_with_failover_ms, DBMS_DEFAULT_CONNECT_TIMEOUT_WITH_FAILOVER_MS, \ + "Connection timeout for selecting first healthy replica.") \ + M(SettingSeconds, receive_timeout, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, "") \ + M(SettingSeconds, send_timeout, DBMS_DEFAULT_SEND_TIMEOUT_SEC, "") \ + M(SettingMilliseconds, queue_max_wait_ms, DEFAULT_QUERIES_QUEUE_WAIT_TIME_MS, \ + "The wait time in the request queue, if the number of concurrent requests exceeds the maximum.") \ + M(SettingUInt64, poll_interval, DBMS_DEFAULT_POLL_INTERVAL, \ + "Block at the query wait loop on the server for the specified number of seconds.") \ + M(SettingUInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, \ + "Maximum number of connections with one remote server in the pool.") \ + M(SettingUInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, \ + "The maximum number of attempts to connect to replicas.") \ + M(SettingBool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.") \ + M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.") \ + M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.") \ + M(SettingUInt64, background_pool_size, DBMS_DEFAULT_BACKGROUND_POOL_SIZE, \ + "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server " \ + "startup.") \ + \ + M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, DBMS_DISTRIBUTED_DIRECTORY_MONITOR_SLEEP_TIME_MS, \ + "Sleep time for StorageDistributed DirectoryMonitors in case there is no work or exception has been thrown.") \ + \ + M(SettingBool, distributed_directory_monitor_batch_inserts, false, \ + "Should StorageDistributed DirectoryMonitors try to batch individual inserts into bigger ones.") \ + \ + M(SettingBool, optimize_move_to_prewhere, true, "Allows disabling WHERE to PREWHERE optimization in SELECT queries from MergeTree.") \ + \ + M(SettingUInt64, replication_alter_partitions_sync, 1, \ + "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.") \ + M(SettingUInt64, replication_alter_columns_timeout, 60, \ + "Wait for actions to change the table structure within the specified number of seconds. 0 - wait unlimited time.") \ + \ + M(SettingLoadBalancing, load_balancing, LoadBalancing::RANDOM, \ + "Which replicas (among healthy replicas) to preferably send a query to (on the first attempt) for distributed processing.") \ + \ + M(SettingTotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE, \ + "How to calculate TOTALS when HAVING is present, as well as when max_rows_to_group_by and group_by_overflow_mode = ‘any’ are " \ + "present.") \ + M(SettingFloat, totals_auto_threshold, 0.5, "The threshold for totals_mode = 'auto'.") \ + \ + M(SettingBool, compile, false, "Whether query compilation is enabled.") \ + M(SettingUInt64, min_count_to_compile, 3, "The number of structurally identical queries before they are compiled.") \ + M(SettingUInt64, group_by_two_level_threshold, 100000, \ + "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.") \ + M(SettingUInt64, group_by_two_level_threshold_bytes, 100000000, \ + "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. " \ + "Two-level aggregation is used when at least one of the thresholds is triggered.") \ + M(SettingBool, distributed_aggregation_memory_efficient, false, "Is the memory-saving mode of distributed aggregation enabled.") \ + M(SettingUInt64, aggregation_memory_efficient_merge_threads, 0, \ + "Number of threads to use for merge intermediate aggregation results in memory efficient mode. When bigger, then more memory is " \ + "consumed. 0 means - same as 'max_threads'.") \ + \ + M(SettingUInt64, max_parallel_replicas, 1, \ + "The maximum number of replicas of each shard used when the query is executed. For consistency (to get different parts of the " \ + "same partition), this option only works for the specified sampling key. The lag of the replicas is not controlled.") \ + M(SettingUInt64, parallel_replicas_count, 0, "") \ + M(SettingUInt64, parallel_replica_offset, 0, "") \ + \ + M(SettingBool, skip_unavailable_shards, false, "Silently skip unavailable shards.") \ + \ + M(SettingBool, distributed_group_by_no_merge, false, \ + "Do not merge aggregation states from different servers for distributed query processing - in case it is for certain that there " \ + "are different keys on different shards.") \ + \ + M(SettingUInt64, merge_tree_min_rows_for_concurrent_read, (20 * 8192), \ + "If at least as many lines are read from one file, the reading can be parallelized.") \ + M(SettingUInt64, merge_tree_min_rows_for_seek, 0, \ + "You can skip reading more than that number of rows at the price of one seek per file.") \ + M(SettingUInt64, merge_tree_coarse_index_granularity, 8, \ + "If the index segment can contain the required keys, divide it into as many parts and recursively check them. ") \ + M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024), \ + "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. " \ + "(For large queries not to flush out the cache.)") \ + \ + M(SettingBool, merge_tree_uniform_read_distribution, true, \ + "Distribute read from MergeTree over threads evenly, ensuring stable average execution time of each thread within one read " \ + "operation.") \ + \ + M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3, \ + "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ") \ + \ + M(SettingUInt64, min_bytes_to_use_direct_io, 0, \ + "The minimum number of bytes for input/output operations is bypassing the page cache. 0 - disabled.") \ + \ + M(SettingBool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.") \ + M(SettingBool, force_primary_key, 0, "Throw an exception if there is primary key in a table, and it is not used.") \ + \ + M(SettingUInt64, mark_cache_min_lifetime, 0, \ + "If the maximum size of mark_cache is exceeded, delete only records older than mark_cache_min_lifetime seconds.") \ + \ + M(SettingFloat, max_streams_to_max_threads_ratio, 1, \ + "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that " \ + "this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of " \ + "threads, but for each source to dynamically select available work for itself.") \ + \ + M(SettingCompressionMethod, network_compression_method, CompressionMethod::LZ4, \ + "Allows you to select the method of data compression when writing.") \ + \ + M(SettingInt64, network_zstd_compression_level, 1, "Allows you to select the level of ZSTD compression.") \ + \ + M(SettingUInt64, priority, 0, "Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities.") \ + \ + M(SettingBool, log_queries, 0, "Log requests and write the log to the system table.") \ + \ + M(SettingUInt64, log_queries_cut_to_length, 100000, \ + "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of " \ + "printed query in ordinary text log.") \ + \ + M(SettingDistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, \ + "How are distributed subqueries performed inside IN or JOIN sections?") \ + \ + M(SettingUInt64, max_concurrent_queries_for_user, 0, "The maximum number of concurrent requests per user.") \ + \ + M(SettingBool, insert_deduplicate, true, \ + "For INSERT queries in the replicated table, specifies that deduplication of insertings blocks should be preformed") \ + \ + M(SettingUInt64, insert_quorum, 0, \ + "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the " \ + "data. 0 - disabled.") \ + M(SettingMilliseconds, insert_quorum_timeout, 600000, "") \ + M(SettingUInt64, select_sequential_consistency, 0, \ + "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; " \ + "do not read the parts that have not yet been written with the quorum.") \ + M(SettingUInt64, table_function_remote_max_addresses, 1000, \ + "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.") \ + M(SettingMilliseconds, read_backoff_min_latency_ms, 1000, \ + "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.") \ + M(SettingUInt64, read_backoff_max_throughput, 1048576, \ + "Settings to reduce the number of threads in case of slow reads. Count events when the read bandwidth is less than that many " \ + "bytes per second.") \ + M(SettingMilliseconds, read_backoff_min_interval_between_events_ms, 1000, \ + "Settings to reduce the number of threads in case of slow reads. Do not pay attention to the event, if the previous one has " \ + "passed less than a certain amount of time.") \ + M(SettingUInt64, read_backoff_min_events, 2, \ + "Settings to reduce the number of threads in case of slow reads. The number of events after which the number of threads will be " \ + "reduced.") \ + \ + M(SettingFloat, memory_tracker_fault_probability, 0., \ + "For testing of `exception safety` - throw an exception every time you allocate memory with the specified probability.") \ + \ + M(SettingBool, enable_http_compression, 0, \ + "Compress the result if the client over HTTP said that it understands data compressed by gzip or deflate.") \ + M(SettingInt64, http_zlib_compression_level, 3, \ + "Compression level - used if the client on HTTP said that it understands data compressed by gzip or deflate.") \ + \ + M(SettingBool, http_native_compression_disable_checksumming_on_decompress, 0, \ + "If you uncompress the POST data from the client compressed by the native format, do not check the checksum.") \ + \ + M(SettingString, count_distinct_implementation, "uniqExact", \ + "What aggregate function to use for implementation of count(DISTINCT ...)") \ + \ + M(SettingBool, output_format_write_statistics, true, \ + "Write statistics about read rows, bytes, time elapsed in suitable output formats.") \ + \ + M(SettingBool, add_http_cors_header, false, "Write add http CORS header.") \ + \ + M(SettingBool, input_format_skip_unknown_fields, false, \ + "Skip columns with unknown names from input data (it works for JSONEachRow and TSKV formats).") \ + \ + M(SettingBool, input_format_values_interpret_expressions, true, \ + "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \ + \ + M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.") \ + \ + M(SettingBool, output_format_json_quote_denormals, false, "Enables '+nan', '-nan', '+inf', '-inf' outputs in JSON output format.") \ + \ + M(SettingUInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.") \ + \ + M(SettingBool, use_client_time_zone, false, \ + "Use client timezone for interpreting DateTime string values, instead of adopting server timezone.") \ + \ + M(SettingBool, send_progress_in_http_headers, false, \ + "Send progress notifications using X-ClickHouse-Progress headers. Some clients do not support high amount of HTTP headers " \ + "(Python requests in particular), so it is disabled by default.") \ + \ + M(SettingUInt64, http_headers_progress_interval_ms, 100, \ + "Do not send HTTP headers X-ClickHouse-Progress more frequently than at each specified interval.") \ + \ + M(SettingBool, fsync_metadata, 1, \ + "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server " \ + "with high load of DDL queries and high load of disk subsystem.") \ + \ + M(SettingUInt64, input_format_allow_errors_num, 0, \ + "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative " \ + "values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next " \ + "line and continue.") \ + M(SettingFloat, input_format_allow_errors_ratio, 0, \ + "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative " \ + "values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next " \ + "line and continue.") \ + \ + M(SettingBool, join_use_nulls, 0, \ + "Use NULLs for non-joined rows of outer JOINs. If false, use default value of corresponding columns data type.") \ + \ + M(SettingUInt64, preferred_block_size_bytes, 1000000, "") \ + \ + M(SettingUInt64, max_replica_delay_for_distributed_queries, 300, \ + "If set, distributed queries of Replicated tables will choose servers with replication delay in seconds less than the specified " \ + "value (not inclusive). Zero means do not take delay into account.") \ + M(SettingBool, fallback_to_stale_replicas_for_distributed_queries, 1, \ + "Suppose max_replica_delay_for_distributed_queries is set and all replicas for the queried table are stale. If this setting is " \ + "enabled, the query will be performed anyway, otherwise the error will be reported.") \ + M(SettingUInt64, preferred_max_column_in_block_size_bytes, 0, \ + "Limit on max column size in block while reading. Helps to decrease cache misses count. Should be close to L2 cache size.") \ + \ + M(SettingBool, insert_distributed_sync, false, \ + "If setting is enabled, insert query into distributed waits until data will be sent to all nodes in cluster.") \ + M(SettingUInt64, insert_distributed_timeout, 0, \ + "Timeout for insert query into distributed. Setting is used only with insert_distributed_sync enabled. Zero value means no " \ + "timeout.") \ + M(SettingInt64, distributed_ddl_task_timeout, 180, \ + "Timeout for DDL query responses from all hosts in cluster. Negative value means infinite.") \ + M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS, \ + "Timeout for flushing data from streaming storages.") \ + M(SettingString, format_schema, "", "Schema identifier (used by schema-based formats)") \ + M(SettingBool, insert_allow_materialized_columns, 0, "If setting is enabled, Allow materialized columns in INSERT.") \ + M(SettingSeconds, http_connection_timeout, DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT, "HTTP connection timeout.") \ + M(SettingSeconds, http_send_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP send timeout") \ + M(SettingSeconds, http_receive_timeout, DEFAULT_HTTP_READ_BUFFER_TIMEOUT, "HTTP receive timeout") \ + M(SettingBool, optimize_throw_if_noop, false, \ + "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown") \ + M(SettingBool, use_index_for_in_with_subqueries, true, \ + "Try using an index if there is a subquery or a table expression on the right side of the IN operator.") \ + \ + M(SettingBool, empty_result_for_aggregation_by_empty_set, false, "Return empty result when aggregating without keys on empty set.") \ + M(SettingBool, allow_distributed_ddl, true, "If it is set to true, then a user is allowed to executed distributed DDL queries.") \ + \ + \ /** Limits during query execution are part of the settings. \ * Used to provide a more safe execution of queries from the user interface. \ * Basically, limits are checked for each block (not every row). That is, the limits can be slightly violated. \ * Almost all limits apply only to SELECTs. \ * Almost all limits apply to each stream individually. \ - */ \ - \ - M(SettingUInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.") \ - M(SettingUInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.") \ - M(SettingOverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingUInt64, max_rows_to_group_by, 0, "") \ - M(SettingOverflowMode, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - M(SettingUInt64, max_bytes_before_external_group_by, 0, "") \ - \ - M(SettingUInt64, max_rows_to_sort, 0, "") \ - M(SettingUInt64, max_bytes_to_sort, 0, "") \ - M(SettingOverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - M(SettingUInt64, max_bytes_before_external_sort, 0, "") \ - \ - M(SettingUInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.") \ - M(SettingUInt64, max_result_bytes, 0, "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.") \ - M(SettingOverflowMode, result_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - /* TODO: Check also when merging and finalizing aggregate functions. */ \ - M(SettingSeconds, max_execution_time, 0, "") \ - M(SettingOverflowMode, timeout_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingUInt64, min_execution_speed, 0, "In rows per second.") \ - M(SettingSeconds, timeout_before_checking_execution_speed, 0, "Check that the speed is not too low after the specified time has elapsed.") \ - \ - M(SettingUInt64, max_columns_to_read, 0, "") \ - M(SettingUInt64, max_temporary_columns, 0, "") \ - M(SettingUInt64, max_temporary_non_const_columns, 0, "") \ - \ - M(SettingUInt64, max_subquery_depth, 100, "") \ - M(SettingUInt64, max_pipeline_depth, 1000, "") \ - M(SettingUInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. Checked after parsing.") \ - M(SettingUInt64, max_ast_elements, 50000, "Maximum size of query syntax tree in number of nodes. Checked after parsing.") \ - M(SettingUInt64, max_expanded_ast_elements, 500000, "Maximum size of query syntax tree in number of nodes after expansion of aliases and the asterisk.") \ - \ - M(SettingUInt64, readonly, 0, "0 - everything is allowed. 1 - only read requests. 2 - only read requests, as well as changing settings, except for the 'readonly' setting.") \ - \ - M(SettingUInt64, shared_query_clients, 0, "How many clients will share the same query_id. If > 0, enable shared query mode.")\ - M(SettingString, query_id, "", "The query_id, only for testing.")\ - M(SettingUInt64, mutable_deduper, 5, "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.")\ - \ - M(SettingUInt64, dt_segment_limit_rows, 1000000, "Base rows of segments in DeltaTree Engine.")\ - M(SettingUInt64, dt_segment_limit_size, 1073741824, "Base size of segments in DeltaTree Engine. 1 GB by default.")\ - M(SettingUInt64, dt_segment_delta_limit_rows, 80000, "Max rows of segment delta in DeltaTree Engine")\ - M(SettingUInt64, dt_segment_delta_limit_size, 85983232, "Max size of segment delta in DeltaTree Engine. 82 MB by default.")\ - M(SettingUInt64, dt_segment_force_merge_delta_deletes, 10, "Delta delete ranges before force merge into stable.")\ - M(SettingUInt64, dt_segment_force_merge_delta_rows, 400000, "Delta rows before force merge into stable.")\ - M(SettingUInt64, dt_segment_force_merge_delta_size, 429496729, "Delta size before force merge into stable. 400 MB by default.")\ - M(SettingUInt64, dt_segment_stop_write_delta_rows, 2000000, "Delta rows before stop new writes.")\ - M(SettingUInt64, dt_segment_stop_write_delta_size, 2147483648, "Delta size before stop new writes. 2 GB by default.")\ - M(SettingUInt64, dt_segment_delta_cache_limit_rows, 4000, "Max rows of cache in segment delta in DeltaTree Engine.")\ - M(SettingUInt64, dt_segment_delta_cache_limit_size, 4194304, "Max size of cache in segment delta in DeltaTree Engine. 4 MB by default.")\ - M(SettingUInt64, dt_segment_delta_small_pack_rows, 512, "Determine whether a pack in delta is small or not.")\ - M(SettingUInt64, dt_segment_delta_small_pack_size, 524288, "Determine whether a pack in delta is small or not. 512 KB by default.")\ - M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.")\ - M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.")\ - M(SettingUInt64, dt_bg_gc_check_interval, 600, "Background gc thread check interval, the unit is second.")\ - M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 15, "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.")\ - M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all segments")\ - M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.")\ - M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \ - M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.")\ - M(SettingBool, dt_read_delta_only, false, "Only read delta data in DeltaTree Engine.")\ - M(SettingBool, dt_read_stable_only, false, "Only read stable data in DeltaTree Engine.")\ - M(SettingBool, dt_enable_logical_split, true, "Enable logical split or not in DeltaTree Engine.")\ - M(SettingBool, dt_flush_after_write, false, "Flush cache or not after write in DeltaTree Engine.")\ - M(SettingBool, dt_enable_relevant_place, false, "Enable relevant place or not in DeltaTree Engine.")\ - M(SettingBool, dt_enable_skippable_place, true, "Enable skippable place or not in DeltaTree Engine.")\ - M(SettingBool, dt_enable_stable_column_cache, true, "Enable column cache for StorageDeltaMerge.") \ - M(SettingBool, dt_enable_single_file_mode_dmfile, false, "Enable write DMFile in single file mode.") \ - M(SettingUInt64, dt_open_file_max_idle_seconds, 15, "Max idle time of opening files, 0 means infinite.") \ - M(SettingUInt64, dt_page_num_max_expect_legacy_files, 100, "Max number of legacy file expected") \ - M(SettingFloat, dt_page_num_max_gc_valid_rate, 1.0, "Max valid rate of deciding a page file can be compact when exising legacy files are more over than `dt_stroage_num_max_expect_legacy_files`") \ - M(SettingFloat, dt_page_gc_low_write_prob, 0.10, "Probability to run gc when write there is few writes.") \ - \ - M(SettingUInt64, dt_storage_pool_log_write_slots, 4, "Max write concurrency for each StoragePool.log.") \ - M(SettingUInt64, dt_storage_pool_log_gc_min_file_num, 10, "Min number of page files to compact") \ - M(SettingUInt64, dt_storage_pool_log_gc_min_legacy_num, 3, "Min number of legacy page files to compact") \ - M(SettingUInt64, dt_storage_pool_log_gc_min_bytes, 128 * Constant::MB, "Min bytes of page data to compact") \ - M(SettingFloat, dt_storage_pool_log_gc_max_valid_rate, 0.35, "Max valid rate of deciding a page file can be compact") \ - \ - M(SettingUInt64, dt_storage_pool_data_write_slots, 1, "Max write concurrency for each StoragePool.data.") \ - M(SettingUInt64, dt_storage_pool_data_gc_min_file_num, 10, "Min number of page files to compact") \ - M(SettingUInt64, dt_storage_pool_data_gc_min_legacy_num, 3, "Min number of legacy page files to compact") \ - M(SettingUInt64, dt_storage_pool_data_gc_min_bytes, 128 * Constant::MB, "Min bytes of page data to compact") \ - M(SettingFloat, dt_storage_pool_data_gc_max_valid_rate, 0.35, "Max valid rate of deciding a page file can be compact") \ - \ - M(SettingUInt64, dt_storage_pool_meta_write_slots, 2, "Max write concurrency for each StoragePool.meta.") \ - M(SettingUInt64, dt_storage_pool_meta_gc_min_file_num, 10, "Min number of page files to compact") \ - M(SettingUInt64, dt_storage_pool_meta_gc_min_legacy_num, 3, "Min number of legacy page files to compact") \ - M(SettingUInt64, dt_storage_pool_meta_gc_min_bytes, 128 * Constant::MB, "Min bytes of page data to compact") \ - M(SettingFloat, dt_storage_pool_meta_gc_max_valid_rate, 0.35, "Max valid rate of deciding a page file can be compact") \ - \ - M(SettingUInt64, max_rows_in_set, 0, "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \ - M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \ - M(SettingOverflowMode, set_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingUInt64, max_rows_in_join, 0, "Maximum size of the hash table for JOIN (in number of rows).") \ - M(SettingUInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).") \ - M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \ - M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \ - M(SettingOverflowMode, transfer_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingUInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.") \ - M(SettingUInt64, max_bytes_in_distinct, 0, "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.") \ - M(SettingOverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ - \ - M(SettingBool, join_concurrent_build, true, "Build hash table concurrently for join.") \ - M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.") \ - M(SettingUInt64, max_memory_usage_for_user, 0, "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.") \ - M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.") \ - \ - M(SettingUInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.") \ - M(SettingUInt64, max_network_bytes, 0, "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.") \ - M(SettingUInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.")\ - M(SettingUInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.") + */ \ + \ + M(SettingUInt64, max_rows_to_read, 0, \ + "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it " \ + "is only checked on a remote server.") \ + M(SettingUInt64, max_bytes_to_read, 0, \ + "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading " \ + "from a remote server, it is only checked on a remote server.") \ + M(SettingOverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + \ + M(SettingUInt64, max_rows_to_group_by, 0, "") \ + M(SettingOverflowMode, group_by_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + M(SettingUInt64, max_bytes_before_external_group_by, 0, "") \ + \ + M(SettingUInt64, max_rows_to_sort, 0, "") \ + M(SettingUInt64, max_bytes_to_sort, 0, "") \ + M(SettingOverflowMode, sort_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + M(SettingUInt64, max_bytes_before_external_sort, 0, "") \ + \ + M(SettingUInt64, max_result_rows, 0, "Limit on result size in rows. Also checked for intermediate data sent from remote servers.") \ + M(SettingUInt64, max_result_bytes, 0, \ + "Limit on result size in bytes (uncompressed). Also checked for intermediate data sent from remote servers.") \ + M(SettingOverflowMode, result_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + \ + /* TODO: Check also when merging and finalizing aggregate functions. */ \ + M(SettingSeconds, max_execution_time, 0, "") \ + M(SettingOverflowMode, timeout_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + \ + M(SettingUInt64, min_execution_speed, 0, "In rows per second.") \ + M(SettingSeconds, timeout_before_checking_execution_speed, 0, \ + "Check that the speed is not too low after the specified time has elapsed.") \ + \ + M(SettingUInt64, max_columns_to_read, 0, "") \ + M(SettingUInt64, max_temporary_columns, 0, "") \ + M(SettingUInt64, max_temporary_non_const_columns, 0, "") \ + \ + M(SettingUInt64, max_subquery_depth, 100, "") \ + M(SettingUInt64, max_pipeline_depth, 1000, "") \ + M(SettingUInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. Checked after parsing.") \ + M(SettingUInt64, max_ast_elements, 50000, "Maximum size of query syntax tree in number of nodes. Checked after parsing.") \ + M(SettingUInt64, max_expanded_ast_elements, 500000, \ + "Maximum size of query syntax tree in number of nodes after expansion of aliases and the asterisk.") \ + \ + M(SettingUInt64, readonly, 0, \ + "0 - everything is allowed. 1 - only read requests. 2 - only read requests, as well as changing settings, except for the " \ + "'readonly' setting.") \ + \ + M(SettingUInt64, shared_query_clients, 0, "How many clients will share the same query_id. If > 0, enable shared query mode.") \ + M(SettingString, query_id, "", "The query_id, only for testing.") \ + M(SettingUInt64, mutable_deduper, 5, \ + "The deduper used by MutableMergeTree storage. By default 5. 0: OriginStreams, 1: OriginUnity, 2: ReplacingUnity, 3: " \ + "ReplacingPartitioning, 4: DedupPartitioning, 5: ReplacingPartitioningOpt.") \ + \ + M(SettingUInt64, dt_segment_limit_rows, 1000000, "Base rows of segments in DeltaTree Engine.") \ + M(SettingUInt64, dt_segment_limit_size, 1073741824, "Base size of segments in DeltaTree Engine. 1 GB by default.") \ + M(SettingUInt64, dt_segment_delta_limit_rows, 80000, "Max rows of segment delta in DeltaTree Engine") \ + M(SettingUInt64, dt_segment_delta_limit_size, 85983232, "Max size of segment delta in DeltaTree Engine. 82 MB by default.") \ + M(SettingUInt64, dt_segment_force_merge_delta_deletes, 10, "Delta delete ranges before force merge into stable.") \ + M(SettingUInt64, dt_segment_force_merge_delta_rows, 400000, "Delta rows before force merge into stable.") \ + M(SettingUInt64, dt_segment_force_merge_delta_size, 429496729, "Delta size before force merge into stable. 400 MB by default.") \ + M(SettingUInt64, dt_segment_stop_write_delta_rows, 2000000, "Delta rows before stop new writes.") \ + M(SettingUInt64, dt_segment_stop_write_delta_size, 2147483648, "Delta size before stop new writes. 2 GB by default.") \ + M(SettingUInt64, dt_segment_delta_cache_limit_rows, 4000, "Max rows of cache in segment delta in DeltaTree Engine.") \ + M(SettingUInt64, dt_segment_delta_cache_limit_size, 4194304, \ + "Max size of cache in segment delta in DeltaTree Engine. 4 MB by default.") \ + M(SettingUInt64, dt_segment_delta_small_pack_rows, 512, "Determine whether a pack in delta is small or not.") \ + M(SettingUInt64, dt_segment_delta_small_pack_size, 524288, "Determine whether a pack in delta is small or not. 512 KB by default.") \ + M(SettingUInt64, dt_segment_stable_pack_rows, DEFAULT_MERGE_BLOCK_SIZE, "Expected stable pack rows in DeltaTree Engine.") \ + M(SettingFloat, dt_segment_wait_duration_factor, 1, "The factor of wait duration in a write stall.") \ + M(SettingUInt64, dt_bg_gc_check_interval, 5, "Background gc thread check interval, the unit is second.") \ + M(SettingInt64, dt_bg_gc_max_segments_to_check_every_round, 100, \ + "Max segments to check in every gc round, value less than or equal to 0 means gc no segments.") \ + M(SettingFloat, dt_bg_gc_ratio_threhold_to_trigger_gc, 1.2, \ + "Trigger segment's gc when the ratio of invalid version exceed this threhold. Values smaller than or equal to 1.0 means gc all " \ + "segments") \ + M(SettingFloat, dt_bg_gc_delta_delete_ratio_to_trigger_gc, 0.3, \ + "Trigger segment's gc when the ratio of delta delete range to stable exceeds this ratio.") \ + M(SettingUInt64, dt_insert_max_rows, 0, "Max rows of insert blocks when write into DeltaTree Engine. By default 0 means no limit.") \ + M(SettingBool, dt_enable_rough_set_filter, true, "Whether to parse where expression as Rough Set Index filter or not.") \ + M(SettingBool, dt_raw_filter_range, true, "Do range filter or not when read data in raw mode in DeltaTree Engine.") \ + M(SettingBool, dt_read_delta_only, false, "Only read delta data in DeltaTree Engine.") \ + M(SettingBool, dt_read_stable_only, false, "Only read stable data in DeltaTree Engine.") \ + M(SettingBool, dt_enable_logical_split, true, "Enable logical split or not in DeltaTree Engine.") \ + M(SettingBool, dt_flush_after_write, false, "Flush cache or not after write in DeltaTree Engine.") \ + M(SettingBool, dt_enable_relevant_place, false, "Enable relevant place or not in DeltaTree Engine.") \ + M(SettingBool, dt_enable_skippable_place, true, "Enable skippable place or not in DeltaTree Engine.") \ + M(SettingBool, dt_enable_stable_column_cache, true, "Enable column cache for StorageDeltaMerge.") \ + M(SettingBool, dt_enable_single_file_mode_dmfile, false, "Enable write DMFile in single file mode.") \ + M(SettingUInt64, dt_open_file_max_idle_seconds, 15, "Max idle time of opening files, 0 means infinite.") \ + M(SettingUInt64, dt_page_num_max_expect_legacy_files, 100, "Max number of legacy file expected") \ + M(SettingFloat, dt_page_num_max_gc_valid_rate, 1.0, \ + "Max valid rate of deciding a page file can be compact when exising legacy files are more over than " \ + "`dt_stroage_num_max_expect_legacy_files`") \ + M(SettingFloat, dt_page_gc_low_write_prob, 0.10, "Probability to run gc when write there is few writes.") \ + \ + M(SettingUInt64, dt_storage_pool_log_write_slots, 4, "Max write concurrency for each StoragePool.log.") \ + M(SettingUInt64, dt_storage_pool_log_gc_min_file_num, 10, "Min number of page files to compact") \ + M(SettingUInt64, dt_storage_pool_log_gc_min_legacy_num, 3, "Min number of legacy page files to compact") \ + M(SettingUInt64, dt_storage_pool_log_gc_min_bytes, 128 * Constant::MB, "Min bytes of page data to compact") \ + M(SettingFloat, dt_storage_pool_log_gc_max_valid_rate, 0.35, "Max valid rate of deciding a page file can be compact") \ + \ + M(SettingUInt64, dt_storage_pool_data_write_slots, 1, "Max write concurrency for each StoragePool.data.") \ + M(SettingUInt64, dt_storage_pool_data_gc_min_file_num, 10, "Min number of page files to compact") \ + M(SettingUInt64, dt_storage_pool_data_gc_min_legacy_num, 3, "Min number of legacy page files to compact") \ + M(SettingUInt64, dt_storage_pool_data_gc_min_bytes, 128 * Constant::MB, "Min bytes of page data to compact") \ + M(SettingFloat, dt_storage_pool_data_gc_max_valid_rate, 0.35, "Max valid rate of deciding a page file can be compact") \ + \ + M(SettingUInt64, dt_storage_pool_meta_write_slots, 2, "Max write concurrency for each StoragePool.meta.") \ + M(SettingUInt64, dt_storage_pool_meta_gc_min_file_num, 10, "Min number of page files to compact") \ + M(SettingUInt64, dt_storage_pool_meta_gc_min_legacy_num, 3, "Min number of legacy page files to compact") \ + M(SettingUInt64, dt_storage_pool_meta_gc_min_bytes, 128 * Constant::MB, "Min bytes of page data to compact") \ + M(SettingFloat, dt_storage_pool_meta_gc_max_valid_rate, 0.35, "Max valid rate of deciding a page file can be compact") \ + \ + M(SettingUInt64, max_rows_in_set, 0, \ + "Maximum size of the set (in number of elements) resulting from the execution of the IN section.") \ + M(SettingUInt64, max_bytes_in_set, 0, "Maximum size of the set (in bytes in memory) resulting from the execution of the IN section.") \ + M(SettingOverflowMode, set_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + \ + M(SettingUInt64, max_rows_in_join, 0, "Maximum size of the hash table for JOIN (in number of rows).") \ + M(SettingUInt64, max_bytes_in_join, 0, "Maximum size of the hash table for JOIN (in number of bytes in memory).") \ + M(SettingOverflowMode, join_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + \ + M(SettingUInt64, max_rows_to_transfer, 0, \ + "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \ + M(SettingUInt64, max_bytes_to_transfer, 0, \ + "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \ + M(SettingOverflowMode, transfer_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + \ + M(SettingUInt64, max_rows_in_distinct, 0, "Maximum number of elements during execution of DISTINCT.") \ + M(SettingUInt64, max_bytes_in_distinct, 0, \ + "Maximum total size of state (in uncompressed bytes) in memory for the execution of DISTINCT.") \ + M(SettingOverflowMode, distinct_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.") \ + \ + M(SettingBool, join_concurrent_build, true, "Build hash table concurrently for join.") \ + M(SettingUInt64, max_memory_usage, 0, "Maximum memory usage for processing of single query. Zero means unlimited.") \ + M(SettingUInt64, max_memory_usage_for_user, 0, \ + "Maximum memory usage for processing all concurrently running queries for the user. Zero means unlimited.") \ + M(SettingUInt64, max_memory_usage_for_all_queries, 0, \ + "Maximum memory usage for processing all concurrently running queries on the server. Zero means unlimited.") \ + \ + M(SettingUInt64, max_network_bandwidth, 0, \ + "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.") \ + M(SettingUInt64, max_network_bytes, 0, \ + "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.") \ + M(SettingUInt64, max_network_bandwidth_for_user, 0, \ + "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means " \ + "unlimited.") \ + M(SettingUInt64, max_network_bandwidth_for_all_users, 0, \ + "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means " \ + "unlimited.") -#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \ - TYPE NAME {DEFAULT}; +#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) TYPE NAME{DEFAULT}; APPLY_FOR_SETTINGS(DECLARE) diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h index c27a0deb15b..581315ffb8a 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.h @@ -319,6 +319,8 @@ class DeltaValueSnapshot : public std::enable_shared_from_thistryToDeleteRange(); dp_delete) + squashed_delete_range = squashed_delete_range.merge(dp_delete->getDeleteRange()); + } + return squashed_delete_range; +} + // ================================================ // DeltaValueReader // ================================================ - DeltaValueReader::DeltaValueReader(const DMContext & context, const DeltaSnapshotPtr & delta_snap_, const ColumnDefinesPtr & col_defs_, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index da6515fa9d3..7b1c7eff457 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -94,14 +94,14 @@ bool DeltaMergeStore::MergeDeltaTaskPool::addTask(const BackgroundTask & task, c std::scoped_lock lock(mutex); switch (task.type) { - case Split: - case Merge: - case MergeDelta: + case TaskType::Split: + case TaskType::Merge: + case TaskType::MergeDelta: heavy_tasks.push(task); return true; - case Compact: - case Flush: - case PlaceIndex: + case TaskType::Compact: + case TaskType::Flush: + case TaskType::PlaceIndex: light_tasks.push(task); return false; default: @@ -701,7 +701,7 @@ void DeltaMergeStore::ingestFiles(const DMContextPtr & dm_context, updated_segments.push_back(segment); fiu_do_on(FailPoints::segment_merge_after_ingest_packs, { segment->flushCache(*dm_context); - segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_Thread_Pool); + segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundThreadPool); storage_pool.gc(global_context.getSettingsRef(), StoragePool::Seconds(0)); }); break; @@ -874,7 +874,7 @@ void DeltaMergeStore::mergeDeltaAll(const Context & context) for (auto & segment : all_segments) { - segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_FG); + segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground); } } @@ -1074,7 +1074,6 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const { if (segment->hasAbandoned()) return; - auto & delta = segment->getDelta(); size_t delta_saved_rows = delta->getRows(/* use_unsaved */ false); @@ -1118,8 +1117,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const bool should_background_merge_delta = ((delta_check_rows >= delta_limit_rows || delta_check_bytes >= delta_limit_bytes) // && (delta_rows - delta_last_try_merge_delta_rows >= delta_cache_limit_rows - || delta_bytes - delta_last_try_merge_delta_bytes >= delta_cache_limit_bytes)) - || delta_deletes >= 2; + || delta_bytes - delta_last_try_merge_delta_bytes >= delta_cache_limit_bytes)); bool should_foreground_merge_delta_by_rows_or_bytes = delta_check_rows >= forceMergeDeltaRows(dm_context) || delta_check_bytes >= forceMergeDeltaBytes(dm_context); bool should_foreground_merge_delta_by_deletes = delta_deletes >= forceMergeDeltaDeletes(dm_context); @@ -1176,6 +1174,8 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const } } + // Need to check the latest delta (maybe updated after foreground flush). If it is updating by another thread, + // give up adding more tasks on this version of delta. if (segment->getDelta()->isUpdating()) return; @@ -1202,14 +1202,12 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const if (it == segments.end()) return {}; next_segment = it->second; - - auto limit = dm_context->segment_limit_rows / 5; + auto limit = dm_context->segment_limit_rows / 5; if (next_segment->getEstimatedRows() >= limit) return {}; } return next_segment; }; - SegmentPtr merge_sibling; auto try_fg_merge_delta = [&]() -> SegmentPtr { if (should_foreground_merge_delta_by_rows_or_bytes || should_foreground_merge_delta_by_deletes) @@ -1228,7 +1226,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const .Observe(watch.elapsedSeconds()); }); - return segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_FG); + return segmentMergeDelta(*dm_context, segment, TaskRunThread::Foreground); } return {}; }; @@ -1264,6 +1262,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const return false; }; auto try_bg_merge = [&]() { + SegmentPtr merge_sibling; if (should_merge && (merge_sibling = getMergeSibling())) { try_add_background_task(BackgroundTask{TaskType::Merge, dm_context, segment, merge_sibling}); @@ -1299,8 +1298,7 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const if (try_fg_split(segment)) return; - SegmentPtr new_segment; - if ((new_segment = try_fg_merge_delta())) + if (SegmentPtr new_segment = try_fg_merge_delta(); new_segment) { // After merge delta, we better check split immediately. if (try_bg_split(new_segment)) @@ -1359,35 +1357,35 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) { switch (task.type) { - case Split: + case TaskType::Split: std::tie(left, right) = segmentSplit(*task.dm_context, task.segment, false); type = ThreadType::BG_Split; break; - case Merge: + case TaskType::Merge: segmentMerge(*task.dm_context, task.segment, task.next_segment, false); type = ThreadType::BG_Merge; break; - case MergeDelta: { + case TaskType::MergeDelta: { FAIL_POINT_PAUSE(FailPoints::pause_before_dt_background_delta_merge); - left = segmentMergeDelta(*task.dm_context, task.segment, TaskRunThread::Thread_BG_Thread_Pool); + left = segmentMergeDelta(*task.dm_context, task.segment, TaskRunThread::BackgroundThreadPool); type = ThreadType::BG_MergeDelta; // Wake up all waiting threads if failpoint is enabled FailPointHelper::disableFailPoint(FailPoints::pause_until_dt_background_delta_merge); break; } - case Compact: + case TaskType::Compact: task.segment->compactDelta(*task.dm_context); left = task.segment; type = ThreadType::BG_Compact; break; - case Flush: + case TaskType::Flush: task.segment->flushCache(*task.dm_context); // After flush cache, better place delta index. task.segment->placeDeltaIndex(*task.dm_context); left = task.segment; type = ThreadType::BG_Flush; break; - case PlaceIndex: + case TaskType::PlaceIndex: task.segment->placeDeltaIndex(*task.dm_context); break; default: @@ -1397,7 +1395,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) catch (const Exception & e) { LOG_ERROR(log, - "Task " << toString(task.type) << " on Segment [" << task.segment->segmentId() + "Task " << DeltaMergeStore::toString(task.type) << " on Segment [" << task.segment->segmentId() << ((bool)task.next_segment ? ("] and [" + DB::toString(task.next_segment->segmentId())) : "") << "] failed. Error msg: " << e.message()); e.rethrow(); @@ -1408,6 +1406,7 @@ bool DeltaMergeStore::handleBackgroundTask(bool heavy) throw; } + // continue to check whether we need to apply more tasks after this task is ended. if (left) checkSegmentUpdate(task.dm_context, left, type); if (right) @@ -1420,14 +1419,14 @@ namespace GC { // Returns true if it needs gc. // This is for optimization purpose, does not mean to be accurate. -bool shouldCompact(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, Logger * log) +bool shouldCompactStable(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ratio_threshold, Logger * log) { // Always GC. if (ratio_threshold < 1.0) return true; auto & property = seg->getStable()->getStableProperty(); - LOG_DEBUG(log, property.toDebugString()); + LOG_TRACE(log, __PRETTY_FUNCTION__ << property.toDebugString()); // No data older than safe_point to GC. if (property.gc_hint_version > gc_safepoint) return false; @@ -1439,6 +1438,33 @@ bool shouldCompact(const SegmentPtr & seg, DB::Timestamp gc_safepoint, double ra return true; return false; } + +bool shouldCompactDeltaWithStable( + const DMContext & context, const SegmentSnapshotPtr & snap, const RowKeyRange & segment_range, double ratio_threshold, Logger * log) +{ + auto actual_delete_range = snap->delta->getSquashDeleteRange().shrink(segment_range); + if (actual_delete_range.none()) + return false; + + auto [delete_rows, delete_bytes] = snap->stable->getApproxRowsAndBytes(context, actual_delete_range); + + auto stable_rows = snap->stable->getRows(); + auto stable_bytes = snap->stable->getBytes(); + + LOG_TRACE(log, + __PRETTY_FUNCTION__ << " delete range rows [" << delete_rows << "], delete_bytes [" << delete_bytes << "] stable_rows [" + << stable_rows << "] stable_bytes [" << stable_bytes << "]"); + + // 1. for small tables, the data may just reside in delta and stable_rows may be 0, + // so the `=` in `>=` is needed to cover the scenario when set tiflash replica of small tables to 0. + // (i.e. `actual_delete_range` is not none, but `delete_rows` and `stable_rows` are both 0). + // 2. the disadvantage of `=` in `>=` is that it may trigger an extra gc when write apply snapshot file to an empty segment, + // because before write apply snapshot file, it will write a delete range first, and will meet the following gc criteria. + // But the cost should be really minor because merge delta on an empty segment should be very fast. + // What's more, we can ignore this kind of delete range in future to avoid this extra gc. + bool should_compact = (delete_rows >= stable_rows * ratio_threshold) || (delete_bytes >= stable_bytes * ratio_threshold); + return should_compact; +} } // namespace GC UInt64 DeltaMergeStore::onSyncGc(Int64 limit) @@ -1455,7 +1481,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) if (segments.size() == 1) { const auto & seg = segments.begin()->second; - if (seg->getStable()->getRows() == 0) + if (seg->getEstimatedRows() == 0) return 0; } } @@ -1463,16 +1489,19 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire); LOG_DEBUG(log, "GC on table " << table_name << " start with key: " << next_gc_check_key.toDebugString() - << ", gc_safe_point: " << gc_safe_point); + << ", gc_safe_point: " << gc_safe_point << ", max gc limit: " << limit); UInt64 check_segments_num = 0; Int64 gc_segments_num = 0; while (gc_segments_num < limit) { - SegmentPtr segment; // If the store is shut down, give up running GC on it. if (shutdown_called.load(std::memory_order_relaxed)) break; + + auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); + SegmentPtr segment; + SegmentSnapshotPtr segment_snap; { std::shared_lock lock(read_write_mutex); @@ -1487,21 +1516,20 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) segment = segment_it->second; next_gc_check_key = segment_it->first.toRowKeyValue(); + segment_snap = segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); } - if (segment->hasAbandoned()) - continue; - - if (segment->getLastCheckGCSafePoint() >= gc_safe_point) + assert(segment != nullptr); + if (segment->hasAbandoned() || segment->getLastCheckGCSafePoint() >= gc_safe_point || segment_snap == nullptr) continue; const auto segment_id = segment->segmentId(); RowKeyRange segment_range = segment->getRowKeyRange(); - if (segment->getDelta()->isUpdating()) + + // meet empty segment, try merge it + if (segment_snap->getRows() == 0) { - LOG_DEBUG(log, - "GC is skipped Segment [" << segment_id << "] [range=" << segment_range.toDebugString() << "] [table=" << table_name - << "]"); + checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); continue; } @@ -1511,9 +1539,8 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) // On the other hand, if it should do DeltaMerge using this gc_safe_point, and the DeltaMerge is interruptted by other process, // it's still worth to wait another gc_safe_point to check this segment again. segment->setLastCheckGCSafePoint(gc_safe_point); - - auto dm_context = newDMContext(global_context, global_context.getSettingsRef()); dm_context->min_version = gc_safe_point; + // calculate StableProperty if needed if (!segment->getStable()->isStablePropertyCached()) segment->getStable()->calculateStableProperty(*dm_context, segment_range, isCommonHandle()); @@ -1522,22 +1549,32 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit) { // Check whether we should apply gc on this segment const bool should_compact - = GC::shouldCompact(segment, gc_safe_point, global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, log); + = GC::shouldCompactStable( + segment, gc_safe_point, global_context.getSettingsRef().dt_bg_gc_ratio_threhold_to_trigger_gc, log) + || GC::shouldCompactDeltaWithStable(*dm_context, + segment_snap, + segment_range, + global_context.getSettingsRef().dt_bg_gc_delta_delete_ratio_to_trigger_gc, + log); bool finish_gc_on_segment = false; if (should_compact) { - ThreadType type = ThreadType::BG_GC; - segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::Thread_BG_GC); - if (segment) + if (segment = segmentMergeDelta(*dm_context, segment, TaskRunThread::BackgroundGCThread, segment_snap); segment) { // Continue to check whether we need to apply more tasks on this segment - checkSegmentUpdate(dm_context, segment, type); + checkSegmentUpdate(dm_context, segment, ThreadType::BG_GC); gc_segments_num++; finish_gc_on_segment = true; LOG_INFO(log, "GC-merge-delta done Segment [" << segment_id << "] [range=" << segment_range.toDebugString() << "] [table=" << table_name << "]"); } + else + { + LOG_INFO(log, + "GC aborted on Segment [" << segment_id << "] [range=" << segment_range.toDebugString() + << "] [table=" << table_name << "]"); + } } if (!finish_gc_on_segment) LOG_DEBUG(log, @@ -1568,7 +1605,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP { std::shared_lock lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(lock, segment)) { LOG_DEBUG(log, "Give up segment [" << segment->segmentId() << "] split"); return {}; @@ -1620,7 +1657,7 @@ SegmentPair DeltaMergeStore::segmentSplit(DMContext & dm_context, const SegmentP { std::unique_lock lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(lock, segment)) { LOG_DEBUG(log, "Give up segment [" << segment->segmentId() << "] split"); wbs.setRollback(); @@ -1691,12 +1728,12 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le { std::shared_lock lock(read_write_mutex); - if (!isSegmentValid(left)) + if (!isSegmentValid(lock, left)) { LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); return; } - if (!isSegmentValid(right)) + if (!isSegmentValid(lock, right)) { LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); return; @@ -1735,7 +1772,7 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le { std::unique_lock lock(read_write_mutex); - if (!isSegmentValid(left) || !isSegmentValid(right)) + if (!isSegmentValid(lock, left) || !isSegmentValid(lock, right)) { LOG_DEBUG(log, "Give up merge segments left [" << left->segmentId() << "], right [" << right->segmentId() << "]"); wbs.setRollback(); @@ -1778,23 +1815,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le check(dm_context.db_context); } -SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread run_thread) +SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, + const SegmentPtr & segment, + const TaskRunThread run_thread, + SegmentSnapshotPtr segment_snap) { LOG_DEBUG(log, toString(run_thread) << " merge delta, segment [" << segment->segmentId() << "], safe point:" << dm_context.min_version); - SegmentSnapshotPtr segment_snap; - ColumnDefinesPtr schema_snap; + ColumnDefinesPtr schema_snap; + { std::shared_lock lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(lock, segment)) { LOG_DEBUG(log, "Give up merge delta, segment [" << segment->segmentId() << "]"); return {}; } - segment_snap = segment->createSnapshot(dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); + // Try to generate a new snapshot if there is no pre-allocated one if (!segment_snap) + segment_snap = segment->createSnapshot(dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge); + + if (unlikely(!segment_snap)) { LOG_DEBUG(log, "Give up merge delta, segment [" << segment->segmentId() << "]"); return {}; @@ -1812,13 +1855,13 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm switch (run_thread) { - case TaskRunThread::Thread_BG_Thread_Pool: + case TaskRunThread::BackgroundThreadPool: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge).Increment(); break; - case TaskRunThread::Thread_FG: + case TaskRunThread::Foreground: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge_fg).Increment(); break; - case TaskRunThread::Thread_BG_GC: + case TaskRunThread::BackgroundGCThread: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_count, type_delta_merge_bg_gc).Increment(); break; default: @@ -1829,15 +1872,15 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm SCOPE_EXIT({ switch (run_thread) { - case TaskRunThread::Thread_BG_Thread_Pool: + case TaskRunThread::BackgroundThreadPool: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge) .Observe(watch_delta_merge.elapsedSeconds()); break; - case TaskRunThread::Thread_FG: + case TaskRunThread::Foreground: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge_fg) .Observe(watch_delta_merge.elapsedSeconds()); break; - case TaskRunThread::Thread_BG_GC: + case TaskRunThread::BackgroundGCThread: GET_METRIC(dm_context.metrics, tiflash_storage_subtask_duration_seconds, type_delta_merge_bg_gc) .Observe(watch_delta_merge.elapsedSeconds()); break; @@ -1856,7 +1899,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm { std::unique_lock read_write_lock(read_write_mutex); - if (!isSegmentValid(segment)) + if (!isSegmentValid(read_write_lock, segment)) { LOG_DEBUG(log, "Give up merge delta, segment [" << segment->segmentId() << "]"); wbs.setRollback(); @@ -1901,7 +1944,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, const Segm return new_segment; } -bool DeltaMergeStore::isSegmentValid(const SegmentPtr & segment) +bool DeltaMergeStore::doIsSegmentValid(const SegmentPtr & segment) { if (segment->hasAbandoned()) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 2901cb390af..5186459b680 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -172,9 +172,9 @@ class DeltaMergeStore : private boost::noncopyable enum TaskRunThread { - Thread_BG_Thread_Pool, - Thread_FG, - Thread_BG_GC, + BackgroundThreadPool, + Foreground, + BackgroundGCThread, }; static std::string toString(ThreadType type) @@ -204,21 +204,6 @@ class DeltaMergeStore : private boost::noncopyable } } - static std::string toString(TaskRunThread type) - { - switch (type) - { - case Thread_BG_Thread_Pool: - return "BackgroundThreadPool"; - case Thread_FG: - return "Foreground"; - case Thread_BG_GC: - return "BackgroundGCThread"; - default: - return "Unknown"; - } - } - static std::string toString(TaskType type) { switch (type) @@ -240,6 +225,21 @@ class DeltaMergeStore : private boost::noncopyable } } + static std::string toString(TaskRunThread type) + { + switch (type) + { + case BackgroundThreadPool: + return "BackgroundThreadPool"; + case Foreground: + return "Foreground"; + case BackgroundGCThread: + return "BackgroundGCThread"; + default: + return "Unknown"; + } + } + struct BackgroundTask { TaskType type; @@ -403,7 +403,7 @@ class DeltaMergeStore : private boost::noncopyable private: #endif - DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id=""); + DMContextPtr newDMContext(const Context & db_context, const DB::Settings & db_settings, const String & query_id = ""); static bool pkIsHandle(const ColumnDefine & handle_define) { return handle_define.id != EXTRA_HANDLE_COLUMN_ID; } @@ -414,13 +414,19 @@ class DeltaMergeStore : private boost::noncopyable SegmentPair segmentSplit(DMContext & dm_context, const SegmentPtr & segment, bool is_foreground); void segmentMerge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right, bool is_foreground); - SegmentPtr segmentMergeDelta(DMContext & dm_context, const SegmentPtr & segment, const TaskRunThread thread); + SegmentPtr segmentMergeDelta(DMContext & dm_context, + const SegmentPtr & segment, + const TaskRunThread thread, + SegmentSnapshotPtr segment_snap = nullptr); bool updateGCSafePoint(); bool handleBackgroundTask(bool heavy); - bool isSegmentValid(const SegmentPtr & segment); + // isSegmentValid should be protected by lock on `read_write_mutex` + inline bool isSegmentValid(std::shared_lock &, const SegmentPtr & segment) { return doIsSegmentValid(segment); } + inline bool isSegmentValid(std::unique_lock &, const SegmentPtr & segment) { return doIsSegmentValid(segment); } + bool doIsSegmentValid(const SegmentPtr & segment); void restoreStableFiles(); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index c2b24042c0b..a405030be25 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -138,13 +138,13 @@ class Segment : private boost::noncopyable size_t expected_block_size = DEFAULT_BLOCK_SIZE); /// Return a stream which is suitable for exporting data. - /// reorgize_block: put those rows with the same pk rows into the same block or not. + /// reorganize_block: put those rows with the same pk rows into the same block or not. BlockInputStreamPtr getInputStreamForDataExport(const DMContext & dm_context, const ColumnDefines & columns_to_read, const SegmentSnapshotPtr & segment_snap, const RowKeyRange & data_range, size_t expected_block_size = DEFAULT_BLOCK_SIZE, - bool reorgnize_block = true) const; + bool reorganize_block = true) const; BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read, diff --git a/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp b/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp index 38b03d3de5e..824eb8e9340 100644 --- a/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp +++ b/dbms/src/Storages/DeltaMerge/convertColumnTypeHelpers.cpp @@ -16,6 +16,7 @@ namespace DB namespace ErrorCodes { extern const int BAD_ARGUMENTS; +extern const int NOT_IMPLEMENTED; } namespace DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 1c249720754..a2d7c821ff1 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -138,7 +138,6 @@ try { // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -357,9 +356,9 @@ try if (merge_delta_after_delete) { - // flush segment for apply delete range + // flush cache before applying merge delete range or the delete range will not be compacted to stable + segment->flushCache(dmContext()); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -381,6 +380,22 @@ try } in->readSuffix(); } + + // For the case that apply merge delta after delete range, we ensure that data on disk are compacted + if (merge_delta_after_delete) + { + // read raw after delete range + auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + in->readPrefix(); + size_t num_rows = 0; + while (Block block = in->read()) + { + num_rows += block.rows(); + } + in->readSuffix(); + // Only 2 rows are left on disk, others are compacted. + ASSERT_EQ(num_rows, 2UL); + } } CATCH @@ -410,9 +425,8 @@ try } { - // flush segment + // merge delta to create stable segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -424,14 +438,13 @@ try // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } if (merge_delta_after_delete) { - // flush segment for apply delete range + // flush cache before applying merge delete range or the delete range will not be compacted to stable + segment->flushCache(dmContext()); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -453,6 +466,22 @@ try } in->readSuffix(); } + + // For the case that apply merge delta after delete range, we ensure that data on disk are compacted + if (merge_delta_after_delete) + { + // read raw after delete range + auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + in->readPrefix(); + size_t num_rows = 0; + while (Block block = in->read()) + { + num_rows += block.rows(); + } + in->readSuffix(); + // Only 2 rows are left on disk, others are compacted. + ASSERT_EQ(num_rows, 2UL); + } } CATCH @@ -466,7 +495,6 @@ try segment->write(dmContext(), std::move(block)); // flush [0, 50) to segment's stable segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } auto [read_before_delete, merge_delta_after_delete] = GetParam(); @@ -501,9 +529,9 @@ try if (merge_delta_after_delete) { - // flush segment for apply delete range + // flush cache before applying merge delete range or the delete range will not be compacted to stable + segment->flushCache(dmContext()); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -525,6 +553,22 @@ try } in->readSuffix(); } + + // For the case that apply merge delta after delete range, we ensure that data on disk are compacted + if (merge_delta_after_delete) + { + // read raw after delete range + auto in = segment->getInputStreamRaw(dmContext(), *tableColumns()); + size_t num_rows = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows += block.rows(); + } + in->readSuffix(); + // Only 2 rows are left on disk, others are compacted. + ASSERT_EQ(num_rows, 2UL); + } } CATCH @@ -540,18 +584,25 @@ try } { - // flush segment + // do delta-merge move data to stable segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } + auto check_segment_squash_delete_range = [this](SegmentPtr & segment, const HandleRange & expect_range) { + // set `is_update=false` to get full squash delete range + auto snap = segment->createSnapshot(dmContext(), /*for_update*/ false, CurrentMetrics::DT_SnapshotOfRead); + auto squash_range = snap->delta->getSquashDeleteRange(); + ASSERT_ROWKEY_RANGE_EQ(squash_range, RowKeyRange::fromHandleRange(expect_range)); + }; + { // Test delete range [70, 100) HandleRange del{70, 100}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); // Add trace msg when ASSERT failed + check_segment_squash_delete_range(segment, HandleRange{70, 100}); + segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -581,9 +632,10 @@ try // Test delete range [63, 70) HandleRange del{63, 70}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{63, 100}); + segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -612,9 +664,9 @@ try // Test delete range [1, 32) HandleRange del{1, 32}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{1, 100}); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -642,9 +694,9 @@ try // delete should be idempotent HandleRange del{1, 32}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{1, 100}); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -672,9 +724,9 @@ try // There is an overlap range [0, 1) HandleRange del{0, 2}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); - // flush segment + SCOPED_TRACE("check after range: " + del.toDebugString()); + check_segment_squash_delete_range(segment, HandleRange{0, 100}); segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } { @@ -695,6 +747,36 @@ try } in->readSuffix(); } + + { + Block block = DMTestEnv::prepareSimpleWriteBlock(9, 16, false); + segment->write(dmContext(), std::move(block)); + SCOPED_TRACE("check after write"); + // if we write some new data, we can still get the delete range + check_segment_squash_delete_range(segment, HandleRange{0, 100}); + } + + { + // Read after new write + auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + in->readPrefix(); + while (Block block = in->read()) + { + ASSERT_EQ(block.rows(), num_rows_write - 33 + 7); + for (auto & iter : block) + { + auto c = iter.column; + if (iter.name == DMTestEnv::pk_name) + { + EXPECT_EQ(c->getInt(0), 9); + EXPECT_EQ(c->getInt(6), 15); + EXPECT_EQ(c->getInt(7), 32); + EXPECT_EQ(c->getInt(block.rows() - 1), 62); + } + } + } + in->readSuffix(); + } } CATCH @@ -873,7 +955,6 @@ try segment->write(dmContext(), std::move(block)); // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } SegmentPtr new_segment = Segment::restoreSegment(dmContext(), segment->segmentId()); @@ -940,7 +1021,6 @@ try { // flush segment segment = segment->mergeDelta(dmContext(), tableColumns()); - ; } for (size_t i = (num_batches_written - 1) * num_rows_per_write + 2; i < num_batches_written * num_rows_per_write; i++) @@ -1033,10 +1113,10 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterface> genDMFile(DMContext & context, const Block & block) { - auto delegator = context.path_pool.getStableDiskDelegator(); - auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto delegator = context.path_pool.getStableDiskDelegator(); + auto file_id = context.storage_pool.newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); auto input_stream = std::make_shared(block); - auto store_path = delegator.choosePath(); + auto store_path = delegator.choosePath(); DMFileBlockOutputStream::Flags flags; flags.setSingleFile(DMTestEnv::getPseudoRandomNumber() % 2); @@ -1046,9 +1126,9 @@ class Segment_test_2 : public Segment_test, public testing::WithParamInterfacegetBytesOnDisk(), store_path); - auto & pk_column = block.getByPosition(0).column; - auto min_pk = pk_column->getInt(0); - auto max_pk = pk_column->getInt(block.rows() - 1); + auto & pk_column = block.getByPosition(0).column; + auto min_pk = pk_column->getInt(0); + auto max_pk = pk_column->getInt(block.rows() - 1); HandleRange range(min_pk, max_pk + 1); return {RowKeyRange::fromHandleRange(range), {file_id}}; @@ -1073,13 +1153,13 @@ try segment->write(dmContext(), std::move(block)); break; case Segment_test_Mode::V2_FileOnly: { - auto delegate = dmContext().path_pool.getStableDiskDelegator(); - auto file_provider = dmContext().db_context.getFileProvider(); - auto [range, file_ids] = genDMFile(dmContext(), block); - auto file_id = file_ids[0]; - auto file_parent_path = delegate.getDTFilePath(file_id); - auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path); - auto pack = std::make_shared(dmContext(), file, range); + auto delegate = dmContext().path_pool.getStableDiskDelegator(); + auto file_provider = dmContext().db_context.getFileProvider(); + auto [range, file_ids] = genDMFile(dmContext(), block); + auto file_id = file_ids[0]; + auto file_parent_path = delegate.getDTFilePath(file_id); + auto file = DMFile::restore(file_provider, file_id, file_id, file_parent_path); + auto pack = std::make_shared(dmContext(), file, range); WriteBatches wbs(*storage_pool); wbs.data.putExternal(file_id, 0); wbs.writeLogAndData(); diff --git a/dbms/src/Storages/GCManager.cpp b/dbms/src/Storages/GCManager.cpp index bf4f89ac3aa..81997249cc5 100644 --- a/dbms/src/Storages/GCManager.cpp +++ b/dbms/src/Storages/GCManager.cpp @@ -11,10 +11,15 @@ extern const int TABLE_IS_DROPPED; bool GCManager::work() { auto & global_settings = global_context.getSettingsRef(); + if (gc_check_stop_watch.elapsedSeconds() < global_settings.dt_bg_gc_check_interval) + return false; Int64 gc_segments_limit = global_settings.dt_bg_gc_max_segments_to_check_every_round; // limit less than or equal to 0 means no gc if (gc_segments_limit <= 0) + { + gc_check_stop_watch.restart(); return false; + } LOG_INFO(log, "Start GC with table id: " << next_table_id); // Get a storage snapshot with weak_ptrs first @@ -72,6 +77,7 @@ bool GCManager::work() iter = storages.begin(); next_table_id = iter->first; LOG_INFO(log, "End GC and next gc will start with table id: " << next_table_id); + gc_check_stop_watch.restart(); // Always return false return false; } diff --git a/dbms/src/Storages/GCManager.h b/dbms/src/Storages/GCManager.h index 81199d447e9..aa819a22c17 100644 --- a/dbms/src/Storages/GCManager.h +++ b/dbms/src/Storages/GCManager.h @@ -8,7 +8,7 @@ namespace DB class GCManager { public: - GCManager(Context & context) : global_context{context.getGlobalContext()}, log(&Logger::get("GCManager")) {}; + GCManager(Context & context) : global_context{context.getGlobalContext()}, log(&Logger::get("GCManager")){}; ~GCManager() = default; @@ -19,6 +19,8 @@ class GCManager TableID next_table_id = InvalidTableID; + AtomicStopwatch gc_check_stop_watch; + Logger * log; }; } // namespace DB diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp index d932fcde1f3..69da9770cdb 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.cpp @@ -4,25 +4,24 @@ #include #include #include +#include #include #include -#include #include - #include #include #ifdef __linux__ -#include #include +#include #endif namespace CurrentMetrics { - extern const Metric BackgroundPoolTask; - extern const Metric MemoryTrackingInBackgroundProcessingPool; -} +extern const Metric BackgroundPoolTask; +extern const Metric MemoryTrackingInBackgroundProcessingPool; +} // namespace CurrentMetrics namespace DB { @@ -128,9 +127,9 @@ void BackgroundProcessingPool::threadFunction() const auto name = "BkgPool" + std::to_string(tid++); setThreadName(name.data()); is_background_thread = true; - #ifdef __linux__ +#ifdef __linux__ addThreadId(syscall(SYS_gettid)); - #endif +#endif } MemoryTracker memory_tracker; @@ -174,8 +173,8 @@ void BackgroundProcessingPool::threadFunction() { std::unique_lock lock(tasks_mutex); wake_event.wait_for(lock, - std::chrono::duration(sleep_seconds - + std::uniform_real_distribution(0, sleep_seconds_random_part)(rng))); + std::chrono::duration( + sleep_seconds + std::uniform_real_distribution(0, sleep_seconds_random_part)(rng))); continue; } @@ -184,8 +183,9 @@ void BackgroundProcessingPool::threadFunction() if (min_time > current_time) { std::unique_lock lock(tasks_mutex); - wake_event.wait_for(lock, std::chrono::microseconds( - min_time - current_time + std::uniform_int_distribution(0, sleep_seconds_random_part * 1000000)(rng))); + wake_event.wait_for(lock, + std::chrono::microseconds( + min_time - current_time + std::uniform_int_distribution(0, sleep_seconds_random_part * 1000000)(rng))); } std::shared_lock rlock(task->rwlock); @@ -269,4 +269,4 @@ void BackgroundProcessingPool::addThreadId(pid_t tid) thread_ids.push_back(tid); } -} +} // namespace DB diff --git a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h index 68dc72219d5..5386d7a3a2b 100644 --- a/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h +++ b/dbms/src/Storages/MergeTree/BackgroundProcessingPool.h @@ -1,17 +1,18 @@ #pragma once -#include -#include -#include -#include +#include +#include +#include + +#include #include +#include +#include +#include #include +#include #include -#include -#include -#include -#include -#include +#include namespace DB { @@ -49,11 +50,11 @@ class BackgroundProcessingPool /// Read lock is hold when task is executed. std::shared_mutex rwlock; - std::atomic removed {false}; + std::atomic removed{false}; /// only can be invoked by one thread at same time. const bool multi; - std::atomic_bool occupied {false}; + std::atomic_bool occupied{false}; const uint64_t interval_milliseconds; @@ -65,14 +66,19 @@ class BackgroundProcessingPool BackgroundProcessingPool(int size_); - size_t getNumberOfThreads() const - { - return size; - } + size_t getNumberOfThreads() const { return size; } /// if multi == false, this task can only be called by one thread at same time. /// If interval_ms is zero, this task will be scheduled with `sleep_seconds`. /// If interval_ms is not zero, this task will be scheduled with `interval_ms`. + /// + /// But at each scheduled time, there may be multiple threads try to run the same task, + /// and then execute the same task one by one in sequential order(not simultaneously) even if `multi` is false. + /// For example, consider the following case when it's time to schedule a task, + /// 1. thread A get the task, mark the task as occupied and begin to execute it + /// 2. thread B also get the same task + /// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task + /// 4. thread B find the task is not occupied and execute the task again almost immediately TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0); void removeTask(const TaskHandle & task); @@ -80,22 +86,23 @@ class BackgroundProcessingPool std::vector getThreadIds(); void addThreadId(pid_t tid); + private: - using Tasks = std::multimap; /// key is desired next time to execute (priority). + using Tasks = std::multimap; /// key is desired next time to execute (priority). using Threads = std::vector; const size_t size; static constexpr double sleep_seconds = 10; static constexpr double sleep_seconds_random_part = 1.0; - Tasks tasks; /// Ordered in priority. + Tasks tasks; /// Ordered in priority. std::mutex tasks_mutex; Threads threads; - std::vector thread_ids; // Linux Thread ID + std::vector thread_ids; // Linux Thread ID std::mutex thread_ids_mtx; - std::atomic shutdown {false}; + std::atomic shutdown{false}; std::condition_variable wake_event; @@ -104,4 +111,4 @@ class BackgroundProcessingPool using BackgroundProcessingPoolPtr = std::shared_ptr; -} +} // namespace DB diff --git a/dbms/src/Storages/Transaction/RegionTable.cpp b/dbms/src/Storages/Transaction/RegionTable.cpp index 74779bef7d6..eee67e8403e 100644 --- a/dbms/src/Storages/Transaction/RegionTable.cpp +++ b/dbms/src/Storages/Transaction/RegionTable.cpp @@ -285,9 +285,12 @@ void RegionTable::removeRegion(const RegionID region_id, bool remove_data, const table.regions.erase(internal_region_it); if (table.regions.empty()) { - /// All regions of this table is removed, the storage maybe drop or pd - /// move it to another node, we can optimize outdated data. - table_to_optimize.insert(table_id); + if (auto & tmt = context->getTMTContext(); !tmt.isBgFlushDisabled()) + { + /// All regions of this table is removed, the storage maybe drop or pd + /// move it to another node, we can optimize outdated data. + table_to_optimize.insert(table_id); + } tables.erase(table_id); } LOG_INFO(log, __FUNCTION__ << ": remove [region " << region_id << "] in RegionTable done");