Skip to content

Commit

Permalink
Merge pull request #257 from ZIMkaRU/feature/send-event-by-ws-when-cs…
Browse files Browse the repository at this point in the history
…v-is-ready

Send event by ws when csv is ready
  • Loading branch information
prdn authored Feb 27, 2023
2 parents 120d5c8 + aa33989 commit 975bb3b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 15 deletions.
13 changes: 13 additions & 0 deletions workers/api.framework.report.wrk.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,26 @@ class WrkReportFrameWorkApi extends WrkReportServiceApi {
...deps
})

const aggregatorQueue = this.lokue_aggregator.q
const conf = this.conf[this.group]
const wsTransport = this.container.get(TYPES.WSTransport)
const wsEventEmitter = this.container.get(TYPES.WSEventEmitter)
const sync = this.container.get(TYPES.Sync)
const processMessageManager = this.container.get(TYPES.ProcessMessageManager)

await wsTransport.start()

aggregatorQueue.on('completed', (res) => {
const { csvFilesMetadata, userInfo } = res ?? {}

wsEventEmitter.emitCsvGenerationCompletedToOne(
{ csvFilesMetadata },
userInfo
).then(() => {}, (err) => {
this.logger.error(`WS_EVENT_EMITTER:CSV_COMPLETED: ${err.stack || err}`)
})
})

if (
!conf.syncMode ||
!conf.isSchedulerEnabled
Expand Down
3 changes: 2 additions & 1 deletion workers/loc.api/service.report.framework.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ class FrameworkReportService extends ReportService {
'email',
'id',
'isSubAccount',
'subUsers'
'subUsers',
'_id'
]
}
)
Expand Down
37 changes: 23 additions & 14 deletions workers/loc.api/ws-transport/ws.event.emitter.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ class WSEventEmitter extends AbstractWSEventEmitter {
)
}

isInvalidAuth (auth = {}, { _id, email } = {}) {
isNotTargetUser (auth = {}, user = {}) {
// For the sync process user id need to take from the session object
const _id = auth?._id ?? auth?.session?._id

return (
auth._id !== _id ||
auth.email !== email
!Number.isInteger(_id) ||
user?._id !== _id
)
}

Expand All @@ -56,10 +59,7 @@ class WSEventEmitter extends AbstractWSEventEmitter {
auth = {}
) {
return this.emitSyncingStep(async (user, ...args) => {
if (
!Number.isInteger(auth?._id) ||
user?._id !== auth?._id
) {
if (this.isNotTargetUser(auth, user)) {
return { isNotEmitted: true }
}

Expand All @@ -69,18 +69,27 @@ class WSEventEmitter extends AbstractWSEventEmitter {
})
}

emitBfxUnamePwdAuthRequiredToOne (
emitCsvGenerationCompletedToOne (
handler = () => {},
auth = {}
) {
return this.emit(async (user, ...args) => {
// For the sync process user id need to take from the session object
const id = auth?._id ?? auth?.session?._id
if (this.isNotTargetUser(auth, user)) {
return { isNotEmitted: true }
}

return typeof handler === 'function'
? await handler(user, ...args)
: handler
}, 'emitCsvGenerationCompletedToOne')
}

if (
!Number.isInteger(id) ||
user?._id !== id
) {
emitBfxUnamePwdAuthRequiredToOne (
handler = () => {},
auth = {}
) {
return this.emit(async (user, ...args) => {
if (this.isNotTargetUser(auth, user)) {
return { isNotEmitted: true }
}

Expand Down

0 comments on commit 975bb3b

Please sign in to comment.