From 71d530c2d8cea389af784656e19f030e0e0e782f Mon Sep 17 00:00:00 2001
From: MactavishCui
Date: Fri, 22 Nov 2024 00:52:59 +0800
Subject: [PATCH 1/3] front end impl

---
 dinky-web/src/locales/zh-CN/pages.ts          |  14 +
 .../SqlTask/TaskConfig/index.tsx              |  13 +
 .../SqlTask/TaskConfig/testCase/index.tsx     | 244 ++++++++++++++++++
 dinky-web/src/services/endpoints.tsx          |   6 +-
 4 files changed, 276 insertions(+), 1 deletion(-)
 create mode 100644 dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/testCase/index.tsx

diff --git a/dinky-web/src/locales/zh-CN/pages.ts b/dinky-web/src/locales/zh-CN/pages.ts
index c38f7f6b0c..adc1417094 100644
--- a/dinky-web/src/locales/zh-CN/pages.ts
+++ b/dinky-web/src/locales/zh-CN/pages.ts
@@ -456,6 +456,20 @@ export default {
   'pages.datastudio.label.execConfig.mocksink': '开启SinkMock',
   'pages.datastudio.label.execConfig.mocksink.tip':
     '将SinkFunction进行Mock,调试过程中不会向线上环境执行写入,但可以通过dinky预览Sink结果',
+  'pages.datastudio.label.execConfig.mocksource': '开启SourceMock',
+  'pages.datastudio.label.execConfig.mocksource.tip':
+    '基于任务血缘对任务所读取的源表进行Mock,输入所给定的测试用例',
+  'pages.datastudio.label.execConfig.mocksource.getTestCase': '获取表结构与测试用例',
+  'pages.datastudio.label.execConfig.mocksource.saveTestCase': '保存测试用例',
+  'pages.datastudio.label.execConfig.mocksource.handleRowData': '操作',
+  'pages.datastudio.label.execConfig.mocksource.editRowData': '编辑',
+  'pages.datastudio.label.execConfig.mocksource.deleteRowData': '删除',
+  'pages.datastudio.label.execConfig.mocksource.testcase': '测试用例',
+  'pages.datastudio.label.execConfig.mocksource.testcase.tips':
+    '上传当前任务的测试用例,仅在SourceMock开启时有效',
+  'pages.datastudio.label.execConfig.mocksource.testcase.top': '添加到顶部',
+  'pages.datastudio.label.execConfig.mocksource.testcase.bottom': '添加到底部',
+  'pages.datastudio.label.execConfig.mocksource.testcase.hidden': '隐藏',
   'pages.datastudio.label.jobConfig': '作业配置',
   'pages.datastudio.label.jobConfig.addConfig': '添加配置项',
   'pages.datastudio.label.jobConfig.addConfig.params': '参数',
diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/index.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/index.tsx
index fb7b02c581..5e93c74fd6 100644
--- a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/index.tsx
+++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/index.tsx
@@ -27,6 +27,7 @@ import { TaskState, TempData } from '@/pages/DataStudio/type';
 import { BasicConfig } from '@/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/BasicConfig';
 import { isSql, assert } from '@/pages/DataStudio/utils';
 import { JOB_LIFE_CYCLE } from '@/pages/DevOps/constants';
+import { TestCaseFlinkSqlConfig } from '@/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/testCase';
 
 export default (props: {
   tempData: TempData;
@@ -87,6 +88,18 @@ export default (props: {
         }}
         {...SWITCH_OPTIONS()}
       />
+
+      <ProFormSwitch
+        name={['configJson', 'mockSourceFunction']}
+        label={l('pages.datastudio.label.execConfig.mocksource')}
+        tooltip={l('pages.datastudio.label.execConfig.mocksource.tip')}
+        {...SWITCH_OPTIONS()}
+      />
+
+      <TestCaseFlinkSqlConfig
+        params={params}
+      />
+
 
     );
 }
diff --git a/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/testCase/index.tsx b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/testCase/index.tsx
new file mode 100644
index 0000000000..1b84090b4e
--- /dev/null
+++ b/dinky-web/src/pages/DataStudio/CenterTabContent/SqlTask/TaskConfig/testCase/index.tsx
@@ -0,0 +1,244 @@
+import { TaskState } from '@/pages/DataStudio/type';
+import React, { useState } from 'react';
+import { handleOption } from '@/services/BusinessCrud';
+import { API_CONSTANTS } from '@/services/endpoints';
+import { EditableProTable, ProColumns } from '@ant-design/pro-table';
+import { Button, Modal, Radio, Tabs, Tooltip } from 'antd';
+import { l } from '@/utils/intl';
+
+export const TestCaseFlinkSqlConfig = (props: { params: TaskState }) => {
+  const {
+    params: { taskId, statement }
+  } = props;
+
+  // type definitions for test cases
+  // test cases of a Flink SQL task; one task may read several source tables
+  type TestCaseList = TestCase[];
+  // test cases of one table; all rows of a table share the same columns
+  type TestCase = {
+    tableName?: string;
+    columns?: string[];
+    rowData?: TestCaseRowData[];
+  };
+  // a single test case row
+  type TestCaseRowData = {
+    dinky_mock_source_row_id: React.Key;
+    [key: string]: any;
+  };
+
+  // test case state used to render the tables
+  const [testCaseList, setTestCaseList] = useState<TestCaseList>([]);
+
+  // editing state
+  // whether the modal is visible
+  const [modalVisible, setModalVisible] = useState<boolean>(false);
+  // keys of the rows that are currently editable
+  const [editableKeys, setEditableRowKeys] = useState<React.Key[]>([]);
+  // the smallest row id that has not been assigned yet
+  const [rowKeyId, setRowKeyId] = useState<number>(1000);
+  // the row that is currently being handled
+  const [rowData, setRowData] = useState<TestCaseRowData[]>([]);
+  // where newly created rows are inserted
+  const [position, setPosition] = useState<'top' | 'bottom' | 'hidden'>('bottom');
+
+  // reset all state when the modal is opened or closed
+  const handleOpenChange = (open: boolean) => {
+    setModalVisible(open);
+    setEditableRowKeys([]);
+    setTestCaseList([]);
+    setRowKeyId(1000);
+    setRowData([]);
+    setPosition('bottom');
+  };
+
+  // initialization: fetch the source table schemas and the saved test cases
+  const handleFormInit = async () => {
+    handleOpenChange(true);
+
+    const res = await handleOption(
+      API_CONSTANTS.LIST_TASK_TEST_CASE_BY_STATEMENT,
+      l('pages.datastudio.label.execConfig.mocksource.getTestCase'),
+      {
+        taskId: taskId,
+        statement: statement
+      }
+    );
+
+    // generate row ids
+    const testCaseListWithId: TestCase[] = [];
+    const originTestCaseList: TestCase[] = res?.data?.taskTableInputList;
+    let rowDataRowKeyId: number = rowKeyId;
+    originTestCaseList?.forEach((tableScheme) => {
+      tableScheme?.rowData?.forEach((row) => {
+        row.dinky_mock_source_row_id = rowDataRowKeyId++;
+      });
+      testCaseListWithId.push({
+        tableName: tableScheme.tableName,
+        columns: tableScheme.columns,
+        rowData: tableScheme.rowData
+      });
+    });
+
+    setTestCaseList(testCaseListWithId);
+    // remember the next free row id
+    setRowKeyId(rowDataRowKeyId);
+  };
+
+  // save the test cases
+  const handleFinish = async () => {
+    await handleOption(
+      API_CONSTANTS.SAVE_OR_UPDATE_TEST_CASE,
+      l('pages.datastudio.label.execConfig.mocksource.saveTestCase'),
+      {
+        taskId: taskId,
+        taskTableInputList: testCaseList
+      }
+    );
+    return true;
+  };
+
+  // render the columns of a table based on its schema
+  const getProColumns = (columns: string[]) => {
+    const proColumns: ProColumns<TestCaseRowData>[] = [];
+    columns.forEach((columnName) => {
+      proColumns.push({
+        title: columnName,
+        dataIndex: columnName
+      });
+    });
+    proColumns.push({
+      title: l('pages.datastudio.label.execConfig.mocksource.handleRowData'),
+      valueType: 'option',
+      width: 200,
+      render: (text, record, _, action) => [
+        <a
+          key={'edit'}
+          onClick={() => {
+            action?.startEditable?.(record.dinky_mock_source_row_id);
+          }}
+        >
+          {l('pages.datastudio.label.execConfig.mocksource.editRowData')}
+        </a>,
+        <a
+          key={'delete'}
+          onClick={() => {
+            setTestCaseList(
+              testCaseList.map((testCase) => {
+                const newRowData = (testCase.rowData ?? []).filter(
+                  (row) => row.dinky_mock_source_row_id !== record.dinky_mock_source_row_id
+                );
+                return { ...testCase, rowData: newRowData };
+              })
+            );
+          }}
+        >
+          {l('pages.datastudio.label.execConfig.mocksource.deleteRowData')}
+        </a>
+      ]
+    });
+    return proColumns;
+  };
+
+  // update state when the table data changes
+  const handleDataChange = async (targetTableName: string, targetRowKey: any, newRowData: any) => {
+    setTestCaseList((prevTestCase) =>
+      prevTestCase.map((testCase) => {
+        if (testCase.tableName === targetTableName) {
+          const rowData = testCase.rowData ? [...testCase.rowData] : [];
+          let findExistedRowData = false;
+
+          // update or append the row
+          const updatedRowData = rowData.map((item) => {
+            // an existing row key matches, update the row
+            if (item.dinky_mock_source_row_id === targetRowKey) {
+              findExistedRowData = true;
+              return newRowData; // return the new row
+            }
+            return item; // keep the original row
+          });
+          // no matching row found, append the new one
+          if (!findExistedRowData) {
+            updatedRowData.push(newRowData);
+          }
+          // return the updated testCase
+          return { ...testCase, rowData: updatedRowData };
+        }
+        return testCase;
+      })
+    );
+    // persist
+    await handleFinish();
+  };
+
+  return (
+    <>
+      <Tooltip title={l('pages.datastudio.label.execConfig.mocksource.testcase.tips')}>
+        <Button type={'link'} onClick={handleFormInit}>
+          {l('pages.datastudio.label.execConfig.mocksource.testcase')}
+        </Button>
+      </Tooltip>
+
+      <Modal
+        title={l('pages.datastudio.label.execConfig.mocksource.testcase')}
+        open={modalVisible}
+        width={1000}
+        onOk={async () => {
+          await handleFinish();
+          handleOpenChange(false);
+        }}
+        onCancel={() => handleOpenChange(false)}
+      >
+        <Tabs>
+          {testCaseList.map((data, index) => {
+            return (
+              <Tabs.TabPane tab={data.tableName} key={index}>
+                <EditableProTable<TestCaseRowData>
+                  rowKey={'dinky_mock_source_row_id'}
+                  scroll={{ x: 960 }}
+                  recordCreatorProps={
+                    position !== 'hidden'
+                      ? {
+                          position: position as 'top',
+                          record: () => {
+                            // hand out a fresh key for every created row
+                            setRowKeyId(rowKeyId + 1);
+                            return { dinky_mock_source_row_id: rowKeyId };
+                          }
+                        }
+                      : false
+                  }
+                  loading={false}
+                  toolBarRender={() => [
+                    <Radio.Group
+                      key={'position'}
+                      value={position}
+                      onChange={(e) => {
+                        setPosition(e.target.value);
+                      }}
+                      options={[
+                        {
+                          label: l('pages.datastudio.label.execConfig.mocksource.testcase.top'),
+                          value: 'top'
+                        },
+                        {
+                          label: l('pages.datastudio.label.execConfig.mocksource.testcase.bottom'),
+                          value: 'bottom'
+                        },
+                        {
+                          label: l('pages.datastudio.label.execConfig.mocksource.testcase.hidden'),
+                          value: 'hidden'
+                        }
+                      ]}
+                    />
+                  ]}
+                  columns={getProColumns(data.columns ?? [])}
+                  value={data.rowData}
+                  editable={{
+                    type: 'multiple',
+                    editableKeys,
+                    onSave: async (rowKey, rowData, row) => {
+                      await handleDataChange(data.tableName ?? '', rowKey, rowData);
+                    },
+                    onChange: setEditableRowKeys
+                  }}
+                />
+              </Tabs.TabPane>
+            );
+          })}
+        </Tabs>
+      </Modal>
+    </>
+  );
+};
diff --git a/dinky-web/src/services/endpoints.tsx b/dinky-web/src/services/endpoints.tsx
index b42731dbd5..76d7de7b48 100644
--- a/dinky-web/src/services/endpoints.tsx
+++ b/dinky-web/src/services/endpoints.tsx
@@ -300,5 +300,9 @@
   FLINK_CONF_CONFIG_OPTIONS = '/api/flinkConf/configOptions',
 
   // ------------------------------------ suggestion ------------------------------------
-  SUGGESTION_QUERY_ALL_SUGGESTIONS = '/api/suggestion/queryAllSuggestions'
+  SUGGESTION_QUERY_ALL_SUGGESTIONS = '/api/suggestion/queryAllSuggestions',
+
+  // ------------------------------------ test case ------------------------------------
+  LIST_TASK_TEST_CASE_BY_STATEMENT = '/api/testCase/listTaskTestCaseByStatement',
+  SAVE_OR_UPDATE_TEST_CASE = '/api/testCase/saveOrUpdateTestCase'
 }
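For reference, the request body that the front end posts to SAVE_OR_UPDATE_TEST_CASE mirrors the TaskTestCaseListDTO introduced in the next commit. A hypothetical payload: the table name, columns and values are made up, while the field names come from the DTO and the TableTestCase model:

  {
    "taskId": 1,
    "taskTableInputList": [
      {
        "tableName": "orders",
        "columns": ["order_id", "amount"],
        "rowData": [
          { "dinky_mock_source_row_id": 1000, "order_id": "1001", "amount": "9.90" }
        ]
      }
    ]
  }

The dinky_mock_source_row_id key is generated client-side so that EditableProTable can address individual rows.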
From 0e32b385d36901633f6df9a0c1718ef78b46e538 Mon Sep 17 00:00:00 2001
From: MactavishCui
Date: Thu, 28 Nov 2024 01:07:44 +0800
Subject: [PATCH 2/3] test case CRUD; Mock source connector and part of the
 implementation of Mock statement explainer;

---
 .../org/dinky/controller/APIController.java   |   8 +
 .../controller/TaskTestCaseController.java    |  65 ++++++++
 .../dinky/data/dto/TaskTestCaseListDTO.java   |  29 ++++
 .../org/dinky/data/model/TableTestCase.java   |  49 ++++++
 .../org/dinky/mapper/TaskTestCaseMapper.java  |   7 +
 .../dinky/service/TaskTestCaseService.java    |  16 ++
 .../service/impl/TaskTestCaseServiceImpl.java | 139 ++++++++++++++++++
 .../mock/source/MockDynamicTableSource.java   |  67 +++++++++
 .../source/MockDynamicTableSourceFactory.java |  60 ++++++++
 .../mock/source/MockSourceFunction.java       |  82 +++++++++++
 .../explainer/lineage/LineageBuilder.java     |  13 ++
 .../mock/MockStatementExplainer.java          |  60 ++++++--
 .../org.apache.flink.table.factories.Factory  |   1 +
 13 files changed, 586 insertions(+), 10 deletions(-)
 create mode 100644 dinky-admin/src/main/java/org/dinky/controller/TaskTestCaseController.java
 create mode 100644 dinky-admin/src/main/java/org/dinky/data/dto/TaskTestCaseListDTO.java
 create mode 100644 dinky-admin/src/main/java/org/dinky/data/model/TableTestCase.java
 create mode 100644 dinky-admin/src/main/java/org/dinky/mapper/TaskTestCaseMapper.java
 create mode 100644 dinky-admin/src/main/java/org/dinky/service/TaskTestCaseService.java
 create mode 100644 dinky-admin/src/main/java/org/dinky/service/impl/TaskTestCaseServiceImpl.java
 create mode 100644 dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSource.java
 create mode 100644 dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSourceFactory.java
 create mode 100644 dinky-core/src/main/java/org/dinky/connector/mock/source/MockSourceFunction.java

diff --git a/dinky-admin/src/main/java/org/dinky/controller/APIController.java b/dinky-admin/src/main/java/org/dinky/controller/APIController.java
index 6b0ac4c0fc..298d02d817 100644
--- a/dinky-admin/src/main/java/org/dinky/controller/APIController.java
+++ b/dinky-admin/src/main/java/org/dinky/controller/APIController.java
@@ -27,6 +27,7 @@
 import org.dinky.data.enums.BusinessType;
 import org.dinky.data.enums.Status;
 import org.dinky.data.exception.NotSupportExplainExcepition;
+import org.dinky.data.model.TableTestCase;
 import org.dinky.data.model.job.JobInstance;
 import org.dinky.data.result.ProTableResult;
 import org.dinky.data.result.Result;
@@ -40,6 +41,7 @@
 import java.util.List;
 
+import org.dinky.service.TaskTestCaseService;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -69,6 +71,7 @@ public class APIController {
 
     private final TaskService taskService;
     private final JobInstanceService jobInstanceService;
+    private final TaskTestCaseService taskTestCaseService;
 
     @GetMapping("/version")
     @ApiOperation(value = "Query Service Version", notes = "Query Dinky Service Version Number")
@@ -217,4 +220,9 @@ public Result<LineageResult> getTaskLineage(@RequestParam Integer id) {
     public ProTableResult<JobInstance> listJobInstances(@RequestBody JsonNode para) {
         return jobInstanceService.listJobInstances(para);
     }
+
+    @GetMapping("/getTableTestCase")
+    public Result<TableTestCase> getTableTestCase(@RequestParam Integer taskId, @RequestParam String tableName) {
+        return Result.succeed(taskTestCaseService.getTestCaseByTaskIdAndTableName(taskId, tableName));
+    }
 }
diff --git a/dinky-admin/src/main/java/org/dinky/controller/TaskTestCaseController.java b/dinky-admin/src/main/java/org/dinky/controller/TaskTestCaseController.java
new file mode 100644
index 0000000000..8cae65b65f
--- /dev/null
+++ b/dinky-admin/src/main/java/org/dinky/controller/TaskTestCaseController.java
@@ -0,0 +1,65 @@
+package org.dinky.controller;
+
+import cn.dev33.satoken.annotation.SaCheckLogin;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dinky.data.annotations.CheckTaskOwner;
+import org.dinky.data.annotations.TaskId;
+import org.dinky.data.dto.TaskTestCaseListDTO;
+import org.dinky.data.enums.Status;
+import org.dinky.data.result.Result;
+import org.dinky.service.TaskTestCaseService;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@Slf4j
+@RestController
+@Api(tags = "Task Test Case Controller")
+@RequestMapping("/api/testCase")
+@SaCheckLogin
+@RequiredArgsConstructor
+public class TaskTestCaseController {
+    private final TaskTestCaseService taskTestCaseService;
+
+    @PostMapping("/listTaskTestCaseByStatement")
+    @ApiOperation("Query Test Cases By TaskId")
+    @ApiImplicitParam(
+            name = "taskTestCaseListDTO",
+            value = "Task test case DTO",
+            dataType = "TaskTestCaseListDTO",
+            paramType = "body",
+            required = true)
+    @CheckTaskOwner(checkParam = TaskId.class, checkInterface = TaskTestCaseService.class)
+    public Result<TaskTestCaseListDTO> getTestCaseBasedOnStatement(@RequestBody TaskTestCaseListDTO taskTestCaseListDTO) {
+        return Result.succeed(taskTestCaseService.getTestCaseBasedOnStatement(taskTestCaseListDTO));
+    }
+
+    @PostMapping("/saveOrUpdateTestCase")
+    @ApiOperation("Save Or Update Test Cases By TaskId")
+    @ApiImplicitParam(
+            name = "taskTestCaseListDTO",
+            value = "task id and test cases of every source table",
+            dataType = "TaskTestCaseListDTO",
+            paramType = "body",
+            required = true)
+    @CheckTaskOwner(checkParam = TaskId.class, checkInterface = TaskTestCaseService.class)
+    public Result<Void> saveOrUpdateTestCase(@RequestBody TaskTestCaseListDTO taskTestCaseListDTO) {
+        try {
+            taskTestCaseService.saveOrUpdateTestCase(taskTestCaseListDTO);
+            return Result.succeed(Status.MODIFY_SUCCESS);
+        } catch (Exception e) {
+            log.error("Failed to save or update test cases", e);
+            return Result.failed(Status.MODIFY_FAILED);
+        }
+    }
+}
diff --git a/dinky-admin/src/main/java/org/dinky/data/dto/TaskTestCaseListDTO.java b/dinky-admin/src/main/java/org/dinky/data/dto/TaskTestCaseListDTO.java
new file mode 100644
index 0000000000..54fe05b510
--- /dev/null
+++ b/dinky-admin/src/main/java/org/dinky/data/dto/TaskTestCaseListDTO.java
@@ -0,0 +1,29 @@
+package org.dinky.data.dto;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.dinky.data.annotations.TaskId;
+import org.dinky.data.model.TableTestCase;
+
+import java.util.List;
+
+@EqualsAndHashCode(callSuper = true)
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@ApiModel(value = "TaskTestCaseListDTO", description = "Task Test Case List DTO")
+public class TaskTestCaseListDTO extends AbstractStatementDTO {
+
+    @ApiModelProperty(value = "taskId", required = true, dataType = "Integer")
+    @TaskId
+    private Integer taskId;
+
+    @ApiModelProperty(value = "testCaseList", required = true, dataType = "List", allowEmptyValue = true)
+    private List<TableTestCase> taskTableInputList;
+}
diff --git a/dinky-admin/src/main/java/org/dinky/data/model/TableTestCase.java b/dinky-admin/src/main/java/org/dinky/data/model/TableTestCase.java
new file mode 100644
index 0000000000..b44a973390
--- /dev/null
+++ b/dinky-admin/src/main/java/org/dinky/data/model/TableTestCase.java
@@ -0,0 +1,49 @@
+package org.dinky.data.model;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.dinky.data.typehandler.JSONObjectHandler;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test case for a table in a task.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@Builder
+@EqualsAndHashCode
+@TableName("dinky_task_test_case")
+@ApiModel(value = "Table Test Case", description = "Test cases for a table in a task")
+public class TableTestCase {
+    @TableId(type = IdType.AUTO)
+    @ApiModelProperty(value = "ID", dataType = "Integer", example = "1", notes = "Unique identifier for a test case")
+    private Integer id;
+
+    @ApiModelProperty(value = "Task Id", dataType = "Integer", example = "1", notes = "test case related task id")
+    private Integer taskId;
+
+    @ApiModelProperty(value = "Table Name", dataType = "String", example = "inputTableName", notes = "test case related table name in a task")
+    private String tableName;
+
+    @ApiModelProperty(value = "columns", dataType = "String", example = "[\"columnA\", \"columnB\"]", notes = "table columns")
+    @TableField(typeHandler = JSONObjectHandler.class)
+    private List<String> columns;
+
+    @ApiModelProperty(value = "row data", dataType = "String", example = "[{\"columnA\":\"valueA\", \"columnB\":\"valueB\"}]")
+    @TableField(typeHandler = JSONObjectHandler.class)
+    private List<Map<String, String>> rowData;
+}
diff --git a/dinky-admin/src/main/java/org/dinky/mapper/TaskTestCaseMapper.java b/dinky-admin/src/main/java/org/dinky/mapper/TaskTestCaseMapper.java
new file mode 100644
index 0000000000..90d54108b0
--- /dev/null
+++ b/dinky-admin/src/main/java/org/dinky/mapper/TaskTestCaseMapper.java
@@ -0,0 +1,7 @@
+package org.dinky.mapper;
+
+import org.dinky.data.model.TableTestCase;
+import org.dinky.mybatis.mapper.SuperMapper;
+
+public interface TaskTestCaseMapper extends SuperMapper<TableTestCase> {
+}
diff --git a/dinky-admin/src/main/java/org/dinky/service/TaskTestCaseService.java b/dinky-admin/src/main/java/org/dinky/service/TaskTestCaseService.java
new file mode 100644
index 0000000000..ed6809324b
--- /dev/null
+++ b/dinky-admin/src/main/java/org/dinky/service/TaskTestCaseService.java
@@ -0,0 +1,16 @@
+package org.dinky.service;
+
+import org.dinky.data.dto.TaskTestCaseListDTO;
+import org.dinky.data.model.TableTestCase;
+import org.dinky.mybatis.service.ISuperService;
+
+public interface TaskTestCaseService extends ISuperService<TableTestCase> {
+
+    TaskTestCaseListDTO getTestCaseBasedOnStatement(TaskTestCaseListDTO taskTestCaseListDTO);
+
+    void saveOrUpdateTestCase(TaskTestCaseListDTO taskTestCaseListDTO);
+
+    Boolean checkTaskOperatePermission(Integer taskId);
+
+    TableTestCase getTestCaseByTaskIdAndTableName(Integer taskId, String tableName);
+}
diff --git a/dinky-admin/src/main/java/org/dinky/service/impl/TaskTestCaseServiceImpl.java b/dinky-admin/src/main/java/org/dinky/service/impl/TaskTestCaseServiceImpl.java
new file mode 100644
index 0000000000..892696ba5d
--- /dev/null
+++ b/dinky-admin/src/main/java/org/dinky/service/impl/TaskTestCaseServiceImpl.java
@@ -0,0 +1,139 @@
+package org.dinky.service.impl;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.dinky.assertion.Asserts;
+import org.dinky.data.dto.TaskTestCaseListDTO;
+import org.dinky.data.model.TableTestCase;
+import org.dinky.executor.ExecutorConfig;
+import org.dinky.explainer.lineage.LineageBuilder;
+import org.dinky.explainer.lineage.LineageColumn;
+import org.dinky.explainer.lineage.LineageTable;
+import org.dinky.mapper.TaskTestCaseMapper;
+import org.dinky.mybatis.service.impl.SuperServiceImpl;
+import org.dinky.service.TaskService;
+import org.dinky.service.TaskTestCaseService;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Service
+@RequiredArgsConstructor
+@Slf4j
+public class TaskTestCaseServiceImpl extends SuperServiceImpl<TaskTestCaseMapper, TableTestCase> implements TaskTestCaseService {
+
+    private final TaskService taskService;
+
+    @Override
+    public TaskTestCaseListDTO getTestCaseBasedOnStatement(TaskTestCaseListDTO taskTestCaseListDTO) {
+        // parse the source tables and their schemas from the statement
+        List<LineageTable> tableSchemesParsedFromStatement = LineageBuilder.getSourceTablesByLogicalPlan(taskTestCaseListDTO.getStatement(), ExecutorConfig.DEFAULT);
+
+        // merged test case list
+        List<TableTestCase> mergedTestCaseList = new ArrayList<>();
+
+        // get the existing test cases from the database
+        List<TableTestCase> testCaseFromDbList = baseMapper.selectList(
+                new LambdaQueryWrapper<TableTestCase>().eq(TableTestCase::getTaskId, taskTestCaseListDTO.getTaskId()));
+        Map<String, TableTestCase> testCaseFromDbMap = new HashMap<>();
+        testCaseFromDbList.forEach((testCase) -> testCaseFromDbMap.put(testCase.getTableName(), testCase));
+
+        // iterate over every table and reuse existing test case row data as far as possible
+        for (LineageTable tableScheme : tableSchemesParsedFromStatement) {
+            // table parsed from the latest statement
+            String tableName = tableScheme.getName();
+            TableTestCase mergedTableTestCase = TableTestCase.builder()
+                    .tableName(tableName)
+                    .columns(tableScheme.getColumns().stream().map(LineageColumn::getName).collect(Collectors.toList()))
+                    .rowData(new ArrayList<>())
+                    .build();
+            mergedTestCaseList.add(mergedTableTestCase);
+            // try to reuse the saved test cases
+            if (testCaseFromDbMap.containsKey(tableName)) {
+                Set<String> latestColumnSet = new HashSet<>(mergedTableTestCase.getColumns());
+                List<Map<String, String>> rowDataListFromDb = testCaseFromDbMap.get(tableName).getRowData();
+                // handle every row of data
+                mergedTableTestCase.getRowData().addAll(getRowDataCanBeUsedInNewScheme(latestColumnSet, rowDataListFromDb));
+            }
+        }
+
+        return TaskTestCaseListDTO.builder().taskTableInputList(mergedTestCaseList).build();
+    }
+
+    @Override
+    public void saveOrUpdateTestCase(TaskTestCaseListDTO taskTestCaseListDTO) {
+        List<TableTestCase> testCaseList = taskTestCaseListDTO.getTaskTableInputList();
+
+        Map<String, TableTestCase> tableNameTestCaseMap = new HashMap<>();
+        if (Asserts.isNotNull(testCaseList)) {
+            // init the task id
+            testCaseList.forEach((tableTestCase -> {
+                tableTestCase.setTaskId(taskTestCaseListDTO.getTaskId());
+                tableNameTestCaseMap.put(tableTestCase.getTableName(), tableTestCase);
+            }));
+            // fill in ids so that existing records are updated instead of duplicated
+            List<TableTestCase> existedTestCaseList = baseMapper.selectList(new LambdaQueryWrapper<TableTestCase>().eq(TableTestCase::getTaskId, taskTestCaseListDTO.getTaskId()));
+            // delete test cases whose tables no longer appear among the latest source tables
+            List<Integer> deleteTestCaseIdList = new ArrayList<>();
+
+            for (TableTestCase existedTestCase : existedTestCaseList) {
+                String existedTableName = existedTestCase.getTableName();
+                if (tableNameTestCaseMap.containsKey(existedTableName)) {
+                    // reusing the existing id turns the insert into an update
+                    TableTestCase tableTestCase = tableNameTestCaseMap.get(existedTableName);
+                    tableTestCase.setId(existedTestCase.getId());
+                } else {
+                    deleteTestCaseIdList.add(existedTestCase.getId());
+                }
+            }
+
+            if (Asserts.isNotNullCollection(testCaseList)) {
+                baseMapper.insertOrUpdate(testCaseList);
+            }
+            if (Asserts.isNotNullCollection(deleteTestCaseIdList)) {
+                baseMapper.deleteByIds(deleteTestCaseIdList);
+            }
+        }
+    }
+
+    @Override
+    public Boolean checkTaskOperatePermission(Integer taskId) {
+        return taskService.checkTaskOperatePermission(taskId);
+    }
+
+    @Override
+    public TableTestCase getTestCaseByTaskIdAndTableName(Integer taskId, String tableName) {
+        return baseMapper.selectOne(new LambdaQueryWrapper<TableTestCase>().eq(TableTestCase::getTaskId, taskId).eq(TableTestCase::getTableName, tableName));
+    }
+
+    private List<Map<String, String>> getRowDataCanBeUsedInNewScheme(
+            Set<String> latestColumnSet, List<Map<String, String>> existingList) {
+        List<Map<String, String>> mergedRowDataList = new ArrayList<>();
+        // handle every row of data
+        for (Map<String, String> rowData : existingList) {
+            Map<String, String> mergedRowData = new HashMap<>();
+            for (Map.Entry<String, String> entry : rowData.entrySet()) {
+                String existedColumnName = entry.getKey();
+                String existedColumnData = entry.getValue();
+                // keep only the column data that still exists in the latest schema
+                if (latestColumnSet.contains(existedColumnName)) {
+                    mergedRowData.put(existedColumnName, existedColumnData);
+                }
+            }
+            if (Asserts.isNotNullMap(mergedRowData)) {
+                mergedRowDataList.add(mergedRowData);
+            }
+        }
+        return mergedRowDataList;
+    }
+}
diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSource.java b/dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSource.java
new file mode 100644
index 0000000000..edce127d93
--- /dev/null
+++ b/dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSource.java
@@ -0,0 +1,67 @@
+package org.dinky.connector.mock.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+public class MockDynamicTableSource implements ScanTableSource {
+
+    private final String hostName;
+
+    private final String port;
+
+    private final Integer taskId;
+
+    private final String tableName;
+
+    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
+
+    private final DataType producedDataType;
+
+    public MockDynamicTableSource(String hostName, String port, Integer taskId, String tableName, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, DataType producedDataType) {
+        this.hostName = hostName;
+        this.port = port;
+        this.taskId = taskId;
+        this.tableName = tableName;
+        this.decodingFormat = decodingFormat;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return decodingFormat.getChangelogMode();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        // create runtime classes that are shipped to the cluster
+        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
+                scanContext,
+                producedDataType);
+        final SourceFunction<RowData> sourceFunction = new MockSourceFunction(
+                hostName,
+                port,
+                taskId,
+                tableName,
+                deserializer);
+        return SourceFunctionProvider.of(sourceFunction, false);
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        // planner may copy the source during optimization, so return a full clone instead of null
+        return new MockDynamicTableSource(hostName, port, taskId, tableName, decodingFormat, producedDataType);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "Dinky Mock Source";
+    }
+}
diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSourceFactory.java b/dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSourceFactory.java
new file mode 100644
index 0000000000..7280dec9a6
--- /dev/null
+++ b/dinky-core/src/main/java/org/dinky/connector/mock/source/MockDynamicTableSourceFactory.java
@@ -0,0 +1,60 @@
+package org.dinky.connector.mock.source;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.json.JsonFormatFactory;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class MockDynamicTableSourceFactory implements DynamicTableSourceFactory {
+
+    public static final String IDENTIFIER = "dinky-mock-source";
+
+    public static final ConfigOption<String> HOST_NAME = ConfigOptions.key("hostName").stringType().noDefaultValue();
+    public static final ConfigOption<String> PORT = ConfigOptions.key("port").stringType().noDefaultValue();
+    public static final ConfigOption<Integer> TASK_ID = ConfigOptions.key("taskId").intType().noDefaultValue();
+    public static final ConfigOption<String> TABLE_NAME_BEFORE_MOCK = ConfigOptions.key("tableName").stringType().noDefaultValue();
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
+        JsonFormatFactory jsonFormatFactory = new JsonFormatFactory();
+        DecodingFormat<DeserializationSchema<RowData>> decodingFormat = jsonFormatFactory.createDecodingFormat(context, new Configuration());
+        final ReadableConfig options = helper.getOptions();
+        final DataType producedDataType =
+                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
+        return new MockDynamicTableSource(
+                options.get(HOST_NAME),
+                options.get(PORT),
+                options.get(TASK_ID),
+                options.get(TABLE_NAME_BEFORE_MOCK),
+                decodingFormat,
+                producedDataType);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return new HashSet<>(Arrays.asList(HOST_NAME, PORT, TASK_ID, TABLE_NAME_BEFORE_MOCK));
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+}
diff --git a/dinky-core/src/main/java/org/dinky/connector/mock/source/MockSourceFunction.java b/dinky-core/src/main/java/org/dinky/connector/mock/source/MockSourceFunction.java
new file mode 100644
index 0000000000..6dee785cce
--- /dev/null
+++ b/dinky-core/src/main/java/org/dinky/connector/mock/source/MockSourceFunction.java
@@ -0,0 +1,82 @@
+package org.dinky.connector.mock.source;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+import org.dinky.assertion.Asserts;
+import org.dinky.data.exception.DinkyException;
+import org.dinky.utils.JsonUtils;
+
+import java.text.MessageFormat;
+import java.util.LinkedList;
+import java.util.List;
+
+public class MockSourceFunction extends RichSourceFunction<RowData> {
+
+    private static final String TEST_CASE_OPEN_API_TEMPLATE = "http://{0}:{1}/openapi/getTableTestCase?taskId={2}&tableName={3}";
+
+    private final String hostname;
+    private final Integer taskId;
+    private final String tableName;
+    private final String port;
+    private final DeserializationSchema<RowData> deserializer;
+    private final List<RowData> testCaseRowData;
+
+    public MockSourceFunction(String hostname,
+                              String port,
+                              Integer taskId,
+                              String tableName,
+                              DeserializationSchema<RowData> deserializer) {
+        this.hostname = hostname;
+        this.port = port;
+        this.taskId = taskId;
+        this.tableName = tableName;
+        this.deserializer = deserializer;
+        this.testCaseRowData = new LinkedList<>();
+    }
+
+    @Override
+    public void open(OpenContext parameters) throws Exception {
+        super.open(parameters);
+        // fetch the test cases of the current table through the open api;
+        // taskId is converted to a String first, otherwise MessageFormat would insert grouping separators (e.g. "1,234")
+        String url = MessageFormat.format(TEST_CASE_OPEN_API_TEMPLATE, hostname, port, String.valueOf(taskId), tableName);
+        try (CloseableHttpClient httpClient = HttpClients.createDefault();
+                CloseableHttpResponse response = httpClient.execute(new HttpGet(url))) {
+            int statusCode = response.getStatusLine().getStatusCode();
+            HttpEntity entity = response.getEntity();
+            // check the response
+            if (statusCode == 200 && Asserts.isNotNull(entity)) {
+                ObjectNode jsonNodes = JsonUtils.parseObject(EntityUtils.toString(entity));
+                ArrayNode arrayNode = (ArrayNode) jsonNodes.get("data").get("rowData");
+                for (JsonNode jsonNode : arrayNode) {
+                    // deserialize the test case rows
+                    testCaseRowData.add(deserializer.deserialize(jsonNode.toString().getBytes()));
+                }
+            } else {
+                throw new DinkyException("Get test case for table " + tableName + " failed, response " + response);
+            }
+        }
+    }
+
+    @Override
+    public void run(SourceContext<RowData> sourceContext) throws Exception {
+        testCaseRowData.forEach(sourceContext::collect);
+    }
+
+    @Override
+    public void cancel() {
+        // do nothing
+    }
+}
diff --git a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
index 8dcb03e069..3ad765eb35 100644
--- a/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
+++ b/dinky-core/src/main/java/org/dinky/explainer/lineage/LineageBuilder.java
@@ -29,8 +29,11 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * LineageBuilder
@@ -109,4 +112,14 @@ public static LineageResult getColumnLineageByLogicalPlan(String statement, Expl
         List<LineageTable> tables = new ArrayList<>(tableMap.values());
         return LineageResult.build(tables, relations);
     }
+
+    public static List<LineageTable> getSourceTablesByLogicalPlan(String statement, ExecutorConfig executorConfig) {
+        LineageResult lineageResult = LineageBuilder.getColumnLineageByLogicalPlan(statement, executorConfig);
+        List<LineageRelation> relations = lineageResult.getRelations();
+        // collect the source tables based on the lineage relations
+        Set<String> sourceTableIdSet = new HashSet<>();
+        relations.forEach((relation) ->
+                sourceTableIdSet.add(relation.getSrcTableId()));
+        return lineageResult.getTables().stream().filter((table) -> sourceTableIdSet.contains(table.getId())).collect(Collectors.toList());
+    }
 }
diff --git a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java
index ad9a316fae..6e0f040288 100644
--- a/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java
+++ b/dinky-core/src/main/java/org/dinky/explainer/mock/MockStatementExplainer.java
@@ -19,32 +19,39 @@
 
 package org.dinky.explainer.mock;
 
-import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory;
-import org.dinky.data.job.JobStatement;
-import org.dinky.data.job.JobStatementType;
-import org.dinky.data.job.SqlType;
-import org.dinky.executor.CustomTableEnvironment;
-import org.dinky.job.JobStatementPlan;
-import org.dinky.utils.JsonUtils;
-
+import lombok.extern.slf4j.Slf4j;
 import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlJoin;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
 import org.apache.calcite.sql.dialect.AnsiSqlDialect;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlVisitor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.dinky.connector.mock.sink.MockDynamicTableSinkFactory;
+import org.dinky.data.job.JobStatement;
+import org.dinky.data.job.JobStatementType;
+import org.dinky.data.job.SqlType;
+import org.dinky.executor.CustomTableEnvironment;
+import org.dinky.executor.ExecutorConfig;
+import org.dinky.explainer.lineage.LineageBuilder;
+import org.dinky.explainer.lineage.LineageTable;
+import org.dinky.job.JobStatementPlan;
+import org.dinky.utils.JsonUtils;
 
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-
-import lombok.extern.slf4j.Slf4j;
+import java.util.stream.Collectors;
 
 @Slf4j
 public class MockStatementExplainer {
@@ -100,6 +107,39 @@ private void mockSink(JobStatementPlan jobStatementPlan) {
         log.debug("Mock sink succeed: {}", JsonUtils.toJsonString(jobStatementPlan));
     }
 
+    /**
+     * TODO implement source table mock based on sql statement context
+     *
+     * @param jobStatementPlan the job statement plan whose source tables will be mocked
+     */
+    private void mockSource(JobStatementPlan jobStatementPlan) {
+        List<LineageTable> sourceTables = LineageBuilder.getSourceTablesByLogicalPlan(jobStatementPlan.getStatements(), ExecutorConfig.DEFAULT);
+        // take the table part of `catalog.database.table`
+        Set<String> targetTableNames = sourceTables.stream().map((source) -> source.getName().split("\\.")[2]).collect(Collectors.toSet());
+        SqlVisitor<SqlNode> visitor = new SqlBasicVisitor<SqlNode>() {
+            @Override
+            public SqlNode visit(SqlCall call) {
+                // a table that is selected from or joined directly
+                if (call instanceof SqlSelect) {
+                    SqlNode from = ((SqlSelect) call).getFrom();
+                    if (from instanceof SqlIdentifier) {
+                        // mockTransTableName((SqlIdentifier) from, mockedTableName);
+                    }
+                } else if (call instanceof SqlJoin) {
+                    SqlJoin sqlJoin = (SqlJoin) call;
+                    // TODO check both the left and the right side of the join
+                }
+                // recursively process nested queries
+                List<SqlNode> operandList = call.getOperandList();
+                for (SqlNode sqlNode : operandList) {
+                    if (sqlNode instanceof SqlCall) {
+                        visit((SqlCall) sqlNode);
+                    }
+                }
+                return call;
+            }
+        };
+    }
+
     /**
      * get tables names of insert statements, these tables will be mocked
      *
diff --git a/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 73603caf76..cfa8196fae 100644
--- a/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/dinky-core/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -1,2 +1,3 @@
 org.dinky.connector.printnet.sink.PrintNetDynamicTableSinkFactory
 org.dinky.connector.mock.sink.MockDynamicTableSinkFactory
+org.dinky.connector.mock.source.MockDynamicTableSourceFactory

From d5e8cdf432c6a5736594bdb927407ffa05ff846d Mon Sep 17 00:00:00 2001
From: MactavishCui
Date: Thu, 28 Nov 2024 01:21:55 +0800
Subject: [PATCH 3/3] MySQL DDL

---
 .../mysql/V20241128.1.3.0__release.sql        | 20 +++++++++++++++++++
 1 file changed, 20 insertions(+)
 create mode 100644 dinky-admin/src/main/resources/db/migration/mysql/V20241128.1.3.0__release.sql

diff --git a/dinky-admin/src/main/resources/db/migration/mysql/V20241128.1.3.0__release.sql b/dinky-admin/src/main/resources/db/migration/mysql/V20241128.1.3.0__release.sql
new file mode 100644
index 0000000000..c721044d92
--- /dev/null
+++ b/dinky-admin/src/main/resources/db/migration/mysql/V20241128.1.3.0__release.sql
@@ -0,0 +1,20 @@
+SET NAMES utf8mb4;
+SET FOREIGN_KEY_CHECKS = 0;
+
+-- ----------------------------
+-- Table structure for dinky_task_test_case
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `dinky_task_test_case` (
+  `id` INT(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
+  `task_id` INT(11) NOT NULL COMMENT 'task id',
+  `table_name` VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'table name',
+  `columns` MEDIUMTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT 'columns',
+  `row_data` MEDIUMTEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT 'row data',
+  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
+  PRIMARY KEY (`id`),
+  UNIQUE INDEX `id_table_name_un_idx1` (`task_id`, `table_name`) USING BTREE,
+  FOREIGN KEY (`task_id`) REFERENCES `dinky_task` (`id`) ON DELETE CASCADE
+) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 'Test Case' ROW_FORMAT = Dynamic;
+
+
+SET FOREIGN_KEY_CHECKS = 1;
\ No newline at end of file
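
For context on how the new connector is meant to be used: MockStatementExplainer already rewrites sink tables onto dinky-mock-sink, and the TODO mockSource step would presumably rewrite each source table onto the new factory in the same way. A minimal sketch of what a mocked source DDL would look like; the option keys are taken from MockDynamicTableSourceFactory, while the table, host and port values below are made up:

  CREATE TABLE orders (
    order_id STRING,
    amount STRING
  ) WITH (
    'connector' = 'dinky-mock-source',
    'hostName' = '127.0.0.1',  -- assumed Dinky server host
    'port' = '8888',           -- assumed Dinky server port
    'taskId' = '1',            -- task whose test cases should be replayed
    'tableName' = 'orders'     -- original table name used to look up the test case
  );

At runtime MockSourceFunction resolves these options into a call to /openapi/getTableTestCase?taskId=1&tableName=orders and replays the returned rows through the JSON decoding format.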
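
And a hypothetical example of how one saved test case lands in the new table; the row follows the JSON shapes documented on TableTestCase, and all values are invented:

  INSERT INTO `dinky_task_test_case` (`task_id`, `table_name`, `columns`, `row_data`)
  VALUES (1, 'orders', '["order_id", "amount"]', '[{"order_id": "1001", "amount": "9.90"}]');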