From 8970330ee45e8678ff8768022910f6167de0e15f Mon Sep 17 00:00:00 2001 From: Moritz Jung <18733473+moritzj29@users.noreply.github.com> Date: Sat, 5 Mar 2022 14:26:39 +0100 Subject: [PATCH 1/2] [amazon] add support for amazon.de, extendable for further regions --- finance_dl/amazon.py | 377 ++++++++++++++++++++++++++++++++++++------- setup.py | 1 + 2 files changed, 316 insertions(+), 62 deletions(-) diff --git a/finance_dl/amazon.py b/finance_dl/amazon.py index 54909d0..07a676b 100644 --- a/finance_dl/amazon.py +++ b/finance_dl/amazon.py @@ -22,13 +22,16 @@ scheme. - `amazon_domain`: Optional. Specifies the Amazon domain from which to download - orders. Must be one of `'.com'` or `'.co.cuk'`. Defaults to `'.com'`. + orders. Must be one of `'.com'`, `'.co.cuk'` or `'.de'`. Defaults to + `'.com'`. - `regular`: Optional. Must be a `bool`. If `True` (the default), download regular orders. + For domains other than `amazon_domain=".com"`, `True` downloads regular AND digital orders. - `digital`: Optional. Must be a `bool` or `None`. If `True`, download digital - orders. Defaults to `None`, which is equivalent to `True` for - `amazon_domain=".com"`, and `False` for `amazon_domain=".co.uk"`. + orders. Effective only for `amazon_domain=".com"`. Defaults to `True` for + `amazon_domain=".com"`. For other domains, digital invoices are downloaded + tgehter with regular invoices since there is no separate menu on the amazon website. - `profile_dir`: Optional. If specified, must be a `str` that specifies the path to a persistent Chrome browser profile to use. This should be a path @@ -71,40 +74,186 @@ def CONFIG_amazon(): From the interactive shell, type: `self.run()` to start the scraper. """ - +import dataclasses import urllib.parse import re import logging import os +import time +import datetime +import dateutil.parser +import bs4 from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import Select from selenium.webdriver.common.keys import Keys from atomicwrites import atomic_write from . import scrape_lib +from typing import List, Optional logger = logging.getLogger('amazon_scrape') -class Domain: - COM = 'com' - CO_UK = 'co.uk' +@dataclasses.dataclass +class Domain(): + top_level: str + + sign_in: str + sign_out: str + + # Find invoices. + your_orders: str + invoice: str + invoice_link: List[str] + order_summary: str + order_summary_hidden: bool + next: str + + # Confirm invoice page + grand_total: str + grand_total_digital: str + order_cancelled: str + + digital_order: str + regular_order_placed: str + + # .COM: digital orders have own order list + # other domains: digital orders are in the regular order list + digital_orders_menu: bool + digital_orders_menu_text: Optional[str] = None + + fresh_fallback: Optional[str] = None + + +class DOT_COM(Domain): + def __init__(self) -> None: + super().__init__( + top_level='com', + sign_in='Sign In', + sign_out='Sign Out', + + your_orders='Your Orders', + invoice='Invoice', + invoice_link=["View order", "View invoice"], + # View invoice -> regular/digital order, View order -> Amazon Fresh + fresh_fallback="View order", + order_summary='Order Summary', + order_summary_hidden=False, + next='Next', + + grand_total='Grand Total:', + grand_total_digital='Grand Total:', + order_cancelled='Order Canceled', + + digital_order='Digital Order: (.*)', + regular_order_placed=r'(?:Subscribe and Save )?Order Placed:\s+([^\s]+ \d+, \d{4})', + + digital_orders_menu=True, + digital_orders_menu_text='Digital Orders', + ) + + @staticmethod + def parse_date(date_str) -> datetime.date: + return dateutil.parser.parse(date_str).date() + +class DOT_CO_UK(Domain): + def __init__(self) -> None: + super().__init__( + top_level='co.uk', + sign_in='Sign in', + sign_out='Sign out', + + your_orders='Your Orders', + invoice='Invoice', + invoice_link=["View order", "View invoice"], + # View invoice -> regular/digital order, View order -> Amazon Fresh + fresh_fallback="View order", + order_summary='Order Summary', + order_summary_hidden=False, + next='Next', + + grand_total='Grand Total:', + grand_total_digital='Grand Total:', + order_cancelled='Order Canceled', + + digital_order='Digital Order: (.*)', + regular_order_placed=r'(?:Subscribe and Save )?Order Placed:\s+([^\s]+ \d+, \d{4})', + + digital_orders_menu=False, + ) + + @staticmethod + def parse_date(date_str) -> datetime.date: + return dateutil.parser.parse(date_str).date() + +class DOT_DE(Domain): + def __init__(self) -> None: + super().__init__( + top_level='de', + sign_in='Hallo, Anmelden', + sign_out='Abmelden', + + your_orders='Meine Bestellungen', + invoice='Rechnung', + invoice_link=["Bestelldetails anzeigen"], + fresh_fallback=None, + order_summary='Bestellübersicht', + order_summary_hidden=True, + next='Weiter', + + grand_total='Gesamtsumme:', + grand_total_digital='Endsumme:', + order_cancelled='Order Canceled', + + digital_order='Digitale Bestellung: (.*)', + regular_order_placed=r'(?:Getätigte Spar-Abo-Bestellung|Bestellung aufgegeben am):\s+(\d+\. [^\s]+ \d{4})', + + digital_orders_menu=False, + ) + + class _parserinfo(dateutil.parser.parserinfo): + MONTHS=[ + ('Jan', 'Januar'), ('Feb', 'Februar'), ('Mär', 'März'), + ('Apr', 'April'), ('Mai', 'Mai'), ('Jun', 'Juni'), + ('Jul', 'Juli'), ('Aug', 'August'), ('Sep', 'September'), + ('Okt', 'Oktober'), ('Nov', 'November'), ('Dez', 'Dezember') + ] + + @staticmethod + def parse_date(date_str) -> datetime.date: + return dateutil.parser.parse(date_str, parserinfo=DOT_DE._parserinfo(dayfirst=True)).date() + +DOMAINS = { + ".com": DOT_COM, + ".co.uk": DOT_CO_UK, + ".de": DOT_DE + } class Scraper(scrape_lib.Scraper): - def __init__(self, credentials, output_directory, dir_per_year=False, amazon_domain=Domain.COM, regular=True, digital=None, order_groups=None, **kwargs): + def __init__(self, + credentials, + output_directory, + dir_per_year=False, + amazon_domain: str = ".com", + regular: bool = True, + digital: Optional[bool] = None, + order_groups: Optional[List[str]] = None, + **kwargs): super().__init__(**kwargs) - default_digital = True if amazon_domain == Domain.COM else False + if amazon_domain not in DOMAINS: + raise ValueError(f"Domain '{amazon_domain} not supported. Supported " + f"domains: {list(DOMAINS)}") + self.domain = DOMAINS[amazon_domain]() self.credentials = credentials self.output_directory = output_directory self.dir_per_year = dir_per_year self.logged_in = False - self.amazon_domain = amazon_domain self.regular = regular - self.digital = digital if digital is not None else default_digital + self.digital_orders_menu = digital if digital is not None else self.domain.digital_orders_menu self.order_groups = order_groups def check_url(self, url): - netloc_re = r'^([^\.@]+\.)*amazon.' + self.amazon_domain + '$' + netloc_re = r'^([^\.@]+\.)*amazon.' + self.domain.top_level + '$' result = urllib.parse.urlparse(url) if result.scheme != 'https' or not re.fullmatch(netloc_re, result.netloc): raise RuntimeError('Reached invalid URL: %r' % url) @@ -114,11 +263,11 @@ def check_after_wait(self): def login(self): logger.info('Initiating log in') - self.driver.get('https://www.amazon.' + self.amazon_domain) + self.driver.get('https://www.amazon.' + self.domain.top_level) if self.logged_in: return - sign_out_links = self.find_elements_by_descendant_partial_text('Sign Out', 'a') + sign_out_links = self.find_elements_by_descendant_partial_text(self.domain.sign_out, 'a') if len(sign_out_links) > 0: logger.info('You must be already logged in!') self.logged_in = True @@ -126,7 +275,7 @@ def login(self): logger.info('Looking for sign-in link') sign_in_links, = self.wait_and_return( - lambda: self.find_visible_elements_by_descendant_partial_text('Sign in', 'a') + lambda: self.find_visible_elements_by_descendant_partial_text(self.domain.sign_in, 'a') ) self.click(sign_in_links[0]) @@ -163,7 +312,14 @@ def get_invoice_path(self, year, order_id): return os.path.join(self.output_directory, year, order_id + '.html') return os.path.join(self.output_directory, order_id + '.html') - def get_orders(self, regular=True, digital=True): + def get_order_id(self, href) -> str: + m = re.match('.*[&?]orderID=((?:D)?[0-9\\-]+)(?:&.*)?$', href) + if m is None: + raise RuntimeError( + 'Failed to parse order ID from href %r' % (href, )) + return m[1] + + def get_orders(self, regular=True, digital_orders_menu=True): invoice_hrefs = [] order_ids_seen = set() order_ids_downloaded = frozenset([ @@ -176,60 +332,101 @@ def get_orders(self, regular=True, digital=True): def get_invoice_urls(): initial_iteration = True while True: + # break when there is no "next page" + + # Problem: different site structures depending on country + + # .com / .uk + # Order Summary buttons are directly visible and can be + # identified with href containing "orderID=" + # but order summary may have different names, e.g. for Amazon Fresh orders + + # .de + # only link with href containing "orderID=" is "Bestelldetails anzeigen" (=Order Details) + # which is not helpful + # order summary is hidden behind submenu which requires a click to be visible def invoice_finder(): - return self.driver.find_elements(By.XPATH, '//a[contains(@href, "orderID=")]') - + if not self.domain.order_summary_hidden: + # order summary link is visible on page + return self.driver.find_elements( + By.XPATH, '//a[contains(@href, "orderID=")]') + else: + # order summary link is hidden in submenu for each order + elements = self.driver.find_elements_by_xpath( + '//a[@class="a-popover-trigger a-declarative"]') + return [a for a in elements if a.text == self.domain.invoice] + if initial_iteration: invoices = invoice_finder() else: invoices, = self.wait_and_return(invoice_finder) initial_iteration = False - order_ids = set() - for invoice_link in invoices: - # Amazon Fresh, and regular orders respectively - if invoice_link.text not in ("View order", "View invoice"): - # View invoice -> regular/digital order, View order -> Amazon Fresh - continue - + last_order_id = None + + def invoice_link_finder(invoice_link): + if invoice_link.text not in self.domain.invoice_link: + # skip invoice if label is not known + # different labels are possible e.g. for regular orders vs. Amazon fresh + if invoice_link.text != "": + # log non-empty link texts -> may be new type + logger.info( + 'Skipping invoice due to unknown invoice_link.text: %s', + invoice_link.text) + return (False, False) href = invoice_link.get_attribute('href') - m = re.match('.*[&?]orderID=((?:D)?[0-9\\-]+)(?:&.*)?$', href) - if m is None: - raise RuntimeError( - 'Failed to parse order ID from href %r' % (href, )) - order_id = m[1] - if order_id in order_ids: - continue - order_ids.add(order_id) - if order_id in order_ids_seen: - logger.info('Skipping already-seen order id: %r', - order_id) - continue - if order_id in order_ids_downloaded: - logger.info('Skipping already-downloaded invoice: %r', - order_id) - continue - if invoice_link.text == "View order": + order_id = self.get_order_id(href) + if self.domain.fresh_fallback is not None and invoice_link.text == self.domain.fresh_fallback: # Amazon Fresh order, construct link to invoice logger.info(" Found likely Amazon Fresh order. Falling back to direct invoice URL.") tokens = href.split("/") tokens = tokens[:4] tokens[-1] = f"gp/css/summary/print.html?orderID={order_id}" href = "/".join(tokens) + return (order_id, href) + + def invoice_link_finder_hidden(): + # submenu containing order summary takes some time to load after click + # search for order summary link and compare order_id + # repeat until order_id is different to last order_id + summary_links = self.driver.find_elements_by_link_text( + self.domain.order_summary) + if summary_links: + href = summary_links[0].get_attribute('href') + order_id = self.get_order_id(href) + if order_id != last_order_id: + return (order_id, href) + return False - invoice_hrefs.append((href, order_id)) - order_ids_seen.add(order_id) + for invoice_link in invoices: + if not self.domain.order_summary_hidden: + (order_id, href) = invoice_link_finder(invoice_link) + else: + invoice_link.click() + (order_id, href), = self.wait_and_return(invoice_link_finder_hidden) + if order_id: + if order_id in order_ids_seen: + logger.info('Skipping already-seen order id: %r', order_id) + continue + if order_id in order_ids_downloaded: + logger.info('Skipping already-downloaded invoice: %r', order_id) + continue + logger.info('Found order \'{}\''.format(order_id)) + invoice_hrefs.append((href, order_id)) + order_ids_seen.add(order_id) + last_order_id = order_id # Find next link next_links = self.find_elements_by_descendant_text_match( - '. = "Next"', 'a', only_displayed=True) + f'. = "{self.domain.next}"', 'a', only_displayed=True) if len(next_links) == 0: logger.info('Found no more pages') break if len(next_links) != 1: raise RuntimeError('More than one next link found') with self.wait_for_page_load(): + logging.info("Next page.") self.click(next_links[0]) def retrieve_all_order_groups(): @@ -258,20 +455,21 @@ def retrieve_all_order_groups(): order_select.select_by_index(order_select_index - 1) get_invoice_urls() - orders_text = "Your Orders" if self.amazon_domain == Domain.CO_UK else "Orders" - # on co.uk, orders link is hidden behind the menu, hence not directly clickable - (orders_link,), = self.wait_and_return( - lambda: self.find_elements_by_descendant_text_match('. = "{}"'.format(orders_text), 'a', only_displayed=False) - ) - link = orders_link.get_attribute('href') - scrape_lib.retry(lambda: self.driver.get(link), retry_delay=2) - if regular: + # on co.uk, orders link is hidden behind the menu, hence not directly clickable + (orders_link,), = self.wait_and_return( + lambda: self.find_elements_by_descendant_text_match(f'. = "{self.domain.your_orders}"', 'a', only_displayed=False) + ) + link = orders_link.get_attribute('href') + scrape_lib.retry(lambda: self.driver.get(link), retry_delay=2) + retrieve_all_order_groups() - if digital: + if digital_orders_menu: + # orders in separate Digital Orders list (relevant for .COM) + # other domains list digital orders within the regular order list (digital_orders_link,), = self.wait_and_return( - lambda: self.find_elements_by_descendant_text_match('contains(., "Digital Orders")', 'a', only_displayed=True) + lambda: self.find_elements_by_descendant_text_match(f'contains(., "{self.domain.digital_orders_text}")', 'a', only_displayed=True) ) scrape_lib.retry(lambda: self.click(digital_orders_link), retry_delay=2) @@ -281,6 +479,11 @@ def retrieve_all_order_groups(): def retrieve_invoices(self, invoice_hrefs): for href, order_id in invoice_hrefs: + # Todo: check if invoice already present + # if os.path.exists(invoice_path): + # logger.info('Skipping already-downloaded invoice: %r', + # order_id) + # continue logger.info('Downloading invoice for order %r', order_id) with self.wait_for_page_load(): self.driver.get(href) @@ -289,7 +492,11 @@ def retrieve_invoices(self, invoice_hrefs): # Wait until it is all generated. def get_source(): source = self.driver.page_source - if 'Grand Total:' in source or 'Order Canceled' in source: + if ( + self.domain.grand_total in source or + self.domain.grand_total_digital in source or + self.domain.order_cancelled in source + ): return source elif 'problem loading this order' in source: raise ValueError(f'Failed to retrieve information for order {order_id}') @@ -301,12 +508,55 @@ def get_source(): page_source, = self.wait_and_return(get_source) if order_id not in page_source: raise ValueError(f'Failed to retrieve information for order {order_id}') - m = re.search('(?:Digital Order: |Order Placed: *\n *\n) *[A-Za-z]* \d+, (\d{4})', - page_source) - order_date = m[1] if m else None - if order_date is None and self.dir_per_year: + + # extract order date + def get_date(source, order_id): + # code blocks taken from beancount-import/amazon-invoice.py + soup=bs4.BeautifulSoup(source, 'lxml') + + def is_order_placed_node(node): + # order placed information in page header (top left) + m = re.fullmatch(self.domain.regular_order_placed, node.text.strip()) + return m is not None + + def is_digital_order_row(node): + # information in heading of order table + if node.name != 'tr': + return False + m = re.match(self.domain.digital_order, node.text.strip()) + if m is None: + return False + try: + self.domain.parse_date(m.group(1)) + return True + except: + return False + + if order_id.startswith('D'): + # digital order + node = soup.find(is_digital_order_row) + regex = self.domain.digital_order + else: + # regular order + node = soup.find(is_order_placed_node) + regex = self.domain.regular_order_placed + + m = re.fullmatch(regex, node.text.strip()) + if m is None: + return None + order_date = self.domain.parse_date(m.group(1)) + return order_date + + order_date = get_date(page_source, order_id) + if order_date is None: + if self.dir_per_year: raise ValueError(f'Failed to get date for order {order_id}') - invoice_path = self.get_invoice_path(m[1], order_id) + else: + # date is not necessary, so just log + logger.info(f'Failed to get date for order {order_id}') + else: + order_date = order_date.year + invoice_path = self.get_invoice_path(order_date, order_id) if not os.path.exists(os.path.dirname(invoice_path)): os.makedirs(os.path.dirname(invoice_path)) with atomic_write( @@ -320,7 +570,10 @@ def run(self): self.login() if not os.path.exists(self.output_directory): os.makedirs(self.output_directory) - self.get_orders(regular=self.regular, digital=self.digital) + self.get_orders( + regular=self.regular, + digital_orders_menu=self.digital_orders_menu + ) def run(**kwargs): diff --git a/setup.py b/setup.py index 7b15a72..5995ee7 100644 --- a/setup.py +++ b/setup.py @@ -99,6 +99,7 @@ def run(self): 'beancount>=2.1.2', 'atomicwrites>=1.3.0', 'jsonschema', + 'python-dateutil', ], tests_require=[ 'pytest', From 64a43f756178a89f5accbf605652513cc7862021 Mon Sep 17 00:00:00 2001 From: Moritz Jung <18733473+moritzj29@users.noreply.github.com> Date: Sat, 5 Mar 2022 14:39:03 +0100 Subject: [PATCH 2/2] remove comments and unused import --- finance_dl/amazon.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/finance_dl/amazon.py b/finance_dl/amazon.py index 07a676b..9b8f04d 100644 --- a/finance_dl/amazon.py +++ b/finance_dl/amazon.py @@ -79,7 +79,6 @@ def CONFIG_amazon(): import re import logging import os -import time import datetime import dateutil.parser import bs4 @@ -479,11 +478,6 @@ def retrieve_all_order_groups(): def retrieve_invoices(self, invoice_hrefs): for href, order_id in invoice_hrefs: - # Todo: check if invoice already present - # if os.path.exists(invoice_path): - # logger.info('Skipping already-downloaded invoice: %r', - # order_id) - # continue logger.info('Downloading invoice for order %r', order_id) with self.wait_for_page_load(): self.driver.get(href)