Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(cli): add avro util test cases #1752

Merged
merged 2 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions cli/src/__tests__/utils/avro.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import { clearCache, getAvroType, serializeAvroToBuffer, deserializeBufferToAvro } from '../../utils/avro'
import logWrapper from '../../utils/logWrapper'
import { expect, describe, it } from '@jest/globals'
import * as avsc from 'avsc'
import * as path from 'path'

// Mock the logWrapper and process.exit
jest.mock('../../utils/logWrapper', () => ({
fail: jest.fn(),
}))
const mockExit = jest.spyOn(process, 'exit').mockImplementation((code?: number) => {
throw new Error(`Process exited with code ${code}`)
})

const mockSchemaPath = path.join(__dirname, 'mockData/mockAvroSchema.avsc')
const schema: avsc.Type = avsc.Type.forSchema({
type: 'record',
name: 'SensorData',
fields: [
{ name: 'deviceId', type: 'string' },
{ name: 'sensorType', type: 'string' },
{ name: 'value', type: 'double' },
{ name: 'timestamp', type: 'long' },
],
})
expect(avsc.Type.isType(schema)).toBe(true)

const jsonMessage = '{"deviceId":"123456", "sensorType": "Temperature", "value": 22.5, "timestamp": 16700}'
const message = JSON.parse(jsonMessage)

// tests:
describe('avro', () => {
beforeEach(() => {
// clear schema cache before each test
clearCache()
})

describe('getAvroType', () => {
it('should get avro schema from a specific file', () => {
const resultSchema = getAvroType(mockSchemaPath)

expect(avsc.Type.isType(resultSchema)).toBe(true)
expect(schema.equals(resultSchema)).toBe(true)
})

it('should throw an error if schema read from file is invalid', () => {
const invalidSchemaPath = path.join(__dirname, '/mockData/invalidSchema.avsc')

expect(() => getAvroType(invalidSchemaPath)).toThrow()
expect(logWrapper.fail).toHaveBeenCalledWith(expect.stringMatching(/Schema not following JSON format:*/))
expect(mockExit).toHaveBeenCalledWith(1)
})

it('should throw an error if file does not exist', () => {
const nonExistFilePath = path.join(__dirname, 'file_not_exists')
expect(() => getAvroType(nonExistFilePath)).toThrow()
expect(logWrapper.fail).toHaveBeenCalledWith(expect.stringMatching(/Unable to load avro schema from *:*/))
expect(mockExit).toHaveBeenCalledWith(1)
})
})

describe('serializeAvroToBuffer', () => {
it('should serialize Json message with an avro schema correctly', () => {
const inputMessage = message

const targetBuffer = schema.toBuffer(inputMessage)

const resultBuffer = serializeAvroToBuffer(jsonMessage, mockSchemaPath)

expect(resultBuffer).toBeInstanceOf(Buffer)
expect(targetBuffer.equals(resultBuffer)).toBe(true)
})

it('should throw an error if input message does not follow json format', () => {
const unstructuredInputMessage = 'Hello, world!'

expect(() => serializeAvroToBuffer(unstructuredInputMessage, mockSchemaPath)).toThrow()
expect(logWrapper.fail).toHaveBeenCalledWith(expect.stringMatching(/Invalid JSON input:*/))
expect(mockExit).toHaveBeenCalledWith(1)
})

it('should throw an error if message does not match schema', async () => {
const unmatchedMessageObj = JSON.parse(jsonMessage)
delete unmatchedMessageObj.deviceId
const unmatchedMessage = JSON.stringify(unmatchedMessageObj)

expect(() => serializeAvroToBuffer(unmatchedMessage, mockSchemaPath)).toThrow()
expect(logWrapper.fail).toHaveBeenCalledWith(
expect.stringMatching(/Unable to serialize message to avro buffer:*/),
)
expect(mockExit).toHaveBeenCalledWith(1)
})
})

describe('deserializeBufferToAvro', () => {
it('should deserialize Buffer to string with an avro schema correctly', () => {
const inputBuffer = schema.toBuffer(message)
const targetMessage = JSON.stringify(schema.fromBuffer(inputBuffer))

// test without format
const resultMessageWithoutFormat = deserializeBufferToAvro(inputBuffer, mockSchemaPath, false)
expect(typeof resultMessageWithoutFormat).toBe('string')
expect(resultMessageWithoutFormat === targetMessage).toBe(true)

// test with format
const resultMessageWithFormat = deserializeBufferToAvro(inputBuffer, mockSchemaPath, true)
expect(resultMessageWithFormat).toBeInstanceOf(Buffer)
expect(resultMessageWithoutFormat === targetMessage).toBe(true)
})

it('should throw an error if Buffer is not valid avro encoded buffer', () => {
const randomBuffer = Buffer.from([0x4f, 0x61, 0x7a, 0x3b, 0x19, 0x8e, 0x27, 0x56, 0x9c, 0x2d, 0x73, 0x81])

// test without format
expect(() => deserializeBufferToAvro(randomBuffer, mockSchemaPath, false)).toThrow()
expect(logWrapper.fail).toHaveBeenCalledWith(expect.stringMatching(/Unable to deserialize avro encoded buffer:*/))
expect(mockExit).toHaveBeenCalledWith(1)

// test with format
expect(() => deserializeBufferToAvro(randomBuffer, mockSchemaPath, true)).toThrow()
expect(logWrapper.fail).toHaveBeenCalledWith(expect.stringMatching(/Unable to deserialize avro encoded buffer:*/))
expect(mockExit).toHaveBeenCalledWith(1)
})
})
})
9 changes: 9 additions & 0 deletions cli/src/__tests__/utils/mockData/invalidSchema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"type": "record",
"name": "Sens
"fields": [
{"name": "deviceId", "type": "string"}
pe", "type": "string"}
{"na
{"name": "timestamp", "type":}
}
10 changes: 10 additions & 0 deletions cli/src/__tests__/utils/mockData/mockAvroSchema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "record",
"name": "SensorData",
"fields": [
{"name": "deviceId", "type": "string"},
{"name": "sensorType", "type": "string"},
{"name": "value", "type": "double"},
{"name": "timestamp", "type": "long"}
]
}
2 changes: 1 addition & 1 deletion cli/src/lib/sub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const processReceivedMessage = (
)

case 'avro':
return deserializeBufferToAvro(payload, schemaOptions.avscPath, format)
return deserializeBufferToAvro(payload, schemaOptions.avscPath, format ? true : false)
}
}

Expand Down
23 changes: 15 additions & 8 deletions cli/src/utils/avro.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,28 @@ import logWrapper from './logWrapper'

const schemaCache: { [key: string]: avro.Type } = {}

const getAvroType = (schemaPath: string): avro.Type => {
export const clearCache = (): void => {
Object.keys(schemaCache).forEach((key) => delete schemaCache[key])
}

export const getAvroType = (schemaPath: string): avro.Type => {
// first search from cache
if (schemaCache[schemaPath]) {
return schemaCache[schemaPath]
}

try {
const schemaStr = fs.readFileSync(schemaPath, 'utf-8')
let parsedSchema

try {
JSON.parse(schemaStr)
parsedSchema = JSON.parse(schemaStr)
} catch (err: unknown) {
logWrapper.fail(`Schema not following JSON format: ${(err as Error).message}`)
process.exit(1)
}

const type = avro.Type.forSchema(JSON.parse(schemaStr))
const type = avro.Type.forSchema(parsedSchema)

// cache the parsed schema
schemaCache[schemaPath] = type
Expand All @@ -32,20 +37,22 @@ const getAvroType = (schemaPath: string): avro.Type => {
}
}

export const serializeAvroToBuffer = (raw: string | Buffer, avscSchemaPath: string): Buffer => {
const type: avro.Type = getAvroType(avscSchemaPath)
export const serializeAvroToBuffer = (raw: string | Buffer, avroSchemaPath: string): Buffer => {
const type: avro.Type = getAvroType(avroSchemaPath)

let rawMessage = raw.toString('utf-8')
let parsedMessage

// Avro requires structured message as input
try {
JSON.parse(rawMessage)
parsedMessage = JSON.parse(rawMessage)
} catch (err: unknown) {
logWrapper.fail(`Invalid JSON input: ${(err as Error).message}`)
process.exit(1)
}

try {
const serializedMessage = type.toBuffer(JSON.parse(rawMessage))
const serializedMessage = type.toBuffer(parsedMessage)
return Buffer.from(serializedMessage)
} catch (err: unknown) {
logWrapper.fail(`Unable to serialize message to avro buffer: ${err}`)
Expand All @@ -56,7 +63,7 @@ export const serializeAvroToBuffer = (raw: string | Buffer, avscSchemaPath: stri
export const deserializeBufferToAvro = (
payload: Buffer,
avscSchemaPath: string,
needFormat?: FormatType,
needFormat: boolean,
): string | Buffer => {
const type: avro.Type = getAvroType(avscSchemaPath)

Expand Down
Loading