diff --git a/conf_files/pocs.yaml b/conf_files/pocs.yaml index fa2843a8b..1a6e3048d 100644 --- a/conf_files/pocs.yaml +++ b/conf_files/pocs.yaml @@ -52,4 +52,15 @@ messaging: # Must match ports in peas.yaml. cmd_port: 6500 msg_port: 6510 +#Enable to output POCS messages to social accounts +# social_accounts: +# twitter: +# consumer_key: [your_consumer_key] +# consumer_secret: [your_consumer_secret] +# access_token: [your_access_token] +# access_token_secret: [your_access_token_secret] +# slack: +# webhook_url: [your_webhook_url] +# output_timestamp: False + state_machine: simple_state_table diff --git a/pocs/tests/test_social_messaging.py b/pocs/tests/test_social_messaging.py new file mode 100644 index 000000000..b40cef1fa --- /dev/null +++ b/pocs/tests/test_social_messaging.py @@ -0,0 +1,96 @@ +import pytest +import tweepy +import requests +import unittest.mock + +from pocs.utils.social_twitter import SocialTwitter +from pocs.utils.social_slack import SocialSlack + + +@pytest.fixture(scope='module') +def twitter_config(): + twitter_config = {'consumer_key': 'mock_consumer_key', 'consumer_secret': 'mock_consumer_secret', 'access_token': 'mock_access_token', 'access_token_secret': 'access_token_secret'} + return twitter_config + + +@pytest.fixture(scope='module') +def slack_config(): + slack_config = {'webhook_url': 'mock_webhook_url', 'output_timestamp': False} + return slack_config + + +# Twitter sink tests +def test_no_consumer_key(twitter_config): + with unittest.mock.patch.dict(twitter_config), pytest.raises(ValueError) as ve: + del twitter_config['consumer_key'] + social_twitter = SocialTwitter(**twitter_config) + assert 'consumer_key parameter is not defined.' == str(ve.value) + + +def test_no_consumer_secret(twitter_config): + with unittest.mock.patch.dict(twitter_config), pytest.raises(ValueError) as ve: + del twitter_config['consumer_secret'] + social_twitter = SocialTwitter(**twitter_config) + assert 'consumer_secret parameter is not defined.' == str(ve.value) + + +def test_no_access_token(twitter_config): + with unittest.mock.patch.dict(twitter_config), pytest.raises(ValueError) as ve: + del twitter_config['access_token'] + social_twitter = SocialTwitter(**twitter_config) + assert 'access_token parameter is not defined.' == str(ve.value) + + +def test_no_access_token_secret(twitter_config): + with unittest.mock.patch.dict(twitter_config), pytest.raises(ValueError) as ve: + del twitter_config['access_token_secret'] + social_twitter = SocialTwitter(**twitter_config) + assert 'access_token_secret parameter is not defined.' == str(ve.value) + + +def test_send_message_twitter(twitter_config): + with unittest.mock.patch.object(tweepy.API, 'update_status') as mock_update_status: + social_twitter = SocialTwitter(**twitter_config) + mock_message = "mock_message" + mock_timestamp = "mock_timestamp" + social_twitter.send_message(mock_message, mock_timestamp) + + mock_update_status.assert_called_once_with('{} - {}'.format(mock_message, mock_timestamp)) + + +def test_send_message_twitter_no_timestamp(twitter_config): + with unittest.mock.patch.dict(twitter_config, {'output_timestamp': False}), unittest.mock.patch.object(tweepy.API, 'update_status') as mock_update_status: + social_twitter = SocialTwitter(**twitter_config) + mock_message = "mock_message" + mock_timestamp = "mock_timestamp" + social_twitter.send_message(mock_message, mock_timestamp) + + mock_update_status.assert_called_once_with(mock_message) + + +# Slack sink tests +def test_no_webhook_url(slack_config): + with unittest.mock.patch.dict(slack_config), pytest.raises(ValueError) as ve: + del slack_config['webhook_url'] + slack_config = SocialSlack(**slack_config) + assert 'webhook_url parameter is not defined.' == str(ve.value) + + +def test_send_message_slack(slack_config): + with unittest.mock.patch.object(requests, 'post') as mock_post: + social_slack = SocialSlack(**slack_config) + mock_message = "mock_message" + mock_timestamp = "mock_timestamp" + social_slack.send_message(mock_message, mock_timestamp) + + mock_post.assert_called_once_with(slack_config['webhook_url'], json={'text': mock_message}) + + +def test_send_message_slack_timestamp(slack_config): + with unittest.mock.patch.dict(slack_config, {'output_timestamp': True}), unittest.mock.patch.object(requests, 'post') as mock_post: + social_slack = SocialSlack(**slack_config) + mock_message = "mock_message" + mock_timestamp = "mock_timestamp" + social_slack.send_message(mock_message, mock_timestamp) + + mock_post.assert_called_once_with(slack_config['webhook_url'], json={'text': '{} - {}'.format(mock_message, mock_timestamp)}) diff --git a/pocs/utils/social_slack.py b/pocs/utils/social_slack.py new file mode 100644 index 000000000..8b5cc466a --- /dev/null +++ b/pocs/utils/social_slack.py @@ -0,0 +1,28 @@ +import requests + +from pocs.utils.logger import get_root_logger + + +class SocialSlack(object): + + """Social Messaging sink to output to Slack.""" + + logger = get_root_logger() + + def __init__(self, **kwargs): + self.web_hook = kwargs.get('webhook_url', '') + if self.web_hook == '': + raise ValueError('webhook_url parameter is not defined.') + else: + self.output_timestamp = kwargs.get('output_timestamp', False) + + def send_message(self, msg, timestamp): + try: + if self.output_timestamp: + post_msg = '{} - {}'.format(msg, timestamp) + else: + post_msg = msg + + response = requests.post(self.web_hook, json={'text': post_msg}) + except Exception as e: + self.logger.debug('Error posting to slack: {}'.format(e)) diff --git a/pocs/utils/social_twitter.py b/pocs/utils/social_twitter.py new file mode 100644 index 000000000..4e830564f --- /dev/null +++ b/pocs/utils/social_twitter.py @@ -0,0 +1,47 @@ +import tweepy + +from pocs.utils.logger import get_root_logger + + +class SocialTwitter(object): + + """Social Messaging sink to output to Twitter.""" + + logger = get_root_logger() + + def __init__(self, **kwargs): + consumer_key = kwargs.get('consumer_key', '') + if consumer_key == '': + raise ValueError('consumer_key parameter is not defined.') + consumer_secret = kwargs.get('consumer_secret', '') + if consumer_secret == '': + raise ValueError('consumer_secret parameter is not defined.') + access_token = kwargs.get('access_token', '') + if access_token == '': + raise ValueError('access_token parameter is not defined.') + access_token_secret = kwargs.get('access_token_secret', '') + if access_token_secret == '': + raise ValueError('access_token_secret parameter is not defined.') + + # Output timestamp should always be True by default otherwise Twitter will reject duplicate statuses. + self.output_timestamp = kwargs.get("output_timestamp", True) + + # Create a new twitter api object + try: + auth = tweepy.OAuthHandler(consumer_key, consumer_secret) + auth.set_access_token(access_token, access_token_secret) + + self.api = tweepy.API(auth) + except tweepy.TweepError as e: + msg = 'Error authenicating with Twitter. Please check your Twitter configuration.' + self.logger.warning(msg) + raise ValueError(msg) + + def send_message(self, msg, timestamp): + try: + if self.output_timestamp: + retStatus = self.api.update_status('{} - {}'.format(msg, timestamp)) + else: + retStatus = self.api.update_status(msg) + except tweepy.TweepError as e: + self.logger.debug('Error tweeting message. Please check your Twitter configuration.') diff --git a/requirements.txt b/requirements.txt index 8aef75afc..186ea552d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,7 +18,9 @@ python_dateutil >= 2.5.3 PyYAML >= 3.11 pyzmq >= 15.3.0 readline +requests scikit_image >= 0.12.3 scipy >= 0.17.1 transitions >= 0.4.0 +tweepy wcsaxes diff --git a/scripts/run_social_messaging.py b/scripts/run_social_messaging.py new file mode 100755 index 000000000..c669fe44b --- /dev/null +++ b/scripts/run_social_messaging.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python + +import argparse +import sys +import threading +import time +import zmq + +from pocs.utils.config import load_config +from pocs.utils.logger import get_root_logger +from pocs.utils.messaging import PanMessaging +from pocs.utils.social_twitter import SocialTwitter +from pocs.utils.social_slack import SocialSlack + +the_root_logger = None + + +def say(fmt, *args, error=False): + if args: + msg = fmt.format(*args) + else: + msg = fmt + if error: + print(msg, file=sys.stderr) + the_root_logger.error(msg) + else: + print(msg) + the_root_logger.info(msg) + + +def check_social_messages_loop(msg_port, social_twitter, social_slack): + cmd_social_subscriber = PanMessaging.create_subscriber(msg_port, 'PANCHAT') + + poller = zmq.Poller() + poller.register(cmd_social_subscriber.socket, zmq.POLLIN) + + try: + while True: + # Poll for messages + sockets = dict(poller.poll(500)) # 500 ms timeout + + if cmd_social_subscriber.socket in sockets and \ + sockets[cmd_social_subscriber.socket] == zmq.POLLIN: + + msg_type, msg_obj = cmd_social_subscriber.receive_message(flags=zmq.NOBLOCK) + + # Check the various social sinks + if social_twitter is not None: + social_twitter.send_message(msg_obj['message'], msg_obj['timestamp']) + + if social_slack is not None: + social_slack.send_message(msg_obj['message'], msg_obj['timestamp']) + + time.sleep(1) + except KeyboardInterrupt: + pass + + +def run_social_sinks(msg_port, social_twitter, social_slack): + the_root_logger.info('Creating sockets') + + threads = [] + + name='social_messaging' + + t = threading.Thread( + target=check_social_messages_loop, name=name, args=(msg_port, social_twitter, social_slack), daemon=True) + the_root_logger.info('Starting thread {}', name) + t.start() + threads.append(t) + + time.sleep(0.05) + if not any([t.is_alive() for t in threads]): + say('Failed to start social sinks thread!', error=True) + sys.exit(1) + else: + the_root_logger.info('Started social messaging') + print() + print('Hit Ctrl-c to stop') + try: + # Keep running until they've all died. + while threads: + for t in threads: + t.join(timeout=100) + if t.is_alive(): + continue + say('Thread {} has stopped', t.name, error=True) + threads.remove(t) + break + # If we get here, then the forwarders died for some reason. + sys.exit(1) + except KeyboardInterrupt: + sys.exit(0) + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Run social messaging to forward platform messages to social channels.') + parser.add_argument( + '--from_config', + action='store_true', + help='Read social channels config from the pocs.yaml and pocs_local.yaml config files.') + args = parser.parse_args() + + def arg_error(msg): + print(msg, file=sys.stderr) + parser.print_help() + sys.exit(1) + + # Initialise social sinks to None + social_twitter = None + social_slack = None + + if args.from_config: + config = load_config(config_files=['pocs']) + + social_config = config.get('social_accounts') + if social_config: + # Check which social sinks we can create based on config + + # Twitter sink + twitter_config = social_config.get('twitter') + if twitter_config: + try: + social_twitter = SocialTwitter(**twitter_config) + except ValueError as e: + print('Twitter sink could not be initialised. Please check your config. Error: {}'.format(str(e))) + + # Slack sink + slack_config = social_config.get('slack') + if slack_config: + try: + social_slack = SocialSlack(**slack_config) + except ValueError as e: + print('Slack sink could not be initialised. Please check your config. Error: {}'.format(str(e))) + else: + print('No social accounts defined in config, exiting.') + sys.exit(0) + + if not social_twitter and not social_slack: + print('No social messaging sinks defined, exiting.') + sys.exit(0) + + # Messaging port to subscribe on + msg_port = config['messaging']['msg_port'] + 1 + + the_root_logger = get_root_logger() + + run_social_sinks(msg_port, social_twitter, social_slack) diff --git a/scripts/startup/start_social_messaging.sh b/scripts/startup/start_social_messaging.sh new file mode 100755 index 000000000..1b9125c78 --- /dev/null +++ b/scripts/startup/start_social_messaging.sh @@ -0,0 +1,11 @@ +#!/bin/bash -ex + +WINDOW="${1}" +echo "Running $(basename "${0}") at $(date), WINDOW=${WINDOW}" + +tmux send-keys -t "${WINDOW}" "date" C-m +sleep 0.5s +tmux send-keys -t "${WINDOW}" \ + "python $POCS/scripts/run_social_messaging.py --from_config" C-m + +echo "Done at $(date)"