feat(webscraper): add support for a list of URLs (#232)
janaka authored Mar 3, 2024
1 parent f5725a3 commit 42e8f86
Showing 5 changed files with 175 additions and 80 deletions.
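
A minimal usage sketch of the new multi-URL API introduced by this commit. The constructor call below is an assumption based on the attributes the diff below references (a website_extractors mapping with a "default" entry); the exact signature is not shown here, and the URLs and filter are placeholders.

from docq.data_source.support.web_extracting import BeautifulSoupWebReader, GenericTextExtractor
from docq.domain import SourcePageType

# Assumed construction: a {hostname: extractor} mapping with a "default" fallback.
reader = BeautifulSoupWebReader(website_extractors={"default": GenericTextExtractor()})

# Treat each URL as a content page and scrape it directly.
docs = reader.load_data(
    urls=["https://example.com/a", "https://example.com/b"],
    source_page_type=SourcePageType.page_list,
)

# Or treat the URLs as index pages and follow the links they contain.
docs = reader.load_data(
    urls=["https://example.com/blog/"],
    include_filter="https://example.com/blog/.*",
    source_page_type=SourcePageType.index_page,
)

print(len(docs), len(reader.get_document_list()))
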
170 changes: 118 additions & 52 deletions source/docq/data_source/support/web_extracting.py
@@ -11,8 +11,11 @@
from bs4 import BeautifulSoup
from llama_index import Document
from llama_index.readers.base import BaseReader
from opentelemetry import trace

from ...domain import DocumentListItem
from ...domain import DocumentListItem, SourcePageType

tracer = trace.get_tracer(__name__)


class BaseTextExtractor(ABC):
@@ -28,6 +31,7 @@ def __init__(
self._subtitle_css_selector = subtitle_css_selector

@abstractmethod
@tracer.start_as_current_span(name="extract_text")
def extract_text(
self: Self,
soup: Any,
@@ -37,10 +41,12 @@ def extract_text(
pass

@abstractmethod
@tracer.start_as_current_span(name="link_extract_selector")
def link_extract_selector() -> Any:
"""Criteria filter specific <a> tags to extract links from. To extract all links, return None."""
pass

@tracer.start_as_current_span(name="extract_title")
def extract_title(self: Self, soup: Any, css_selector: Optional[str] = None) -> str: # noqa: D102
"""Extract the title from a web page. Defaults to the <h1> tag.
@@ -59,6 +65,7 @@ def extract_title(self: Self, soup: Any, css_selector: Optional[str] = None) ->

return title_element.get_text() if title_element else "web page"

@tracer.start_as_current_span(name="extract_subtitle")
def extract_subtitle(self: Self, soup: Any, css_selector: Optional[str] = None) -> str:
"""Extract the subtitle from a web page. Defaults to the <h2> tag.
@@ -77,15 +84,27 @@ def extract_subtitle(self: Self, soup: Any, css_selector: Optional[str] = None)

return subtitle_element.get_text() if subtitle_element else ""

def extract_links(self: Self, soup: Any, website_url: str, extract_url: str, include_filter: Optional[str] = None) -> List[str]:
@tracer.start_as_current_span(name="extract_links")
def extract_links(
self: Self, soup: Any, website_url: str, extract_url: str, include_filter: Optional[str] = None
) -> List[str]:
"""Extract a unique list of links from a website."""
span = trace.get_current_span()
log.debug("Extract links from root URL: %s", extract_url)

span.set_attributes(
{
"extract_url": extract_url,
"website_url": website_url,
"include_filter": str(include_filter),
"link_extract_selector": self.link_extract_selector(),
}
)
links = (
soup.find_all("a", class_=self.link_extract_selector())
if self.link_extract_selector() is not None
else soup.find_all("a")
)
span.set_attribute("total_links_count", len(links).__str__())
log.debug("Total links on page: %s", len(links))
rtd_links = []

@@ -106,9 +125,11 @@ def extract_links(self: Self, soup: Any, website_url: str, extract_url: str, inc

rtd_links.append(href)

log.debug("include filter: %s, ismatch: %s", include_filter, ismatch)
log.debug("include filter: %s, ismatc h: %s", include_filter, ismatch)
span.add_event("link_match", {"link": href, "ismatch": ismatch})

log.debug("Total links for extraction: %s", len(rtd_links))
span.set_attribute("links_for_extraction_count", len(rtd_links).__str__())
return rtd_links
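
The tracing added throughout this file follows the standard OpenTelemetry API pattern: a module-level tracer from trace.get_tracer(__name__), start_as_current_span used as a decorator, and attributes/events recorded on trace.get_current_span(). A standalone sketch of that pattern; the function and attribute names are illustrative, not from this repo.

from typing import List

from opentelemetry import trace

tracer = trace.get_tracer(__name__)


@tracer.start_as_current_span(name="count_links")
def count_links(links: List[str]) -> int:
    # With no SDK/TracerProvider configured the span is a no-op, so the function runs unchanged.
    span = trace.get_current_span()
    span.set_attribute("total_links_count", str(len(links)))
    for link in links:
        span.add_event("link_seen", {"link": link})
    return len(links)


count_links(["https://example.com/a", "https://example.com/b"])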


@@ -145,13 +166,16 @@ def extract_text(
page_url: str,
) -> str | None:
"""Extract text from Any website on a best efforts basis, naive implementation. Not recursive."""
span = trace.get_current_span()
try:
tags = soup.find_all("p")
span.set_attribute("p_tags_count", len(tags).__str__())
page_text = ""
for p in tags:
page_text += f"/n{p.get_text()}"
except IndexError:
page_text = None
span.set_attribute("p_tags_count", "0")
log.info("generic_reader: No text blocks (<p> tags) found on: %s", page_url)

return page_text
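
For reference, a rough sketch of a site-specific extractor built on the BaseTextExtractor interface above; the class name and CSS class are hypothetical, and the method signatures mirror GenericTextExtractor as shown (parameters elided from the hunks above are not reproduced).

from typing import Any, Optional

from docq.data_source.support.web_extracting import BaseTextExtractor


class DocsSiteTextExtractor(BaseTextExtractor):
    """Hypothetical extractor for a documentation site."""

    def link_extract_selector(self) -> Any:
        # Only follow <a> tags carrying this CSS class; return None to follow all links.
        return "reference-internal"

    def extract_text(self, soup: Any, page_url: str) -> Optional[str]:
        # Best-effort: take the main <article> body, or signal "no text" with None.
        article = soup.find("article")
        return article.get_text(separator="\n") if article else None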
@@ -210,10 +234,12 @@ def __init__(
self.website_metadata = website_metadata
self._document_list: List[DocumentListItem] = []

@tracer.start_as_current_span(name="load_data")
def load_data(
self: Self,
urls: List[str],
include_filter: Optional[str] = None,
source_page_type: Optional[SourcePageType] = SourcePageType.index_page,
) -> List[Document]:
"""Load data from the urls.
@@ -225,65 +251,105 @@ def load_data(
List[Document]: List of documents.
"""
span = trace.get_current_span()
all_documents: List[Document] = []
urls = urls[0].split(",")
log.debug("Number of root URLs supplied: %s", len(urls))
log.debug("root URLs supplied: %s", urls)
page_links: List[str] = []

for url in urls:
log.debug("Now processing root URL: %s", url)
hostname = urlparse(url).hostname or "default"

try:
page = requests.get(url, timeout=5)
except Exception as e:
raise ValueError(f"One of the inputs is not a valid url: {url}", e) # noqa: B904
if not urls or len(urls) == 0:
raise ValueError("No URLs supplied.")

soup = BeautifulSoup(page.content, "html.parser")
hostname = urlparse(urls[0]).hostname or "default"
if hostname in self.website_extractors:
extractor: BaseTextExtractor = self.website_extractors[hostname]
else:
extractor: BaseTextExtractor = self.website_extractors["default"]

span.set_attribute("source_page_type", source_page_type.__str__())

# page_links = urls  # default case: expect page URLs to extract content from directly
log.debug("source page type: %s", source_page_type)

if source_page_type == SourcePageType.index_page:
# the provided URLs are index pages, extract links from them first
log.debug("Number of index page URLs supplied: %s", len(urls))
for url in urls:
lnk = self._extract_links(url, extractor, include_filter)
page_links.extend(lnk)
span.add_event("extracted_links_from_index_page", {"url": url, "links_count": len(lnk)})
elif source_page_type == SourcePageType.page_list:
page_links = urls
print("page list - links : ", page_links)
else:
raise ValueError(f"Invalid source page type: {source_page_type}")

if hostname in self.website_extractors:
extractor: BaseTextExtractor = self.website_extractors[hostname]
else:
extractor: BaseTextExtractor = self.website_extractors["default"]
span.set_attribute("page_links_count", len(page_links).__str__())
print("page links : ", page_links)

page_links = extractor.extract_links(soup, url, url, include_filter=include_filter)
for page_link in page_links:
try:
page_response = requests.get(page_link, timeout=5)
span.add_event("url_requested", {"page_link": page_link, "response_bytes": len(page_response.content)})
soup = BeautifulSoup(
page_response.text, "html.parser"
) # TODO: not sure why the original code used response.text here and response.content above. dig in later.

page_text = extractor.extract_text(
soup=soup,
page_url=page_link,
)

page_title = extractor.extract_title(soup=soup)
page_subtitle = extractor.extract_subtitle(soup=soup)
indexed_on = datetime.timestamp(datetime.now().utcnow())
metadata = {
"source_website": urlparse(page_link).hostname,
"source_uri": page_link,
"indexed_on": indexed_on,
"page_title": page_title,
"page_subtitle": page_subtitle,
}

if self.website_metadata is not None:
metadata.update(self.website_metadata(page_link))

all_documents.append(Document(text=page_text, extra_info=metadata))

self._document_list.append(DocumentListItem.create_instance(page_link, page_text, int(indexed_on)))

for page_link in page_links:
try:
page_response = requests.get(page_link, timeout=5)
soup = BeautifulSoup(
page_response.text, "html.parser"
) # TODO: not sure why the original code used response.text here and response.content above. dig in later.
except Exception as e:
span.record_exception(e)
span.set_status(
trace.Status(trace.StatusCode.ERROR, f"Error requesting web page, skipped : {page_link}")
)
log.exception("Error requesting web page, skipped: %s, Error: %s", page_link, e)
continue

page_text = extractor.extract_text(
soup=soup,
page_url=page_link,
)
return all_documents

page_title = extractor.extract_title(soup=soup)
page_subtitle = extractor.extract_subtitle(soup=soup)
indexed_on = datetime.timestamp(datetime.now().utcnow())
metadata = {
"source_website": url,
"source_uri": page_link,
"indexed_on": indexed_on,
"page_title": page_title,
"page_subtitle": page_subtitle,
}
def get_document_list(self: Self) -> List[DocumentListItem]:
"""Return a list of documents. Can be used for tracking state overtime by implementing persistence and displaying document lists to users."""
return self._document_list

if self.website_metadata is not None:
metadata.update(self.website_metadata(url))
@staticmethod
@tracer.start_as_current_span(name="extract_links")
def _extract_links(url: str, extractor: BaseTextExtractor, include_filter: Optional[str] = None) -> List[str]:
span = trace.get_current_span()
log.debug("Now processing root URL: %s", url)
span.add_event("start_processing_url", {"url": url})

all_documents.append(Document(text=page_text, extra_info=metadata))
try:
page = requests.get(url, timeout=5)
span.add_event("url_requested", {"url": url, "response_bytes": len(page.content)})
except Exception as e:
span.record_exception(e)
span.set_status(trace.Status(trace.StatusCode.ERROR, f"One of the inputs is not a valid url: {url}"))
raise ValueError(f"One of the inputs is not a valid url: {url}", e) # noqa: B904

self._document_list.append(DocumentListItem.create_instance(page_link, page_text, int(indexed_on)))
soup = BeautifulSoup(page.content, "html.parser")

except Exception as e:
log.exception("Error requesting web page, skipped: %s, Error: %s", page_link, e)
continue
# print("page content: ", page.content)

return all_documents
# print("page text: ", page.text)

def get_document_list(self: Self) -> List[DocumentListItem]:
"""Return a list of documents. Can be used for tracking state overtime by implementing persistence and displaying document lists to users."""
return self._document_list
page_links = extractor.extract_links(soup, url, url, include_filter=include_filter)
return page_links
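
Note a design choice visible in the new load_data: the extractor is chosen once, from the hostname of the first URL only, so a mixed-host URL list is processed by that single extractor (falling back to "default"). A sketch of the lookup, with an illustrative URL:

from urllib.parse import urlparse

from docq.data_source.support.web_extracting import GenericTextExtractor

website_extractors = {"default": GenericTextExtractor()}

hostname = urlparse("https://docs.example.com/guide").hostname or "default"
extractor = website_extractors[hostname] if hostname in website_extractors else website_extractors["default"]
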
52 changes: 28 additions & 24 deletions source/docq/data_source/web_scraper.py
@@ -2,11 +2,11 @@

import logging as log
from datetime import datetime
from typing import List, Optional
from typing import List, Optional, Self

from llama_index import Document

from ..domain import ConfigKey, SpaceKey
from ..domain import ConfigKey, SourcePageType, SpaceKey
from ..support.store import get_index_dir
from .main import DocumentMetadata, SpaceDataSourceWebBased
from .support.web_extracting import BeautifulSoupWebReader, GenericTextExtractor
@@ -19,9 +19,23 @@ def __init__(self, alternative_name: Optional[str] = None) -> None:
"""Initialize the data source."""
super().__init__("Web Scraper" if alternative_name is None else alternative_name)

def get_config_keys(self) -> List[ConfigKey]:
def get_config_keys(self: Self) -> List[ConfigKey]:
"""Get the config keys for web scraper."""
return [
ConfigKey(
"source_page_type",
"Source Page Type",
False,
False,
None,
{
"type": "selectbox",
"select_box_options": {
SourcePageType.index_page.name: SourcePageType.index_page.value,
SourcePageType.page_list.name: SourcePageType.page_list.value,
},
},
),
ConfigKey("website_url", "Website URL"),
ConfigKey("extractor_name", "Extractor Template Name", True),
ConfigKey(
@@ -32,46 +46,36 @@ def get_config_keys(self) -> List[ConfigKey]:
),
]

def load(self, space: SpaceKey, configs: dict) -> List[Document]:
def load(self: Self, space: SpaceKey, configs: dict) -> List[Document]:
"""Extract text from web pages on a website and load each page as a Document."""
_documents = []
try:
log.debug("config: %s", configs)
log.debug("configs: %s", configs)
persist_path = get_index_dir(space)

bs_web_reader = self._initiate_web_reader(space, configs)

source_page_type_str = configs.get("source_page_type")
print("source_page_type: ", source_page_type_str)
source_page_type = (
SourcePageType[source_page_type_str[0]] if source_page_type_str else SourcePageType.index_page
)

_documents = bs_web_reader.load_data(
urls=[configs["website_url"]],
urls=configs["website_url"].split(","),
include_filter=configs["include_filter"],
source_page_type=source_page_type,
)

document_list = bs_web_reader.get_document_list()

log.debug("created document list: %s", document_list)
self._save_document_list(document_list, persist_path, self._DOCUMENT_LIST_FILENAME)

log.info("web doc count %d", len(_documents))
except Exception as e:
log.error("Error loading web documents", e)

webscaper_metadata_keys = []
exclude_embed_metadata_keys_ = [
str(DocumentMetadata.SPACE_ID.name).lower(),
@@ -93,7 +97,7 @@ def load(self, space: SpaceKey, configs: dict) -> List[Document]:

return self._add_exclude_metadata_keys(_documents, exclude_embed_metadata_keys_, excluded_llm_metadata_keys_)

def _initiate_web_reader(self, space: SpaceKey, configs: dict) -> BeautifulSoupWebReader:
def _initiate_web_reader(self: Self, space: SpaceKey, configs: dict) -> BeautifulSoupWebReader:
"""Initialize the web reader."""

def lambda_metadata(x: str) -> dict:
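
For context, a sketch of the configs dict that load() consumes after this change, inferred from the diff. The tuple shape of "source_page_type" is an assumption based on the SourcePageType[source_page_type_str[0]] lookup above; the URLs are placeholders.

from docq.domain import SourcePageType

configs = {
    "website_url": "https://example.com/a,https://example.com/b",  # comma-separated list of URLs
    "include_filter": "",
    "extractor_name": "",
    "source_page_type": ("page_list", "Page List"),  # assumed (enum name, display label) pair
}

urls = configs["website_url"].split(",")
source_page_type_str = configs.get("source_page_type")
source_page_type = (
    SourcePageType[source_page_type_str[0]] if source_page_type_str else SourcePageType.index_page
)
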
8 changes: 8 additions & 0 deletions source/docq/domain.py
@@ -57,6 +57,14 @@ def value(self: Self) -> str:
return _join_properties(_SEPARATOR_FOR_VALUE, self.type_.name, self.org_id, self.id_)



class SourcePageType(Enum):
"""Source page type."""

index_page = "Index Page"
page_list = "Page List"


@dataclass
class ConfigKey:
"""Config key."""
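
The enum's name/value split is what the selectbox options and the SourcePageType[...] lookup in web_scraper.py rely on; a quick check:

from docq.domain import SourcePageType

assert SourcePageType["page_list"] is SourcePageType.page_list  # lookup by member name
assert SourcePageType.page_list.value == "Page List"            # human-readable label
assert SourcePageType.index_page.name == "index_page"
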
1 change: 1 addition & 0 deletions web/utils/handlers.py
@@ -746,6 +746,7 @@ def _prepare_space_data_source(prefix: str) -> Tuple[str, dict]:
ds_type = st.session_state[f"{prefix}ds_type"][0]
ds_config_keys = SpaceDataSources.__members__[ds_type].value.get_config_keys()
ds_configs = {key.key: st.session_state[f"{prefix}ds_config_{key.key}"] for key in ds_config_keys}
print("save space ds_config ", ds_configs)
return ds_type, ds_configs


