Skip to content

Commit

Permalink
feat(server): unsubscribe from topic
Browse files Browse the repository at this point in the history
WIP, to be completed with API overhaul
  • Loading branch information
manuartero committed Mar 24, 2021
1 parent 8c07fdf commit fe5e61b
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 42 deletions.
16 changes: 16 additions & 0 deletions src/server/handlers.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,26 @@ const createHandlers = (adapter, storage, bot) => {
return getTopics()
},

removeTopic: async topic => {
const formerSubscribers = await storage.getSubscribers(topic)
const cancelSubscriptionTasks = formerSubscribers.map(user => {
return () => storage.cancelSubscription(user, topic)
})
await Promise.all(cancelSubscriptionTasks)
await storage.removeTopic(topic)
return getTopics()
},

forceSubscription: async (user, topic) => {
await storage.subscribe(user, topic)
const subscribers = await storage.getSubscribers(topic)
return subscribers
},

cancelSubscription: async (user, topic) => {
await storage.cancelSubscription(user, topic)
const currentSubscribers = await storage.getSubscribers(topic)
return currentSubscribers
}
}
}
Expand Down
104 changes: 67 additions & 37 deletions src/server/restify-server.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const ensureTopic = req =>

/**
* restify server in charge of:
* - routing to handler functions
* - routing to handler functions (delegating all business-logic)
* - logging requests and responses
* - extracting input from request (parsing)
* - handle HTTP status codes
Expand All @@ -31,7 +31,9 @@ const createRestifyServer = ({
getUsers,
getTopics,
createTopic,
forceSubscription
removeTopic,
forceSubscription,
cancelSubscription
}) => {
const server = restify.createServer({ log })
server.use(restify.plugins.queryParser())
Expand All @@ -54,41 +56,6 @@ const createRestifyServer = ({
next()
})

server.get('/api/v1/users', async (_, res, next) => {
const users = await getUsers()
res.send(200, users)
next()
})

server.get('/api/v1/topics', async (_, res, next) => {
const topics = await getTopics()
res.send(200, topics)
next()
})

server.post('/api/v1/topics', async (req, res, next) => {
const topic = req.body ? req.body.name : undefined
if (!topic) {
const err = new BadRequestError("required: 'name'")
return next(err)
}
const topics = await createTopic(topic)
res.send(200, topics)
next()
})

server.put('/api/v1/topics/:topic', async (req, res, next) => {
const user = req.body ? req.body.user : undefined
if (!user) {
const err = new BadRequestError("required: 'user'")
return next(err)
}
const topic = req.params.topic
const subscribers = await forceSubscription(user, topic)
res.send(200, { subscribers })
next()
})

/** Resolves 202: Accepted */
server.post('/api/v1/messages', async (req, res, next) => {
await processMessage(req, res)
Expand Down Expand Up @@ -136,6 +103,69 @@ const createRestifyServer = ({
}
})

server.get('/api/v1/admin', (_, res, next) => {
const { routes } = server.getDebugInfo()
res.send(200, routes)
})

server.get('/api/v1/admin/users', async (_, res, next) => {
const users = await getUsers()
res.send(200, users)
next()
})

server.get('/api/v1/admin/topics', async (_, res, next) => {
const topics = await getTopics()
res.send(200, topics)
next()
})

server.post('/api/v1/admin/topics', async (req, res, next) => {
const topic = req.body ? req.body.name : undefined
if (!topic) {
const err = new BadRequestError("required: 'name'")
return next(err)
}
const topics = await createTopic(topic)
res.send(200, topics)
next()
})

server.del('/api/v1/topics/:topic', async (req, res, next) => {
const topic = req.body ? req.body.name : undefined
try {
const topics = await removeTopic(topic)
res.send(200, topics)
next()
} catch (err) {
next(err)
}
})

server.put('/api/v1/topics/:topic/subscribers', async (req, res, next) => {
const user = req.body ? req.body.user : undefined
if (!user) {
const err = new BadRequestError("required: 'user'")
return next(err)
}
const topic = req.params.topic
const subscribers = await forceSubscription(user, topic)
res.send(200, { subscribers })
next()
})

// server.del('/api/v1/topics/:topic', async (req, res, next) => {
// const user = req.body ? req.body.user : undefined
// if (!user) {
// const err = new BadRequestError("required: 'user'")
// return next(err)
// }
// const topic = req.params.topic
// const subscribers = await cancelSubscription(user, topic)
// res.send(200, { subscribers })
// next()
// })

return {
/**
* @param {object=} param0
Expand Down
28 changes: 26 additions & 2 deletions src/server/restify-server.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@ const mockedHandlers = {
orange: ['[email protected]', '[email protected]'],
tangerine: []
}),
removeTopic: jest.fn().mockResolvedValue({
banana: ['[email protected]'],
tangerine: []
}),
forceSubscription: jest
.fn()
.mockResolvedValue(['[email protected]', '[email protected]'])
.mockResolvedValue(['[email protected]', '[email protected]']),
cancelSubscription: jest.fn().mockResolvedValue(['[email protected]'])
}

describe('createRestifyServer()', () => {
Expand Down Expand Up @@ -107,6 +112,21 @@ describe('createRestifyServer()', () => {
})
})

describe('[DELETE] /api/v1/topics', () => {
it('[200] routes to removeTopic()', done => {
// @ts-ignore
client.del('/api/v1/topics', (_, __, res, data) => {
expect(res.statusCode).toEqual(200)
expect(data).toEqual({
banana: ['[email protected]'],
tangerine: []
})
expect(mockedHandlers.removeTopic).toHaveBeenCalledWith('')
done()
})
})
})

describe('[PUT] /api/v1/topics/{topic}', () => {
it("[400] requires 'user'", done => {
client.put(
Expand Down Expand Up @@ -255,7 +275,11 @@ describe('createRestifyServer()', () => {
it('[202] routes to broadcast() (considering topic creation)', done => {
client.post(
'/api/v1/broadcast',
{ topic: 'orange', message: 'orange event', createTopicIfNotExists: true },
{
topic: 'orange',
message: 'orange event',
createTopicIfNotExists: true
},
// @ts-ignore
(_, __, res, data) => {
expect(mockedHandlers.broadcast).toHaveBeenCalledWith(
Expand Down
34 changes: 33 additions & 1 deletion src/storage/mysql.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,30 @@ const ensureTopic = async topic => {
const registerTopic = async topic =>
ensureTopic(topic).then(instance => !!instance)

/**
* @param {string} topic
* @return {Promise<boolean>}
*/
const removeTopic = async topic => {
log.debug(`[db] removing topic: "${topic}"`)
return Topics.destroy({ where: { name: topic } })
.then(affectedRows => {
if (affectedRows === 1) {
log.debug(`[db] removed topic "${topic}"`)
return true
}
log.warn(`[db] unexpected affected rows removing topic "${topic}"`)
return false
})
.catch(err => {
log.error(`[db] unable to remove topic "${topic}"`, err)
return false
})
}

/**
* @param {string} user
* @return {Promise<{userInstance: any, topics: string[]|null}>}
* @return {Promise<{userInstance: any, topics: string[]}|null>}
*/
const getAllUserInfo = async user => {
log.debug(`[db] reading subscriptions for user "${user}"`)
Expand Down Expand Up @@ -157,6 +178,15 @@ const subscribe = async (user, topic) => {
})
}

/**
* @param {string} user
* @param {string} topic
* @return {Promise<boolean>}
*/
const cancelSubscription = async (user, topic) => {

}

/**
* @param {string} topic
* @return {Promise<string[]|null>}
Expand Down Expand Up @@ -268,7 +298,9 @@ const storage = {
saveConversation,
getConversation,
registerTopic,
removeTopic,
subscribe,
cancelSubscription,
getSubscribedTopics,
getSubscribers,
listUsers,
Expand Down
14 changes: 12 additions & 2 deletions types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@ declare namespace Types {
/** @return success flag (independently of actual operation - e.g. already existing entry) */
registerTopic: (topic: string) => Promise<boolean>;

removeTopic: (topic: string) => Promise<boolean>;

/** @return success flag (independently of actual operation - e.g. already existing entry) */
subscribe: (user: string, topic: string) => Promise<boolean>;

cancelSubscription: (user: string, topic: string) => Promise<boolean>;

/** @return null if err */
getSubscribedTopics: (user: string) => Promise<string[] | null>;

Expand All @@ -50,6 +54,8 @@ declare namespace Types {
ensureTopic?: boolean;
}

type TopicsDictionary = { [topic: string]: string[] };

interface Handlers {
/* bot-SDK entry point */
processMessage: (
Expand All @@ -76,14 +82,18 @@ declare namespace Types {
/* debugging */

getUsers: () => Promise<string[]>;
getTopics: () => Promise<{ [topic: string]: string[] }>;
getTopics: () => Promise<TopicsDictionary>;

/* ops */

/** @return topics */
createTopic: (topic: string) => Promise<{ [topic: string]: string[] }>;
createTopic: (topic: string) => Promise<TopicsDictionary>;
/** @return topics */
removeTopic: (topic: string) => Promise<TopicsDictionary>;
/** @return subscribers */
forceSubscription: (user: string, topic: string) => Promise<string[]>;
/** @return subscribers */
cancelSubscription: (user: string, topic: string) => Promise<string[]>;
}

interface ICard {
Expand Down

0 comments on commit fe5e61b

Please sign in to comment.