Skip to content

Commit

Permalink
Add a basic client to consume from the Wikimedia EventStreams API
Browse files Browse the repository at this point in the history
  • Loading branch information
siddharthvp committed Sep 6, 2020
1 parent 789a0a9 commit 617926a
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 2 deletions.
35 changes: 35 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mwn",
"version": "0.6.0",
"version": "0.7.0",
"description": "MediaWiki bot framework for NodeJS",
"main": "./src/bot.js",
"scripts": {
Expand All @@ -26,6 +26,7 @@
"dependencies": {
"axios": "^0.19.2",
"axios-cookiejar-support": "^1.0.0",
"eventsource": "^1.0.7",
"form-data": "^3.0.0",
"oauth-1.0a": "^2.2.6",
"semlog": "^0.6.10",
Expand All @@ -41,4 +42,4 @@
"eslint-plugin-standard": "^4.0.1",
"mocha": "^7.1.1"
}
}
}
6 changes: 6 additions & 0 deletions src/bot.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const Wikitext = require('./wikitext');
const User = require('./user');
const Category = require('./category');
const File = require('./file');
const Stream = require('./eventstream');
const static_utils = require('./static_utils');

class mwn {
Expand Down Expand Up @@ -229,6 +230,11 @@ class mwn {
*/
this.file = File(this);

/**
* Sub-class for the EventStreams API
*/
this.stream = Stream(mwn, this);

// set up any semlog options
semlog.updateConfig(this.options.semlog || {});
}
Expand Down
80 changes: 80 additions & 0 deletions src/eventstream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
const EventSource = require('eventsource');

module.exports = function (mwn, bot) {

class EventStream extends EventSource {

/**
* Access the Wikimedia EventStreams API
* @see https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams
* @param {string[]} streams
* @param {Object} [config={}]
* @config {string|Date} since
* @config {Function} onopen
* @config {Function} onerror
*/
constructor(streams, config = {}) {
if (Array.isArray(streams)) {
streams = streams.join(',');
}

let since = config.since ? `?since=${new bot.date(config.since).format('YYYYMMDDHHmmss')}` : '';
super(`https://stream.wikimedia.org/v2/stream/${streams}${since}`, {
headers: {
'User-Agent': bot.userAgent
}
});

this.onopen = config.onopen || function () {
mwn.log(`[S] Opened eventsource connection for ${streams} stream(s)`);
};
this.onerror = config.onerror || function (err) {
mwn.log(`[W] event source encountered error: ${err}`);
};

}

/**
* Register a function to trigger for every message data from the source.
* @param {Function} action
* @param {Function | Object} [filter={}]
*/
addListener(action, filter = {}) {
let filterer = typeof filter === 'function' ?
filter :
function(data) {
for (let key of Object.keys(filter)) {
if (data[key] !== filter[key]) {
return false;
}
}
return true;
};

this.onmessage = function(event) {
let data = JSON.parse(event.data);
if (!filterer(data)) {
return;
}
action(data);
};
}

/**
* Access the recentchange EventStreams API
* @see https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams
* @param {{wiki: string, type: ("edit"|"log"|"new"|"categorize"), title: string, namespace: number,
* user: string, bot: boolean, minor: boolean} | Function} filter
* @param {Function} action
*/
static recentchange(filter, action) {
let stream = new EventStream('recentchange');
stream.addListener(filter, action);
return stream;
}

}

return EventStream;

};

0 comments on commit 617926a

Please sign in to comment.