Skip to content

Commit

Permalink
feat: aliyun datasource support crawl metadata at once
Browse files Browse the repository at this point in the history
  • Loading branch information
jinkangkang.jkk committed Dec 25, 2024
1 parent 3f82153 commit 2ec5ea7
Show file tree
Hide file tree
Showing 5 changed files with 664 additions and 76 deletions.
341 changes: 333 additions & 8 deletions cloudinit/sources/DataSourceAliYun.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,38 @@
from typing import List

from cloudinit import dmi, sources
from cloudinit import url_helper as uhelp
from cloudinit import util
from cloudinit.event import EventScope, EventType
from cloudinit.sources import DataSourceEc2 as EC2
from cloudinit.sources import DataSourceHostname, NicOrder
from cloudinit.net.dhcp import NoDHCPLeaseError
from cloudinit.net.ephemeral import EphemeralIPNetwork
from cloudinit.sources import DataSourceHostname
from cloudinit.sources.helpers import aliyun, ec2

LOG = logging.getLogger(__name__)

ALIYUN_PRODUCT = "Alibaba Cloud ECS"


class DataSourceAliYun(EC2.DataSourceEc2):
class DataSourceAliYun(sources.DataSource):

dsname = "AliYun"
metadata_urls = ["http://100.100.100.200"]

# The minimum supported metadata_version from the ec2 metadata apis
# The minimum supported metadata_version from the ecs metadata apis
min_metadata_version = "2016-01-01"
extended_metadata_versions: List[str] = []

# Setup read_url parameters per get_url_params.
url_max_wait = 240
url_timeout = 50

_api_token = None # API token for accessing the metadata service
_network_config = sources.UNSET # Used to cache calculated network cfg v1

# Whether we want to get network configuration from the metadata service.
perform_dhcp_setup = False

# Aliyun metadata server security enhanced mode overwrite
@property
def imdsv2_token_put_header(self):
Expand All @@ -32,11 +46,9 @@ def __init__(self, sys_cfg, distro, paths):
super(DataSourceAliYun, self).__init__(sys_cfg, distro, paths)
self.default_update_events = copy.deepcopy(self.default_update_events)
self.default_update_events[EventScope.NETWORK].add(EventType.BOOT)
self._fallback_nic_order = NicOrder.NIC_NAME

def _unpickle(self, ci_pkl_version: int) -> None:
super()._unpickle(ci_pkl_version)
self._fallback_nic_order = NicOrder.NIC_NAME

def get_hostname(self, fqdn=False, resolve_ip=False, metadata_only=False):
hostname = self.metadata.get("hostname")
Expand All @@ -51,9 +63,322 @@ def get_public_ssh_keys(self):

def _get_cloud_name(self):
if _is_aliyun():
return EC2.CloudNames.ALIYUN
return self.dsname.lower()
return "NO_ALIYUN_METADATA"

@property
def platform(self):
return self.dsname.lower()

# IMDSv2 related parameters from the ecs metadata api document
@property
def api_token_route(self):
return "latest/api/token"

@property
def imdsv2_token_ttl_seconds(self):
return "21600"

@property
def imdsv2_token_redact(self):
return [self.imdsv2_token_put_header, self.imdsv2_token_req_header]

@property
def imdsv2_token_req_header(self):
return self.imdsv2_token_put_header + "-ttl-seconds"

@property
def network_config(self):
"""Return a network config dict for rendering ENI or netplan files."""
if self._network_config != sources.UNSET:
return self._network_config

if self.metadata is None:
# this would happen if get_data hadn't been called. leave as UNSET
LOG.warning(
"Unexpected call to network_config when metadata is None."
)
return None

result = None

iface = self.distro.fallback_interface
net_md = self.metadata.get("network")
if isinstance(net_md, dict):
result = aliyun.convert_ecs_metadata_network_config(
net_md,
fallback_nic=iface,
full_network_config=util.get_cfg_option_bool(
self.ds_cfg, "apply_full_imds_network_config", True
),
)
else:
return EC2.CloudNames.NO_EC2_METADATA
LOG.warning("Metadata 'network' key not valid: %s.", net_md)
self._network_config = result
return self._network_config

def _maybe_fetch_api_token(self, mdurls):
"""Get an API token for ECS Instance Metadata Service.
On ECS. IMDS will always answer an API token, set
HttpTokens=optional (default) when create instance will not forcefully
use the security-enhanced mode (IMDSv2).
https://api.alibabacloud.com/api/Ecs/2014-05-26/RunInstances
"""

urls = []
url2base = {}
url_path = self.api_token_route
request_method = "PUT"
for url in mdurls:
cur = "{0}/{1}".format(url, url_path)
urls.append(cur)
url2base[cur] = url

# use the self._imds_exception_cb to check for Read errors
LOG.debug("Fetching Ecs IMDSv2 API Token")

response = None
url = None
url_params = self.get_url_params()
try:
url, response = uhelp.wait_for_url(
urls=urls,
max_wait=url_params.max_wait_seconds,
timeout=url_params.timeout_seconds,
status_cb=LOG.warning,
headers_cb=self._get_headers,
exception_cb=self._imds_exception_cb,
request_method=request_method,
headers_redact=self.imdsv2_token_redact,
connect_synchronously=False,
)
except uhelp.UrlError:
# We use the raised exception to interupt the retry loop.
# Nothing else to do here.
pass

if url and response:
self._api_token = response
return url2base[url]

# If we get here, then wait_for_url timed out, waiting for IMDS
# or the IMDS HTTP endpoint is disabled
return None

def wait_for_metadata_service(self):
mcfg = self.ds_cfg
mdurls = mcfg.get("metadata_urls", self.metadata_urls)

# try the api token path first
metadata_address = self._maybe_fetch_api_token(mdurls)

if metadata_address:
self.metadata_address = metadata_address
LOG.debug("Using metadata source: '%s'", self.metadata_address)
else:
LOG.warning("IMDS's HTTP endpoint is probably disabled")
return bool(metadata_address)

def crawl_metadata(self):
"""Crawl metadata service when available.
@returns: Dictionary of crawled metadata content containing the keys:
meta-data, user-data, vendor-data and dynamic.
"""
if not self.wait_for_metadata_service():
return {}
redact = self.imdsv2_token_redact
crawled_metadata = {}
exc_cb = self._refresh_stale_aliyun_token_cb
exc_cb_ud = self._skip_or_refresh_stale_aliyun_token_cb
skip_cb = None
exe_cb_whole_meta = self._skip_json_path_meta_path_aliyun_cb
try:
crawled_metadata["user-data"] = aliyun.get_instance_data(
self.min_metadata_version,
self.metadata_address,
headers_cb=self._get_headers,
headers_redact=redact,
exception_cb=exc_cb_ud,
item_name="user-data",
)
crawled_metadata["vendor-data"] = aliyun.get_instance_data(
self.min_metadata_version,
self.metadata_address,
headers_cb=self._get_headers,
headers_redact=redact,
exception_cb=exc_cb_ud,
item_name="vendor-data",
)
try:
result = aliyun.get_instance_meta_data(
self.min_metadata_version,
self.metadata_address,
headers_cb=self._get_headers,
headers_redact=redact,
exception_cb=exe_cb_whole_meta,
)
crawled_metadata["meta-data"] = result
except Exception:
util.logexc(
LOG,
"Faild read json meta-data from %s"
"fall back directory tree style",
self.metadata_address,
)
crawled_metadata["meta-data"] = ec2.get_instance_metadata(
self.min_metadata_version,
self.metadata_address,
headers_cb=self._get_headers,
headers_redact=redact,
exception_cb=exc_cb,
retrieval_exception_ignore_cb=skip_cb,
)
except Exception:
util.logexc(
LOG,
"Failed reading from metadata address %s",
self.metadata_address,
)
return {}
return crawled_metadata

def _refresh_stale_aliyun_token_cb(self, msg, exception):
"""Exception handler for Ecs to refresh token if token is stale."""
if isinstance(exception, uhelp.UrlError) and exception.code == 401:
# With _api_token as None, _get_headers will _refresh_api_token.
LOG.debug("Clearing cached Ecs API token due to expiry")
self._api_token = None
return True # always retry

def _skip_or_refresh_stale_aliyun_token_cb(self, msg, exception):
"""Callback will not retry on SKIP_USERDATA_VENDORDATA_CODES or
if no token is available."""
retry = ec2.skip_retry_on_codes(
ec2.SKIP_USERDATA_CODES, msg, exception
)
if not retry:
return False # False raises exception
return self._refresh_stale_aliyun_token_cb(msg, exception)

def _skip_json_path_meta_path_aliyun_cb(self, msg, exception):
"""Callback will not retry of whole meta_path is not found"""
if isinstance(exception, uhelp.UrlError) and exception.code == 404:
LOG.warning("whole meta_path is not found, skipping")
return False
return self._refresh_stale_aliyun_token_cb(msg, exception)

def _get_data(self):
if self.cloud_name != self.dsname.lower():
return False
if self.perform_dhcp_setup: # Setup networking in init-local stage.
if util.is_FreeBSD():
LOG.debug("FreeBSD doesn't support running dhclient with -sf")
return False
try:
with EphemeralIPNetwork(
self.distro,
self.distro.fallback_interface,
ipv4=True,
ipv6=False,
) as netw:
self._crawled_metadata = self.crawl_metadata()
LOG.debug(
"Crawled metadata service%s",
f" {netw.state_msg}" if netw.state_msg else "",
)

except NoDHCPLeaseError:
return False
else:
self._crawled_metadata = self.crawl_metadata()
if not self._crawled_metadata:
return False
self.metadata = self._crawled_metadata.get("meta-data", None)
self.userdata_raw = self._crawled_metadata.get("user-data", None)
self.vendordata_raw = self._crawled_metadata.get("vendor-data", None)
return True

def _refresh_api_token(self, seconds=None):
"""Request new metadata API token.
@param seconds: The lifetime of the token in seconds
@return: The API token or None if unavailable.
"""
# if self.cloud_name not in IDMSV2_SUPPORTED_CLOUD_PLATFORMS:
# return None

if seconds is None:
seconds = self.imdsv2_token_ttl_seconds

LOG.debug("Refreshing Ecs metadata API token")
request_header = {self.imdsv2_token_req_header: seconds}
token_url = "{}/{}".format(self.metadata_address, self.api_token_route)
try:
response = uhelp.readurl(
token_url,
headers=request_header,
headers_redact=self.imdsv2_token_redact,
request_method="PUT",
)
except uhelp.UrlError as e:
LOG.warning(
"Unable to get API token: %s raised exception %s", token_url, e
)
return None
return response.contents

def _get_headers(self, url=""):
"""Return a dict of headers for accessing a url.
If _api_token is unset on AWS, attempt to refresh the token via a PUT
and then return the updated token header.
"""
# if self.cloud_name not in IDMSV2_SUPPORTED_CLOUD_PLATFORMS:
# return {}
# Request a 6 hour token if URL is api_token_route
request_token_header = {
self.imdsv2_token_req_header: self.imdsv2_token_ttl_seconds
}
if self.api_token_route in url:
return request_token_header
if not self._api_token:
# If we don't yet have an API token, get one via a PUT against
# api_token_route. This _api_token may get unset by a 403 due
# to an invalid or expired token
self._api_token = self._refresh_api_token()
if not self._api_token:
return {}
return {self.imdsv2_token_put_header: self._api_token}

def _imds_exception_cb(self, msg, exception=None):
"""Fail quickly on proper AWS if IMDSv2 rejects API token request
Guidance from Amazon is that if IMDSv2 had disabled token requests
by returning a 403, or cloud-init malformed requests resulting in
other 40X errors, we want the datasource detection to fail quickly
without retries as those symptoms will likely not be resolved by
retries.
Exceptions such as requests.ConnectionError due to IMDS being
temporarily unroutable or unavailable will still retry due to the
callsite wait_for_url.
"""
if isinstance(exception, uhelp.UrlError):
# requests.ConnectionError will have exception.code == None
if exception.code and exception.code >= 400:
if exception.code == 403:
LOG.warning(
"Ecs IMDS endpoint returned a 403 error. "
"HTTP endpoint is disabled. Aborting."
)
else:
LOG.warning(
"Fatal error while requesting Ecs IMDSv2 API tokens"
)
raise exception


def _is_aliyun():
Expand Down
Loading

0 comments on commit 2ec5ea7

Please sign in to comment.