Skip to content

Commit

Permalink
Remove rb (#54)
Browse files Browse the repository at this point in the history
* Remove row_binary format

* Append unixtime millis to the test db name when using Cloud (#50)

* Append unixtime millis to the test db name when using Cloud

* Fix create_db database name

* Update ignored test branches

Co-authored-by: Geoff Genz <[email protected]>

* Cleanup for row_binary removal

Co-authored-by: Serge Klochkov <[email protected]>
  • Loading branch information
genzgd and slvrtrn authored Oct 7, 2022
1 parent cded1b8 commit f29206f
Show file tree
Hide file tree
Showing 13 changed files with 22 additions and 540 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ Create a ClickHouse client using the `clickhouse_connect.driver.create_client(..
Defaults to the default database for the ClickHouse user
* `compress:bool` Accept compressed data from the ClickHouse server.
Defaults to _True_
* `format:str` _native_ (ClickHouse Native) or _rb_ (ClickHouse Row Binary)
Native format is preferred for performance reasons
* `query_limit:int` LIMIT value added to all queries.
Defaults to 5,000 rows. Setting query_limit=0 will return unlimited results, at the risk of running out of memory
* `connect_timeout:int` HTTP connection timeout in seconds. Default 10 seconds.
Expand Down
12 changes: 5 additions & 7 deletions benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from typing import List

import clickhouse_connect
from clickhouse_connect import datatypes
from clickhouse_connect.datatypes.format import set_default_formats
from clickhouse_connect.driver.client import Client

columns = {
'uint16': ('UInt16', 1),
'int16': ('Int16', -2),
'uint64': ('UInt64', 32489071615273482),
'float32': ('Float32', 3.14),
'str': ('String', 'hello'),
'fstr': ('FixedString(16)', b'world numkn \nman'),
Expand Down Expand Up @@ -67,7 +68,6 @@ def main():
parser.add_argument('-t', '--tries', help='Total tries for each test', type=int, default=50)
parser.add_argument('-r', '--rows', help='Total rows in dataset', type=int, default=10000)
parser.add_argument('-c', '--columns', help='Column types to test', type=str, nargs='+')
parser.add_argument('-f', '--format', help='HTTP input/output format', type=str, default='native')

args = parser.parse_args()
rows = args.rows
Expand All @@ -83,11 +83,9 @@ def main():
sys.exit()
else:
col_names = standard_cols
client = clickhouse_connect.get_client(compress=False, data_format=args.format)
datatypes.ip_format('string')
datatypes.uuid_format('string')
# datatypes.fixed_string_format('string')
datatypes.big_int_format('string')
client = clickhouse_connect.get_client(compress=False)

set_default_formats('IP*', 'native', '*Int64', 'native')
create_table(client, col_names, rows)
check_reads(client, tries, rows)

Expand Down
37 changes: 2 additions & 35 deletions clickhouse_connect/datatypes/base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import array
import logging

from abc import abstractmethod, ABC
from abc import ABC
from math import log
from typing import NamedTuple, Dict, Type, Any, Sequence, MutableSequence, Optional, Union, Tuple

Expand Down Expand Up @@ -71,20 +71,13 @@ def write_format(cls):

def __init__(self, type_def: TypeDef):
"""
Base class constructor that sets Nullable and LowCardinality wrappers and currently assigns the row_binary conversion
functions
Base class constructor that sets Nullable and LowCardinality wrappers
:param type_def: ClickHouseType base configuration parameters
"""
self.type_def = type_def
self.wrappers = type_def.wrappers
self.low_card = 'LowCardinality' in self.wrappers
self.nullable = 'Nullable' in self.wrappers
if self.nullable:
self.from_row_binary = self._nullable_from_row_binary
self.to_row_binary = self._nullable_to_row_binary
else:
self.to_row_binary = self._to_row_binary
self.from_row_binary = self._from_row_binary

def __eq__(self, other):
return other.__class__ == self.__class__ and self.type_def == other.type_def
Expand Down Expand Up @@ -211,26 +204,6 @@ def write_native_data(self, column: Sequence, dest: MutableSequence):
dest += bytes([1 if x is None else 0 for x in column])
self._write_native_binary(column, dest)

@abstractmethod
def _from_row_binary(self, source: Sequence, loc: int):
pass

@abstractmethod
def _to_row_binary(self, value: Any, dest: MutableSequence):
pass

def _nullable_from_row_binary(self, source, loc) -> (Any, int):
if source[loc] == 0:
return self._from_row_binary(source, loc + 1)
return None, loc + 1

def _nullable_to_row_binary(self, value, dest: bytearray):
if value is None:
dest += b'\x01'
else:
dest += b'\x00'
self._to_row_binary(value, dest)

def _read_native_low_card(self, source: Sequence, loc: int, num_rows: int, use_none=True):
if num_rows == 0:
return tuple(), loc
Expand Down Expand Up @@ -349,12 +322,6 @@ def __init__(self, type_def: TypeDef):
super().__init__(type_def)
self._name_suffix = type_def.arg_str

def _from_row_binary(self, *_args):
raise NotSupportedError(f'{self.name} deserialization not supported')

def _to_row_binary(self, *_args):
raise NotSupportedError(f'{self.name} serialization not supported')

def _read_native_binary(self, source: Sequence, loc: int, num_rows: int):
raise NotSupportedError(f'{self.name} deserialization not supported')

Expand Down
71 changes: 4 additions & 67 deletions clickhouse_connect/datatypes/container.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import array
from typing import Dict, Sequence, MutableSequence, Any
from typing import Sequence, MutableSequence

from clickhouse_connect import json_impl
from clickhouse_connect.datatypes.base import ClickHouseType, TypeDef
from clickhouse_connect.driver.common import read_leb128, to_leb128, array_column, must_swap
from clickhouse_connect.driver.common import array_column, must_swap
from clickhouse_connect.datatypes.registry import get_from_name


Expand All @@ -16,20 +16,6 @@ def __init__(self, type_def: TypeDef):
self.element_type = get_from_name(type_def.values[0])
self._name_suffix = f'({self.element_type.name})'

def _from_row_binary(self, source: bytearray, loc: int):
size, loc = read_leb128(source, loc)
values = []
for _ in range(size):
value, loc = self.element_type.from_row_binary(source, loc)
values.append(value)
return values, loc

def _to_row_binary(self, value: Sequence, dest: MutableSequence):
dest += to_leb128(len(value))
conv = self.element_type.to_row_binary
for x in value:
conv(x, dest)

def read_native_prefix(self, source: Sequence, loc: int):
return self.element_type.read_native_prefix(source, loc)

Expand Down Expand Up @@ -85,15 +71,13 @@ def write_native_data(self, column: Sequence, dest: MutableSequence):


class Tuple(ClickHouseType):
_slots = 'element_names', 'element_types', 'from_rb_funcs', 'to_rb_funcs'
_slots = 'element_names', 'element_types'
valid_formats = 'tuple', 'json', 'native' # native is 'tuple' for unnamed tuples, and dict for named tuples

def __init__(self, type_def: TypeDef):
super().__init__(type_def)
self.element_names = type_def.keys
self.element_types = [get_from_name(name) for name in type_def.values]
self.from_rb_funcs = tuple((t.from_row_binary for t in self.element_types))
self.to_rb_funcs = tuple((t.to_row_binary for t in self.element_types))
if self.element_names:
self._name_suffix = f"({', '.join(k + ' ' + str(v) for k, v in zip(type_def.keys, type_def.values))})"
else:
Expand All @@ -107,17 +91,6 @@ def python_type(self):
return str
return dict

def _from_row_binary(self, source: bytes, loc: int):
values = []
for conv in self.from_rb_funcs:
value, loc = conv(source, loc)
values.append(value)
return tuple(values), loc

def _to_row_binary(self, value: Sequence, dest: MutableSequence):
for x, conv in zip(value, self.to_rb_funcs):
conv(x, dest)

def read_native_prefix(self, source: Sequence, loc: int):
for e_type in self.element_types:
loc = e_type.read_native_prefix(source, loc)
Expand Down Expand Up @@ -151,35 +124,15 @@ def write_native_data(self, column: Sequence, dest: MutableSequence):


class Map(ClickHouseType):
_slots = 'key_type', 'value_type', 'key_from_rb', 'key_to_rb', 'value_from_rb', 'value_to_rb'
_slots = 'key_type', 'value_type'
python_type = dict

def __init__(self, type_def: TypeDef):
super().__init__(type_def)
self.key_type = get_from_name(type_def.values[0])
self.key_from_rb, self.key_to_rb = self.key_type.from_row_binary, self.key_type.to_row_binary
self.value_type = get_from_name(type_def.values[1])
self.value_from_rb, self.value_to_rb = self.value_type.from_row_binary, self.value_type.to_row_binary
self._name_suffix = type_def.arg_str

def _from_row_binary(self, source: Sequence, loc: int):
size, loc = read_leb128(source, loc)
values = {}
key_from = self.key_from_rb
value_from = self.value_from_rb
for _ in range(size):
key, loc = key_from(source, loc)
value, loc = value_from(source, loc)
values[key] = value
return values, loc

def _to_row_binary(self, value: Dict, dest: bytearray):
key_to = self.key_to_rb
value_to = self.value_to_rb
for k, v in value.items():
dest += key_to(k, dest)
dest += value_to(v, dest)

def read_native_prefix(self, source: Sequence, loc: int):
loc = self.key_type.read_native_prefix(source, loc)
loc = self.value_type.read_native_prefix(source, loc)
Expand Down Expand Up @@ -233,14 +186,6 @@ def __init__(self, type_def):
cols = [f'{x[0]} {x[1].name}' for x in zip(type_def.keys, self.element_types)]
self._name_suffix = f"({', '.join(cols)})"

def _to_row_binary(self, value: dict, dest: MutableSequence):
self.tuple_array.write_native_data([tuple(sub_row[key] for key in self.element_names) for sub_row in value],
dest)

def _from_row_binary(self, source: Sequence, loc: int):
data, loc = self.tuple_array.from_row_binary(source, loc)
return [dict(zip(self.element_names, x)) for x in data], loc

def read_native_prefix(self, source: Sequence, loc: int):
return self.tuple_array.read_native_prefix(source, loc)

Expand All @@ -261,14 +206,6 @@ def write_native_data(self, column: Sequence, dest: MutableSequence):
class JSON(ClickHouseType):
python_type = dict

def _to_row_binary(self, value: Any, dest: MutableSequence):
value = json_impl.any_to_json(value)
dest += to_leb128(len(value)) + value

def _from_row_binary(self, source: Sequence, loc: int):
# ClickHouse will never return JSON/Object types, just tuples
return None, 0

def write_native_prefix(self, dest: MutableSequence):
dest.append(0x01)

Expand Down
38 changes: 0 additions & 38 deletions clickhouse_connect/datatypes/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,6 @@ def np_type(self):
def python_null(self):
return '' if self.read_format() == 'string' else V4_NULL

def _from_row_binary(self, source: bytes, loc: int):
ipv4 = IPv4Address.__new__(IPv4Address)
ipv4._ip = int.from_bytes(source[loc: loc + 4], 'little')
return ipv4, loc + 4

def _to_row_binary(self, value: [int, IPv4Address, str], dest: bytearray):
if isinstance(value, IPv4Address):
dest += value._ip.to_bytes(4, 'little')
elif isinstance(value, str):
dest += bytes(reversed([int(b) for b in value.split('.')]))
else:
dest += value.to_bytes(4, 'little')

def _read_native_binary(self, source: Sequence, loc: int, num_rows: int):
if self.read_format() == 'string':
return self._from_native_str(source, loc, num_rows)
Expand Down Expand Up @@ -90,31 +77,6 @@ def np_type(self):
def python_null(self):
return '' if self.read_format() == 'string' else V6_NULL

def _from_row_binary(self, source: Sequence, loc: int):
end = loc + 16
int_value = int.from_bytes(source[loc:end], 'big')
if int_value >> 32 == 0xFFFF:
ipv4 = IPv4Address.__new__(IPv4Address)
ipv4._ip = int_value & 0xFFFFFFFF
return ipv4, end
return IPv6Address(int_value), end

def _to_row_binary(self, value: Union[str, IPv4Address, IPv6Address, bytes, bytearray], dest: bytearray):
v4mask = IPV4_V6_MASK
if isinstance(value, str):
if '.' in value:
dest += v4mask + bytes(int(b) for b in value.split('.'))
else:
dest += socket.inet_pton(socket.AF_INET6, value)
elif isinstance(value, IPv4Address):
dest += v4mask + value._ip.to_bytes(4, 'big')
elif isinstance(value, IPv6Address):
dest += value.packed
elif len(value) == 4:
dest += IPV4_V6_MASK + value
else:
dest += value

def _read_native_binary(self, source: Sequence, loc: int, num_rows: int):
if self.read_format() == 'string':
return self._read_native_str(source, loc, num_rows)
Expand Down
Loading

0 comments on commit f29206f

Please sign in to comment.