Skip to content

Commit

Permalink
Respect delete markers in package-related ES queries (#2000)
Browse files Browse the repository at this point in the history
* search lambda: support filter_path, cleanup
* respect delete markers in pkg-related search queries
* filter-out non-epoch pointer files
* utils/search: filter-out pkg del markers
* show helpful msg when revisions popup is empty
* properly compute version stack for pkg and rev count
* properly compute version stack for pkg and rev list
  • Loading branch information
nl0 authored Jan 17, 2021
1 parent e899aa9 commit 06b2387
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 83 deletions.
4 changes: 2 additions & 2 deletions catalog/app/containers/Bucket/Overview.js
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ function Head({ req, s3, overviewUrl, bucket, description }) {
const isRODA = !!overviewUrl && overviewUrl.includes(`/${RODA_BUCKET}/`)
const colorPool = useConst(() => mkKeyedPool(COLOR_MAP))
const statsData = useData(requests.bucketStats, { req, s3, bucket, overviewUrl })
const pkgStatsData = useData(requests.bucketPkgStats, { req, bucket })
const pkgCountData = useData(requests.countPackageRevisions, { req, bucket })
return (
<M.Paper className={classes.root}>
<M.Box className={classes.top}>
Expand Down Expand Up @@ -798,7 +798,7 @@ function Head({ req, s3, overviewUrl, bucket, description }) {
fallback={() => '?'}
/>
<StatDisplay
value={AsyncResult.prop('totalPackages', pkgStatsData.result)}
value={pkgCountData.result}
format={formatQuantity}
label="Packages"
fallback={() => null}
Expand Down
73 changes: 42 additions & 31 deletions catalog/app/containers/Bucket/PackageTree.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,49 @@ function RevisionInfo({ revisionData, revision, bucket, name, path }) {
<M.List className={classes.list} ref={listRef}>
{AsyncResult.case(
{
Ok: R.map((r) => (
<M.ListItem
key={r.hash}
button
onClick={close}
selected={r.selected}
component={RRLink}
to={urls.bucketPackageTree(bucket, name, r.hash, path)}
>
<M.ListItemText
primary={dateFns.format(r.modified, 'MMMM do yyyy - h:mma')}
secondary={
<span className={classes.secondaryText}>
<span className={classes.line}>
{r.message || <i>No message</i>}
Ok: R.ifElse(
R.isEmpty,
() => (
<M.ListItem>
<M.ListItemText
primary="No revisions found"
secondary="Looks like this package has been deleted"
/>
</M.ListItem>
),
R.map((r) => (
<M.ListItem
key={r.hash}
button
onClick={close}
selected={r.selected}
component={RRLink}
to={urls.bucketPackageTree(bucket, name, r.hash, path)}
>
<M.ListItemText
primary={dateFns.format(r.modified, 'MMMM do yyyy - h:mma')}
secondary={
<span className={classes.secondaryText}>
<span className={classes.line}>
{r.message || <i>No message</i>}
</span>
<br />
<span className={cx(classes.line, classes.mono)}>{r.hash}</span>
</span>
<br />
<span className={cx(classes.line, classes.mono)}>{r.hash}</span>
</span>
}
/>
<M.ListItemSecondaryAction>
<M.IconButton
title="Copy package revision's canonical catalog URI to the clipboard"
href={getHttpsUri(r)}
onClick={copyHttpsUri(r, listRef)}
>
<M.Icon>link</M.Icon>
</M.IconButton>
</M.ListItemSecondaryAction>
</M.ListItem>
)),
}
/>
<M.ListItemSecondaryAction>
<M.IconButton
title="Copy package revision's canonical catalog URI to the clipboard"
href={getHttpsUri(r)}
onClick={copyHttpsUri(r, listRef)}
>
<M.Icon>link</M.Icon>
</M.IconButton>
</M.ListItemSecondaryAction>
</M.ListItem>
)),
),
Err: () => (
<M.ListItem>
<M.ListItemIcon>
Expand Down
194 changes: 148 additions & 46 deletions catalog/app/containers/Bucket/requests.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,24 +214,6 @@ export const bucketStats = async ({ req, s3, bucket, overviewUrl }) => {
throw new Error('Stats unavailable')
}

export const bucketPkgStats = async ({ req, bucket }) => {
try {
// TODO: use pkg_stats action when it's implemented
return await req('/search', { index: `${bucket}_packages`, action: 'stats' }).then(
R.applySpec({
totalPackages: R.path(['aggregations', 'totalPackageHandles', 'value']),
}),
)
} catch (e) {
// eslint-disable-next-line no-console
console.log('Unable to fetch package stats:')
// eslint-disable-next-line no-console
console.error(e)
}

throw new Error('Package stats unavailable')
}

const fetchFileVersioned = async ({ s3, bucket, path, version }) => {
const versionExists = await ensureObjectIsPresent({
s3,
Expand Down Expand Up @@ -770,12 +752,55 @@ const mkFilterQuery = (filter) =>
}
: { match_all: {} }

const NOT_DELETED_METRIC = {
scripted_metric: {
init_script: 'state.last_modified = 0; state.deleted = false',
map_script: `
def last_modified = doc.last_modified.getValue().toInstant().toEpochMilli();
if (last_modified > state.last_modified) {
state.last_modified = last_modified;
state.deleted = doc.delete_marker.getValue();
}
`,
reduce_script: `
def last_modified = 0;
def deleted = false;
for (s in states) {
if (s.last_modified > last_modified) {
last_modified = s.last_modified;
deleted = s.deleted;
}
}
return deleted ? 0 : 1;
`,
},
}

export const countPackages = withErrorHandling(async ({ req, bucket, filter }) => {
const body = {
query: mkFilterQuery(filter),
query: {
bool: {
must: [mkFilterQuery(filter), { regexp: { pointer_file: TIMESTAMP_RE_SRC } }],
},
},
aggs: {
total: {
cardinality: { field: 'handle' },
packages: {
terms: { field: 'handle', size: 1000000 },
aggs: {
revision_objects: {
// TODO: use pointer_file when it's converted to a keyword field
terms: { field: 'key', size: 1000000 },
aggs: { not_deleted: NOT_DELETED_METRIC },
},
revision_count: {
sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' },
},
},
},
total_revisions: {
sum_bucket: {
buckets_path: 'packages>revision_count',
},
},
},
}
Expand All @@ -784,8 +809,9 @@ export const countPackages = withErrorHandling(async ({ req, bucket, filter }) =
action: 'packages',
body: JSON.stringify(body),
size: 0,
filter_path: 'aggregations.total_revisions',
})
return result.aggregations.total.value
return result.aggregations.total_revisions.value
})

export const listPackages = withErrorHandling(
Expand All @@ -812,27 +838,37 @@ export const listPackages = withErrorHandling(
})

const body = {
query: mkFilterQuery(filter),
query: {
bool: {
must: [mkFilterQuery(filter), { regexp: { pointer_file: TIMESTAMP_RE_SRC } }],
},
},
aggs: {
packages: {
composite: {
// the limit is configured in ES cluster settings (search.max_buckets)
size: 10000,
sources: [
{
handle: {
terms: { field: 'handle' },
},
},
],
sources: [{ handle: { terms: { field: 'handle' } } }],
},
aggs: {
modified: {
max: { field: 'last_modified' },
// TODO: take into account only timestamps of not-deleted revisions
modified: { max: { field: 'last_modified' } },
revision_objects: {
terms: { field: 'key', size: 1000000 },
aggs: { not_deleted: NOT_DELETED_METRIC },
},
revision_count: {
sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' },
},
drop_deleted: {
bucket_selector: {
buckets_path: { revision_count: 'revision_count' },
script: 'params.revision_count > 0',
},
},
sort: {
bucket_sort: {
sort: sort === 'modified' ? [{ modified: { order: 'desc' } }] : undefined,
sort: sort === 'modified' ? [{ modified: 'desc' }] : undefined,
size: perPage,
from: perPage * (page - 1),
},
Expand All @@ -846,11 +882,16 @@ export const listPackages = withErrorHandling(
action: 'packages',
body: JSON.stringify(body),
size: 0,
filter_path: [
'aggregations.packages.buckets.key',
'aggregations.packages.buckets.modified',
'aggregations.packages.buckets.revision_count',
].join(','),
})
const packages = result.aggregations.packages.buckets.map((b) => ({
name: b.key.handle,
modified: new Date(b.modified.value),
revisions: b.doc_count,
revisions: b.revision_count.value,
}))

if (!countsP) return packages
Expand Down Expand Up @@ -917,10 +958,29 @@ export const countPackageRevisions = ({ req, bucket, name }) =>
req('/search', {
index: `${bucket}_packages`,
action: 'packages',
body: JSON.stringify({ query: { term: { handle: name } } }),
body: JSON.stringify({
query: {
bool: {
must: [
name ? { term: { handle: name } } : { match_all: {} },
{ regexp: { pointer_file: TIMESTAMP_RE_SRC } },
],
},
},
aggs: {
revision_objects: {
terms: { field: 'key', size: 1000000 },
aggs: { not_deleted: NOT_DELETED_METRIC },
},
revision_count: {
sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' },
},
},
}),
size: 0,
filter_path: 'aggregations.revision_count',
})
.then(R.path(['hits', 'total']))
.then(R.path(['aggregations', 'revision_count', 'value']))
.catch(errors.catchErrors())

function tryParse(s) {
Expand All @@ -936,19 +996,60 @@ export const getPackageRevisions = withErrorHandling(
req('/search', {
index: `${bucket}_packages`,
action: 'packages',
size: 0,
filter_path: 'aggregations.revisions.buckets.latest.hits.hits._source',
body: JSON.stringify({
query: { term: { handle: name } },
sort: [{ last_modified: 'desc' }],
query: {
bool: {
must: [
{ term: { handle: name } },
{ regexp: { pointer_file: TIMESTAMP_RE_SRC } },
],
},
},
aggs: {
revisions: {
composite: {
// the limit is configured in ES cluster settings (search.max_buckets)
size: 10000,
sources: [{ pointer: { terms: { field: 'key' } } }],
},
aggs: {
not_deleted: NOT_DELETED_METRIC,
drop_deleted: {
bucket_selector: {
buckets_path: { not_deleted: 'not_deleted.value' },
script: 'params.not_deleted > 0',
},
},
latest: {
top_hits: {
size: 1,
sort: { last_modified: 'desc' },
_source: [
'comment',
'hash',
'last_modified',
'metadata',
'package_stats',
],
},
},
sort: {
bucket_sort: {
sort: [{ _key: 'desc' }],
size: perPage,
from: perPage * (page - 1),
},
},
},
},
},
}),
size: perPage,
from: perPage * (page - 1),
_source: ['comment', 'hash', 'last_modified', 'metadata', 'package_stats'].join(
',',
),
}).then(
R.pipe(
R.path(['hits', 'hits']),
R.map(({ _source: s }) => ({
R.path(['aggregations', 'revisions', 'buckets']),
R.map(({ latest: { hits: { hits: [{ _source: s }] } } }) => ({
hash: s.hash,
modified: new Date(s.last_modified),
stats: {
Expand All @@ -973,7 +1074,8 @@ export const loadRevisionHash = ({ s3, bucket, name, id }) =>
}))

const HASH_RE = /^[a-f0-9]{64}$/
const TIMESTAMP_RE = /^1[0-9]{9}$/
const TIMESTAMP_RE_SRC = '[0-9]{10}'
const TIMESTAMP_RE = new RegExp(`^${TIMESTAMP_RE_SRC}$`)

// returns { hash, modified }
export async function resolvePackageRevision({ s3, bucket, name, revision }) {
Expand Down
1 change: 1 addition & 0 deletions catalog/app/utils/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const parseJSON = (s) => {
}

const extractPkgData = ({ bucket, score, src }) => {
if (src.delete_marker || !src.hash) return {}
const key = `package:${bucket}/${src.handle}:${src.hash}`
return {
[key]: {
Expand Down
Loading

0 comments on commit 06b2387

Please sign in to comment.