diff --git a/src/database/services/MessageService.ts b/src/database/services/MessageService.ts index 5784b2cd7..2351348a9 100644 --- a/src/database/services/MessageService.ts +++ b/src/database/services/MessageService.ts @@ -36,30 +36,7 @@ export default class MessageService { } as MessageModel } - public async get( - connectionId: string, - options: { - page?: number - limit?: number - msgType?: MessageType - topic?: string - searchParams?: { topic?: string; payload?: string } - } = {}, - ): Promise { - const defaultOpts = { page: 1, limit: 20, msgType: 'all' } - const { page, limit, msgType } = { ...defaultOpts, ...options } - let { topic } = { ...defaultOpts, ...options } - - const total = await this.messageRepository.count({ connectionId }) - const publishedTotal = await this.messageRepository.count({ connectionId, out: true }) - const receivedTotal = await this.messageRepository.count({ connectionId, out: false }) - - let query = this.messageRepository - .createQueryBuilder('msg') - .where('msg.connectionId = :connection', { connection: connectionId }) - - msgType !== 'all' && query.andWhere('msg.out = :out', { out: msgType === 'publish' }) - + public handleTopicQuery(query: $TSFixed, topic?: string) { if (topic && topic !== '#') { // Escape special characters for SQL LIKE topic = topic.replace(/[\\%_]/g, '\\$&') @@ -92,6 +69,34 @@ export default class MessageService { query.andWhere('msg.topic LIKE :topic ESCAPE "\\"', { topic }) } } + return query + } + + public async get( + connectionId: string, + options: { + page?: number + limit?: number + msgType?: MessageType + topic?: string + searchParams?: { topic?: string; payload?: string } + } = {}, + ): Promise { + const defaultOpts = { page: 1, limit: 20, msgType: 'all' } + const { page, limit, msgType } = { ...defaultOpts, ...options } + let { topic } = { ...defaultOpts, ...options } + + const total = await this.messageRepository.count({ connectionId }) + const publishedTotal = await this.messageRepository.count({ connectionId, out: true }) + const receivedTotal = await this.messageRepository.count({ connectionId, out: false }) + + let query = this.messageRepository + .createQueryBuilder('msg') + .where('msg.connectionId = :connection', { connection: connectionId }) + + msgType !== 'all' && query.andWhere('msg.out = :out', { out: msgType === 'publish' }) + + query = this.handleTopicQuery(query, topic) if (options.searchParams) { const { topic, payload } = options.searchParams @@ -142,38 +147,7 @@ export default class MessageService { msgType !== 'all' && query.andWhere('msg.out = :out', { out: msgType === 'publish' }) - if (topic && topic !== '#') { - // Escape special characters for SQL LIKE - topic = topic.replace(/[\\%_]/g, '\\$&') - - // Remove $share prefix if present - if (topic.startsWith('$share/')) { - topic = topic.split('/').slice(2).join('/') - } - - /* - Handle `+` wildcard - Known Issue: '+' wildcard handling in MQTT topics is incorrect. - '+' is replaced with '%' for SQL LIKE, causing multi-level match. - - Incorrect: 'testtopic/+/test' matches 'testtopic/1/2/test' - - Incorrect: 'testtopic/+/hello/+' can not matches 'testtopic/hello/hello/hello' - TODO: FIX this issue. - */ - if (topic.includes('+')) { - topic = topic.replace('+', '%') - } - - // Handle '#' wildcard - if (topic.endsWith('/#')) { - const baseTopic = topic.slice(0, -2) // Remove '/#' - query.andWhere('(msg.topic LIKE :baseTopic OR msg.topic LIKE :topic ESCAPE "\\")', { - baseTopic, - topic: baseTopic + '/%', - }) - } else { - query.andWhere('msg.topic LIKE :topic ESCAPE "\\"', { topic }) - } - } + query = this.handleTopicQuery(query, topic) if (options.searchParams) { const { topic, payload } = options.searchParams