From 3effe9e2d36187533300785d6279579027334aec Mon Sep 17 00:00:00 2001 From: mechatroner Date: Sat, 12 Jun 2021 21:16:57 -0400 Subject: [PATCH] merge from RBQL --- rbql-js/cli_rbql.js | 25 ++-- rbql-js/rbql.js | 336 +++++++++++++++++++++++++++++++++++++++----- rbql-js/rbql_csv.js | 129 ++++++++++++----- rbql/_version.py | 2 +- rbql/csv_utils.py | 16 +-- rbql/rbql_csv.py | 166 ++++++++++++---------- rbql/rbql_engine.py | 290 ++++++++++++++++++++++++++++++++------ rbql/rbql_main.py | 325 ++++++++++++++++++++++++++++++++++-------- 8 files changed, 1027 insertions(+), 262 deletions(-) diff --git a/rbql-js/cli_rbql.js b/rbql-js/cli_rbql.js index db25c12..3a7a4c2 100755 --- a/rbql-js/cli_rbql.js +++ b/rbql-js/cli_rbql.js @@ -158,7 +158,7 @@ async function autodetect_delim_policy(table_path) { } -function print_colorized(records, delim, show_column_names, skip_header) { +function print_colorized(records, delim, show_column_names, with_headers) { let reset_color_code = '\x1b[0m'; let color_codes = ['\x1b[0m', '\x1b[31m', '\x1b[32m', '\x1b[33m', '\x1b[34m', '\x1b[35m', '\x1b[36m', '\x1b[31;1m', '\x1b[32;1m', '\x1b[33;1m']; for (let r = 0; r < records.length; r++) { @@ -166,7 +166,7 @@ function print_colorized(records, delim, show_column_names, skip_header) { for (let c = 0; c < records[r].length; c++) { let color_code = color_codes[c % color_codes.length]; let field = records[r][c]; - let colored_field = (!show_column_names || (skip_header && r == 0)) ? color_code + field : `${color_code}a${c + 1}:${field}`; + let colored_field = (!show_column_names || (with_headers && r == 0)) ? color_code + field : `${color_code}a${c + 1}:${field}`; out_fields.push(colored_field); } let out_line = out_fields.join(delim) + reset_color_code; @@ -208,7 +208,8 @@ async function run_with_js(args) { var input_path = get_default(args, 'input', null); var output_path = get_default(args, 'output', null); var csv_encoding = args['encoding']; - var skip_header = args['skip-header']; + var with_headers = args['with-headers']; + var comment_prefix = args['comment-prefix']; var output_delim = get_default(args, 'out-delim', null); var output_policy = get_default(args, 'out-policy', null); let init_source_file = get_default(args, 'init-source-file', null); @@ -222,7 +223,14 @@ async function run_with_js(args) { user_init_code = rbql_csv.read_user_init_code(init_source_file); try { let warnings = []; - await rbql_csv.query_csv(query, input_path, delim, policy, output_path, output_delim, output_policy, csv_encoding, warnings, skip_header, user_init_code, {'bulk_read': true}); + // Do not use bulk_read mode here because: + // * Bulk read can't handle large file since node unable to read the whole file into a string, see https://github.com/mechatroner/rainbow_csv/issues/19 + // * In case of stdin read we would have to use the util.TextDecoder anyway + // * binary/latin-1 do not require the decoder anyway + // * This is CLI so no way we are in the Electron environment which can't use the TextDecoder + // * Streaming mode works a little faster (since we don't need to do the manual validation) + // TODO check if the current node installation doesn't have ICU enabled (which is typicaly provided by Node.js by default, see https://nodejs.org/api/intl.html) and report a user-friendly error with an option to use latin-1 encoding or switch the interpreter + await rbql_csv.query_csv(query, input_path, delim, policy, output_path, output_delim, output_policy, csv_encoding, warnings, with_headers, comment_prefix, user_init_code/*, {'bulk_read': true}*/); await handle_query_success(warnings, output_path, csv_encoding, output_delim, output_policy); return true; } catch (e) { @@ -242,11 +250,11 @@ function get_default_output_path(input_path, delim) { } -async function show_preview(input_path, encoding, delim, policy, skip_header) { +async function show_preview(input_path, encoding, delim, policy, with_headers) { let [records, warnings] = await sample_records(input_path, encoding, delim, policy); console.log('Input table preview:'); console.log('===================================='); - print_colorized(records, delim, true, skip_header); + print_colorized(records, delim, true, with_headers); console.log('====================================\n'); for (let warning of warnings) { show_warning(warning); @@ -272,7 +280,7 @@ async function run_interactive_loop(args) { if (!delim) throw new GenericError('Unable to autodetect table delimiter. Provide column separator explicitly with "--delim" option'); } - await show_preview(input_path, args['encoding'], delim, policy, args['skip-header']); + await show_preview(input_path, args['encoding'], delim, policy, args['with-headers']); args.delim = delim; args.policy = policy; if (!args.output) { @@ -357,7 +365,8 @@ function main() { '--output': {'help': 'Write output table to FILE instead of stdout', 'metavar': 'FILE'}, '--delim': {'help': 'Delimiter character or multicharacter string, e.g. "," or "###". Can be autodetected in interactive mode', 'metavar': 'DELIM'}, '--policy': {'help': 'Split policy, see the explanation below. Supported values: "simple", "quoted", "quoted_rfc", "whitespace", "monocolumn". Can be autodetected in interactive mode', 'metavar': 'POLICY'}, - '--skip-header': {'boolean': true, 'help': 'Skip header line in input and join tables. Roughly equivalent of ... WHERE NR > 1 ... in your Query'}, + '--with-headers': {'boolean': true, 'help': 'Indicates that input (and join) table has header'}, + '--comment-prefix': {'help': 'Ignore lines in input and join tables that start with the comment PREFIX, e.g. "#" or ">>"', 'metavar': 'PREFIX'}, '--encoding': {'default': 'utf-8', 'help': 'Manually set csv encoding', 'metavar': 'ENCODING'}, '--out-format': {'default': 'input', 'help': 'Output format. Supported values: ' + out_format_names.map(v => `"${v}"`).join(', '), 'metavar': 'FORMAT'}, '--out-delim': {'help': 'Output delim. Use with "out-policy". Overrides out-format', 'metavar': 'DELIM'}, diff --git a/rbql-js/rbql.js b/rbql-js/rbql.js index 16ab5a3..8ab6e63 100755 --- a/rbql-js/rbql.js +++ b/rbql-js/rbql.js @@ -70,7 +70,113 @@ var query_context = null; // Needs to be global for MIN(), MAX(), etc functions const wrong_aggregation_usage_error = 'Usage of RBQL aggregation functions inside JavaScript expressions is not allowed, see the docs'; -const RBQL_VERSION = '0.17.0'; +const RBQL_VERSION = '0.20.0'; + + +function check_if_brackets_match(opening_bracket, closing_bracket) { + return (opening_bracket == '[' && closing_bracket == ']') || (opening_bracket == '(' && closing_bracket == ')') || (opening_bracket == '{' && closing_bracket == '}'); +} + + +function parse_root_bracket_level_text_spans(select_expression) { + let text_spans = []; // parts of text separated by commas at the root parenthesis level + let last_pos = 0; + let bracket_stack = []; + for (let i = 0; i < select_expression.length; i++) { + let cur_char = select_expression[i]; + if (cur_char == ',' && bracket_stack.length == 0) { + text_spans.push(select_expression.substring(last_pos, i)); + last_pos = i + 1; + } else if (['[', '{', '('].indexOf(cur_char) != -1) { + bracket_stack.push(cur_char); + } else if ([']', '}', ')'].indexOf(cur_char) != -1) { + if (bracket_stack.length && check_if_brackets_match(bracket_stack[bracket_stack.length - 1], cur_char)) { + bracket_stack.pop(); + } else { + throw new RbqlParsingError(`Unable to parse column headers in SELECT expression: No matching opening bracket for closing "${cur_char}"`); + } + } + } + if (bracket_stack.length) { + throw new RbqlParsingError(`Unable to parse column headers in SELECT expression: No matching closing bracket for opening "${bracket_stack[0]}"`); + } + text_spans.push(select_expression.substring(last_pos, select_expression.length)); + text_spans = text_spans.map(span => span.trim()); + return text_spans; +} + + +function unquote_string(quoted_str) { + // It's possible to use eval here to unqoute the quoted_column_name, but it would be a little barbaric, let's do it manually instead + if (!quoted_str || quoted_str.length < 2) + return null; + if (quoted_str[0] == "'" && quoted_str[quoted_str.length - 1] == "'") { + return quoted_str.substring(1, quoted_str.length - 1).replace(/\\'/g, "'").replace(/\\\\/g, "\\"); + } else if (quoted_str[0] == '"' && quoted_str[quoted_str.length - 1] == '"') { + return quoted_str.substring(1, quoted_str.length - 1).replace(/\\"/g, '"').replace(/\\\\/g, "\\"); + } else { + return null; + } +} + + +function column_info_from_text_span(text_span, string_literals) { + // This function is a rough equivalent of "column_info_from_node()" function in python version of RBQL + text_span = text_span.trim(); + let rbql_star_marker = '__RBQL_INTERNAL_STAR'; + let simple_var_match = /^[_a-zA-Z][_a-zA-Z0-9]*$/.exec(text_span); + let attribute_match = /^([ab])\.([_a-zA-Z][_a-zA-Z0-9]*)$/.exec(text_span); + let subscript_int_match = /^([ab])\[([0-9]+)\]$/.exec(text_span); + let subscript_str_match = /^([ab])\[___RBQL_STRING_LITERAL([0-9]+)___\]$/.exec(text_span); + if (simple_var_match !== null) { + if (text_span == rbql_star_marker) + return {table_name: null, column_index: null, column_name: null, is_star: true}; + if (text_span.startsWith('___RBQL_STRING_LITERAL')) + return null; + let match = /^([ab])([0-9]+)$/.exec(text_span); + if (match !== null) { + return {table_name: match[1], column_index: parseInt(match[2]) - 1, column_name: null, is_star: false}; + } + // Some examples for this branch: NR, NF + return {table_name: null, column_index: null, column_name: text_span, is_star: false}; + } else if (attribute_match !== null) { + let table_name = attribute_match[1]; + let column_name = attribute_match[2]; + if (column_name == rbql_star_marker) { + return {table_name: table_name, column_index: null, column_name: null, is_star: true}; + } + return {table_name: null, column_index: null, column_name: column_name, is_star: false}; + } else if (subscript_int_match != null) { + let table_name = subscript_int_match[1]; + let column_index = parseInt(subscript_int_match[2]) - 1; + return {table_name: table_name, column_index: column_index, column_name: null, is_star: false}; + } else if (subscript_str_match != null) { + let table_name = subscript_str_match[1]; + let replaced_string_literal_id = subscript_str_match[2]; + if (replaced_string_literal_id < string_literals.length) { + let quoted_column_name = string_literals[replaced_string_literal_id]; + let unquoted_column_name = unquote_string(quoted_column_name); + if (unquoted_column_name) { + return {table_name: null, column_index: null, column_name: unquoted_column_name, is_star: false}; + } + } + } + return null; +} + + +function adhoc_parse_select_expression_to_column_infos(select_expression, string_literals) { + // It is acceptable for the algorithm to provide null column name when it could be theorethically possible to deduce the name. + // I.e. this algorithm guarantees precision but doesn't guarantee completeness in all theorethically possible queries. + // Although the algorithm should be complete in all practical scenarios, i.e. it should be hard to come up with the query that doesn't produce complete set of column names. + // The null column name just means that the output column will be named as col{i}, so the failure to detect the proper column name can be tolerated. + // Specifically this function guarantees the following: + // 1. The number of column_infos is correct and will match the number of fields in each record in the output - otherwise the exception should be thrown + // 2. If column_info at pos j is not null, it is guaranteed to correctly represent that column name in the output + let text_spans = parse_root_bracket_level_text_spans(select_expression); + let column_infos = text_spans.map(ts => column_info_from_text_span(ts, string_literals)); + return column_infos; +} function stable_compare(a, b) { @@ -860,13 +966,22 @@ async function compile_and_run(query_context) { if (e instanceof SyntaxError) { // SyntaxError's from eval() function do not contain detailed explanation of what has caused the syntax error, so to guess what was wrong we can only use the original query // v8 issue to fix eval: https://bugs.chromium.org/p/v8/issues/detail?id=2589 - if (query_context.query_text.toLowerCase().indexOf(' having ') != -1) + let lower_case_query = query_context.query_text.toLowerCase(); + if (lower_case_query.indexOf(' having ') != -1) throw new SyntaxError(e.message + "\nRBQL doesn't support \"HAVING\" keyword"); - if (query_context.query_text.toLowerCase().indexOf(' like ') != -1) + if (lower_case_query.indexOf(' like ') != -1) throw new SyntaxError(e.message + "\nRBQL doesn't support \"LIKE\" operator, use like() function instead e.g. ... WHERE like(a1, 'foo%bar') ... "); // UT JSON - if (query_context.query_text.toLowerCase().indexOf(' from ') != -1) + if (lower_case_query.indexOf(' from ') != -1) throw new SyntaxError(e.message + "\nRBQL doesn't use \"FROM\" keyword, e.g. you can query 'SELECT *' without FROM"); // UT JSON + if (e && e.message && String(e.message).toLowerCase().indexOf('unexpected identifier') != -1) { + if (lower_case_query.indexOf(' and ') != -1) + throw new SyntaxError(e.message + "\nDid you use 'and' keyword in your query?\nJavaScript backend doesn't support 'and' keyword, use '&&' operator instead!"); + if (lower_case_query.indexOf(' or ') != -1) + throw new SyntaxError(e.message + "\nDid you use 'or' keyword in your query?\nJavaScript backend doesn't support 'or' keyword, use '||' operator instead!"); + } } + if (e && e.message && e.message.indexOf('Received an instance of RBQLAggregationToken') != -1) + throw new RbqlParsingError(wrong_aggregation_usage_error); throw e; } } @@ -884,6 +999,7 @@ const ORDER_BY = 'ORDER BY'; const WHERE = 'WHERE'; const LIMIT = 'LIMIT'; const EXCEPT = 'EXCEPT'; +const WITH = 'WITH'; function get_ambiguous_error_msg(variable_name) { @@ -916,7 +1032,7 @@ function strip_comments(cline) { function combine_string_literals(backend_expression, string_literals) { for (var i = 0; i < string_literals.length; i++) { - backend_expression = replace_all(backend_expression, `###RBQL_STRING_LITERAL${i}###`, string_literals[i]); + backend_expression = replace_all(backend_expression, `___RBQL_STRING_LITERAL${i}___`, string_literals[i]); } return backend_expression; } @@ -1163,6 +1279,24 @@ function replace_star_vars(rbql_expression) { } +function replace_star_vars_for_header_parsing(rbql_expression) { + let star_rgx = /(?:(?<=^)|(?<=,)) *(\*|a\.\*|b\.\*) *(?=$|,)/g; + let matches = get_all_matches(star_rgx, rbql_expression); + let last_pos = 0; + let result = ''; + for (let match of matches) { + let star_expression = match[1]; + let replacement_expression = {'*': '__RBQL_INTERNAL_STAR', 'a.*': 'a.__RBQL_INTERNAL_STAR', 'b.*': 'b.__RBQL_INTERNAL_STAR'}[star_expression]; + if (last_pos < match.index) + result += rbql_expression.substring(last_pos, match.index); + result += replacement_expression; + last_pos = match.index + match[0].length; + } + result += rbql_expression.substring(last_pos); + return result; +} + + function translate_update_expression(update_expression, input_variables_map, string_literals, indent) { let first_assignment = str_strip(update_expression.split('=')[0]); let first_assignment_error = `Unable to parse "UPDATE" expression: the expression must start with assignment, but "${first_assignment}" does not look like an assignable field name`; @@ -1194,12 +1328,12 @@ function translate_update_expression(update_expression, input_variables_map, str function translate_select_expression(select_expression) { - var translated = replace_star_count(select_expression); - translated = replace_star_vars(translated); - translated = str_strip(translated); + let expression_without_stars = replace_star_count(select_expression); + let translated = str_strip(replace_star_vars(expression_without_stars)); + let translated_for_header = str_strip(replace_star_vars_for_header_parsing(expression_without_stars)); if (!translated.length) throw new RbqlParsingError('"SELECT" expression is empty'); - return `[].concat([${translated}])`; + return [`[].concat([${translated}])`, translated_for_header]; } @@ -1216,7 +1350,7 @@ function separate_string_literals(rbql_expression) { string_literals.push(string_literal); var start_index = match_obj.index; format_parts.push(rbql_expression.substring(idx_before, start_index)); - format_parts.push(`###RBQL_STRING_LITERAL${literal_id}###`); + format_parts.push(`___RBQL_STRING_LITERAL${literal_id}___`); idx_before = rgx.lastIndex; } format_parts.push(rbql_expression.substring(idx_before)); @@ -1260,8 +1394,13 @@ function locate_statements(rbql_expression) { function separate_actions(rbql_expression) { rbql_expression = str_strip(rbql_expression); - var ordered_statements = locate_statements(rbql_expression); var result = {}; + let with_match = /^(.*) *[Ww][Ii][Tt][Hh] *\(([a-z]{4,20})\) *$/.exec(rbql_expression); + if (with_match !== null) { + rbql_expression = with_match[1]; + result[WITH] = with_match[2]; + } + var ordered_statements = locate_statements(rbql_expression); for (var i = 0; i < ordered_statements.length; i++) { var statement_start = ordered_statements[i][0]; var span_start = ordered_statements[i][1]; @@ -1296,7 +1435,7 @@ function separate_actions(rbql_expression) { if (statement == SELECT) { if (statement_start != 0) throw new RbqlParsingError('SELECT keyword must be at the beginning of the query'); - var match = /^ *TOP *([0-9]+) /i.exec(span); + let match = /^ *TOP *([0-9]+) /i.exec(span); if (match !== null) { statement_params['top'] = parseInt(match[1]); span = span.substr(match.index + match[0].length); @@ -1338,7 +1477,7 @@ function find_top(rb_actions) { } -function translate_except_expression(except_expression, input_variables_map, string_literals) { +function translate_except_expression(except_expression, input_variables_map, string_literals, input_header) { let skip_vars = except_expression.split(','); skip_vars = skip_vars.map(str_strip); let skip_indices = []; @@ -1349,8 +1488,9 @@ function translate_except_expression(except_expression, input_variables_map, str skip_indices.push(input_variables_map[var_name].index); } skip_indices = skip_indices.sort((a, b) => a - b); + let output_header = input_header === null ? null : select_except(input_header, skip_indices); let indices_str = skip_indices.join(','); - return `select_except(record_a, [${indices_str}])`; + return [output_header, `select_except(record_a, [${indices_str}])`]; } @@ -1430,6 +1570,44 @@ function remove_redundant_table_name(query_text) { } +function select_output_header(input_header, join_header, query_column_infos) { + if (input_header === null && join_header === null) + return null; + if (input_header === null) + input_header = []; + if (join_header === null) + join_header = []; + let output_header = []; + for (let qci of query_column_infos) { + // TODO refactor this and python version: extract this code into a function instead to always return something + if (qci === null) { + output_header.push('col' + (output_header.length + 1)); + } else if (qci.is_star) { + if (qci.table_name === null) { + output_header = output_header.concat(input_header).concat(join_header); + } else if (qci.table_name === 'a') { + output_header = output_header.concat(input_header); + } else if (qci.table_name === 'b') { + output_header = output_header.concat(join_header); + } + } else if (qci.column_name !== null) { + output_header.push(qci.column_name); + } else if (qci.column_index !== null) { + if (qci.table_name == 'a' && qci.column_index < input_header.length) { + output_header.push(input_header[qci.column_index]); + } else if (qci.table_name == 'b' && qci.column_index < join_header.length) { + output_header.push(join_header[qci.column_index]); + } else { + output_header.push('col' + (output_header.length + 1)); + } + } else { // Should never happen + output_header.push('col' + (output_header.length + 1)); + } + } + return output_header; +} + + function make_inconsistent_num_fields_warning(table_name, inconsistent_records_info) { let keys = Object.keys(inconsistent_records_info); let entries = []; @@ -1448,8 +1626,66 @@ function make_inconsistent_num_fields_warning(table_name, inconsistent_records_i } -class TableIterator { +class RBQLInputIterator { + constructor(){} + stop() { + throw new Error("Unable to call the interface method"); + } + async get_variables_map(query_text) { + throw new Error("Unable to call the interface method"); + } + async get_record() { + throw new Error("Unable to call the interface method"); + } + handle_query_modifier() { + return; // Reimplement if you need to handle a boolean query modifier that can be used like this: `SELECT * WITH (modifiername)` + } + get_warnings() { + return []; // Reimplement if your class can produce warnings + } + async get_header() { + return null; // Reimplement if your class can provide input header + } +} + + +class RBQLOutputWriter { + constructor(){} + + write(fields) { + throw new Error("Unable to call the interface method"); + } + + async finish() { + // Reimplement if your class needs to do something on finish e.g. cleanup + }; + + get_warnings() { + return []; // Reimplement if your class can produce warnings + }; + + set_header() { + return; // Reimplement if your class can handle output headers in a meaningful way + } +} + + +class RBQLTableRegistry { + constructor(){} + + get_iterator_by_table_id(table_id) { + throw new Error("Unable to call the interface method"); + } + + get_warnings() { + return []; // Reimplement if your class can produce warnings + }; +} + + +class TableIterator extends RBQLInputIterator { constructor(table, column_names=null, normalize_column_names=true, variable_prefix='a') { + super(); this.table = table; this.column_names = column_names; this.normalize_column_names = normalize_column_names; @@ -1501,12 +1737,18 @@ class TableIterator { return [make_inconsistent_num_fields_warning('input', this.fields_info)]; return []; }; + + async get_header() { + return this.column_names; + } } -class TableWriter { +class TableWriter extends RBQLOutputWriter { constructor(external_table) { + super(); this.table = external_table; + this.header = null; } write(fields) { @@ -1514,16 +1756,15 @@ class TableWriter { return true; }; - get_warnings() { - return []; - }; - - async finish() {}; + set_header(header) { + this.header = header; + } } -class SingleTableRegistry { +class SingleTableRegistry extends RBQLTableRegistry { constructor(table, column_names=null, normalize_column_names=true, table_id='b') { + super(); this.table = table; this.table_id = table_id; this.column_names = column_names; @@ -1545,6 +1786,9 @@ async function shallow_parse_input_query(query_text, input_iterator, join_tables var input_variables_map = await input_iterator.get_variables_map(query_text); var rb_actions = separate_actions(format_expression); + if (rb_actions.hasOwnProperty(WITH)) { + input_iterator.handle_query_modifier(rb_actions[WITH]); + } if (rb_actions.hasOwnProperty(ORDER_BY) && rb_actions.hasOwnProperty(UPDATE)) throw new RbqlParsingError('"ORDER BY" is not allowed in "UPDATE" queries'); @@ -1556,6 +1800,7 @@ async function shallow_parse_input_query(query_text, input_iterator, join_tables } let join_variables_map = null; + let join_header = null; if (rb_actions.hasOwnProperty(JOIN)) { var [rhs_table_id, variable_pairs] = parse_join_expression(rb_actions[JOIN]['text']); if (join_tables_registry === null) @@ -1563,7 +1808,11 @@ async function shallow_parse_input_query(query_text, input_iterator, join_tables let join_record_iterator = join_tables_registry.get_iterator_by_table_id(rhs_table_id); if (!join_record_iterator) throw new RbqlParsingError(`Unable to find join table: "${rhs_table_id}"`); + if (rb_actions.hasOwnProperty(WITH)) { + join_record_iterator.handle_query_modifier(rb_actions[WITH]); + } join_variables_map = await join_record_iterator.get_variables_map(query_text); + join_header = await join_record_iterator.get_header(); let [lhs_variables, rhs_indices] = resolve_join_variables(input_variables_map, join_variables_map, variable_pairs, string_literals); let sql_join_type = {'JOIN': InnerJoiner, 'INNER JOIN': InnerJoiner, 'LEFT JOIN': LeftJoiner, 'LEFT OUTER JOIN': LeftJoiner, 'STRICT LEFT JOIN': StrictLeftJoiner}[rb_actions[JOIN]['join_subtype']]; query_context.lhs_join_var_expression = lhs_variables.length == 1 ? lhs_variables[0] : 'JSON.stringify([' + lhs_variables.join(',') + '])'; @@ -1576,31 +1825,38 @@ async function shallow_parse_input_query(query_text, input_iterator, join_tables if (rb_actions.hasOwnProperty(WHERE)) { var where_expression = rb_actions[WHERE]['text']; - if (/[^!=]=[^=]/.exec(where_expression)) + if (/[^> 1: - raise RbqlIOHandlingError('Unable to use "Monocolumn" output format: some records have more than one field') + raise rbql_engine.RbqlIOHandlingError('Unable to use "Monocolumn" output format: some records have more than one field') def normalize_fields(self, fields): for i in polymorphic_xrange(len(fields)): - if fields[i] is None: + if PY3 and isinstance(fields[i], str): + continue + elif not PY3 and isinstance(fields[i], basestring): + continue + elif fields[i] is None: fields[i] = '' self.none_in_output = True elif isinstance(fields[i], list): self.normalize_fields(fields[i]) fields[i] = self.sub_array_delim.join(fields[i]) else: - fields[i] = polymorphic_str(fields[i]) + fields[i] = str(fields[i]) def _write_all(self, table): @@ -340,8 +333,8 @@ def get_warnings(self): return result -class CSVRecordIterator: - def __init__(self, stream, encoding, delim, policy, skip_headers=False, table_name='input', variable_prefix='a', chunk_size=1024, line_mode=False): +class CSVRecordIterator(rbql_engine.RBQLInputIterator): + def __init__(self, stream, encoding, delim, policy, has_header=False, comment_prefix=None, table_name='input', variable_prefix='a', chunk_size=1024, line_mode=False): assert encoding in ['utf-8', 'latin-1', None] self.encoding = encoding self.stream = encode_input_stream(stream, encoding) @@ -349,33 +342,49 @@ def __init__(self, stream, encoding, delim, policy, skip_headers=False, table_na self.policy = policy self.table_name = table_name self.variable_prefix = variable_prefix + self.comment_prefix = comment_prefix if (comment_prefix is not None and len(comment_prefix)) else None self.buffer = '' self.detected_line_separator = '\n' self.exhausted = False - self.NR = 0 + self.NR = 0 # Record number + self.NL = 0 # Line number (NL != NR when the CSV file has comments or multiline fields) self.chunk_size = chunk_size self.fields_info = dict() self.utf8_bom_removed = False - self.first_defective_line = None # TODO use line # instead of record # when "\n" in fields parsing is implemented + self.first_defective_line = None self.polymorphic_get_row = self.get_row_rfc if policy == 'quoted_rfc' else self.get_row_simple + self.has_header = has_header + self.first_record_should_be_emitted = False if not line_mode: - self.header_record = None - self.header_record_emitted = skip_headers - self.header_record = self.get_record() + self.first_record = None + self.first_record = self.get_record() + self.first_record_should_be_emitted = not has_header + def handle_query_modifier(self, modifier): + # For `... WITH (header) ...` syntax + if modifier in ['header', 'headers']: + self.has_header = True + self.first_record_should_be_emitted = False + if modifier in ['noheader', 'noheaders']: + self.has_header = False + self.first_record_should_be_emitted = True + + def get_variables_map(self, query_text): variable_map = dict() rbql_engine.parse_basic_variables(query_text, self.variable_prefix, variable_map) rbql_engine.parse_array_variables(query_text, self.variable_prefix, variable_map) - if self.header_record is not None: - rbql_engine.parse_attribute_variables(query_text, self.variable_prefix, self.header_record, 'CSV header line', variable_map) - rbql_engine.parse_dictionary_variables(query_text, self.variable_prefix, self.header_record, variable_map) + if self.first_record is not None: + rbql_engine.parse_attribute_variables(query_text, self.variable_prefix, self.first_record, 'CSV header line', variable_map) + rbql_engine.parse_dictionary_variables(query_text, self.variable_prefix, self.first_record, variable_map) return variable_map + def get_header(self): + return self.first_record if self.has_header else None def _get_row_from_buffer(self): str_before, separator, str_after = csv_utils.extract_line_from_data(self.buffer) @@ -409,27 +418,33 @@ def _read_until_found(self): def get_row_simple(self): try: - row = self._get_row_from_buffer() - if row is not None: - return row - self._read_until_found() row = self._get_row_from_buffer() if row is None: - assert self.exhausted - if self.buffer: - tmp = self.buffer + self._read_until_found() + row = self._get_row_from_buffer() + if row is None: + assert self.exhausted + if not len(self.buffer): + return None + row = self.buffer self.buffer = '' - return tmp - return None + self.NL += 1 + if self.NL == 1: + clean_line = remove_utf8_bom(row, self.encoding) + if clean_line != row: + row = clean_line + self.utf8_bom_removed = True return row except UnicodeDecodeError: - raise RbqlIOHandlingError('Unable to decode input table as UTF-8. Use binary (latin-1) encoding instead') + raise rbql_engine.RbqlIOHandlingError('Unable to decode input table as UTF-8. Use binary (latin-1) encoding instead') def get_row_rfc(self): first_row = self.get_row_simple() if first_row is None: return None + if self.comment_prefix is not None and first_row.startswith(self.comment_prefix): + return first_row if first_row.count('"') % 2 == 0: return first_row rows_buffer = [first_row] @@ -443,25 +458,22 @@ def get_row_rfc(self): def get_record(self): - if not self.header_record_emitted and self.header_record is not None: - self.header_record_emitted = True - return self.header_record - line = self.polymorphic_get_row() - if line is None: - return None - if self.NR == 0: - clean_line = remove_utf8_bom(line, self.encoding) - if clean_line != line: - line = clean_line - self.utf8_bom_removed = True + if self.first_record_should_be_emitted: + self.first_record_should_be_emitted = False + return self.first_record + while True: + line = self.polymorphic_get_row() + if line is None: + return None + if self.comment_prefix is None or not line.startswith(self.comment_prefix): + break self.NR += 1 record, warning = csv_utils.smart_split(line, self.delim, self.policy, preserve_quotes_and_whitespaces=False) if warning: if self.first_defective_line is None: - self.first_defective_line = self.NR + self.first_defective_line = self.NL if self.policy == 'quoted_rfc': - # TODO add line number when NL is supported - raise RbqlIOHandlingError('Inconsistent double quote escaping in {} table at record {}'.format(self.table_name, self.NR)) + raise rbql_engine.RbqlIOHandlingError('Inconsistent double quote escaping in {} table at record {}, line {}'.format(self.table_name, self.NR, self.NL)) num_fields = len(record) if num_fields not in self.fields_info: self.fields_info[num_fields] = self.NR @@ -501,32 +513,37 @@ def get_warnings(self): return result -class FileSystemCSVRegistry: - def __init__(self, delim, policy, encoding, skip_headers): +class FileSystemCSVRegistry(rbql_engine.RBQLTableRegistry): + def __init__(self, delim, policy, encoding, has_header, comment_prefix): self.delim = delim self.policy = policy self.encoding = encoding self.record_iterator = None self.input_stream = None - self.skip_headers = skip_headers + self.has_header = has_header + self.comment_prefix = comment_prefix self.table_path = None def get_iterator_by_table_id(self, table_id): self.table_path = find_table_path(table_id) if self.table_path is None: - raise RbqlIOHandlingError('Unable to find join table "{}"'.format(table_id)) + raise rbql_engine.RbqlIOHandlingError('Unable to find join table "{}"'.format(table_id)) self.input_stream = open(self.table_path, 'rb') - self.record_iterator = CSVRecordIterator(self.input_stream, self.encoding, self.delim, self.policy, self.skip_headers, table_name=table_id, variable_prefix='b') + self.record_iterator = CSVRecordIterator(self.input_stream, self.encoding, self.delim, self.policy, self.has_header, comment_prefix=self.comment_prefix, table_name=table_id, variable_prefix='b') return self.record_iterator - def finish(self, output_warnings): + def finish(self): if self.input_stream is not None: self.input_stream.close() - if self.skip_headers: - output_warnings.append('The first (header) record was also skipped in the JOIN file: {}'.format(os.path.basename(self.table_path))) # UT JSON CSV + + def get_warnings(self): + result = [] + if self.record_iterator is not None and self.has_header: + result.append('The first record in JOIN file {} was also treated as header (and skipped)'.format(os.path.basename(self.table_path))) # UT JSON CSV + return result -def query_csv(query_text, input_path, input_delim, input_policy, output_path, output_delim, output_policy, csv_encoding, output_warnings, skip_headers=False, user_init_code='', colorize_output=False): +def query_csv(query_text, input_path, input_delim, input_policy, output_path, output_delim, output_policy, csv_encoding, output_warnings, with_headers, comment_prefix=None, user_init_code='', colorize_output=False): output_stream, close_output_on_finish = (None, False) input_stream, close_input_on_finish = (None, False) join_tables_registry = None @@ -535,22 +552,22 @@ def query_csv(query_text, input_path, input_delim, input_policy, output_path, ou input_stream, close_input_on_finish = (sys.stdin, False) if input_path is None else (open(input_path, 'rb'), True) if input_delim == '"' and input_policy == 'quoted': - raise RbqlIOHandlingError('Double quote delimiter is incompatible with "quoted" policy') + raise rbql_engine.RbqlIOHandlingError('Double quote delimiter is incompatible with "quoted" policy') if input_delim != ' ' and input_policy == 'whitespace': - raise RbqlIOHandlingError('Only whitespace " " delim is supported with "whitespace" policy') + raise rbql_engine.RbqlIOHandlingError('Only whitespace " " delim is supported with "whitespace" policy') if not is_ascii(query_text) and csv_encoding == 'latin-1': - raise RbqlIOHandlingError('To use non-ascii characters in query enable UTF-8 encoding instead of latin-1/binary') + raise rbql_engine.RbqlIOHandlingError('To use non-ascii characters in query enable UTF-8 encoding instead of latin-1/binary') if (not is_ascii(input_delim) or not is_ascii(output_delim)) and csv_encoding == 'latin-1': - raise RbqlIOHandlingError('To use non-ascii separators enable UTF-8 encoding instead of latin-1/binary') + raise rbql_engine.RbqlIOHandlingError('To use non-ascii separators enable UTF-8 encoding instead of latin-1/binary') default_init_source_path = os.path.join(os.path.expanduser('~'), '.rbql_init_source.py') if user_init_code == '' and os.path.exists(default_init_source_path): user_init_code = read_user_init_code(default_init_source_path) - join_tables_registry = FileSystemCSVRegistry(input_delim, input_policy, csv_encoding, skip_headers) - input_iterator = CSVRecordIterator(input_stream, csv_encoding, input_delim, input_policy, skip_headers) + join_tables_registry = FileSystemCSVRegistry(input_delim, input_policy, csv_encoding, with_headers, comment_prefix) + input_iterator = CSVRecordIterator(input_stream, csv_encoding, input_delim, input_policy, with_headers, comment_prefix=comment_prefix) output_writer = CSVWriter(output_stream, close_output_on_finish, csv_encoding, output_delim, output_policy, colorize_output=colorize_output) if debug_mode: rbql_engine.set_debug_mode() @@ -561,7 +578,8 @@ def query_csv(query_text, input_path, input_delim, input_policy, output_path, ou if close_output_on_finish: output_stream.close() if join_tables_registry: - join_tables_registry.finish(output_warnings) + join_tables_registry.finish() + output_warnings += join_tables_registry.get_warnings() def set_debug_mode(): diff --git a/rbql/rbql_engine.py b/rbql/rbql_engine.py index 9e22a43..98a5a93 100755 --- a/rbql/rbql_engine.py +++ b/rbql/rbql_engine.py @@ -6,6 +6,7 @@ import re import random import time +import ast from collections import OrderedDict, defaultdict, namedtuple import datetime # For date operations inside user queries @@ -26,6 +27,8 @@ # UT JSON CSV - means json csv Unit Test exists for this case +# TODO we can do good-enough header autodetection in CSV files to show warnings when we have a high degree of confidence that the file has header but user didn't skip it and vise versa + # TODO catch exceptions in user expression to report the exact place where it occured: "SELECT" expression, "WHERE" expression, etc # TODO consider supporting explicit column names variables like "host" or "name" or "surname" - just parse all variable-looking sequences from the query and match them against available column names from the header, but skip all symbol defined in rbql_engine.py/rbql.js, user init code and python/js builtin keywords (show warning on intersection) @@ -38,14 +41,18 @@ # TODO support custom (virtual) headers for CSV version -# TODO support RBQL variable "NL" - line number. when header is skipped it would be "2" for the first record. Also it is not equal to NR for multiline records - -# TODO support option to skip comment lines (lines starting with the specified prefix) +# TODO allow to use NL in RBQL queries for CSV version # TODO add "inconsistent number of fields in output table" warning. Useful for queries like this: `*a1.split("|")` or `...a1.split("|")`, where num of fields in a1 is variable +# TODO add RBQL iterators for json lines ( https://jsonlines.org/ ) and xml-by-line files +# TODO add RBQL file-system iterator to be able to query files like fselect does + +# TODO use ast module to improve parsing of parse_attribute_variables / parse_dictionary_variables, like it was done for select parsing -# FIXME refactor this module in sync with the JS version. There wasn't any cleanup after the last redesign +# TODO support 'AS' keyword + +# FIXME consider disallowing to use values in the first row when header is not enabled (only a1, a2, ... should be allowed) and vice versa - Don't allow a1, a2 etc when header is enabled. This is to make sure that the user knows what query mode they are in. GROUP_BY = 'GROUP BY' @@ -60,6 +67,7 @@ WHERE = 'WHERE' LIMIT = 'LIMIT' EXCEPT = 'EXCEPT' +WITH = 'WITH' ambiguous_error_msg = 'Ambiguous variable name: "{}" is present both in input and in join tables' invalid_keyword_in_aggregate_query_error_msg = '"ORDER BY", "UPDATE" and "DISTINCT" keywords are not allowed in aggregate queries' @@ -132,6 +140,95 @@ def __init__(self, input_iterator, output_writer, user_init_code): PY3 = sys.version_info[0] == 3 +def is_str6(val): + return (PY3 and isinstance(val, str)) or (not PY3 and isinstance(val, basestring)) + + +QueryColumnInfo = namedtuple('QueryColumnInfo', ['table_name', 'column_index', 'column_name', 'is_star']) + + +def get_field(root, field_name): + for f in ast.iter_fields(root): + if len(f) == 2 and f[0] == field_name: + return f[1] + return None + + +def column_info_from_node(root): + rbql_star_marker = '__RBQL_INTERNAL_STAR' + if isinstance(root, ast.Name): + var_name = get_field(root, 'id') + if var_name is None: + return None + if var_name == rbql_star_marker: + return QueryColumnInfo(table_name=None, column_index=None, column_name=None, is_star=True) + good_column_name_rgx = '^([ab])([0-9][0-9]*)$' + match_obj = re.match(good_column_name_rgx, var_name) + if match_obj is not None: + table_name = match_obj.group(1) + column_index = int(match_obj.group(2)) - 1 + return QueryColumnInfo(table_name=table_name, column_index=column_index, column_name=None, is_star=False) + # Some examples for this branch: NR, NF + return QueryColumnInfo(table_name=None, column_index=None, column_name=var_name, is_star=False) + if isinstance(root, ast.Attribute): + column_name = get_field(root, 'attr') + if not column_name: + return None + if not is_str6(column_name): + return None + var_root = get_field(root, 'value') + if not isinstance(var_root, ast.Name): + return None + table_name = get_field(var_root, 'id') + if table_name is None or table_name not in ['a', 'b']: + return None + if column_name == rbql_star_marker: + return QueryColumnInfo(table_name=table_name, column_index=None, column_name=None, is_star=True) + return QueryColumnInfo(table_name=None, column_index=None, column_name=column_name, is_star=False) + if isinstance(root, ast.Subscript): + var_root = get_field(root, 'value') + if not isinstance(var_root, ast.Name): + return None + table_name = get_field(var_root, 'id') + if table_name is None or table_name not in ['a', 'b']: + return None + slice_root = get_field(root, 'slice') + if slice_root is None or not isinstance(slice_root, ast.Index): + return None + slice_val_root = get_field(slice_root, 'value') + column_index = None + column_name = None + if isinstance(slice_val_root, ast.Str): + column_name = get_field(slice_val_root, 's') + table_name = None # We don't need table name for named fields + elif isinstance(slice_val_root, ast.Num): + column_index = get_field(slice_val_root, 'n') - 1 + else: + return None + return QueryColumnInfo(table_name=table_name, column_index=column_index, column_name=column_name, is_star=False) + return None + + +def ast_parse_select_expression_to_column_infos(select_expression): + root = ast.parse(select_expression) + children = list(ast.iter_child_nodes(root)) + if 'body' not in root._fields: + raise RbqlParsingError('Unable to parse SELECT expression (error code #117)') # Should never happen + if len(children) != 1: + raise RbqlParsingError('Unable to parse SELECT expression (error code #118)') # Should never happen + root = children[0] + children = list(ast.iter_child_nodes(root)) + if len(children) != 1: + raise RbqlParsingError('Unable to parse SELECT expression (error code #119): "{}"'.format(select_expression)) # This can be triggered with `SELECT a = 100` + root = children[0] + if isinstance(root, ast.Tuple): + column_expression_trees = root.elts + column_infos = [column_info_from_node(ct) for ct in column_expression_trees] + else: + column_infos = [column_info_from_node(root)] + return column_infos + + def iteritems6(x): if PY3: return x.items() @@ -228,9 +325,7 @@ def __init__(self, start_with_int): def parse(self, val): if not self.string_detection_done: self.string_detection_done = True - if PY3 and isinstance(val, str): - self.is_str = True - if not PY3 and isinstance(val, basestring): + if is_str6(val): self.is_str = True if not self.is_str: return val @@ -406,14 +501,12 @@ def init_aggregator(generator_name, val, post_proc=None): def MIN(val): return init_aggregator(MinAggregator, val) if query_context.aggregation_stage < 2 else val -# min = MIN - see the mad max copypaste below Min = MIN def MAX(val): return init_aggregator(MaxAggregator, val) if query_context.aggregation_stage < 2 else val -# max = MAX - see the mad max copypaste below Max = MAX @@ -427,7 +520,6 @@ def COUNT(_val): def SUM(val): return init_aggregator(SumAggregator, val) if query_context.aggregation_stage < 2 else val -# sum = SUM - see the mad max copypaste below Sum = SUM @@ -459,9 +551,7 @@ def ARRAY_AGG(val, post_proc=None): array_agg = ARRAY_AGG - - -# Redefining builtin max, min and sum. See test_max_max.py unit test for explanation +# Redefining builtin max, min and sum builtin_max = max builtin_min = min builtin_sum = sum @@ -895,7 +985,7 @@ def strip_comments(cline): def combine_string_literals(backend_expression, string_literals): for i in range(len(string_literals)): - backend_expression = backend_expression.replace('###RBQL_STRING_LITERAL{}###'.format(i), string_literals[i]) + backend_expression = backend_expression.replace('___RBQL_STRING_LITERAL{}___'.format(i), string_literals[i]) return backend_expression @@ -1044,6 +1134,7 @@ def ensure_no_ambiguous_variables(query_text, input_column_names, join_column_na def generate_common_init_code(query_text, variable_prefix): assert variable_prefix in ['a', 'b'] result = list() + # TODO [PERFORMANCE] do not initialize RBQLRecord if we don't have `a.` or `a[` prefix in the query result.append('{} = RBQLRecord()'.format(variable_prefix)) base_var = 'NR' if variable_prefix == 'a' else 'bNR' attr_var = '{}.NR'.format(variable_prefix) @@ -1086,6 +1177,21 @@ def replace_star_vars(rbql_expression): return result +def replace_star_vars_for_ast(rbql_expression): + star_matches = list(re.finditer(r'(?:(?<=^)|(?<=,)) *(\*|a\.\*|b\.\*) *(?=$|,)', rbql_expression)) + last_pos = 0 + result = '' + for match in star_matches: + star_expression = match.group(1) + replacement_expression = {'*': '__RBQL_INTERNAL_STAR', 'a.*': 'a.__RBQL_INTERNAL_STAR', 'b.*': 'b.__RBQL_INTERNAL_STAR'}[star_expression] + if last_pos < match.start(): + result += rbql_expression[last_pos:match.start()] + result += replacement_expression + last_pos = match.end() + result += rbql_expression[last_pos:] + return result + + def translate_update_expression(update_expression, input_variables_map, string_literals): assignment_looking_rgx = re.compile(r'(?:^|,) *(a[.#a-zA-Z0-9\[\]_]*) *=(?=[^=])') update_expressions = [] @@ -1110,12 +1216,12 @@ def translate_update_expression(update_expression, input_variables_map, string_l def translate_select_expression(select_expression): - translated = replace_star_count(select_expression) - translated = replace_star_vars(translated) - translated = translated.strip() + expression_without_stars = replace_star_count(select_expression) + translated = replace_star_vars(expression_without_stars).strip() + translated_for_ast = replace_star_vars_for_ast(expression_without_stars).strip() if not len(translated): raise RbqlParsingError('"SELECT" expression is empty') # UT JSON - return '[{}]'.format(translated) + return ('[{}]'.format(translated), translated_for_ast) def separate_string_literals(rbql_expression): @@ -1129,7 +1235,7 @@ def separate_string_literals(rbql_expression): literal_id = len(string_literals) string_literals.append(m.group(0)) format_parts.append(rbql_expression[idx_before:m.start()]) - format_parts.append('###RBQL_STRING_LITERAL{}###'.format(literal_id)) + format_parts.append('___RBQL_STRING_LITERAL{}___'.format(literal_id)) idx_before = m.end() format_parts.append(rbql_expression[idx_before:]) format_expression = ''.join(format_parts) @@ -1168,8 +1274,13 @@ def separate_actions(rbql_expression): # TODO add more checks: # make sure all rbql_expression was separated and SELECT or UPDATE is at the beginning rbql_expression = rbql_expression.strip(' ') - ordered_statements = locate_statements(rbql_expression) result = dict() + # For now support no more than one query modifier per query + mobj = re.match('^(.*) *[Ww][Ii][Tt][Hh] *\(([a-z]{4,20})\) *$', rbql_expression) + if mobj is not None: + rbql_expression = mobj.group(1) + result[WITH] = mobj.group(2) + ordered_statements = locate_statements(rbql_expression) for i in range(len(ordered_statements)): statement_start = ordered_statements[i][0] span_start = ordered_statements[i][1] @@ -1217,7 +1328,8 @@ def separate_actions(rbql_expression): result[statement] = statement_params if SELECT not in result and UPDATE not in result: raise RbqlParsingError('Query must contain either SELECT or UPDATE statement') # UT JSON - assert (SELECT in result) != (UPDATE in result) + if SELECT in result and UPDATE in result: + raise RbqlParsingError('Query can not contain both SELECT and UPDATE statements') return result @@ -1230,7 +1342,7 @@ def find_top(rb_actions): return rb_actions[SELECT].get('top', None) -def translate_except_expression(except_expression, input_variables_map, string_literals): +def translate_except_expression(except_expression, input_variables_map, string_literals, input_header): skip_vars = except_expression.split(',') skip_vars = [v.strip() for v in skip_vars] skip_indices = list() @@ -1241,8 +1353,9 @@ def translate_except_expression(except_expression, input_variables_map, string_l raise RbqlParsingError('Unknown field in EXCEPT expression: "{}"'.format(var_name)) # UT JSON skip_indices.append(var_info.index) skip_indices = sorted(skip_indices) + output_header = None if input_header is None else select_except(input_header, skip_indices) skip_indices = [str(v) for v in skip_indices] - return 'select_except(record_a, [{}])'.format(','.join(skip_indices)) + return (output_header, 'select_except(record_a, [{}])'.format(','.join(skip_indices))) class HashJoinMap: @@ -1310,13 +1423,46 @@ def remove_redundant_input_table_name(query_text): return query_text +def select_output_header(input_header, join_header, query_column_infos): + if input_header is None and join_header is None: + return None + if input_header is None: + input_header = [] + if join_header is None: + join_header = [] + output_header = [] + for qci in query_column_infos: + if qci is None: + output_header.append('col{}'.format(len(output_header) + 1)) + elif qci.is_star: + if qci.table_name is None: + output_header += input_header + join_header + elif qci.table_name == 'a': + output_header += input_header + elif qci.table_name == 'b': + output_header += join_header + elif qci.column_name is not None: + output_header.append(qci.column_name) + elif qci.column_index is not None: + if qci.table_name == 'a' and qci.column_index < len(input_header): + output_header.append(input_header[qci.column_index]) + elif qci.table_name == 'b' and qci.column_index < len(join_header): + output_header.append(join_header[qci.column_index]) + else: + output_header.append('col{}'.format(len(output_header) + 1)) + else: # Should never happen + output_header.append('col{}'.format(len(output_header) + 1)) + return output_header + + def shallow_parse_input_query(query_text, input_iterator, join_tables_registry, query_context): query_text = cleanup_query(query_text) format_expression, string_literals = separate_string_literals(query_text) format_expression = remove_redundant_input_table_name(format_expression) input_variables_map = input_iterator.get_variables_map(query_text) - rb_actions = separate_actions(format_expression) + if WITH in rb_actions: + input_iterator.handle_query_modifier(rb_actions[WITH]) if ORDER_BY in rb_actions and UPDATE in rb_actions: raise RbqlParsingError('"ORDER BY" is not allowed in "UPDATE" queries') # UT JSON @@ -1328,6 +1474,7 @@ def shallow_parse_input_query(query_text, input_iterator, join_tables_registry, join_variables_map = None + join_header = None if JOIN in rb_actions: rhs_table_id, variable_pairs = parse_join_expression(rb_actions[JOIN]['text']) if join_tables_registry is None: @@ -1335,7 +1482,10 @@ def shallow_parse_input_query(query_text, input_iterator, join_tables_registry, join_record_iterator = join_tables_registry.get_iterator_by_table_id(rhs_table_id) if join_record_iterator is None: raise RbqlParsingError('Unable to find join table: "{}"'.format(rhs_table_id)) # UT JSON CSV + if WITH in rb_actions: + join_record_iterator.handle_query_modifier(rb_actions[WITH]) join_variables_map = join_record_iterator.get_variables_map(query_text) + join_header = join_record_iterator.get_header() lhs_variables, rhs_indices = resolve_join_variables(input_variables_map, join_variables_map, variable_pairs, string_literals) joiner_type = {JOIN: InnerJoiner, INNER_JOIN: InnerJoiner, LEFT_OUTER_JOIN: LeftJoiner, LEFT_JOIN: LeftJoiner, STRICT_LEFT_JOIN: StrictLeftJoiner}[rb_actions[JOIN]['join_subtype']] @@ -1349,7 +1499,7 @@ def shallow_parse_input_query(query_text, input_iterator, join_tables_registry, if WHERE in rb_actions: where_expression = rb_actions[WHERE]['text'] - if re.search(r'[^!=]=[^=]', where_expression) is not None: + if re.search(r'[^> ') @@ -190,20 +234,87 @@ def run_interactive_loop(args): break # Ctrl-D if not len(query): break - readline.write_history_file(history_path) + try: + readline.write_history_file(history_path) # This can fail sometimes for no valid reason + except Exception: + pass args.query = query - success = run_with_python(args, is_interactive=True) + if mode == 'csv': + success = run_with_python_csv(args, is_interactive=True) + else: + success = run_with_python_sqlite(args, is_interactive=True) if success: print('\nOutput table preview:') print('====================================') - records, _warnings = sample_records(args.output, args.output_delim, args.output_policy, args.encoding) - print_colorized(records, args.output_delim, args.encoding, show_column_names=False, skip_header=False) + records, _warnings = sample_records(args.output, args.output_delim, args.output_policy, args.encoding, comment_prefix=None) + print_colorized(records, args.output_delim, args.encoding, show_column_names=False, with_headers=False) print('====================================') print('Success! Result table was saved to: ' + args.output) break -def start_preview_mode(args): +def sample_records_sqlite(db_connection, table_name): + import sqlite3 + record_iterator = rbql_sqlite.SqliteRecordIterator(db_connection, table_name) + records = [] + records.append(record_iterator.get_column_names()) + records += record_iterator.get_all_records(num_rows=10) + return records + + +def read_table_names(db_connection): + cursor = db_connection.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") + table_names = [r[0] for r in cursor.fetchall()] + return table_names + + +def select_table_name_by_user_choice(db_connection): + table_names = read_table_names(db_connection) + max_to_show = 20 + if len(table_names) > max_to_show: + print('Database has {} tables, showing top {}:'.format(len(table_names), max_to_show)) + else: + print('Showing database tables:') + print(', '.join(table_names[:max_to_show])) + table_name = polymorphic_input('No input table was provided as a CLI argument, please type in the table name to use:\n> ') + table_name = table_name.strip() + while table_name not in table_names: + table_name = polymorphic_input('"{}" is not a valid table name. Please enter a valid table name:\n> '.format(table_name)) + table_name = table_name.strip() + return table_name + + +def start_preview_mode_sqlite(args): + import sqlite3 + db_path = args.database + db_connection = sqlite3.connect(db_path) + if not args.input: + args.input = select_table_name_by_user_choice(db_connection) + try: + records = sample_records_sqlite(db_connection, table_name=args.input) + except Exception as e: + if args.debug_mode: + raise + error_type, error_msg = rbql_engine.exception_to_error_info(e) + show_error(error_type, 'Unable to sample preview records: {}'.format(error_msg), is_interactive=True) + sys.exit(1) + db_connection.close() + + print('Input table preview:') + print('====================================') + print_colorized(records, '|', args.encoding, show_column_names=True, with_headers=False) + print('====================================\n') + if args.output is None: + args.output = get_default_output_path('rbql_sqlite_rs', args.output_delim) + show_warning('Output path was not provided. Result set will be saved as: ' + args.output, is_interactive=True) + try: + run_interactive_loop('sqlite', args) + except KeyboardInterrupt: + print() + + +def start_preview_mode_csv(args): input_path = args.input if not input_path: show_error('generic', 'Input file must be provided in interactive mode. You can use stdin input only in non-interactive mode', is_interactive=True) @@ -215,16 +326,16 @@ def start_preview_mode(args): delim = rbql_csv.normalize_delim(args.delim) policy = args.policy if args.policy is not None else get_default_policy(delim) else: - delim, policy = autodetect_delim_policy(input_path, args.encoding) + delim, policy = autodetect_delim_policy(input_path, args.encoding, args.comment_prefix) if delim is None: show_error('generic', 'Unable to autodetect table delimiter. Provide column separator explicitly with "--delim" option', is_interactive=True) return args.delim = delim args.policy = policy - records, warnings = sample_records(input_path, delim, policy, args.encoding) + records, warnings = sample_records(input_path, delim, policy, args.encoding, args.comment_prefix) print('Input table preview:') print('====================================') - print_colorized(records, delim, args.encoding, show_column_names=True, skip_header=args.skip_header) + print_colorized(records, delim, args.encoding, show_column_names=True, with_headers=args.with_headers) print('====================================\n') for warning in warnings: show_warning(warning, is_interactive=True) @@ -232,22 +343,30 @@ def start_preview_mode(args): args.output = get_default_output_path(input_path, delim) show_warning('Output path was not provided. Result set will be saved as: ' + args.output, is_interactive=True) try: - run_interactive_loop(args) + run_interactive_loop('csv', args) except KeyboardInterrupt: print() -tool_description = ''' -Run RBQL queries against CSV files and data streams +csv_tool_description = ''' +Run RBQL queries against CSV files, sqlite databases + +rbql supports two modes: non-interactive (with "--query" option) and interactive (without "--query" option) -rbql-py supports two modes: non-interactive (with "--query" option) and interactive (without "--query" option) Interactive mode shows source table preview which makes query editing much easier. Usage example: - $ rbql-py --input input.csv -Non-interactive mode supports reading input tables from stdin. Usage example: - $ rbql-py --query "select a1, a2 order by a1" --delim , < input.csv + $ rbql --input input.csv + +Non-interactive mode supports reading input tables from stdin and writing output to stdout. Usage example: + $ rbql --query "select a1, a2 order by a1" --delim , < input.csv + +By default rbql works with CSV input files. +To learn how to use rbql to query an sqlite database, run this command: + + $ rbql sqlite --help + ''' -epilog = ''' +csv_epilog = ''' Description of the available CSV split policies: * "simple" - RBQL uses simple split() function and doesn't perform special handling of double quote characters * "quoted" - Separator can be escaped inside double-quoted fields. Double quotes inside double-quoted fields must be doubled @@ -257,17 +376,18 @@ def start_preview_mode(args): ''' -def main(): - parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description=tool_description, epilog=epilog) +def csv_main(): + parser = argparse.ArgumentParser(prog='rbql [csv]', formatter_class=argparse.RawDescriptionHelpFormatter, description=csv_tool_description, epilog=csv_epilog) parser.add_argument('--input', metavar='FILE', help='read csv table from FILE instead of stdin. Required in interactive mode') parser.add_argument('--delim', help='delimiter character or multicharacter string, e.g. "," or "###". Can be autodetected in interactive mode') parser.add_argument('--policy', help='CSV split policy, see the explanation below. Can be autodetected in interactive mode', choices=policy_names) - parser.add_argument('--skip-header', action='store_true', help='skip header line in input and join tables. Roughly equivalent of ... WHERE NR > 1 ... in your Query') + parser.add_argument('--with-headers', action='store_true', help='indicates that input (and join) table has header') + parser.add_argument('--comment-prefix', metavar='PREFIX', help='ignore lines in input and join tables that start with the comment PREFIX, e.g. "#" or ">>"') parser.add_argument('--query', help='query string in rbql. Run in interactive mode if empty') parser.add_argument('--out-format', help='output format', default='input', choices=out_format_names) parser.add_argument('--encoding', help='manually set csv encoding', default=rbql_csv.default_csv_encoding, choices=['latin-1', 'utf-8']) parser.add_argument('--output', metavar='FILE', help='write output table to FILE instead of stdout') - parser.add_argument('--color', action='store_true', help='colorize columns in output in non-interactive mode. Do NOT use if redirecting output to a file') + parser.add_argument('--color', action='store_true', help='colorize columns in output in non-interactive mode') parser.add_argument('--version', action='store_true', help='print RBQL version and exit') parser.add_argument('--init-source-file', metavar='FILE', help=argparse.SUPPRESS) # Path to init source file to use instead of ~/.rbql_init_source.py parser.add_argument('--debug-mode', action='store_true', help=argparse.SUPPRESS) # Run in debug mode @@ -277,6 +397,14 @@ def main(): print(_version.__version__) return + if args.color and os.name == 'nt': + show_error('generic', '--color option is not supported for Windows terminals', is_interactive=False) + sys.exit(1) + + if args.output is not None and args.color: + show_error('generic', '"--output" is not compatible with "--color" option', is_interactive=False) + sys.exit(1) + if args.policy == 'monocolumn': args.delim = '' @@ -284,33 +412,108 @@ def main(): show_error('generic', 'Using "--policy" without "--delim" is not allowed', is_interactive=False) sys.exit(1) - if args.output is not None and args.color: - show_error('generic', '"--output" is not compatible with "--color" option', is_interactive=False) - sys.exit(1) - if args.encoding != 'latin-1' and not PY3: if args.delim is not None: args.delim = args.delim.decode(args.encoding) if args.query is not None: args.query = args.query.decode(args.encoding) - if args.query: + is_interactive_mode = args.query is None + if is_interactive_mode: + if args.color: + show_error('generic', '"--color" option is not compatible with interactive mode. Output and Input files preview would be colorized anyway', is_interactive=False) + sys.exit(1) + start_preview_mode_csv(args) + else: if args.delim is None: show_error('generic', 'Separator must be provided with "--delim" option in non-interactive mode', is_interactive=False) sys.exit(1) - success = run_with_python(args, is_interactive=False) - if not success: + if not run_with_python_csv(args, is_interactive=False): sys.exit(1) - else: + + +sqlite_tool_description = ''' +Run RBQL queries against sqlite databases +Although sqlite database can serve as an input data source, the query engine which will be used is RBQL (not sqlite). +Result set will be written to a csv file. This is also true for UPDATE queries because in RBQL UPDATE is just a special case of SELECT. + +rbql sqlite supports two modes: non-interactive (with "--query" option) and interactive (without "--query" option) + +Interactive mode shows source table preview which makes query editing much easier. + $ rbql sqlite path/to/database.sqlite + +Non-interactive mode supports reading input tables from stdin and writing output to stdout. Usage example: + $ rbql sqlite path/to/database.sqlite --input Employee --query "select top 20 a1, random.random(), a.salary // 1000 order by a.emp_id" + +''' + + +def sqlite_main(): + parser = argparse.ArgumentParser(prog='rbql sqlite', formatter_class=argparse.RawDescriptionHelpFormatter, description=sqlite_tool_description) + parser.add_argument('database', metavar='PATH', help='PATH to sqlite db') + parser.add_argument('--input', metavar='NAME', help='NAME of the table in sqlite database') + parser.add_argument('--query', help='query string in rbql. Run in interactive mode if empty') + parser.add_argument('--out-format', help='output format', default='csv', choices=['csv', 'tsv']) + parser.add_argument('--output', metavar='FILE', help='write output table to FILE instead of stdout') + parser.add_argument('--color', action='store_true', help='colorize columns in output in non-interactive mode. Do NOT use if redirecting output to a file') + parser.add_argument('--version', action='store_true', help='print RBQL version and exit') + parser.add_argument('--init-source-file', metavar='FILE', help=argparse.SUPPRESS) # Path to init source file to use instead of ~/.rbql_init_source.py + parser.add_argument('--debug-mode', action='store_true', help=argparse.SUPPRESS) # Run in debug mode + args = parser.parse_args() + + if args.version: + print(_version.__version__) + return + + if not os.path.isfile(args.database): + show_error('generic', 'The database does not exist: {}'.format(args.database), is_interactive=False) + sys.exit(1) + + is_interactive_mode = args.query is None + + import sqlite3 + if not args.input: + db_connection = sqlite3.connect(args.database) + table_names = read_table_names(db_connection) + db_connection.close() + if len(table_names) == 1: + args.input = table_names[0] + # TODO Consider showing a warning here + elif not is_interactive_mode: + show_error('generic', 'Please provide input table name with --input parameter: source database has more than one table', is_interactive=False) + sys.exit(1) + + if args.output is not None and args.color: + show_error('generic', '"--output" is not compatible with "--color" option', is_interactive=False) + sys.exit(1) + + args.encoding = 'utf-8' + args.output_delim, args.output_policy = (',', 'quoted_rfc') if args.out_format == 'csv' else rbql_csv.interpret_named_csv_format(args.out_format) + + if is_interactive_mode: if args.color: show_error('generic', '"--color" option is not compatible with interactive mode. Output and Input files preview would be colorized anyway', is_interactive=False) sys.exit(1) - if os.name == 'nt': - show_error('generic', 'Interactive mode is not available on Windows', is_interactive=False) + start_preview_mode_sqlite(args) + else: + if not run_with_python_sqlite(args, is_interactive=False): sys.exit(1) - start_preview_mode(args) +def main(): + if len(sys.argv) > 1: + if sys.argv[1] == 'sqlite': + del sys.argv[1] + sqlite_main() + elif sys.argv[1] == 'csv': + del sys.argv[1] + csv_main() + else: + # TODO Consider showing "uknown mode" error if the first argument doesn't start with '--' + csv_main() + else: + csv_main() + if __name__ == '__main__': main()