Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send HighestVersion attribute to Inflator #102

Merged
merged 2 commits into from
Nov 28, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ services:
environment:
NETKAN_REMOTE: ${NETKAN_METADATA_PATH}
SSH_KEY: ${CKAN_NETKAN_SSHKEY}
CKANMETA_REMOTE: ${CKAN_METADATA_PATH}
AWS_DEFAULT_REGION: ${CKAN_AWS_DEFAULT_REGION}
AWS_SECRET_ACCESS_KEY: ${CKAN_AWS_SECRET_ACCESS_KEY}
AWS_ACCESS_KEY_ID: ${CKAN_AWS_ACCESS_KEY_ID}
Expand Down
21 changes: 13 additions & 8 deletions netkan/netkan/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def netkan(debug):
help='SQS Queue to poll for metadata'
)
@click.option(
'--metadata', envvar='CKANMETA_REMOTE',
'--ckanmeta-remote', '--metadata', envvar='CKANMETA_REMOTE',
help='Path/URL/SSH to Metadata Repo',
)
@click.option(
Expand All @@ -59,9 +59,9 @@ def netkan(debug):
'--key', envvar='SSH_KEY', required=True,
help='SSH key for accessing repositories'
)
def indexer(queue, metadata, token, repo, user, key, timeout):
def indexer(queue, ckanmeta_remote, token, repo, user, key, timeout):
init_ssh(key, '/home/netkan/.ssh')
ckan_meta = init_repo(metadata, '/tmp/CKAN-meta')
ckan_meta = init_repo(ckanmeta_remote, '/tmp/CKAN-meta')

github_pr = GitHubPR(token, repo, user)
sqs = boto3.resource('sqs')
Expand Down Expand Up @@ -94,6 +94,10 @@ def indexer(queue, metadata, token, repo, user, key, timeout):
'--netkan-remote', '--netkan', envvar='NETKAN_REMOTE',
help='Path/URL to NetKAN Repo for dev override',
)
@click.option(
'--ckanmeta-remote', envvar='CKANMETA_REMOTE',
help='Path/URL/SSH to Metadata Repo',
)
@click.option(
'--key', envvar='SSH_KEY', required=True,
help='SSH key for accessing repositories'
Expand All @@ -115,15 +119,16 @@ def indexer(queue, metadata, token, repo, user, key, timeout):
'--min-credits', default=200,
help='Only schedule if we have at least this many credits remaining'
)
def scheduler(queue, netkan_remote, key, max_queued, dev, group, min_credits):
def scheduler(queue, netkan_remote, ckanmeta_remote, key, max_queued, dev, group, min_credits):
init_ssh(key, '/home/netkan/.ssh')
sched = NetkanScheduler(
Path('/tmp/NetKAN'), queue,
Path('/tmp/NetKAN'), Path('/tmp/CKAN-meta'), queue,
nonhooks_group=(group == 'all' or group == 'nonhooks'),
webhooks_group=(group == 'all' or group == 'webhooks'),
)
if sched.can_schedule(max_queued, dev, min_credits):
init_repo(netkan_remote, '/tmp/NetKAN')
init_repo(ckanmeta_remote, '/tmp/CKAN-meta')
sched.schedule_all_netkans()
logging.info("NetKANs submitted to %s", queue)

Expand Down Expand Up @@ -240,7 +245,7 @@ def clean_cache(days):
help='Path/URL/SSH to NetKAN repo for mod list',
)
@click.option(
'--ckan-meta', envvar='CKANMETA_REMOTE',
'--ckanmeta-remote', '--ckan-meta', envvar='CKANMETA_REMOTE',
help='Path/URL/SSH to CKAN-meta repo for output',
)
@click.option(
Expand All @@ -251,10 +256,10 @@ def clean_cache(days):
'--key', envvar='SSH_KEY', required=True,
help='SSH key for accessing repositories'
)
def download_counter(netkan_remote, ckan_meta, token, key):
def download_counter(netkan_remote, ckanmeta_remote, token, key, debug):
init_ssh(key, '/home/netkan/.ssh')
init_repo(netkan_remote, '/tmp/NetKAN')
meta = init_repo(ckan_meta, '/tmp/CKAN-meta')
meta = init_repo(ckanmeta_remote, '/tmp/CKAN-meta')
logging.info('Starting Download Count Calculation...')
DownloadCounter(
'/tmp/NetKAN',
Expand Down
34 changes: 28 additions & 6 deletions netkan/netkan/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,19 @@ def hook_only(self):
return False
return self.on_spacedock

def sqs_message(self):
def sqs_message_attribs(self, ckan_group=None):
    """Build the SQS ``MessageAttributes`` mapping for this netkan.

    When ``ckan_group`` is supplied and this netkan does not set
    ``x_netkan_allow_out_of_order``, the group's highest known version is
    attached as a ``HighestVersion`` string attribute.

    Args:
        ckan_group: Object exposing ``highest_version()``; the result is
            either ``None`` or an object with a ``string`` attribute.
            ``None`` (the default) means no attributes are added.

    Returns:
        dict: Attribute name -> ``{'DataType': ..., 'StringValue': ...}``;
        empty when there is nothing to send.
    """
    attribs = {}
    if ckan_group and not getattr(self, 'x_netkan_allow_out_of_order', False):
        highest = ckan_group.highest_version()
        # A group with no .ckan files yet has no highest version; omit the
        # attribute instead of dereferencing None.
        if highest is not None:
            # SQS rejects bare strings here: every message attribute must
            # be a mapping that declares its DataType.
            attribs['HighestVersion'] = {
                'DataType': 'String',
                'StringValue': highest.string,
            }
    return attribs

def sqs_message(self, ckan_group=None):
return {
'Id': self.identifier,
'MessageBody': self.contents,
'MessageGroupId': '1',
'Id': self.identifier,
'MessageBody': self.contents,
'MessageGroupId': '1',
'MessageDeduplicationId': uuid.uuid4().hex,
'MessageAttributes': self.sqs_message_attribs(ckan_group),
}


Expand Down Expand Up @@ -255,15 +262,17 @@ def _number_compare(v1, v2) -> (int, str, str):
# Process our strings while there are characters remaining
while len(first_remainder) > 0 and len(second_remainder) > 0:
# Start by comparing the string parts.
(result, first_remainder, second_remainder) = _string_compare(first_remainder, second_remainder)
(result, first_remainder, second_remainder) = _string_compare(
first_remainder, second_remainder)

if result != 0:
return result > 0

# Otherwise, compare the number parts.
# It's okay not to check if our strings are exhausted, because
# if they are the exhausted parts will return zero.
(result, first_remainder, second_remainder) = _number_compare(first_remainder, second_remainder)
(result, first_remainder, second_remainder) = _number_compare(
first_remainder, second_remainder)

# Again, return difference if found.
if result != 0:
Expand All @@ -278,3 +287,16 @@ def _number_compare(v1, v2) -> (int, str, str):

def __str__(self):
return self.string


class CkanGroup:
    """The ``.ckan`` files found under one identifier's directory."""

    def __init__(self, ckan_meta_path, identifier):
        # Directory searched (recursively) for this identifier's .ckan files.
        self.path = Path(ckan_meta_path, identifier)

    def highest_version(self):
        """Return the greatest ``version`` among the group's ckans.

        Returns ``None`` when no ``.ckan`` file is found under ``self.path``.
        """
        candidates = [Ckan(ckan_file)
                      for ckan_file in self.path.glob('**/*.ckan')]
        if not candidates:
            return None
        newest = max(candidates, key=lambda ckan: ckan.version)
        return newest.version if newest else None
8 changes: 5 additions & 3 deletions netkan/netkan/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,17 @@
import boto3
import requests

from .metadata import Netkan
from .metadata import Netkan, CkanGroup
from .common import sqs_batch_entries


class NetkanScheduler:

def __init__(self, path, queue, base='NetKAN/', nonhooks_group=False, webhooks_group=False):
def __init__(self, path, ckan_meta_path, queue, base='NetKAN/', nonhooks_group=False, webhooks_group=False):
self.path = Path(path, base)
self.nonhooks_group = nonhooks_group
self.webhooks_group = webhooks_group
self.ckan_meta_path = ckan_meta_path

# TODO: This isn't super neat, do something better.
self.queue_url = 'test_url'
Expand Down Expand Up @@ -42,7 +43,8 @@ def _in_group(self, netkan):
return self.nonhooks_group

def schedule_all_netkans(self):
messages = (nk.sqs_message() for nk in self.netkans() if self._in_group(nk))
messages = (nk.sqs_message(CkanGroup(self.ckan_meta_path, nk.identifier))
for nk in self.netkans() if self._in_group(nk))
for batch in sqs_batch_entries(messages):
self.client.send_message_batch(**self.sqs_batch_attrs(batch))

Expand Down
1 change: 1 addition & 0 deletions netkan/netkan/webhooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def create_app():
# Set up config
app.config['secret'] = os.environ.get('XKAN_GHSECRET')
app.config['netkan_repo'] = init_repo(os.environ.get('NETKAN_REMOTE'), '/tmp/NetKAN')
app.config['ckanmeta_repo'] = init_repo(os.environ.get('CKANMETA_REMOTE'), '/tmp/CKAN-meta')
app.config['client'] = boto3.client('sqs')
sqs = boto3.resource('sqs')
app.config['inflation_queue'] = sqs.get_queue_by_name(
Expand Down
3 changes: 2 additions & 1 deletion netkan/netkan/webhooks/github_inflate.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from ..common import netkans, sqs_batch_entries
from .github_utils import signature_required
from ..metadata import CkanGroup


github_inflate = Blueprint('github_inflate', __name__) # pylint: disable=invalid-name
Expand Down Expand Up @@ -55,7 +56,7 @@ def ids_from_commits(commits):
def inflate(ids):
# Make sure our NetKAN repo is up to date
current_app.config['netkan_repo'].remotes.origin.pull('master', strategy_option='theirs')
messages = (nk.sqs_message()
messages = (nk.sqs_message(CkanGroup(current_app.config['ckanmeta_repo'].working_dir, nk.identifier))
for nk in netkans(current_app.config['netkan_repo'].working_dir, ids))
for batch in sqs_batch_entries(messages):
current_app.config['client'].send_message_batch(
Expand Down
3 changes: 2 additions & 1 deletion netkan/netkan/webhooks/inflate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from flask import Blueprint, current_app, request

from ..common import netkans, sqs_batch_entries
from ..metadata import CkanGroup


inflate = Blueprint('inflate', __name__) # pylint: disable=invalid-name
Expand All @@ -18,7 +19,7 @@ def inflate_hook():
return 'An array of identifiers is required', 400
# Make sure our NetKAN repo is up to date
current_app.config['netkan_repo'].remotes.origin.pull('master', strategy_option='theirs')
messages = (nk.sqs_message()
messages = (nk.sqs_message(CkanGroup(current_app.config['ckanmeta_repo'].working_dir, nk.identifier))
for nk in netkans(current_app.config['netkan_repo'].working_dir, ids))
for batch in sqs_batch_entries(messages):
current_app.logger.info(f'Queueing inflation request batch: {batch}')
Expand Down
5 changes: 3 additions & 2 deletions netkan/netkan/webhooks/spacedock_inflate.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path
from flask import Blueprint, current_app, request

from ..metadata import Netkan
from ..metadata import Netkan, CkanGroup
from ..common import sqs_batch_entries


Expand All @@ -19,7 +19,8 @@ def inflate_hook():
nks = find_netkans(request.form.get('mod_id'))
if nks:
# Submit them to the queue
messages = (nk.sqs_message() for nk in nks)
messages = (nk.sqs_message(CkanGroup(current_app.config['ckanmeta_repo'].working_dir, nk.identifier))
for nk in nks)
for batch in sqs_batch_entries(messages):
current_app.config['client'].send_message_batch(
QueueUrl=current_app.config['inflation_queue'].url,
Expand Down
2 changes: 2 additions & 0 deletions prod-stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@
'env': [
('SQS_QUEUE', GetAtt(inbound, 'QueueName')),
('NETKAN_REMOTE', NETKAN_REMOTE),
('CKANMETA_REMOTE', CKANMETA_REMOTE),
('AWS_DEFAULT_REGION', Sub('${AWS::Region}')),
],
'schedule': 'rate(2 hours)',
Expand Down Expand Up @@ -752,6 +753,7 @@
],
'env': [
('NETKAN_REMOTE', NETKAN_REMOTE),
('CKANMETA_REMOTE', CKANMETA_REMOTE),
('AWS_DEFAULT_REGION', Sub('${AWS::Region}')),
('INFLATION_SQS_QUEUE', GetAtt(inbound, 'QueueName')),
],
Expand Down