Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use parameter binding in TDEngine target insertions #536

Merged
merged 28 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
62fa138
Small changes
jond01 Sep 8, 2024
c9fde04
Set pytest marker at the module level
jond01 Sep 8, 2024
00d62c8
Add type hints
jond01 Sep 8, 2024
5576ab8
Upgrade pytest
jond01 Sep 10, 2024
af38afe
Use parameter binding in TDEngine INSERT
jond01 Sep 10, 2024
c7894bb
Upgrade pytest-benchmark
jond01 Sep 10, 2024
b6cbb80
Fix `pytest.skip` usage
jond01 Sep 10, 2024
ed2e2a5
Revert "Fix `pytest.skip` usage"
jond01 Sep 10, 2024
76efedc
Revert "Upgrade pytest-benchmark"
jond01 Sep 10, 2024
14e3b7e
Revert "Upgrade pytest"
jond01 Sep 10, 2024
41ab446
Suppress type hint for old pytest
jond01 Sep 10, 2024
95d7e82
Remove a redundant parameter
jond01 Sep 11, 2024
95f0d73
Move error class to dtypes
jond01 Sep 11, 2024
095785a
Rename fun -> func
jond01 Sep 11, 2024
968f019
val_names -> regular_column_names
jond01 Sep 11, 2024
8013c32
Add `_TDEngineField` named tuple
jond01 Sep 11, 2024
087cc4d
Add `_to_tag` and `_to_column` mappings
jond01 Sep 11, 2024
c7da19c
Rename `_TDEngineField` -> `_TDEngineFieldData`
jond01 Sep 11, 2024
368fc69
Get TDEngine schema from table/super-table
jond01 Sep 11, 2024
81d2fb0
format
jond01 Sep 11, 2024
1149e63
Improve type hints
jond01 Sep 11, 2024
21dbb5d
empty
jond01 Sep 11, 2024
2badda2
Call `DESCRIBE` once in the `_init` method
jond01 Sep 12, 2024
8ab298f
Rename "field data" to "field"
jond01 Sep 12, 2024
667e64a
Check first instead of try-except
jond01 Sep 12, 2024
dddcfaf
Validate DB and table names
jond01 Sep 12, 2024
dfce184
Improve ms comment
jond01 Sep 17, 2024
a4d55d2
Rewrite the test into `test_get_table_schema`
jond01 Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 63 additions & 17 deletions integration/test_tdengine.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,33 @@
import os
from datetime import datetime
from collections.abc import Iterator
from datetime import datetime, timezone
from typing import Optional

import pytest
import pytz
import taosws

from storey import SyncEmitSource, build_flow
from storey.targets import TDEngineTarget

url = os.getenv("TDENGINE_URL")
url = os.getenv("TDENGINE_URL") # e.g.: taosws://root:taosdata@localhost:6041
user = os.getenv("TDENGINE_USER")
password = os.getenv("TDENGINE_PASSWORD")
has_tdengine_credentials = all([url, user, password]) or (url and url.startswith("taosws"))
has_tdengine_credentials = all([url, user, password]) or (url and url.startswith("taosws://"))

pytestmark = pytest.mark.skipif(not has_tdengine_credentials, reason="Missing TDEngine URL, user, and/or password")

@pytest.fixture()
def tdengine():
TDEngineData = tuple[taosws.Connection, str, Optional[str], Optional[str], str, str]


@pytest.fixture(params=[10])
def tdengine(request: "pytest.FixtureRequest") -> Iterator[TDEngineData]:
db_name = "storey"
supertable_name = "test_supertable"

if url.startswith("taosws"):
if url.startswith("taosws://"):
connection = taosws.connect(url)
else:

connection = taosws.connect(
url=url,
user=user,
password=password,
)
connection = taosws.connect(url=url, user=user, password=password)

try:
connection.execute(f"DROP DATABASE {db_name};")
Expand All @@ -44,7 +44,9 @@ def tdengine():
if "STable not exist" not in str(err):
raise err

connection.execute(f"CREATE STABLE {supertable_name} (time TIMESTAMP, my_string NCHAR(10)) TAGS (my_int INT);")
connection.execute(
f"CREATE STABLE {supertable_name} (time TIMESTAMP, my_string NCHAR({request.param})) TAGS (my_int INT);"
)

# Test runs
yield connection, url, user, password, db_name, supertable_name
Expand All @@ -55,8 +57,7 @@ def tdengine():


@pytest.mark.parametrize("table_col", [None, "$key", "table"])
@pytest.mark.skipif(not has_tdengine_credentials, reason="Missing TDEngine URL, user, and/or password")
def test_tdengine_target(tdengine, table_col):
def test_tdengine_target(tdengine: TDEngineData, table_col: Optional[str]) -> None:
connection, url, user, password, db_name, supertable_name = tdengine
time_format = "%d/%m/%y %H:%M:%S UTC%z"

Expand Down Expand Up @@ -116,7 +117,7 @@ def test_tdengine_target(tdengine, table_col):
if typ == "TIMESTAMP":
t = datetime.fromisoformat(row[field_index])
# websocket returns a timestamp with the local time zone
t = t.astimezone(pytz.UTC).replace(tzinfo=None)
t = t.astimezone(timezone.utc).replace(tzinfo=None)
row[field_index] = t
result_list.append(row)
if table_col:
Expand All @@ -133,3 +134,48 @@ def test_tdengine_target(tdengine, table_col):
[datetime(2019, 9, 18, 1, 55, 14), "hello4", 4],
]
assert result_list == expected_result


@pytest.mark.parametrize("tdengine", [100], indirect=["tdengine"])
def test_sql_injection(tdengine: TDEngineData) -> None:
    """Verify that SQL embedded in a string value does not drop another table.

    The ``tdengine`` fixture is parametrized indirectly with ``100``, which the
    fixture uses as the ``NCHAR`` length of ``my_string`` so that the payload
    fits into the column.

    A decoy sub-table is created first; five events whose ``my_string`` value
    contains a ``DROP TABLE`` statement for that decoy are then written through
    a ``TDEngineTarget``.  The test passes if the decoy table is still listed
    after the flow has terminated.
    """
    conn, url, user, password, db_name, supertable_name = tdengine

    # Decoy sub-table that the injected statement tries to remove.
    tb_name = "dont_drop_me"
    extra_table_query = f"SHOW TABLES LIKE '{tb_name}';"
    conn.execute(f"CREATE TABLE IF NOT EXISTS {tb_name} USING {supertable_name} TAGS (101);")
    assert list(conn.query(extra_table_query)), "The extra table was not created"

    # Flow under test: a synchronous source feeding the TDEngine target.
    table_col = "table"
    source = SyncEmitSource()
    target = TDEngineTarget(
        url=url,
        time_col="time",
        columns=["my_string"],
        user=user,
        password=password,
        database=db_name,
        table_col=table_col,
        supertable=supertable_name,
        tag_cols=["my_int"],
        time_format="%d/%m/%y %H:%M:%S UTC%z",
        max_events=10,
    )
    controller = build_flow([source, target]).run()

    # Each event goes to its own sub-table and carries the malicious string.
    subtable_prefix = "test_table"
    stamp_prefix = "18/09/19 01:55:1"
    for idx in range(5):
        controller.emit(
            {
                "time": f"{stamp_prefix}{idx} UTC-0000",
                "my_int": idx,
                "my_string": f"s); DROP TABLE {tb_name};",
                table_col: f"{subtable_prefix}{idx}",
            }
        )

    controller.terminate()
    controller.await_termination()

    # The decoy must have survived the insertions.
    assert list(conn.query(extra_table_query)), "The extra table was dropped"
23 changes: 21 additions & 2 deletions storey/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from datetime import datetime, timezone
from enum import Enum
from typing import Callable, List, Optional, Union
from typing import Callable, List, Literal, NamedTuple, Optional, Union

import numpy

Expand Down Expand Up @@ -103,6 +103,14 @@ class FlowError(Exception):
pass


class TDEngineTypeError(TypeError):
    """A ``TypeError`` subclass dedicated to TDEngine-related failures.

    NOTE(review): the raise sites are not visible in this file section.
    """


class TDEngineValueError(ValueError):
    """A ``ValueError`` subclass dedicated to TDEngine-related failures.

    NOTE(review): the raise sites are not visible in this file section.
    """


class WindowBase:
def __init__(self, window, period, window_str):
self.window_millis = window
Expand Down Expand Up @@ -446,3 +454,14 @@ def should_aggregate(self, element):
class FixedWindowType(Enum):
    # Two-valued selector for fixed windows.
    # NOTE(review): judging by the member names, this picks between the window
    # that is still open and the most recently closed one — confirm at the use sites.
    CurrentOpenWindow = 1
    LastClosedWindow = 2


class _TDEngineField(NamedTuple):
    """Immutable record describing one field of a TDEngine table.

    NOTE(review): the attribute names and their order look like the columns
    returned by TDEngine's ``DESCRIBE`` statement — confirm against the code
    that constructs these tuples, since positional construction depends on it.
    """

    field: str
    # One of the TDEngine data type names listed at:
    # https://docs.tdengine.com/reference/taos-sql/data-type/
    type: Literal["TIMESTAMP", "INT", "FLOAT", "DOUBLE", "BINARY", "BOOL", "NCHAR", "JSON", "VARCHAR"]
    length: int
    # "TAG" or the empty string; presumably "" denotes a regular (non-tag) column — verify.
    note: Literal["", "TAG"]
    encode: str
    compress: str
    level: str
Loading
Loading