Skip to content

Commit

Permalink
[Fleet] use package storage when getting a package (#85337)
Browse files Browse the repository at this point in the history
* getPackageFromSource to use package storage

* fix type

* use bulkGet

* add data streams and policy templates to package info from storage

* fix merge conflict

* comment out policy_templates for now

* add policy_templates to package info, remove required inputs from parseAndVerifyPolicyTemplates

* add storage assets to cache

* tidy up

Co-authored-by: Kibana Machine <[email protected]>
  • Loading branch information
neptunian and kibanamachine committed Jan 12, 2021
1 parent 7f1fc0c commit 35af269
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 45 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/fleet/server/services/epm/archive/cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const getPackageInfo = (args: SharedKey) => {
export const getArchivePackage = (args: SharedKey) => {
const packageInfo = getPackageInfo(args);
const paths = getArchiveFilelist(args);
if (!paths || !packageInfo) return undefined;
return {
paths,
packageInfo,
Expand Down
115 changes: 114 additions & 1 deletion x-pack/plugins/fleet/server/services/epm/archive/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/

import { extname } from 'path';
import { uniq } from 'lodash';
import { safeLoad } from 'js-yaml';
import { isBinaryFile } from 'isbinaryfile';
import mime from 'mime-types';
import uuidv5 from 'uuid/v5';
Expand All @@ -14,8 +16,11 @@ import {
InstallablePackage,
InstallSource,
PackageAssetReference,
RegistryDataStream,
} from '../../../../common';
import { getArchiveEntry } from './index';
import { ArchiveEntry, getArchiveEntry, setArchiveEntry, setArchiveFilelist } from './index';
import { parseAndVerifyPolicyTemplates, parseAndVerifyStreams } from './validation';
import { pkgToPkgKey } from '../registry';

// could be anything, picked this from https://github.com/elastic/elastic-agent-client/issues/17
const MAX_ES_ASSET_BYTES = 4 * 1024 * 1024;
Expand Down Expand Up @@ -121,6 +126,15 @@ export async function archiveEntryToBulkCreateObject(opts: {
attributes: doc,
};
}
export function packageAssetToArchiveEntry(asset: PackageAsset): ArchiveEntry {
const { asset_path: path, data_utf8: utf8, data_base64: base64 } = asset;
const buffer = utf8 ? Buffer.from(utf8, 'utf8') : Buffer.from(base64, 'base64');

return {
path,
buffer,
};
}

export async function getAsset(opts: {
savedObjectsClient: SavedObjectsClientContract;
Expand All @@ -138,3 +152,102 @@ export async function getAsset(opts: {

return storedAsset;
}

export const getEsPackage = async (
pkgName: string,
pkgVersion: string,
references: PackageAssetReference[],
savedObjectsClient: SavedObjectsClientContract
) => {
const pkgKey = pkgToPkgKey({ name: pkgName, version: pkgVersion });
const bulkRes = await savedObjectsClient.bulkGet<PackageAsset>(
references.map((reference) => ({
...reference,
fields: ['asset_path', 'data_utf8', 'data_base64'],
}))
);
const assets = bulkRes.saved_objects.map((so) => so.attributes);

// add asset references to cache
const paths: string[] = [];
const entries: ArchiveEntry[] = assets.map(packageAssetToArchiveEntry);
entries.forEach(({ path, buffer }) => {
if (path && buffer) {
setArchiveEntry(path, buffer);
paths.push(path);
}
});
setArchiveFilelist({ name: pkgName, version: pkgVersion }, paths);
// create the packageInfo
// TODO: this is mostly copied from validtion.ts, needed in case package does not exist in storage yet or is missing from cache
// we don't want to reach out to the registry again so recreate it here. should check whether it exists in packageInfoCache first

const manifestPath = `${pkgName}-${pkgVersion}/manifest.yml`;
const soResManifest = await savedObjectsClient.get<PackageAsset>(
ASSETS_SAVED_OBJECT_TYPE,
assetPathToObjectId(manifestPath)
);
const packageInfo = safeLoad(soResManifest.attributes.data_utf8);

try {
const readmePath = `docs/README.md`;
await savedObjectsClient.get<PackageAsset>(
ASSETS_SAVED_OBJECT_TYPE,
assetPathToObjectId(`${pkgName}-${pkgVersion}/${readmePath}`)
);
packageInfo.readme = `/package/${pkgName}/${pkgVersion}/${readmePath}`;
} catch (err) {
// read me doesn't exist
}

let dataStreamPaths: string[] = [];
const dataStreams: RegistryDataStream[] = [];
paths
.filter((path) => path.startsWith(`${pkgKey}/data_stream/`))
.forEach((path) => {
const parts = path.split('/');
if (parts.length > 2 && parts[2]) dataStreamPaths.push(parts[2]);
});

dataStreamPaths = uniq(dataStreamPaths);

await Promise.all(
dataStreamPaths.map(async (dataStreamPath) => {
const dataStreamManifestPath = `${pkgKey}/data_stream/${dataStreamPath}/manifest.yml`;
const soResDataStreamManifest = await savedObjectsClient.get<PackageAsset>(
ASSETS_SAVED_OBJECT_TYPE,
assetPathToObjectId(dataStreamManifestPath)
);
const dataStreamManifest = safeLoad(soResDataStreamManifest.attributes.data_utf8);
const {
title: dataStreamTitle,
release,
ingest_pipeline: ingestPipeline,
type,
dataset,
} = dataStreamManifest;
const streams = parseAndVerifyStreams(dataStreamManifest, dataStreamPath);

dataStreams.push({
dataset: dataset || `${pkgName}.${dataStreamPath}`,
title: dataStreamTitle,
release,
package: pkgName,
ingest_pipeline: ingestPipeline || 'default',
path: dataStreamPath,
type,
streams,
});
})
);
packageInfo.policy_templates = parseAndVerifyPolicyTemplates(packageInfo);
packageInfo.data_streams = dataStreams;
packageInfo.assets = paths.map((path) => {
return path.replace(`${pkgName}-${pkgVersion}`, `/package/${pkgName}/${pkgVersion}`);
});

return {
paths,
packageInfo,
};
};
25 changes: 15 additions & 10 deletions x-pack/plugins/fleet/server/services/epm/archive/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
RegistryInput,
RegistryStream,
RegistryVarsEntry,
PackageSpecManifest,
} from '../../../../common/types';
import { PackageInvalidArchiveError } from '../../../errors';
import { unpackBufferEntries } from './index';
Expand Down Expand Up @@ -143,7 +144,7 @@ function parseAndVerifyReadme(paths: string[], pkgName: string, pkgVersion: stri
const readmePath = `${pkgName}-${pkgVersion}${readmeRelPath}`;
return paths.includes(readmePath) ? `/package/${pkgName}/${pkgVersion}${readmeRelPath}` : null;
}
function parseAndVerifyDataStreams(
export function parseAndVerifyDataStreams(
paths: string[],
pkgName: string,
pkgVersion: string
Expand Down Expand Up @@ -211,7 +212,7 @@ function parseAndVerifyDataStreams(

return dataStreams;
}
function parseAndVerifyStreams(manifest: any, dataStreamPath: string): RegistryStream[] {
export function parseAndVerifyStreams(manifest: any, dataStreamPath: string): RegistryStream[] {
const streams: RegistryStream[] = [];
const manifestStreams = manifest.streams;
if (manifestStreams && manifestStreams.length > 0) {
Expand Down Expand Up @@ -243,7 +244,7 @@ function parseAndVerifyStreams(manifest: any, dataStreamPath: string): RegistryS
}
return streams;
}
function parseAndVerifyVars(manifestVars: any[], location: string): RegistryVarsEntry[] {
export function parseAndVerifyVars(manifestVars: any[], location: string): RegistryVarsEntry[] {
const vars: RegistryVarsEntry[] = [];
if (manifestVars && manifestVars.length > 0) {
manifestVars.forEach((manifestVar) => {
Expand Down Expand Up @@ -278,19 +279,23 @@ function parseAndVerifyVars(manifestVars: any[], location: string): RegistryVars
}
return vars;
}
function parseAndVerifyPolicyTemplates(manifest: any): RegistryPolicyTemplate[] {
export function parseAndVerifyPolicyTemplates(
manifest: PackageSpecManifest
): RegistryPolicyTemplate[] {
const policyTemplates: RegistryPolicyTemplate[] = [];
const manifestPolicyTemplates = manifest.policy_templates;
if (manifestPolicyTemplates && manifestPolicyTemplates > 0) {
if (manifestPolicyTemplates && manifestPolicyTemplates.length > 0) {
manifestPolicyTemplates.forEach((policyTemplate: any) => {
const { name, title: policyTemplateTitle, description, inputs, multiple } = policyTemplate;
if (!(name && policyTemplateTitle && description && inputs)) {
if (!(name && policyTemplateTitle && description)) {
throw new PackageInvalidArchiveError(
`Invalid top-level manifest: one of mandatory fields 'name', 'title', 'description', 'input' missing in policy template: ${policyTemplate}`
`Invalid top-level manifest: one of mandatory fields 'name', 'title', 'description' is missing in policy template: ${policyTemplate}`
);
}

const parsedInputs = parseAndVerifyInputs(inputs, `config template ${name}`);
let parsedInputs: RegistryInput[] | undefined = [];
if (inputs) {
parsedInputs = parseAndVerifyInputs(inputs, `config template ${name}`);
}

// defaults to true if undefined, but may be explicitly set to false.
let parsedMultiple = true;
Expand All @@ -307,7 +312,7 @@ function parseAndVerifyPolicyTemplates(manifest: any): RegistryPolicyTemplate[]
}
return policyTemplates;
}
function parseAndVerifyInputs(manifestInputs: any, location: string): RegistryInput[] {
export function parseAndVerifyInputs(manifestInputs: any, location: string): RegistryInput[] {
const inputs: RegistryInput[] = [];
if (manifestInputs && manifestInputs.length > 0) {
manifestInputs.forEach((input: any) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import { SavedObjectsClientContract } from 'src/core/server';
import { INDEX_PATTERN_SAVED_OBJECT_TYPE } from '../../../../constants';
import { loadFieldsFromYaml, Fields, Field } from '../../fields/field';
import { dataTypes, installationStatuses } from '../../../../../common/constants';
import { ArchivePackage, InstallSource, ValueOf } from '../../../../../common/types';
import { ArchivePackage, Installation, InstallSource, ValueOf } from '../../../../../common/types';
import { RegistryPackage, DataType } from '../../../../types';
import { getPackageFromSource, getPackageSavedObjects } from '../../packages/get';
import { getInstallation, getPackageFromSource, getPackageSavedObjects } from '../../packages/get';

interface FieldFormatMap {
[key: string]: FieldFormatMapItem;
Expand Down Expand Up @@ -81,34 +81,39 @@ export async function installIndexPatterns(
);

const packagesToFetch = installedPackagesSavedObjects.reduce<
Array<{ name: string; version: string; installSource: InstallSource }>
>((acc, pkgSO) => {
Array<{ name: string; version: string; installedPkg: Installation | undefined }>
>((acc, pkg) => {
acc.push({
name: pkgSO.attributes.name,
version: pkgSO.attributes.version,
installSource: pkgSO.attributes.install_source,
name: pkg.attributes.name,
version: pkg.attributes.version,
installedPkg: pkg.attributes,
});
return acc;
}, []);

if (pkgName && pkgVersion && installSource) {
const packageToInstall = packagesToFetch.find((pkgSO) => pkgSO.name === pkgName);
const packageToInstall = packagesToFetch.find((pkg) => pkg.name === pkgName);
if (packageToInstall) {
// set the version to the one we want to install
// if we're reinstalling the number will be the same
// if this is an upgrade then we'll be modifying the version number to the upgrade version
packageToInstall.version = pkgVersion;
} else {
// if we're installing for the first time, add to the list
packagesToFetch.push({ name: pkgName, version: pkgVersion, installSource });
packagesToFetch.push({
name: pkgName,
version: pkgVersion,
installedPkg: await getInstallation({ savedObjectsClient, pkgName }),
});
}
}
// get each package's registry info
const packagesToFetchPromise = packagesToFetch.map((pkg) =>
getPackageFromSource({
pkgName: pkg.name,
pkgVersion: pkg.version,
pkgInstallSource: pkg.installSource,
installedPkg: pkg.installedPkg,
savedObjectsClient,
})
);
const packages = await Promise.all(packagesToFetchPromise);
Expand Down
64 changes: 41 additions & 23 deletions x-pack/plugins/fleet/server/services/epm/packages/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@
import { SavedObjectsClientContract, SavedObjectsFindOptions } from 'src/core/server';
import { isPackageLimited, installationStatuses } from '../../../../common';
import { PACKAGES_SAVED_OBJECT_TYPE } from '../../../constants';
import {
ArchivePackage,
InstallSource,
RegistryPackage,
EpmPackageAdditions,
} from '../../../../common/types';
import { ArchivePackage, RegistryPackage, EpmPackageAdditions } from '../../../../common/types';
import { Installation, PackageInfo, KibanaAssetType } from '../../../types';
import * as Registry from '../registry';
import { createInstallableFrom, isRequiredPackage } from './index';
import { getEsPackage } from '../archive/storage';
import { getArchivePackage } from '../archive';

export { getFile, SearchParams } from '../registry';
Expand Down Expand Up @@ -103,13 +99,10 @@ export async function getPackageInfo(options: {
const getPackageRes = await getPackageFromSource({
pkgName,
pkgVersion,
pkgInstallSource:
savedObject?.attributes.version === pkgVersion
? savedObject?.attributes.install_source
: 'registry',
savedObjectsClient,
installedPkg: savedObject?.attributes,
});
const paths = getPackageRes.paths;
const packageInfo = getPackageRes.packageInfo;
const { paths, packageInfo } = getPackageRes;

// add properties that aren't (or aren't yet) on the package
const additions: EpmPackageAdditions = {
Expand All @@ -123,28 +116,53 @@ export async function getPackageInfo(options: {
return createInstallableFrom(updated, savedObject);
}

interface PackageResponse {
paths: string[];
packageInfo: ArchivePackage | RegistryPackage;
}
type GetPackageResponse = PackageResponse | undefined;

// gets package from install_source if it exists otherwise gets from registry
export async function getPackageFromSource(options: {
pkgName: string;
pkgVersion: string;
pkgInstallSource?: InstallSource;
}): Promise<{
paths: string[] | undefined;
packageInfo: RegistryPackage | ArchivePackage;
}> {
const { pkgName, pkgVersion, pkgInstallSource } = options;
// TODO: Check package storage before checking registry
let res;
if (pkgInstallSource === 'upload') {
installedPkg?: Installation;
savedObjectsClient: SavedObjectsClientContract;
}): Promise<PackageResponse> {
const { pkgName, pkgVersion, installedPkg, savedObjectsClient } = options;
let res: GetPackageResponse;
// if the package is installed

if (installedPkg && installedPkg.version === pkgVersion) {
const { install_source: pkgInstallSource } = installedPkg;
// check cache
res = getArchivePackage({
name: pkgName,
version: pkgVersion,
});
if (!res) {
res = await getEsPackage(
pkgName,
pkgVersion,
installedPkg.package_assets,
savedObjectsClient
);
}
// for packages not in cache or package storage and installed from registry, check registry
if (!res && pkgInstallSource === 'registry') {
try {
res = await Registry.getRegistryPackage(pkgName, pkgVersion);
// TODO: add to cache and storage here?
} catch (error) {
// treating this is a 404 as no status code returned
// in the unlikely event its missing from cache, storage, and never installed from registry
}
}
} else {
// else package is not installed or installed and missing from cache and storage and installed from registry
res = await Registry.getRegistryPackage(pkgName, pkgVersion);
}
if (!res.packageInfo || !res.paths)
throw new Error(`package info for ${pkgName}-${pkgVersion} does not exist`);
if (!res) throw new Error(`package info for ${pkgName}-${pkgVersion} does not exist`);
return {
paths: res.paths,
packageInfo: res.packageInfo,
Expand Down
1 change: 0 additions & 1 deletion x-pack/plugins/fleet/server/services/epm/registry/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ export async function getRegistryPackage(
}

const packageInfo = await getInfo(name, version);

return { paths, packageInfo };
}

Expand Down

0 comments on commit 35af269

Please sign in to comment.