diff --git a/apis/api_helper.py b/apis/api_helper.py index 79544cfcb..2e2d4509f 100644 --- a/apis/api_helper.py +++ b/apis/api_helper.py @@ -1,31 +1,40 @@ +import asyncio import copy -import math +import hashlib +import os import re +import threading import time -from typing import Any, Union -from urllib.parse import urlparse - -import requests -from requests.sessions import Session -import ujson -import socket -import os +from itertools import chain, groupby, product, zip_longest from multiprocessing import cpu_count -from requests.adapters import HTTPAdapter from multiprocessing.dummy import Pool as ThreadPool -from itertools import product, chain, zip_longest, groupby from os.path import dirname as up -import threading +from random import randint +from typing import Any, Optional +from urllib.parse import urlparse + +import aiohttp +import helpers.main_helper as main_helper +import python_socks +import requests +from aiohttp import ClientSession +from aiohttp.client_reqrep import ClientResponse +from aiohttp_socks import ChainProxyConnector, ProxyConnector, ProxyType +from database.models.media_table import media_table + +from apis.onlyfans.classes import create_user path = up(up(os.path.realpath(__file__))) os.chdir(path) global_settings = {} -global_settings["dynamic_rules_link"] = "https://raw.githubusercontent.com/DATAHOARDERS/dynamic-rules/main/onlyfans.json" +global_settings[ + "dynamic_rules_link" +] = "https://raw.githubusercontent.com/DATAHOARDERS/dynamic-rules/main/onlyfans.json" -class set_settings(): +class set_settings: def __init__(self, option={}): global global_settings self.proxies = option.get("proxies") @@ -35,33 +44,46 @@ def __init__(self, option={}): def chunks(l, n): - final = [l[i * n:(i + 1) * n] for i in range((len(l) + n - 1) // n)] + final = [l[i * n : (i + 1) * n] for i in range((len(l) + n - 1) // n)] return final -def multiprocessing(max_threads=None): +def calculate_max_threads(max_threads=None): if not max_threads: max_threads = -1 max_threads2 = cpu_count() if max_threads < 1 or max_threads >= max_threads2: - pool = ThreadPool() - else: - pool = ThreadPool(max_threads) + max_threads = max_threads2 + return max_threads + + +def multiprocessing(max_threads=None): + max_threads = calculate_max_threads(max_threads) + pool = ThreadPool(max_threads) return pool -class session_manager(): - def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None, max_threads=-1) -> None: +class session_manager: + def __init__( + self, + auth, + original_sessions=[], + headers: dict = {}, + proxies: list[str] = [], + session_retry_rules=None, + max_threads=-1, + ) -> None: self.sessions = self.add_sessions(original_sessions) - self.pool = multiprocessing(max_threads) + self.pool = multiprocessing() self.max_threads = max_threads self.kill = False self.headers = headers - self.session_rules = session_rules + self.proxies: list[str] = proxies self.session_retry_rules = session_retry_rules dr_link = global_settings["dynamic_rules_link"] dynamic_rules = requests.get(dr_link).json() self.dynamic_rules = dynamic_rules + self.auth = auth def add_sessions(self, original_sessions: list, overwrite_old_sessions=True): if overwrite_old_sessions: @@ -82,6 +104,7 @@ def stimulate_sessions(self): def do(session_manager): while not session_manager.kill: for session in session_manager.sessions: + def process_links(link, session): r = session.get(link) text = r.text.strip("\n") @@ -89,7 +112,8 @@ def process_links(link, session): print else: 
found_dupe = [ - x for x in session_manager.sessions if x.ip == text] + x for x in session_manager.sessions if x.ip == text + ] if found_dupe: return cloned_session = copy.deepcopy(session) @@ -99,140 +123,295 @@ def process_links(link, session): print(text) print return text + time.sleep(62) link = "https://checkip.amazonaws.com" ip = process_links(link, session) print + t1 = threading.Thread(target=do, args=[self]) t1.start() - def json_request(self, link: str, session: Union[Session] = None, method="GET", stream=False, json_format=True, data={}, sleep=True, timeout=20, ignore_rules=False, force_json=False) -> Any: + async def json_request( + self, + link: str, + session: Optional[ClientSession] = None, + method="GET", + stream=False, + json_format=True, + data={}, + progress_bar=None, + sleep=True, + timeout=20, + ignore_rules=False, + force_json=False, + ) -> Any: headers = {} + custom_session = False if not session: - session = self.sessions[0] - if self.session_rules and not ignore_rules: - headers |= self.session_rules(self, link) - session = copy.deepcopy(session) - count = 0 - sleep_number = 0.5 - result = {} - while count < 11: - try: - count += 1 - if json_format: - headers["accept"] = "application/json, text/plain, */*" - if data: - r = session.request(method, link, json=data, - stream=stream, timeout=timeout, headers=headers) + custom_session = True + proxies = self.proxies + proxy = self.proxies[randint(0, len(proxies) - 1)] if proxies else "" + connector = ProxyConnector.from_url(proxy) if proxy else None + session = ClientSession( + connector=connector, cookies=self.auth.cookies, read_timeout=None + ) + headers = self.session_rules(link) + headers["accept"] = "application/json, text/plain, */*" + headers["Connection"] = "keep-alive" + request_method = None + if method == "HEAD": + request_method = session.head + elif method == "GET": + request_method = session.get + elif method == "POST": + request_method = session.post + elif method == "DELETE": + request_method = session.delete + try: + async with request_method(link, headers=headers) as response: + if method == "HEAD": + result = response else: - r = session.request( - method, link, stream=stream, timeout=timeout, headers=headers) - if self.session_retry_rules: - rule = self.session_retry_rules(r, link) - if rule == 1: - continue - elif rule == 2: - break - if json_format: - content_type = r.headers['Content-Type'] - matches = ["application/json;", "application/vnd.api+json"] - if not force_json and all(match not in content_type for match in matches): - continue - text = r.text - if not text: - message = "ERROR: 100 Posts skipped. 
Please post the username you're trying to scrape on the issue "'100 Posts Skipped'"" - return result - return ujson.loads(text) - else: - return r - except (ConnectionResetError) as e: - continue - except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError, requests.exceptions.ReadTimeout, socket.timeout) as e: - if sleep: - time.sleep(sleep_number) - sleep_number += 0.5 - continue - except Exception as e: - print(e) - continue - return result - - def parallel_requests(self, items: list[str]): - def multi(link): - result = self.json_request(link) - return result - results = self.pool.starmap(multi, product( - items)) + if json_format and not stream: + result = await response.json() + elif stream and not json_format: + buffer = [] + if response.status == 200: + async for data in response.content.iter_chunked(4096): + buffer.append(data) + length = len(data) + progress_bar.update(length) + else: + if response.content_length: + progress_bar.update_total_size(-response.content_length) + final_buffer = b"".join(buffer) + result = [response, final_buffer] + print + else: + result = await response.read() + if custom_session: + await session.close() + return result + except aiohttp.ClientConnectorError as e: + return + + async def async_requests(self, items: list[str], json_format=True): + tasks = [] + + async def run(links): + proxies = self.proxies + proxy = self.proxies[randint(0, len(proxies) - 1)] if proxies else "" + connector = ProxyConnector.from_url(proxy) if proxy else None + async with ClientSession( + connector=connector, cookies=self.auth.cookies, read_timeout=None + ) as session: + for link in links: + task = asyncio.ensure_future(self.json_request(link, session)) + tasks.append(task) + responses = await asyncio.gather(*tasks) + return responses + + results = await asyncio.ensure_future(run(items)) return results + async def async_downloads( + self, download_list: list[media_table], subscription: create_user + ): + async def run(download_list: list[media_table]): + proxies = self.proxies + proxy = self.proxies[randint(0, len(proxies) - 1)] if proxies else "" + connector = ProxyConnector.from_url(proxy) if proxy else None + async with ClientSession( + connector=connector, cookies=self.auth.cookies, read_timeout=None + ) as session: + tasks = [] + # Get content_lengths + for download_item in download_list: + link = download_item.link + if link: + task = asyncio.ensure_future( + self.json_request( + download_item.link, + session, + method="HEAD", + json_format=False, + ) + ) + tasks.append(task) + responses = await asyncio.gather(*tasks) + tasks.clear() -def create_session(settings={}, custom_proxy="", test_ip=True): - - def test_session(proxy=None, cert=None, max_threads=None): - session = requests.Session() - proxy_type = {'http': proxy, - 'https': proxy} - if proxy: - session.proxies = proxy_type - if cert: - session.verify = cert - session.mount( - 'https://', HTTPAdapter(pool_connections=max_threads, pool_maxsize=max_threads)) - if test_ip: - link = 'https://checkip.amazonaws.com' - r = session_manager().json_request( - link, session, json_format=False, sleep=False) - if not isinstance(r, requests.Response): - print(f"Proxy Not Set: {proxy}\n") - return - ip = r.text.strip() - print("Session IP: "+ip+"\n") - setattr(session, "ip", ip) - setattr(session, "links", []) - return session - sessions = [] - settings = set_settings(settings) - pool = multiprocessing() - max_threads = pool._processes - proxies = settings.proxies - cert = settings.cert - while not 
sessions: - proxies = [custom_proxy] if custom_proxy else proxies - if proxies: - with pool: - sessions = pool.starmap(test_session, product( - proxies, [cert], [max_threads])) - else: - session = test_session(max_threads=max_threads) - sessions.append(session) - return sessions + async def check(download_item: media_table, response: ClientResponse): + filepath = os.path.join( + download_item.directory, download_item.filename + ) + if response.status == 200: + if response.content_length: + download_item.size = response.content_length + if os.path.exists(filepath): + if os.path.getsize(filepath) == response.content_length: + download_item.downloaded = True + else: + return download_item + else: + return download_item -def assign_session(medias, item, key_one="link", key_two="count", show_item=False, capped=False): - count = 0 - activate_cap = False - number = len(item) - medias2 = [] - for auth in medias: - media2 = {} - media2[key_one] = auth - if not number: - count = -1 - if activate_cap: - media2[key_two] = -1 - else: - if show_item: - media2[key_two] = item[count] - else: - media2[key_two] = count + for download_item in download_list: + temp_response = [ + response + for response in responses + if response and response.url.name == download_item.filename + ] + if temp_response: + temp_response = temp_response[0] + task = check(download_item, temp_response) + tasks.append(task) + result = await asyncio.gather(*tasks) + download_list = [x for x in result if x] + tasks.clear() + progress_bar = None + if download_list: + progress_bar = main_helper.download_session() + progress_bar.start(unit="B", unit_scale=True, miniters=1) + [progress_bar.update_total_size(x.size) for x in download_list] - medias2.append(media2) - count += 1 - if count >= number: - count = 0 - if capped: - activate_cap = True - return medias2 + async def process_download(download_item: media_table): + response = await self.download_content( + download_item, session, progress_bar, subscription + ) + if response: + data, download_item = response.values() + if data: + download_path = os.path.join( + download_item.directory, download_item.filename + ) + os.makedirs(os.path.dirname(download_path), exist_ok=True) + with open(download_path, "wb") as f: + f.write(data) + download_item.size = len(data) + download_item.downloaded = True + + max_threads = calculate_max_threads(self.max_threads) + download_groups = main_helper.grouper(max_threads, download_list) + for download_group in download_groups: + tasks = [] + for download_item in download_group: + task = process_download(download_item) + if task: + tasks.append(task) + result = await asyncio.gather(*tasks) + if isinstance(progress_bar, main_helper.download_session): + progress_bar.close() + return True + + results = await asyncio.ensure_future(run(download_list)) + return results + + async def download_content( + self, + download_item: media_table, + session: ClientSession, + progress_bar, + subscription: create_user, + ): + attempt_count = 1 + new_task = {} + while attempt_count <= 3: + attempt_count += 1 + if not download_item.link: + continue + response: ClientResponse + response, task = await asyncio.ensure_future( + self.json_request( + download_item.link, + session, + json_format=False, + stream=True, + progress_bar=progress_bar, + ) + ) + if response.status != 200: + task = None + if response.content_length: + progress_bar.update_total_size(-response.content_length) + api_type = download_item.__module__.split(".")[-1] + post_id = download_item.post_id + new_result = 
None + if api_type == "messages": + new_result = await subscription.get_message_by_id( + message_id=post_id + ) + elif api_type == "posts": + new_result = await subscription.get_post(post_id) + print + if new_result and new_result.media: + media_list = [ + x for x in new_result.media if x["id"] == download_item.media_id + ] + if media_list: + media = media_list[0] + quality = subscription.subscriber.extras["settings"][ + "supported" + ]["onlyfans"]["settings"]["video_quality"] + link = main_helper.link_picker(media, quality) + download_item.link = link + continue + new_task["response"] = task + new_task["download_item"] = download_item + break + return new_task + + def session_rules(self, link: str) -> dict: + headers = self.headers + if "https://onlyfans.com/api2/v2/" in link: + dynamic_rules = self.dynamic_rules + headers["app-token"] = dynamic_rules["app_token"] + # auth_id = headers["user-id"] + a = [link, 0, dynamic_rules] + headers2 = self.create_signed_headers(*a) + headers |= headers2 + return headers + + def create_signed_headers(self, link: str, auth_id: int, dynamic_rules: dict): + # Users: 300000 | Creators: 301000 + final_time = str(int(round(time.time()))) + path = urlparse(link).path + query = urlparse(link).query + path = path if not query else f"{path}?{query}" + a = [dynamic_rules["static_param"], final_time, path, str(auth_id)] + msg = "\n".join(a) + message = msg.encode("utf-8") + hash_object = hashlib.sha1(message) + sha_1_sign = hash_object.hexdigest() + sha_1_b = sha_1_sign.encode("ascii") + checksum = ( + sum([sha_1_b[number] for number in dynamic_rules["checksum_indexes"]]) + + dynamic_rules["checksum_constant"] + ) + headers = {} + headers["sign"] = dynamic_rules["format"].format(sha_1_sign, abs(checksum)) + headers["time"] = final_time + return headers + + +async def test_proxies(proxies: list[str]): + final_proxies = [] + for proxy in proxies: + connector = ProxyConnector.from_url(proxy) if proxy else None + async with ClientSession(connector=connector) as session: + link = "https://checkip.amazonaws.com" + try: + response = await session.get(link) + ip = await response.text() + ip = ip.strip() + print("Session IP: " + ip + "\n") + final_proxies.append(proxy) + except python_socks._errors.ProxyConnectionError as e: + print(f"Proxy Not Set: {proxy}\n") + continue + return final_proxies def restore_missing_data(master_set2, media_set, split_by): @@ -241,16 +420,15 @@ def restore_missing_data(master_set2, media_set, split_by): for item in media_set: if not item: link = master_set2[count] - offset = int(link.split('?')[-1].split('&')[1].split("=")[1]) + offset = int(link.split("?")[-1].split("&")[1].split("=")[1]) limit = int(link.split("?")[-1].split("&")[0].split("=")[1]) - if limit == split_by+1: + if limit == split_by + 1: break offset2 = offset - limit2 = int(limit/split_by) - for item in range(1, split_by+1): - link2 = link.replace("limit="+str(limit), "limit="+str(limit2)) - link2 = link2.replace( - "offset="+str(offset), "offset="+str(offset2)) + limit2 = int(limit / split_by) + for item in range(1, split_by + 1): + link2 = link.replace("limit=" + str(limit), "limit=" + str(limit2)) + link2 = link2.replace("offset=" + str(offset), "offset=" + str(offset2)) offset2 += limit2 new_set.append(link2) count += 1 @@ -258,40 +436,31 @@ def restore_missing_data(master_set2, media_set, split_by): return new_set -def scrape_endpoint_links(links, session_manager: session_manager, api_type): - def multi(item): - link = item["link"] - item = {} - session = 
session_manager.sessions[0] - result = session_manager.json_request(link, session) - if "error" in result: - result = [] - if result: - item["session"] = session - item["result"] = result - return item +async def scrape_endpoint_links(links, session_manager: session_manager, api_type): media_set = [] max_attempts = 100 api_type = api_type.capitalize() for attempt in list(range(max_attempts)): if not links: continue - print("Scrape Attempt: "+str(attempt+1)+"/"+str(max_attempts)) - results = session_manager.parallel_requests(links) + print("Scrape Attempt: " + str(attempt + 1) + "/" + str(max_attempts)) + results = await session_manager.async_requests(links) not_faulty = [x for x in results if x] - faulty = [{"key": k, "value": v, "link": links[k]} - for k, v in enumerate(results) if not v] - last_number = len(results)-1 + faulty = [ + {"key": k, "value": v, "link": links[k]} + for k, v in enumerate(results) + if not v + ] + last_number = len(results) - 1 if faulty: positives = [x for x in faulty if x["key"] != last_number] false_positive = [x for x in faulty if x["key"] == last_number] if positives: attempt = attempt if attempt > 1 else attempt + 1 - num = int(len(faulty)*(100/attempt)) + num = int(len(faulty) * (100 / attempt)) split_by = 2 - print("Missing "+str(num)+" Posts... Retrying...") - links = restore_missing_data( - links, results, split_by) + print("Missing " + str(num) + " Posts... Retrying...") + links = restore_missing_data(links, results, split_by) media_set.extend(not_faulty) if not positives and false_positive: media_set.extend(results) @@ -299,26 +468,21 @@ def multi(item): print else: media_set.extend(results) - print("Found: "+api_type) + print("Found: " + api_type) break media_set = list(chain(*media_set)) return media_set -def grouper(n, iterable, fillvalue=None): - args = [iter(iterable)] * n - return list(zip_longest(fillvalue=fillvalue, *args)) - - def calculate_the_unpredictable(link, limit, multiplier=1): final_links = [] - a = list(range(1, multiplier+1)) + a = list(range(1, multiplier + 1)) for b in a: parsed_link = urlparse(link) q = parsed_link.query.split("&") offset = q[1] old_offset_num = int(re.findall("\\d+", offset)[0]) - new_offset_num = old_offset_num+(limit*b) + new_offset_num = old_offset_num + (limit * b) new_link = link.replace(offset, f"offset={new_offset_num}") final_links.append(new_link) return final_links diff --git a/apis/onlyfans/classes/__init__.py b/apis/onlyfans/classes/__init__.py index e3a343500..d98714a8f 100644 --- a/apis/onlyfans/classes/__init__.py +++ b/apis/onlyfans/classes/__init__.py @@ -3,4 +3,4 @@ from apis.onlyfans.classes.create_post import create_post from apis.onlyfans.classes.create_story import create_story from apis.onlyfans.classes.create_message import create_message -from apis.onlyfans.classes.create_highlight import create_highlight \ No newline at end of file +from apis.onlyfans.classes.create_highlight import create_highlight diff --git a/apis/onlyfans/classes/create_auth.py b/apis/onlyfans/classes/create_auth.py index 00457d895..b30b0e444 100644 --- a/apis/onlyfans/classes/create_auth.py +++ b/apis/onlyfans/classes/create_auth.py @@ -1,35 +1,27 @@ +import asyncio +import math from datetime import datetime -from apis.onlyfans.classes.create_post import create_post -from apis.onlyfans.classes.create_message import create_message from itertools import chain, product -from apis import api_helper -from apis.onlyfans.classes.extras import ( - auth_details, - content_types, - create_headers, - endpoint_links, - 
error_details, - handle_refresh, -) -from apis.onlyfans.classes.create_user import create_user - -import requests from typing import Any, Optional, Union -from apis.api_helper import session_manager -import copy -from user_agent import generate_user_agent -import math + import jsonpickle +from apis import api_helper +from apis.onlyfans.classes.create_message import create_message +from apis.onlyfans.classes.create_post import create_post +from apis.onlyfans.classes.create_user import create_user +from apis.onlyfans.classes.extras import (auth_details, content_types, + create_headers, endpoint_links, + error_details, handle_refresh) from dateutil.relativedelta import relativedelta +from user_agent import generate_user_agent class create_auth: def __init__( self, - session_manager2: session_manager, option={}, - init=False, pool=None, + max_threads=-1, ) -> None: self.id = option.get("id") self.username: str = option.get("username") @@ -47,10 +39,10 @@ def __init__( self.archived_stories = {} self.mass_messages = [] self.paid_content = [] - session_manager2 = copy.copy(session_manager2) - self.session_manager = session_manager2 + self.session_manager = api_helper.session_manager(self, max_threads=max_threads) self.pool = pool self.auth_details: Optional[auth_details] = None + self.cookies: dict = {} self.profile_directory = option.get("profile_directory", "") self.guest = False self.active = False @@ -63,7 +55,7 @@ def update(self, data): if found_attr: setattr(self, key, value) - def login(self, full=False, max_attempts=10, guest=False): + async def login(self, full=False, max_attempts=10, guest=False): auth_version = "(V1)" if guest: self.auth_details.auth_id = "0" @@ -84,22 +76,22 @@ def login(self, full=False, max_attempts=10, guest=False): dynamic_rules = self.session_manager.dynamic_rules a = [dynamic_rules, auth_id, user_agent, x_bc, auth_items.sess, link] self.session_manager.headers = create_headers(*a) - if not self.session_manager.sessions: - self.session_manager.add_sessions([requests.Session()]) if guest: print("Guest Authentication") return self for session in self.session_manager.sessions: for auth_cookie in auth_cookies: session.cookies.set(**auth_cookie) + + self.cookies = {d["name"]: d["value"] for d in auth_cookies} count = 1 while count < max_attempts + 1: string = f"Auth {auth_version} Attempt {count}/{max_attempts}" print(string) - self.get_authed() + await self.get_authed() count += 1 - def resolve_auth(auth: create_auth): + async def resolve_auth(auth: create_auth): if self.errors: error = self.errors[-1] print(error.message) @@ -114,7 +106,7 @@ def resolve_auth(auth: create_auth): ) code = input("Enter 2FA Code\n") data = {"code": code, "rememberMe": True} - r = self.session_manager.json_request( + r = await self.session_manager.json_request( link, method="POST", data=data ) if "error" in r: @@ -126,7 +118,7 @@ def resolve_auth(auth: create_auth): auth.errors.remove(error) break - resolve_auth(self) + await resolve_auth(self) if not self.active: if self.errors: error = self.errors[-1] @@ -145,12 +137,10 @@ def resolve_auth(auth: create_auth): break return self - def get_authed(self): + async def get_authed(self): if not self.active: link = endpoint_links().customer - r = self.session_manager.json_request( - link, self.session_manager.sessions[0], sleep=False - ) + r = await self.session_manager.json_request(link, sleep=False) if r: self.resolve_auth_errors(r) if not self.errors: @@ -181,7 +171,7 @@ def resolve_auth_errors(self, r): else: self.errors.clear() - def 
get_lists(self, refresh=True, limit=100, offset=0): + async def get_lists(self, refresh=True, limit=100, offset=0): api_type = "lists" if not self.active: return @@ -189,19 +179,18 @@ def get_lists(self, refresh=True, limit=100, offset=0): subscriptions = handle_refresh(self, api_type) return subscriptions link = endpoint_links(global_limit=limit, global_offset=offset).lists - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) self.lists = results return results - def get_user(self, identifier: Union[str, int]): + async def get_user(self, identifier: Union[str, int]): link = endpoint_links(identifier).users - result = self.session_manager.json_request(link) + result = await self.session_manager.json_request(link) result["session_manager"] = self.session_manager result = create_user(result) if "error" not in result else result return result - def get_lists_users( + async def get_lists_users( self, identifier, check: bool = False, refresh=True, limit=100, offset=0 ): if not self.active: @@ -209,18 +198,18 @@ def get_lists_users( link = endpoint_links( identifier, global_limit=limit, global_offset=offset ).lists_users - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) if len(results) >= limit and not check: - results2 = self.get_lists_users( + results2 = await self.get_lists_users( identifier, limit=limit, offset=limit + offset ) results.extend(results2) return results - def get_subscription( + async def get_subscription( self, check: bool = False, identifier="", limit=100, offset=0 ) -> Union[create_user, None]: - subscriptions = self.get_subscriptions(refresh=False) + subscriptions = await self.get_subscriptions(refresh=False) valid = None for subscription in subscriptions: if identifier == subscription.username or identifier == subscription.id: @@ -228,7 +217,7 @@ def get_subscription( break return valid - def get_subscriptions( + async def get_subscriptions( self, resume=None, refresh=True, @@ -243,7 +232,6 @@ def get_subscriptions( subscriptions = self.subscriptions return subscriptions link = endpoint_links(global_limit=limit, global_offset=offset).subscriptions - session = self.session_manager.sessions[0] ceil = math.ceil(self.subscribesCount / limit) a = list(range(ceil)) offset_array = [] @@ -263,7 +251,8 @@ def get_subscriptions( json_authed = jsonpickle.decode(json_authed) self.session_manager = temp_session_manager self.pool = temp_pool - json_authed = json_authed | self.get_user(self.username).__dict__ + temp_auth = await self.get_user(self.username) + json_authed = json_authed | temp_auth.__dict__ subscription = create_user(json_authed) subscription.subscriber = self @@ -274,11 +263,11 @@ def get_subscriptions( results.append(subscription) if not identifiers: - def multi(item): + async def multi(item): link = item # link = item["link"] # session = item["session"] - subscriptions = self.session_manager.json_request(link) + subscriptions = await self.session_manager.json_request(link) valid_subscriptions = [] extras = {} extras["auth_check"] = "" @@ -293,7 +282,7 @@ def multi(item): for subscription in subscriptions: subscription["session_manager"] = self.session_manager if extra_info: - subscription2 = self.get_user(subscription["username"]) + subscription2 = await self.get_user(subscription["username"]) if isinstance(subscription2, dict): if "error" in subscription2: continue @@ -306,13 +295,14 @@ def multi(item): pool = 
self.pool # offset_array= offset_array[:16] - results += pool.starmap(multi, product(offset_array)) + tasks = pool.starmap(multi, product(offset_array)) + results += await asyncio.gather(*tasks) else: for identifier in identifiers: if self.id == identifier or self.username == identifier: continue link = endpoint_links(identifier=identifier).users - result = self.session_manager.json_request(link) + result = await self.session_manager.json_request(link) if "error" in result or not result["subscribedBy"]: continue subscription = create_user(result) @@ -326,7 +316,7 @@ def multi(item): self.subscriptions = results return results - def get_chats( + async def get_chats( self, links: Optional[list] = None, limit=100, @@ -367,13 +357,13 @@ def get_chats( links += links2 else: links = links2 - results = self.session_manager.parallel_requests(links) + results = await self.session_manager.async_requests(links) has_more = results[-1]["hasMore"] final_results = [x["list"] for x in results] final_results = list(chain.from_iterable(final_results)) if has_more: - results2 = self.get_chats( + results2 = await self.get_chats( links=[links[-1]], limit=limit, offset=limit + offset, inside_loop=True ) final_results.extend(results2) @@ -382,7 +372,9 @@ def get_chats( self.chats = final_results return final_results - def get_mass_messages(self, resume=None, refresh=True, limit=10, offset=0) -> list: + async def get_mass_messages( + self, resume=None, refresh=True, limit=10, offset=0 + ) -> list: api_type = "mass_messages" if not self.active: return [] @@ -393,8 +385,7 @@ def get_mass_messages(self, resume=None, refresh=True, limit=10, offset=0) -> li link = endpoint_links( global_limit=limit, global_offset=offset ).mass_messages_api - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) items = results.get("list", []) if not items: return items @@ -419,7 +410,7 @@ def get_mass_messages(self, resume=None, refresh=True, limit=10, offset=0) -> li self.mass_messages = items return items - def get_paid_content( + async def get_paid_content( self, check: bool = False, refresh: bool = True, @@ -435,7 +426,7 @@ def get_paid_content( if result: return result link = endpoint_links(global_limit=limit, global_offset=offset).paid_api - final_results = self.session_manager.json_request(link) + final_results = await self.session_manager.json_request(link) if len(final_results) >= limit and not check: results2 = self.get_paid_content( limit=limit, offset=limit + offset, inside_loop=True diff --git a/apis/onlyfans/classes/create_post.py b/apis/onlyfans/classes/create_post.py index 467f57ec2..b0a6f9091 100644 --- a/apis/onlyfans/classes/create_post.py +++ b/apis/onlyfans/classes/create_post.py @@ -1,11 +1,10 @@ -from apis import api_helper from apis.onlyfans.classes.extras import endpoint_links from typing import Any class create_post: def __init__( - self, option={}, session_manager: api_helper.session_manager = None + self, option={}, session_manager = None ) -> None: self.responseType: str = option.get("responseType") self.id: int = option.get("id") @@ -46,12 +45,12 @@ def __init__( self.canPurchase: bool = option.get("canPurchase") self.session_manager = session_manager - def favorite(self): + async def favorite(self): link = endpoint_links( identifier=f"{self.responseType}s", identifier2=self.id, identifier3=self.author["id"], ).favorite - results = self.session_manager.json_request(link, method="POST") + results = await 
self.session_manager.json_request(link, method="POST") self.isFavorite = True return results diff --git a/apis/onlyfans/classes/create_user.py b/apis/onlyfans/classes/create_user.py index c90d225f5..77f7a1f87 100644 --- a/apis/onlyfans/classes/create_user.py +++ b/apis/onlyfans/classes/create_user.py @@ -1,21 +1,16 @@ -from apis.onlyfans.classes.create_highlight import create_highlight -from apis.onlyfans.classes.create_message import create_message -from itertools import chain -from apis.onlyfans.classes.create_post import create_post -from apis.onlyfans.classes.create_story import create_story import math +from itertools import chain +from typing import Any, Optional from urllib import parse -from mergedeep.mergedeep import Strategy, merge +import apis.onlyfans.classes.create_auth as create_auth from apis import api_helper -from apis.onlyfans.classes.extras import ( - content_types, - endpoint_links, - handle_refresh, - media_types, -) -from apis.onlyfans.classes import create_auth -from typing import Any, Optional +from apis.onlyfans.classes.create_highlight import create_highlight +from apis.onlyfans.classes.create_message import create_message +from apis.onlyfans.classes.create_post import create_post +from apis.onlyfans.classes.create_story import create_story +from apis.onlyfans.classes.extras import (content_types, endpoint_links, + handle_refresh, media_types) class create_user: @@ -208,7 +203,7 @@ def __init__(self, option={}) -> None: self.pinnedPostsCount: int = option.get("pinnedPostsCount") self.maxPinnedPostsCount: int = option.get("maxPinnedPostsCount") # Custom - self.subscriber: Optional[create_auth] = None + self.subscriber: Optional[create_auth.create_auth] = None self.scraped = content_types() self.temp_scraped = content_types() self.session_manager: api_helper.session_manager = option.get("session_manager") @@ -225,7 +220,7 @@ def is_me(self) -> bool: status = True return status - def get_stories(self, refresh=True, limit=100, offset=0) -> list: + async def get_stories(self, refresh=True, limit=100, offset=0) -> list: api_type = "stories" if not refresh: result = handle_refresh(self, api_type) @@ -238,12 +233,12 @@ def get_stories(self, refresh=True, limit=100, offset=0) -> list: identifier=self.id, global_limit=limit, global_offset=offset ).stories_api ] - results = api_helper.scrape_endpoint_links(link, self.session_manager, api_type) + results = await api_helper.scrape_endpoint_links(link, self.session_manager, api_type) results = [create_story(x) for x in results] self.temp_scraped.Stories = results return results - def get_highlights( + async def get_highlights( self, identifier="", refresh=True, limit=100, offset=0, hightlight_id="" ) -> list: api_type = "highlights" @@ -257,17 +252,17 @@ def get_highlights( link = endpoint_links( identifier=identifier, global_limit=limit, global_offset=offset ).list_highlights - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) results = [create_highlight(x) for x in results] else: link = endpoint_links( identifier=hightlight_id, global_limit=limit, global_offset=offset ).highlight - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) results = [create_story(x) for x in results["stories"]] return results - def get_posts( + async def get_posts( self, links: Optional[list] = None, limit=10, offset=0, refresh=True ) -> list: api_type = "posts" @@ -289,24 +284,24 @@ def get_posts( link = link.replace(f"limit={limit}", 
f"limit={limit}") new_link = link.replace("offset=0", f"offset={num}") links.append(new_link) - results = api_helper.scrape_endpoint_links( + results = await api_helper.scrape_endpoint_links( links, self.session_manager, api_type ) final_results = [create_post(x, self.session_manager) for x in results] self.temp_scraped.Posts = final_results return final_results - def get_post(self, identifier=None, limit=10, offset=0): + async def get_post(self, identifier=None, limit=10, offset=0): if not identifier: identifier = self.id link = endpoint_links( identifier=identifier, global_limit=limit, global_offset=offset ).post_by_id - result = self.session_manager.json_request(link) + result = await self.session_manager.json_request(link) final_result = create_post(result) return final_result - def get_messages( + async def get_messages( self, links: Optional[list] = None, limit=10, @@ -336,13 +331,13 @@ def get_messages( links += links2 else: links = links2 - results = self.session_manager.parallel_requests(links) + results = await self.session_manager.async_requests(links) has_more = results[-1]["hasMore"] final_results = [x["list"] for x in results] final_results = list(chain.from_iterable(final_results)) if has_more: - results2 = self.get_messages( + results2 = await self.get_messages( links=[links[-1]], limit=limit, offset=limit + offset, inside_loop=True ) final_results.extend(results2) @@ -354,33 +349,37 @@ def get_messages( self.temp_scraped.Messages = final_results return final_results - def get_message_by_id( - self, identifier=None, identifier2=None, refresh=True, limit=10, offset=0 + async def get_message_by_id( + self, user_id=None, message_id=None, refresh=True, limit=10, offset=0 ): - if not identifier: - identifier = self.id + if not user_id: + user_id = self.id link = endpoint_links( - identifier=identifier, - identifier2=identifier2, + identifier=user_id, + identifier2=message_id, global_limit=limit, global_offset=offset, ).message_by_id - result = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) + results = [x for x in results["list"] if x["id"] == message_id] + result = result = results[0] if results else {} final_result = create_message(result) return final_result - def get_archived_stories(self, refresh=True, limit=100, offset=0): + async def get_archived_stories(self, refresh=True, limit=100, offset=0): api_type = "archived_stories" if not refresh: result = handle_refresh(self, api_type) if result: return result link = endpoint_links(global_limit=limit, global_offset=offset).archived_stories - results = self.session_manager.json_request(link) - self.archived_stories = results + results = await self.session_manager.json_request(link) + results = [create_story(x) for x in results if "error" not in x] + if results: + print return results - def get_archived_posts( + async def get_archived_posts( self, links: Optional[list] = None, limit=10, offset=0, refresh=True ) -> list: api_type = "archived_posts" @@ -402,56 +401,54 @@ def get_archived_posts( link = link.replace(f"limit={limit}", f"limit={limit}") new_link = link.replace("offset=0", f"offset={num}") links.append(new_link) - results = api_helper.scrape_endpoint_links( + results = await api_helper.scrape_endpoint_links( links, self.session_manager, api_type ) final_results = [create_post(x, self.session_manager) for x in results if x] self.temp_scraped.Archived.Posts = final_results return final_results - def get_archived(self, api): + async def get_archived(self, api): items = [] 
if self.is_me(): item = {} item["type"] = "Stories" - item["results"] = [self.get_archived_stories()] + item["results"] = [await self.get_archived_stories()] items.append(item) item = {} item["type"] = "Posts" # item["results"] = test - item["results"] = self.get_archived_posts() + item["results"] = await self.get_archived_posts() items.append(item) return items - def search_chat(self, identifier="", text="", refresh=True, limit=10, offset=0): + async def search_chat(self, identifier="", text="", refresh=True, limit=10, offset=0): if identifier: identifier = parse.urljoin(identifier, "messages") link = endpoint_links( identifier=identifier, text=text, global_limit=limit, global_offset=offset ).search_chat - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) return results - def search_messages(self, identifier="", text="", refresh=True, limit=10, offset=0): + async def search_messages(self, identifier="", text="", refresh=True, limit=10, offset=0): if identifier: identifier = parse.urljoin(identifier, "messages") text = parse.quote_plus(text) link = endpoint_links( identifier=identifier, text=text, global_limit=limit, global_offset=offset ).search_messages - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) return results - def like(self, category: str, identifier: int): + async def like(self, category: str, identifier: int): link = endpoint_links(identifier=category, identifier2=identifier).like - results = self.session_manager.json_request(link, method="POST") + results = await self.session_manager.json_request(link, method="POST") return results - def unlike(self, category: str, identifier: int): + async def unlike(self, category: str, identifier: int): link = endpoint_links(identifier=category, identifier2=identifier).like - results = self.session_manager.json_request(link, method="DELETE") + results = await self.session_manager.json_request(link, method="DELETE") return results def buy_subscription(self): @@ -465,5 +462,5 @@ def buy_subscription(self): } print - def set_scraped(self, name, scraped: media_types): + def set_scraped(self, name, scraped): setattr(self.scraped, name, scraped) diff --git a/apis/onlyfans/onlyfans.py b/apis/onlyfans/onlyfans.py index 05240b271..0afafe91f 100644 --- a/apis/onlyfans/onlyfans.py +++ b/apis/onlyfans/onlyfans.py @@ -1,62 +1,11 @@ +from typing import List, Optional, Union + from apis.onlyfans.classes import create_user -from apis.onlyfans.classes.extras import ( - auth_details, - content_types, - endpoint_links, -) from apis.onlyfans.classes.create_auth import create_auth -import time -import base64 -from typing import List, Optional, Union -from urllib.parse import urlparse -from urllib import parse -import hashlib -import math -from datetime import datetime -from dateutil.relativedelta import relativedelta -from itertools import chain, product -import requests +from apis.onlyfans.classes.extras import (auth_details, content_types, + endpoint_links) -from sqlalchemy.orm.session import Session from .. 
import api_helper -from mergedeep import merge, Strategy -import jsonpickle -import copy -from random import random - - -def create_signed_headers(link: str, auth_id: int, dynamic_rules: dict): - # Users: 300000 | Creators: 301000 - final_time = str(int(round(time.time()))) - path = urlparse(link).path - query = urlparse(link).query - path = path if not query else f"{path}?{query}" - a = [dynamic_rules["static_param"], final_time, path, str(auth_id)] - msg = "\n".join(a) - message = msg.encode("utf-8") - hash_object = hashlib.sha1(message) - sha_1_sign = hash_object.hexdigest() - sha_1_b = sha_1_sign.encode("ascii") - checksum = ( - sum([sha_1_b[number] for number in dynamic_rules["checksum_indexes"]]) - + dynamic_rules["checksum_constant"] - ) - headers = {} - headers["sign"] = dynamic_rules["format"].format(sha_1_sign, abs(checksum)) - headers["time"] = final_time - return headers - - -def session_rules(session_manager: api_helper.session_manager, link) -> dict: - headers = session_manager.headers - if "https://onlyfans.com/api2/v2/" in link: - dynamic_rules = session_manager.dynamic_rules - headers["app-token"] = dynamic_rules["app_token"] - # auth_id = headers["user-id"] - a = [link, 0, dynamic_rules] - headers2 = create_signed_headers(*a) - headers |= headers2 - return headers def session_retry_rules(r, link: str) -> int: @@ -81,7 +30,6 @@ def __init__( self, custom_request=callable, max_threads=-1, - original_sessions: List[requests.Session] = [], ) -> None: self.auths: list[create_auth] = [] self.subscriptions: list[create_user] = [] @@ -90,14 +38,17 @@ def __init__( self.lists = None self.endpoint_links = endpoint_links self.pool = api_helper.multiprocessing() - self.session_manager = api_helper.session_manager( - session_rules=session_rules, - session_retry_rules=session_retry_rules, - max_threads=max_threads, - original_sessions=original_sessions, - ) self.settings = {} + def add_auth(self, option={}, only_active=False): + if only_active and not option.get("active"): + return + auth = create_auth(pool=self.pool, max_threads=self.max_threads) + auth.auth_details = auth_details(option) + auth.extras["settings"] = self.settings + self.auths.append(auth) + return auth + def get_auth(self, identifier: Union[str, int]) -> Optional[create_auth]: final_auth = None for auth in self.auths: @@ -109,14 +60,7 @@ def get_auth(self, identifier: Union[str, int]) -> Optional[create_auth]: break return final_auth - def add_auth(self, option={}, only_active=False): - if only_active and not option.get("active"): - return - auth = create_auth(session_manager2=self.session_manager, pool=self.pool) - auth.auth_details = auth_details(option) - self.auths.append(auth) - return auth - def close_pools(self): self.pool.close() - self.session_manager.pool.close() + for auth in self.auths: + auth.session_manager.pool.close() diff --git a/classes/make_settings.py b/classes/make_settings.py index f17584fe5..20df5d5cb 100644 --- a/classes/make_settings.py +++ b/classes/make_settings.py @@ -1,11 +1,7 @@ import copy from typing import List, Union from urllib.parse import urlparse -from apis.onlyfans.onlyfans import auth_details import os - -import ujson -from classes.prepare_metadata import format_types, prepare_reformat import uuid as uuid def fix(config={}): diff --git a/classes/prepare_metadata.py b/classes/prepare_metadata.py index 1e35830f2..3e306f154 100644 --- a/classes/prepare_metadata.py +++ b/classes/prepare_metadata.py @@ -3,7 +3,7 @@ import copy from enum import unique import os -from typing import 
Dict +from typing import Dict, MutableMapping, Union from requests.api import get from helpers import main_helper @@ -15,7 +15,7 @@ class create_metadata(object): - def __init__(self, authed: create_auth = None, metadata: dict = {}, standard_format=False, api_type: str = "") -> None: + def __init__(self, authed: create_auth = None, metadata: Union[list,dict,MutableMapping] = {}, standard_format=False, api_type: str = "") -> None: self.version = global_version fixed_metadata = self.fix_metadata(metadata, standard_format, api_type) self.content = format_content( @@ -367,6 +367,7 @@ def __init__(self, option, keep_vars=False): self.text = option.get('text', format_variables2.text) self.date = option.get('postedAt', format_variables2.date) self.price = option.get('price', 0) + self.archived = option.get('archived', False) self.date_format = option.get('date_format') self.maximum_length = 255 self.text_length = option.get('text_length', self.maximum_length) diff --git a/datascraper/main_datascraper.py b/datascraper/main_datascraper.py index 4746a57bd..0464ed60f 100644 --- a/datascraper/main_datascraper.py +++ b/datascraper/main_datascraper.py @@ -24,7 +24,7 @@ # return text -def start_datascraper( +async def start_datascraper( json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, @@ -46,10 +46,8 @@ def start_datascraper( identifiers = [] auto_profile_choice = json_site_settings["auto_profile_choice"] subscription_array = [] - original_sessions = [] - original_sessions = api_helper.create_session(settings=json_settings) - original_sessions = [x for x in original_sessions if x] - if not original_sessions: + proxies = await api_helper.test_proxies(json_settings["proxies"]) + if json_settings["proxies"] and not proxies: print("Unable to create session") return None archive_time = timeit.default_timer() @@ -58,8 +56,9 @@ def start_datascraper( module = m_onlyfans if not api: api = OnlyFans.start(max_threads=json_settings["max_threads"]) + api.settings = json_config api = main_helper.process_profiles( - json_settings, original_sessions, site_name, api + json_settings, proxies, site_name, api ) print @@ -78,12 +77,12 @@ def start_datascraper( auth.auth_details, json_config, json_site_settings, site_name ) setup = False - setup, subscriptions = module.account_setup( + setup, subscriptions = await module.account_setup( auth, identifiers, jobs, auth_count ) if not setup: if webhooks: - x = main_helper.process_webhooks(api, "auth_webhook", "failed") + x = await main_helper.process_webhooks(api, "auth_webhook", "failed") auth_details = {} auth_details["auth"] = auth.auth_details.__dict__ profile_directory = auth.profile_directory @@ -95,14 +94,14 @@ def start_datascraper( continue auth_count += 1 subscription_array += subscriptions - x = main_helper.process_webhooks(api, "auth_webhook", "succeeded") + x = await main_helper.process_webhooks(api, "auth_webhook", "succeeded") subscription_list = module.format_options(subscription_array, "usernames", api.auths) if jobs["scrape_paid_content"]: print("Scraping Paid Content") - paid_content = module.paid_content_scraper(api, identifiers) + paid_content = await module.paid_content_scraper(api, identifiers) if jobs["scrape_names"]: print("Scraping Subscriptions") - names = main_helper.process_names( + names = await main_helper.process_names( module, subscription_list, auto_model_choice, @@ -111,9 +110,9 @@ def start_datascraper( site_name_lower, site_name, ) - x = main_helper.process_downloads(api, module) + x = await 
main_helper.process_downloads(api, module) if webhooks: - x = main_helper.process_webhooks(api, "download_webhook", "succeeded") + x = await main_helper.process_webhooks(api, "download_webhook", "succeeded") elif site_name_lower == "starsavn": pass # site_name = "StarsAVN" diff --git a/extras/OFRenamer/start.py b/extras/OFRenamer/start.py index 8a2f5883c..b5d1c9230 100644 --- a/extras/OFRenamer/start.py +++ b/extras/OFRenamer/start.py @@ -15,10 +15,7 @@ def fix_directories(api,posts, all_files, database_session: scoped_session, fold def fix_directories(post: api_table, media_db: list[media_table]): delete_rows = [] - final_type = "" - if parent_type: - final_type = f"{api_type}{os.path.sep}{parent_type}" - final_type = final_type if final_type else api_type + final_api_type = os.path.join("Archived",api_type) if post.archived else api_type post_id = post.post_id media_db = [x for x in media_db if x.post_id == post_id] for media in media_db: @@ -42,7 +39,7 @@ def fix_directories(post: api_table, media_db: list[media_table]): option["post_id"] = post_id option["media_id"] = media_id option["username"] = username - option["api_type"] = final_type if parent_type else api_type + option["api_type"] = final_api_type option["media_type"] = media.media_type option["filename"] = original_filename option["ext"] = ext @@ -53,6 +50,7 @@ def fix_directories(post: api_table, media_db: list[media_table]): option["text_length"] = text_length option["directory"] = download_path option["preview"] = media.preview + option["archived"] = post.archived prepared_format = prepare_reformat(option) file_directory = main_helper.reformat( prepared_format, file_directory_format) @@ -173,5 +171,5 @@ def start(api,Session, parent_type, api_type, api_path, site_name, subscription, exit() else: import helpers.main_helper as main_helper - from apis.api_helper import multiprocessing from classes.prepare_metadata import format_types, prepare_reformat + diff --git a/helpers/db_helper.py b/helpers/db_helper.py index a6f35f62b..219903417 100644 --- a/helpers/db_helper.py +++ b/helpers/db_helper.py @@ -6,6 +6,7 @@ from alembic.config import Config from alembic import command from sqlalchemy.exc import IntegrityError +from sqlalchemy.sql.functions import func from database.databases.stories import stories from database.databases.posts import posts from database.databases.messages import messages @@ -37,7 +38,7 @@ def run_revisions(alembic_directory: str, database_path: str = ""): x = command.revision(alembic_cfg, autogenerate=True, message="content") -def run_migrations(alembic_directory: str, database_path: str, api) -> None: +def run_migrations(alembic_directory: str, database_path: str) -> None: ini_path = os.path.join(alembic_directory, "alembic.ini") script_location = os.path.join(alembic_directory, "alembic") full_database_path = f'sqlite:///{database_path}' @@ -89,3 +90,8 @@ def get_or_create(session: Session, model, defaults=None, fbkwargs={}): return instance, False else: return instance, True + +def get_count(q): + count_q = q.statement.with_only_columns([func.count()]).order_by(None) + count = q.session.execute(count_q).scalar() + return count diff --git a/helpers/main_helper.py b/helpers/main_helper.py index ec4d6f766..80e89e59e 100644 --- a/helpers/main_helper.py +++ b/helpers/main_helper.py @@ -1,35 +1,38 @@ -from apis.onlyfans.classes import create_user +from database.models.media_table import media_table +from sqlalchemy.orm.session import Session +from apis.onlyfans.classes.extras import content_types +import copy 
import json -from apis.onlyfans import onlyfans as OnlyFans import math -from types import SimpleNamespace -from typing import Any, Optional, Tuple, Union - -from sqlalchemy import inspect -from sqlalchemy.orm import declarative_base -from classes.prepare_metadata import format_variables -import copy import os import platform +import random import re -from datetime import datetime -from itertools import chain, zip_longest, groupby -import psutil import shutil -from multiprocessing.dummy import Pool as ThreadPool -import ujson -from tqdm import tqdm import string -import random +import traceback +from datetime import datetime +from itertools import chain, groupby, zip_longest +from multiprocessing.dummy import Pool as ThreadPool +from types import SimpleNamespace +from typing import Any, Optional, Tuple, Union +import classes.make_settings as make_settings +import classes.prepare_webhooks as prepare_webhooks +import psutil import requests +import ujson +from apis.onlyfans import onlyfans as OnlyFans +from apis.onlyfans.classes import create_user from bs4 import BeautifulSoup +from classes.prepare_metadata import format_variables, prepare_reformat +from mergedeep import Strategy, merge +from sqlalchemy import inspect +from sqlalchemy.orm import declarative_base +from tqdm import tqdm -import classes.make_settings as make_settings -import classes.prepare_webhooks as prepare_webhooks -from mergedeep import merge, Strategy import helpers.db_helper as db_helper -import traceback + json_global_settings = None min_drive_space = 0 webhooks = None @@ -58,7 +61,7 @@ def rename_duplicates(seen, filename): else: count = 1 while filename_lower in seen: - filename = filename+" ("+str(count)+")" + filename = filename + " (" + str(count) + ")" filename_lower = filename.lower() count += 1 seen.add(filename_lower) @@ -67,7 +70,7 @@ def rename_duplicates(seen, filename): def parse_links(site_name, input_link): if site_name in {"onlyfans", "starsavn"}: - username = input_link.rsplit('/', 1)[-1] + username = input_link.rsplit("/", 1)[-1] return username if site_name in {"patreon", "fourchan", "bbwchan"}: @@ -85,15 +88,13 @@ def parse_links(site_name, input_link): def clean_text(string, remove_spaces=False): matches = ["\n", "
"] for m in matches: - string = string.replace( - m, " ").strip() - string = ' '.join(string.split()) + string = string.replace(m, " ").strip() + string = " ".join(string.split()) string = BeautifulSoup(string, "lxml").get_text() SAFE_PTN = r"[|\^&+\-%*/=!:\"?><]" - string = re.sub(SAFE_PTN, ' ', string.strip() - ).strip() + string = re.sub(SAFE_PTN, " ", string.strip()).strip() if remove_spaces: - string = string.replace(' ', '_') + string = string.replace(" ", "_") return string @@ -112,6 +113,7 @@ def format_image(filepath, timestamp): try: if os_name == "Windows": from win32_setctime import setctime + setctime(filepath, timestamp) # print(f"Updated Creation Time {filepath}") os.utime(filepath, (timestamp, timestamp)) @@ -132,7 +134,7 @@ def filter_metadata(datas): def import_archive(archive_path) -> Any: metadata = {} if os.path.exists(archive_path) and os.path.getsize(archive_path): - with open(archive_path, 'r', encoding='utf-8') as outfile: + with open(archive_path, "r", encoding="utf-8") as outfile: while not metadata: try: metadata = ujson.load(outfile) @@ -155,7 +157,7 @@ def legacy_database_fixer(database_path, database, database_name, database_exist if database_exists: Session, engine = db_helper.create_database_session(database_path) database_session = Session() - result = inspect(engine).has_table('alembic_version') + result = inspect(engine).has_table("alembic_version") if not result: if not pre_alembic_database_exists: os.rename(old_database_path, pre_alembic_path) @@ -200,11 +202,81 @@ def legacy_database_fixer(database_path, database, database_name, database_exist datas.append(new_item) print database_session.close() - x = export_sqlite(old_database_path, datas, - database_name, legacy_fixer=True) + x = export_sqlite(old_database_path, datas, database_name, legacy_fixer=True) print +def fix_sqlite( + profile_directory, + download_directory, + metadata_directory, + format_directories, + site_name, + username, + metadata_directory_format, +): + items = content_types().__dict__.items() + final_metadatas = [] + for api_type, value in items: + mandatory_directories = {} + mandatory_directories["profile_directory"] = profile_directory + mandatory_directories["download_directory"] = download_directory + mandatory_directories["metadata_directory"] = metadata_directory + formatted_directories = format_directories( + mandatory_directories, + site_name, + username, + metadata_directory_format, + "", + api_type, + ) + final_metadata_directory = formatted_directories["metadata_directory"] + if all(final_metadata_directory != x for x in final_metadatas): + final_metadatas.append(final_metadata_directory) + print + print + for final_metadata in final_metadatas: + archived_database_path = os.path.join(final_metadata, "Archived.db") + if os.path.exists(archived_database_path): + Session2, engine = db_helper.create_database_session(archived_database_path) + database_session: Session = Session2() + cwd = os.getcwd() + for api_type, value in items: + database_path = os.path.join(final_metadata, f"{api_type}.db") + database_name = api_type.lower() + alembic_location = os.path.join( + cwd, "database", "databases", database_name + ) + result = inspect(engine).has_table(database_name) + if result: + db_helper.run_migrations(alembic_location, archived_database_path) + db_helper.run_migrations(alembic_location, database_path) + Session3, engine2 = db_helper.create_database_session(database_path) + db_collection = db_helper.database_collection() + database_session2: Session = Session3() + database 
= db_collection.chooser(database_name) + api_table = database.api_table + archived_result = database_session.query(api_table).all() + for item in archived_result: + result2 = ( + database_session2.query(api_table) + .filter(api_table.post_id == item.post_id) + .first() + ) + if not result2: + item2 = item.__dict__ + item2.pop("id") + item2.pop("_sa_instance_state") + item = api_table(**item2) + item.archived = True + database_session2.add(item) + database_session2.commit() + database_session2.close() + database_session.commit() + database_session.close() + os.remove(archived_database_path) + + def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None): metadata_directory = os.path.dirname(archive_path) os.makedirs(metadata_directory, exist_ok=True) @@ -215,8 +287,7 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None database_name = database_name.lower() db_collection = db_helper.database_collection() database = db_collection.chooser(database_name) - alembic_location = os.path.join( - cwd, "database", "databases", database_name) + alembic_location = os.path.join(cwd, "database", "databases", database_name) database_exists = os.path.exists(database_path) if database_exists: if os.path.getsize(database_path) == 0: @@ -224,15 +295,14 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None database_exists = False if not legacy_fixer: x = legacy_database_fixer( - database_path, database, database_name, database_exists) - db_helper.run_migrations(alembic_location, database_path, api) + database_path, database, database_name, database_exists + ) + db_helper.run_migrations(alembic_location, database_path) print Session, engine = db_helper.create_database_session(database_path) database_session = Session() api_table = database.api_table media_table = database.media_table - # api_table = db_helper.api_table() - # media_table = db_helper.media_table() for post in datas: post_id = post["post_id"] @@ -240,8 +310,7 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None date_object = None if postedAt: if not isinstance(postedAt, datetime): - date_object = datetime.strptime( - postedAt, "%d-%m-%Y %H:%M:%S") + date_object = datetime.strptime(postedAt, "%d-%m-%Y %H:%M:%S") else: date_object = postedAt result = database_session.query(api_table) @@ -254,6 +323,7 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None post["price"] = 0 post_db.price = post["price"] post_db.paid = post["paid"] + post_db.archived = post["archived"] if date_object: post_db.created_at = date_object database_session.add(post_db) @@ -265,7 +335,8 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None media_db = result.filter_by(media_id=media_id).first() if not media_db: media_db = result.filter_by( - filename=media["filename"], created_at=date_object).first() + filename=media["filename"], created_at=date_object + ).first() if not media_db: media_db = media_table() if legacy_fixer: @@ -298,7 +369,7 @@ def format_paths(j_directories, site_name): return paths -def reformat(prepared_format, unformatted): +def reformat(prepared_format: prepare_reformat, unformatted): post_id = prepared_format.post_id media_id = prepared_format.media_id date = prepared_format.date @@ -312,8 +383,7 @@ def reformat(prepared_format, unformatted): if type(date) is str: format_variables2 = format_variables() if date != format_variables2.date and date != "": - date = datetime.strptime( - date, 
"%d-%m-%Y %H:%M:%S") + date = datetime.strptime(date, "%d-%m-%Y %H:%M:%S") date = date.strftime(prepared_format.date_format) else: if date != None: @@ -329,8 +399,7 @@ def reformat(prepared_format, unformatted): value = "Paid" directory = prepared_format.directory path = unformatted.replace("{site_name}", prepared_format.site_name) - path = path.replace( - "{first_letter}", prepared_format.username[0].capitalize()) + path = path.replace("{first_letter}", prepared_format.username[0].capitalize()) path = path.replace("{post_id}", post_id) path = path.replace("{media_id}", media_id) path = path.replace("{username}", prepared_format.username) @@ -342,18 +411,18 @@ def reformat(prepared_format, unformatted): path = path.replace("{date}", date) directory_count = len(directory) path_count = len(path) - maximum_length = maximum_length - (directory_count+path_count-extra_count) + maximum_length = maximum_length - (directory_count + path_count - extra_count) text_length = text_length if text_length < maximum_length else maximum_length if has_text: - # https://stackoverflow.com/a/43848928 + # https://stackoverflow.com/a/43848928 def utf8_lead_byte(b): - '''A UTF-8 intermediate byte starts with the bits 10xxxxxx.''' + """A UTF-8 intermediate byte starts with the bits 10xxxxxx.""" return (b & 0xC0) != 0x80 def utf8_byte_truncate(text, max_bytes): - '''If text[max_bytes] is not a lead byte, back up until a lead byte is - found and truncate before that character.''' - utf8 = text.encode('utf8') + """If text[max_bytes] is not a lead byte, back up until a lead byte is + found and truncate before that character.""" + utf8 = text.encode("utf8") if len(utf8) <= max_bytes: return utf8 i = max_bytes @@ -361,7 +430,7 @@ def utf8_byte_truncate(text, max_bytes): i -= 1 return utf8[:i] - filtered_text = utf8_byte_truncate(text, text_length).decode('utf8') + filtered_text = utf8_byte_truncate(text, text_length).decode("utf8") path = path.replace("{text}", filtered_text) else: path = path.replace("{text}", "") @@ -388,7 +457,9 @@ def get_directory(directories: list[str], extra_path): return directory -def check_space(download_paths, min_size=min_drive_space, priority="download", create_directory=True): +def check_space( + download_paths, min_size=min_drive_space, priority="download", create_directory=True +): root = "" while not root: paths = [] @@ -429,9 +500,10 @@ def find_model_directory(username, directories) -> Tuple[str, bool]: def are_long_paths_enabled(): if os_name == "Windows": from ctypes import WinDLL, c_ubyte - ntdll = WinDLL('ntdll') - if hasattr(ntdll, 'RtlAreLongPathsEnabled'): + ntdll = WinDLL("ntdll") + + if hasattr(ntdll, "RtlAreLongPathsEnabled"): ntdll.RtlAreLongPathsEnabled.restype = c_ubyte ntdll.RtlAreLongPathsEnabled.argtypes = () @@ -452,8 +524,7 @@ def check_for_dupe_file(download_path, content_length): class download_session(tqdm): - def start(self, unit='B', unit_scale=True, - miniters=1, tsize=0): + def start(self, unit="B", unit_scale=True, miniters=1, tsize=0): self.unit = unit self.unit_scale = unit_scale self.miniters = miniters @@ -464,8 +535,9 @@ def start(self, unit='B', unit_scale=True, self.total += tsize def update_total_size(self, tsize): - tsize = int(tsize) - self.total += tsize + if tsize: + tsize = int(tsize) + self.total += tsize def update_to(self, b=1, bsize=1, tsize=None): x = bsize @@ -473,34 +545,6 @@ def update_to(self, b=1, bsize=1, tsize=None): self.update(b) -def downloader(r, download_path, d_session, count=0): - delete = False - try: - with open(download_path, 
'wb') as f: - delete = True - for chunk in r.iter_content(chunk_size=4096): - if chunk: # filter out keep-alive new chunks - size = f.write(chunk) - d_session.update(size) - except (ConnectionResetError) as e: - if delete: - os.unlink(download_path) - return - except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as e: - return - except Exception as e: - if delete: - deleted = None - while not deleted: - try: - os.unlink(download_path) - deleted = True - except PermissionError as e2: - print(e2) - return - return True - - def get_config(config_path): if os.path.exists(config_path): json_config = ujson.load(open(config_path)) @@ -509,8 +553,9 @@ def get_config(config_path): json_config2 = copy.deepcopy(json_config) json_config = make_settings.fix(json_config) file_name = os.path.basename(config_path) - json_config = ujson.loads(json.dumps(make_settings.config( - **json_config), default=lambda o: o.__dict__)) + json_config = ujson.loads( + json.dumps(make_settings.config(**json_config), default=lambda o: o.__dict__) + ) updated = False if json_config != json_config2: updated = True @@ -518,14 +563,15 @@ def get_config(config_path): export_data(json_config, filepath) if not json_config: input( - f"The .settings\\{file_name} file has been created. Fill in whatever you need to fill in and then press enter when done.\n") + f"The .settings\\{file_name} file has been created. Fill in whatever you need to fill in and then press enter when done.\n" + ) json_config = ujson.load(open(config_path)) return json_config, updated def choose_auth(array): names = [] - array = [{"auth_count": -1, "username": "All"}]+array + array = [{"auth_count": -1, "username": "All"}] + array string = "" seperator = " | " name_count = len(array) @@ -534,14 +580,14 @@ def choose_auth(array): count = 0 for x in array: name = x["username"] - string += str(count)+" = "+name + string += str(count) + " = " + name names.append(x) - if count+1 != name_count: + if count + 1 != name_count: string += seperator count += 1 - print("Auth Usernames: "+string) + print("Auth Usernames: " + string) value = int(input().strip()) if value: names = [names[value]] @@ -550,7 +596,9 @@ def choose_auth(array): return names -def choose_option(subscription_list, auto_scrape: Union[str, bool], use_default_message=False): +def choose_option( + subscription_list, auto_scrape: Union[str, bool], use_default_message=False +): names = subscription_list[0] default_message = "" seperator = " | " @@ -562,13 +610,11 @@ def choose_option(subscription_list, auto_scrape: Union[str, bool], use_default_ if auto_scrape: values = [x[1] for x in names] else: - print( - f"{default_message}{subscription_list[1]}") + print(f"{default_message}{subscription_list[1]}") values = input().strip().split(",") else: if not auto_scrape: - print( - f"{default_message}{subscription_list[1]}") + print(f"{default_message}{subscription_list[1]}") values = input().strip().split(",") else: values = auto_scrape @@ -589,7 +635,7 @@ def choose_option(subscription_list, auto_scrape: Union[str, bool], use_default_ return new_names -def process_profiles(json_settings, original_sessions, site_name, api: Union[OnlyFans.start]): +def process_profiles(json_settings, proxies, site_name, api: Union[OnlyFans.start]): profile_directories = json_settings["profile_directories"] for profile_directory in profile_directories: x = os.path.join(profile_directory, site_name) @@ -603,32 +649,29 @@ def process_profiles(json_settings, original_sessions, site_name, api: Union[Onl 
temp_users.append("default") for user in temp_users: user_profile = os.path.join(x, user) - user_auth_filepath = os.path.join( - user_profile, "auth.json") + user_auth_filepath = os.path.join(user_profile, "auth.json") datas = {} if os.path.exists(user_auth_filepath): - temp_json_auth = ujson.load( - open(user_auth_filepath)) + temp_json_auth = ujson.load(open(user_auth_filepath)) json_auth = temp_json_auth["auth"] if not json_auth.get("active", None): continue json_auth["username"] = user - auth = api.add_auth( - json_auth) - auth.session_manager.add_sessions(original_sessions) + auth = api.add_auth(json_auth) + auth.session_manager.proxies = proxies auth.profile_directory = user_profile datas["auth"] = auth.auth_details.__dict__ if datas: - export_data( - datas, user_auth_filepath) + export_data(datas, user_auth_filepath) print print return api -def process_names(module, subscription_list, auto_scrape, api, json_config, site_name_lower, site_name) -> list: - names = choose_option( - subscription_list, auto_scrape, True) +async def process_names( + module, subscription_list, auto_scrape, api, json_config, site_name_lower, site_name +) -> list: + names = choose_option(subscription_list, auto_scrape, True) if not names: print("There's nothing to scrape.") for name in names: @@ -638,24 +681,22 @@ def process_names(module, subscription_list, auto_scrape, api, json_config, site name = name[-1] assign_vars(json_config) username = parse_links(site_name_lower, name) - result = module.start_datascraper( - authed, username, site_name) + result = await module.start_datascraper(authed, username, site_name) return names -def process_downloads(api, module): +async def process_downloads(api, module): for auth in api.auths: - subscriptions = auth.get_subscriptions(refresh=False) + subscriptions = await auth.get_subscriptions(refresh=False) for subscription in subscriptions: - download_info = subscription.download_info - if download_info: - module.download_media(auth, subscription) - if json_global_settings["helpers"]["delete_empty_directories"]: - delete_empty_directories( - download_info.get("base_directory", "")) + await module.prepare_downloads(subscription) + if json_global_settings["helpers"]["delete_empty_directories"]: + delete_empty_directories( + subscription.download_info.get("base_directory", "") + ) -def process_webhooks(api: Union[OnlyFans.start], category, category2): +async def process_webhooks(api: Union[OnlyFans.start], category, category2): global_webhooks = webhooks["global_webhooks"] global_status = webhooks["global_status"] webhook = webhooks[category] @@ -671,8 +712,9 @@ def process_webhooks(api: Union[OnlyFans.start], category, category2): webhook_links = webhook_state["webhooks"] if webhook_status: for auth in api.auths: - send_webhook(auth, webhook_hide_sensitive_info, - webhook_links, category, category2) + await send_webhook( + auth, webhook_hide_sensitive_info, webhook_links, category, category2 + ) print print @@ -684,24 +726,30 @@ def is_me(user_api): return False -def export_data(metadata: Union[list, dict], path: str, encoding: Optional[str] = "utf-8"): +async def write_data(download_path:str, data:bytes): + with open(download_path, "wb") as f: + f.write(data) + + +def export_data( + metadata: Union[list, dict], path: str, encoding: Optional[str] = "utf-8" +): directory = os.path.dirname(path) os.makedirs(directory, exist_ok=True) - with open(path, 'w', encoding=encoding) as outfile: + with open(path, "w", encoding=encoding) as outfile: ujson.dump(metadata, outfile, 
indent=2, escape_forward_slashes=False) def grouper(n, iterable, fillvalue: Optional[Union[str, int]] = None): args = [iter(iterable)] * n - grouped = list(zip_longest(fillvalue=fillvalue, *args)) + final_grouped = list(zip_longest(fillvalue=fillvalue, *args)) if not fillvalue: - grouped = [x for x in grouped if x] - return grouped - - -def create_link_group(max_threads): - x = range - print + grouped = [] + for group in final_grouped: + group = [x for x in group if x] + grouped.append(group) + final_grouped = grouped + return final_grouped def remove_mandatory_files(files, keep=[]): @@ -736,29 +784,31 @@ def metadata_fixer(directory): shutil.move(archive_file, new) -def ordinal(n): return "%d%s" % ( - n, "tsnrhtdd"[(n/10 % 10 != 1)*(n % 10 < 4)*n % 10::4]) +def ordinal(n): + return "%d%s" % (n, "tsnrhtdd"[(n / 10 % 10 != 1) * (n % 10 < 4) * n % 10 :: 4]) def id_generator(size=6, chars=string.ascii_uppercase + string.digits): - return ''.join(random.choice(chars) for _ in range(size)) + return "".join(random.choice(chars) for _ in range(size)) def humansize(nbytes): i = 0 - suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] - while nbytes >= 1024 and i < len(suffixes)-1: - nbytes /= 1024. + suffixes = ["B", "KB", "MB", "GB", "TB", "PB"] + while nbytes >= 1024 and i < len(suffixes) - 1: + nbytes /= 1024.0 i += 1 - f = ('%.2f' % nbytes).rstrip('0').rstrip('.') - return '%s %s' % (f, suffixes[i]) + f = ("%.2f" % nbytes).rstrip("0").rstrip(".") + return "%s %s" % (f, suffixes[i]) def byteToGigaByte(n): - return (n / math.pow(10, 9)) + return n / math.pow(10, 9) -def send_webhook(item, webhook_hide_sensitive_info, webhook_links, category, category2: str): +async def send_webhook( + item, webhook_hide_sensitive_info, webhook_links, category, category2: str +): if category == "auth_webhook": for webhook_link in webhook_links: auth = item @@ -770,11 +820,10 @@ def send_webhook(item, webhook_hide_sensitive_info, webhook_links, category, cat embed.title = f"Auth {category2.capitalize()}" embed.add_field("username", username) message.embeds.append(embed) - message = ujson.loads(json.dumps( - message, default=lambda o: o.__dict__)) + message = ujson.loads(json.dumps(message, default=lambda o: o.__dict__)) x = requests.post(webhook_link, json=message) if category == "download_webhook": - subscriptions:list[create_user] = item.get_subscriptions(refresh=False) + subscriptions: list[create_user] = await item.get_subscriptions(refresh=False) for subscription in subscriptions: download_info = subscription.download_info if download_info: @@ -787,14 +836,15 @@ def send_webhook(item, webhook_hide_sensitive_info, webhook_links, category, cat embed.add_field("link", subscription.get_link()) embed.image.url = subscription.avatar message.embeds.append(embed) - message = ujson.loads(json.dumps( - message, default=lambda o: o.__dict__)) + message = ujson.loads( + json.dumps(message, default=lambda o: o.__dict__) + ) x = requests.post(webhook_link, json=message) print def find_between(s, start, end): - format = f'{start}(.+?){end}' + format = f"{start}(.+?){end}" x = re.search(format, s) if x: x = x.group(1) @@ -815,6 +865,7 @@ def start(directory): content_count = len(contents) if content_count == 1 and "desktop.ini" in contents: shutil.rmtree(full_path, ignore_errors=True) + x = start(directory) if os.path.exists(directory): if not os.listdir(directory): @@ -843,9 +894,9 @@ def module_chooser(domain, json_sites): continue elif x not in wl: continue - string += str(count)+" = "+x + string += str(count) + " = " + x 
         site_names.append(x)
-        if count+1 != site_count:
+        if count + 1 != site_count:
             string += seperator
         count += 1
@@ -853,3 +904,29 @@
         string = f"{domain} not supported"
         site_names = []
     return string, site_names
+
+
+def link_picker(media, video_quality):
+    link = ""
+    if "source" in media:
+        quality_key = "source"
+        source = media[quality_key]
+        link = source[quality_key]
+        if link:
+            if media["type"] == "video":
+                qualities = media["videoSources"]
+                qualities = dict(sorted(qualities.items(), reverse=False))
+                qualities[quality_key] = source[quality_key]
+                for quality, quality_link in qualities.items():
+                    video_quality = video_quality.removesuffix("p")
+                    if quality == video_quality:
+                        if quality_link:
+                            link = quality_link
+                        break
+                print
+            print
+        print
+    if "src" in media:
+        link = media["src"]
+    return link
+
\ No newline at end of file
diff --git a/modules/onlyfans.py b/modules/onlyfans.py
index 764e1113e..d377bc757 100644
--- a/modules/onlyfans.py
+++ b/modules/onlyfans.py
@@ -1,43 +1,45 @@
-from apis.onlyfans.classes.create_message import create_message
-from apis.onlyfans.classes.create_post import create_post
-from apis.onlyfans.classes.create_story import create_story
-from apis.onlyfans.classes.extras import auth_details, media_types
-from apis.onlyfans.classes.create_user import create_user
-from apis.onlyfans.classes.create_auth import create_auth
+import copy
 import hashlib
-import shutil
-from apis.onlyfans import onlyfans as OnlyFans
-from helpers import db_helper
-from typing import Any, Optional, Union
-from apis.onlyfans.onlyfans import start
-from classes.prepare_metadata import create_metadata, format_content, prepare_reformat
+import html
+import json
 import os
+import shutil
 from datetime import datetime, timedelta
 from itertools import chain, product
+from types import SimpleNamespace
+from typing import Any, Optional, Union
 from urllib.parse import urlparse
-import copy
-import json
-import html
-import extras.OFRenamer.start as ofrenamer
-
-import requests
+import extras.OFLogin.start_ofl as oflogin
+import extras.OFRenamer.start as ofrenamer
 import helpers.main_helper as main_helper
-from types import SimpleNamespace
-from mergedeep import merge, Strategy
-from sqlalchemy.orm import session, sessionmaker, declarative_base
+import requests
+from apis.onlyfans import onlyfans as OnlyFans
+from apis.onlyfans.classes.create_auth import create_auth
+from apis.onlyfans.classes.create_message import create_message
+from apis.onlyfans.classes.create_post import create_post
+from apis.onlyfans.classes.create_story import create_story
+from apis.onlyfans.classes.create_user import create_user
+from apis.onlyfans.classes.extras import auth_details, media_types
+from apis.onlyfans.onlyfans import start
+from classes.prepare_metadata import create_metadata, format_content, prepare_reformat
+from helpers import db_helper
 from helpers.main_helper import (
     choose_option,
     download_session,
     export_data,
+    export_sqlite,
+    fix_sqlite,
     import_archive,
 )
-import extras.OFLogin.start_ofl as oflogin
+from mergedeep import Strategy, merge
+from sqlalchemy.orm import declarative_base, session, sessionmaker
+from sqlalchemy.orm.scoping import scoped_session
+import helpers.db_helper as db_helper
 
 site_name = "OnlyFans"
 json_config = None
 json_global_settings = None
-max_threads = -1
 json_settings = None
 auto_media_choice = ""
 profile_directory = ""
@@ -57,11 +59,10 @@ def assign_vars(json_auth: auth_details, config, site_settings, site_name):
-    global
json_config, json_global_settings, max_threads, json_settings, auto_media_choice, profile_directory, download_directory, metadata_directory, metadata_directory_format, delete_legacy_metadata, overwrite_files, date_format, file_directory_format, filename_format, ignored_keywords, ignore_type, blacklist_name, webhook, text_length + global json_config, json_global_settings, json_settings, auto_media_choice, profile_directory, download_directory, metadata_directory, metadata_directory_format, delete_legacy_metadata, overwrite_files, date_format, file_directory_format, filename_format, ignored_keywords, ignore_type, blacklist_name, webhook, text_length json_config = config json_global_settings = json_config["settings"] - max_threads = json_global_settings["max_threads"] json_settings = site_settings auto_media_choice = json_settings["auto_media_choice"] profile_directory = main_helper.get_directory( @@ -86,12 +87,12 @@ def assign_vars(json_auth: auth_details, config, site_settings, site_name): text_length = json_settings["text_length"] -def account_setup( +async def account_setup( auth: create_auth, identifiers: list = [], jobs: dict = {}, auth_count=0 ): status = False subscriptions = [] - authed = auth.login() + authed = await auth.login() if authed.active: profile_directory = json_global_settings["profile_directories"][0] profile_directory = os.path.abspath(profile_directory) @@ -105,12 +106,12 @@ def account_setup( imported = import_archive(metadata_filepath) if "auth" in imported: imported = imported["auth"] - mass_messages = authed.get_mass_messages(resume=imported) + mass_messages = await authed.get_mass_messages(resume=imported) if mass_messages: main_helper.export_data(mass_messages, metadata_filepath) # chats = api.get_chats() if identifiers or jobs["scrape_names"]: - subscriptions += manage_subscriptions( + subscriptions += await manage_subscriptions( authed, auth_count, identifiers=identifiers ) status = True @@ -131,13 +132,25 @@ def account_setup( # The start lol -def start_datascraper(authed: create_auth, identifier, site_name, choice_type=None): - subscription = authed.get_subscription(identifier=identifier) +async def start_datascraper( + authed: create_auth, identifier, site_name, choice_type=None +): + subscription = await authed.get_subscription(identifier=identifier) if not subscription: return [False, subscription] print("Scrape Processing") username = subscription.username print("Name: " + username) + some_list = [ + profile_directory, + download_directory, + metadata_directory, + format_directories, + site_name, + username, + metadata_directory_format, + ] + fix_sqlite(*some_list) api_array = scrape_choice(authed, subscription) api_array = format_options(api_array, "apis") apis = api_array[0] @@ -156,7 +169,7 @@ def start_datascraper(authed: create_auth, identifier, site_name, choice_type=No item["api_array"]["username"] = username item["api_array"]["subscription"] = subscription api_type = item["api_type"] - results = prepare_scraper(authed, site_name, item) + results = await prepare_scraper(authed, site_name, item) print print("Scrape Completed" + "\n") return [True, subscription] @@ -255,7 +268,9 @@ def scrape_choice(authed: create_auth, subscription): # Downloads the model's avatar and header -def profile_scraper(authed: create_auth, site_name, api_type, username, base_directory): +async def profile_scraper( + authed: create_auth, site_name, api_type, username, base_directory +): reformats = {} reformats["metadata_directory_format"] = 
json_settings["metadata_directory_format"] reformats["file_directory_format"] = json_settings["file_directory_format"] @@ -272,16 +287,16 @@ def profile_scraper(authed: create_auth, site_name, api_type, username, base_dir option["directory"] = base_directory a, b, c = prepare_reformat(option, keep_vars=True).reformat(reformats) print - y = authed.get_subscription(identifier=username) + y = await authed.get_subscription(identifier=username) override_media_types = [] avatar = y.avatar header = y.header if avatar: override_media_types.append(["Avatars", avatar]) - if header: + elif header: override_media_types.append(["Headers", header]) - d_session = download_session() - d_session.start(unit="B", unit_scale=True, miniters=1) + progress_bar = download_session() + progress_bar.start(unit="B", unit_scale=True, miniters=1) for override_media_type in override_media_types: new_dict = dict() media_type = override_media_type[0] @@ -290,26 +305,32 @@ def profile_scraper(authed: create_auth, site_name, api_type, username, base_dir directory2 = os.path.join(b, media_type) os.makedirs(directory2, exist_ok=True) download_path = os.path.join(directory2, media_link.split("/")[-2] + ".jpg") - if not overwrite_files: + response = await authed.session_manager.json_request( + media_link, + method="HEAD" + ) + if overwrite_files: if os.path.isfile(download_path): - continue - r = authed.session_manager.json_request( - media_link, stream=True, json_format=False, sleep=False + if os.path.getsize(download_path) == response.content_length: + continue + progress_bar.update_total_size(response.content_length) + response, data = await authed.session_manager.json_request( + media_link, + stream=True, + json_format=False, + sleep=False, + progress_bar=progress_bar, ) - if not isinstance(r, requests.Response): - continue - tsize = r.headers.get("content-length") - d_session.update_total_size(tsize) - downloaded = main_helper.downloader(r, download_path, d_session) + downloaded = await main_helper.write_data(download_path, data) if not downloaded: continue - d_session.close() + progress_bar.close() -def paid_content_scraper(api: start, identifiers=[]): +async def paid_content_scraper(api: start, identifiers=[]): for authed in api.auths: paid_contents = [] - paid_contents = authed.get_paid_content() + paid_contents = await authed.get_paid_content() if not authed.active: return authed.subscriptions = authed.subscriptions @@ -321,10 +342,13 @@ def paid_content_scraper(api: start, identifiers=[]): author = paid_content.author if not author: continue - subscription = authed.get_subscription(check=True, identifier=author["id"]) + subscription = await authed.get_subscription( + check=True, identifier=author["id"] + ) if not subscription: subscription = create_user(author) authed.subscriptions.append(subscription) + subscription.subscriber = authed api_type = paid_content.responseType.capitalize() + "s" api_media = getattr(subscription.temp_scraped, api_type) api_media.append(paid_content) @@ -388,11 +412,8 @@ def paid_content_scraper(api: start, identifiers=[]): authed, new_metadata, formatted_directories, - subscription, api_type, - api_path, metadata_path, - site_name, ) parent_type = "" new_metadata = new_metadata + old_metadata @@ -432,7 +453,7 @@ def process_messages(authed: create_auth, subscription, messages) -> list: return unrefined_set -def process_mass_messages( +async def process_mass_messages( authed: create_auth, subscription, metadata_directory, mass_messages ) -> list: def compare_message(queue_id, 
remote_messages): @@ -548,7 +569,7 @@ def compare_message(queue_id, remote_messages): print if mass_message["hashed_ip"] != hash or date_object > next_date_object: print("Getting Message By ID") - x = subscription.get_message_by_id( + x = await subscription.get_message_by_id( identifier=identifier, identifier2=found["id"], limit=1 ) new_found = x["result"]["list"][0] @@ -567,11 +588,8 @@ def process_legacy_metadata( authed: create_auth, new_metadata_set, formatted_directories, - subscription, api_type, - api_path, archive_path, - site_name, ): print("Processing metadata.") delete_metadatas = [] @@ -586,6 +604,7 @@ def process_legacy_metadata( os.makedirs(os.path.dirname(archive_path), exist_ok=True) shutil.move(legacy_metadata_path2, archive_path) archive_path = archive_path.replace("db", "json") + legacy_archive_path = archive_path.replace("Posts.json", "Archived.json") legacy_metadata_object, delete_legacy_metadatas = legacy_metadata_fixer( formatted_directories, authed ) @@ -593,6 +612,17 @@ def process_legacy_metadata( print("Merging new metadata with legacy metadata.") delete_metadatas.extend(delete_legacy_metadatas) old_metadata_set = import_archive(archive_path) + old_metadata_set2 = import_archive(legacy_archive_path) + if old_metadata_set2: + delete_metadatas.append(legacy_archive_path) + old_metadata_set_type = type(old_metadata_set) + old_metadata_set2_type = type(old_metadata_set2) + if all(v == dict for v in [old_metadata_set_type,old_metadata_set2_type]): + old_metadata_set = merge({}, *[old_metadata_set,old_metadata_set2], strategy=Strategy.ADDITIVE) + else: + if isinstance(old_metadata_set,dict) and not old_metadata_set: + old_metadata_set = [] + old_metadata_set.append(old_metadata_set2) old_metadata_object = create_metadata(authed, old_metadata_set, api_type=api_type) if old_metadata_set: print("Merging new metadata with old metadata.") @@ -603,6 +633,7 @@ def process_legacy_metadata( for value3 in value2: x = value3.medias item = value3.convert(keep_empty_items=True) + item["archived"] = False old_metadata_set.append(item) print print @@ -642,9 +673,7 @@ def process_metadata( subscription.download_info["webhook"] = webhook database_name = parent_type if parent_type else api_type subscription.download_info["metadata_locations"][api_type] = {} - subscription.download_info["metadata_locations"][api_type][ - database_name - ] = archive_path + subscription.download_info["metadata_locations"][api_type] = archive_path if json_global_settings["helpers"]["renamer"]: print("Renaming files.") new_metadata_object = ofrenamer.start( @@ -715,7 +744,7 @@ def format_directories( # Prepares the API links to be scraped -def prepare_scraper(authed: create_auth, site_name, item): +async def prepare_scraper(authed: create_auth, site_name, item): api_type = item["api_type"] api_array = item["api_array"] subscription: create_user = api_array["subscription"] @@ -739,29 +768,31 @@ def prepare_scraper(authed: create_auth, site_name, item): formatted_download_directory = formatted_directories["download_directory"] formatted_metadata_directory = formatted_directories["metadata_directory"] if api_type == "Profile": - profile_scraper( + await profile_scraper( authed, site_name, api_type, username, formatted_download_directory ) return True if api_type == "Stories": - master_set = subscription.get_stories() - highlights = subscription.get_highlights() + master_set = await subscription.get_stories() + master_set += await subscription.get_archived_stories() + highlights = await 
subscription.get_highlights() valid_highlights = [] for highlight in highlights: - highlight = subscription.get_highlights(hightlight_id=highlight.id) + highlight = await subscription.get_highlights(hightlight_id=highlight.id) valid_highlights.extend(highlight) master_set.extend(valid_highlights) print if api_type == "Posts": - master_set = subscription.get_posts() - if api_type == "Archived": - master_set = subscription.get_archived(authed) + master_set = await subscription.get_posts() + master_set += await subscription.get_archived_posts() + # if api_type == "Archived": + # master_set = await subscription.get_archived(authed) if api_type == "Messages": - unrefined_set = subscription.get_messages() + unrefined_set = await subscription.get_messages() mass_messages = getattr(authed, "mass_messages") if subscription.is_me() and mass_messages: mass_messages = getattr(authed, "mass_messages") - unrefined_set2 = process_mass_messages( + unrefined_set2 = await process_mass_messages( authed, subscription, formatted_metadata_directory, mass_messages ) unrefined_set += unrefined_set2 @@ -804,19 +835,16 @@ def prepare_scraper(authed: create_auth, site_name, item): ) unrefined_set = [x for x in unrefined_set] new_metadata = main_helper.format_media_set(unrefined_set) + metadata_path = os.path.join(formatted_metadata_directory, api_type + ".db") if new_metadata: new_metadata = new_metadata["content"] - metadata_path = os.path.join(formatted_metadata_directory, api_type + ".db") api_path = os.path.join(api_type, parent_type) old_metadata, delete_metadatas = process_legacy_metadata( authed, new_metadata, formatted_directories, - subscription, api_type, - api_path, metadata_path, - site_name, ) new_metadata = new_metadata + old_metadata subscription.set_scraped(api_type, new_metadata) @@ -999,7 +1027,6 @@ def media_scraper( new_set = {} new_set["content"] = [] directories = [] - session = authed.session_manager.sessions[0] if api_type == "Stories": pass if api_type == "Archived": @@ -1012,7 +1039,8 @@ def media_scraper( download_path = formatted_directories["download_directory"] for location in formatted_directories["locations"]: sorted_directories = copy.copy(location["sorted_directories"]) - master_date = "01-01-0001 00:00:00" + date_today = datetime.now() + master_date = datetime.strftime(date_today, "%d-%m-%Y %H:%M:%S") media_type = location["media_type"] alt_media_type = location["alt_media_type"] file_directory_format = json_settings["file_directory_format"] @@ -1031,11 +1059,12 @@ def media_scraper( seperator = " | " if print_output: print( - f"Scraping [{seperator.join(alt_media_type)}]. Should take less than a minute." + f"Scraping [{seperator.join(alt_media_type)}]. 
Should take less than a minute.\n" ) post_id = post_result.id new_post = {} new_post["medias"] = [] + new_post["archived"] = False rawText = "" text = "" previews = [] @@ -1053,6 +1082,7 @@ def media_scraper( previews = post_result.preview date = post_result.postedAt price = post_result.price + new_post["archived"] = post_result.isArchived if isinstance(post_result, create_message): if post_result.isReportedByMe: continue @@ -1093,30 +1123,8 @@ def media_scraper( new_post["price"] = price for media in post_result.media: media_id = media["id"] - size = 0 - link = "" preview_link = "" - if "source" in media: - quality_key = "source" - source = media[quality_key] - link = source[quality_key] - if link: - if media["type"] == "video": - qualities = media["videoSources"] - qualities = dict(sorted(qualities.items(), reverse=False)) - qualities[quality_key] = source[quality_key] - for quality, quality_link in qualities.items(): - video_quality_json = json_settings["video_quality"] - video_quality_json = video_quality_json.removesuffix("p") - if quality == video_quality_json: - if quality_link: - link = quality_link - break - print - print - print - if "src" in media: - link = media["src"] + link = main_helper.link_picker(media,json_settings["video_quality"]) matches = ["us", "uk", "ca", "ca2", "de"] if not link: @@ -1147,7 +1155,6 @@ def media_scraper( if media["type"] not in alt_media_type: continue - session.links.extend(new_media["links"]) matches = [s for s in ignored_keywords if s in final_text] if matches: print("Matches: ", matches) @@ -1155,13 +1162,15 @@ def media_scraper( filename = link.rsplit("/", 1)[-1] filename, ext = os.path.splitext(filename) ext = ext.__str__().replace(".", "").split("?")[0] - + final_api_type = ( + os.path.join("Archived", api_type) if new_post["archived"] else api_type + ) option = {} option = option | new_post option["site_name"] = "OnlyFans" option["media_id"] = media_id option["filename"] = filename - option["api_type"] = api_type + option["api_type"] = final_api_type option["media_type"] = media_type option["ext"] = ext option["username"] = username @@ -1169,6 +1178,7 @@ def media_scraper( option["text_length"] = text_length option["directory"] = download_path option["preview"] = new_media["preview"] + option["archived"] = new_post["archived"] prepared_format = prepare_reformat(option) file_directory = main_helper.reformat( @@ -1220,174 +1230,51 @@ def media_scraper( # Downloads scraped content -class download_media: - def __init__(self, authed: create_auth = None, subscription=None) -> None: - username = subscription.username - download_info = subscription.download_info - if download_info: - self.downloaded = True - metadata_locations = download_info["metadata_locations"] - directory = download_info["directory"] - for parent_type, value in metadata_locations.items(): - for api_type, metadata_path in value.items(): - Session, engine = db_helper.create_database_session(metadata_path) - database_session = Session() - database_name = api_type.lower() - db_collection = db_helper.database_collection() - database = db_collection.chooser(database_name) - api_table = database.api_table - media_table = database.media_table - result = database_session.query(media_table).all() - media_type_list = media_types() - for r in result: - item = getattr(media_type_list, r.media_type) - item.append(r) - media_type_list = media_type_list.__dict__ - for location, v in media_type_list.items(): - if location == "Texts": - continue - media_set = v - media_set_count = len(media_set) 
- if not media_set: - continue - string = "Download Processing\n" - string += f"Name: {username} | Type: {api_type} | Count: {media_set_count} {location} | Directory: {directory}\n" - print(string) - d_session = download_session() - d_session.start(unit="B", unit_scale=True, miniters=1) - pool = authed.session_manager.pool - pool.starmap( - self.prepare_download, - product( - media_set, - [authed], - [api_type], - [subscription], - [d_session], - ), - ) - d_session.close() - database_session.commit() - else: - self.downloaded = False - def prepare_download( - self, media, authed: create_auth, api_type, subscription: create_user, d_session - ): - return_bool = True - if not overwrite_files and media.downloaded: - return - count = 0 - sessions = [x for x in authed.session_manager.sessions if media.link in x.links] - if not sessions: - return - session = sessions[0] - while count < 11: - links = [media.link] - - def choose_link(session, links): - for link in links: - r = authed.session_manager.json_request( - link, session, "HEAD", stream=False, json_format=False - ) - if not isinstance(r, requests.Response): - continue - - header = r.headers - content_length = header.get("content-length") - if not content_length: - continue - content_length = int(content_length) - return [link, content_length] - - result = choose_link(session, links) - if not result: - new_result: Any = None - if api_type == "Messages": - new_result = subscription.get_message_by_id( - identifier2=media.post_id, limit=1 - ) - elif api_type == "Posts": - new_result = subscription.get_post(media.post_id) - else: - print - mandatory_directories = {} - mandatory_directories["profile_directory"] = profile_directory - mandatory_directories["download_directory"] = download_directory - mandatory_directories["metadata_directory"] = metadata_directory - media_type = format_media_types() - formatted_directories = format_directories( - mandatory_directories, - site_name, - subscription.username, - metadata_directory_format, - media_type, - api_type, - ) - unrefined_result = [media_scraper( - new_result, - authed, - subscription, - formatted_directories, - subscription.username, - api_type, - print_output=False, - )] - new_metadata = main_helper.format_media_set(unrefined_result) - new_metadata = new_metadata["content"] - found_post = main_helper.format_media_set(new_metadata) - if found_post: - found_media = [ - x - for x in found_post["medias"] - if x["media_id"] == media.media_id - ] - if found_media: - new_link = found_media[0]["links"][0] - media.link = new_link - count += 1 - continue - link = result[0] - content_length = result[1] - media.size = content_length - date_object = media.created_at - download_path = os.path.join(media.directory, media.filename) - timestamp = date_object.timestamp() - if not overwrite_files: - if main_helper.check_for_dupe_file(download_path, content_length): - main_helper.format_image(download_path, timestamp) - return_bool = False - media.downloaded = True - break - r = authed.session_manager.json_request( - link, session, stream=True, json_format=False +async def prepare_downloads(subscription: create_user): + download_info = subscription.download_info + if not download_info: + return + directory = download_info["directory"] + for api_type, metadata_path in download_info["metadata_locations"].items(): + Session, engine = db_helper.create_database_session(metadata_path) + database_session: scoped_session = Session() + database_name = api_type.lower() + db_collection = db_helper.database_collection() + 
        database = db_collection.chooser(database_name)
+        api_table = database.api_table
+        media_table = database.media_table
+        settings = subscription.subscriber.extras["settings"]["supported"]["onlyfans"]["settings"]
+        overwrite_files = settings["overwrite_files"]
+        if overwrite_files:
+            download_list: Any = database_session.query(media_table).all()
+            media_set_count = len(download_list)
+        else:
+            download_list: Any = database_session.query(media_table).filter(
+                media_table.downloaded == False
             )
-            if not isinstance(r, requests.Response):
-                return_bool = False
-                count += 1
-                continue
-            d_session.update_total_size(content_length)
-            downloader = main_helper.downloader(r, download_path, d_session, count)
-            if not downloader:
-                count += 1
-                continue
-            main_helper.format_image(download_path, timestamp)
-            media.downloaded = True
-            break
-        if not media.downloaded:
-            print(f"Download Failed: {media.link}")
-            d_session.colour = "Red"
-
-        return return_bool
+            media_set_count = db_helper.get_count(download_list)
+        location = ""
+        string = "Download Processing\n"
+        string += f"Name: {subscription.username} | Type: {api_type} | Count: {media_set_count}{location} | Directory: {directory}\n"
+        if media_set_count:
+            print(string)
+            a = await subscription.session_manager.async_downloads(
+                download_list, subscription
             )
+            database_session.commit()
+        database_session.close()
+    print
+    print
-def manage_subscriptions(
+async def manage_subscriptions(
     authed: create_auth, auth_count=0, identifiers: list = [], refresh: bool = True
 ):
-    results = authed.get_subscriptions(identifiers=identifiers, refresh=refresh)
+    results = await authed.get_subscriptions(identifiers=identifiers, refresh=refresh)
     if blacklist_name:
-        r = authed.get_lists()
+        r = await authed.get_lists()
         if not r:
             return [False, []]
         new_results = [c for c in r if blacklist_name == c["name"]]
@@ -1396,7 +1283,7 @@ def manage_subscriptions(
             list_users = item["users"]
             if int(item["usersCount"]) > 2:
                 list_id = str(item["id"])
-                list_users = authed.get_lists_users(list_id)
+                list_users = await authed.get_lists_users(list_id)
             users = list_users
             bl_ids = [x["username"] for x in users]
             results2 = results.copy()
diff --git a/start_ofd.py b/start_ofd.py
index cc5471ff0..96fdcd354 100755
--- a/start_ofd.py
+++ b/start_ofd.py
@@ -3,8 +3,7 @@
 import os
 import time
 import traceback
-import logging
-
+import asyncio
 
 main_test.version_check()
 main_test.check_config()
@@ -25,34 +24,38 @@
     string, site_names = main_helper.module_chooser(domain, json_sites)
     # logging.basicConfig(level=logging.DEBUG, format="%(message)s")
-    while True:
-        try:
-            if domain:
-                if site_names:
-                    site_name = domain
+    async def main():
+        while True:
+            try:
+                if domain:
+                    if site_names:
+                        site_name = domain
+                    else:
+                        print(string)
+                        continue
                 else:
                     print(string)
-                    continue
-            else:
-                print(string)
-                x = input()
-                if x == "x":
-                    break
-                x = int(x)
-                site_name = site_names[x]
-            site_name_lower = site_name.lower()
-            api = main_datascraper.start_datascraper(json_config, site_name_lower)
-            if api:
-                api.close_pools()
-            if exit_on_completion:
-                print("Now exiting.")
-                exit(0)
-            elif not infinite_loop:
-                print("Input anything to continue")
+                    x = input()
+                    if x == "x":
+                        break
+                    x = int(x)
+                    site_name = site_names[x]
+                site_name_lower = site_name.lower()
+                api = await main_datascraper.start_datascraper(json_config, site_name_lower)
+                if api:
+                    api.close_pools()
+                if exit_on_completion:
+                    print("Now exiting.")
+                    exit(0)
+                elif not infinite_loop:
+                    print("Input anything to continue")
+                    input()
+                elif loop_timeout:
+                    print("Pausing scraper for " + loop_timeout + " seconds.")
+                    time.sleep(int(loop_timeout))
+            except Exception as e:
+                print(traceback.format_exc())
                 input()
-            elif loop_timeout:
-                print("Pausing scraper for " + loop_timeout + " seconds.")
-                time.sleep(int(loop_timeout))
-        except Exception as e:
-            print(traceback.format_exc())
-            input()
+
+
+    asyncio.run(main())
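
Note on the new download flow: the synchronous, requests-based downloader() helper removed above is replaced by the aiohttp-based session_manager. A HEAD request supplies the expected size (used for the progress bar and the size-based dupe check), the body is then fetched asynchronously, and the bytes are handed to main_helper.write_data. The snippet below is only a minimal standalone sketch of that HEAD-then-GET pattern; it uses plain aiohttp with a placeholder URL and filename instead of the project's session_manager, proxy and cookie handling.

import asyncio

import aiohttp


async def download_file(url: str, path: str) -> bool:
    # Sketch only: the real code routes requests through session_manager.json_request.
    async with aiohttp.ClientSession() as session:
        # HEAD first so the expected size is known before downloading.
        async with session.head(url) as head_response:
            expected_size = head_response.content_length or 0
        async with session.get(url) as response:
            data = await response.read()
    if expected_size and len(data) != expected_size:
        return False  # short read; treat as a failed download
    with open(path, "wb") as file:
        file.write(data)  # mirrors main_helper.write_data
    return True


# Placeholder usage:
# asyncio.run(download_file("https://example.com/avatar.jpg", "avatar.jpg"))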