From 974104762296ca38732e96a71c7eac54a4a39299 Mon Sep 17 00:00:00 2001 From: long2ice Date: Thu, 16 Jan 2025 15:55:08 +0800 Subject: [PATCH] feat: starrocsk plugin add table config (#8268) --- backend/plugins/starrocks/tasks/task_data.go | 12 +++++++++--- backend/plugins/starrocks/tasks/tasks.go | 17 +++++++++++++++-- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/backend/plugins/starrocks/tasks/task_data.go b/backend/plugins/starrocks/tasks/task_data.go index eb106bdab99..f178cba6a4c 100644 --- a/backend/plugins/starrocks/tasks/task_data.go +++ b/backend/plugins/starrocks/tasks/task_data.go @@ -17,6 +17,11 @@ limitations under the License. package tasks +type TableConfig struct { + IncludedColumns []string `mapstructure:"included_columns"` + ExcludedColumns []string `mapstructure:"excluded_columns"` +} + type StarRocksConfig struct { SourceType string `mapstructure:"source_type"` SourceDsn string `mapstructure:"source_dsn"` @@ -29,8 +34,9 @@ type StarRocksConfig struct { BeHost string `mapstructure:"be_host"` BePort int `mapstructure:"be_port"` Tables []string - BatchSize int `mapstructure:"batch_size"` - OrderBy map[string]string `mapstructure:"order_by"` - DomainLayer string `mapstructure:"domain_layer"` + TableConfigs map[string]TableConfig `mapstructure:"table_configs"` + BatchSize int `mapstructure:"batch_size"` + OrderBy map[string]string `mapstructure:"order_by"` + DomainLayer string `mapstructure:"domain_layer"` Extra map[string]string } diff --git a/backend/plugins/starrocks/tasks/tasks.go b/backend/plugins/starrocks/tasks/tasks.go index cc33db2826d..b6c60e03dac 100644 --- a/backend/plugins/starrocks/tasks/tasks.go +++ b/backend/plugins/starrocks/tasks/tasks.go @@ -29,6 +29,8 @@ import ( "strings" "time" + "golang.org/x/exp/slices" + "github.com/apache/incubator-devlake/core/dal" "github.com/apache/incubator-devlake/core/errors" "github.com/apache/incubator-devlake/core/plugin" @@ -138,7 +140,6 @@ func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string, table := dc.SrcTableName starrocksTable := dc.DestTableName starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable) - columnMetas, err := db.GetColumns(&Table{name: table}, nil) updateColumn := config.UpdateColumn columnMap := make(map[string]string) @@ -163,8 +164,21 @@ func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string, } else { return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect())) } + tableConfig, ok := config.TableConfigs[table] for _, cm := range columnMetas { name := cm.Name() + if ok { + if len(tableConfig.ExcludedColumns) > 0 { + if slices.Contains(tableConfig.ExcludedColumns, name) { + continue + } + } + if len(tableConfig.IncludedColumns) > 0 { + if !slices.Contains(tableConfig.IncludedColumns, name) { + continue + } + } + } if name == updateColumn { // check update column to detect skip or not var updatedFrom time.Time @@ -276,7 +290,6 @@ func copyDataToDst(dc *DataConfigParams, columnMap map[string]string, orderBy st } else { return err } - } defer rows.Close()