Skip to content

Commit

Permalink
Add debug tables to improve Flink debugging
Browse files Browse the repository at this point in the history
The yaml table format makes it tough to do local debugging. The updated create_ngsild_tables.py script
now produces an additional file, 'ngsild.flinksql.debug', which contains SQL-compliant tables and views
that can be applied to the Flink SQL client directly.

Signed-off-by: marcel <[email protected]>
  • Loading branch information
wagmarcel authored and abhijith-hr committed Oct 1, 2024
1 parent 64acfe7 commit 2ee4292
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 2 deletions.
2 changes: 1 addition & 1 deletion semantic-model/shacl2flink/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ helm: build
mkdir -p ${HELM_DIR}
rm -rf ${HELM_DIR}/templates
mv ${OUTPUTDIR} ${HELM_DIR}/templates
rm -f ${HELM_DIR}/templates/*.sqlite ${HELM_DIR}/templates/core.yaml ${HELM_DIR}/templates/knowledge.yaml
rm -f ${HELM_DIR}/templates/*.sqlite ${HELM_DIR}/templates/core.yaml ${HELM_DIR}/templates/knowledge.yaml ${HELM_DIR}/templates/*.debug
cp Chart.yaml ${HELM_DIR}
test: requirements-dev.txt
Expand Down
5 changes: 4 additions & 1 deletion semantic-model/shacl2flink/create_ngsild_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def main(shaclfile, knowledgefile, output_folder='output'):
config['retention.ms'] = configs.kafka_topic_ngsi_retention
with open(os.path.join(output_folder, "ngsild.yaml"), "w") as f, \
open(os.path.join(output_folder, "ngsild.sqlite"), "w") as sqlitef, \
open(os.path.join(output_folder, "ngsild-kafka.yaml"), "w") as fk:
open(os.path.join(output_folder, "ngsild-kafka.yaml"), "w") as fk, \
open(os.path.join(output_folder, "ngsild.flinksql.debug"), "w") as dt:
for table_name, table in tables.items():
connector = 'kafka'
primary_key = None
Expand Down Expand Up @@ -129,6 +130,8 @@ def main(shaclfile, knowledgefile, output_folder='output'):
f'{configs.kafka_topic_ngsi_prefix}.\
{table_name}', configs.kafka_topic_object_label,
config), fk)
print(utils.create_flink_debug_table(table_name, connector, table, primary_key, kafka, value), file=dt)
print(utils.create_sql_view(table_name, table), file=dt)


if __name__ == '__main__':
Expand Down
41 changes: 41 additions & 0 deletions semantic-model/shacl2flink/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,47 @@ def create_yaml_table(name, connector, table, primary_key, kafka, value):
return yaml_table


def create_flink_debug_table(name, connector, table, primary_key, kafka, value):
    """Render a plain Flink-SQL DROP TABLE + CREATE TABLE statement for local debugging.

    Unlike create_yaml_table this emits SQL that can be pasted directly into a
    Flink SQL client. A field literally named 'watermark' is skipped; the first
    field whose type mentions both 'metadata' and 'timestamp' is rewritten to
    ``TIMESTAMP(3) METADATA FROM 'timestamp'`` and used as the watermark column
    (falling back to the name 'ts' when no such field exists).

    The `value` argument is accepted for signature parity with the other table
    builders but is not used here.

    NOTE(review): Flink normally requires ``PRIMARY KEY (...) NOT ENFORCED`` on
    Kafka-backed tables, and the watermark clause is emitted even when no
    timestamp column was found — confirm the generated DDL applies cleanly.
    """
    columns = []
    watermark_col = 'ts'  # default watermark column name when none is detected
    for field in table:
        for col_name, col_type in field.items():
            if col_name.lower() == 'watermark':
                break  # the yaml-level watermark entry is not a real column
            lowered = col_type.lower()
            if 'metadata' in lowered and 'timestamp' in lowered:
                col_type = "TIMESTAMP(3) METADATA FROM 'timestamp'"
                watermark_col = col_name
            columns.append(f'`{col_name}` {col_type}')

    parts = [f'DROP TABLE IF EXISTS `{name}`;\n', f'CREATE TABLE `{name}` (\n']
    parts.append(',\n'.join(columns))
    parts.append(f", \nwatermark FOR `{watermark_col}` AS `{watermark_col}`")
    if primary_key is not None:
        keys = ','.join(f'`{key}`' for key in primary_key)
        parts.append(f',\nPRIMARY KEY({keys})\n')
    parts.append(')')
    parts.append(' WITH (\n')
    parts.append("'format' = 'json',\n")
    parts.append(f"'connector' = '{connector}',\n")
    parts.append(f"'topic' = '{kafka['topic']}',\n")
    parts.append("'scan.startup.mode' = 'earliest-offset'\n")
    for prop_key, prop_val in kafka['properties'].items():
        parts.append(f",\n'properties.{prop_key}' = '{prop_val}'\n")
    parts.append(');')
    return ''.join(parts)


def create_sql_table(name, table, primary_key, dialect=SQL_DIALECT. SQL):
sqltable = f'DROP TABLE IF EXISTS `{name}`;\n'
first = True
Expand Down
19 changes: 19 additions & 0 deletions semantic-model/shacl2flink/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,25 @@ def test_create_yaml_table(mock_check, mock_class):
}}


@patch('lib.utils.class_to_obj_name')
@patch('lib.utils.check_dns_name')
def test_create_flink_debug_table(mock_check, mock_class):
    # Mocks mirror the other table tests in this module; the debug-table
    # builder itself does not call these helpers.
    mock_check.return_value = True
    mock_class.return_value = 'object'
    fields = [{'field': 'type'}, {'ts': 'TIMESTAMP METADATA'}]
    kafka = {'topic': 'topic', 'properties': {'prop1': 'prop1'}}

    # With a primary key the DDL must contain a PRIMARY KEY clause.
    with_key = utils.create_flink_debug_table('name', 'connector', fields,
                                              {'primary': 'key'}, kafka, 'value')
    assert with_key == ("DROP TABLE IF EXISTS `name`;\nCREATE TABLE `name` (\n"
                        "`field` type,\n`ts` TIMESTAMP(3) METADATA FROM 'timestamp', \n"
                        "watermark FOR `ts` AS `ts`,\nPRIMARY KEY(`primary`)\n) WITH (\n"
                        "'format' = 'json',\n'connector' = 'connector',\n'topic' = 'topic',\n"
                        "'scan.startup.mode' = 'earliest-offset'\n,\n'properties.prop1' = 'prop1'\n);")

    # Without a primary key the column list closes straight into WITH.
    without_key = utils.create_flink_debug_table('name', 'connector', fields,
                                                 None, kafka, 'value')
    assert without_key == ("DROP TABLE IF EXISTS `name`;\nCREATE TABLE `name` (\n"
                           "`field` type,\n`ts` TIMESTAMP(3) METADATA FROM 'timestamp', \n"
                           "watermark FOR `ts` AS `ts`) WITH (\n"
                           "'format' = 'json',\n'connector' = 'connector',\n'topic' = 'topic',\n"
                           "'scan.startup.mode' = 'earliest-offset'\n,\n'properties.prop1' = 'prop1'\n);")


@patch('lib.utils.class_to_obj_name')
@patch('lib.utils.check_dns_name')
def test_create_yaml_table_exception(mock_check, mock_class):
Expand Down

0 comments on commit 2ee4292

Please sign in to comment.