Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](cloud) introduce copy into #32759

Merged
merged 6 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/vec/exec/format/csv/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,13 @@ Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
// Only check for load task. For query task, the non exist column will be filled "null".
// if actual column number in csv file is not equal to _file_slot_descs.size()
// then filter this line.
if (_split_values.size() != _file_slot_descs.size()) {
bool ignore_col = false;
ignore_col = _params.__isset.file_attributes &&
_params.file_attributes.__isset.ignore_csv_redundant_col &&
_params.file_attributes.ignore_csv_redundant_col;

if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) ||
(ignore_col && _split_values.size() < _file_slot_descs.size())) {
std::string cmp_str =
_split_values.size() > _file_slot_descs.size() ? "more than" : "less than";
RETURN_IF_ERROR(_state->append_error_msg_to_file(
Expand Down
34 changes: 34 additions & 0 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2630,6 +2630,40 @@ public static boolean isNotCloudMode() {
@ConfField
public static int cloud_txn_tablet_batch_size = 50;

/**
* Default number of waiting copy jobs for the whole cluster
*/
@ConfField(mutable = true)
public static int cluster_max_waiting_copy_jobs = 100;

/**
* Default number of max file num for per copy into job
*/
@ConfField(mutable = true)
public static int max_file_num_per_copy_into_job = 50;

/**
* Default number of max meta size for per copy into job
*/
@ConfField(mutable = true)
public static int max_meta_size_per_copy_into_job = 51200;

// 0 means no limit
@ConfField(mutable = true)
public static int cloud_max_copy_job_per_table = 10000;

@ConfField(mutable = true)
public static int cloud_filter_copy_file_num_limit = 100;

@ConfField(mutable = true, masterOnly = true)
public static boolean cloud_delete_loaded_internal_stage_files = false;

@ConfField(mutable = false)
public static int cloud_copy_txn_conflict_error_retry_num = 5;

@ConfField(mutable = false)
public static int cloud_copy_into_statement_submitter_threads_num = 64;

@ConfField
public static int drop_user_notify_ms_max_times = 86400;

Expand Down
116 changes: 114 additions & 2 deletions fe/fe-core/src/main/cup/sql_parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Version;
import org.apache.doris.cloud.analysis.UseCloudClusterStmt;
import org.apache.doris.cloud.proto.Cloud.StagePB;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.policy.PolicyTypeEnum;
Expand Down Expand Up @@ -600,6 +601,8 @@ terminal String
KW_SPLIT,
KW_SQL,
KW_SQL_BLOCK_RULE,
KW_STAGE,
KW_STAGES,
KW_START,
KW_STARTS,
KW_STATS,
Expand Down Expand Up @@ -707,7 +710,7 @@ nonterminal StatementBase stmt, show_stmt, show_param, help_stmt, load_stmt,
use_stmt, use_cloud_cluster_stmt, kill_stmt, drop_stmt, recover_stmt, grant_stmt, revoke_stmt, create_stmt, set_stmt, sync_stmt, cancel_stmt, cancel_param, delete_stmt,
switch_stmt, transaction_stmt, unsupported_stmt, export_stmt, admin_stmt, truncate_stmt,
import_columns_stmt, import_delete_on_stmt, import_sequence_stmt, import_where_stmt, install_plugin_stmt, uninstall_plugin_stmt,
import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, refresh_stmt, clean_stmt, analyze_stmt, show_mtmv_stmt, kill_analysis_job_stmt, insert_overwrite_stmt;
import_preceding_filter_stmt, unlock_tables_stmt, lock_tables_stmt, refresh_stmt, clean_stmt, analyze_stmt, show_mtmv_stmt, kill_analysis_job_stmt, insert_overwrite_stmt, copy_stmt;

nonterminal FromClause opt_using_clause;

Expand All @@ -725,7 +728,7 @@ nonterminal ValueList value_clause;
// No return.
nonterminal describe_command, opt_full, opt_inner, opt_outer, from_or_in, keys_or_index, opt_storage, opt_wild_where,
charset, equal, transaction_characteristics, isolation_level,
transaction_access_mode, isolation_types;
transaction_access_mode, isolation_types, opt_where;

// String
nonterminal String user, opt_user, opt_using_charset;
Expand Down Expand Up @@ -985,6 +988,12 @@ nonterminal Long opt_auto_inc_init_value;
// workload policy/group
nonterminal String policy_condition_op, policy_condition_value;

// copy into
nonterminal CopyFromParam copy_from_param;
nonterminal String stage_name;
nonterminal StageAndPattern stage_and_pattern;
nonterminal List<Expr> copy_select_expr_list;

precedence nonassoc COMMA;
precedence nonassoc STRING_LITERAL;
precedence nonassoc KW_COLUMNS;
Expand Down Expand Up @@ -1251,6 +1260,8 @@ stmt ::=
{:
RESULT = new EmptyStmt();
:}
| copy_stmt:stmt
{: RESULT = stmt; :}
;

refresh_stmt ::=
Expand Down Expand Up @@ -1977,6 +1988,11 @@ create_stmt ::=
{:
RESULT = new AlterTableStmt(tableName, Lists.newArrayList(new BuildIndexClause(tableName, new IndexDef(indexName, partitionNames, true), false)));
:}
/* stage */
| KW_CREATE KW_STAGE opt_if_not_exists:ifNotExists ident:stageName KW_PROPERTIES opt_key_value_map:properties
{:
RESULT = new CreateStageStmt(ifNotExists, stageName, properties);
:}
;

channel_desc_list ::=
Expand Down Expand Up @@ -2325,6 +2341,62 @@ job_label ::=
:}
;

// Copy Statement
copy_stmt ::=
KW_COPY KW_INTO opt_select_hints:hints table_name:name opt_col_list:cols KW_FROM copy_from_param:copyFromParam KW_PROPERTIES opt_key_value_map:properties
{:
RESULT = new CopyStmt(name, cols, copyFromParam, properties, hints);
:}
| KW_COPY KW_INTO opt_select_hints:hints table_name:name opt_col_list:cols KW_FROM copy_from_param:copyFromParam
{:
RESULT = new CopyStmt(name, cols, copyFromParam, new HashMap(), hints);
:}
;

copy_from_param ::=
stage_and_pattern:stage
{:
RESULT = new CopyFromParam(stage);
:}
| LPAREN KW_SELECT copy_select_expr_list:exprList KW_FROM stage_and_pattern:stage where_clause:whereExpr RPAREN
{:
RESULT = new CopyFromParam(stage, exprList, whereExpr);
:}
;

stage_name ::=
AT ident:stage
{:
RESULT = stage;
:}
| AT BITNOT
{:
RESULT = "~";
:}
;

stage_and_pattern ::=
stage_name:name
{:
RESULT = new StageAndPattern(name, null);
:}
| stage_name:name LPAREN STRING_LITERAL:pattern RPAREN
{:
RESULT = new StageAndPattern(name, pattern);
:}
;

copy_select_expr_list ::=
{:
RESULT = null;
:}
| STAR {:
RESULT = null;
:}
| expr_list:exprList {:
RESULT = exprList;
:};

data_desc_list ::=
data_desc:desc
{:
Expand Down Expand Up @@ -2921,6 +2993,14 @@ grant_stmt ::=
{:
RESULT = new GrantStmt(roles, userId);
:}
| KW_GRANT privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_TO user_identity:userId
{:
RESULT = new GrantStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.STAGE);
:}
| KW_GRANT privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_TO KW_ROLE STRING_LITERAL:role
{:
RESULT = new GrantStmt(null, role, resourcePattern, privs, ResourceTypeEnum.STAGE);
:}
;

tbl_pattern ::=
Expand Down Expand Up @@ -3009,6 +3089,14 @@ revoke_stmt ::=
{:
RESULT = new RevokeStmt(roles, userId);
:}
| KW_REVOKE privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_FROM user_identity:userId
{:
RESULT = new RevokeStmt(userId, null, resourcePattern, privs, ResourceTypeEnum.STAGE);
:}
| KW_REVOKE privilege_list:privs KW_ON KW_STAGE resource_pattern:resourcePattern KW_FROM KW_ROLE STRING_LITERAL:role
{:
RESULT = new RevokeStmt(null, role, resourcePattern, privs, ResourceTypeEnum.STAGE);
:}
;

// Drop statement
Expand Down Expand Up @@ -3116,6 +3204,10 @@ drop_stmt ::=
{:
RESULT = new DropAnalyzeJobStmt(job_id);
:}
| KW_DROP KW_STAGE opt_if_exists:ifExists ident:stageName
{:
RESULT = new DropStageStmt(ifExists, stageName);
:}
;

// Recover statement
Expand Down Expand Up @@ -3957,6 +4049,11 @@ show_stmt ::=
{:
RESULT = new ShowPolicyStmt(PolicyTypeEnum.STORAGE, null, null);
:}
/* show stage */
| KW_SHOW KW_STAGES
{:
RESULT = new ShowStageStmt();
:}
;

show_param ::=
Expand Down Expand Up @@ -4468,6 +4565,11 @@ show_param ::=
{:
RESULT = new DiagnoseTabletStmt(tabletId);
:}
/* Show copy statement */
| KW_COPY opt_db:db opt_where order_by_clause:orderByClause limit_clause:limitClause
{:
RESULT = new ShowCopyStmt(db, parser.where, orderByClause, limitClause);
:}
;

opt_tmp ::=
Expand Down Expand Up @@ -4590,6 +4692,14 @@ opt_wild_where ::=
:}
;

opt_where ::=
/* empty */
| KW_WHERE expr:where
{:
parser.where = where;
:}
;

opt_id ::=
/* empty */
{:
Expand Down Expand Up @@ -8194,6 +8304,8 @@ keyword ::=
{: RESULT = id; :}
| KW_BELONG:id
{: RESULT = id; :}
| KW_STAGE:id
{: RESULT = id; :}
;

// Identifier that contain keyword
Expand Down
Loading
Loading