diff --git a/.github/workflows/release-maubot-plugin.yml b/.github/workflows/release-maubot-plugin.yml new file mode 100644 index 0000000..7a3d88b --- /dev/null +++ b/.github/workflows/release-maubot-plugin.yml @@ -0,0 +1,45 @@ +name: Release Maubot Plugin + +on: + push: + branches: + - main + +jobs: + build: + runs-on: ubuntu-latest + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + + steps: + - name: Check out code + uses: actions/checkout@v3 + with: + fetch-depth: 0 + + - name: Extract version from maubot.yaml + run: | + VERSION=$(cat maubot.yaml | grep 'version' | cut -d ':' -f 2 | xargs) + echo "VERSION=$VERSION" >> $GITHUB_ENV + + - name: Check if release already exists + run: | + RELEASE_EXIST=$(gh release view $VERSION > /dev/null 2>&1 && echo "true" || echo "false") + echo "RELEASE_EXIST=$RELEASE_EXIST" >> $GITHUB_ENV + + - name: Generate Changelog + if: env.RELEASE_EXIST == 'false' + run: | + echo "Changelog:" > CHANGELOG.md + git log $(git describe --tags --abbrev=0)..HEAD --pretty=format:"- %h: %s" -- base-config.yaml maubot.yaml socialmediadownload.py instaloader >> CHANGELOG.md + + - name: Package Release + if: env.RELEASE_EXIST == 'false' + run: | + zip -r package.zip base-config.yaml maubot.yaml socialmediadownload.py instaloader + mv package.zip package.mbp + + - name: Create and Upload GitHub Release + if: env.RELEASE_EXIST == 'false' + run: | + gh release create ${{ env.VERSION }} package.mbp -t ${{ env.VERSION }} -F CHANGELOG.md \ No newline at end of file diff --git a/.gitignore b/.gitignore index b6e4761..7c0c7d1 100644 --- a/.gitignore +++ b/.gitignore @@ -127,3 +127,7 @@ dmypy.json # Pyre type checker .pyre/ + +# Maubot +*.zip +*.mbp \ No newline at end of file diff --git a/README.md b/README.md index 4de3b3e..1001ef9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # SocialMediaDownloadMaubot -[Maubot](https://github.com/maubot/maubot) plugin that downloads content from various social media websites given a link. 
+[Maubot](https://github.com/maubot/maubot) plugin that downloads content from various social media websites given a link and posts it to the Matrix room. Currently supported: - Reddit diff --git a/instaloader/__init__.py b/instaloader/__init__.py index d3fa241..cad0beb 100644 --- a/instaloader/__init__.py +++ b/instaloader/__init__.py @@ -1,7 +1,7 @@ """Download pictures (or videos) along with their captions and other metadata from Instagram.""" -__version__ = '4.9.6' +__version__ = '4.10' try: diff --git a/instaloader/instaloader.py b/instaloader/instaloader.py index c699442..c5def2f 100644 --- a/instaloader/instaloader.py +++ b/instaloader/instaloader.py @@ -524,7 +524,7 @@ def download_title_pic(self, url: str, target: Union[str, Path], name_suffix: st .. versionadded:: 4.3""" http_response = self.context.get_raw(url) - date_object = None # type: Optional[datetime] + date_object: Optional[datetime] = None if 'Last-Modified' in http_response.headers: date_object = datetime.strptime(http_response.headers["Last-Modified"], '%a, %d %b %Y %H:%M:%S GMT') date_object = date_object.replace(tzinfo=timezone.utc) @@ -585,6 +585,23 @@ def download_hashtag_profilepic(self, hashtag: Hashtag) -> None: .. versionadded:: 4.4""" self.download_title_pic(hashtag.profile_pic_url, '#' + hashtag.name, 'profile_pic', None) + @_requires_login + def save_session(self) -> dict: + """Saves internally stored :class:`requests.Session` object to :class:`dict`. + + :raises LoginRequiredException: If called without being logged in. + + .. versionadded:: 4.10 + """ + return self.context.save_session() + + def load_session(self, username: str, session_data: dict) -> None: + """Internally stores :class:`requests.Session` object from :class:`dict`. + + .. versionadded:: 4.10 + """ + self.context.load_session(username, session_data) + @_requires_login def save_session_to_file(self, filename: Optional[str] = None) -> None: """Saves internally stored :class:`requests.Session` object. 
def default_iphone_headers() -> Dict[str, Any]:
    """Return a fresh set of default HTTP headers imitating the Instagram iPad app.

    Most values are fixed strings. Three are computed on every call: a random
    ``x-ig-connection-speed``, the local ``x-ig-timezone-offset`` and a new random
    ``x-pigeon-session-id``.

    :return: Mapping of header name to header value (all values are strings).
    """
    # utcoffset() may be None, in which case an offset of zero is used.
    # total_seconds() keeps the sign of the offset; timedelta.seconds is always
    # non-negative (UTC-5 is normalized to days=-1, seconds=68400), which made
    # every timezone west of UTC report a wrong offset.
    utc_offset = datetime.now().astimezone().utcoffset() or timedelta(seconds=0)
    return {'User-Agent': 'Instagram 273.0.0.16.70 (iPad13,8; iOS 16_3; en_US; en-US; ' \
                          'scale=2.00; 2048x2732; 452417278) AppleWebKit/420+',
            'x-ads-opt-out': '1',
            'x-bloks-is-panorama-enabled': 'true',
            'x-bloks-version-id': '01507c21540f73e2216b6f62a11a5b5e51aa85491b72475c080da35b1228ddd6',
            'x-fb-client-ip': 'True',
            'x-fb-connection-type': 'wifi',
            'x-fb-http-engine': 'Liger',
            'x-fb-server-cluster': 'True',
            'x-fb': '1',
            'x-ig-abr-connection-speed-kbps': '2',
            'x-ig-app-id': '124024574287414',
            'x-ig-app-locale': 'en-US',
            'x-ig-app-startup-country': 'US',
            'x-ig-bandwidth-speed-kbps': '0.000',
            'x-ig-capabilities': '36r/F/8=',
            'x-ig-connection-speed': '{}kbps'.format(random.randint(1000, 20000)),
            'x-ig-connection-type': 'WiFi',
            'x-ig-device-locale': 'en-US',
            'x-ig-mapped-locale': 'en-US',
            'x-ig-timezone-offset': str(int(utc_offset.total_seconds())),
            'x-ig-www-claim': '0',
            'x-pigeon-session-id': str(uuid.uuid4()),
            'x-tigon-is-retry': 'False',
            'x-whatsapp': '0'}
= self._session username = self.username + user_id = self.user_id + iphone_headers = self.iphone_headers self._session = self.get_anonymous_session() self.username = None + self.user_id = None + self.iphone_headers = default_iphone_headers() try: yield self finally: self._session.close() self.username = username self._session = session + self.user_id = user_id + self.iphone_headers = iphone_headers @property def is_logged_in(self) -> bool: @@ -170,22 +207,30 @@ def get_anonymous_session(self) -> requests.Session: session.request = partial(session.request, timeout=self.request_timeout) # type: ignore return session - def save_session_to_file(self, sessionfile): - """Not meant to be used directly, use :meth:`Instaloader.save_session_to_file`.""" - pickle.dump(requests.utils.dict_from_cookiejar(self._session.cookies), sessionfile) + def save_session(self): + """Not meant to be used directly, use :meth:`Instaloader.save_session`.""" + return requests.utils.dict_from_cookiejar(self._session.cookies) - def load_session_from_file(self, username, sessionfile): - """Not meant to be used directly, use :meth:`Instaloader.load_session_from_file`.""" + def load_session(self, username, sessiondata): + """Not meant to be used directly, use :meth:`Instaloader.load_session`.""" session = requests.Session() - session.cookies = requests.utils.cookiejar_from_dict(pickle.load(sessionfile)) + session.cookies = requests.utils.cookiejar_from_dict(sessiondata) session.headers.update(self._default_http_header()) session.headers.update({'X-CSRFToken': session.cookies.get_dict()['csrftoken']}) # Override default timeout behavior. # Need to silence mypy bug for this. 
See: https://github.com/python/mypy/issues/2427 - session.request = partial(session.request, timeout=self.request_timeout) # type: ignore + session.request = partial(session.request, timeout=self.request_timeout) # type: ignore self._session = session self.username = username + def save_session_to_file(self, sessionfile): + """Not meant to be used directly, use :meth:`Instaloader.save_session_to_file`.""" + pickle.dump(self.save_session(), sessionfile) + + def load_session_from_file(self, username, sessionfile): + """Not meant to be used directly, use :meth:`Instaloader.load_session_from_file`.""" + self.load_session(username, pickle.load(sessionfile)) + def test_login(self) -> Optional[str]: """Not meant to be used directly, use :meth:`Instaloader.test_login`.""" data = self.graphql_query("d6f4427fbe92d846298cf93df0b937d3", {}) @@ -265,6 +310,7 @@ def login(self, user, passwd): session.headers.update({'X-CSRFToken': login.cookies['csrftoken']}) self._session = session self.username = user + self.user_id = resp_json['userId'] def two_factor_login(self, two_factor_code): """Second step of login if 2FA is enabled. @@ -298,7 +344,8 @@ def do_sleep(self): time.sleep(min(random.expovariate(0.6), 15.0)) def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram.com', - session: Optional[requests.Session] = None, _attempt=1) -> Dict[str, Any]: + session: Optional[requests.Session] = None, _attempt=1, + response_headers: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """JSON request to Instagram. 
:param path: URL, relative to the given domain which defaults to www.instagram.com/ @@ -325,8 +372,11 @@ def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram resp = sess.get('https://{0}/{1}'.format(host, path), params=params, allow_redirects=False) if resp.status_code in self.fatal_status_codes: redirect = " redirect to {}".format(resp.headers['location']) if 'location' in resp.headers else "" - raise AbortDownloadException("Query to https://{}/{} responded with \"{} {}\"{}".format( - host, path, resp.status_code, resp.reason, redirect + body = "" + if resp.headers['Content-Type'].startswith('application/json'): + body = ': ' + resp.text[:500] + ('…' if len(resp.text) > 501 else '') + raise AbortDownloadException("Query to https://{}/{} responded with \"{} {}\"{}{}".format( + host, path, resp.status_code, resp.reason, redirect, body )) while resp.is_redirect: redirect_url = resp.headers['location'] @@ -335,13 +385,16 @@ def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram redirect_url.startswith('https://i.instagram.com/accounts/login')): if not self.is_logged_in: raise LoginRequiredException("Redirected to login page. Use --login.") - # alternate rate limit exceeded behavior - raise TooManyRequestsException("Redirected to login") + raise AbortDownloadException("Redirected to login page. 
You've been logged out, please wait " + + "some time, recreate the session and try again") if redirect_url.startswith('https://{}/'.format(host)): resp = sess.get(redirect_url if redirect_url.endswith('/') else redirect_url + '/', params=params, allow_redirects=False) else: break + if response_headers is not None: + response_headers.clear() + response_headers.update(resp.headers) if resp.status_code == 400: raise QueryReturnedBadRequestException("400 Bad Request") if resp.status_code == 404: @@ -392,7 +445,8 @@ def get_json(self, path: str, params: Dict[str, Any], host: str = 'www.instagram self._rate_controller.handle_429('iphone') if is_other_query: self._rate_controller.handle_429('other') - return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1) + return self.get_json(path=path, params=params, host=host, session=sess, _attempt=_attempt + 1, + response_headers=response_headers) except KeyboardInterrupt: self.error("[skipped by user]", repeat_at_end=False) raise ConnectionException(error_string) from err @@ -482,11 +536,57 @@ def get_iphone_json(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]: .. 
versionadded:: 4.2.1""" with copy_session(self._session, self.request_timeout) as tempsession: - tempsession.headers['User-Agent'] = 'Instagram 146.0.0.27.125 (iPhone12,1; iOS 13_3; en_US; en-US; ' \ - 'scale=2.00; 1656x3584; 190542906)' - for header in ['Host', 'Origin', 'X-Instagram-AJAX', 'X-Requested-With']: + # Set headers to simulate an API request from iPad + tempsession.headers['ig-intended-user-id'] = str(self.user_id) + tempsession.headers['x-pigeon-rawclienttime'] = '{:.6f}'.format(time.time()) + + # Add headers obtained from previous iPad request + tempsession.headers.update(self.iphone_headers) + + # Extract key information from cookies if we haven't got it already from a previous request + header_cookies_mapping = {'x-mid': 'mid', + 'ig-u-ds-user-id': 'ds_user_id', + 'x-ig-device-id': 'ig_did', + 'x-ig-family-device-id': 'ig_did', + 'family_device_id': 'ig_did'} + + # Map the cookie value to the matching HTTP request header + cookies = tempsession.cookies.get_dict().copy() + for key, value in header_cookies_mapping.items(): + if value in cookies: + if key not in tempsession.headers: + tempsession.headers[key] = cookies[value] + else: + # Remove the cookie value if it's already specified as a header + tempsession.cookies.pop(value, None) + + # Edge case for ig-u-rur header due to special string encoding in cookie + if 'rur' in cookies: + if 'ig-u-rur' not in tempsession.headers: + tempsession.headers['ig-u-rur'] = cookies['rur'].strip('\"').encode('utf-8') \ + .decode('unicode_escape') + else: + tempsession.cookies.pop('rur', None) + + # Remove headers specific to Desktop version + for header in ['Host', 'Origin', 'X-Instagram-AJAX', 'X-Requested-With', 'Referer']: tempsession.headers.pop(header, None) - return self.get_json(path, params, 'i.instagram.com', tempsession) + + # No need for cookies if we have a bearer token + if 'authorization' in tempsession.headers: + tempsession.cookies.clear() + + response_headers = dict() # type: Dict[str, Any] + 
def _save(self):
    """Write the stamps held in ``self.data`` to the file ``self.file``.

    The parent directory of ``self.file`` is created first if it does not exist.
    """
    # dirname() returns '' for a bare filename (file in the current directory),
    # and makedirs('') raises FileNotFoundError, so only create a directory
    # when the path actually names one.
    directory = dirname(self.file)
    if directory:
        makedirs(directory, exist_ok=True)
    with open(self.file, 'w') as f:
        self.data.write(f)
('best_before', Optional[float]), - ('remaining_data', Optional[Dict]), - ('first_node', Optional[Dict])]) +class FrozenNodeIterator(NamedTuple): + query_hash: str + query_variables: Dict + query_referer: Optional[str] + context_username: Optional[str] + total_index: int + best_before: Optional[float] + remaining_data: Optional[Dict] + first_node: Optional[Dict] FrozenNodeIterator.query_hash.__doc__ = """The GraphQL ``query_hash`` parameter.""" FrozenNodeIterator.query_variables.__doc__ = """The GraphQL ``query_variables`` parameter.""" FrozenNodeIterator.query_referer.__doc__ = """The HTTP referer used for the GraphQL query.""" @@ -95,7 +95,7 @@ def __init__(self, self._is_first = is_first def _query(self, after: Optional[str] = None) -> Dict: - pagination_variables = {'first': NodeIterator._graphql_page_length} # type: Dict[str, Any] + pagination_variables: Dict[str, Any] = {'first': NodeIterator._graphql_page_length} if after is not None: pagination_variables['after'] = after try: @@ -137,7 +137,7 @@ def __next__(self) -> T: if self._first_node is None: self._first_node = node return item - if self._data['page_info']['has_next_page']: + if self._data.get('page_info', {}).get('has_next_page'): query_response = self._query(self._data['page_info']['end_cursor']) if self._data['edges'] != query_response['edges'] and len(query_response['edges']) > 0: page_index, data = self._page_index, self._data @@ -289,7 +289,7 @@ def resumable_iteration(context: InstaloaderContext, is_resuming = True start_index = iterator.total_index context.log("Resuming from {}.".format(resume_file_path)) - except (InvalidArgumentException, LZMAError, json.decoder.JSONDecodeError) as exc: + except (InvalidArgumentException, LZMAError, json.decoder.JSONDecodeError, EOFError) as exc: context.error("Warning: Not resuming from {}: {}".format(resume_file_path, exc)) try: yield is_resuming, start_index diff --git a/instaloader/structures.py b/instaloader/structures.py index 6f4b935..ef74d0b 100644 
class PostSidecarNode(NamedTuple):
    """Item of a Sidecar Post."""
    is_video: bool
    display_url: str
    # Optional because the attribute documentation below states the value may be None.
    video_url: Optional[str]


PostSidecarNode.is_video.__doc__ = "Whether this node is a video."
PostSidecarNode.display_url.__doc__ = "URL of image or video thumbnail."
PostSidecarNode.video_url.__doc__ = "URL of video or None."
-PostComment = namedtuple('PostComment', (*PostCommentAnswer._fields, 'answers')) # type: ignore + +class PostComment(NamedTuple): + id: int + created_at_utc: datetime + text: str + owner: 'Profile' + likes_count: int + answers: Iterator[PostCommentAnswer] + + for field in PostCommentAnswer._fields: getattr(PostComment, field).__doc__ = getattr(PostCommentAnswer, field).__doc__ # pylint: disable=no-member -PostComment.answers.__doc__ = r"Iterator which yields all :class:`PostCommentAnswer`\ s for the comment." # type: ignore +PostComment.answers.__doc__ = r"Iterator which yields all :class:`PostCommentAnswer`\ s for the comment." + + +class PostLocation(NamedTuple): + id: int + name: str + slug: str + has_public_page: Optional[bool] + lat: Optional[float] + lng: Optional[float] + -PostLocation = namedtuple('PostLocation', ['id', 'name', 'slug', 'has_public_page', 'lat', 'lng']) PostLocation.id.__doc__ = "ID number of location." PostLocation.name.__doc__ = "Location name." PostLocation.slug.__doc__ = "URL friendly variant of location name." @@ -42,6 +73,22 @@ PostLocation.lat.__doc__ = "Latitude (:class:`float` or None)." PostLocation.lng.__doc__ = "Longitude (:class:`float` or None)." +# This regular expression is by MiguelX413 +_hashtag_regex = re.compile(r"(?:#)((?:\w){1,150})") + +# This regular expression is modified from jStassen, adjusted to use Python's \w to +# support Unicode and a word/beginning of string delimiter at the beginning to ensure +# that no email addresses join the list of mentions. 
+# http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/ +_mention_regex = re.compile(r"(?:^|[^\w\n]|_)(?:@)(\w(?:(?:\w|(?:\.(?!\.))){0,28}(?:\w))?)", re.ASCII) + + +def _optional_normalize(string: Optional[str]) -> Optional[str]: + if string is not None: + return normalize("NFC", string) + else: + return None + class Post: """ @@ -73,8 +120,8 @@ def __init__(self, context: InstaloaderContext, node: Dict[str, Any], self._context = context self._node = node self._owner_profile = owner_profile - self._full_metadata_dict = None # type: Optional[Dict[str, Any]] - self._location = None # type: Optional[PostLocation] + self._full_metadata_dict: Optional[Dict[str, Any]] = None + self._location: Optional[PostLocation] = None self._iphone_struct_ = None if 'iphone_struct' in node: # if loaded from JSON with load_structure_from_file() @@ -364,16 +411,10 @@ def get_sidecar_nodes(self, start=0, end=-1) -> Iterator[PostSidecarNode]: @property def caption(self) -> Optional[str]: """Caption.""" - def _normalize(string: Optional[str]) -> Optional[str]: - if string is not None: - return normalize("NFC", string) - else: - return None - if "edge_media_to_caption" in self._node and self._node["edge_media_to_caption"]["edges"]: - return _normalize(self._node["edge_media_to_caption"]["edges"][0]["node"]["text"]) + return _optional_normalize(self._node["edge_media_to_caption"]["edges"][0]["node"]["text"]) elif "caption" in self._node: - return _normalize(self._node["caption"]) + return _optional_normalize(self._node["caption"]) return None @property @@ -381,22 +422,14 @@ def caption_hashtags(self) -> List[str]: """List of all lowercased hashtags (without preceeding #) that occur in the Post's caption.""" if not self.caption: return [] - # This regular expression is from jStassen, adjusted to use Python's \w to support Unicode - # http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/ - hashtag_regex = 
re.compile(r"(?:#)(\w(?:(?:\w|(?:\.(?!\.))){0,28}(?:\w))?)") - return re.findall(hashtag_regex, self.caption.lower()) + return _hashtag_regex.findall(self.caption.lower()) @property def caption_mentions(self) -> List[str]: """List of all lowercased profiles that are mentioned in the Post's caption, without preceeding @.""" if not self.caption: return [] - # This regular expression is modified from jStassen, adjusted to use Python's \w to - # support Unicode and a word/beginning of string delimiter at the beginning to ensure - # that no email addresses join the list of mentions. - # http://blog.jstassen.com/2016/03/code-regex-for-instagram-username-and-hashtags/ - mention_regex = re.compile(r"(?:^|\W|_)(?:@)(\w(?:(?:\w|(?:\.(?!\.))){0,28}(?:\w))?)", re.ASCII) - return re.findall(mention_regex, self.caption.lower()) + return _mention_regex.findall(self.caption.lower()) @property def pcaption(self) -> str: @@ -515,7 +548,7 @@ def comments(self) -> int: def get_comments(self) -> Iterable[PostComment]: r"""Iterate over all comments of the post. - Each comment is represented by a PostComment namedtuple with fields text (string), created_at (datetime), + Each comment is represented by a PostComment NamedTuple with fields text (string), created_at (datetime), id (int), owner (:class:`Profile`) and answers (:class:`~typing.Iterator`\ [:class:`PostCommentAnswer`]) if available. @@ -625,7 +658,7 @@ def sponsor_users(self) -> List['Profile']: @property def location(self) -> Optional[PostLocation]: """ - If the Post has a location, returns PostLocation namedtuple with fields 'id', 'lat' and 'lng' and 'name'. + If the Post has a location, returns PostLocation NamedTuple with fields 'id', 'lat' and 'lng' and 'name'. .. versionchanged:: 4.2.9 Require being logged in (as required by Instagram), return None if not logged-in. 
def get_followed_hashtags(self) -> NodeIterator['Hashtag']:
    """
    Retrieve list of hashtags followed by given profile.
    To use this, one needs to be logged in and private profiles have to be followed.

    :rtype: NodeIterator[Hashtag]
    :raises LoginRequiredException: If called without being logged in.

    .. versionadded:: 4.10
    """
    if not self._context.is_logged_in:
        # The message names the operation actually requested (followed hashtags);
        # it previously referred to the profile's followers.
        raise LoginRequiredException("--login required to get a profile's followed hashtags.")
    self._obtain_metadata()
    return NodeIterator(
        self._context,
        'e6306cc3dbe69d6a82ef8b5f8654c50b',
        lambda d: d["data"]["user"]["edge_following_hashtag"],
        lambda n: Hashtag(self._context, n),
        {'id': str(self.userid)},
        'https://www.instagram.com/{0}/'.format(self.username),
    )
versionadded:: 4.10 + """ + def _elliptify(caption): + pcaption = ' '.join([s.replace('/', '\u2215') for s in caption.splitlines() if s]).strip() + return (pcaption[:30] + "\u2026") if len(pcaption) > 31 else pcaption + return _elliptify(self.caption) if self.caption else '' + @property def is_video(self) -> bool: """True if the StoryItem is a video.""" @@ -1313,9 +1436,9 @@ class Story: def __init__(self, context: InstaloaderContext, node: Dict[str, Any]): self._context = context self._node = node - self._unique_id = None # type: Optional[str] - self._owner_profile = None # type: Optional[Profile] - self._iphone_struct_ = None # type: Optional[Dict[str, Any]] + self._unique_id: Optional[str] = None + self._owner_profile: Optional[Profile] = None + self._iphone_struct_: Optional[Dict[str, Any]] = None def __repr__(self): return ''.format(self.owner_username, self.latest_media_utc) @@ -1431,8 +1554,8 @@ class Highlight(Story): def __init__(self, context: InstaloaderContext, node: Dict[str, Any], owner: Optional[Profile] = None): super().__init__(context, node) self._owner_profile = owner - self._items = None # type: Optional[List[Dict[str, Any]]] - self._iphone_struct_ = None # type: Optional[Dict[str, Any]] + self._items: Optional[List[Dict[str, Any]]] = None + self._iphone_struct_: Optional[Dict[str, Any]] = None def __repr__(self): return ''.format(self.owner_username, self.title) diff --git a/maubot.yaml b/maubot.yaml index 83f4231..c9430a8 100644 --- a/maubot.yaml +++ b/maubot.yaml @@ -1,6 +1,6 @@ maubot: 0.1.0 id: me.gogel.maubot.socialmediadownload -version: 1.0.0 +version: 1.1.1 license: MIT modules: - instaloader diff --git a/socialmediadownload.py b/socialmediadownload.py index 88a7a87..172e952 100644 --- a/socialmediadownload.py +++ b/socialmediadownload.py @@ -3,6 +3,7 @@ import mimetypes import instaloader import urllib +import yarl from typing import Type from urllib.parse import quote @@ -19,13 +20,13 @@ def do_update(self, helper: ConfigUpdateHelper) 
-> None: for suffix in ["enabled", "info", "image", "video", "thumbnail"]: helper.copy(f"{prefix}.{suffix}") -reddit_pattern = re.compile(r"^((?:https?:)?\/\/)?((?:www|m)\.)?((?:reddit\.com|redd\.it))(\/r\/.*\/comments\/.*)(\/)?$") +reddit_pattern = re.compile(r"^((?:https?:)?\/\/)?((?:www|m|old|nm)\.)?((?:reddit\.com|redd\.it))(\/r\/.*\/(?:comments|s)\/.*)(\/)?$") instagram_pattern = re.compile(r"^(?:https?:\/\/)?(?:www\.)?instagram\.com\/?([a-zA-Z0-9\.\_\-]+)?\/([p]+)?([reel]+)?([tv]+)?([stories]+)?\/([a-zA-Z0-9\-\_\.]+)\/?([0-9]+)?/$") youtube_pattern = re.compile(r"^((?:https?:)?\/\/)?((?:www|m)\.)?((?:youtube\.com|youtu\.be))(\/(?:[\w\-]+\?v=|embed\/|v\/)?)([\w\-]+)(\S+)?$") class SocialMediaDownloadPlugin(Plugin): async def start(self) -> None: - await super().start() + self.config.load_and_update() @classmethod def get_config_class(cls) -> Type[BaseProxyConfig]: @@ -35,7 +36,7 @@ def get_config_class(cls) -> Type[BaseProxyConfig]: async def on_message(self, evt: MessageEvent) -> None: if evt.content.msgtype != MessageType.TEXT or evt.content.body.startswith("!"): return - + for url_tup in youtube_pattern.findall(evt.content.body): await evt.mark_read() if self.config["youtube.enabled"]: @@ -115,7 +116,7 @@ async def handle_instagram(self, evt, url_tup): await self.client.send_image(evt.room_id, url=uri, file_name=file_name, info=ImageInfo(mimetype='image/jpeg')) if post.is_video and self.config["instagram.video"]: - response = await self.http.get(post.video_url) + response = await self.http.get(yarl.URL(post.video_url,encoded=True)) if response.status != 200: self.log.warning(f"Unexpected status fetching instagram video {post.video_url}: {response.status}") return @@ -127,8 +128,23 @@ async def handle_instagram(self, evt, url_tup): uri = await self.client.upload_media(media, mime_type=mime_type, filename=file_name) await self.client.send_file(evt.room_id, url=uri, info=BaseFileInfo(mimetype=mime_type, size=len(media)), file_name=file_name, 
async def get_redirected_url(self, short_url: str) -> "str | None":
    """Follow the redirects of *short_url* and return the final URL.

    :param short_url: URL to request with redirects enabled.
    :return: The final response URL as a string when the final status is 200,
        otherwise ``None`` (a warning with the status is logged). Callers must
        handle the ``None`` case.
    """
    async with self.http.get(short_url, allow_redirects=True) as response:
        if response.status != 200:
            self.log.warning(f"Unexpected status fetching redirected URL: {response.status}")
            return None
        return str(response.url)