Skip to content
This repository has been archived by the owner on Jan 27, 2025. It is now read-only.

Commit

Permalink
feat(ingest): add stateful ingestion to the ldap source (datahub-proj…
Browse files Browse the repository at this point in the history
…ect#6127)


Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
2 people authored and cccs-Dustin committed Feb 1, 2023
1 parent 236de5d commit fced9b3
Show file tree
Hide file tree
Showing 11 changed files with 844 additions and 119 deletions.
80 changes: 59 additions & 21 deletions metadata-ingestion/src/datahub/ingestion/source/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,25 @@
from ldap.controls import SimplePagedResultsControl
from pydantic.fields import Field

from datahub.configuration.common import ConfigModel, ConfigurationError
from datahub.configuration.common import ConfigurationError
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.ldap_state import LdapCheckpointState
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
CorpGroupInfoClass,
Expand All @@ -25,6 +34,10 @@
CorpUserSnapshotClass,
GroupMembershipClass,
)
from datahub.utilities.source_helpers import (
auto_stale_entity_removal,
auto_status_aspect,
)

# default mapping for attrs
user_attrs_map: Dict[str, Any] = {}
Expand Down Expand Up @@ -86,14 +99,17 @@ def set_cookie(
return bool(cookie)


class LDAPSourceConfig(ConfigModel):
class LDAPSourceConfig(StatefulIngestionConfigBase):
"""Config used by the LDAP Source."""

# Server configuration.
ldap_server: str = Field(description="LDAP server URL.")
ldap_user: str = Field(description="LDAP user.")
ldap_password: str = Field(description="LDAP password.")

# Custom Stateful Ingestion settings
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None

# Extraction configuration.
base_dn: str = Field(description="LDAP DN.")
filter: str = Field(default="(objectClass=*)", description="LDAP extractor filter.")
Expand All @@ -117,7 +133,7 @@ class LDAPSourceConfig(ConfigModel):


@dataclasses.dataclass
class LDAPSourceReport(SourceReport):
class LDAPSourceReport(StaleEntityRemovalSourceReport):

dropped_dns: List[str] = dataclasses.field(default_factory=list)

Expand Down Expand Up @@ -151,7 +167,7 @@ def guess_person_ldap(
@config_class(LDAPSourceConfig)
@support_status(SupportStatus.CERTIFIED)
@dataclasses.dataclass
class LDAPSource(Source):
class LDAPSource(StatefulIngestionSourceBase):
"""
This plugin extracts the following:
- People
Expand All @@ -161,11 +177,21 @@ class LDAPSource(Source):

config: LDAPSourceConfig
report: LDAPSourceReport
platform: str = "ldap"

def __init__(self, ctx: PipelineContext, config: LDAPSourceConfig):
"""Constructor."""
super().__init__(ctx)
super(LDAPSource, self).__init__(config, ctx)
self.config = config

self.stale_entity_removal_handler = StaleEntityRemovalHandler(
source=self,
config=self.config,
state_type_class=LdapCheckpointState,
pipeline_name=self.ctx.pipeline_name,
run_id=self.ctx.run_id,
)

# ensure prior defaults are in place
for k in user_attrs_map:
if k not in self.config.user_attrs_map:
Expand Down Expand Up @@ -199,6 +225,12 @@ def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext) -> "LDAPSourc
return cls(ctx, config)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """
    Public workunit stream for this source.

    Takes the raw LDAP workunits from ``get_workunits_internal`` and passes
    them first through ``auto_status_aspect`` and then through
    ``auto_stale_entity_removal`` together with this source's
    stale-entity-removal handler.
    """
    with_status = auto_status_aspect(self.get_workunits_internal())
    return auto_stale_entity_removal(
        self.stale_entity_removal_handler,
        with_status,
    )

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""Returns an Iterable containing the workunits to ingest LDAP users or groups."""
cookie = True
while cookie:
Expand Down Expand Up @@ -251,6 +283,13 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:

cookie = set_cookie(self.lc, pctrls)

def get_platform_instance_id(self) -> str:
    """
    Return the identifier of this source instance for stateful ingestion.

    For LDAP this is the configured ``ldap_server`` value (the LDAP server URL).
    """
    server_url = self.config.ldap_server
    return server_url

def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUnit]:
"""
Handle a DN and attributes by adding manager info and constructing a
Expand Down Expand Up @@ -358,7 +397,7 @@ def build_corp_user_mce(
countryCode=country_code,
title=title,
managerUrn=manager_urn,
)
),
],
)

Expand Down Expand Up @@ -389,21 +428,20 @@ def build_corp_group_mce(self, attrs: dict) -> Optional[MetadataChangeEvent]:
if self.config.group_attrs_map["displayName"] in attrs
else None
)
return MetadataChangeEvent(
proposedSnapshot=CorpGroupSnapshotClass(
urn=f"urn:li:corpGroup:{full_name}",
aspects=[
CorpGroupInfoClass(
email=email,
admins=admins,
members=members,
groups=[],
description=description,
displayName=displayName,
)
],
)
group_snapshot = CorpGroupSnapshotClass(
urn=f"urn:li:corpGroup:{full_name}",
aspects=[
CorpGroupInfoClass(
email=email,
admins=admins,
members=members,
groups=[],
description=description,
displayName=displayName,
),
],
)
return MetadataChangeEvent(proposedSnapshot=group_snapshot)
return None

def get_report(self) -> LDAPSourceReport:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Iterable, List

import pydantic

from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityCheckpointStateBase,
)
from datahub.utilities.urns.urn import guess_entity_type


class LdapCheckpointState(StaleEntityCheckpointStateBase["LdapCheckpointState"]):
    """
    Checkpoint state for LDAP based sources.

    Keeps the URNs of all corpuser and corpGroup entities recorded during a
    run; comparing two such states yields the URNs present in one but not in
    the other, which is what stale entity removal relies on.
    """

    # Single flat list holding every recorded URN, whatever its entity type.
    urns: List[str] = pydantic.Field(default_factory=list)

    @classmethod
    def get_supported_types(cls) -> List[str]:
        """Return the entity type names this checkpoint state accepts."""
        supported = ["corpuser", "corpGroup"]
        return supported

    def add_checkpoint_urn(self, type: str, urn: str) -> None:
        """Append ``urn`` to the recorded URNs; ``type`` must be a supported type."""
        supported = self.get_supported_types()
        assert type in supported
        recorded = self.urns
        recorded.append(urn)

    def get_urns_not_in(
        self, type: str, other_checkpoint_state: "LdapCheckpointState"
    ) -> Iterable[str]:
        """
        Lazily yield the URNs of entity type ``type`` that are recorded in this
        state but absent from ``other_checkpoint_state``.
        """
        supported = self.get_supported_types()
        assert type in supported
        mine = set(self.urns)
        theirs = set(other_checkpoint_state.urns)
        only_here = mine - theirs
        # All URNs live in one list, so the entity type is recovered from the
        # URN itself to filter down to the requested type.
        for candidate in only_here:
            if guess_entity_type(candidate) == type:
                yield candidate

    def get_percent_entities_changed(
        self, old_checkpoint_state: "LdapCheckpointState"
    ) -> float:
        """
        Return the result of the base class's ``compute_percent_entities_changed``
        for this state's URNs paired with those of ``old_checkpoint_state``.
        """
        urn_pairs = [(self.urns, old_checkpoint_state.urns)]
        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
            urn_pairs
        )
Loading

0 comments on commit fced9b3

Please sign in to comment.