Skip to content

Commit

Permalink
[fix] (inverted index) Disallow variant columns from using inverted i…
Browse files Browse the repository at this point in the history
…ndex format v1 (#43599)

### What problem does this PR solve?

Problem Summary:
1. When the inverted index of a variant column uses storage format v1,
schema changes can cause some segments to lack corresponding index
files.

2. By using storage format v2 for inverted indexes, all indexes
correspond to a single file, and the corresponding files will always
exist regardless of whether the variant includes subcolumn indexes.

### Release note

When creating an inverted index for a variant column, file format v1 is
not supported
  • Loading branch information
csun5285 authored Nov 12, 2024
1 parent 3fffc61 commit 127253a
Show file tree
Hide file tree
Showing 16 changed files with 191 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.ClearAlterTask;
import org.apache.doris.task.UpdateTabletMetaInfoTask;
import org.apache.doris.thrift.TInvertedIndexFileStorageFormat;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TTaskType;
Expand Down Expand Up @@ -2730,12 +2731,15 @@ private boolean processAddIndex(CreateIndexClause alterClause, OlapTable olapTab
+ " ) already exist.");
}
}

boolean disableInvertedIndexV1ForVariant = olapTable.getInvertedIndexFileStorageFormat()
== TInvertedIndexFileStorageFormat.V1 && ConnectContext.get().getSessionVariable()
.getDisableInvertedIndexV1ForVaraint();
for (String col : indexDef.getColumns()) {
Column column = olapTable.getColumn(col);
if (column != null) {
indexDef.checkColumn(column, olapTable.getKeysType(),
olapTable.getTableProperty().getEnableUniqueKeyMergeOnWrite());
olapTable.getTableProperty().getEnableUniqueKeyMergeOnWrite(),
disableInvertedIndexV1ForVariant);
indexDef.getColumnUniqueIds().add(column.getUniqueId());
} else {
throw new DdlException("index column does not exist in table. invalid column: " + col);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.rewrite.ExprRewriteRule;
import org.apache.doris.rewrite.ExprRewriter;
import org.apache.doris.thrift.TInvertedIndexFileStorageFormat;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -598,7 +599,9 @@ public void analyze(Analyzer analyzer) throws UserException {
if (CollectionUtils.isNotEmpty(indexDefs)) {
Set<String> distinct = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
Set<Pair<IndexType, List<String>>> distinctCol = new HashSet<>();

boolean disableInvertedIndexV1ForVariant = PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(
new HashMap<>(properties)) == TInvertedIndexFileStorageFormat.V1
&& ConnectContext.get().getSessionVariable().getDisableInvertedIndexV1ForVaraint();
for (IndexDef indexDef : indexDefs) {
indexDef.analyze();
if (!engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) {
Expand All @@ -608,7 +611,8 @@ public void analyze(Analyzer analyzer) throws UserException {
boolean found = false;
for (Column column : columns) {
if (column.getName().equalsIgnoreCase(indexColName)) {
indexDef.checkColumn(column, getKeysDesc().getKeysType(), enableUniqueKeyMergeOnWrite);
indexDef.checkColumn(column, getKeysDesc().getKeysType(),
enableUniqueKeyMergeOnWrite, disableInvertedIndexV1ForVariant);
found = true;
break;
}
Expand Down
12 changes: 10 additions & 2 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/IndexDef.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ public boolean isInvertedIndex() {
return (this.indexType == IndexType.INVERTED);
}

public void checkColumn(Column column, KeysType keysType, boolean enableUniqueKeyMergeOnWrite)
throws AnalysisException {
public void checkColumn(Column column, KeysType keysType, boolean enableUniqueKeyMergeOnWrite,
boolean disableInvertedIndexV1ForVariant) throws AnalysisException {
if (indexType == IndexType.BITMAP || indexType == IndexType.INVERTED || indexType == IndexType.BLOOMFILTER
|| indexType == IndexType.NGRAM_BF) {
String indexColName = column.getName();
Expand All @@ -225,6 +225,14 @@ public void checkColumn(Column column, KeysType keysType, boolean enableUniqueKe
throw new AnalysisException(colType + " is not supported in " + indexType.toString() + " index. "
+ "invalid index: " + indexName);
}

// In inverted index format v1, each subcolumn of a variant has its own index file, leading to high IOPS.
// when the subcolumn type changes, it may result in missing files, causing link file failure.
if (colType.isVariantType() && disableInvertedIndexV1ForVariant) {
throw new AnalysisException(colType + " is not supported in inverted index format V1,"
+ "Please set properties(\"inverted_index_storage_format\"= \"v2\"),"
+ "or upgrade to a newer version");
}
if (!column.isKey()) {
if (keysType == KeysType.AGG_KEYS) {
throw new AnalysisException("index should only be used in columns of DUP_KEYS/UNIQUE_KEYS table"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.apache.doris.nereids.util.TypeCoercionUtils;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TInvertedIndexFileStorageFormat;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
Expand Down Expand Up @@ -607,6 +608,14 @@ public void validate(ConnectContext ctx) {
if (!indexes.isEmpty()) {
Set<String> distinct = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
Set<Pair<IndexDef.IndexType, List<String>>> distinctCol = new HashSet<>();
boolean disableInvertedIndexV1ForVariant = false;
try {
disableInvertedIndexV1ForVariant = PropertyAnalyzer.analyzeInvertedIndexFileStorageFormat(
new HashMap<>(properties)) == TInvertedIndexFileStorageFormat.V1
&& ConnectContext.get().getSessionVariable().getDisableInvertedIndexV1ForVaraint();
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}

for (IndexDefinition indexDef : indexes) {
indexDef.validate();
Expand All @@ -618,7 +627,8 @@ public void validate(ConnectContext ctx) {
boolean found = false;
for (ColumnDefinition column : columns) {
if (column.getName().equalsIgnoreCase(indexColName)) {
indexDef.checkColumn(column, keysType, isEnableMergeOnWrite);
indexDef.checkColumn(column, keysType, isEnableMergeOnWrite,
disableInvertedIndexV1ForVariant);
found = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public IndexDefinition(String name, List<String> cols, String indexTypeName,
* checkColumn
*/
public void checkColumn(ColumnDefinition column, KeysType keysType,
boolean enableUniqueKeyMergeOnWrite) throws AnalysisException {
boolean enableUniqueKeyMergeOnWrite, boolean disableInvertedIndexV1ForVariant) throws AnalysisException {
if (indexType == IndexType.BITMAP || indexType == IndexType.INVERTED
|| indexType == IndexType.BLOOMFILTER || indexType == IndexType.NGRAM_BF) {
String indexColName = column.getName();
Expand All @@ -106,6 +106,14 @@ public void checkColumn(ColumnDefinition column, KeysType keysType,
throw new AnalysisException(colType + " is not supported in " + indexType.toString()
+ " index. " + "invalid index: " + name);
}

// In inverted index format v1, each subcolumn of a variant has its own index file, leading to high IOPS.
// when the subcolumn type changes, it may result in missing files, causing link file failure.
if (colType.isVariantType() && disableInvertedIndexV1ForVariant) {
throw new AnalysisException(colType + " is not supported in inverted index format V1,"
+ "Please set properties(\"inverted_index_storage_format\"= \"v2\"),"
+ "or upgrade to a newer version");
}
if (!column.isKey()) {
if (keysType == KeysType.AGG_KEYS) {
throw new AnalysisException("index should only be used in columns of DUP_KEYS/UNIQUE_KEYS table"
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ public class SessionVariable implements Serializable, Writable {

public static final String MAX_FETCH_REMOTE_TABLET_COUNT = "max_fetch_remote_schema_tablet_count";

public static final String DISABLE_INVERTED_INDEX_V1_FOR_VARIANT = "disable_inverted_index_v1_for_variant";

// CLOUD_VARIABLES_BEGIN
public static final String CLOUD_CLUSTER = "cloud_cluster";
public static final String DISABLE_EMPTY_PARTITION_PRUNE = "disable_empty_partition_prune";
Expand Down Expand Up @@ -1224,6 +1226,9 @@ public enum IgnoreSplitType {
@VariableMgr.VarAttr(name = WAIT_FULL_BLOCK_SCHEDULE_TIMES)
public int waitFullBlockScheduleTimes = 2;

@VariableMgr.VarAttr(name = DISABLE_INVERTED_INDEX_V1_FOR_VARIANT)
private boolean disableInvertedIndexV1ForVaraint = true;

public int getBeNumberForTest() {
return beNumberForTest;
}
Expand Down Expand Up @@ -4561,4 +4566,12 @@ public boolean isEnableCooldownReplicaAffinity() {
public boolean isUseSerialExchange() {
return useSerialExchange && getEnableLocalExchange();
}

public void setDisableInvertedIndexV1ForVaraint(boolean disableInvertedIndexV1ForVaraint) {
this.disableInvertedIndexV1ForVaraint = disableInvertedIndexV1ForVaraint;
}

public boolean getDisableInvertedIndexV1ForVaraint() {
return disableInvertedIndexV1ForVaraint;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.apache.doris.analysis;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -60,6 +63,16 @@ public void testAnalyzeExpection() throws AnalysisException {
} catch (AnalysisException e) {
Assert.assertTrue(e instanceof AnalysisException);
}
try {
def = new IndexDef("variant_index", false, Lists.newArrayList("col1"),
IndexDef.IndexType.INVERTED, null, "comment");
boolean isIndexFormatV1 = true;
def.checkColumn(new Column("col1", PrimitiveType.VARIANT), KeysType.UNIQUE_KEYS, true, isIndexFormatV1);
Assert.fail("No exception throws.");
} catch (AnalysisException e) {
Assert.assertTrue(e instanceof AnalysisException);
Assert.assertTrue(e.getMessage().contains("not supported in inverted index format V1"));
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.plans.commands;

import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
import org.apache.doris.nereids.trees.plans.commands.info.IndexDefinition;
import org.apache.doris.nereids.types.VariantType;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class IndexDefinitionTest {
@Test
void testVariantIndexFormatV1() throws AnalysisException {
IndexDefinition def = new IndexDefinition("variant_index", Lists.newArrayList("col1"), "INVERTED",
null, "comment");
try {
boolean isIndexFormatV1 = true;
def.checkColumn(new ColumnDefinition("col1", VariantType.INSTANCE, false, AggregateType.NONE, true,
null, "comment"), KeysType.UNIQUE_KEYS, true, isIndexFormatV1);
Assertions.fail("No exception throws.");
} catch (AnalysisException e) {
Assertions.assertTrue(e instanceof AnalysisException);
Assertions.assertTrue(e.getMessage().contains("not supported in inverted index format V1"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ suite("test_single_compaction_with_variant_inverted", "p2") {


sql """ DROP TABLE IF EXISTS ${tableName}; """
sql """ set disable_inverted_index_v1_for_variant = false """
sql """
CREATE TABLE ${tableName} (
`id` int(11) NULL,
Expand All @@ -150,6 +151,7 @@ suite("test_single_compaction_with_variant_inverted", "p2") {
"compaction_policy" = "time_series"
);
"""
sql """ set disable_inverted_index_v1_for_variant = true """

def tablets = sql_return_maparray """ show tablets from ${tableName}; """

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ suite("test_insert_with_index", "p0") {
}

set_be_config("inverted_index_ram_dir_enable", "true")
sql """ set disable_inverted_index_v1_for_variant = false """
test.call("V1")
sql """ set disable_inverted_index_v1_for_variant = true """
test.call("V2")
set_be_config("inverted_index_ram_dir_enable", "false")
sql """ set disable_inverted_index_v1_for_variant = false """
test.call("V1")
sql """ set disable_inverted_index_v1_for_variant = true """
test.call("V2")
set_be_config("inverted_index_ram_dir_enable", "true")
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,14 @@ suite("test_stream_load_with_inverted_index_p0", "p0") {
}

set_be_config("inverted_index_ram_dir_enable", "true")
sql """ set disable_inverted_index_v1_for_variant = false """
test.call("V1")
sql """ set disable_inverted_index_v1_for_variant = true """
test.call("V2")
set_be_config("inverted_index_ram_dir_enable", "false")
sql """ set disable_inverted_index_v1_for_variant = false """
test.call("V1")
sql """ set disable_inverted_index_v1_for_variant = true """
test.call("V2")
set_be_config("inverted_index_ram_dir_enable", "true")
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ suite("test_show_nested_index_file_http_action_with_variant", "nonConcurrent,p0"
def tableName = "test_show_nested_index_file_http_action_with_variant_" + format

sql "DROP TABLE IF EXISTS ${tableName}"
sql """ set disable_inverted_index_v1_for_variant = false """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k bigint,
Expand All @@ -74,6 +75,7 @@ suite("test_show_nested_index_file_http_action_with_variant", "nonConcurrent,p0"
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "true", "inverted_index_storage_format" = "${format}");
"""
sql """ set disable_inverted_index_v1_for_variant = true """
load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ suite("test_variant_index_format_v1", "p2") {

def table_name = "github_events"
sql """DROP TABLE IF EXISTS ${table_name}"""
sql """ set disable_inverted_index_v1_for_variant = false """
sql """
CREATE TABLE IF NOT EXISTS ${table_name} (
k bigint,
Expand All @@ -70,7 +71,7 @@ suite("test_variant_index_format_v1", "p2") {
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "1", "disable_auto_compaction" = "true", "inverted_index_storage_format" = "V1");
"""

sql """ set disable_inverted_index_v1_for_variant = true """
set_be_config.call("memory_limitation_per_thread_for_schema_change_bytes", "6294967296")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
load_json_data.call(table_name, """${getS3Url() + '/regression/gharchive.m/2015-01-01-1.json'}""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ suite("test_single_replica_load", "p2") {
def tableName = "test_single_replica_load"

sql "DROP TABLE IF EXISTS ${tableName}"
sql """ set disable_inverted_index_v1_for_variant = false """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k bigint,
Expand All @@ -61,6 +62,7 @@ suite("test_single_replica_load", "p2") {
DISTRIBUTED BY HASH(k) BUCKETS 1
properties("replication_num" = "2", "disable_auto_compaction" = "true", "inverted_index_storage_format" = "V1");
"""
sql """ set disable_inverted_index_v1_for_variant = true """
load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
load_json_data.call(tableName, """${getS3Url() + '/regression/gharchive.m/2015-01-01-0.json'}""")
Expand Down
2 changes: 1 addition & 1 deletion regression-test/suites/variant_p0/load.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ suite("regression_test_variant", "p0"){
"is_being_synced" = "false",
"storage_medium" = "hdd",
"storage_format" = "V2",
"inverted_index_storage_format" = "V1",
"inverted_index_storage_format" = "V2",
"enable_unique_key_merge_on_write" = "true",
"light_schema_change" = "true",
"store_row_column" = "true",
Expand Down
Loading

0 comments on commit 127253a

Please sign in to comment.