Merge remote-tracking branch 'origin/main' into hudi-spec-fix
JeraldJF committed Nov 12, 2024
2 parents 2e50acf + fe69a8b commit 9be0bc4
Showing 16 changed files with 52 additions and 101 deletions.
1 change: 1 addition & 0 deletions api-service/package.json
@@ -61,6 +61,7 @@
"@babel/traverse": "7.23.2"
},
"devDependencies": {
"@types/busboy": "^1.5.4",
"@types/chai": "^4.3.3",
"@types/chai-as-promised": "^7.1.5",
"@types/chai-spies": "^1.0.3",
@@ -91,12 +91,15 @@ const readyForPublish = async (dataset: Record<string, any>, updated_by: any) =>
let defaultConfigs: any = _.cloneDeep(defaultDatasetConfig)
defaultConfigs = _.omit(defaultConfigs, ["router_config"])
defaultConfigs = _.omit(defaultConfigs, "dedup_config.dedup_key");
if (_.get(draftDataset, "dataset_config.keys_config")) {
defaultConfigs = _.omit(defaultConfigs, "dataset_config.keys_config");
}
if (draftDataset?.type === "master") {
defaultConfigs = _.omit(defaultConfigs, "dataset_config.keys_config.data_key");
defaultConfigs = _.omit(defaultConfigs, "dataset_config.cache_config.redis_db");
}
_.set(draftDataset, "updated_by", updated_by);
_.mergeWith(draftDataset, defaultConfigs, draftDataset, (objValue, srcValue, key) => {
_.mergeWith(draftDataset, draftDataset, defaultConfigs, (objValue, srcValue, key) => {
if (key === "created_by" || key === "updated_by") {
if (objValue !== "SYSTEM") {
return objValue;
@@ -107,6 +110,7 @@ const readyForPublish = async (dataset: Record<string, any>, updated_by: any) =>
return objValue;
}
});
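Note on the _.mergeWith change above: lodash merges the listed sources into the first argument from left to right and hands each property to the customizer as (objValue, srcValue, key), so swapping draftDataset and defaultConfigs changes which side shows up as objValue and which source is applied last. A minimal, self-contained sketch of that interaction, using hypothetical keys rather than the real dataset config:

// Sketch only — hypothetical values, not the actual dataset configs in this repo.
import _ from "lodash";

// Customizer that keeps a destination value whenever it is already set.
const keepExisting = (objValue: unknown) => (objValue !== undefined ? objValue : undefined);

const draft = { owner: "user-1", retries: 5 };
const defaults = { owner: "SYSTEM", retries: 3, timeout: 30 };

// Sources merge left to right; with the draft merged first and the defaults last,
// the customizer preserves every key the draft already defines and the defaults
// only fill the gaps.
const merged = _.mergeWith({}, draft, defaults, keepExisting);
// => { owner: "user-1", retries: 5, timeout: 30 }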

const datasetValid: Record<string, any> = schemaValidation(draftDataset, ReadyToPublishSchema)
if (!datasetValid.isValid) {
throw {
60 changes: 4 additions & 56 deletions api-service/src/controllers/GenerateSignedURL/GenerateSignedURL.ts
@@ -6,20 +6,20 @@ import logger from "../../logger";
import { ErrorObject } from "../../types/ResponseModel";
import { schemaValidation } from "../../services/ValidationService";
import GenerateURL from "./GenerateSignedURLValidationSchema.json"
import { cloudProvider } from "../../services/CloudServices";
import { config } from "../../configs/Config";
import { URLAccess } from "../../types/SampleURLModel";
import { v4 as uuidv4 } from "uuid";
import path from "path";
import { generatePreSignedUrl } from "./helper";

export const apiId = "api.files.generate-url"
export const code = "FILES_GENERATE_URL_FAILURE"
const maxFiles = config.presigned_url_configs.maxFiles
let containerType: string;

const generateSignedURL = async (req: Request, res: Response) => {
const requestBody = req.body
const msgid = _.get(req, ["body", "params", "msgid"]);
const resmsgid = _.get(res, "resmsgid");
containerType = _.get(req, ["body", "request", "type"]);
try {
const isRequestValid: Record<string, any> = schemaValidation(req.body, GenerateURL)
if (!isRequestValid.isValid) {
@@ -46,21 +46,7 @@ const generateSignedURL = async (req: Request, res: Response) => {
errCode: "BAD_REQUEST"
} as ErrorObject, req, res);
}

const { filesList, updatedFileNames } = transformFileNames(files, access)
logger.info(`Updated file names with path:${updatedFileNames}`)

const urlExpiry: number = getURLExpiry(access)
const preSignedUrls = await Promise.all(cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry))
const signedUrlList = _.map(preSignedUrls, list => {
const fileNameWithUid = _.keys(list)[0]
return {
filePath: getFilePath(fileNameWithUid),
fileName: filesList.get(fileNameWithUid),
preSignedUrl: _.values(list)[0]
}
})

const signedUrlList = await generatePreSignedUrl(access, files, containerType)
logger.info({ apiId, requestBody, msgid, resmsgid, response: signedUrlList, message: `Sample urls generated successfully for files:${files}` })
ResponseHandler.successResponse(req, res, { status: httpStatus.OK, data: signedUrlList })
} catch (error: any) {
@@ -74,44 +74,6 @@ const generateSignedURL = async (req: Request, res: Response) => {
}
}

const getFilePath = (file: string) => {
return `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}`
}

const transformFileNames = (fileList: Array<string | any>, access: string): Record<string, any> => {
if (access === URLAccess.Read) {
return transformReadFiles(fileList)
}
return transformWriteFiles(fileList)
}

const transformReadFiles = (fileNames: Array<string | any>) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
fileMap.set(file, file)
return getFilePath(file)
})
return { filesList: fileMap, updatedFileNames }
}

const transformWriteFiles = (fileNames: Array<string | any>) => {
const fileMap = new Map();
const updatedFileNames = _.map(fileNames, file => {
const uuid = uuidv4().replace(/-/g, "").slice(0, 6);
const ext = path.extname(file)
const baseName = path.basename(file, ext)
const updatedFileName = `${baseName}_${uuid}${ext}`
fileMap.set(updatedFileName, file)
return getFilePath(updatedFileName)
})
return { filesList: fileMap, updatedFileNames }

}

const getURLExpiry = (access: string) => {
return access === URLAccess.Read ? config.presigned_url_configs.read_storage_url_expiry : config.presigned_url_configs.write_storage_url_expiry
}

const checkLimitExceed = (files: Array<string>): boolean => {
return _.size(files) > maxFiles
}
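The inline helpers removed above (transformFileNames, getURLExpiry, getFilePath and the cloudProvider call) presumably moved into ./helper behind the new generatePreSignedUrl(access, files, containerType) call. That helper file is not part of this diff; the following is only a rough sketch of its likely shape, reconstructed from the deleted inline logic — the containerType handling in particular is an assumption, since its use is not shown here.

// Sketch reconstructed from the deleted inline code; not the actual helper.ts in this commit.
import _ from "lodash";
import path from "path";
import { v4 as uuidv4 } from "uuid";
import { cloudProvider } from "../../services/CloudServices";
import { config } from "../../configs/Config";
import { URLAccess } from "../../types/SampleURLModel";

const getFilePath = (file: string) =>
    `${config.cloud_config.container}/${config.presigned_url_configs.service}/user_uploads/${file}`;

export const generatePreSignedUrl = async (access: string, files: string[], containerType: string) => {
    // containerType is accepted here only to mirror the new call site; how it selects
    // a container is not visible in this diff.
    const fileMap = new Map<string, string>();
    const updatedFileNames = _.map(files, (file) => {
        if (access === URLAccess.Read) {
            fileMap.set(file, file);
            return getFilePath(file);
        }
        // For writes, a short uuid suffix keeps uploaded file names unique.
        const uuid = uuidv4().replace(/-/g, "").slice(0, 6);
        const ext = path.extname(file);
        const updatedFileName = `${path.basename(file, ext)}_${uuid}${ext}`;
        fileMap.set(updatedFileName, file);
        return getFilePath(updatedFileName);
    });

    const urlExpiry = access === URLAccess.Read
        ? config.presigned_url_configs.read_storage_url_expiry
        : config.presigned_url_configs.write_storage_url_expiry;

    const preSignedUrls = await Promise.all(
        cloudProvider.generateSignedURLs(config.cloud_config.container, updatedFileNames, access, urlExpiry)
    );
    return _.map(preSignedUrls, (entry) => {
        const fileNameWithUid = _.keys(entry)[0];
        return {
            filePath: getFilePath(fileNameWithUid),
            fileName: fileMap.get(fileNameWithUid),
            preSignedUrl: _.values(entry)[0]
        };
    });
};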
5 changes: 5 additions & 0 deletions api-service/src/helpers/ResponseHandler.ts
@@ -5,6 +5,7 @@ import { onFailure, onObsrvFailure, onSuccess } from "../metrics/prometheus/help
import moment from "moment";
import _ from "lodash";
import { ObsrvError } from "../types/ObsrvError";
import logger from "../logger";

const ResponseHandler = {
successResponse: (req: Request, res: Response, result: Result) => {
@@ -26,6 +27,8 @@

errorResponse: (error: Record<string, any>, req: Request, res: Response) => {
const { statusCode, message, errCode, code = "INTERNAL_SERVER_ERROR", trace = "" } = error;
const sanitizedError = { ...error, proxyAuthKey: "REDACTED" };
logger.error(sanitizedError)
const { id, entity, body } = req as any;
const msgid = _.get(body, ["params", "msgid"])
const resmsgid = _.get(res, "resmsgid")
@@ -37,6 +40,8 @@

obsrvErrorResponse: (error: ObsrvError, req: Request, res: Response) => {
const { statusCode, message, errCode, code = "INTERNAL_SERVER_ERROR", data } = error;
const sanitizedError = { ...error, proxyAuthKey: "REDACTED" };
logger.error(sanitizedError)
const { id, entity, body } = req as any;
const msgid = _.get(body, ["params", "msgid"])
const resmsgid = _.get(res, "resmsgid")
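A note on the sanitization added in both handlers above: spreading the error creates only a shallow copy and overwrites only a top-level proxyAuthKey, so a secret nested deeper in the object (or under another name) would still reach the log. A small illustration with a hypothetical error shape:

// Illustration only — hypothetical error object, not an actual Obsrv error.
const error = {
    message: "upstream call failed",
    proxyAuthKey: "secret-token",
    cause: { proxyAuthKey: "secret-token" } // nested copy of the key
};

const sanitizedError = { ...error, proxyAuthKey: "REDACTED" };
// sanitizedError.proxyAuthKey       -> "REDACTED"
// sanitizedError.cause.proxyAuthKey -> still "secret-token" (shallow copy shares nested objects)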
3 changes: 2 additions & 1 deletion api-service/src/routes/Router.ts
@@ -31,6 +31,7 @@ import { OperationType, telemetryAuditStart } from "../services/telemetry";
import telemetryActions from "../telemetry/telemetryActions";
import datasetMetrics from "../controllers/DatasetMetrics/DatasetMetricsController";
import checkRBAC from "../middlewares/RBAC_middleware";
import connectorRegisterController from "../controllers/ConnectorRegister/ConnectorRegisterController";

export const router = express.Router();

@@ -58,7 +59,7 @@ router.post("/datasets/copy", setDataToRequestObject("api.datasets.copy"), onReq
router.post("/connectors/list", setDataToRequestObject("api.connectors.list"), onRequest({ entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.listConnectors, operationType: OperationType.GET}), checkRBAC.handler(), ConnectorsList);
router.get("/connectors/read/:id", setDataToRequestObject("api.connectors.read"), onRequest({entity: Entity.Management }), telemetryAuditStart({action: telemetryActions.readConnectors, operationType: OperationType.GET}), checkRBAC.handler(), ConnectorsRead);
router.post("/datasets/import", setDataToRequestObject("api.datasets.import"), onRequest({ entity: Entity.Management }), checkRBAC.handler(), DatasetImport);

router.post("/connector/register", setDataToRequestObject("api.connector.register"), onRequest({ entity: Entity.Management }), connectorRegisterController);
//Wrapper Service
router.post("/obsrv/data/sql-query", setDataToRequestObject("api.obsrv.data.sql-query"), onRequest({ entity: Entity.Data_out }), checkRBAC.handler(), sqlQuery);
router.post("/data/metrics", setDataToRequestObject("api.data.metrics"), onRequest({ entity: Entity.Data_out }), datasetMetrics)
10 changes: 5 additions & 5 deletions api-service/src/services/DatasetService.ts
@@ -344,8 +344,8 @@
private createHudiDataSource = async (draftDataset: Record<string, any>, transaction: Transaction) => {

const {created_by, updated_by} = draftDataset;
const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "hudi");
const draftDatasource = this.createDraftDatasource(draftDataset, "hudi");
const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "datalake");
const draftDatasource = this.createDraftDatasource(draftDataset, "datalake");
const ingestionSpec = tableGenerator.getHudiIngestionSpecForCreate(draftDataset, allFields, draftDatasource.datasource_ref);
_.set(draftDatasource, "ingestion_spec", ingestionSpec)
_.set(draftDatasource, "created_by", created_by);
@@ -356,9 +356,9 @@
private updateHudiDataSource = async (draftDataset: Record<string, any>, transaction: Transaction) => {

const {created_by, updated_by} = draftDataset;
const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "hudi");
const draftDatasource = this.createDraftDatasource(draftDataset, "hudi");
const dsId = _.join([draftDataset.dataset_id, "events", "hudi"], "_")
const allFields = await tableGenerator.getAllFieldsHudi(draftDataset, "datalake");
const draftDatasource = this.createDraftDatasource(draftDataset, "datalake");
const dsId = _.join([draftDataset.dataset_id, "events", "datalake"], "_")
const liveDatasource = await Datasource.findOne({ where: { id: dsId }, attributes: ["ingestion_spec"], raw: true }) as unknown as Record<string, any>
const ingestionSpec = tableGenerator.getHudiIngestionSpecForUpdate(draftDataset, liveDatasource?.ingestion_spec, allFields, draftDatasource?.datasource_ref);
_.set(draftDatasource, "ingestion_spec", ingestionSpec)
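The "hudi" → "datalake" rename above has to stay consistent across both methods: the same string feeds getAllFieldsHudi/createDraftDatasource and the id used to look up the live datasource, so a mismatch would leave the findOne below without a record. A small sketch of the id the lookup now expects, with a hypothetical dataset id:

// Sketch only — hypothetical dataset id, showing how the datasource lookup id is built.
import _ from "lodash";

const datasetId = "telemetry-events"; // hypothetical
const type = "datalake";              // was "hudi" before this commit
const dsId = _.join([datasetId, "events", type], "_");
// => "telemetry-events_events_datalake"; if this does not match the stored datasource id,
// Datasource.findOne returns null and liveDatasource?.ingestion_spec is undefined.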
7 changes: 3 additions & 4 deletions api-service/src/services/TableGenerator.ts
@@ -194,7 +194,9 @@ class TableGenerator extends BaseTableGenerator {
return {
dataset: dataset.dataset_id,
schema: {
table: datasourceRef,
table: _.includes(datasourceRef, '-')
? _.replace(datasourceRef, /-/g, '_')
: datasourceRef,
(GitHub Actions / test-cases annotation on lines 197–198 of api-service/src/services/TableGenerator.ts: Strings must use doublequote)
partitionColumn: partitionKey,
timestampColumn: timestampKey,
primaryKey: primaryKey,
@@ -233,9 +235,6 @@
private getHudiColumnSpec = (allFields: Record<string, any>[], primaryKey: string, partitionKey: string, timestampKey: string): Record<string, any>[] => {

Check failure on line 235 in api-service/src/services/TableGenerator.ts

View workflow job for this annotation

GitHub Actions / test-cases

'primaryKey' is defined but never used. Allowed unused args must match /^_/u

Check failure on line 235 in api-service/src/services/TableGenerator.ts

View workflow job for this annotation

GitHub Actions / test-cases

'partitionKey' is defined but never used. Allowed unused args must match /^_/u

Check failure on line 235 in api-service/src/services/TableGenerator.ts

View workflow job for this annotation

GitHub Actions / test-cases

'timestampKey' is defined but never used. Allowed unused args must match /^_/u

const dataFields = _.cloneDeep(allFields);
_.remove(dataFields, { name: primaryKey })
_.remove(dataFields, { name: partitionKey })
_.remove(dataFields, { name: timestampKey })
let index = 1;
const transformFields = _.map(dataFields, (field) => {
return {
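On the table-name change in the first hunk of this file: the datasource ref is now normalised by replacing hyphens with underscores before it is written into the Hudi ingestion spec, presumably because hyphens are not valid in unquoted Hive/Hudi table identifiers. A quick sketch of the transformation with a hypothetical ref:

// Sketch only — hypothetical datasource ref.
import _ from "lodash";

const datasourceRef = "telemetry-events_events_datalake"; // hypothetical
const table = _.includes(datasourceRef, "-")
    ? _.replace(datasourceRef, /-/g, "_")
    : datasourceRef;
// => "telemetry_events_events_datalake"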
13 changes: 4 additions & 9 deletions command-service/helm-charts/flink-connector/values.yaml
@@ -242,15 +242,10 @@ serviceMonitor:
jobLabel: "app.kubernetes.io/name"
port: prom

flink_jobs:
kafka-connector-1-0-0:
registry: sanketikahub
repository: flink-connectors
tag: 1.1.0
enabled: "false"
connector_id: "kafka-connector-1.0.0"
source: "kafka-connector-1.0.0"
main_program: "kafka-connector-1.0.0.jar"

# override flink_jobs
# flink_jobs:


commonAnnotations:
reloader.stakater.com/auto: "true"
@@ -1,4 +1,4 @@
{{- define "base.cronReleaseName" -}}
{{- $name := printf "%s--%s" .Chart.Name .Values.instance_id }}
{{- $name := printf "%s" .Values.instance_id }}
{{- default .Values.instance_id $name }}
{{- end }}
@@ -70,7 +70,7 @@ spec:
- |
# Wait for the Spark pod to be ready
SPARK_POD=$(kubectl get pods -l app.kubernetes.io/name=spark,app.kubernetes.io/component=master -o jsonpath='{.items[0].metadata.name}')
kubectl exec -it $SPARK_POD -- bash -c "/opt/bitnami/spark/bin/spark-submit --master={{ .Values.spark.master.host }} --conf spark.pyspark.driver.python={{ .Values.python_path }} --conf spark.pyspark.python={{ .Values.python_path }} --jars /data/connectors/{{ .Values.connector_source }}/libs/\* /data/connectors/{{ .Values.connector_source }}/{{ .Values.main_file }} -f /data/conf/connectors-python-config.yaml -c {{ .Values.instance_id }}"
kubectl exec -it $SPARK_POD -- bash -c "/opt/bitnami/spark/bin/spark-submit --master=local[*] --conf spark.pyspark.driver.python={{ .Values.python_path }} --conf spark.pyspark.python={{ .Values.python_path }} --jars /data/connectors/{{ .Values.connector_source }}/libs/\* /data/connectors/{{ .Values.connector_source }}/{{ .Values.main_file }} -f /data/conf/connectors-python-config.yaml -c {{ .Values.instance_id }}"
{{- end }}
{{- with .Values.sidecars }}
{{- toYaml . | nindent 12 }}
26 changes: 12 additions & 14 deletions command-service/src/command/connector_command.py
@@ -38,12 +38,12 @@ def execute(self, command_payload: CommandPayload, action: Action):

def _deploy_connectors(self, dataset_id, active_connectors, is_masterdata):
result = None
self._stop_connector_jobs(is_masterdata, self.connector_job_config["spark"]["namespace"])
self._stop_connector_jobs(is_masterdata, self.connector_job_config["spark"]["namespace"], active_connectors, dataset_id)
result = self._install_jobs(dataset_id, active_connectors, is_masterdata)

return result

def _stop_connector_jobs(self, is_masterdata, namespace):
def _stop_connector_jobs(self, is_masterdata, namespace, active_connectors, dataset_id):
print(f"Uninstalling jobs for {namespace}..")
base_helm_chart = self.connector_job_config["spark"]["base_helm_chart"]

@@ -61,13 +61,13 @@ def _stop_connector_jobs(self, is_masterdata, namespace):
helm_ls_result = subprocess.run(
helm_ls_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)

if helm_ls_result.returncode == 0:
jobs = helm_ls_result.stdout.decode()
for job in jobs.splitlines()[1:]:
release_name = job.split()[0]
if base_helm_chart in job:
print("Uninstalling job {0}".format(release_name))
jobs = helm_ls_result.stdout.decode().splitlines()[1:]
job_names = {job.split()[0] for job in jobs if base_helm_chart in job}
spark_connector = {connector.id for connector in active_connectors if connector.connector_runtime == "spark"}
for release_name in spark_connector:
if release_name in job_names:
print(f"Uninstalling job {release_name} related to dataset'{dataset_id}'...")
helm_uninstall_cmd = [
"helm",
"uninstall",
@@ -81,12 +81,10 @@ def _stop_connector_jobs(self, is_masterdata, namespace):
stderr=subprocess.PIPE,
)
if helm_uninstall_result.returncode == 0:
print(f"Successfully uninstalled job {release_name}...")
print(f"Successfully uninstalled job '{release_name}'...")
else:
print(
f"Error uninstalling job {release_name}: {helm_uninstall_result.stderr.decode()}"
)

print(f"Error uninstalling job '{release_name}': {helm_uninstall_result.stderr.decode()}")

def _install_jobs(self, dataset_id, active_connectors, is_masterdata):
result = None
for connector in active_connectors:
@@ -277,7 +275,7 @@ def _perform_spark_install(self, dataset_id, connector_instance):
def _get_connector_details(self, dataset_id):
active_connectors = []
query = f"""
SELECT ci.id, ci.connector_id, ci.operations_config, cr.runtime as connector_runtime, cr.source as connector_source, cr.technology, cr.version
SELECT ci.id, ci.connector_id, ci.dataset_id, ci.operations_config, cr.runtime as connector_runtime, cr.source as connector_source, cr.technology, cr.version
FROM connector_instances ci
JOIN connector_registry cr on ci.connector_id = cr.id
WHERE ci.status= %s and ci.dataset_id = %s
3 changes: 2 additions & 1 deletion command-service/src/command/connector_registry.py
@@ -287,6 +287,7 @@ def process_metadata(self, rel_path, connector_source) -> RegistryResponse:
def execute_query(self, query, params) -> bool:
try:
result = self.db_service.execute_upsert(sql=query, params=params)
print(f"Connector Registry | {result} rows affected")
return result > 0 # Assuming the result is the number of affected rows
except Exception as e:
print(
@@ -426,7 +427,7 @@ def update_connector_registry(self, _id, ver):
f"UPDATE connector_registry SET status = 'Retired', updated_date = now() WHERE connector_id = %s AND status = 'Live' AND version != %s", (_id, ver)
)
print(
f"Connector Registry | Updated {result} existing rows with connector_id: {_id} and version: {ver}"
f"Connector Registry | Retired {result} versions for connector_id: {_id} and version: {ver}"
)
except Exception as e:
print(
9 changes: 4 additions & 5 deletions command-service/src/command/dataset_command.py
@@ -26,7 +26,7 @@ def __init__(
self.http_service = http_service
self.config_service_host = self.config.find("config_service.host")
self.config_service_port = self.config.find("config_service.port")
self.base_url = f"http://{self.config_service_host}:{self.config_service_port}/datasets/v1/export"
self.base_url = f"http://{self.config_service_host}:{self.config_service_port}/v2/datasets/export"

def _get_draft_dataset_record(self, dataset_id):
query = f"""
@@ -66,10 +66,9 @@ def _check_for_live_record(self, dataset_id):
def audit_live_dataset(self, command_payload: CommandPayload, ts: int):
dataset_id = command_payload.dataset_id
dataset_record, data_version = self._check_for_live_record(dataset_id)
export_dataset = self.http_service.post(
url=self.base_url,
body=json.dumps({"dataset_id": dataset_id}),
headers={"Content-Type": "application/json"},
url=self.base_url + '/{}'.format(dataset_id)
export_dataset = self.http_service.get(
url=url
)
if export_dataset.status == 200:
result = json.loads(export_dataset.body)
1 change: 0 additions & 1 deletion command-service/src/command/db_command.py
@@ -273,7 +273,6 @@ def _insert_connector_instances(self, dataset_id, draft_dataset_record):
current_timestamp,
current_timestamp,
current_timestamp,

connector_config.connector_config,
json.dumps(connector_config.operations_config).replace("'", "''"),
draft_dataset_record.get('updated_by'),
1 change: 1 addition & 0 deletions command-service/src/model/db_models.py
@@ -121,6 +121,7 @@ class ConnectorRegsitryv2:
class ConnectorInstance:
id: str
connector_id: str
dataset_id: str
operations_config: dict
connector_runtime: str
connector_source: dict