diff --git a/catalog/app/containers/Bucket/Overview.js b/catalog/app/containers/Bucket/Overview.js index 6a90dec7cbb..683b5164c4d 100644 --- a/catalog/app/containers/Bucket/Overview.js +++ b/catalog/app/containers/Bucket/Overview.js @@ -758,7 +758,7 @@ function Head({ req, s3, overviewUrl, bucket, description }) { const isRODA = !!overviewUrl && overviewUrl.includes(`/${RODA_BUCKET}/`) const colorPool = useConst(() => mkKeyedPool(COLOR_MAP)) const statsData = useData(requests.bucketStats, { req, s3, bucket, overviewUrl }) - const pkgStatsData = useData(requests.bucketPkgStats, { req, bucket }) + const pkgCountData = useData(requests.countPackageRevisions, { req, bucket }) return ( @@ -798,7 +798,7 @@ function Head({ req, s3, overviewUrl, bucket, description }) { fallback={() => '?'} /> null} diff --git a/catalog/app/containers/Bucket/PackageTree.js b/catalog/app/containers/Bucket/PackageTree.js index 84c4f1429ab..c20c249629c 100644 --- a/catalog/app/containers/Bucket/PackageTree.js +++ b/catalog/app/containers/Bucket/PackageTree.js @@ -134,38 +134,49 @@ function RevisionInfo({ revisionData, revision, bucket, name, path }) { {AsyncResult.case( { - Ok: R.map((r) => ( - - - - {r.message || No message} + Ok: R.ifElse( + R.isEmpty, + () => ( + + + + ), + R.map((r) => ( + + + + {r.message || No message} + +
+ {r.hash}
-
- {r.hash} - - } - /> - - - link - - -
- )), + } + /> + + + link + + + + )), + ), Err: () => ( diff --git a/catalog/app/containers/Bucket/requests.js b/catalog/app/containers/Bucket/requests.js index 57e71f946dd..6e7ae6db042 100644 --- a/catalog/app/containers/Bucket/requests.js +++ b/catalog/app/containers/Bucket/requests.js @@ -214,24 +214,6 @@ export const bucketStats = async ({ req, s3, bucket, overviewUrl }) => { throw new Error('Stats unavailable') } -export const bucketPkgStats = async ({ req, bucket }) => { - try { - // TODO: use pkg_stats action when it's implemented - return await req('/search', { index: `${bucket}_packages`, action: 'stats' }).then( - R.applySpec({ - totalPackages: R.path(['aggregations', 'totalPackageHandles', 'value']), - }), - ) - } catch (e) { - // eslint-disable-next-line no-console - console.log('Unable to fetch package stats:') - // eslint-disable-next-line no-console - console.error(e) - } - - throw new Error('Package stats unavailable') -} - const fetchFileVersioned = async ({ s3, bucket, path, version }) => { const versionExists = await ensureObjectIsPresent({ s3, @@ -770,12 +752,55 @@ const mkFilterQuery = (filter) => } : { match_all: {} } +const NOT_DELETED_METRIC = { + scripted_metric: { + init_script: 'state.last_modified = 0; state.deleted = false', + map_script: ` + def last_modified = doc.last_modified.getValue().toInstant().toEpochMilli(); + if (last_modified > state.last_modified) { + state.last_modified = last_modified; + state.deleted = doc.delete_marker.getValue(); + } + `, + reduce_script: ` + def last_modified = 0; + def deleted = false; + for (s in states) { + if (s.last_modified > last_modified) { + last_modified = s.last_modified; + deleted = s.deleted; + } + } + return deleted ? 0 : 1; + `, + }, +} + export const countPackages = withErrorHandling(async ({ req, bucket, filter }) => { const body = { - query: mkFilterQuery(filter), + query: { + bool: { + must: [mkFilterQuery(filter), { regexp: { pointer_file: TIMESTAMP_RE_SRC } }], + }, + }, aggs: { - total: { - cardinality: { field: 'handle' }, + packages: { + terms: { field: 'handle', size: 1000000 }, + aggs: { + revision_objects: { + // TODO: use pointer_file when it's converted to a keyword field + terms: { field: 'key', size: 1000000 }, + aggs: { not_deleted: NOT_DELETED_METRIC }, + }, + revision_count: { + sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' }, + }, + }, + }, + total_revisions: { + sum_bucket: { + buckets_path: 'packages>revision_count', + }, }, }, } @@ -784,8 +809,9 @@ export const countPackages = withErrorHandling(async ({ req, bucket, filter }) = action: 'packages', body: JSON.stringify(body), size: 0, + filter_path: 'aggregations.total_revisions', }) - return result.aggregations.total.value + return result.aggregations.total_revisions.value }) export const listPackages = withErrorHandling( @@ -812,27 +838,37 @@ export const listPackages = withErrorHandling( }) const body = { - query: mkFilterQuery(filter), + query: { + bool: { + must: [mkFilterQuery(filter), { regexp: { pointer_file: TIMESTAMP_RE_SRC } }], + }, + }, aggs: { packages: { composite: { // the limit is configured in ES cluster settings (search.max_buckets) size: 10000, - sources: [ - { - handle: { - terms: { field: 'handle' }, - }, - }, - ], + sources: [{ handle: { terms: { field: 'handle' } } }], }, aggs: { - modified: { - max: { field: 'last_modified' }, + // TODO: take into account only timestamps of not-deleted revisions + modified: { max: { field: 'last_modified' } }, + revision_objects: { + terms: { field: 'key', size: 1000000 }, + aggs: { not_deleted: NOT_DELETED_METRIC }, + }, + revision_count: { + sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' }, + }, + drop_deleted: { + bucket_selector: { + buckets_path: { revision_count: 'revision_count' }, + script: 'params.revision_count > 0', + }, }, sort: { bucket_sort: { - sort: sort === 'modified' ? [{ modified: { order: 'desc' } }] : undefined, + sort: sort === 'modified' ? [{ modified: 'desc' }] : undefined, size: perPage, from: perPage * (page - 1), }, @@ -846,11 +882,16 @@ export const listPackages = withErrorHandling( action: 'packages', body: JSON.stringify(body), size: 0, + filter_path: [ + 'aggregations.packages.buckets.key', + 'aggregations.packages.buckets.modified', + 'aggregations.packages.buckets.revision_count', + ].join(','), }) const packages = result.aggregations.packages.buckets.map((b) => ({ name: b.key.handle, modified: new Date(b.modified.value), - revisions: b.doc_count, + revisions: b.revision_count.value, })) if (!countsP) return packages @@ -917,10 +958,29 @@ export const countPackageRevisions = ({ req, bucket, name }) => req('/search', { index: `${bucket}_packages`, action: 'packages', - body: JSON.stringify({ query: { term: { handle: name } } }), + body: JSON.stringify({ + query: { + bool: { + must: [ + name ? { term: { handle: name } } : { match_all: {} }, + { regexp: { pointer_file: TIMESTAMP_RE_SRC } }, + ], + }, + }, + aggs: { + revision_objects: { + terms: { field: 'key', size: 1000000 }, + aggs: { not_deleted: NOT_DELETED_METRIC }, + }, + revision_count: { + sum_bucket: { buckets_path: 'revision_objects>not_deleted.value' }, + }, + }, + }), size: 0, + filter_path: 'aggregations.revision_count', }) - .then(R.path(['hits', 'total'])) + .then(R.path(['aggregations', 'revision_count', 'value'])) .catch(errors.catchErrors()) function tryParse(s) { @@ -936,19 +996,60 @@ export const getPackageRevisions = withErrorHandling( req('/search', { index: `${bucket}_packages`, action: 'packages', + size: 0, + filter_path: 'aggregations.revisions.buckets.latest.hits.hits._source', body: JSON.stringify({ - query: { term: { handle: name } }, - sort: [{ last_modified: 'desc' }], + query: { + bool: { + must: [ + { term: { handle: name } }, + { regexp: { pointer_file: TIMESTAMP_RE_SRC } }, + ], + }, + }, + aggs: { + revisions: { + composite: { + // the limit is configured in ES cluster settings (search.max_buckets) + size: 10000, + sources: [{ pointer: { terms: { field: 'key' } } }], + }, + aggs: { + not_deleted: NOT_DELETED_METRIC, + drop_deleted: { + bucket_selector: { + buckets_path: { not_deleted: 'not_deleted.value' }, + script: 'params.not_deleted > 0', + }, + }, + latest: { + top_hits: { + size: 1, + sort: { last_modified: 'desc' }, + _source: [ + 'comment', + 'hash', + 'last_modified', + 'metadata', + 'package_stats', + ], + }, + }, + sort: { + bucket_sort: { + sort: [{ _key: 'desc' }], + size: perPage, + from: perPage * (page - 1), + }, + }, + }, + }, + }, }), - size: perPage, - from: perPage * (page - 1), - _source: ['comment', 'hash', 'last_modified', 'metadata', 'package_stats'].join( - ',', - ), }).then( R.pipe( - R.path(['hits', 'hits']), - R.map(({ _source: s }) => ({ + R.path(['aggregations', 'revisions', 'buckets']), + R.map(({ latest: { hits: { hits: [{ _source: s }] } } }) => ({ hash: s.hash, modified: new Date(s.last_modified), stats: { @@ -973,7 +1074,8 @@ export const loadRevisionHash = ({ s3, bucket, name, id }) => })) const HASH_RE = /^[a-f0-9]{64}$/ -const TIMESTAMP_RE = /^1[0-9]{9}$/ +const TIMESTAMP_RE_SRC = '[0-9]{10}' +const TIMESTAMP_RE = new RegExp(`^${TIMESTAMP_RE_SRC}$`) // returns { hash, modified } export async function resolvePackageRevision({ s3, bucket, name, revision }) { diff --git a/catalog/app/utils/search.js b/catalog/app/utils/search.js index aa3cc182003..2365b950fd8 100644 --- a/catalog/app/utils/search.js +++ b/catalog/app/utils/search.js @@ -82,6 +82,7 @@ const parseJSON = (s) => { } const extractPkgData = ({ bucket, score, src }) => { + if (src.delete_marker || !src.hash) return {} const key = `package:${bucket}/${src.handle}:${src.hash}` return { [key]: { diff --git a/lambdas/search/index.py b/lambdas/search/index.py index 552c3a85871..e5000b20fd0 100644 --- a/lambdas/search/index.py +++ b/lambdas/search/index.py @@ -39,6 +39,7 @@ def lambda_handler(request): # 0-indexed starting position (for pagination) user_from = int(request.args.get('from', 0)) user_retry = int(request.args.get('retry', 0)) + filter_path = request.args.get('filter_path') terminate_after = int(os.environ.get('MAX_DOCUMENTS_PER_SHARD', 10_000)) if not user_indexes or not isinstance(user_indexes, str): @@ -127,8 +128,6 @@ def lambda_handler(request): "terms": {"field": 'ext'}, "aggs": {"size": {"sum": {"field": 'size'}}}, }, - # TODO: move this to a separate action (pkg_stats) - "totalPackageHandles": {"value_count": {"field": "handle"}}, } } size = 0 # We still get all aggregates, just don't need the results @@ -202,6 +201,7 @@ def lambda_handler(request): _source=_source, size=size, from_=user_from, + filter_path=filter_path, # try turning this off to consider all documents terminate_after=terminate_after, ) diff --git a/lambdas/search/tests/test_search.py b/lambdas/search/tests/test_search.py index 07430f51207..b4a6821a613 100644 --- a/lambdas/search/tests/test_search.py +++ b/lambdas/search/tests/test_search.py @@ -422,8 +422,6 @@ def _callback(request): "terms": {"field": 'ext'}, "aggs": {"size": {"sum": {"field": 'size'}}}, }, - # TODO: move this to a separate action (pkg_stats) - "totalPackageHandles": {"value_count": {"field": "handle"}}, } } # use 'all_gz' since it's not altered by the handler