-
Notifications
You must be signed in to change notification settings - Fork 514
/
integrations.py
110 lines (84 loc) · 4.7 KB
/
integrations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
"""Functions to support and interact with Kibana integrations."""
import gzip
import json
import os
import re
from pathlib import Path
from typing import Union
import requests
from marshmallow import EXCLUDE, Schema, fields, post_load
from .semver import Version
from .utils import cached, get_etc_path, read_gzip
# Path of the gzip-compressed JSON file holding the consolidated integration manifests
# (resolved through get_etc_path); build_integrations_manifest writes it.
MANIFEST_FILE_PATH = Path(get_etc_path('integration-manifests.json.gz'))
@cached
def load_integrations_manifests() -> dict:
    """Return the consolidated integrations manifest parsed from its gzipped JSON file."""
    manifest_path = get_etc_path('integration-manifests.json.gz')
    raw_manifests = read_gzip(manifest_path)
    return json.loads(raw_manifests)
class IntegrationManifestSchema(Schema):
    """Marshmallow schema describing the manifest fields kept for an integration package."""

    name = fields.Str(required=True)
    version = fields.Str(required=True)
    release = fields.Str(required=True)
    description = fields.Str(required=True)
    conditions = fields.Dict(required=True)
    policy_templates = fields.List(fields.Dict, required=True)
    owner = fields.Dict(required=False)

    @post_load
    def transform_policy_template(self, data, **kwargs):
        """Replace every loaded policy template dict with just its ``name`` value."""
        template_names = []
        for template in data["policy_templates"]:
            template_names.append(template["name"])
        data["policy_templates"] = template_names
        return data
def build_integrations_manifest(overwrite: bool, rule_integrations: list) -> None:
    """Build a fresh local copy of the consolidated integrations manifest.

    The manifests of every integration in ``rule_integrations`` are fetched with
    ``get_integration_manifests``, validated with ``IntegrationManifestSchema`` (unknown fields are
    dropped) and written as gzip-compressed JSON to ``MANIFEST_FILE_PATH``. The stored structure is
    ``{integration: {package_version: manifest_without_version}}``.

    :param overwrite: when true, an existing manifest file is removed before new data is gathered
    :param rule_integrations: names of the integrations to collect manifests for
    """
    if overwrite and os.path.exists(MANIFEST_FILE_PATH):
        os.remove(MANIFEST_FILE_PATH)

    final_integration_manifests = {integration: {} for integration in rule_integrations}
    manifest_schema = IntegrationManifestSchema(unknown=EXCLUDE)

    for integration in rule_integrations:
        for manifest in get_integration_manifests(integration):
            validated_manifest = manifest_schema.load(manifest)
            # the version becomes the dictionary key, so it is not repeated inside the entry
            package_version = validated_manifest.pop("version")
            final_integration_manifests[integration][package_version] = validated_manifest

    # serialize before opening the file so a serialization error cannot leave a truncated file behind
    manifest_file_bytes = json.dumps(final_integration_manifests).encode("utf-8")

    # the context manager closes the handle, which flushes the compressed stream and its trailer;
    # NOTE(review): write mode truncates an existing file even when ``overwrite`` is false
    with gzip.open(MANIFEST_FILE_PATH, "wb") as manifest_file:
        manifest_file.write(manifest_file_bytes)

    print(f"final integrations manifests dumped: {MANIFEST_FILE_PATH}")
def find_least_compatible_version(package: str, integration: str,
                                  current_stack_version: str, packages_manifest: dict) -> Union[str, None]:
    """Find the least compatible version of a package for the supplied stack version.

    Only the manifests that belong to the highest major version of the package are considered. They are
    walked in ascending version order and the first one whose Kibana version condition is satisfied by
    ``current_stack_version`` is returned.

    :param package: key of the package inside ``packages_manifest``
    :param integration: integration name; only used in the "no compatible version" message
    :param current_stack_version: stack version the package has to be compatible with
    :param packages_manifest: mapping of package -> {package version -> manifest}; every manifest is
        expected to provide ``conditions.kibana.version``
    :return: the matching package version prefixed with ``^``, or ``None`` when nothing is compatible
    """
    # order by the parsed version key (not by the whole (version, manifest) item) so that the
    # iteration below really is ascending by version
    integration_manifests = dict(sorted(packages_manifest[package].items(), key=lambda item: Version(item[0])))

    # trim integration_manifests to only the latest major entries
    max_major = max(Version(manifest_version) for manifest_version in integration_manifests)[0]
    latest_major_integration_manifests = {
        manifest_version: manifest
        for manifest_version, manifest in integration_manifests.items()
        if Version(manifest_version)[0] == max_major
    }

    def compare_versions(int_ver: str, pkg_ver: str) -> bool:
        """Return True when ``int_ver`` has the same major as ``pkg_ver`` and does not exceed it."""
        # only the majors are inspected, so versions with any number of trailing components are accepted
        if int(Version(int_ver)[0]) != int(Version(pkg_ver)[0]):
            return False
        return Version(int_ver) <= Version(pkg_ver)

    for version, manifest in latest_major_integration_manifests.items():
        kibana_condition = manifest["conditions"]["kibana"]["version"]
        # strip the range operators (>, <, =, ^, ~) to leave bare versions, alternatives are " || " separated
        for kibana_compat_vers in re.sub(r"[><=^~]", "", kibana_condition).split(" || "):
            if compare_versions(kibana_compat_vers, current_stack_version):
                return f"^{version}"

    print(f"no compatible version for integration {package}:{integration}")
    return None
def get_integration_manifests(integration: str) -> list:
    """Fetch the manifests of an integration package from the Elastic Package Registry (EPR) search API.

    :param integration: name of the package to search for
    :return: the decoded JSON search result; each entry carries at least a ``version`` key
    :raises requests.HTTPError: when EPR answers with an error status
    :raises requests.Timeout: when EPR does not respond within the timeout
    :raises ValueError: when the search returns an empty result
    """
    epr_search_url = "https://epr.elastic.co/search"

    # link for search parameters - https://github.com/elastic/package-registry
    epr_search_parameters = {"package": integration, "prerelease": "true",
                             "all": "true", "include_policy_templates": "true"}

    # requests applies no timeout by default, so a stalled connection would otherwise block forever
    epr_search_response = requests.get(epr_search_url, params=epr_search_parameters, timeout=30)
    epr_search_response.raise_for_status()
    manifests = epr_search_response.json()

    if not manifests:
        raise ValueError(f"EPR search for {integration} integration package returned empty list")

    print(f"loaded {integration} manifests from the following package versions: "
          f"{[manifest['version'] for manifest in manifests]}")
    return manifests