diff --git a/x-pack/plugins/fleet/common/types/models/epm.ts b/x-pack/plugins/fleet/common/types/models/epm.ts index 3aa65dc3adcd4..827130d802f22 100644 --- a/x-pack/plugins/fleet/common/types/models/epm.ts +++ b/x-pack/plugins/fleet/common/types/models/epm.ts @@ -124,10 +124,25 @@ export type InstallablePackage = RegistryPackage | ArchivePackage; export type AssetsMap = Map; +export interface ArchiveEntry { + path: string; + buffer?: Buffer; +} + +export interface ArchiveIterator { + traverseEntries: (onEntry: (entry: ArchiveEntry) => Promise) => Promise; + getPaths: () => Promise; +} + export interface PackageInstallContext { packageInfo: InstallablePackage; + /** + * @deprecated Use `archiveIterator` to access the package archive entries + * without loading them all into memory at once. + */ assetsMap: AssetsMap; paths: string[]; + archiveIterator: ArchiveIterator; } export type ArchivePackage = PackageSpecManifest & diff --git a/x-pack/plugins/fleet/server/routes/epm/file_handler.test.ts b/x-pack/plugins/fleet/server/routes/epm/file_handler.test.ts index 1eb8387f69751..5690c32c2d7fd 100644 --- a/x-pack/plugins/fleet/server/routes/epm/file_handler.test.ts +++ b/x-pack/plugins/fleet/server/routes/epm/file_handler.test.ts @@ -15,7 +15,7 @@ import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_p import { getFile, getInstallation } from '../../services/epm/packages/get'; import type { FleetRequestHandlerContext } from '../..'; import { appContextService } from '../../services'; -import { unpackBufferEntries } from '../../services/epm/archive'; +import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive'; import { getAsset } from '../../services/epm/archive/storage'; import { getFileHandler } from './file_handler'; @@ -29,7 +29,7 @@ jest.mock('../../services/epm/packages/get'); const mockedGetBundledPackageByPkgKey = jest.mocked(getBundledPackageByPkgKey); const mockedGetInstallation = jest.mocked(getInstallation); const mockedGetFile = jest.mocked(getFile); -const mockedUnpackBufferEntries = jest.mocked(unpackBufferEntries); +const mockedUnpackBufferEntries = jest.mocked(unpackArchiveEntriesIntoMemory); const mockedGetAsset = jest.mocked(getAsset); function mockContext() { diff --git a/x-pack/plugins/fleet/server/routes/epm/file_handler.ts b/x-pack/plugins/fleet/server/routes/epm/file_handler.ts index 0f22a31c1aa72..994f52a71c224 100644 --- a/x-pack/plugins/fleet/server/routes/epm/file_handler.ts +++ b/x-pack/plugins/fleet/server/routes/epm/file_handler.ts @@ -17,7 +17,7 @@ import { defaultFleetErrorHandler } from '../../errors'; import { getAsset } from '../../services/epm/archive/storage'; import { getBundledPackageByPkgKey } from '../../services/epm/packages/bundled_packages'; import { pkgToPkgKey } from '../../services/epm/registry'; -import { unpackBufferEntries } from '../../services/epm/archive'; +import { unpackArchiveEntriesIntoMemory } from '../../services/epm/archive'; const CACHE_CONTROL_10_MINUTES_HEADER: HttpResponseOptions['headers'] = { 'cache-control': 'max-age=600', @@ -69,7 +69,7 @@ export const getFileHandler: FleetRequestHandler< pkgToPkgKey({ name: pkgName, version: pkgVersion }) ); if (bundledPackage) { - const bufferEntries = await unpackBufferEntries( + const bufferEntries = await unpackArchiveEntriesIntoMemory( await bundledPackage.getBuffer(), 'application/zip' ); diff --git a/x-pack/plugins/fleet/server/routes/epm/kibana_assets_handler.ts b/x-pack/plugins/fleet/server/routes/epm/kibana_assets_handler.ts index 8fe83f98669d1..ad0bec6397ee8 100644 --- a/x-pack/plugins/fleet/server/routes/epm/kibana_assets_handler.ts +++ b/x-pack/plugins/fleet/server/routes/epm/kibana_assets_handler.ts @@ -22,6 +22,7 @@ import type { FleetRequestHandler, InstallKibanaAssetsRequestSchema, } from '../../types'; +import { createArchiveIteratorFromMap } from '../../services/epm/archive/archive_iterator'; export const installPackageKibanaAssetsHandler: FleetRequestHandler< TypeOf, @@ -69,6 +70,7 @@ export const installPackageKibanaAssetsHandler: FleetRequestHandler< packageInfo, paths: installedPkgWithAssets.paths, assetsMap: installedPkgWithAssets.assetsMap, + archiveIterator: createArchiveIteratorFromMap(installedPkgWithAssets.assetsMap), }, }); diff --git a/x-pack/plugins/fleet/server/services/epm/archive/archive_iterator.ts b/x-pack/plugins/fleet/server/services/epm/archive/archive_iterator.ts new file mode 100644 index 0000000000000..369b32412bd82 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/epm/archive/archive_iterator.ts @@ -0,0 +1,83 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { AssetsMap, ArchiveIterator, ArchiveEntry } from '../../../../common/types'; + +import { traverseArchiveEntries } from '.'; + +/** + * Creates an iterator for traversing and extracting paths from an archive + * buffer. This iterator is intended to be used for memory efficient traversal + * of archive contents without extracting the entire archive into memory. + * + * @param archiveBuffer - The buffer containing the archive data. + * @param contentType - The content type of the archive (e.g., + * 'application/zip'). + * @returns ArchiveIterator instance. + * + */ +export const createArchiveIterator = ( + archiveBuffer: Buffer, + contentType: string +): ArchiveIterator => { + const paths: string[] = []; + + const traverseEntries = async ( + onEntry: (entry: ArchiveEntry) => Promise + ): Promise => { + await traverseArchiveEntries(archiveBuffer, contentType, async (entry) => { + await onEntry(entry); + }); + }; + + const getPaths = async (): Promise => { + if (paths.length) { + return paths; + } + + await traverseEntries(async (entry) => { + paths.push(entry.path); + }); + + return paths; + }; + + return { + traverseEntries, + getPaths, + }; +}; + +/** + * Creates an archive iterator from the assetsMap. This is a stop-gap solution + * to provide a uniform interface for traversing assets while assetsMap is still + * in use. It works with a map of assets loaded into memory and is not intended + * for use with large archives. + * + * @param assetsMap - A map where the keys are asset paths and the values are + * asset buffers. + * @returns ArchiveIterator instance. + * + */ +export const createArchiveIteratorFromMap = (assetsMap: AssetsMap): ArchiveIterator => { + const traverseEntries = async ( + onEntry: (entry: ArchiveEntry) => Promise + ): Promise => { + for (const [path, buffer] of assetsMap) { + await onEntry({ path, buffer }); + } + }; + + const getPaths = async (): Promise => { + return [...assetsMap.keys()]; + }; + + return { + traverseEntries, + getPaths, + }; +}; diff --git a/x-pack/plugins/fleet/server/services/epm/archive/extract.ts b/x-pack/plugins/fleet/server/services/epm/archive/extract.ts index 84aa161385cb3..952ff634c633e 100644 --- a/x-pack/plugins/fleet/server/services/epm/archive/extract.ts +++ b/x-pack/plugins/fleet/server/services/epm/archive/extract.ts @@ -11,13 +11,12 @@ import * as tar from 'tar'; import yauzl from 'yauzl'; import { bufferToStream, streamToBuffer } from '../streams'; - -import type { ArchiveEntry } from '.'; +import type { ArchiveEntry } from '../../../../common/types'; export async function untarBuffer( buffer: Buffer, filter = (entry: ArchiveEntry): boolean => true, - onEntry = (entry: ArchiveEntry): void => {} + onEntry = async (entry: ArchiveEntry): Promise => {} ) { const deflatedStream = bufferToStream(buffer); // use tar.list vs .extract to avoid writing to disk @@ -37,7 +36,7 @@ export async function untarBuffer( export async function unzipBuffer( buffer: Buffer, filter = (entry: ArchiveEntry): boolean => true, - onEntry = (entry: ArchiveEntry): void => {} + onEntry = async (entry: ArchiveEntry): Promise => {} ): Promise { const zipfile = await yauzlFromBuffer(buffer, { lazyEntries: true }); zipfile.readEntry(); @@ -46,7 +45,7 @@ export async function unzipBuffer( if (!filter({ path })) return zipfile.readEntry(); const entryBuffer = await getZipReadStream(zipfile, entry).then(streamToBuffer); - onEntry({ buffer: entryBuffer, path }); + await onEntry({ buffer: entryBuffer, path }); zipfile.readEntry(); }); return new Promise((resolve, reject) => zipfile.on('end', resolve).on('error', reject)); diff --git a/x-pack/plugins/fleet/server/services/epm/archive/index.ts b/x-pack/plugins/fleet/server/services/epm/archive/index.ts index 5943f8f838fcb..ed9ff2a5e4b72 100644 --- a/x-pack/plugins/fleet/server/services/epm/archive/index.ts +++ b/x-pack/plugins/fleet/server/services/epm/archive/index.ts @@ -5,13 +5,20 @@ * 2.0. */ -import type { AssetParts, AssetsMap } from '../../../../common/types'; +import type { + ArchiveEntry, + ArchiveIterator, + AssetParts, + AssetsMap, +} from '../../../../common/types'; import { PackageInvalidArchiveError, PackageUnsupportedMediaTypeError, PackageNotFoundError, } from '../../../errors'; +import { createArchiveIterator } from './archive_iterator'; + import { deletePackageInfo } from './cache'; import type { SharedKey } from './cache'; import { getBufferExtractor } from './extract'; @@ -20,66 +27,85 @@ export * from './cache'; export { getBufferExtractor, untarBuffer, unzipBuffer } from './extract'; export { generatePackageInfoFromArchiveBuffer } from './parse'; -export interface ArchiveEntry { - path: string; - buffer?: Buffer; -} - export async function unpackBufferToAssetsMap({ - name, - version, contentType, archiveBuffer, + useStreaming, }: { - name: string; - version: string; contentType: string; archiveBuffer: Buffer; -}): Promise<{ paths: string[]; assetsMap: AssetsMap }> { - const assetsMap = new Map(); - const paths: string[] = []; - const entries = await unpackBufferEntries(archiveBuffer, contentType); - - entries.forEach((entry) => { - const { path, buffer } = entry; - if (buffer) { - assetsMap.set(path, buffer); - paths.push(path); - } - }); - - return { assetsMap, paths }; + useStreaming: boolean | undefined; +}): Promise<{ paths: string[]; assetsMap: AssetsMap; archiveIterator: ArchiveIterator }> { + const archiveIterator = createArchiveIterator(archiveBuffer, contentType); + let paths: string[] = []; + let assetsMap: AssetsMap = new Map(); + if (useStreaming) { + paths = await archiveIterator.getPaths(); + // We keep the assetsMap empty as we don't want to load all the assets in memory + assetsMap = new Map(); + } else { + const entries = await unpackArchiveEntriesIntoMemory(archiveBuffer, contentType); + + entries.forEach((entry) => { + const { path, buffer } = entry; + if (buffer) { + assetsMap.set(path, buffer); + paths.push(path); + } + }); + } + + return { paths, assetsMap, archiveIterator }; } -export async function unpackBufferEntries( +/** + * This function extracts all archive entries into memory. + * + * NOTE: This is potentially dangerous for large archives and can cause OOM + * errors. Use 'traverseArchiveEntries' instead to iterate over the entries + * without storing them all in memory at once. + * + * @param archiveBuffer + * @param contentType + * @returns All the entries in the archive buffer + */ +export async function unpackArchiveEntriesIntoMemory( archiveBuffer: Buffer, contentType: string ): Promise { + const entries: ArchiveEntry[] = []; + const addToEntries = async (entry: ArchiveEntry) => void entries.push(entry); + await traverseArchiveEntries(archiveBuffer, contentType, addToEntries); + + // While unpacking a tar.gz file with unzipBuffer() will result in a thrown + // error, unpacking a zip file with untarBuffer() just results in nothing. + if (entries.length === 0) { + throw new PackageInvalidArchiveError( + `Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.` + ); + } + return entries; +} + +export async function traverseArchiveEntries( + archiveBuffer: Buffer, + contentType: string, + onEntry: (entry: ArchiveEntry) => Promise +) { const bufferExtractor = getBufferExtractor({ contentType }); if (!bufferExtractor) { throw new PackageUnsupportedMediaTypeError( `Unsupported media type ${contentType}. Please use 'application/gzip' or 'application/zip'` ); } - const entries: ArchiveEntry[] = []; try { const onlyFiles = ({ path }: ArchiveEntry): boolean => !path.endsWith('/'); - const addToEntries = (entry: ArchiveEntry) => entries.push(entry); - await bufferExtractor(archiveBuffer, onlyFiles, addToEntries); + await bufferExtractor(archiveBuffer, onlyFiles, onEntry); } catch (error) { throw new PackageInvalidArchiveError( `Error during extraction of package: ${error}. Assumed content type was ${contentType}, check if this matches the archive type.` ); } - - // While unpacking a tar.gz file with unzipBuffer() will result in a thrown error in the try-catch above, - // unpacking a zip file with untarBuffer() just results in nothing. - if (entries.length === 0) { - throw new PackageInvalidArchiveError( - `Archive seems empty. Assumed content type was ${contentType}, check if this matches the archive type.` - ); - } - return entries; } export const deletePackageCache = ({ name, version }: SharedKey) => { diff --git a/x-pack/plugins/fleet/server/services/epm/archive/parse.ts b/x-pack/plugins/fleet/server/services/epm/archive/parse.ts index 530ca804f24eb..8cccfe9982457 100644 --- a/x-pack/plugins/fleet/server/services/epm/archive/parse.ts +++ b/x-pack/plugins/fleet/server/services/epm/archive/parse.ts @@ -40,7 +40,7 @@ import { import { PackageInvalidArchiveError } from '../../../errors'; import { pkgToPkgKey } from '../registry'; -import { unpackBufferEntries } from '.'; +import { traverseArchiveEntries } from '.'; const readFileAsync = promisify(readFile); export const MANIFEST_NAME = 'manifest.yml'; @@ -160,9 +160,8 @@ export async function generatePackageInfoFromArchiveBuffer( contentType: string ): Promise<{ paths: string[]; packageInfo: ArchivePackage }> { const assetsMap: AssetsBufferMap = {}; - const entries = await unpackBufferEntries(archiveBuffer, contentType); const paths: string[] = []; - entries.forEach(({ path: bufferPath, buffer }) => { + await traverseArchiveEntries(archiveBuffer, contentType, async ({ path: bufferPath, buffer }) => { paths.push(bufferPath); if (buffer && filterAssetPathForParseAndVerifyArchive(bufferPath)) { assetsMap[bufferPath] = buffer; diff --git a/x-pack/plugins/fleet/server/services/epm/archive/storage.ts b/x-pack/plugins/fleet/server/services/epm/archive/storage.ts index dd6321445df75..8f6f151383d5a 100644 --- a/x-pack/plugins/fleet/server/services/epm/archive/storage.ts +++ b/x-pack/plugins/fleet/server/services/epm/archive/storage.ts @@ -15,6 +15,7 @@ import { SavedObjectsErrorHelpers } from '@kbn/core/server'; import { ASSETS_SAVED_OBJECT_TYPE } from '../../../../common'; import type { + ArchiveEntry, InstallablePackage, InstallSource, PackageAssetReference, @@ -24,7 +25,6 @@ import { PackageInvalidArchiveError, PackageNotFoundError } from '../../../error import { appContextService } from '../../app_context'; import { setPackageInfo } from '.'; -import type { ArchiveEntry } from '.'; import { filterAssetPathForParseAndVerifyArchive, parseAndVerifyArchive } from './parse'; const ONE_BYTE = 1024 * 1024; diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts index a456734747324..5a4672f67fe53 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/ingest_pipeline/install.ts @@ -16,14 +16,13 @@ import type { PackageInfo, } from '../../../../types'; import { getAssetFromAssetsMap, getPathParts } from '../../archive'; -import type { ArchiveEntry } from '../../archive'; import { FLEET_FINAL_PIPELINE_CONTENT, FLEET_FINAL_PIPELINE_ID, FLEET_FINAL_PIPELINE_VERSION, } from '../../../../constants'; import { getPipelineNameForDatastream } from '../../../../../common/services'; -import type { PackageInstallContext } from '../../../../../common/types'; +import type { ArchiveEntry, PackageInstallContext } from '../../../../../common/types'; import { appendMetadataToIngestPipeline } from '../meta'; import { retryTransientEsErrors } from '../retry'; diff --git a/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/mappings.test.ts b/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/mappings.test.ts index f34015bf77697..de962850fba8c 100644 --- a/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/mappings.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/elasticsearch/transform/mappings.test.ts @@ -5,6 +5,8 @@ * 2.0. */ +import { createArchiveIteratorFromMap } from '../../archive/archive_iterator'; + import { loadMappingForTransform } from './mappings'; describe('loadMappingForTransform', () => { @@ -13,6 +15,7 @@ describe('loadMappingForTransform', () => { { packageInfo: {} as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }, 'test' @@ -49,6 +52,7 @@ describe('loadMappingForTransform', () => { ), ], ]), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [ '/package/ti_opencti/2.1.0/elasticsearch/transform/latest_ioc/fields/ecs.yml', '/package/ti_opencti/2.1.0/elasticsearch/transform/latest_ioc/fields/ecs-extra.yml', diff --git a/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts b/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts index 276478099daf8..bf5684f29c205 100644 --- a/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/kibana/assets/install.ts @@ -325,16 +325,16 @@ export async function deleteKibanaAssetsAndReferencesForSpace({ await saveKibanaAssetsRefs(savedObjectsClient, pkgName, [], true); } +const kibanaAssetTypes = Object.values(KibanaAssetType); +export const isKibanaAssetType = (path: string) => { + const parts = getPathParts(path); + + return parts.service === 'kibana' && (kibanaAssetTypes as string[]).includes(parts.type); +}; + export function getKibanaAssets( packageInstallContext: PackageInstallContext ): Record { - const kibanaAssetTypes = Object.values(KibanaAssetType); - const isKibanaAssetType = (path: string) => { - const parts = getPathParts(path); - - return parts.service === 'kibana' && (kibanaAssetTypes as string[]).includes(parts.type); - }; - const result = Object.fromEntries( kibanaAssetTypes.map((type) => [type, []]) ) as Record; diff --git a/x-pack/plugins/fleet/server/services/epm/kibana/assets/install_with_streaming.ts b/x-pack/plugins/fleet/server/services/epm/kibana/assets/install_with_streaming.ts new file mode 100644 index 0000000000000..2dd5abb84e077 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/epm/kibana/assets/install_with_streaming.ts @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObjectsClientContract } from '@kbn/core/server'; + +import type { PackageInstallContext } from '../../../../../common/types'; +import type { KibanaAssetReference, KibanaAssetType } from '../../../../types'; +import { getPathParts } from '../../archive'; + +import type { ArchiveAsset } from './install'; +import { + KibanaSavedObjectTypeMapping, + createSavedObjectKibanaAsset, + isKibanaAssetType, + toAssetReference, +} from './install'; +import { getSpaceAwareSaveobjectsClients } from './saved_objects'; + +interface InstallKibanaAssetsWithStreamingArgs { + packageInstallContext: PackageInstallContext; + spaceId: string; +} + +const MAX_ASSETS_TO_INSTALL_IN_PARALLEL = 100; + +export async function installKibanaAssetsWithStreaming({ + spaceId, + packageInstallContext, +}: InstallKibanaAssetsWithStreamingArgs): Promise { + const { archiveIterator } = packageInstallContext; + + const { savedObjectClientWithSpace } = getSpaceAwareSaveobjectsClients(spaceId); + + const assetRefs: KibanaAssetReference[] = []; + let batch: ArchiveAsset[] = []; + + await archiveIterator.traverseEntries(async ({ path, buffer }) => { + if (!buffer || !isKibanaAssetType(path)) { + return; + } + const savedObject = JSON.parse(buffer.toString('utf8')) as ArchiveAsset; + const assetType = getPathParts(path).type as KibanaAssetType; + const soType = KibanaSavedObjectTypeMapping[assetType]; + if (savedObject.type !== soType) { + return; + } + + batch.push(savedObject); + assetRefs.push(toAssetReference(savedObject)); + + if (batch.length >= MAX_ASSETS_TO_INSTALL_IN_PARALLEL) { + await bulkCreateSavedObjects({ + savedObjectsClient: savedObjectClientWithSpace, + kibanaAssets: batch, + refresh: false, + }); + batch = []; + } + }); + + // install any remaining assets + if (batch.length) { + await bulkCreateSavedObjects({ + savedObjectsClient: savedObjectClientWithSpace, + kibanaAssets: batch, + // Use wait_for with the last batch to ensure all assets are readable once the install is complete + refresh: 'wait_for', + }); + } + return assetRefs; +} + +async function bulkCreateSavedObjects({ + savedObjectsClient, + kibanaAssets, + refresh, +}: { + kibanaAssets: ArchiveAsset[]; + savedObjectsClient: SavedObjectsClientContract; + refresh?: boolean | 'wait_for'; +}) { + if (!kibanaAssets.length) { + return []; + } + + const toBeSavedObjects = kibanaAssets.map((asset) => createSavedObjectKibanaAsset(asset)); + + const { saved_objects: createdSavedObjects } = await savedObjectsClient.bulkCreate( + toBeSavedObjects, + { + // We only want to install new saved objects without overwriting existing ones + overwrite: false, + managed: true, + refresh, + } + ); + + return createdSavedObjects; +} diff --git a/x-pack/plugins/fleet/server/services/epm/package_service.ts b/x-pack/plugins/fleet/server/services/epm/package_service.ts index 661475dfadc09..a097db584b460 100644 --- a/x-pack/plugins/fleet/server/services/epm/package_service.ts +++ b/x-pack/plugins/fleet/server/services/epm/package_service.ts @@ -39,7 +39,10 @@ import type { InstallResult } from '../../../common'; import { appContextService } from '..'; -import type { CustomPackageDatasetConfiguration, EnsurePackageResult } from './packages/install'; +import { + type CustomPackageDatasetConfiguration, + type EnsurePackageResult, +} from './packages/install'; import type { FetchFindLatestPackageOptions } from './registry'; import { getPackageFieldsMetadata } from './registry'; @@ -56,6 +59,7 @@ import { } from './packages'; import { generatePackageInfoFromArchiveBuffer } from './archive'; import { getEsPackage } from './archive/storage'; +import { createArchiveIteratorFromMap } from './archive/archive_iterator'; export type InstalledAssetType = EsAssetReference; @@ -381,12 +385,14 @@ class PackageClientImpl implements PackageClient { } const { assetsMap } = esPackage; + const archiveIterator = createArchiveIteratorFromMap(assetsMap); const { installedTransforms } = await installTransforms({ packageInstallContext: { assetsMap, packageInfo, paths, + archiveIterator, }, esClient: this.internalEsClient, savedObjectsClient: this.internalSoClient, diff --git a/x-pack/plugins/fleet/server/services/epm/packages/assets.ts b/x-pack/plugins/fleet/server/services/epm/packages/assets.ts index a82b5c0d103b2..3bb84c0d23163 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/assets.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/assets.ts @@ -5,9 +5,9 @@ * 2.0. */ +import type { ArchiveEntry } from '../../../../common/types'; import type { AssetsMap, PackageInfo } from '../../../types'; import { getAssetFromAssetsMap } from '../archive'; -import type { ArchiveEntry } from '../archive'; const maybeFilterByDataset = (packageInfo: Pick, datasetName: string) => diff --git a/x-pack/plugins/fleet/server/services/epm/packages/get.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/get.test.ts index 2dc295762e33a..5711c8fcccaf4 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/get.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/get.test.ts @@ -27,6 +27,8 @@ import { auditLoggingService } from '../../audit_logging'; import * as Registry from '../registry'; +import { createArchiveIteratorFromMap } from '../archive/archive_iterator'; + import { getInstalledPackages, getPackageInfo, getPackages, getPackageUsageStats } from './get'; jest.mock('../registry'); @@ -915,6 +917,7 @@ owner: elastic`, MockRegistry.getPackage.mockResolvedValue({ paths: [], assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), packageInfo: { name: 'my-package', version: '1.0.0', diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.test.ts index 709e0d84d70fc..6b3a31eda649e 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.test.ts @@ -442,6 +442,24 @@ describe('install', () => { expect(response.status).toEqual('installed'); }); + + it('should use streaming installation for the detection rules package', async () => { + jest.spyOn(licenseService, 'hasAtLeast').mockReturnValue(true); + + const response = await installPackage({ + spaceId: DEFAULT_SPACE_ID, + installSource: 'registry', + pkgkey: 'security_detection_engine', + savedObjectsClient: savedObjectsClientMock.create(), + esClient: {} as ElasticsearchClient, + }); + + expect(response.error).toBeUndefined(); + + expect(installStateMachine._stateMachineInstallPackage).toHaveBeenCalledWith( + expect.objectContaining({ useStreaming: true }) + ); + }); }); describe('upload', () => { diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install.ts index 1ea6f29cad839..ebe5acc35178d 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install.ts @@ -76,6 +76,7 @@ import { deleteVerificationResult, unpackBufferToAssetsMap, } from '../archive'; +import { createArchiveIteratorFromMap } from '../archive/archive_iterator'; import { toAssetReference } from '../kibana/assets/install'; import type { ArchiveAsset } from '../kibana/assets/install'; import type { PackageUpdateEvent } from '../../upgrade_sender'; @@ -107,6 +108,12 @@ import { removeInstallation } from './remove'; export const UPLOAD_RETRY_AFTER_MS = 10000; // 10s const MAX_ENSURE_INSTALL_TIME = 60 * 1000; +const PACKAGES_TO_INSTALL_WITH_STREAMING = [ + // The security_detection_engine package contains a large number of assets and + // is not suitable for regular installation as it might cause OOM errors. + 'security_detection_engine', +]; + export async function isPackageInstalled(options: { savedObjectsClient: SavedObjectsClientContract; pkgName: string; @@ -449,6 +456,7 @@ async function installPackageFromRegistry({ // TODO: change epm API to /packageName/version so we don't need to do this const { pkgName, pkgVersion: version } = Registry.splitPkgKey(pkgkey); let pkgVersion = version ?? ''; + const useStreaming = PACKAGES_TO_INSTALL_WITH_STREAMING.includes(pkgName); // if an error happens during getInstallType, report that we don't know let installType: InstallType = 'unknown'; @@ -478,11 +486,12 @@ async function installPackageFromRegistry({ } // get latest package version and requested version in parallel for performance - const [latestPackage, { paths, packageInfo, assetsMap, verificationResult }] = + const [latestPackage, { paths, packageInfo, assetsMap, archiveIterator, verificationResult }] = await Promise.all([ latestPkg ? Promise.resolve(latestPkg) : queryLatest(), Registry.getPackage(pkgName, pkgVersion, { ignoreUnverified: force && !neverIgnoreVerificationError, + useStreaming, }), ]); @@ -490,6 +499,7 @@ async function installPackageFromRegistry({ packageInfo, assetsMap, paths, + archiveIterator, }; // let the user install if using the force flag or needing to reinstall or install a previous version due to failed update @@ -542,6 +552,7 @@ async function installPackageFromRegistry({ ignoreMappingUpdateErrors, skipDataStreamRollover, retryFromLastState, + useStreaming, }); } catch (e) { sendEvent({ @@ -580,6 +591,7 @@ async function installPackageWithStateMachine(options: { ignoreMappingUpdateErrors?: boolean; skipDataStreamRollover?: boolean; retryFromLastState?: boolean; + useStreaming?: boolean; }): Promise { const packageInfo = options.packageInstallContext.packageInfo; @@ -599,6 +611,7 @@ async function installPackageWithStateMachine(options: { skipDataStreamRollover, packageInstallContext, retryFromLastState, + useStreaming, } = options; let { telemetryEvent } = options; const logger = appContextService.getLogger(); @@ -696,6 +709,7 @@ async function installPackageWithStateMachine(options: { ignoreMappingUpdateErrors, skipDataStreamRollover, retryFromLastState, + useStreaming, }) .then(async (assets) => { logger.debug(`Removing old assets from previous versions of ${pkgName}`); @@ -785,6 +799,7 @@ async function installPackageByUpload({ } const { packageInfo } = await generatePackageInfoFromArchiveBuffer(archiveBuffer, contentType); const pkgName = packageInfo.name; + const useStreaming = PACKAGES_TO_INSTALL_WITH_STREAMING.includes(pkgName); // Allow for overriding the version in the manifest for cases where we install // stack-aligned bundled packages to support special cases around the @@ -807,17 +822,17 @@ async function installPackageByUpload({ packageInfo, }); - const { assetsMap, paths } = await unpackBufferToAssetsMap({ - name: packageInfo.name, - version: pkgVersion, + const { paths, assetsMap, archiveIterator } = await unpackBufferToAssetsMap({ archiveBuffer, contentType, + useStreaming, }); const packageInstallContext: PackageInstallContext = { packageInfo: { ...packageInfo, version: pkgVersion }, assetsMap, paths, + archiveIterator, }; // update the timestamp of latest installation setLastUploadInstallCache(); @@ -837,6 +852,7 @@ async function installPackageByUpload({ authorizationHeader, ignoreMappingUpdateErrors, skipDataStreamRollover, + useStreaming, }); } catch (e) { return { @@ -1004,12 +1020,14 @@ export async function installCustomPackage( acc.set(asset.path, asset.content); return acc; }, new Map()); - const paths = [...assetsMap.keys()]; + const paths = assets.map((asset) => asset.path); + const archiveIterator = createArchiveIteratorFromMap(assetsMap); const packageInstallContext: PackageInstallContext = { assetsMap, paths, packageInfo, + archiveIterator, }; return await installPackageWithStateMachine({ packageInstallContext, @@ -1341,16 +1359,20 @@ export async function installAssetsForInputPackagePolicy(opts: { ignoreUnverified: force, }); + const archiveIterator = createArchiveIteratorFromMap(pkg.assetsMap); packageInstallContext = { assetsMap: pkg.assetsMap, packageInfo: pkg.packageInfo, paths: pkg.paths, + archiveIterator, }; } else { + const archiveIterator = createArchiveIteratorFromMap(installedPkgWithAssets.assetsMap); packageInstallContext = { assetsMap: installedPkgWithAssets.assetsMap, packageInfo: installedPkgWithAssets.packageInfo, paths: installedPkgWithAssets.paths, + archiveIterator, }; } diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.test.ts index 174076a9e9b1b..73b78a6cc4aa0 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.test.ts @@ -38,6 +38,8 @@ import { updateCurrentWriteIndices } from '../../elasticsearch/template/template import { installIndexTemplatesAndPipelines } from '../install_index_template_pipeline'; +import { createArchiveIteratorFromMap } from '../../archive/archive_iterator'; + import { handleState } from './state_machine'; import { _stateMachineInstallPackage } from './_state_machine_package_install'; import { cleanupLatestExecutedState } from './steps'; @@ -110,6 +112,7 @@ describe('_stateMachineInstallPackage', () => { logger: loggerMock.create(), packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -172,6 +175,7 @@ describe('_stateMachineInstallPackage', () => { logger: loggerMock.create(), packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -208,6 +212,7 @@ describe('_stateMachineInstallPackage', () => { logger: loggerMock.create(), packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -257,6 +262,7 @@ describe('_stateMachineInstallPackage', () => { logger: loggerMock.create(), packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -336,6 +342,7 @@ describe('_stateMachineInstallPackage', () => { owner: { github: 'elastic/fleet' }, } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }, installType: 'install', diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.ts index 1f10d40feba38..c941b6d60d63b 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/_state_machine_package_install.ts @@ -48,11 +48,13 @@ import { updateLatestExecutedState, cleanupLatestExecutedState, cleanUpKibanaAssetsStep, + cleanUpUnusedKibanaAssetsStep, cleanupILMPoliciesStep, cleanUpMlModelStep, cleanupIndexTemplatePipelinesStep, cleanupTransformsStep, cleanupArchiveEntriesStep, + stepInstallKibanaAssetsWithStreaming, } from './steps'; import type { StateMachineDefinition, StateMachineStates } from './state_machine'; import { handleState } from './state_machine'; @@ -73,6 +75,7 @@ export interface InstallContext extends StateContext { skipDataStreamRollover?: boolean; retryFromLastState?: boolean; initialState?: INSTALL_STATES; + useStreaming?: boolean; indexTemplates?: IndexTemplateEntry[]; packageAssetRefs?: PackageAssetReference[]; @@ -83,7 +86,7 @@ export interface InstallContext extends StateContext { /** * This data structure defines the sequence of the states and the transitions */ -const statesDefinition: StateMachineStates = { +const regularStatesDefinition: StateMachineStates = { create_restart_installation: { nextState: INSTALL_STATES.INSTALL_KIBANA_ASSETS, onTransition: stepCreateRestartInstallation, @@ -152,6 +155,31 @@ const statesDefinition: StateMachineStates = { }, }; +const streamingStatesDefinition: StateMachineStates = { + create_restart_installation: { + nextState: INSTALL_STATES.INSTALL_KIBANA_ASSETS, + onTransition: stepCreateRestartInstallation, + onPostTransition: updateLatestExecutedState, + }, + install_kibana_assets: { + onTransition: stepInstallKibanaAssetsWithStreaming, + nextState: INSTALL_STATES.SAVE_ARCHIVE_ENTRIES, + onPostTransition: updateLatestExecutedState, + }, + save_archive_entries_from_assets_map: { + onPreTransition: cleanupArchiveEntriesStep, + onTransition: stepSaveArchiveEntries, + nextState: INSTALL_STATES.UPDATE_SO, + onPostTransition: updateLatestExecutedState, + }, + update_so: { + onPreTransition: cleanUpUnusedKibanaAssetsStep, + onTransition: stepSaveSystemObject, + nextState: 'end', + onPostTransition: updateLatestExecutedState, + }, +}; + /* * _stateMachineInstallPackage installs packages using the generic state machine in ./state_machine * installStates is the data structure providing the state machine definition @@ -166,6 +194,10 @@ export async function _stateMachineInstallPackage( const logger = appContextService.getLogger(); let initialState = INSTALL_STATES.CREATE_RESTART_INSTALLATION; + const statesDefinition = context.useStreaming + ? streamingStatesDefinition + : regularStatesDefinition; + // if retryFromLastState, restart install from last install state // if force is passed, the install should be executed from the beginning if (retryFromLastState && !force && installedPkg?.attributes?.latest_executed_state?.name) { diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_create_restart_installation.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_create_restart_installation.test.ts index 2b653728d6574..e5a7fed55fe87 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_create_restart_installation.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_create_restart_installation.test.ts @@ -31,6 +31,8 @@ import { auditLoggingService } from '../../../../audit_logging'; import { restartInstallation, createInstallation } from '../../install'; import type { Installation } from '../../../../../../common'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepCreateRestartInstallation } from './step_create_restart_installation'; jest.mock('../../../../audit_logging'); @@ -84,6 +86,7 @@ describe('stepCreateRestartInstallation', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -120,6 +123,7 @@ describe('stepCreateRestartInstallation', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -164,6 +168,7 @@ describe('stepCreateRestartInstallation', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -208,6 +213,7 @@ describe('stepCreateRestartInstallation', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_delete_previous_pipelines.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_delete_previous_pipelines.test.ts index 7d8a251433bb5..06201770ee2e2 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_delete_previous_pipelines.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_delete_previous_pipelines.test.ts @@ -24,6 +24,8 @@ import { deletePreviousPipelines, } from '../../../elasticsearch/ingest_pipeline'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepDeletePreviousPipelines } from './step_delete_previous_pipelines'; jest.mock('../../../elasticsearch/ingest_pipeline'); @@ -84,6 +86,7 @@ describe('stepDeletePreviousPipelines', () => { owner: { github: 'elastic/fleet' }, } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; appContextService.start( @@ -276,6 +279,7 @@ describe('stepDeletePreviousPipelines', () => { owner: { github: 'elastic/fleet' }, } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; appContextService.start( diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_ilm_policies.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_ilm_policies.test.ts index 2cf9b23bb9adb..4c106a0c68f15 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_ilm_policies.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_ilm_policies.test.ts @@ -24,6 +24,8 @@ import { installIlmForDataStream } from '../../../elasticsearch/datastream_ilm/i import { ElasticsearchAssetType } from '../../../../../types'; import { deleteILMPolicies, deletePrerequisiteAssets } from '../../remove'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepInstallILMPolicies, cleanupILMPoliciesStep } from './step_install_ilm_policies'; jest.mock('../../../archive/storage'); @@ -56,6 +58,7 @@ const packageInstallContext = { owner: { github: 'elastic/fleet' }, } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; let soClient: jest.Mocked; @@ -239,6 +242,7 @@ describe('stepInstallILMPolicies', () => { owner: { github: 'elastic/fleet' }, } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }, installType: 'install', diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_index_template_pipelines.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_index_template_pipelines.test.ts index d258747edc6ef..1c368cfd998d3 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_index_template_pipelines.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_index_template_pipelines.test.ts @@ -37,6 +37,8 @@ const mockDeletePrerequisiteAssets = deletePrerequisiteAssets as jest.MockedFunc typeof deletePrerequisiteAssets >; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepInstallIndexTemplatePipelines, cleanupIndexTemplatePipelinesStep, @@ -122,6 +124,7 @@ describe('stepInstallIndexTemplatePipelines', () => { owner: { github: 'elastic/fleet' }, } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; appContextService.start( @@ -281,6 +284,7 @@ describe('stepInstallIndexTemplatePipelines', () => { ], } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; appContextService.start( @@ -431,6 +435,7 @@ describe('stepInstallIndexTemplatePipelines', () => { ], } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; appContextService.start( @@ -521,6 +526,7 @@ describe('stepInstallIndexTemplatePipelines', () => { ], } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; appContextService.start( @@ -574,6 +580,7 @@ describe('stepInstallIndexTemplatePipelines', () => { owner: { github: 'elastic/fleet' }, } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; appContextService.start( @@ -647,6 +654,7 @@ describe('cleanupIndexTemplatePipelinesStep', () => { ], } as any, assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], }; const mockInstalledPackageSo: SavedObject = { diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.test.ts index 52c93c61c16e1..cf9d953868b6a 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.test.ts @@ -23,8 +23,25 @@ import { deleteKibanaAssets } from '../../remove'; import { KibanaSavedObjectType, type Installation } from '../../../../../types'; -import { stepInstallKibanaAssets, cleanUpKibanaAssetsStep } from './step_install_kibana_assets'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; +import { + stepInstallKibanaAssets, + cleanUpKibanaAssetsStep, + stepInstallKibanaAssetsWithStreaming, + cleanUpUnusedKibanaAssetsStep, +} from './step_install_kibana_assets'; + +jest.mock('../../../kibana/assets/saved_objects', () => { + return { + getSpaceAwareSaveobjectsClients: jest.fn().mockReturnValue({ + savedObjectClientWithSpace: jest.fn(), + savedObjectsImporter: jest.fn(), + savedObjectTagAssignmentService: jest.fn(), + savedObjectTagClient: jest.fn(), + }), + }; +}); jest.mock('../../../kibana/assets/install'); jest.mock('../../remove', () => { return { @@ -58,6 +75,7 @@ const packageInstallContext = { } as any, paths: ['some/path/1', 'some/path/2'], assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), }; describe('stepInstallKibanaAssets', () => { @@ -82,6 +100,7 @@ describe('stepInstallKibanaAssets', () => { logger: loggerMock.create(), packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -102,7 +121,7 @@ describe('stepInstallKibanaAssets', () => { }); await expect(installationPromise).resolves.not.toThrowError(); - expect(mockedInstallKibanaAssetsAndReferencesMultispace).toBeCalledTimes(1); + expect(installKibanaAssetsAndReferencesMultispace).toBeCalledTimes(1); }); esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; appContextService.start(createAppContextStartContractMock()); @@ -121,6 +140,7 @@ describe('stepInstallKibanaAssets', () => { logger: loggerMock.create(), packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -144,6 +164,60 @@ describe('stepInstallKibanaAssets', () => { }); }); +describe('stepInstallKibanaAssetsWithStreaming', () => { + beforeEach(async () => { + soClient = savedObjectsClientMock.create(); + esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + appContextService.start(createAppContextStartContractMock()); + }); + + it('should rely on archiveIterator instead of in-memory assetsMap', async () => { + const assetsMap = new Map(); + assetsMap.get = jest.fn(); + assetsMap.set = jest.fn(); + + const archiveIterator = { + traverseEntries: jest.fn(), + getPaths: jest.fn(), + }; + + const result = await stepInstallKibanaAssetsWithStreaming({ + savedObjectsClient: soClient, + esClient, + logger: loggerMock.create(), + packageInstallContext: { + assetsMap, + archiveIterator, + paths: [], + packageInfo: { + title: 'title', + name: 'xyz', + version: '4.5.6', + description: 'test', + type: 'integration', + categories: ['cloud', 'custom'], + format_version: 'string', + release: 'experimental', + conditions: { kibana: { version: 'x.y.z' } }, + owner: { github: 'elastic/fleet' }, + }, + }, + installType: 'install', + installSource: 'registry', + spaceId: DEFAULT_SPACE_ID, + }); + + expect(result).toEqual({ installedKibanaAssetsRefs: [] }); + + // Verify that assetsMap was not used + expect(assetsMap.get).not.toBeCalled(); + expect(assetsMap.set).not.toBeCalled(); + + // Verify that archiveIterator was used + expect(archiveIterator.traverseEntries).toBeCalled(); + }); +}); + describe('cleanUpKibanaAssetsStep', () => { const mockInstalledPackageSo: SavedObject = { id: 'mocked-package', @@ -302,3 +376,84 @@ describe('cleanUpKibanaAssetsStep', () => { expect(mockedDeleteKibanaAssets).not.toBeCalled(); }); }); + +describe('cleanUpUnusedKibanaAssetsStep', () => { + const mockInstalledPackageSo: SavedObject = { + id: 'mocked-package', + attributes: { + name: 'test-package', + version: '1.0.0', + install_status: 'installing', + install_version: '1.0.0', + install_started_at: new Date().toISOString(), + install_source: 'registry', + verification_status: 'verified', + installed_kibana: [] as any, + installed_es: [] as any, + es_index_patterns: {}, + }, + type: PACKAGES_SAVED_OBJECT_TYPE, + references: [], + }; + + const installationContext = { + savedObjectsClient: soClient, + savedObjectsImporter: jest.fn(), + esClient, + logger: loggerMock.create(), + packageInstallContext, + installType: 'install' as const, + installSource: 'registry' as const, + spaceId: DEFAULT_SPACE_ID, + retryFromLastState: true, + initialState: 'install_kibana_assets' as any, + }; + + beforeEach(async () => { + soClient = savedObjectsClientMock.create(); + esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; + appContextService.start(createAppContextStartContractMock()); + }); + + it('should not clean up assets if they all present in the new package', async () => { + const installedAssets = [{ type: KibanaSavedObjectType.dashboard, id: 'dashboard-1' }]; + await cleanUpUnusedKibanaAssetsStep({ + ...installationContext, + installedPkg: { + ...mockInstalledPackageSo, + attributes: { + ...mockInstalledPackageSo.attributes, + installed_kibana: installedAssets, + }, + }, + installedKibanaAssetsRefs: installedAssets, + }); + + expect(mockedDeleteKibanaAssets).not.toBeCalled(); + }); + + it('should clean up assets that are not present in the new package', async () => { + const installedAssets = [ + { type: KibanaSavedObjectType.dashboard, id: 'dashboard-1' }, + { type: KibanaSavedObjectType.dashboard, id: 'dashboard-2' }, + ]; + const newAssets = [{ type: KibanaSavedObjectType.dashboard, id: 'dashboard-1' }]; + await cleanUpUnusedKibanaAssetsStep({ + ...installationContext, + installedPkg: { + ...mockInstalledPackageSo, + attributes: { + ...mockInstalledPackageSo.attributes, + installed_kibana: installedAssets, + }, + }, + installedKibanaAssetsRefs: newAssets, + }); + + expect(mockedDeleteKibanaAssets).toBeCalledWith({ + installedObjects: [installedAssets[1]], + spaceId: 'default', + packageInfo: packageInstallContext.packageInfo, + }); + }); +}); diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.ts index b5a1fff91d3b8..1ba1b92d07221 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_kibana_assets.ts @@ -11,7 +11,9 @@ import { withPackageSpan } from '../../utils'; import type { InstallContext } from '../_state_machine_package_install'; import { deleteKibanaAssets } from '../../remove'; +import type { KibanaAssetReference } from '../../../../../../common/types'; import { INSTALL_STATES } from '../../../../../../common/types'; +import { installKibanaAssetsWithStreaming } from '../../../kibana/assets/install_with_streaming'; export async function stepInstallKibanaAssets(context: InstallContext) { const { savedObjectsClient, logger, installedPkg, packageInstallContext, spaceId } = context; @@ -37,6 +39,21 @@ export async function stepInstallKibanaAssets(context: InstallContext) { return { kibanaAssetPromise }; } +export async function stepInstallKibanaAssetsWithStreaming(context: InstallContext) { + const { packageInstallContext, spaceId } = context; + + const installedKibanaAssetsRefs = await withPackageSpan( + 'Install Kibana assets with streaming', + () => + installKibanaAssetsWithStreaming({ + packageInstallContext, + spaceId, + }) + ); + + return { installedKibanaAssetsRefs }; +} + export async function cleanUpKibanaAssetsStep(context: InstallContext) { const { logger, @@ -65,3 +82,44 @@ export async function cleanUpKibanaAssetsStep(context: InstallContext) { }); } } + +/** + * Cleans up Kibana assets that are no longer in the package. As opposite to + * `cleanUpKibanaAssetsStep`, this one is used after the package assets are + * installed. + * + * This function compares the currently installed Kibana assets with the assets + * in the previous package and removes any assets that are no longer present in the + * new installation. + * + */ +export async function cleanUpUnusedKibanaAssetsStep(context: InstallContext) { + const { logger, installedPkg, packageInstallContext, spaceId, installedKibanaAssetsRefs } = + context; + const { packageInfo } = packageInstallContext; + + if (!installedKibanaAssetsRefs) { + return; + } + + logger.debug('Clean up Kibana assets that are no longer in the package'); + + // Get the assets installed by the previous package + const previousAssetRefs = installedPkg?.attributes.installed_kibana ?? []; + + // Remove any assets that are not in the new package + const nextAssetRefKeys = new Set( + installedKibanaAssetsRefs.map((asset: KibanaAssetReference) => `${asset.id}-${asset.type}`) + ); + const assetsToRemove = previousAssetRefs.filter( + (existingAsset) => !nextAssetRefKeys.has(`${existingAsset.id}-${existingAsset.type}`) + ); + + if (assetsToRemove.length === 0) { + return; + } + + await withPackageSpan('Clean up Kibana assets that are no longer in the package', async () => { + await deleteKibanaAssets({ installedObjects: assetsToRemove, spaceId, packageInfo }); + }); +} diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_mlmodel.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_mlmodel.test.ts index 1afb436eb4361..df939f3a458b6 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_mlmodel.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_mlmodel.test.ts @@ -22,6 +22,8 @@ import { createAppContextStartContractMock } from '../../../../../mocks'; import { installMlModel } from '../../../elasticsearch/ml_model'; import { deleteMLModels, deletePrerequisiteAssets } from '../../remove'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepInstallMlModel, cleanUpMlModelStep } from './step_install_mlmodel'; jest.mock('../../../elasticsearch/ml_model'); @@ -53,6 +55,7 @@ const packageInstallContext = { } as any, paths: ['some/path/1', 'some/path/2'], assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), }; let soClient: jest.Mocked; let esClient: jest.Mocked; diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_transforms.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_transforms.test.ts index 1ac2383950b05..3bf07d52c6cbf 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_transforms.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_install_transforms.test.ts @@ -22,6 +22,8 @@ import { createAppContextStartContractMock } from '../../../../../mocks'; import { installTransforms } from '../../../elasticsearch/transform/install'; import { cleanupTransforms } from '../../remove'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepInstallTransforms, cleanupTransformsStep } from './step_install_transforms'; jest.mock('../../../elasticsearch/transform/install'); @@ -52,6 +54,7 @@ const packageInstallContext = { } as any, paths: ['some/path/1', 'some/path/2'], assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), }; describe('stepInstallTransforms', () => { diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_remove_legacy_templates.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_remove_legacy_templates.test.ts index 39e7159596ba8..7fa00a1c57f57 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_remove_legacy_templates.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_remove_legacy_templates.test.ts @@ -24,6 +24,8 @@ import { appContextService } from '../../../../app_context'; import { createAppContextStartContractMock } from '../../../../../mocks'; import { removeLegacyTemplates } from '../../../elasticsearch/template/remove_legacy'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepRemoveLegacyTemplates } from './step_remove_legacy_templates'; jest.mock('../../../elasticsearch/template/remove_legacy'); @@ -82,6 +84,7 @@ describe('stepRemoveLegacyTemplates', () => { } as any, paths: ['some/path/1', 'some/path/2'], assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), }; appContextService.start( createAppContextStartContractMock({ diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.test.ts index b03c146640488..255572d57cf49 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.test.ts @@ -21,6 +21,8 @@ import { appContextService } from '../../../../app_context'; import { createAppContextStartContractMock } from '../../../../../mocks'; import { saveArchiveEntriesFromAssetsMap, removeArchiveEntries } from '../../../archive/storage'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepSaveArchiveEntries, cleanupArchiveEntriesStep } from './step_save_archive_entries'; jest.mock('../../../archive/storage', () => { @@ -60,6 +62,7 @@ const packageInstallContext = { Buffer.from('{"content": "data"}'), ], ]), + archiveIterator: createArchiveIteratorFromMap(new Map()), }; const getMockInstalledPackageSo = ( installedEs: EsAssetReference[] = [] diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.ts index b0d5bb67627a6..7db44bb243f85 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_archive_entries.ts @@ -14,17 +14,32 @@ import { withPackageSpan } from '../../utils'; import type { InstallContext } from '../_state_machine_package_install'; import { INSTALL_STATES } from '../../../../../../common/types'; +import { MANIFEST_NAME } from '../../../archive/parse'; export async function stepSaveArchiveEntries(context: InstallContext) { - const { packageInstallContext, savedObjectsClient, installSource } = context; + const { packageInstallContext, savedObjectsClient, installSource, useStreaming } = context; - const { packageInfo } = packageInstallContext; + const { packageInfo, archiveIterator } = packageInstallContext; + + let assetsMap = packageInstallContext?.assetsMap; + let paths = packageInstallContext?.paths; + // For stream based installations, we don't want to save any assets but + // manifest.yaml due to the large number of assets in the package. + if (useStreaming) { + assetsMap = new Map(); + await archiveIterator.traverseEntries(async (entry) => { + if (entry.path.endsWith(MANIFEST_NAME)) { + assetsMap.set(entry.path, entry.buffer); + } + }); + paths = Array.from(assetsMap.keys()); + } const packageAssetResults = await withPackageSpan('Update archive entries', () => saveArchiveEntriesFromAssetsMap({ savedObjectsClient, - assetsMap: packageInstallContext?.assetsMap, - paths: packageInstallContext?.paths, + assetsMap, + paths, packageInfo, installSource, }) diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_system_object.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_system_object.test.ts index aecdd0b2552c4..8d80c236aefb0 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_system_object.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_save_system_object.test.ts @@ -21,6 +21,8 @@ import { createAppContextStartContractMock } from '../../../../../mocks'; import { auditLoggingService } from '../../../../audit_logging'; import { packagePolicyService } from '../../../../package_policy'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepSaveSystemObject } from './step_save_system_object'; jest.mock('../../../../audit_logging'); @@ -67,6 +69,7 @@ describe('updateLatestExecutedState', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -133,6 +136,7 @@ describe('updateLatestExecutedState', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_update_current_write_indices.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_update_current_write_indices.test.ts index c7f3c040b7966..017805d34efef 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_update_current_write_indices.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/step_update_current_write_indices.test.ts @@ -22,6 +22,8 @@ import { appContextService } from '../../../../app_context'; import { createAppContextStartContractMock } from '../../../../../mocks'; import { updateCurrentWriteIndices } from '../../../elasticsearch/template/template'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { stepUpdateCurrentWriteIndices } from './step_update_current_write_indices'; jest.mock('../../../elasticsearch/template/template'); @@ -86,6 +88,7 @@ describe('stepUpdateCurrentWriteIndices', () => { } as any, paths: ['some/path/1', 'some/path/2'], assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), }; appContextService.start( createAppContextStartContractMock({ diff --git a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/update_latest_executed_state.test.ts b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/update_latest_executed_state.test.ts index d963e5fea44c9..aea879aba5479 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/update_latest_executed_state.test.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/install_state_machine/steps/update_latest_executed_state.test.ts @@ -32,6 +32,8 @@ import { auditLoggingService } from '../../../../audit_logging'; import type { PackagePolicySOAttributes } from '../../../../../types'; +import { createArchiveIteratorFromMap } from '../../../archive/archive_iterator'; + import { updateLatestExecutedState } from './update_latest_executed_state'; jest.mock('../../../../audit_logging'); @@ -61,6 +63,7 @@ describe('updateLatestExecutedState', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -116,6 +119,7 @@ describe('updateLatestExecutedState', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -153,6 +157,7 @@ describe('updateLatestExecutedState', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', @@ -198,6 +203,7 @@ describe('updateLatestExecutedState', () => { logger, packageInstallContext: { assetsMap: new Map(), + archiveIterator: createArchiveIteratorFromMap(new Map()), paths: [], packageInfo: { title: 'title', diff --git a/x-pack/plugins/fleet/server/services/epm/packages/remove.ts b/x-pack/plugins/fleet/server/services/epm/packages/remove.ts index ac3f5def5d09c..3892eaa951e5f 100644 --- a/x-pack/plugins/fleet/server/services/epm/packages/remove.ts +++ b/x-pack/plugins/fleet/server/services/epm/packages/remove.ts @@ -148,6 +148,7 @@ export async function deleteKibanaAssets({ const namespace = SavedObjectsUtils.namespaceStringToId(spaceId); + // TODO this should be the installed package info, not the package that is being installed const minKibana = packageInfo.conditions?.kibana?.version ? minVersion(packageInfo.conditions.kibana.version) : null; diff --git a/x-pack/plugins/fleet/server/services/epm/registry/index.ts b/x-pack/plugins/fleet/server/services/epm/registry/index.ts index bb4d612aa7de3..75b9869d0a7c6 100644 --- a/x-pack/plugins/fleet/server/services/epm/registry/index.ts +++ b/x-pack/plugins/fleet/server/services/epm/registry/index.ts @@ -54,6 +54,8 @@ import { resolveDataStreamFields, resolveDataStreamsMap, withPackageSpan } from import { verifyPackageArchiveSignature } from '../packages/package_verification'; +import type { ArchiveIterator } from '../../../../common/types'; + import { fetchUrl, getResponse, getResponseStream } from './requests'; import { getRegistryUrl } from './registry_url'; @@ -309,11 +311,12 @@ async function getPackageInfoFromArchiveOrCache( export async function getPackage( name: string, version: string, - options?: { ignoreUnverified?: boolean } + options?: { ignoreUnverified?: boolean; useStreaming?: boolean } ): Promise<{ paths: string[]; packageInfo: ArchivePackage; assetsMap: AssetsMap; + archiveIterator: ArchiveIterator; verificationResult?: PackageVerificationResult; }> { const verifyPackage = appContextService.getExperimentalFeatures().packageVerification; @@ -340,18 +343,18 @@ export async function getPackage( setVerificationResult({ name, version }, latestVerificationResult); } - const { assetsMap, paths } = await unpackBufferToAssetsMap({ - name, - version, + const contentType = ensureContentType(archivePath); + const { paths, assetsMap, archiveIterator } = await unpackBufferToAssetsMap({ archiveBuffer, - contentType: ensureContentType(archivePath), + contentType, + useStreaming: options?.useStreaming, }); if (!packageInfo) { packageInfo = await getPackageInfoFromArchiveOrCache(name, version, archiveBuffer, archivePath); } - return { paths, packageInfo, assetsMap, verificationResult }; + return { paths, packageInfo, assetsMap, archiveIterator, verificationResult }; } export async function getPackageFieldsMetadata( @@ -397,7 +400,7 @@ export async function getPackageFieldsMetadata( } } -function ensureContentType(archivePath: string) { +export function ensureContentType(archivePath: string) { const contentType = mime.lookup(archivePath); if (!contentType) { diff --git a/x-pack/plugins/fleet/server/services/package_policies/experimental_datastream_features.ts b/x-pack/plugins/fleet/server/services/package_policies/experimental_datastream_features.ts index edf31991634b9..cd1c26942aa0c 100644 --- a/x-pack/plugins/fleet/server/services/package_policies/experimental_datastream_features.ts +++ b/x-pack/plugins/fleet/server/services/package_policies/experimental_datastream_features.ts @@ -30,6 +30,7 @@ import { applyDocOnlyValueToMapping, forEachMappings, } from '../experimental_datastream_features_helper'; +import { createArchiveIteratorFromMap } from '../epm/archive/archive_iterator'; export async function handleExperimentalDatastreamFeatureOptIn({ soClient, @@ -75,6 +76,7 @@ export async function handleExperimentalDatastreamFeatureOptIn({ return prepareTemplate({ packageInstallContext: { assetsMap, + archiveIterator: createArchiveIteratorFromMap(assetsMap), packageInfo, paths, }, diff --git a/x-pack/test/fleet_api_integration/apis/epm/install_by_upload.ts b/x-pack/test/fleet_api_integration/apis/epm/install_by_upload.ts index e6fa2930cf84d..e32328b4e22cc 100644 --- a/x-pack/test/fleet_api_integration/apis/epm/install_by_upload.ts +++ b/x-pack/test/fleet_api_integration/apis/epm/install_by_upload.ts @@ -195,7 +195,7 @@ export default function (providerContext: FtrProviderContext) { .send(buf) .expect(400); expect((res.error as HTTPError).text).to.equal( - '{"statusCode":400,"error":"Bad Request","message":"Archive seems empty. Assumed content type was application/gzip, check if this matches the archive type."}' + '{"statusCode":400,"error":"Bad Request","message":"Manifest file manifest.yml not found in paths."}' ); });