Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respect delete markers in package-related ES queries #2000

Merged
merged 7 commits into from
Jan 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions catalog/app/containers/Bucket/Overview.js
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ function Head({ req, s3, overviewUrl, bucket, description }) {
const isRODA = !!overviewUrl && overviewUrl.includes(`/${RODA_BUCKET}/`)
const colorPool = useConst(() => mkKeyedPool(COLOR_MAP))
const statsData = useData(requests.bucketStats, { req, s3, bucket, overviewUrl })
const pkgStatsData = useData(requests.bucketPkgStats, { req, bucket })
const pkgCountData = useData(requests.countPackageRevisions, { req, bucket })
return (
<M.Paper className={classes.root}>
<M.Box className={classes.top}>
Expand Down Expand Up @@ -798,7 +798,7 @@ function Head({ req, s3, overviewUrl, bucket, description }) {
fallback={() => '?'}
/>
<StatDisplay
value={AsyncResult.prop('totalPackages', pkgStatsData.result)}
value={pkgCountData.result}
format={formatQuantity}
label="Packages"
fallback={() => null}
Expand Down
73 changes: 42 additions & 31 deletions catalog/app/containers/Bucket/PackageTree.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,38 +134,49 @@ function RevisionInfo({ revisionData, revision, bucket, name, path }) {
<M.List className={classes.list} ref={listRef}>
{AsyncResult.case(
{
Ok: R.map((r) => (
<M.ListItem
key={r.hash}
button
onClick={close}
selected={r.selected}
component={RRLink}
to={urls.bucketPackageTree(bucket, name, r.hash, path)}
>
<M.ListItemText
primary={dateFns.format(r.modified, 'MMMM do yyyy - h:mma')}
secondary={
<span className={classes.secondaryText}>
<span className={classes.line}>
{r.message || <i>No message</i>}
Ok: R.ifElse(
R.isEmpty,
() => (
<M.ListItem>
<M.ListItemText
primary="No revisions found"
secondary="Looks like this package has been deleted"
/>
</M.ListItem>
),
R.map((r) => (
<M.ListItem
key={r.hash}
button
onClick={close}
selected={r.selected}
component={RRLink}
to={urls.bucketPackageTree(bucket, name, r.hash, path)}
>
<M.ListItemText
primary={dateFns.format(r.modified, 'MMMM do yyyy - h:mma')}
secondary={
<span className={classes.secondaryText}>
<span className={classes.line}>
{r.message || <i>No message</i>}
</span>
<br />
<span className={cx(classes.line, classes.mono)}>{r.hash}</span>
</span>
<br />
<span className={cx(classes.line, classes.mono)}>{r.hash}</span>
</span>
}
/>
<M.ListItemSecondaryAction>
<M.IconButton
title="Copy package revision's canonical catalog URI to the clipboard"
href={getHttpsUri(r)}
onClick={copyHttpsUri(r, listRef)}
>
<M.Icon>link</M.Icon>
</M.IconButton>
</M.ListItemSecondaryAction>
</M.ListItem>
)),
}
/>
<M.ListItemSecondaryAction>
<M.IconButton
title="Copy package revision's canonical catalog URI to the clipboard"
href={getHttpsUri(r)}
onClick={copyHttpsUri(r, listRef)}
>
<M.Icon>link</M.Icon>
</M.IconButton>
</M.ListItemSecondaryAction>
</M.ListItem>
)),
),
Err: () => (
<M.ListItem>
<M.ListItemIcon>
Expand Down
194 changes: 148 additions & 46 deletions catalog/app/containers/Bucket/requests.js
Original file line number Diff line number Diff line change
Expand Up @@ -214,24 +214,6 @@ export const bucketStats = async ({ req, s3, bucket, overviewUrl }) => {
throw new Error('Stats unavailable')
}

export const bucketPkgStats = async ({ req, bucket }) => {
try {
// TODO: use pkg_stats action when it's implemented
return await req('/search', { index: `${bucket}_packages`, action: 'stats' }).then(
R.applySpec({
totalPackages: R.path(['aggregations', 'totalPackageHandles', 'value']),
}),
)
} catch (e) {
// eslint-disable-next-line no-console
console.log('Unable to fetch package stats:')
// eslint-disable-next-line no-console
console.error(e)
}

throw new Error('Package stats unavailable')
}

const fetchFileVersioned = async ({ s3, bucket, path, version }) => {
const versionExists = await ensureObjectIsPresent({
s3,
Expand Down Expand Up @@ -770,12 +752,55 @@ const mkFilterQuery = (filter) =>
}
: { match_all: {} }

const NOT_DELETED_METRIC = {
scripted_metric: {
init_script: 'state.last_modified = 0; state.deleted = false',
map_script: `
def last_modified = doc.last_modified.getValue().toInstant().toEpochMilli();
if (last_modified > state.last_modified) {
state.last_modified = last_modified;
state.deleted = doc.delete_marker.getValue();
}
`,
reduce_script: `
def last_modified = 0;
def deleted = false;
for (s in states) {
if (s.last_modified > last_modified) {
last_modified = s.last_modified;
deleted = s.deleted;
}
}
return deleted ? 0 : 1;
`,
},
}

export const countPackages = withErrorHandling(async ({ req, bucket, filter }) => {
const body = {
query: mkFilterQuery(filter),
query: {
bool: {
must: [mkFilterQuery(filter), { regexp: { pointer_file: TIMESTAMP_RE_SRC } }],
},
},
aggs: {
total: {
cardinality: { field: 'handle' },
packages: {
terms: { field: 'handle', size: 1000000 },
aggs: {
revision_objects: {
// TODO: use pointer_file when it's converted to a keyword field
terms: { field: 'key', size: 1000000 },
aggs: { not_deleted: NOT_DELETED_METRIC },
},
revision_count: {
sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' },
},
},
},
total_revisions: {
sum_bucket: {
buckets_path: 'packages>revision_count',
},
},
},
}
Expand All @@ -784,8 +809,9 @@ export const countPackages = withErrorHandling(async ({ req, bucket, filter }) =
action: 'packages',
body: JSON.stringify(body),
size: 0,
filter_path: 'aggregations.total_revisions',
})
return result.aggregations.total.value
return result.aggregations.total_revisions.value
})

export const listPackages = withErrorHandling(
Expand All @@ -812,27 +838,37 @@ export const listPackages = withErrorHandling(
})

const body = {
query: mkFilterQuery(filter),
query: {
bool: {
must: [mkFilterQuery(filter), { regexp: { pointer_file: TIMESTAMP_RE_SRC } }],
},
},
aggs: {
packages: {
composite: {
// the limit is configured in ES cluster settings (search.max_buckets)
size: 10000,
sources: [
{
handle: {
terms: { field: 'handle' },
},
},
],
sources: [{ handle: { terms: { field: 'handle' } } }],
},
aggs: {
modified: {
max: { field: 'last_modified' },
// TODO: take into account only timestamps of not-deleted revisions
modified: { max: { field: 'last_modified' } },
revision_objects: {
terms: { field: 'key', size: 1000000 },
aggs: { not_deleted: NOT_DELETED_METRIC },
},
revision_count: {
sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' },
},
drop_deleted: {
bucket_selector: {
buckets_path: { revision_count: 'revision_count' },
script: 'params.revision_count > 0',
},
},
sort: {
bucket_sort: {
sort: sort === 'modified' ? [{ modified: { order: 'desc' } }] : undefined,
sort: sort === 'modified' ? [{ modified: 'desc' }] : undefined,
size: perPage,
from: perPage * (page - 1),
},
Expand All @@ -846,11 +882,16 @@ export const listPackages = withErrorHandling(
action: 'packages',
body: JSON.stringify(body),
size: 0,
filter_path: [
'aggregations.packages.buckets.key',
'aggregations.packages.buckets.modified',
'aggregations.packages.buckets.revision_count',
].join(','),
})
const packages = result.aggregations.packages.buckets.map((b) => ({
name: b.key.handle,
modified: new Date(b.modified.value),
revisions: b.doc_count,
revisions: b.revision_count.value,
}))

if (!countsP) return packages
Expand Down Expand Up @@ -917,10 +958,29 @@ export const countPackageRevisions = ({ req, bucket, name }) =>
req('/search', {
index: `${bucket}_packages`,
action: 'packages',
body: JSON.stringify({ query: { term: { handle: name } } }),
body: JSON.stringify({
query: {
bool: {
must: [
name ? { term: { handle: name } } : { match_all: {} },
{ regexp: { pointer_file: TIMESTAMP_RE_SRC } },
],
},
},
aggs: {
revision_objects: {
terms: { field: 'key', size: 1000000 },
aggs: { not_deleted: NOT_DELETED_METRIC },
},
revision_count: {
sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' },
},
},
}),
size: 0,
filter_path: 'aggregations.revision_count',
})
.then(R.path(['hits', 'total']))
.then(R.path(['aggregations', 'revision_count', 'value']))
.catch(errors.catchErrors())

function tryParse(s) {
Expand All @@ -936,19 +996,60 @@ export const getPackageRevisions = withErrorHandling(
req('/search', {
index: `${bucket}_packages`,
action: 'packages',
size: 0,
filter_path: 'aggregations.revisions.buckets.latest.hits.hits._source',
body: JSON.stringify({
query: { term: { handle: name } },
sort: [{ last_modified: 'desc' }],
query: {
bool: {
must: [
{ term: { handle: name } },
{ regexp: { pointer_file: TIMESTAMP_RE_SRC } },
],
},
},
aggs: {
revisions: {
composite: {
// the limit is configured in ES cluster settings (search.max_buckets)
size: 10000,
sources: [{ pointer: { terms: { field: 'key' } } }],
},
aggs: {
not_deleted: NOT_DELETED_METRIC,
drop_deleted: {
bucket_selector: {
buckets_path: { not_deleted: 'not_deleted.value' },
script: 'params.not_deleted > 0',
},
},
latest: {
top_hits: {
size: 1,
sort: { last_modified: 'desc' },
_source: [
'comment',
'hash',
'last_modified',
'metadata',
'package_stats',
],
},
},
sort: {
bucket_sort: {
sort: [{ _key: 'desc' }],
size: perPage,
from: perPage * (page - 1),
},
},
},
},
},
}),
size: perPage,
from: perPage * (page - 1),
_source: ['comment', 'hash', 'last_modified', 'metadata', 'package_stats'].join(
',',
),
}).then(
R.pipe(
R.path(['hits', 'hits']),
R.map(({ _source: s }) => ({
R.path(['aggregations', 'revisions', 'buckets']),
R.map(({ latest: { hits: { hits: [{ _source: s }] } } }) => ({
hash: s.hash,
modified: new Date(s.last_modified),
stats: {
Expand All @@ -973,7 +1074,8 @@ export const loadRevisionHash = ({ s3, bucket, name, id }) =>
}))

const HASH_RE = /^[a-f0-9]{64}$/
const TIMESTAMP_RE = /^1[0-9]{9}$/
const TIMESTAMP_RE_SRC = '[0-9]{10}'
const TIMESTAMP_RE = new RegExp(`^${TIMESTAMP_RE_SRC}$`)

// returns { hash, modified }
export async function resolvePackageRevision({ s3, bucket, name, revision }) {
Expand Down
1 change: 1 addition & 0 deletions catalog/app/utils/search.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ const parseJSON = (s) => {
}

const extractPkgData = ({ bucket, score, src }) => {
if (src.delete_marker || !src.hash) return {}
const key = `package:${bucket}/${src.handle}:${src.hash}`
return {
[key]: {
Expand Down
Loading