Read Stream Runs Only Once #95

Open
hlopetz opened this issue Sep 8, 2021 · 0 comments

hi there,

i am working on re-streaming and enriching a data flow from one table to another. some records must be skipped, but millions of them still remain. with that many records, we must process them in chunks to let the ClickHouse server finalize the transaction (flush the buffers).

so i need to run the process in iterations somehow. sadly, that does not work because i cannot make the read stream fetch the next chunk of records. here is my test case which reproduces the issue (output and debug log are below):

https://github.com/hlopetz/clickhouse/blob/restreaming-issue/test/test.js#L604

please let me know if i am doing something stupid and this must be rewritten in a different way. sadly, i did not find any examples.

also, i ran into other weird issues. i am not sure they are related, but:

  • JSON.stringify does not work inside stream.Transform. it fails with DB::ParsingException: Cannot parse input: expected '\t' before: '{"str":"0","val":0}\n{"str"…. so i created a tabify() function instead.
  • specifying a session ID leads to Invalid JSON (Unexpected "C" at position 0 in state STOP) (it seems to require JSON in that case, but as the previous bullet says, i have to use tabify() instead of JSON.stringify)
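
The debug log below shows the insert URL ending in `FORMAT TabSeparated`, which would explain why JSON lines are rejected with "expected '\t'". The tabify() helper itself is not shown in this issue, so the following is only a minimal sketch of what such a helper might look like. The names `escapeTsv` and the `columns` parameter are assumptions made for illustration, and the escaping covers only backslash, tab, newline, and carriage return.

```javascript
// Hypothetical helper: the issue's own tabify() is not shown, so this is a
// guess at an equivalent. It serializes one row as a TabSeparated line.
function escapeTsv(value) {
  if (value === null || value === undefined) return '\\N'; // TabSeparated NULL marker
  return String(value)
    .replace(/\\/g, '\\\\') // escape backslashes first so later escapes survive
    .replace(/\t/g, '\\t')
    .replace(/\n/g, '\\n')
    .replace(/\r/g, '\\r');
}

// `columns` fixes the field order so it matches the INSERT column list.
function tabify(row, columns) {
  return columns.map((name) => escapeTsv(row[name])).join('\t') + '\n';
}

// Column order follows `INSERT INTO session_temp2 (val, str)` from the log.
const line = tabify({ str: '0', val: 0 }, ['val', 'str']);
const escaped = tabify({ str: 'a\tb', val: 1 }, ['val', 'str']);
```

Passing the column list explicitly keeps the output independent of object key order, which matters because TabSeparated rows carry no field names.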

anyway, i'd be happy to have the ability to re-stream records between tables in chunks somehow. please suggest how to do that correctly.
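
To make the intended behaviour concrete, here is a minimal sketch of the chunked loop using only Node's built-in streams. It does not use this library's API: `readChunk`, `enrich`, and `writeChunk` are hypothetical stand-ins for the `SELECT ... LIMIT offset, size` read stream, the Transform, and the INSERT write stream, and the skip rule (one record per chunk of 10) is an assumption chosen to match the `cnt: 9` lines in the log below. The point it illustrates is creating fresh streams for every chunk and awaiting each pipeline before starting the next.

```javascript
const { Readable, Transform, Writable } = require('node:stream');
const { pipeline } = require('node:stream/promises');

// Stand-in for the source table: 30 rows, as in the test case.
const sourceRows = Array.from({ length: 30 }, (_, i) => ({ str: String(i) }));
const targetRows = []; // stand-in for the destination table

// Hypothetical stand-in for `SELECT str FROM session_temp LIMIT offset, size`.
function readChunk(offset, size) {
  return Readable.from(sourceRows.slice(offset, offset + size));
}

// Enrich each row and skip some of them (assumed rule: every 10th value).
function enrich() {
  return new Transform({
    objectMode: true,
    transform(row, _encoding, callback) {
      const val = Number(row.str);
      if (val % 10 === 0) return callback(); // skipped record
      callback(null, { val, str: row.str });
    },
  });
}

// Hypothetical stand-in for the INSERT write stream.
function writeChunk() {
  return new Writable({
    objectMode: true,
    write(row, _encoding, callback) {
      targetRows.push(row);
      callback();
    },
  });
}

async function restream(total, chunkSize) {
  for (let offset = 0; offset < total; offset += chunkSize) {
    // Fresh read, transform, and write streams per chunk; the await ensures
    // the previous chunk has fully finished before the next one starts.
    await pipeline(readChunk(offset, chunkSize), enrich(), writeChunk());
  }
  return targetRows.length;
}

const done = restream(sourceRows.length, 10);
```

With real tables, a `LIMIT offset, size` query without an `ORDER BY` is not guaranteed to return rows in a stable order between chunks, so a real implementation would likely need an explicit ordering.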

the output + debug:

$ node_modules/.bin/mocha --timeout 60000 --slow 5000 --f "restreaming"

QueryCursor {
  query: 'DROP DATABASE IF EXISTS test_10144',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params DROP DATABASE IF EXISTS test_10144 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=b3b6663e-e055-469b-98ff-615f748239e8&database=default&query=DROP+DATABASE+IF+EXISTS+test_10144'
}
QueryCursor.exec: result DROP DATABASE IF EXISTS test_10144 null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/plain; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
QueryCursor {
  query: 'CREATE DATABASE test_10144',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params CREATE DATABASE test_10144 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=7d6a973c-828e-45aa-832a-1912efc68233&database=default&query=CREATE+DATABASE+test_10144'
}
QueryCursor.exec: result CREATE DATABASE test_10144 null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/plain; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
  queries
QueryCursor {
  query: 'DROP TABLE IF EXISTS session_temp',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params DROP TABLE IF EXISTS session_temp {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=6a1433c1-30bf-4281-9c8d-3b277697fce2&database=test_10144&query=DROP+TABLE+IF+EXISTS+session_temp'
}
QueryCursor.exec: result DROP TABLE IF EXISTS session_temp null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/plain; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
QueryCursor {
  query: 'DROP TABLE IF EXISTS session_temp2',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params DROP TABLE IF EXISTS session_temp2 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=36996b36-b11d-4421-8e2f-2a39348c2ea9&database=test_10144&query=DROP+TABLE+IF+EXISTS+session_temp2'
}
QueryCursor.exec: result DROP TABLE IF EXISTS session_temp2 null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/plain; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
QueryCursor {
  query: 'CREATE TABLE session_temp (str String) ENGINE=MergeTree PARTITION BY tuple() ORDER BY tuple()',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params CREATE TABLE session_temp (str String) ENGINE=MergeTree PARTITION BY tuple() ORDER BY tuple() {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=635fc3d5-dd02-4026-8fd0-c6e92899f2dc&database=test_10144&query=CREATE+TABLE+session_temp+%28str+String%29+ENGINE%3DMergeTree+PARTITION+BY+tuple%28%29+ORDER+BY+tuple%28%29'
}
QueryCursor.exec: result CREATE TABLE session_temp (str String) ENGINE=MergeTree PARTITION BY tuple() ORDER BY tuple() null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/plain; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
QueryCursor {
  query: 'CREATE TABLE session_temp2 (val Int8, str String) ENGINE=MergeTree PARTITION BY tuple() ORDER BY tuple()',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params CREATE TABLE session_temp2 (val Int8, str String) ENGINE=MergeTree PARTITION BY tuple() ORDER BY tuple() {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=1c30f125-f332-4ff3-8584-d238045aa026&database=test_10144&query=CREATE+TABLE+session_temp2+%28val+Int8%2C+str+String%29+ENGINE%3DMergeTree+PARTITION+BY+tuple%28%29+ORDER+BY+tuple%28%29'
}
QueryCursor.exec: result CREATE TABLE session_temp2 (val Int8, str String) ENGINE=MergeTree PARTITION BY tuple() ORDER BY tuple() null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/plain; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
QueryCursor {
  query: 'INSERT INTO session_temp',
  data: [
    [ 0 ],  [ 1 ],  [ 2 ],  [ 3 ],
    [ 4 ],  [ 5 ],  [ 6 ],  [ 7 ],
    [ 8 ],  [ 9 ],  [ 10 ], [ 11 ],
    [ 12 ], [ 13 ], [ 14 ], [ 15 ],
    [ 16 ], [ 17 ], [ 18 ], [ 19 ],
    [ 20 ], [ 21 ], [ 22 ], [ 23 ],
    [ 24 ], [ 25 ], [ 26 ], [ 27 ],
    [ 28 ], [ 29 ]
  ],
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params INSERT INTO session_temp {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  body: '0\n' +
    '1\n' +
    '2\n' +
    '3\n' +
    '4\n' +
    '5\n' +
    '6\n' +
    '7\n' +
    '8\n' +
    '9\n' +
    '10\n' +
    '11\n' +
    '12\n' +
    '13\n' +
    '14\n' +
    '15\n' +
    '16\n' +
    '17\n' +
    '18\n' +
    '19\n' +
    '20\n' +
    '21\n' +
    '22\n' +
    '23\n' +
    '24\n' +
    '25\n' +
    '26\n' +
    '27\n' +
    '28\n' +
    '29',
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=7cba58c6-ce62-47e7-940f-281890398fc6&database=test_10144&query=INSERT+INTO+session_temp+FORMAT+TabSeparated'
}
QueryCursor.exec: result INSERT INTO session_temp null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/tab-separated-values; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-query-id': '7cba58c6-ce62-47e7-940f-281890398fc6',
    'x-clickhouse-format': 'TabSeparated',
    'x-clickhouse-timezone': 'UTC',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
QueryCursor {
  query: 'SELECT COUNT(*) AS count FROM session_temp',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params SELECT COUNT(*) AS count FROM session_temp {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=636b8468-851f-4173-8a4e-e49170fd522a&database=test_10144&query=SELECT+COUNT%28*%29+AS+count+FROM+session_temp+FORMAT+JSON%3B'
}
QueryCursor.exec: result SELECT COUNT(*) AS count FROM session_temp null {
  statusCode: 200,
  body: '{\n' +
    '\t"meta":\n' +
    '\t[\n' +
    '\t\t{\n' +
    '\t\t\t"name": "count",\n' +
    '\t\t\t"type": "UInt64"\n' +
    '\t\t}\n' +
    '\t],\n' +
    '\n' +
    '\t"data":\n' +
    '\t[\n' +
    '\t\t{\n' +
    '\t\t\t"count": 30\n' +
    '\t\t}\n' +
    '\t],\n' +
    '\n' +
    '\t"rows": 1,\n' +
    '\n' +
    '\t"statistics":\n' +
    '\t{\n' +
    '\t\t"elapsed": 0.000368257,\n' +
    '\t\t"rows_read": 1,\n' +
    '\t\t"bytes_read": 4104\n' +
    '\t}\n' +
    '}\n',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'application/json; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-query-id': '636b8468-851f-4173-8a4e-e49170fd522a',
    'x-clickhouse-format': 'JSON',
    'x-clickhouse-timezone': 'UTC',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
i: 0
QueryCursor {
  query: 'SELECT str FROM session_temp LIMIT 0, 10',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params SELECT str FROM session_temp LIMIT 0, 10 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=c5b4b238-b679-4af9-b8a2-57ee82e5a1ac&database=test_10144&query=SELECT+str+FROM+session_temp+LIMIT+0%2C+10+FORMAT+JSON%3B'
}
QueryCursor {
  query: 'INSERT INTO session_temp2 (val, str)',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params INSERT INTO session_temp2 (val, str) {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=0b85aa8d-2035-4ac3-924d-0d0220db4c37&database=test_10144&query=INSERT+INTO+session_temp2+%28val%2C+str%29+FORMAT+TabSeparated'
}
cnt: 9
i: 1
QueryCursor {
  query: 'SELECT str FROM session_temp LIMIT 10, 10',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params SELECT str FROM session_temp LIMIT 10, 10 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=6d3c0a67-2f88-4c73-98a5-a68c7712ec1f&database=test_10144&query=SELECT+str+FROM+session_temp+LIMIT+10%2C+10+FORMAT+JSON%3B'
}
QueryCursor {
  query: 'INSERT INTO session_temp2 (val, str)',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params INSERT INTO session_temp2 (val, str) {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=750adabb-cf73-4d5e-bbce-41a0bdca2596&database=test_10144&query=INSERT+INTO+session_temp2+%28val%2C+str%29+FORMAT+TabSeparated'
}
cnt: 9
i: 2
QueryCursor {
  query: 'SELECT str FROM session_temp LIMIT 20, 10',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params SELECT str FROM session_temp LIMIT 20, 10 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=d307a4ae-8448-4d1e-aaa9-5e8950af6d01&database=test_10144&query=SELECT+str+FROM+session_temp+LIMIT+20%2C+10+FORMAT+JSON%3B'
}
QueryCursor {
  query: 'INSERT INTO session_temp2 (val, str)',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params INSERT INTO session_temp2 (val, str) {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=0e6e19f8-b609-434e-968c-36f486b3c11f&database=test_10144&query=INSERT+INTO+session_temp2+%28val%2C+str%29+FORMAT+TabSeparated'
}
cnt: 9
QueryCursor {
  query: 'SELECT count(*) AS count FROM session_temp2',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params SELECT count(*) AS count FROM session_temp2 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=4a52cb00-0e09-4324-8ddf-065fd36dc5e8&database=test_10144&query=SELECT+count%28*%29+AS+count+FROM+session_temp2+FORMAT+JSON%3B'
}
QueryCursor.exec: result SELECT count(*) AS count FROM session_temp2 null {
  statusCode: 200,
  body: '{\n' +
    '\t"meta":\n' +
    '\t[\n' +
    '\t\t{\n' +
    '\t\t\t"name": "count",\n' +
    '\t\t\t"type": "UInt64"\n' +
    '\t\t}\n' +
    '\t],\n' +
    '\n' +
    '\t"data":\n' +
    '\t[\n' +
    '\t\t{\n' +
    '\t\t\t"count": 9\n' +
    '\t\t}\n' +
    '\t],\n' +
    '\n' +
    '\t"rows": 1,\n' +
    '\n' +
    '\t"statistics":\n' +
    '\t{\n' +
    '\t\t"elapsed": 0.00054679,\n' +
    '\t\t"rows_read": 1,\n' +
    '\t\t"bytes_read": 4104\n' +
    '\t}\n' +
    '}\n',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'application/json; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-query-id': '4a52cb00-0e09-4324-8ddf-065fd36dc5e8',
    'x-clickhouse-format': 'JSON',
    'x-clickhouse-timezone': 'UTC',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}
    1) restreaming with chunks

QueryCursor {
  query: 'DROP DATABASE IF EXISTS test_10144',
  data: undefined,
  opts: { format: 'json', raw: false }
}
QueryCursor._getReqParams: params DROP DATABASE IF EXISTS test_10144 {
  headers: { 'Content-Type': 'text/plain' },
  strictSSL: false,
  url: 'https://host:port/?session_timeout=60&output_format_json_quote_64bit_integers=0&enable_http_compression=0&query_id=bcb9dfe0-d53c-4fcf-9609-7abc28bd9ecf&database=test_10144&query=DROP+DATABASE+IF+EXISTS+test_10144'
}
QueryCursor.exec: result DROP DATABASE IF EXISTS test_10144 null {
  statusCode: 200,
  body: '',
  statusMessage: 'OK',
  headers: {
    server: 'openresty/1.15.8.2',
    date: 'Tue, 07 Sep 2021 15:42:22 GMT',
    'content-type': 'text/plain; charset=UTF-8',
    'transfer-encoding': 'chunked',
    connection: 'close',
    vary: 'Accept-Encoding',
    'strict-transport-security': 'max-age=15724800; includeSubDomains',
    'x-clickhouse-server-display-name': 'clickhouse-6dc9d5fb8c-4vfkp',
    'x-clickhouse-summary': '{"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0"}'
  }
}

  0 passing (276ms)
  1 failing

  1) queries
       restreaming with chunks:

      Error: expected 27 to sort of equal 9
      + expected - actual

      -27
      +9
      
      at Assertion.assert (node_modules/expect.js/index.js:96:13)
      at Assertion.eql (node_modules/expect.js/index.js:230:10)
      at Context.<anonymous> (test/test.js:679:33)
      at processTicksAndRejections (node:internal/process/task_queues:96:5)