Skip to content

Commit

Permalink
Send HighestVersion attribute to Inflator
Browse files Browse the repository at this point in the history
  • Loading branch information
HebaruSan committed Nov 25, 2019
1 parent 55b29db commit 7e45b46
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 21 deletions.
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ services:
environment:
NETKAN_REMOTE: ${NETKAN_METADATA_PATH}
SSH_KEY: ${CKAN_NETKAN_SSHKEY}
CKANMETA_REMOTE: ${CKAN_METADATA_PATH}
AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION}
AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY}
AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID}
Expand Down
21 changes: 13 additions & 8 deletions netkan/netkan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def netkan(debug):
help='SQS Queue to poll for metadata'
)
@click.option(
'--metadata', envvar='CKANMETA_REMOTE',
'--ckanmeta-remote', '--metadata', envvar='CKANMETA_REMOTE',
help='Path/URL/SSH to Metadata Repo',
)
@click.option(
Expand All @@ -59,9 +59,9 @@ def netkan(debug):
'--key', envvar='SSH_KEY', required=True,
help='SSH key for accessing repositories'
)
def indexer(queue, metadata, token, repo, user, key, timeout):
def indexer(queue, ckanmeta_remote, token, repo, user, key, timeout):
init_ssh(key, '/home/netkan/.ssh')
ckan_meta = init_repo(metadata, '/tmp/CKAN-meta')
ckan_meta = init_repo(ckanmeta_remote, '/tmp/CKAN-meta')

github_pr = GitHubPR(token, repo, user)
sqs = boto3.resource('sqs')
Expand Down Expand Up @@ -94,6 +94,10 @@ def indexer(queue, metadata, token, repo, user, key, timeout):
'--netkan-remote', '--netkan', envvar='NETKAN_REMOTE',
help='Path/URL to NetKAN Repo for dev override',
)
@click.option(
'--ckanmeta-remote', envvar='CKANMETA_REMOTE',
help='Path/URL/SSH to Metadata Repo',
)
@click.option(
'--key', envvar='SSH_KEY', required=True,
help='SSH key for accessing repositories'
Expand All @@ -115,15 +119,16 @@ def indexer(queue, metadata, token, repo, user, key, timeout):
'--min-credits', default=200,
help='Only schedule if we have at least this many credits remaining'
)
def scheduler(queue, netkan_remote, key, max_queued, dev, group, min_credits):
def scheduler(queue, netkan_remote, ckanmeta_remote, key, max_queued, dev, group, min_credits):
init_ssh(key, '/home/netkan/.ssh')
sched = NetkanScheduler(
Path('/tmp/NetKAN'), queue,
Path('/tmp/NetKAN'), Path('/tmp/CKAN-meta'), queue,
nonhooks_group=(group == 'all' or group == 'nonhooks'),
webhooks_group=(group == 'all' or group == 'webhooks'),
)
if sched.can_schedule(max_queued, dev, min_credits):
init_repo(netkan_remote, '/tmp/NetKAN')
init_repo(ckanmeta_remote, '/tmp/CKAN-meta')
sched.schedule_all_netkans()
logging.info("NetKANs submitted to %s", queue)

Expand Down Expand Up @@ -240,7 +245,7 @@ def clean_cache(days):
help='Path/URL/SSH to NetKAN repo for mod list',
)
@click.option(
'--ckan-meta', envvar='CKANMETA_REMOTE',
'--ckanmeta-remote', '--ckan-meta', envvar='CKANMETA_REMOTE',
help='Path/URL/SSH to CKAN-meta repo for output',
)
@click.option(
Expand All @@ -251,10 +256,10 @@ def clean_cache(days):
'--key', envvar='SSH_KEY', required=True,
help='SSH key for accessing repositories'
)
def download_counter(netkan_remote, ckan_meta, token, key):
def download_counter(netkan_remote, ckanmeta_remote, token, key, debug):
init_ssh(key, '/home/netkan/.ssh')
init_repo(netkan_remote, '/tmp/NetKAN')
meta = init_repo(ckan_meta, '/tmp/CKAN-meta')
meta = init_repo(ckanmeta_remote, '/tmp/CKAN-meta')
logging.info('Starting Download Count Calculation...')
DownloadCounter(
'/tmp/NetKAN',
Expand Down
34 changes: 28 additions & 6 deletions netkan/netkan/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,19 @@ def hook_only(self):
return False
return self.on_spacedock

def sqs_message(self):
def sqs_message_attribs(self, ckan_group=None):
    """Build the SQS ``MessageAttributes`` mapping for this NetKAN.

    Args:
        ckan_group: Optional object exposing ``highest_version()``, which
            returns either ``None`` or an object with a ``string``
            attribute. When omitted (or falsy) no attributes are produced.

    Returns:
        A dict suitable for the ``MessageAttributes`` field of an SQS
        message. It contains a ``HighestVersion`` string attribute when a
        highest version is known and this NetKAN does not set
        ``x_netkan_allow_out_of_order``; otherwise it is empty.
    """
    attribs = {}
    if not ckan_group or getattr(self, 'x_netkan_allow_out_of_order', False):
        return attribs
    highest = ckan_group.highest_version()
    # A module that has never been indexed has no .ckan files, so there is
    # no highest version to report; dereferencing it would raise.
    if highest is None:
        return attribs
    # SQS requires each attribute to be a typed mapping, not a bare string.
    attribs['HighestVersion'] = {
        'DataType': 'String',
        'StringValue': highest.string,
    }
    return attribs

def sqs_message(self, ckan_group=None):
return {
'Id': self.identifier,
'MessageBody': self.contents,
'MessageGroupId': '1',
'Id': self.identifier,
'MessageBody': self.contents,
'MessageGroupId': '1',
'MessageDeduplicationId': uuid.uuid4().hex,
'MessageAttributes': self.sqs_message_attribs(ckan_group),
}


Expand Down Expand Up @@ -251,15 +258,17 @@ def _number_compare(v1, v2) -> (int, str, str):
# Process our strings while there are characters remaining
while len(first_remainder) > 0 and len(second_remainder) > 0:
# Start by comparing the string parts.
(result, first_remainder, second_remainder) = _string_compare(first_remainder, second_remainder)
(result, first_remainder, second_remainder) = _string_compare(
first_remainder, second_remainder)

if result != 0:
return result > 0

# Otherwise, compare the number parts.
# It's okay not to check if our strings are exhausted, because
# if they are the exhausted parts will return zero.
(result, first_remainder, second_remainder) = _number_compare(first_remainder, second_remainder)
(result, first_remainder, second_remainder) = _number_compare(
first_remainder, second_remainder)

# Again, return difference if found.
if result != 0:
Expand All @@ -274,3 +283,16 @@ def _number_compare(v1, v2) -> (int, str, str):

def __str__(self):
return self.string


class CkanGroup:
    """The ``.ckan`` files found under one identifier's directory.

    The directory is ``<ckan_meta_path>/<identifier>``; it is searched
    recursively for files matching ``*.ckan``.
    """

    def __init__(self, ckan_meta_path, identifier):
        """Remember where this identifier's files live.

        Args:
            ckan_meta_path: Root of the metadata checkout (str or Path).
            identifier: Name of the sub-directory holding the module's files.
        """
        self.path = Path(ckan_meta_path, identifier)

    def highest_version(self):
        """Return the greatest ``version`` among this group's ``.ckan`` files.

        Returns:
            The ``version`` attribute of the ``Ckan`` that compares highest,
            or ``None`` when no ``.ckan`` file is found under ``self.path``.
            Ordering is whatever comparison the version objects implement.
        """
        ckans = (Ckan(p) for p in self.path.glob('**/*.ckan'))
        highest = max(ckans, default=None, key=lambda ck: ck.version)
        # Compare against None explicitly so that a Ckan object which happens
        # to evaluate as falsy is not mistaken for "nothing found".
        if highest is None:
            return None
        return highest.version
8 changes: 5 additions & 3 deletions netkan/netkan/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import boto3
import requests

from .metadata import Netkan
from .metadata import Netkan, CkanGroup
from .common import sqs_batch_entries


class NetkanScheduler:

def __init__(self, path, queue, base='NetKAN/', nonhooks_group=False, webhooks_group=False):
def __init__(self, path, ckan_meta_path, queue, base='NetKAN/', nonhooks_group=False, webhooks_group=False):
self.path = Path(path, base)
self.nonhooks_group = nonhooks_group
self.webhooks_group = webhooks_group
self.ckan_meta_path = ckan_meta_path

# TODO: This isn't super neat, do something better.
self.queue_url = 'test_url'
Expand Down Expand Up @@ -42,7 +43,8 @@ def _in_group(self, netkan):
return self.nonhooks_group

def schedule_all_netkans(self):
messages = (nk.sqs_message() for nk in self.netkans() if self._in_group(nk))
messages = (nk.sqs_message(CkanGroup(self.ckan_meta_path, nk.identifier))
for nk in self.netkans() if self._in_group(nk))
for batch in sqs_batch_entries(messages):
self.client.send_message_batch(**self.sqs_batch_attrs(batch))

Expand Down
1 change: 1 addition & 0 deletions netkan/netkan/webhooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def create_app():
# Set up config
app.config['secret'] = os.environ.get('XKAN_GHSECRET')
app.config['netkan_repo'] = init_repo(os.environ.get('NETKAN_REMOTE'), '/tmp/NetKAN')
app.config['ckanmeta_repo'] = init_repo(os.environ.get('CKANMETA_REMOTE'), '/tmp/CKAN-meta')
app.config['client'] = boto3.client('sqs')
sqs = boto3.resource('sqs')
app.config['inflation_queue'] = sqs.get_queue_by_name(
Expand Down
3 changes: 2 additions & 1 deletion netkan/netkan/webhooks/github_inflate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from ..common import netkans, sqs_batch_entries
from .github_utils import signature_required
from ..metadata import CkanGroup


github_inflate = Blueprint('github_inflate', __name__) # pylint: disable=invalid-name
Expand Down Expand Up @@ -55,7 +56,7 @@ def ids_from_commits(commits):
def inflate(ids):
# Make sure our NetKAN repo is up to date
current_app.config['netkan_repo'].remotes.origin.pull('master', strategy_option='theirs')
messages = (nk.sqs_message()
messages = (nk.sqs_message(CkanGroup(current_app.config['ckanmeta_repo'].working_dir, nk.identifier))
for nk in netkans(current_app.config['netkan_repo'].working_dir, ids))
for batch in sqs_batch_entries(messages):
current_app.config['client'].send_message_batch(
Expand Down
3 changes: 2 additions & 1 deletion netkan/netkan/webhooks/inflate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from flask import Blueprint, current_app, request

from ..common import netkans, sqs_batch_entries
from ..metadata import CkanGroup


inflate = Blueprint('inflate', __name__) # pylint: disable=invalid-name
Expand All @@ -18,7 +19,7 @@ def inflate_hook():
return 'An array of identifiers is required', 400
# Make sure our NetKAN repo is up to date
current_app.config['netkan_repo'].remotes.origin.pull('master', strategy_option='theirs')
messages = (nk.sqs_message()
messages = (nk.sqs_message(CkanGroup(current_app.config['ckanmeta_repo'].working_dir, nk.identifier))
for nk in netkans(current_app.config['netkan_repo'].working_dir, ids))
for batch in sqs_batch_entries(messages):
current_app.logger.info(f'Queueing inflation request batch: {batch}')
Expand Down
5 changes: 3 additions & 2 deletions netkan/netkan/webhooks/spacedock_inflate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path
from flask import Blueprint, current_app, request

from ..metadata import Netkan
from ..metadata import Netkan, CkanGroup
from ..common import sqs_batch_entries


Expand All @@ -19,7 +19,8 @@ def inflate_hook():
nks = find_netkans(request.form.get('mod_id'))
if nks:
# Submit them to the queue
messages = (nk.sqs_message() for nk in nks)
messages = (nk.sqs_message(CkanGroup(current_app.config['ckanmeta_repo'].working_dir, nk.identifier))
for nk in nks)
for batch in sqs_batch_entries(messages):
current_app.config['client'].send_message_batch(
QueueUrl=current_app.config['inflation_queue'].url,
Expand Down
2 changes: 2 additions & 0 deletions prod-stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@
'env': [
('SQS_QUEUE', GetAtt(inbound, 'QueueName')),
('NETKAN_REMOTE', NETKAN_REMOTE),
('CKANMETA_REMOTE', CKANMETA_REMOTE),
('AWS_DEFAULT_REGION', Sub('${AWS::Region}')),
],
'schedule': 'rate(2 hours)',
Expand Down Expand Up @@ -752,6 +753,7 @@
],
'env': [
('NETKAN_REMOTE', NETKAN_REMOTE),
('CKANMETA_REMOTE', CKANMETA_REMOTE),
('AWS_DEFAULT_REGION', Sub('${AWS::Region}')),
('INFLATION_SQS_QUEUE', GetAtt(inbound, 'QueueName')),
],
Expand Down

0 comments on commit 7e45b46

Please sign in to comment.