-
Notifications
You must be signed in to change notification settings - Fork 152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add script to kick idle bridged users from a room #340
Changes from 1 commit
daa70ec
a88e7e8
93e55d8
896de7b
f8ec915
0c36fc1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
#!/usr/bin/env python | ||
from __future__ import print_function | ||
import argparse | ||
import sys | ||
import json | ||
import urllib | ||
import requests | ||
import re | ||
|
||
## debug request | ||
import httplib as http_client | ||
http_client.HTTPConnection.debuglevel = 1 | ||
|
||
def get_room_id(homeserver, alias, token): | ||
res = requests.get(homeserver + "/_matrix/client/r0/directory/room/" + urllib.quote(alias) + "?access_token=" + token).json() | ||
return res.get("room_id", None) | ||
|
||
def get_last_active_ago(homeserver, user_id, token): | ||
res = requests.get(homeserver + "/_matrix/client/r0/presence/" + urllib.quote(user_id) + "/status?access_token=" + token).json() | ||
return res.get("last_active_ago", None) | ||
|
||
def is_idle(homeserver, user_id, token, activity_threshold_ms): | ||
return get_last_active_ago(homeserver, user_id, token) > activity_threshold_ms | ||
|
||
def get_idle_users(homeserver, room_id, token, since): | ||
res = requests.get(homeserver + "/_matrix/client/r0/rooms/" + urllib.quote(room_id) + "/members?access_token=" + token) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't use this API, it'll take far too long. Instead, use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should really be documented... would have saved me some time There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's a new API Erik added not long ago, hence missing docs. |
||
user_ids = [event["state_key"] for event in res.json().get("chunk", None)] | ||
|
||
activity_threshold_ms = since * 24 * 60 * 60 * 1000 | ||
|
||
return [user_id for user_id in user_ids if is_idle(homeserver, user_id, token, activity_threshold_ms)] | ||
|
||
def kick_idlers(homeserver, homeserver_domain, room_id, token, since, user_template=None): | ||
reason = "Being idle for >%s days" % since | ||
|
||
user_ids = get_idle_users(homeserver, room_id, token, since) | ||
print("Kicking %s idle users from %s" % (len(user_ids), room_id)) | ||
for user_id in user_ids: | ||
# Ignore unclaimed users, if user_template is specified | ||
if user_template and not claims_user_id(user_id, user_template, homeserver_domain): | ||
continue | ||
res = requests.post( | ||
homeserver + "/_matrix/client/r0/rooms/" + urllib.quote(room_id) + "/kick?access_token=" + token, | ||
data = json.dumps({ | ||
"reason": reason, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
"user_id": user_id | ||
}) | ||
) | ||
res.raise_for_status() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't raise for status here. It'll be annoying as all hell if it spends the time working out who to kick then dies on the first failure. I would add the failures to a list and then whine at the end saying "I couldn't kick these guys because $RESPONSE_JSON". |
||
|
||
def claims_user_id(user_id, user_template, homeserver_domain): | ||
# the server claims the given user ID if the ID matches the user ID template. | ||
regex = template_to_regex( | ||
user_template, | ||
{ | ||
"$SERVER": homeserver_domain | ||
}, | ||
{ | ||
"$NICK": "(.*)" | ||
}, | ||
escapeRegExp(":" + homeserver_domain) | ||
) | ||
print("Matching %s to %s" % (regex, user_id)) | ||
return re.match(regex, user_id) | ||
|
||
def template_to_regex(template, literal_vars, regex_vars, suffix = ""): | ||
# The 'template' is a literal string with some special variables which need | ||
# to be find/replaced. | ||
regex = template; | ||
for k in literal_vars: | ||
regex = re.sub(escapeRegExp(k), regex, literal_vars[k]) | ||
|
||
# at this point the template is still a literal string, so escape it before | ||
# applying the regex vars. | ||
regex = escapeRegExp(regex); | ||
# apply regex vars | ||
|
||
for k in regex_vars: | ||
regex = re.sub( | ||
# double escape, because we bluntly escaped the entire string before | ||
# so our match is now escaped. | ||
escapeRegExp(escapeRegExp(k)), regex, regex_vars[k] | ||
) | ||
|
||
return regex + suffix | ||
|
||
def escapeRegExp(s): | ||
return re.escape(s); | ||
|
||
|
||
def main(token, alias, homeserver, homeserver_domain, since, user_template): | ||
print("Removing idle users in %s" % alias) | ||
token = token | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Uhh, nop? |
||
room_id = get_room_id(homeserver, alias, token) | ||
if not room_id: | ||
raise Exception("Cannot resolve room alias to room_id") | ||
|
||
kick_idlers(homeserver, homeserver_domain, room_id, token, since, user_template) | ||
|
||
if __name__ == "__main__": | ||
parser = argparse.ArgumentParser("Remove idle users from a given Matrix room") | ||
parser.add_argument("-t", "--token", help="The AS token", required=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically any token, not just AS. |
||
parser.add_argument("-a", "--alias", help="The alias of the room eg '#freenode_#matrix-dev:matrix.org'", required=True) | ||
parser.add_argument("-u", "--homeserver", help="Base homeserver URL eg 'https://matrix.org'", required=True) | ||
parser.add_argument("-d", "--domain", help=" matrix.org'", required=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
parser.add_argument("-s", "--since", type=int, help="Since idle users have been offline for", required=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Missing units and example. |
||
parser.add_argument("-e", "--template", help="User template to determine whether a user should be kicked. E.g. @$SERVER_$NICK", required=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would just do prefix matching to save yourself the hassle from trying to work out what to correctly parse things as. It also makes it clearer to the end-user what they should put as a template value ("@freenode_" for example). Bear in mind people who run this may not know the IRC bridge config file syntax so There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh fair point. Matching by static prefix string makes sense. |
||
args = parser.parse_args() | ||
if not args.token or not args.alias or not args.homeserver: | ||
parser.print_help() | ||
sys.exit(1) | ||
main(token=args.token, alias=args.alias, homeserver=args.homeserver, homeserver_domain=args.domain, since=args.since, user_template=args.template) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Rather than return a nullable
room_id
, I would just do:This will throw on non-2xx and if
room_id
doesn't exist, both of which you want to do. You can then remove the null guard on:94
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But then you would get a quite useless error when the
room_id
doesn't get sent back? If this does indicate that the server couldn't resolve it it, would it not be better to throw an error that suggests as such?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah but then you don't get to see the error. Fair point, I'll have it raise for status.