From f49a5d6e9a9633148736db3e19f037d26a829a49 Mon Sep 17 00:00:00 2001
From: CRIMINAL
Date: Tue, 8 Jun 2021 00:18:00 +0100
Subject: [PATCH] Implemented Asyncio

The script now uses async functions to handle network requests and writing to
disk, which makes it faster. This version of the script is still somewhat
unstable when it comes to network requests, but I've scraped hundreds of
models with it and fixed multiple bugs.

max_threads in the config now only controls how many network requests are made
at a time; the script will use all available threads to process data, so if
you're running out of RAM, I suggest lowering max_threads.

I'll continue optimizing the script and making it as fast as possible.
---
 apis/api_helper.py                   | 520 ++++++++++++++++++---------
 apis/onlyfans/classes/__init__.py    |   2 +-
 apis/onlyfans/classes/create_auth.py | 109 +++---
 apis/onlyfans/classes/create_post.py |   7 +-
 apis/onlyfans/classes/create_user.py | 103 +++---
 apis/onlyfans/onlyfans.py            |  86 +----
 classes/make_settings.py             |   4 -
 classes/prepare_metadata.py          |   5 +-
 datascraper/main_datascraper.py      |  25 +-
 extras/OFRenamer/start.py            |  10 +-
 helpers/db_helper.py                 |   8 +-
 helpers/main_helper.py               | 385 ++++++++++++--------
 modules/onlyfans.py                  | 417 ++++++++-------------
 start_ofd.py                         |  63 ++--
 14 files changed, 903 insertions(+), 841 deletions(-)

diff --git a/apis/api_helper.py b/apis/api_helper.py
index 79544cfc..2e2d4509 100644
--- a/apis/api_helper.py
+++ b/apis/api_helper.py
@@ -1,31 +1,40 @@
+import asyncio
 import copy
-import math
+import hashlib
+import os
 import re
+import threading
 import time
-from typing import Any, Union
-from urllib.parse import urlparse
-
-import requests
-from requests.sessions import Session
-import ujson
-import socket
-import os
+from itertools import chain, groupby, product, zip_longest
 from multiprocessing import cpu_count
-from requests.adapters import HTTPAdapter
 from multiprocessing.dummy import Pool as ThreadPool
-from itertools import product, chain, zip_longest, groupby
 from os.path import dirname as up
-import threading
+from random import randint
+from typing import Any, Optional
+from urllib.parse import urlparse
+
+import aiohttp
+import helpers.main_helper as main_helper
+import python_socks
+import requests
+from aiohttp import ClientSession
+from aiohttp.client_reqrep import ClientResponse
+from aiohttp_socks import ChainProxyConnector, ProxyConnector, ProxyType
+from database.models.media_table import media_table
+
+from apis.onlyfans.classes import create_user
 
 path = up(up(os.path.realpath(__file__)))
 os.chdir(path)
 
 global_settings = {}
-global_settings["dynamic_rules_link"] = "https://raw.githubusercontent.com/DATAHOARDERS/dynamic-rules/main/onlyfans.json"
+global_settings[
+    "dynamic_rules_link"
+] = "https://raw.githubusercontent.com/DATAHOARDERS/dynamic-rules/main/onlyfans.json"
 
 
-class set_settings():
+class set_settings:
     def __init__(self, option={}):
         global global_settings
         self.proxies = option.get("proxies")
@@ -35,33 +44,46 @@ def __init__(self, option={}):
 
 
 def chunks(l, n):
-    final = [l[i * n:(i + 1) * n] for i in range((len(l) + n - 1) // n)]
+    final = [l[i * n : (i + 1) * n] for i in range((len(l) + n - 1) // n)]
     return final
 
 
-def multiprocessing(max_threads=None):
+def calculate_max_threads(max_threads=None):
     if not max_threads:
         max_threads = -1
     max_threads2 = cpu_count()
     if max_threads < 1 or max_threads >= max_threads2:
-        pool = ThreadPool()
-    else:
-        pool = ThreadPool(max_threads)
+        max_threads = max_threads2
+    return max_threads
+
+
+def 
multiprocessing(max_threads=None): + max_threads = calculate_max_threads(max_threads) + pool = ThreadPool(max_threads) return pool -class session_manager(): - def __init__(self, original_sessions=[], headers: dict = {}, session_rules=None, session_retry_rules=None, max_threads=-1) -> None: +class session_manager: + def __init__( + self, + auth, + original_sessions=[], + headers: dict = {}, + proxies: list[str] = [], + session_retry_rules=None, + max_threads=-1, + ) -> None: self.sessions = self.add_sessions(original_sessions) - self.pool = multiprocessing(max_threads) + self.pool = multiprocessing() self.max_threads = max_threads self.kill = False self.headers = headers - self.session_rules = session_rules + self.proxies: list[str] = proxies self.session_retry_rules = session_retry_rules dr_link = global_settings["dynamic_rules_link"] dynamic_rules = requests.get(dr_link).json() self.dynamic_rules = dynamic_rules + self.auth = auth def add_sessions(self, original_sessions: list, overwrite_old_sessions=True): if overwrite_old_sessions: @@ -82,6 +104,7 @@ def stimulate_sessions(self): def do(session_manager): while not session_manager.kill: for session in session_manager.sessions: + def process_links(link, session): r = session.get(link) text = r.text.strip("\n") @@ -89,7 +112,8 @@ def process_links(link, session): print else: found_dupe = [ - x for x in session_manager.sessions if x.ip == text] + x for x in session_manager.sessions if x.ip == text + ] if found_dupe: return cloned_session = copy.deepcopy(session) @@ -99,140 +123,295 @@ def process_links(link, session): print(text) print return text + time.sleep(62) link = "https://checkip.amazonaws.com" ip = process_links(link, session) print + t1 = threading.Thread(target=do, args=[self]) t1.start() - def json_request(self, link: str, session: Union[Session] = None, method="GET", stream=False, json_format=True, data={}, sleep=True, timeout=20, ignore_rules=False, force_json=False) -> Any: + async def json_request( + self, + link: str, + session: Optional[ClientSession] = None, + method="GET", + stream=False, + json_format=True, + data={}, + progress_bar=None, + sleep=True, + timeout=20, + ignore_rules=False, + force_json=False, + ) -> Any: headers = {} + custom_session = False if not session: - session = self.sessions[0] - if self.session_rules and not ignore_rules: - headers |= self.session_rules(self, link) - session = copy.deepcopy(session) - count = 0 - sleep_number = 0.5 - result = {} - while count < 11: - try: - count += 1 - if json_format: - headers["accept"] = "application/json, text/plain, */*" - if data: - r = session.request(method, link, json=data, - stream=stream, timeout=timeout, headers=headers) + custom_session = True + proxies = self.proxies + proxy = self.proxies[randint(0, len(proxies) - 1)] if proxies else "" + connector = ProxyConnector.from_url(proxy) if proxy else None + session = ClientSession( + connector=connector, cookies=self.auth.cookies, read_timeout=None + ) + headers = self.session_rules(link) + headers["accept"] = "application/json, text/plain, */*" + headers["Connection"] = "keep-alive" + request_method = None + if method == "HEAD": + request_method = session.head + elif method == "GET": + request_method = session.get + elif method == "POST": + request_method = session.post + elif method == "DELETE": + request_method = session.delete + try: + async with request_method(link, headers=headers) as response: + if method == "HEAD": + result = response else: - r = session.request( - method, link, stream=stream, 
timeout=timeout, headers=headers) - if self.session_retry_rules: - rule = self.session_retry_rules(r, link) - if rule == 1: - continue - elif rule == 2: - break - if json_format: - content_type = r.headers['Content-Type'] - matches = ["application/json;", "application/vnd.api+json"] - if not force_json and all(match not in content_type for match in matches): - continue - text = r.text - if not text: - message = "ERROR: 100 Posts skipped. Please post the username you're trying to scrape on the issue "'100 Posts Skipped'"" - return result - return ujson.loads(text) - else: - return r - except (ConnectionResetError) as e: - continue - except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError, requests.exceptions.ReadTimeout, socket.timeout) as e: - if sleep: - time.sleep(sleep_number) - sleep_number += 0.5 - continue - except Exception as e: - print(e) - continue - return result - - def parallel_requests(self, items: list[str]): - def multi(link): - result = self.json_request(link) - return result - results = self.pool.starmap(multi, product( - items)) + if json_format and not stream: + result = await response.json() + elif stream and not json_format: + buffer = [] + if response.status == 200: + async for data in response.content.iter_chunked(4096): + buffer.append(data) + length = len(data) + progress_bar.update(length) + else: + if response.content_length: + progress_bar.update_total_size(-response.content_length) + final_buffer = b"".join(buffer) + result = [response, final_buffer] + print + else: + result = await response.read() + if custom_session: + await session.close() + return result + except aiohttp.ClientConnectorError as e: + return + + async def async_requests(self, items: list[str], json_format=True): + tasks = [] + + async def run(links): + proxies = self.proxies + proxy = self.proxies[randint(0, len(proxies) - 1)] if proxies else "" + connector = ProxyConnector.from_url(proxy) if proxy else None + async with ClientSession( + connector=connector, cookies=self.auth.cookies, read_timeout=None + ) as session: + for link in links: + task = asyncio.ensure_future(self.json_request(link, session)) + tasks.append(task) + responses = await asyncio.gather(*tasks) + return responses + + results = await asyncio.ensure_future(run(items)) return results + async def async_downloads( + self, download_list: list[media_table], subscription: create_user + ): + async def run(download_list: list[media_table]): + proxies = self.proxies + proxy = self.proxies[randint(0, len(proxies) - 1)] if proxies else "" + connector = ProxyConnector.from_url(proxy) if proxy else None + async with ClientSession( + connector=connector, cookies=self.auth.cookies, read_timeout=None + ) as session: + tasks = [] + # Get content_lengths + for download_item in download_list: + link = download_item.link + if link: + task = asyncio.ensure_future( + self.json_request( + download_item.link, + session, + method="HEAD", + json_format=False, + ) + ) + tasks.append(task) + responses = await asyncio.gather(*tasks) + tasks.clear() -def create_session(settings={}, custom_proxy="", test_ip=True): - - def test_session(proxy=None, cert=None, max_threads=None): - session = requests.Session() - proxy_type = {'http': proxy, - 'https': proxy} - if proxy: - session.proxies = proxy_type - if cert: - session.verify = cert - session.mount( - 'https://', HTTPAdapter(pool_connections=max_threads, pool_maxsize=max_threads)) - if test_ip: - link = 'https://checkip.amazonaws.com' - r = session_manager().json_request( - 
link, session, json_format=False, sleep=False) - if not isinstance(r, requests.Response): - print(f"Proxy Not Set: {proxy}\n") - return - ip = r.text.strip() - print("Session IP: "+ip+"\n") - setattr(session, "ip", ip) - setattr(session, "links", []) - return session - sessions = [] - settings = set_settings(settings) - pool = multiprocessing() - max_threads = pool._processes - proxies = settings.proxies - cert = settings.cert - while not sessions: - proxies = [custom_proxy] if custom_proxy else proxies - if proxies: - with pool: - sessions = pool.starmap(test_session, product( - proxies, [cert], [max_threads])) - else: - session = test_session(max_threads=max_threads) - sessions.append(session) - return sessions + async def check(download_item: media_table, response: ClientResponse): + filepath = os.path.join( + download_item.directory, download_item.filename + ) + if response.status == 200: + if response.content_length: + download_item.size = response.content_length + if os.path.exists(filepath): + if os.path.getsize(filepath) == response.content_length: + download_item.downloaded = True + else: + return download_item + else: + return download_item -def assign_session(medias, item, key_one="link", key_two="count", show_item=False, capped=False): - count = 0 - activate_cap = False - number = len(item) - medias2 = [] - for auth in medias: - media2 = {} - media2[key_one] = auth - if not number: - count = -1 - if activate_cap: - media2[key_two] = -1 - else: - if show_item: - media2[key_two] = item[count] - else: - media2[key_two] = count + for download_item in download_list: + temp_response = [ + response + for response in responses + if response and response.url.name == download_item.filename + ] + if temp_response: + temp_response = temp_response[0] + task = check(download_item, temp_response) + tasks.append(task) + result = await asyncio.gather(*tasks) + download_list = [x for x in result if x] + tasks.clear() + progress_bar = None + if download_list: + progress_bar = main_helper.download_session() + progress_bar.start(unit="B", unit_scale=True, miniters=1) + [progress_bar.update_total_size(x.size) for x in download_list] - medias2.append(media2) - count += 1 - if count >= number: - count = 0 - if capped: - activate_cap = True - return medias2 + async def process_download(download_item: media_table): + response = await self.download_content( + download_item, session, progress_bar, subscription + ) + if response: + data, download_item = response.values() + if data: + download_path = os.path.join( + download_item.directory, download_item.filename + ) + os.makedirs(os.path.dirname(download_path), exist_ok=True) + with open(download_path, "wb") as f: + f.write(data) + download_item.size = len(data) + download_item.downloaded = True + + max_threads = calculate_max_threads(self.max_threads) + download_groups = main_helper.grouper(max_threads, download_list) + for download_group in download_groups: + tasks = [] + for download_item in download_group: + task = process_download(download_item) + if task: + tasks.append(task) + result = await asyncio.gather(*tasks) + if isinstance(progress_bar, main_helper.download_session): + progress_bar.close() + return True + + results = await asyncio.ensure_future(run(download_list)) + return results + + async def download_content( + self, + download_item: media_table, + session: ClientSession, + progress_bar, + subscription: create_user, + ): + attempt_count = 1 + new_task = {} + while attempt_count <= 3: + attempt_count += 1 + if not download_item.link: + 
continue + response: ClientResponse + response, task = await asyncio.ensure_future( + self.json_request( + download_item.link, + session, + json_format=False, + stream=True, + progress_bar=progress_bar, + ) + ) + if response.status != 200: + task = None + if response.content_length: + progress_bar.update_total_size(-response.content_length) + api_type = download_item.__module__.split(".")[-1] + post_id = download_item.post_id + new_result = None + if api_type == "messages": + new_result = await subscription.get_message_by_id( + message_id=post_id + ) + elif api_type == "posts": + new_result = await subscription.get_post(post_id) + print + if new_result and new_result.media: + media_list = [ + x for x in new_result.media if x["id"] == download_item.media_id + ] + if media_list: + media = media_list[0] + quality = subscription.subscriber.extras["settings"][ + "supported" + ]["onlyfans"]["settings"]["video_quality"] + link = main_helper.link_picker(media, quality) + download_item.link = link + continue + new_task["response"] = task + new_task["download_item"] = download_item + break + return new_task + + def session_rules(self, link: str) -> dict: + headers = self.headers + if "https://onlyfans.com/api2/v2/" in link: + dynamic_rules = self.dynamic_rules + headers["app-token"] = dynamic_rules["app_token"] + # auth_id = headers["user-id"] + a = [link, 0, dynamic_rules] + headers2 = self.create_signed_headers(*a) + headers |= headers2 + return headers + + def create_signed_headers(self, link: str, auth_id: int, dynamic_rules: dict): + # Users: 300000 | Creators: 301000 + final_time = str(int(round(time.time()))) + path = urlparse(link).path + query = urlparse(link).query + path = path if not query else f"{path}?{query}" + a = [dynamic_rules["static_param"], final_time, path, str(auth_id)] + msg = "\n".join(a) + message = msg.encode("utf-8") + hash_object = hashlib.sha1(message) + sha_1_sign = hash_object.hexdigest() + sha_1_b = sha_1_sign.encode("ascii") + checksum = ( + sum([sha_1_b[number] for number in dynamic_rules["checksum_indexes"]]) + + dynamic_rules["checksum_constant"] + ) + headers = {} + headers["sign"] = dynamic_rules["format"].format(sha_1_sign, abs(checksum)) + headers["time"] = final_time + return headers + + +async def test_proxies(proxies: list[str]): + final_proxies = [] + for proxy in proxies: + connector = ProxyConnector.from_url(proxy) if proxy else None + async with ClientSession(connector=connector) as session: + link = "https://checkip.amazonaws.com" + try: + response = await session.get(link) + ip = await response.text() + ip = ip.strip() + print("Session IP: " + ip + "\n") + final_proxies.append(proxy) + except python_socks._errors.ProxyConnectionError as e: + print(f"Proxy Not Set: {proxy}\n") + continue + return final_proxies def restore_missing_data(master_set2, media_set, split_by): @@ -241,16 +420,15 @@ def restore_missing_data(master_set2, media_set, split_by): for item in media_set: if not item: link = master_set2[count] - offset = int(link.split('?')[-1].split('&')[1].split("=")[1]) + offset = int(link.split("?")[-1].split("&")[1].split("=")[1]) limit = int(link.split("?")[-1].split("&")[0].split("=")[1]) - if limit == split_by+1: + if limit == split_by + 1: break offset2 = offset - limit2 = int(limit/split_by) - for item in range(1, split_by+1): - link2 = link.replace("limit="+str(limit), "limit="+str(limit2)) - link2 = link2.replace( - "offset="+str(offset), "offset="+str(offset2)) + limit2 = int(limit / split_by) + for item in range(1, split_by + 1): + link2 
= link.replace("limit=" + str(limit), "limit=" + str(limit2)) + link2 = link2.replace("offset=" + str(offset), "offset=" + str(offset2)) offset2 += limit2 new_set.append(link2) count += 1 @@ -258,40 +436,31 @@ def restore_missing_data(master_set2, media_set, split_by): return new_set -def scrape_endpoint_links(links, session_manager: session_manager, api_type): - def multi(item): - link = item["link"] - item = {} - session = session_manager.sessions[0] - result = session_manager.json_request(link, session) - if "error" in result: - result = [] - if result: - item["session"] = session - item["result"] = result - return item +async def scrape_endpoint_links(links, session_manager: session_manager, api_type): media_set = [] max_attempts = 100 api_type = api_type.capitalize() for attempt in list(range(max_attempts)): if not links: continue - print("Scrape Attempt: "+str(attempt+1)+"/"+str(max_attempts)) - results = session_manager.parallel_requests(links) + print("Scrape Attempt: " + str(attempt + 1) + "/" + str(max_attempts)) + results = await session_manager.async_requests(links) not_faulty = [x for x in results if x] - faulty = [{"key": k, "value": v, "link": links[k]} - for k, v in enumerate(results) if not v] - last_number = len(results)-1 + faulty = [ + {"key": k, "value": v, "link": links[k]} + for k, v in enumerate(results) + if not v + ] + last_number = len(results) - 1 if faulty: positives = [x for x in faulty if x["key"] != last_number] false_positive = [x for x in faulty if x["key"] == last_number] if positives: attempt = attempt if attempt > 1 else attempt + 1 - num = int(len(faulty)*(100/attempt)) + num = int(len(faulty) * (100 / attempt)) split_by = 2 - print("Missing "+str(num)+" Posts... Retrying...") - links = restore_missing_data( - links, results, split_by) + print("Missing " + str(num) + " Posts... 
Retrying...") + links = restore_missing_data(links, results, split_by) media_set.extend(not_faulty) if not positives and false_positive: media_set.extend(results) @@ -299,26 +468,21 @@ def multi(item): print else: media_set.extend(results) - print("Found: "+api_type) + print("Found: " + api_type) break media_set = list(chain(*media_set)) return media_set -def grouper(n, iterable, fillvalue=None): - args = [iter(iterable)] * n - return list(zip_longest(fillvalue=fillvalue, *args)) - - def calculate_the_unpredictable(link, limit, multiplier=1): final_links = [] - a = list(range(1, multiplier+1)) + a = list(range(1, multiplier + 1)) for b in a: parsed_link = urlparse(link) q = parsed_link.query.split("&") offset = q[1] old_offset_num = int(re.findall("\\d+", offset)[0]) - new_offset_num = old_offset_num+(limit*b) + new_offset_num = old_offset_num + (limit * b) new_link = link.replace(offset, f"offset={new_offset_num}") final_links.append(new_link) return final_links diff --git a/apis/onlyfans/classes/__init__.py b/apis/onlyfans/classes/__init__.py index e3a34350..d98714a8 100644 --- a/apis/onlyfans/classes/__init__.py +++ b/apis/onlyfans/classes/__init__.py @@ -3,4 +3,4 @@ from apis.onlyfans.classes.create_post import create_post from apis.onlyfans.classes.create_story import create_story from apis.onlyfans.classes.create_message import create_message -from apis.onlyfans.classes.create_highlight import create_highlight \ No newline at end of file +from apis.onlyfans.classes.create_highlight import create_highlight diff --git a/apis/onlyfans/classes/create_auth.py b/apis/onlyfans/classes/create_auth.py index 00457d89..b30b0e44 100644 --- a/apis/onlyfans/classes/create_auth.py +++ b/apis/onlyfans/classes/create_auth.py @@ -1,35 +1,27 @@ +import asyncio +import math from datetime import datetime -from apis.onlyfans.classes.create_post import create_post -from apis.onlyfans.classes.create_message import create_message from itertools import chain, product -from apis import api_helper -from apis.onlyfans.classes.extras import ( - auth_details, - content_types, - create_headers, - endpoint_links, - error_details, - handle_refresh, -) -from apis.onlyfans.classes.create_user import create_user - -import requests from typing import Any, Optional, Union -from apis.api_helper import session_manager -import copy -from user_agent import generate_user_agent -import math + import jsonpickle +from apis import api_helper +from apis.onlyfans.classes.create_message import create_message +from apis.onlyfans.classes.create_post import create_post +from apis.onlyfans.classes.create_user import create_user +from apis.onlyfans.classes.extras import (auth_details, content_types, + create_headers, endpoint_links, + error_details, handle_refresh) from dateutil.relativedelta import relativedelta +from user_agent import generate_user_agent class create_auth: def __init__( self, - session_manager2: session_manager, option={}, - init=False, pool=None, + max_threads=-1, ) -> None: self.id = option.get("id") self.username: str = option.get("username") @@ -47,10 +39,10 @@ def __init__( self.archived_stories = {} self.mass_messages = [] self.paid_content = [] - session_manager2 = copy.copy(session_manager2) - self.session_manager = session_manager2 + self.session_manager = api_helper.session_manager(self, max_threads=max_threads) self.pool = pool self.auth_details: Optional[auth_details] = None + self.cookies: dict = {} self.profile_directory = option.get("profile_directory", "") self.guest = False self.active = False @@ -63,7 
+55,7 @@ def update(self, data): if found_attr: setattr(self, key, value) - def login(self, full=False, max_attempts=10, guest=False): + async def login(self, full=False, max_attempts=10, guest=False): auth_version = "(V1)" if guest: self.auth_details.auth_id = "0" @@ -84,22 +76,22 @@ def login(self, full=False, max_attempts=10, guest=False): dynamic_rules = self.session_manager.dynamic_rules a = [dynamic_rules, auth_id, user_agent, x_bc, auth_items.sess, link] self.session_manager.headers = create_headers(*a) - if not self.session_manager.sessions: - self.session_manager.add_sessions([requests.Session()]) if guest: print("Guest Authentication") return self for session in self.session_manager.sessions: for auth_cookie in auth_cookies: session.cookies.set(**auth_cookie) + + self.cookies = {d["name"]: d["value"] for d in auth_cookies} count = 1 while count < max_attempts + 1: string = f"Auth {auth_version} Attempt {count}/{max_attempts}" print(string) - self.get_authed() + await self.get_authed() count += 1 - def resolve_auth(auth: create_auth): + async def resolve_auth(auth: create_auth): if self.errors: error = self.errors[-1] print(error.message) @@ -114,7 +106,7 @@ def resolve_auth(auth: create_auth): ) code = input("Enter 2FA Code\n") data = {"code": code, "rememberMe": True} - r = self.session_manager.json_request( + r = await self.session_manager.json_request( link, method="POST", data=data ) if "error" in r: @@ -126,7 +118,7 @@ def resolve_auth(auth: create_auth): auth.errors.remove(error) break - resolve_auth(self) + await resolve_auth(self) if not self.active: if self.errors: error = self.errors[-1] @@ -145,12 +137,10 @@ def resolve_auth(auth: create_auth): break return self - def get_authed(self): + async def get_authed(self): if not self.active: link = endpoint_links().customer - r = self.session_manager.json_request( - link, self.session_manager.sessions[0], sleep=False - ) + r = await self.session_manager.json_request(link, sleep=False) if r: self.resolve_auth_errors(r) if not self.errors: @@ -181,7 +171,7 @@ def resolve_auth_errors(self, r): else: self.errors.clear() - def get_lists(self, refresh=True, limit=100, offset=0): + async def get_lists(self, refresh=True, limit=100, offset=0): api_type = "lists" if not self.active: return @@ -189,19 +179,18 @@ def get_lists(self, refresh=True, limit=100, offset=0): subscriptions = handle_refresh(self, api_type) return subscriptions link = endpoint_links(global_limit=limit, global_offset=offset).lists - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) self.lists = results return results - def get_user(self, identifier: Union[str, int]): + async def get_user(self, identifier: Union[str, int]): link = endpoint_links(identifier).users - result = self.session_manager.json_request(link) + result = await self.session_manager.json_request(link) result["session_manager"] = self.session_manager result = create_user(result) if "error" not in result else result return result - def get_lists_users( + async def get_lists_users( self, identifier, check: bool = False, refresh=True, limit=100, offset=0 ): if not self.active: @@ -209,18 +198,18 @@ def get_lists_users( link = endpoint_links( identifier, global_limit=limit, global_offset=offset ).lists_users - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) if len(results) >= limit and not check: - results2 = self.get_lists_users( + results2 
= await self.get_lists_users( identifier, limit=limit, offset=limit + offset ) results.extend(results2) return results - def get_subscription( + async def get_subscription( self, check: bool = False, identifier="", limit=100, offset=0 ) -> Union[create_user, None]: - subscriptions = self.get_subscriptions(refresh=False) + subscriptions = await self.get_subscriptions(refresh=False) valid = None for subscription in subscriptions: if identifier == subscription.username or identifier == subscription.id: @@ -228,7 +217,7 @@ def get_subscription( break return valid - def get_subscriptions( + async def get_subscriptions( self, resume=None, refresh=True, @@ -243,7 +232,6 @@ def get_subscriptions( subscriptions = self.subscriptions return subscriptions link = endpoint_links(global_limit=limit, global_offset=offset).subscriptions - session = self.session_manager.sessions[0] ceil = math.ceil(self.subscribesCount / limit) a = list(range(ceil)) offset_array = [] @@ -263,7 +251,8 @@ def get_subscriptions( json_authed = jsonpickle.decode(json_authed) self.session_manager = temp_session_manager self.pool = temp_pool - json_authed = json_authed | self.get_user(self.username).__dict__ + temp_auth = await self.get_user(self.username) + json_authed = json_authed | temp_auth.__dict__ subscription = create_user(json_authed) subscription.subscriber = self @@ -274,11 +263,11 @@ def get_subscriptions( results.append(subscription) if not identifiers: - def multi(item): + async def multi(item): link = item # link = item["link"] # session = item["session"] - subscriptions = self.session_manager.json_request(link) + subscriptions = await self.session_manager.json_request(link) valid_subscriptions = [] extras = {} extras["auth_check"] = "" @@ -293,7 +282,7 @@ def multi(item): for subscription in subscriptions: subscription["session_manager"] = self.session_manager if extra_info: - subscription2 = self.get_user(subscription["username"]) + subscription2 = await self.get_user(subscription["username"]) if isinstance(subscription2, dict): if "error" in subscription2: continue @@ -306,13 +295,14 @@ def multi(item): pool = self.pool # offset_array= offset_array[:16] - results += pool.starmap(multi, product(offset_array)) + tasks = pool.starmap(multi, product(offset_array)) + results += await asyncio.gather(*tasks) else: for identifier in identifiers: if self.id == identifier or self.username == identifier: continue link = endpoint_links(identifier=identifier).users - result = self.session_manager.json_request(link) + result = await self.session_manager.json_request(link) if "error" in result or not result["subscribedBy"]: continue subscription = create_user(result) @@ -326,7 +316,7 @@ def multi(item): self.subscriptions = results return results - def get_chats( + async def get_chats( self, links: Optional[list] = None, limit=100, @@ -367,13 +357,13 @@ def get_chats( links += links2 else: links = links2 - results = self.session_manager.parallel_requests(links) + results = await self.session_manager.async_requests(links) has_more = results[-1]["hasMore"] final_results = [x["list"] for x in results] final_results = list(chain.from_iterable(final_results)) if has_more: - results2 = self.get_chats( + results2 = await self.get_chats( links=[links[-1]], limit=limit, offset=limit + offset, inside_loop=True ) final_results.extend(results2) @@ -382,7 +372,9 @@ def get_chats( self.chats = final_results return final_results - def get_mass_messages(self, resume=None, refresh=True, limit=10, offset=0) -> list: + async def 
get_mass_messages( + self, resume=None, refresh=True, limit=10, offset=0 + ) -> list: api_type = "mass_messages" if not self.active: return [] @@ -393,8 +385,7 @@ def get_mass_messages(self, resume=None, refresh=True, limit=10, offset=0) -> li link = endpoint_links( global_limit=limit, global_offset=offset ).mass_messages_api - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) items = results.get("list", []) if not items: return items @@ -419,7 +410,7 @@ def get_mass_messages(self, resume=None, refresh=True, limit=10, offset=0) -> li self.mass_messages = items return items - def get_paid_content( + async def get_paid_content( self, check: bool = False, refresh: bool = True, @@ -435,7 +426,7 @@ def get_paid_content( if result: return result link = endpoint_links(global_limit=limit, global_offset=offset).paid_api - final_results = self.session_manager.json_request(link) + final_results = await self.session_manager.json_request(link) if len(final_results) >= limit and not check: results2 = self.get_paid_content( limit=limit, offset=limit + offset, inside_loop=True diff --git a/apis/onlyfans/classes/create_post.py b/apis/onlyfans/classes/create_post.py index 467f57ec..b0a6f909 100644 --- a/apis/onlyfans/classes/create_post.py +++ b/apis/onlyfans/classes/create_post.py @@ -1,11 +1,10 @@ -from apis import api_helper from apis.onlyfans.classes.extras import endpoint_links from typing import Any class create_post: def __init__( - self, option={}, session_manager: api_helper.session_manager = None + self, option={}, session_manager = None ) -> None: self.responseType: str = option.get("responseType") self.id: int = option.get("id") @@ -46,12 +45,12 @@ def __init__( self.canPurchase: bool = option.get("canPurchase") self.session_manager = session_manager - def favorite(self): + async def favorite(self): link = endpoint_links( identifier=f"{self.responseType}s", identifier2=self.id, identifier3=self.author["id"], ).favorite - results = self.session_manager.json_request(link, method="POST") + results = await self.session_manager.json_request(link, method="POST") self.isFavorite = True return results diff --git a/apis/onlyfans/classes/create_user.py b/apis/onlyfans/classes/create_user.py index c90d225f..77f7a1f8 100644 --- a/apis/onlyfans/classes/create_user.py +++ b/apis/onlyfans/classes/create_user.py @@ -1,21 +1,16 @@ -from apis.onlyfans.classes.create_highlight import create_highlight -from apis.onlyfans.classes.create_message import create_message -from itertools import chain -from apis.onlyfans.classes.create_post import create_post -from apis.onlyfans.classes.create_story import create_story import math +from itertools import chain +from typing import Any, Optional from urllib import parse -from mergedeep.mergedeep import Strategy, merge +import apis.onlyfans.classes.create_auth as create_auth from apis import api_helper -from apis.onlyfans.classes.extras import ( - content_types, - endpoint_links, - handle_refresh, - media_types, -) -from apis.onlyfans.classes import create_auth -from typing import Any, Optional +from apis.onlyfans.classes.create_highlight import create_highlight +from apis.onlyfans.classes.create_message import create_message +from apis.onlyfans.classes.create_post import create_post +from apis.onlyfans.classes.create_story import create_story +from apis.onlyfans.classes.extras import (content_types, endpoint_links, + handle_refresh, media_types) class create_user: @@ -208,7 
+203,7 @@ def __init__(self, option={}) -> None: self.pinnedPostsCount: int = option.get("pinnedPostsCount") self.maxPinnedPostsCount: int = option.get("maxPinnedPostsCount") # Custom - self.subscriber: Optional[create_auth] = None + self.subscriber: Optional[create_auth.create_auth] = None self.scraped = content_types() self.temp_scraped = content_types() self.session_manager: api_helper.session_manager = option.get("session_manager") @@ -225,7 +220,7 @@ def is_me(self) -> bool: status = True return status - def get_stories(self, refresh=True, limit=100, offset=0) -> list: + async def get_stories(self, refresh=True, limit=100, offset=0) -> list: api_type = "stories" if not refresh: result = handle_refresh(self, api_type) @@ -238,12 +233,12 @@ def get_stories(self, refresh=True, limit=100, offset=0) -> list: identifier=self.id, global_limit=limit, global_offset=offset ).stories_api ] - results = api_helper.scrape_endpoint_links(link, self.session_manager, api_type) + results = await api_helper.scrape_endpoint_links(link, self.session_manager, api_type) results = [create_story(x) for x in results] self.temp_scraped.Stories = results return results - def get_highlights( + async def get_highlights( self, identifier="", refresh=True, limit=100, offset=0, hightlight_id="" ) -> list: api_type = "highlights" @@ -257,17 +252,17 @@ def get_highlights( link = endpoint_links( identifier=identifier, global_limit=limit, global_offset=offset ).list_highlights - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) results = [create_highlight(x) for x in results] else: link = endpoint_links( identifier=hightlight_id, global_limit=limit, global_offset=offset ).highlight - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) results = [create_story(x) for x in results["stories"]] return results - def get_posts( + async def get_posts( self, links: Optional[list] = None, limit=10, offset=0, refresh=True ) -> list: api_type = "posts" @@ -289,24 +284,24 @@ def get_posts( link = link.replace(f"limit={limit}", f"limit={limit}") new_link = link.replace("offset=0", f"offset={num}") links.append(new_link) - results = api_helper.scrape_endpoint_links( + results = await api_helper.scrape_endpoint_links( links, self.session_manager, api_type ) final_results = [create_post(x, self.session_manager) for x in results] self.temp_scraped.Posts = final_results return final_results - def get_post(self, identifier=None, limit=10, offset=0): + async def get_post(self, identifier=None, limit=10, offset=0): if not identifier: identifier = self.id link = endpoint_links( identifier=identifier, global_limit=limit, global_offset=offset ).post_by_id - result = self.session_manager.json_request(link) + result = await self.session_manager.json_request(link) final_result = create_post(result) return final_result - def get_messages( + async def get_messages( self, links: Optional[list] = None, limit=10, @@ -336,13 +331,13 @@ def get_messages( links += links2 else: links = links2 - results = self.session_manager.parallel_requests(links) + results = await self.session_manager.async_requests(links) has_more = results[-1]["hasMore"] final_results = [x["list"] for x in results] final_results = list(chain.from_iterable(final_results)) if has_more: - results2 = self.get_messages( + results2 = await self.get_messages( links=[links[-1]], limit=limit, offset=limit + offset, inside_loop=True ) final_results.extend(results2) @@ -354,33 +349,37 
@@ def get_messages( self.temp_scraped.Messages = final_results return final_results - def get_message_by_id( - self, identifier=None, identifier2=None, refresh=True, limit=10, offset=0 + async def get_message_by_id( + self, user_id=None, message_id=None, refresh=True, limit=10, offset=0 ): - if not identifier: - identifier = self.id + if not user_id: + user_id = self.id link = endpoint_links( - identifier=identifier, - identifier2=identifier2, + identifier=user_id, + identifier2=message_id, global_limit=limit, global_offset=offset, ).message_by_id - result = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) + results = [x for x in results["list"] if x["id"] == message_id] + result = result = results[0] if results else {} final_result = create_message(result) return final_result - def get_archived_stories(self, refresh=True, limit=100, offset=0): + async def get_archived_stories(self, refresh=True, limit=100, offset=0): api_type = "archived_stories" if not refresh: result = handle_refresh(self, api_type) if result: return result link = endpoint_links(global_limit=limit, global_offset=offset).archived_stories - results = self.session_manager.json_request(link) - self.archived_stories = results + results = await self.session_manager.json_request(link) + results = [create_story(x) for x in results if "error" not in x] + if results: + print return results - def get_archived_posts( + async def get_archived_posts( self, links: Optional[list] = None, limit=10, offset=0, refresh=True ) -> list: api_type = "archived_posts" @@ -402,56 +401,54 @@ def get_archived_posts( link = link.replace(f"limit={limit}", f"limit={limit}") new_link = link.replace("offset=0", f"offset={num}") links.append(new_link) - results = api_helper.scrape_endpoint_links( + results = await api_helper.scrape_endpoint_links( links, self.session_manager, api_type ) final_results = [create_post(x, self.session_manager) for x in results if x] self.temp_scraped.Archived.Posts = final_results return final_results - def get_archived(self, api): + async def get_archived(self, api): items = [] if self.is_me(): item = {} item["type"] = "Stories" - item["results"] = [self.get_archived_stories()] + item["results"] = [await self.get_archived_stories()] items.append(item) item = {} item["type"] = "Posts" # item["results"] = test - item["results"] = self.get_archived_posts() + item["results"] = await self.get_archived_posts() items.append(item) return items - def search_chat(self, identifier="", text="", refresh=True, limit=10, offset=0): + async def search_chat(self, identifier="", text="", refresh=True, limit=10, offset=0): if identifier: identifier = parse.urljoin(identifier, "messages") link = endpoint_links( identifier=identifier, text=text, global_limit=limit, global_offset=offset ).search_chat - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await self.session_manager.json_request(link) return results - def search_messages(self, identifier="", text="", refresh=True, limit=10, offset=0): + async def search_messages(self, identifier="", text="", refresh=True, limit=10, offset=0): if identifier: identifier = parse.urljoin(identifier, "messages") text = parse.quote_plus(text) link = endpoint_links( identifier=identifier, text=text, global_limit=limit, global_offset=offset ).search_messages - session = self.session_manager.sessions[0] - results = self.session_manager.json_request(link) + results = await 
self.session_manager.json_request(link) return results - def like(self, category: str, identifier: int): + async def like(self, category: str, identifier: int): link = endpoint_links(identifier=category, identifier2=identifier).like - results = self.session_manager.json_request(link, method="POST") + results = await self.session_manager.json_request(link, method="POST") return results - def unlike(self, category: str, identifier: int): + async def unlike(self, category: str, identifier: int): link = endpoint_links(identifier=category, identifier2=identifier).like - results = self.session_manager.json_request(link, method="DELETE") + results = await self.session_manager.json_request(link, method="DELETE") return results def buy_subscription(self): @@ -465,5 +462,5 @@ def buy_subscription(self): } print - def set_scraped(self, name, scraped: media_types): + def set_scraped(self, name, scraped): setattr(self.scraped, name, scraped) diff --git a/apis/onlyfans/onlyfans.py b/apis/onlyfans/onlyfans.py index 05240b27..0afafe91 100644 --- a/apis/onlyfans/onlyfans.py +++ b/apis/onlyfans/onlyfans.py @@ -1,62 +1,11 @@ +from typing import List, Optional, Union + from apis.onlyfans.classes import create_user -from apis.onlyfans.classes.extras import ( - auth_details, - content_types, - endpoint_links, -) from apis.onlyfans.classes.create_auth import create_auth -import time -import base64 -from typing import List, Optional, Union -from urllib.parse import urlparse -from urllib import parse -import hashlib -import math -from datetime import datetime -from dateutil.relativedelta import relativedelta -from itertools import chain, product -import requests +from apis.onlyfans.classes.extras import (auth_details, content_types, + endpoint_links) -from sqlalchemy.orm.session import Session from .. 
import api_helper -from mergedeep import merge, Strategy -import jsonpickle -import copy -from random import random - - -def create_signed_headers(link: str, auth_id: int, dynamic_rules: dict): - # Users: 300000 | Creators: 301000 - final_time = str(int(round(time.time()))) - path = urlparse(link).path - query = urlparse(link).query - path = path if not query else f"{path}?{query}" - a = [dynamic_rules["static_param"], final_time, path, str(auth_id)] - msg = "\n".join(a) - message = msg.encode("utf-8") - hash_object = hashlib.sha1(message) - sha_1_sign = hash_object.hexdigest() - sha_1_b = sha_1_sign.encode("ascii") - checksum = ( - sum([sha_1_b[number] for number in dynamic_rules["checksum_indexes"]]) - + dynamic_rules["checksum_constant"] - ) - headers = {} - headers["sign"] = dynamic_rules["format"].format(sha_1_sign, abs(checksum)) - headers["time"] = final_time - return headers - - -def session_rules(session_manager: api_helper.session_manager, link) -> dict: - headers = session_manager.headers - if "https://onlyfans.com/api2/v2/" in link: - dynamic_rules = session_manager.dynamic_rules - headers["app-token"] = dynamic_rules["app_token"] - # auth_id = headers["user-id"] - a = [link, 0, dynamic_rules] - headers2 = create_signed_headers(*a) - headers |= headers2 - return headers def session_retry_rules(r, link: str) -> int: @@ -81,7 +30,6 @@ def __init__( self, custom_request=callable, max_threads=-1, - original_sessions: List[requests.Session] = [], ) -> None: self.auths: list[create_auth] = [] self.subscriptions: list[create_user] = [] @@ -90,14 +38,17 @@ def __init__( self.lists = None self.endpoint_links = endpoint_links self.pool = api_helper.multiprocessing() - self.session_manager = api_helper.session_manager( - session_rules=session_rules, - session_retry_rules=session_retry_rules, - max_threads=max_threads, - original_sessions=original_sessions, - ) self.settings = {} + def add_auth(self, option={}, only_active=False): + if only_active and not option.get("active"): + return + auth = create_auth(pool=self.pool, max_threads=self.max_threads) + auth.auth_details = auth_details(option) + auth.extras["settings"] = self.settings + self.auths.append(auth) + return auth + def get_auth(self, identifier: Union[str, int]) -> Optional[create_auth]: final_auth = None for auth in self.auths: @@ -109,14 +60,7 @@ def get_auth(self, identifier: Union[str, int]) -> Optional[create_auth]: break return final_auth - def add_auth(self, option={}, only_active=False): - if only_active and not option.get("active"): - return - auth = create_auth(session_manager2=self.session_manager, pool=self.pool) - auth.auth_details = auth_details(option) - self.auths.append(auth) - return auth - def close_pools(self): self.pool.close() - self.session_manager.pool.close() + for auth in self.auths: + auth.session_manager.pool.close() diff --git a/classes/make_settings.py b/classes/make_settings.py index f17584fe..20df5d5c 100644 --- a/classes/make_settings.py +++ b/classes/make_settings.py @@ -1,11 +1,7 @@ import copy from typing import List, Union from urllib.parse import urlparse -from apis.onlyfans.onlyfans import auth_details import os - -import ujson -from classes.prepare_metadata import format_types, prepare_reformat import uuid as uuid def fix(config={}): diff --git a/classes/prepare_metadata.py b/classes/prepare_metadata.py index 1e35830f..3e306f15 100644 --- a/classes/prepare_metadata.py +++ b/classes/prepare_metadata.py @@ -3,7 +3,7 @@ import copy from enum import unique import os -from typing import Dict 
+from typing import Dict, MutableMapping, Union from requests.api import get from helpers import main_helper @@ -15,7 +15,7 @@ class create_metadata(object): - def __init__(self, authed: create_auth = None, metadata: dict = {}, standard_format=False, api_type: str = "") -> None: + def __init__(self, authed: create_auth = None, metadata: Union[list,dict,MutableMapping] = {}, standard_format=False, api_type: str = "") -> None: self.version = global_version fixed_metadata = self.fix_metadata(metadata, standard_format, api_type) self.content = format_content( @@ -367,6 +367,7 @@ def __init__(self, option, keep_vars=False): self.text = option.get('text', format_variables2.text) self.date = option.get('postedAt', format_variables2.date) self.price = option.get('price', 0) + self.archived = option.get('archived', False) self.date_format = option.get('date_format') self.maximum_length = 255 self.text_length = option.get('text_length', self.maximum_length) diff --git a/datascraper/main_datascraper.py b/datascraper/main_datascraper.py index 4746a57b..0464ed60 100644 --- a/datascraper/main_datascraper.py +++ b/datascraper/main_datascraper.py @@ -24,7 +24,7 @@ # return text -def start_datascraper( +async def start_datascraper( json_config: dict, site_name_lower: str, api: Optional[OnlyFans.start] = None, @@ -46,10 +46,8 @@ def start_datascraper( identifiers = [] auto_profile_choice = json_site_settings["auto_profile_choice"] subscription_array = [] - original_sessions = [] - original_sessions = api_helper.create_session(settings=json_settings) - original_sessions = [x for x in original_sessions if x] - if not original_sessions: + proxies = await api_helper.test_proxies(json_settings["proxies"]) + if json_settings["proxies"] and not proxies: print("Unable to create session") return None archive_time = timeit.default_timer() @@ -58,8 +56,9 @@ def start_datascraper( module = m_onlyfans if not api: api = OnlyFans.start(max_threads=json_settings["max_threads"]) + api.settings = json_config api = main_helper.process_profiles( - json_settings, original_sessions, site_name, api + json_settings, proxies, site_name, api ) print @@ -78,12 +77,12 @@ def start_datascraper( auth.auth_details, json_config, json_site_settings, site_name ) setup = False - setup, subscriptions = module.account_setup( + setup, subscriptions = await module.account_setup( auth, identifiers, jobs, auth_count ) if not setup: if webhooks: - x = main_helper.process_webhooks(api, "auth_webhook", "failed") + x = await main_helper.process_webhooks(api, "auth_webhook", "failed") auth_details = {} auth_details["auth"] = auth.auth_details.__dict__ profile_directory = auth.profile_directory @@ -95,14 +94,14 @@ def start_datascraper( continue auth_count += 1 subscription_array += subscriptions - x = main_helper.process_webhooks(api, "auth_webhook", "succeeded") + x = await main_helper.process_webhooks(api, "auth_webhook", "succeeded") subscription_list = module.format_options(subscription_array, "usernames", api.auths) if jobs["scrape_paid_content"]: print("Scraping Paid Content") - paid_content = module.paid_content_scraper(api, identifiers) + paid_content = await module.paid_content_scraper(api, identifiers) if jobs["scrape_names"]: print("Scraping Subscriptions") - names = main_helper.process_names( + names = await main_helper.process_names( module, subscription_list, auto_model_choice, @@ -111,9 +110,9 @@ def start_datascraper( site_name_lower, site_name, ) - x = main_helper.process_downloads(api, module) + x = await 
main_helper.process_downloads(api, module) if webhooks: - x = main_helper.process_webhooks(api, "download_webhook", "succeeded") + x = await main_helper.process_webhooks(api, "download_webhook", "succeeded") elif site_name_lower == "starsavn": pass # site_name = "StarsAVN" diff --git a/extras/OFRenamer/start.py b/extras/OFRenamer/start.py index 8a2f5883..b5d1c923 100644 --- a/extras/OFRenamer/start.py +++ b/extras/OFRenamer/start.py @@ -15,10 +15,7 @@ def fix_directories(api,posts, all_files, database_session: scoped_session, fold def fix_directories(post: api_table, media_db: list[media_table]): delete_rows = [] - final_type = "" - if parent_type: - final_type = f"{api_type}{os.path.sep}{parent_type}" - final_type = final_type if final_type else api_type + final_api_type = os.path.join("Archived",api_type) if post.archived else api_type post_id = post.post_id media_db = [x for x in media_db if x.post_id == post_id] for media in media_db: @@ -42,7 +39,7 @@ def fix_directories(post: api_table, media_db: list[media_table]): option["post_id"] = post_id option["media_id"] = media_id option["username"] = username - option["api_type"] = final_type if parent_type else api_type + option["api_type"] = final_api_type option["media_type"] = media.media_type option["filename"] = original_filename option["ext"] = ext @@ -53,6 +50,7 @@ def fix_directories(post: api_table, media_db: list[media_table]): option["text_length"] = text_length option["directory"] = download_path option["preview"] = media.preview + option["archived"] = post.archived prepared_format = prepare_reformat(option) file_directory = main_helper.reformat( prepared_format, file_directory_format) @@ -173,5 +171,5 @@ def start(api,Session, parent_type, api_type, api_path, site_name, subscription, exit() else: import helpers.main_helper as main_helper - from apis.api_helper import multiprocessing from classes.prepare_metadata import format_types, prepare_reformat + diff --git a/helpers/db_helper.py b/helpers/db_helper.py index a6f35f62..21990341 100644 --- a/helpers/db_helper.py +++ b/helpers/db_helper.py @@ -6,6 +6,7 @@ from alembic.config import Config from alembic import command from sqlalchemy.exc import IntegrityError +from sqlalchemy.sql.functions import func from database.databases.stories import stories from database.databases.posts import posts from database.databases.messages import messages @@ -37,7 +38,7 @@ def run_revisions(alembic_directory: str, database_path: str = ""): x = command.revision(alembic_cfg, autogenerate=True, message="content") -def run_migrations(alembic_directory: str, database_path: str, api) -> None: +def run_migrations(alembic_directory: str, database_path: str) -> None: ini_path = os.path.join(alembic_directory, "alembic.ini") script_location = os.path.join(alembic_directory, "alembic") full_database_path = f'sqlite:///{database_path}' @@ -89,3 +90,8 @@ def get_or_create(session: Session, model, defaults=None, fbkwargs={}): return instance, False else: return instance, True + +def get_count(q): + count_q = q.statement.with_only_columns([func.count()]).order_by(None) + count = q.session.execute(count_q).scalar() + return count diff --git a/helpers/main_helper.py b/helpers/main_helper.py index ec4d6f76..80e89e59 100644 --- a/helpers/main_helper.py +++ b/helpers/main_helper.py @@ -1,35 +1,38 @@ -from apis.onlyfans.classes import create_user +from database.models.media_table import media_table +from sqlalchemy.orm.session import Session +from apis.onlyfans.classes.extras import content_types +import copy 
import json -from apis.onlyfans import onlyfans as OnlyFans import math -from types import SimpleNamespace -from typing import Any, Optional, Tuple, Union - -from sqlalchemy import inspect -from sqlalchemy.orm import declarative_base -from classes.prepare_metadata import format_variables -import copy import os import platform +import random import re -from datetime import datetime -from itertools import chain, zip_longest, groupby -import psutil import shutil -from multiprocessing.dummy import Pool as ThreadPool -import ujson -from tqdm import tqdm import string -import random +import traceback +from datetime import datetime +from itertools import chain, groupby, zip_longest +from multiprocessing.dummy import Pool as ThreadPool +from types import SimpleNamespace +from typing import Any, Optional, Tuple, Union +import classes.make_settings as make_settings +import classes.prepare_webhooks as prepare_webhooks +import psutil import requests +import ujson +from apis.onlyfans import onlyfans as OnlyFans +from apis.onlyfans.classes import create_user from bs4 import BeautifulSoup +from classes.prepare_metadata import format_variables, prepare_reformat +from mergedeep import Strategy, merge +from sqlalchemy import inspect +from sqlalchemy.orm import declarative_base +from tqdm import tqdm -import classes.make_settings as make_settings -import classes.prepare_webhooks as prepare_webhooks -from mergedeep import merge, Strategy import helpers.db_helper as db_helper -import traceback + json_global_settings = None min_drive_space = 0 webhooks = None @@ -58,7 +61,7 @@ def rename_duplicates(seen, filename): else: count = 1 while filename_lower in seen: - filename = filename+" ("+str(count)+")" + filename = filename + " (" + str(count) + ")" filename_lower = filename.lower() count += 1 seen.add(filename_lower) @@ -67,7 +70,7 @@ def rename_duplicates(seen, filename): def parse_links(site_name, input_link): if site_name in {"onlyfans", "starsavn"}: - username = input_link.rsplit('/', 1)[-1] + username = input_link.rsplit("/", 1)[-1] return username if site_name in {"patreon", "fourchan", "bbwchan"}: @@ -85,15 +88,13 @@ def parse_links(site_name, input_link): def clean_text(string, remove_spaces=False): matches = ["\n", "
"] for m in matches: - string = string.replace( - m, " ").strip() - string = ' '.join(string.split()) + string = string.replace(m, " ").strip() + string = " ".join(string.split()) string = BeautifulSoup(string, "lxml").get_text() SAFE_PTN = r"[|\^&+\-%*/=!:\"?><]" - string = re.sub(SAFE_PTN, ' ', string.strip() - ).strip() + string = re.sub(SAFE_PTN, " ", string.strip()).strip() if remove_spaces: - string = string.replace(' ', '_') + string = string.replace(" ", "_") return string @@ -112,6 +113,7 @@ def format_image(filepath, timestamp): try: if os_name == "Windows": from win32_setctime import setctime + setctime(filepath, timestamp) # print(f"Updated Creation Time {filepath}") os.utime(filepath, (timestamp, timestamp)) @@ -132,7 +134,7 @@ def filter_metadata(datas): def import_archive(archive_path) -> Any: metadata = {} if os.path.exists(archive_path) and os.path.getsize(archive_path): - with open(archive_path, 'r', encoding='utf-8') as outfile: + with open(archive_path, "r", encoding="utf-8") as outfile: while not metadata: try: metadata = ujson.load(outfile) @@ -155,7 +157,7 @@ def legacy_database_fixer(database_path, database, database_name, database_exist if database_exists: Session, engine = db_helper.create_database_session(database_path) database_session = Session() - result = inspect(engine).has_table('alembic_version') + result = inspect(engine).has_table("alembic_version") if not result: if not pre_alembic_database_exists: os.rename(old_database_path, pre_alembic_path) @@ -200,11 +202,81 @@ def legacy_database_fixer(database_path, database, database_name, database_exist datas.append(new_item) print database_session.close() - x = export_sqlite(old_database_path, datas, - database_name, legacy_fixer=True) + x = export_sqlite(old_database_path, datas, database_name, legacy_fixer=True) print +def fix_sqlite( + profile_directory, + download_directory, + metadata_directory, + format_directories, + site_name, + username, + metadata_directory_format, +): + items = content_types().__dict__.items() + final_metadatas = [] + for api_type, value in items: + mandatory_directories = {} + mandatory_directories["profile_directory"] = profile_directory + mandatory_directories["download_directory"] = download_directory + mandatory_directories["metadata_directory"] = metadata_directory + formatted_directories = format_directories( + mandatory_directories, + site_name, + username, + metadata_directory_format, + "", + api_type, + ) + final_metadata_directory = formatted_directories["metadata_directory"] + if all(final_metadata_directory != x for x in final_metadatas): + final_metadatas.append(final_metadata_directory) + print + print + for final_metadata in final_metadatas: + archived_database_path = os.path.join(final_metadata, "Archived.db") + if os.path.exists(archived_database_path): + Session2, engine = db_helper.create_database_session(archived_database_path) + database_session: Session = Session2() + cwd = os.getcwd() + for api_type, value in items: + database_path = os.path.join(final_metadata, f"{api_type}.db") + database_name = api_type.lower() + alembic_location = os.path.join( + cwd, "database", "databases", database_name + ) + result = inspect(engine).has_table(database_name) + if result: + db_helper.run_migrations(alembic_location, archived_database_path) + db_helper.run_migrations(alembic_location, database_path) + Session3, engine2 = db_helper.create_database_session(database_path) + db_collection = db_helper.database_collection() + database_session2: Session = Session3() + database 
= db_collection.chooser(database_name) + api_table = database.api_table + archived_result = database_session.query(api_table).all() + for item in archived_result: + result2 = ( + database_session2.query(api_table) + .filter(api_table.post_id == item.post_id) + .first() + ) + if not result2: + item2 = item.__dict__ + item2.pop("id") + item2.pop("_sa_instance_state") + item = api_table(**item2) + item.archived = True + database_session2.add(item) + database_session2.commit() + database_session2.close() + database_session.commit() + database_session.close() + os.remove(archived_database_path) + + def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None): metadata_directory = os.path.dirname(archive_path) os.makedirs(metadata_directory, exist_ok=True) @@ -215,8 +287,7 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None database_name = database_name.lower() db_collection = db_helper.database_collection() database = db_collection.chooser(database_name) - alembic_location = os.path.join( - cwd, "database", "databases", database_name) + alembic_location = os.path.join(cwd, "database", "databases", database_name) database_exists = os.path.exists(database_path) if database_exists: if os.path.getsize(database_path) == 0: @@ -224,15 +295,14 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None database_exists = False if not legacy_fixer: x = legacy_database_fixer( - database_path, database, database_name, database_exists) - db_helper.run_migrations(alembic_location, database_path, api) + database_path, database, database_name, database_exists + ) + db_helper.run_migrations(alembic_location, database_path) print Session, engine = db_helper.create_database_session(database_path) database_session = Session() api_table = database.api_table media_table = database.media_table - # api_table = db_helper.api_table() - # media_table = db_helper.media_table() for post in datas: post_id = post["post_id"] @@ -240,8 +310,7 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None date_object = None if postedAt: if not isinstance(postedAt, datetime): - date_object = datetime.strptime( - postedAt, "%d-%m-%Y %H:%M:%S") + date_object = datetime.strptime(postedAt, "%d-%m-%Y %H:%M:%S") else: date_object = postedAt result = database_session.query(api_table) @@ -254,6 +323,7 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None post["price"] = 0 post_db.price = post["price"] post_db.paid = post["paid"] + post_db.archived = post["archived"] if date_object: post_db.created_at = date_object database_session.add(post_db) @@ -265,7 +335,8 @@ def export_sqlite(archive_path, datas, parent_type, legacy_fixer=False, api=None media_db = result.filter_by(media_id=media_id).first() if not media_db: media_db = result.filter_by( - filename=media["filename"], created_at=date_object).first() + filename=media["filename"], created_at=date_object + ).first() if not media_db: media_db = media_table() if legacy_fixer: @@ -298,7 +369,7 @@ def format_paths(j_directories, site_name): return paths -def reformat(prepared_format, unformatted): +def reformat(prepared_format: prepare_reformat, unformatted): post_id = prepared_format.post_id media_id = prepared_format.media_id date = prepared_format.date @@ -312,8 +383,7 @@ def reformat(prepared_format, unformatted): if type(date) is str: format_variables2 = format_variables() if date != format_variables2.date and date != "": - date = datetime.strptime( - date, 
"%d-%m-%Y %H:%M:%S") + date = datetime.strptime(date, "%d-%m-%Y %H:%M:%S") date = date.strftime(prepared_format.date_format) else: if date != None: @@ -329,8 +399,7 @@ def reformat(prepared_format, unformatted): value = "Paid" directory = prepared_format.directory path = unformatted.replace("{site_name}", prepared_format.site_name) - path = path.replace( - "{first_letter}", prepared_format.username[0].capitalize()) + path = path.replace("{first_letter}", prepared_format.username[0].capitalize()) path = path.replace("{post_id}", post_id) path = path.replace("{media_id}", media_id) path = path.replace("{username}", prepared_format.username) @@ -342,18 +411,18 @@ def reformat(prepared_format, unformatted): path = path.replace("{date}", date) directory_count = len(directory) path_count = len(path) - maximum_length = maximum_length - (directory_count+path_count-extra_count) + maximum_length = maximum_length - (directory_count + path_count - extra_count) text_length = text_length if text_length < maximum_length else maximum_length if has_text: - # https://stackoverflow.com/a/43848928 + # https://stackoverflow.com/a/43848928 def utf8_lead_byte(b): - '''A UTF-8 intermediate byte starts with the bits 10xxxxxx.''' + """A UTF-8 intermediate byte starts with the bits 10xxxxxx.""" return (b & 0xC0) != 0x80 def utf8_byte_truncate(text, max_bytes): - '''If text[max_bytes] is not a lead byte, back up until a lead byte is - found and truncate before that character.''' - utf8 = text.encode('utf8') + """If text[max_bytes] is not a lead byte, back up until a lead byte is + found and truncate before that character.""" + utf8 = text.encode("utf8") if len(utf8) <= max_bytes: return utf8 i = max_bytes @@ -361,7 +430,7 @@ def utf8_byte_truncate(text, max_bytes): i -= 1 return utf8[:i] - filtered_text = utf8_byte_truncate(text, text_length).decode('utf8') + filtered_text = utf8_byte_truncate(text, text_length).decode("utf8") path = path.replace("{text}", filtered_text) else: path = path.replace("{text}", "") @@ -388,7 +457,9 @@ def get_directory(directories: list[str], extra_path): return directory -def check_space(download_paths, min_size=min_drive_space, priority="download", create_directory=True): +def check_space( + download_paths, min_size=min_drive_space, priority="download", create_directory=True +): root = "" while not root: paths = [] @@ -429,9 +500,10 @@ def find_model_directory(username, directories) -> Tuple[str, bool]: def are_long_paths_enabled(): if os_name == "Windows": from ctypes import WinDLL, c_ubyte - ntdll = WinDLL('ntdll') - if hasattr(ntdll, 'RtlAreLongPathsEnabled'): + ntdll = WinDLL("ntdll") + + if hasattr(ntdll, "RtlAreLongPathsEnabled"): ntdll.RtlAreLongPathsEnabled.restype = c_ubyte ntdll.RtlAreLongPathsEnabled.argtypes = () @@ -452,8 +524,7 @@ def check_for_dupe_file(download_path, content_length): class download_session(tqdm): - def start(self, unit='B', unit_scale=True, - miniters=1, tsize=0): + def start(self, unit="B", unit_scale=True, miniters=1, tsize=0): self.unit = unit self.unit_scale = unit_scale self.miniters = miniters @@ -464,8 +535,9 @@ def start(self, unit='B', unit_scale=True, self.total += tsize def update_total_size(self, tsize): - tsize = int(tsize) - self.total += tsize + if tsize: + tsize = int(tsize) + self.total += tsize def update_to(self, b=1, bsize=1, tsize=None): x = bsize @@ -473,34 +545,6 @@ def update_to(self, b=1, bsize=1, tsize=None): self.update(b) -def downloader(r, download_path, d_session, count=0): - delete = False - try: - with open(download_path, 
'wb') as f: - delete = True - for chunk in r.iter_content(chunk_size=4096): - if chunk: # filter out keep-alive new chunks - size = f.write(chunk) - d_session.update(size) - except (ConnectionResetError) as e: - if delete: - os.unlink(download_path) - return - except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as e: - return - except Exception as e: - if delete: - deleted = None - while not deleted: - try: - os.unlink(download_path) - deleted = True - except PermissionError as e2: - print(e2) - return - return True - - def get_config(config_path): if os.path.exists(config_path): json_config = ujson.load(open(config_path)) @@ -509,8 +553,9 @@ def get_config(config_path): json_config2 = copy.deepcopy(json_config) json_config = make_settings.fix(json_config) file_name = os.path.basename(config_path) - json_config = ujson.loads(json.dumps(make_settings.config( - **json_config), default=lambda o: o.__dict__)) + json_config = ujson.loads( + json.dumps(make_settings.config(**json_config), default=lambda o: o.__dict__) + ) updated = False if json_config != json_config2: updated = True @@ -518,14 +563,15 @@ def get_config(config_path): export_data(json_config, filepath) if not json_config: input( - f"The .settings\\{file_name} file has been created. Fill in whatever you need to fill in and then press enter when done.\n") + f"The .settings\\{file_name} file has been created. Fill in whatever you need to fill in and then press enter when done.\n" + ) json_config = ujson.load(open(config_path)) return json_config, updated def choose_auth(array): names = [] - array = [{"auth_count": -1, "username": "All"}]+array + array = [{"auth_count": -1, "username": "All"}] + array string = "" seperator = " | " name_count = len(array) @@ -534,14 +580,14 @@ def choose_auth(array): count = 0 for x in array: name = x["username"] - string += str(count)+" = "+name + string += str(count) + " = " + name names.append(x) - if count+1 != name_count: + if count + 1 != name_count: string += seperator count += 1 - print("Auth Usernames: "+string) + print("Auth Usernames: " + string) value = int(input().strip()) if value: names = [names[value]] @@ -550,7 +596,9 @@ def choose_auth(array): return names -def choose_option(subscription_list, auto_scrape: Union[str, bool], use_default_message=False): +def choose_option( + subscription_list, auto_scrape: Union[str, bool], use_default_message=False +): names = subscription_list[0] default_message = "" seperator = " | " @@ -562,13 +610,11 @@ def choose_option(subscription_list, auto_scrape: Union[str, bool], use_default_ if auto_scrape: values = [x[1] for x in names] else: - print( - f"{default_message}{subscription_list[1]}") + print(f"{default_message}{subscription_list[1]}") values = input().strip().split(",") else: if not auto_scrape: - print( - f"{default_message}{subscription_list[1]}") + print(f"{default_message}{subscription_list[1]}") values = input().strip().split(",") else: values = auto_scrape @@ -589,7 +635,7 @@ def choose_option(subscription_list, auto_scrape: Union[str, bool], use_default_ return new_names -def process_profiles(json_settings, original_sessions, site_name, api: Union[OnlyFans.start]): +def process_profiles(json_settings, proxies, site_name, api: Union[OnlyFans.start]): profile_directories = json_settings["profile_directories"] for profile_directory in profile_directories: x = os.path.join(profile_directory, site_name) @@ -603,32 +649,29 @@ def process_profiles(json_settings, original_sessions, site_name, api: Union[Onl 
temp_users.append("default") for user in temp_users: user_profile = os.path.join(x, user) - user_auth_filepath = os.path.join( - user_profile, "auth.json") + user_auth_filepath = os.path.join(user_profile, "auth.json") datas = {} if os.path.exists(user_auth_filepath): - temp_json_auth = ujson.load( - open(user_auth_filepath)) + temp_json_auth = ujson.load(open(user_auth_filepath)) json_auth = temp_json_auth["auth"] if not json_auth.get("active", None): continue json_auth["username"] = user - auth = api.add_auth( - json_auth) - auth.session_manager.add_sessions(original_sessions) + auth = api.add_auth(json_auth) + auth.session_manager.proxies = proxies auth.profile_directory = user_profile datas["auth"] = auth.auth_details.__dict__ if datas: - export_data( - datas, user_auth_filepath) + export_data(datas, user_auth_filepath) print print return api -def process_names(module, subscription_list, auto_scrape, api, json_config, site_name_lower, site_name) -> list: - names = choose_option( - subscription_list, auto_scrape, True) +async def process_names( + module, subscription_list, auto_scrape, api, json_config, site_name_lower, site_name +) -> list: + names = choose_option(subscription_list, auto_scrape, True) if not names: print("There's nothing to scrape.") for name in names: @@ -638,24 +681,22 @@ def process_names(module, subscription_list, auto_scrape, api, json_config, site name = name[-1] assign_vars(json_config) username = parse_links(site_name_lower, name) - result = module.start_datascraper( - authed, username, site_name) + result = await module.start_datascraper(authed, username, site_name) return names -def process_downloads(api, module): +async def process_downloads(api, module): for auth in api.auths: - subscriptions = auth.get_subscriptions(refresh=False) + subscriptions = await auth.get_subscriptions(refresh=False) for subscription in subscriptions: - download_info = subscription.download_info - if download_info: - module.download_media(auth, subscription) - if json_global_settings["helpers"]["delete_empty_directories"]: - delete_empty_directories( - download_info.get("base_directory", "")) + await module.prepare_downloads(subscription) + if json_global_settings["helpers"]["delete_empty_directories"]: + delete_empty_directories( + subscription.download_info.get("base_directory", "") + ) -def process_webhooks(api: Union[OnlyFans.start], category, category2): +async def process_webhooks(api: Union[OnlyFans.start], category, category2): global_webhooks = webhooks["global_webhooks"] global_status = webhooks["global_status"] webhook = webhooks[category] @@ -671,8 +712,9 @@ def process_webhooks(api: Union[OnlyFans.start], category, category2): webhook_links = webhook_state["webhooks"] if webhook_status: for auth in api.auths: - send_webhook(auth, webhook_hide_sensitive_info, - webhook_links, category, category2) + await send_webhook( + auth, webhook_hide_sensitive_info, webhook_links, category, category2 + ) print print @@ -684,24 +726,30 @@ def is_me(user_api): return False -def export_data(metadata: Union[list, dict], path: str, encoding: Optional[str] = "utf-8"): +async def write_data(download_path:str, data:bytes): + with open(download_path, "wb") as f: + f.write(data) + + +def export_data( + metadata: Union[list, dict], path: str, encoding: Optional[str] = "utf-8" +): directory = os.path.dirname(path) os.makedirs(directory, exist_ok=True) - with open(path, 'w', encoding=encoding) as outfile: + with open(path, "w", encoding=encoding) as outfile: ujson.dump(metadata, outfile, 
indent=2, escape_forward_slashes=False) def grouper(n, iterable, fillvalue: Optional[Union[str, int]] = None): args = [iter(iterable)] * n - grouped = list(zip_longest(fillvalue=fillvalue, *args)) + final_grouped = list(zip_longest(fillvalue=fillvalue, *args)) if not fillvalue: - grouped = [x for x in grouped if x] - return grouped - - -def create_link_group(max_threads): - x = range - print + grouped = [] + for group in final_grouped: + group = [x for x in group if x] + grouped.append(group) + final_grouped = grouped + return final_grouped def remove_mandatory_files(files, keep=[]): @@ -736,29 +784,31 @@ def metadata_fixer(directory): shutil.move(archive_file, new) -def ordinal(n): return "%d%s" % ( - n, "tsnrhtdd"[(n/10 % 10 != 1)*(n % 10 < 4)*n % 10::4]) +def ordinal(n): + return "%d%s" % (n, "tsnrhtdd"[(n / 10 % 10 != 1) * (n % 10 < 4) * n % 10 :: 4]) def id_generator(size=6, chars=string.ascii_uppercase + string.digits): - return ''.join(random.choice(chars) for _ in range(size)) + return "".join(random.choice(chars) for _ in range(size)) def humansize(nbytes): i = 0 - suffixes = ['B', 'KB', 'MB', 'GB', 'TB', 'PB'] - while nbytes >= 1024 and i < len(suffixes)-1: - nbytes /= 1024. + suffixes = ["B", "KB", "MB", "GB", "TB", "PB"] + while nbytes >= 1024 and i < len(suffixes) - 1: + nbytes /= 1024.0 i += 1 - f = ('%.2f' % nbytes).rstrip('0').rstrip('.') - return '%s %s' % (f, suffixes[i]) + f = ("%.2f" % nbytes).rstrip("0").rstrip(".") + return "%s %s" % (f, suffixes[i]) def byteToGigaByte(n): - return (n / math.pow(10, 9)) + return n / math.pow(10, 9) -def send_webhook(item, webhook_hide_sensitive_info, webhook_links, category, category2: str): +async def send_webhook( + item, webhook_hide_sensitive_info, webhook_links, category, category2: str +): if category == "auth_webhook": for webhook_link in webhook_links: auth = item @@ -770,11 +820,10 @@ def send_webhook(item, webhook_hide_sensitive_info, webhook_links, category, cat embed.title = f"Auth {category2.capitalize()}" embed.add_field("username", username) message.embeds.append(embed) - message = ujson.loads(json.dumps( - message, default=lambda o: o.__dict__)) + message = ujson.loads(json.dumps(message, default=lambda o: o.__dict__)) x = requests.post(webhook_link, json=message) if category == "download_webhook": - subscriptions:list[create_user] = item.get_subscriptions(refresh=False) + subscriptions: list[create_user] = await item.get_subscriptions(refresh=False) for subscription in subscriptions: download_info = subscription.download_info if download_info: @@ -787,14 +836,15 @@ def send_webhook(item, webhook_hide_sensitive_info, webhook_links, category, cat embed.add_field("link", subscription.get_link()) embed.image.url = subscription.avatar message.embeds.append(embed) - message = ujson.loads(json.dumps( - message, default=lambda o: o.__dict__)) + message = ujson.loads( + json.dumps(message, default=lambda o: o.__dict__) + ) x = requests.post(webhook_link, json=message) print def find_between(s, start, end): - format = f'{start}(.+?){end}' + format = f"{start}(.+?){end}" x = re.search(format, s) if x: x = x.group(1) @@ -815,6 +865,7 @@ def start(directory): content_count = len(contents) if content_count == 1 and "desktop.ini" in contents: shutil.rmtree(full_path, ignore_errors=True) + x = start(directory) if os.path.exists(directory): if not os.listdir(directory): @@ -843,9 +894,9 @@ def module_chooser(domain, json_sites): continue elif x not in wl: continue - string += str(count)+" = "+x + string += str(count) + " = " + x 
site_names.append(x) - if count+1 != site_count: + if count + 1 != site_count: string += seperator count += 1 @@ -853,3 +904,29 @@ def module_chooser(domain, json_sites): string = f"{domain} not supported" site_names = [] return string, site_names + + +def link_picker(media,video_quality): + link = "" + if "source" in media: + quality_key = "source" + source = media[quality_key] + link = source[quality_key] + if link: + if media["type"] == "video": + qualities = media["videoSources"] + qualities = dict(sorted(qualities.items(), reverse=False)) + qualities[quality_key] = source[quality_key] + for quality, quality_link in qualities.items(): + video_quality = video_quality.removesuffix("p") + if quality == video_quality: + if quality_link: + link = quality_link + break + print + print + print + if "src" in media: + link = media["src"] + return link + \ No newline at end of file diff --git a/modules/onlyfans.py b/modules/onlyfans.py index 764e1113..d377bc75 100644 --- a/modules/onlyfans.py +++ b/modules/onlyfans.py @@ -1,43 +1,45 @@ -from apis.onlyfans.classes.create_message import create_message -from apis.onlyfans.classes.create_post import create_post -from apis.onlyfans.classes.create_story import create_story -from apis.onlyfans.classes.extras import auth_details, media_types -from apis.onlyfans.classes.create_user import create_user -from apis.onlyfans.classes.create_auth import create_auth +import copy import hashlib -import shutil -from apis.onlyfans import onlyfans as OnlyFans -from helpers import db_helper -from typing import Any, Optional, Union -from apis.onlyfans.onlyfans import start -from classes.prepare_metadata import create_metadata, format_content, prepare_reformat +import html +import json import os +import shutil from datetime import datetime, timedelta from itertools import chain, product +from types import SimpleNamespace +from typing import Any, Optional, Union from urllib.parse import urlparse -import copy -import json -import html -import extras.OFRenamer.start as ofrenamer - -import requests +import extras.OFLogin.start_ofl as oflogin +import extras.OFRenamer.start as ofrenamer import helpers.main_helper as main_helper -from types import SimpleNamespace -from mergedeep import merge, Strategy -from sqlalchemy.orm import session, sessionmaker, declarative_base +import requests +from apis.onlyfans import onlyfans as OnlyFans +from apis.onlyfans.classes.create_auth import create_auth +from apis.onlyfans.classes.create_message import create_message +from apis.onlyfans.classes.create_post import create_post +from apis.onlyfans.classes.create_story import create_story +from apis.onlyfans.classes.create_user import create_user +from apis.onlyfans.classes.extras import auth_details, media_types +from apis.onlyfans.onlyfans import start +from classes.prepare_metadata import create_metadata, format_content, prepare_reformat +from helpers import db_helper from helpers.main_helper import ( choose_option, download_session, export_data, + export_sqlite, + fix_sqlite, import_archive, ) -import extras.OFLogin.start_ofl as oflogin +from mergedeep import Strategy, merge +from sqlalchemy.orm import declarative_base, session, sessionmaker +from sqlalchemy.orm.scoping import scoped_session +import helpers.db_helper as db_helper site_name = "OnlyFans" json_config = None json_global_settings = None -max_threads = -1 json_settings = None auto_media_choice = "" profile_directory = "" @@ -57,11 +59,10 @@ def assign_vars(json_auth: auth_details, config, site_settings, site_name): - global 
json_config, json_global_settings, max_threads, json_settings, auto_media_choice, profile_directory, download_directory, metadata_directory, metadata_directory_format, delete_legacy_metadata, overwrite_files, date_format, file_directory_format, filename_format, ignored_keywords, ignore_type, blacklist_name, webhook, text_length + global json_config, json_global_settings, json_settings, auto_media_choice, profile_directory, download_directory, metadata_directory, metadata_directory_format, delete_legacy_metadata, overwrite_files, date_format, file_directory_format, filename_format, ignored_keywords, ignore_type, blacklist_name, webhook, text_length json_config = config json_global_settings = json_config["settings"] - max_threads = json_global_settings["max_threads"] json_settings = site_settings auto_media_choice = json_settings["auto_media_choice"] profile_directory = main_helper.get_directory( @@ -86,12 +87,12 @@ def assign_vars(json_auth: auth_details, config, site_settings, site_name): text_length = json_settings["text_length"] -def account_setup( +async def account_setup( auth: create_auth, identifiers: list = [], jobs: dict = {}, auth_count=0 ): status = False subscriptions = [] - authed = auth.login() + authed = await auth.login() if authed.active: profile_directory = json_global_settings["profile_directories"][0] profile_directory = os.path.abspath(profile_directory) @@ -105,12 +106,12 @@ def account_setup( imported = import_archive(metadata_filepath) if "auth" in imported: imported = imported["auth"] - mass_messages = authed.get_mass_messages(resume=imported) + mass_messages = await authed.get_mass_messages(resume=imported) if mass_messages: main_helper.export_data(mass_messages, metadata_filepath) # chats = api.get_chats() if identifiers or jobs["scrape_names"]: - subscriptions += manage_subscriptions( + subscriptions += await manage_subscriptions( authed, auth_count, identifiers=identifiers ) status = True @@ -131,13 +132,25 @@ def account_setup( # The start lol -def start_datascraper(authed: create_auth, identifier, site_name, choice_type=None): - subscription = authed.get_subscription(identifier=identifier) +async def start_datascraper( + authed: create_auth, identifier, site_name, choice_type=None +): + subscription = await authed.get_subscription(identifier=identifier) if not subscription: return [False, subscription] print("Scrape Processing") username = subscription.username print("Name: " + username) + some_list = [ + profile_directory, + download_directory, + metadata_directory, + format_directories, + site_name, + username, + metadata_directory_format, + ] + fix_sqlite(*some_list) api_array = scrape_choice(authed, subscription) api_array = format_options(api_array, "apis") apis = api_array[0] @@ -156,7 +169,7 @@ def start_datascraper(authed: create_auth, identifier, site_name, choice_type=No item["api_array"]["username"] = username item["api_array"]["subscription"] = subscription api_type = item["api_type"] - results = prepare_scraper(authed, site_name, item) + results = await prepare_scraper(authed, site_name, item) print print("Scrape Completed" + "\n") return [True, subscription] @@ -255,7 +268,9 @@ def scrape_choice(authed: create_auth, subscription): # Downloads the model's avatar and header -def profile_scraper(authed: create_auth, site_name, api_type, username, base_directory): +async def profile_scraper( + authed: create_auth, site_name, api_type, username, base_directory +): reformats = {} reformats["metadata_directory_format"] = 
json_settings["metadata_directory_format"] reformats["file_directory_format"] = json_settings["file_directory_format"] @@ -272,16 +287,16 @@ def profile_scraper(authed: create_auth, site_name, api_type, username, base_dir option["directory"] = base_directory a, b, c = prepare_reformat(option, keep_vars=True).reformat(reformats) print - y = authed.get_subscription(identifier=username) + y = await authed.get_subscription(identifier=username) override_media_types = [] avatar = y.avatar header = y.header if avatar: override_media_types.append(["Avatars", avatar]) - if header: + elif header: override_media_types.append(["Headers", header]) - d_session = download_session() - d_session.start(unit="B", unit_scale=True, miniters=1) + progress_bar = download_session() + progress_bar.start(unit="B", unit_scale=True, miniters=1) for override_media_type in override_media_types: new_dict = dict() media_type = override_media_type[0] @@ -290,26 +305,32 @@ def profile_scraper(authed: create_auth, site_name, api_type, username, base_dir directory2 = os.path.join(b, media_type) os.makedirs(directory2, exist_ok=True) download_path = os.path.join(directory2, media_link.split("/")[-2] + ".jpg") - if not overwrite_files: + response = await authed.session_manager.json_request( + media_link, + method="HEAD" + ) + if overwrite_files: if os.path.isfile(download_path): - continue - r = authed.session_manager.json_request( - media_link, stream=True, json_format=False, sleep=False + if os.path.getsize(download_path) == response.content_length: + continue + progress_bar.update_total_size(response.content_length) + response, data = await authed.session_manager.json_request( + media_link, + stream=True, + json_format=False, + sleep=False, + progress_bar=progress_bar, ) - if not isinstance(r, requests.Response): - continue - tsize = r.headers.get("content-length") - d_session.update_total_size(tsize) - downloaded = main_helper.downloader(r, download_path, d_session) + downloaded = await main_helper.write_data(download_path, data) if not downloaded: continue - d_session.close() + progress_bar.close() -def paid_content_scraper(api: start, identifiers=[]): +async def paid_content_scraper(api: start, identifiers=[]): for authed in api.auths: paid_contents = [] - paid_contents = authed.get_paid_content() + paid_contents = await authed.get_paid_content() if not authed.active: return authed.subscriptions = authed.subscriptions @@ -321,10 +342,13 @@ def paid_content_scraper(api: start, identifiers=[]): author = paid_content.author if not author: continue - subscription = authed.get_subscription(check=True, identifier=author["id"]) + subscription = await authed.get_subscription( + check=True, identifier=author["id"] + ) if not subscription: subscription = create_user(author) authed.subscriptions.append(subscription) + subscription.subscriber = authed api_type = paid_content.responseType.capitalize() + "s" api_media = getattr(subscription.temp_scraped, api_type) api_media.append(paid_content) @@ -388,11 +412,8 @@ def paid_content_scraper(api: start, identifiers=[]): authed, new_metadata, formatted_directories, - subscription, api_type, - api_path, metadata_path, - site_name, ) parent_type = "" new_metadata = new_metadata + old_metadata @@ -432,7 +453,7 @@ def process_messages(authed: create_auth, subscription, messages) -> list: return unrefined_set -def process_mass_messages( +async def process_mass_messages( authed: create_auth, subscription, metadata_directory, mass_messages ) -> list: def compare_message(queue_id, 
remote_messages): @@ -548,7 +569,7 @@ def compare_message(queue_id, remote_messages): print if mass_message["hashed_ip"] != hash or date_object > next_date_object: print("Getting Message By ID") - x = subscription.get_message_by_id( + x = await subscription.get_message_by_id( identifier=identifier, identifier2=found["id"], limit=1 ) new_found = x["result"]["list"][0] @@ -567,11 +588,8 @@ def process_legacy_metadata( authed: create_auth, new_metadata_set, formatted_directories, - subscription, api_type, - api_path, archive_path, - site_name, ): print("Processing metadata.") delete_metadatas = [] @@ -586,6 +604,7 @@ def process_legacy_metadata( os.makedirs(os.path.dirname(archive_path), exist_ok=True) shutil.move(legacy_metadata_path2, archive_path) archive_path = archive_path.replace("db", "json") + legacy_archive_path = archive_path.replace("Posts.json", "Archived.json") legacy_metadata_object, delete_legacy_metadatas = legacy_metadata_fixer( formatted_directories, authed ) @@ -593,6 +612,17 @@ def process_legacy_metadata( print("Merging new metadata with legacy metadata.") delete_metadatas.extend(delete_legacy_metadatas) old_metadata_set = import_archive(archive_path) + old_metadata_set2 = import_archive(legacy_archive_path) + if old_metadata_set2: + delete_metadatas.append(legacy_archive_path) + old_metadata_set_type = type(old_metadata_set) + old_metadata_set2_type = type(old_metadata_set2) + if all(v == dict for v in [old_metadata_set_type,old_metadata_set2_type]): + old_metadata_set = merge({}, *[old_metadata_set,old_metadata_set2], strategy=Strategy.ADDITIVE) + else: + if isinstance(old_metadata_set,dict) and not old_metadata_set: + old_metadata_set = [] + old_metadata_set.append(old_metadata_set2) old_metadata_object = create_metadata(authed, old_metadata_set, api_type=api_type) if old_metadata_set: print("Merging new metadata with old metadata.") @@ -603,6 +633,7 @@ def process_legacy_metadata( for value3 in value2: x = value3.medias item = value3.convert(keep_empty_items=True) + item["archived"] = False old_metadata_set.append(item) print print @@ -642,9 +673,7 @@ def process_metadata( subscription.download_info["webhook"] = webhook database_name = parent_type if parent_type else api_type subscription.download_info["metadata_locations"][api_type] = {} - subscription.download_info["metadata_locations"][api_type][ - database_name - ] = archive_path + subscription.download_info["metadata_locations"][api_type] = archive_path if json_global_settings["helpers"]["renamer"]: print("Renaming files.") new_metadata_object = ofrenamer.start( @@ -715,7 +744,7 @@ def format_directories( # Prepares the API links to be scraped -def prepare_scraper(authed: create_auth, site_name, item): +async def prepare_scraper(authed: create_auth, site_name, item): api_type = item["api_type"] api_array = item["api_array"] subscription: create_user = api_array["subscription"] @@ -739,29 +768,31 @@ def prepare_scraper(authed: create_auth, site_name, item): formatted_download_directory = formatted_directories["download_directory"] formatted_metadata_directory = formatted_directories["metadata_directory"] if api_type == "Profile": - profile_scraper( + await profile_scraper( authed, site_name, api_type, username, formatted_download_directory ) return True if api_type == "Stories": - master_set = subscription.get_stories() - highlights = subscription.get_highlights() + master_set = await subscription.get_stories() + master_set += await subscription.get_archived_stories() + highlights = await 
subscription.get_highlights() valid_highlights = [] for highlight in highlights: - highlight = subscription.get_highlights(hightlight_id=highlight.id) + highlight = await subscription.get_highlights(hightlight_id=highlight.id) valid_highlights.extend(highlight) master_set.extend(valid_highlights) print if api_type == "Posts": - master_set = subscription.get_posts() - if api_type == "Archived": - master_set = subscription.get_archived(authed) + master_set = await subscription.get_posts() + master_set += await subscription.get_archived_posts() + # if api_type == "Archived": + # master_set = await subscription.get_archived(authed) if api_type == "Messages": - unrefined_set = subscription.get_messages() + unrefined_set = await subscription.get_messages() mass_messages = getattr(authed, "mass_messages") if subscription.is_me() and mass_messages: mass_messages = getattr(authed, "mass_messages") - unrefined_set2 = process_mass_messages( + unrefined_set2 = await process_mass_messages( authed, subscription, formatted_metadata_directory, mass_messages ) unrefined_set += unrefined_set2 @@ -804,19 +835,16 @@ def prepare_scraper(authed: create_auth, site_name, item): ) unrefined_set = [x for x in unrefined_set] new_metadata = main_helper.format_media_set(unrefined_set) + metadata_path = os.path.join(formatted_metadata_directory, api_type + ".db") if new_metadata: new_metadata = new_metadata["content"] - metadata_path = os.path.join(formatted_metadata_directory, api_type + ".db") api_path = os.path.join(api_type, parent_type) old_metadata, delete_metadatas = process_legacy_metadata( authed, new_metadata, formatted_directories, - subscription, api_type, - api_path, metadata_path, - site_name, ) new_metadata = new_metadata + old_metadata subscription.set_scraped(api_type, new_metadata) @@ -999,7 +1027,6 @@ def media_scraper( new_set = {} new_set["content"] = [] directories = [] - session = authed.session_manager.sessions[0] if api_type == "Stories": pass if api_type == "Archived": @@ -1012,7 +1039,8 @@ def media_scraper( download_path = formatted_directories["download_directory"] for location in formatted_directories["locations"]: sorted_directories = copy.copy(location["sorted_directories"]) - master_date = "01-01-0001 00:00:00" + date_today = datetime.now() + master_date = datetime.strftime(date_today, "%d-%m-%Y %H:%M:%S") media_type = location["media_type"] alt_media_type = location["alt_media_type"] file_directory_format = json_settings["file_directory_format"] @@ -1031,11 +1059,12 @@ def media_scraper( seperator = " | " if print_output: print( - f"Scraping [{seperator.join(alt_media_type)}]. Should take less than a minute." + f"Scraping [{seperator.join(alt_media_type)}]. 
Should take less than a minute.\n" ) post_id = post_result.id new_post = {} new_post["medias"] = [] + new_post["archived"] = False rawText = "" text = "" previews = [] @@ -1053,6 +1082,7 @@ def media_scraper( previews = post_result.preview date = post_result.postedAt price = post_result.price + new_post["archived"] = post_result.isArchived if isinstance(post_result, create_message): if post_result.isReportedByMe: continue @@ -1093,30 +1123,8 @@ def media_scraper( new_post["price"] = price for media in post_result.media: media_id = media["id"] - size = 0 - link = "" preview_link = "" - if "source" in media: - quality_key = "source" - source = media[quality_key] - link = source[quality_key] - if link: - if media["type"] == "video": - qualities = media["videoSources"] - qualities = dict(sorted(qualities.items(), reverse=False)) - qualities[quality_key] = source[quality_key] - for quality, quality_link in qualities.items(): - video_quality_json = json_settings["video_quality"] - video_quality_json = video_quality_json.removesuffix("p") - if quality == video_quality_json: - if quality_link: - link = quality_link - break - print - print - print - if "src" in media: - link = media["src"] + link = main_helper.link_picker(media,json_settings["video_quality"]) matches = ["us", "uk", "ca", "ca2", "de"] if not link: @@ -1147,7 +1155,6 @@ def media_scraper( if media["type"] not in alt_media_type: continue - session.links.extend(new_media["links"]) matches = [s for s in ignored_keywords if s in final_text] if matches: print("Matches: ", matches) @@ -1155,13 +1162,15 @@ def media_scraper( filename = link.rsplit("/", 1)[-1] filename, ext = os.path.splitext(filename) ext = ext.__str__().replace(".", "").split("?")[0] - + final_api_type = ( + os.path.join("Archived", api_type) if new_post["archived"] else api_type + ) option = {} option = option | new_post option["site_name"] = "OnlyFans" option["media_id"] = media_id option["filename"] = filename - option["api_type"] = api_type + option["api_type"] = final_api_type option["media_type"] = media_type option["ext"] = ext option["username"] = username @@ -1169,6 +1178,7 @@ def media_scraper( option["text_length"] = text_length option["directory"] = download_path option["preview"] = new_media["preview"] + option["archived"] = new_post["archived"] prepared_format = prepare_reformat(option) file_directory = main_helper.reformat( @@ -1220,174 +1230,51 @@ def media_scraper( # Downloads scraped content -class download_media: - def __init__(self, authed: create_auth = None, subscription=None) -> None: - username = subscription.username - download_info = subscription.download_info - if download_info: - self.downloaded = True - metadata_locations = download_info["metadata_locations"] - directory = download_info["directory"] - for parent_type, value in metadata_locations.items(): - for api_type, metadata_path in value.items(): - Session, engine = db_helper.create_database_session(metadata_path) - database_session = Session() - database_name = api_type.lower() - db_collection = db_helper.database_collection() - database = db_collection.chooser(database_name) - api_table = database.api_table - media_table = database.media_table - result = database_session.query(media_table).all() - media_type_list = media_types() - for r in result: - item = getattr(media_type_list, r.media_type) - item.append(r) - media_type_list = media_type_list.__dict__ - for location, v in media_type_list.items(): - if location == "Texts": - continue - media_set = v - media_set_count = len(media_set) 
- if not media_set: - continue - string = "Download Processing\n" - string += f"Name: {username} | Type: {api_type} | Count: {media_set_count} {location} | Directory: {directory}\n" - print(string) - d_session = download_session() - d_session.start(unit="B", unit_scale=True, miniters=1) - pool = authed.session_manager.pool - pool.starmap( - self.prepare_download, - product( - media_set, - [authed], - [api_type], - [subscription], - [d_session], - ), - ) - d_session.close() - database_session.commit() - else: - self.downloaded = False - def prepare_download( - self, media, authed: create_auth, api_type, subscription: create_user, d_session - ): - return_bool = True - if not overwrite_files and media.downloaded: - return - count = 0 - sessions = [x for x in authed.session_manager.sessions if media.link in x.links] - if not sessions: - return - session = sessions[0] - while count < 11: - links = [media.link] - - def choose_link(session, links): - for link in links: - r = authed.session_manager.json_request( - link, session, "HEAD", stream=False, json_format=False - ) - if not isinstance(r, requests.Response): - continue - - header = r.headers - content_length = header.get("content-length") - if not content_length: - continue - content_length = int(content_length) - return [link, content_length] - - result = choose_link(session, links) - if not result: - new_result: Any = None - if api_type == "Messages": - new_result = subscription.get_message_by_id( - identifier2=media.post_id, limit=1 - ) - elif api_type == "Posts": - new_result = subscription.get_post(media.post_id) - else: - print - mandatory_directories = {} - mandatory_directories["profile_directory"] = profile_directory - mandatory_directories["download_directory"] = download_directory - mandatory_directories["metadata_directory"] = metadata_directory - media_type = format_media_types() - formatted_directories = format_directories( - mandatory_directories, - site_name, - subscription.username, - metadata_directory_format, - media_type, - api_type, - ) - unrefined_result = [media_scraper( - new_result, - authed, - subscription, - formatted_directories, - subscription.username, - api_type, - print_output=False, - )] - new_metadata = main_helper.format_media_set(unrefined_result) - new_metadata = new_metadata["content"] - found_post = main_helper.format_media_set(new_metadata) - if found_post: - found_media = [ - x - for x in found_post["medias"] - if x["media_id"] == media.media_id - ] - if found_media: - new_link = found_media[0]["links"][0] - media.link = new_link - count += 1 - continue - link = result[0] - content_length = result[1] - media.size = content_length - date_object = media.created_at - download_path = os.path.join(media.directory, media.filename) - timestamp = date_object.timestamp() - if not overwrite_files: - if main_helper.check_for_dupe_file(download_path, content_length): - main_helper.format_image(download_path, timestamp) - return_bool = False - media.downloaded = True - break - r = authed.session_manager.json_request( - link, session, stream=True, json_format=False +async def prepare_downloads(subscription: create_user): + download_info = subscription.download_info + if not download_info: + return + directory = download_info["directory"] + for api_type, metadata_path in download_info["metadata_locations"].items(): + Session, engine = db_helper.create_database_session(metadata_path) + database_session: scoped_session = Session() + database_name = api_type.lower() + db_collection = db_helper.database_collection() + 
database = db_collection.chooser(database_name) + api_table = database.api_table + media_table = database.media_table + settings = subscription.subscriber.extras["settings"]["supported"]["onlyfans"]["settings"] + overwrite_files = settings["overwrite_files"] + if overwrite_files: + download_list: Any = database_session.query(media_table).all() + media_set_count = len(download_list) + else: + download_list: Any = database_session.query(media_table).filter( + media_table.downloaded == False ) - if not isinstance(r, requests.Response): - return_bool = False - count += 1 - continue - d_session.update_total_size(content_length) - downloader = main_helper.downloader(r, download_path, d_session, count) - if not downloader: - count += 1 - continue - main_helper.format_image(download_path, timestamp) - media.downloaded = True - break - if not media.downloaded: - print(f"Download Failed: {media.link}") - d_session.colour = "Red" - - return return_bool + media_set_count = db_helper.get_count(download_list) + location = "" + string = "Download Processing\n" + string += f"Name: {subscription.username} | Type: {api_type} | Count: {media_set_count}{location} | Directory: {directory}\n" + if media_set_count: + print(string) + a = await subscription.session_manager.async_downloads( + download_list, subscription + ) + database_session.commit() + database_session.close() + print + print -def manage_subscriptions( +async def manage_subscriptions( authed: create_auth, auth_count=0, identifiers: list = [], refresh: bool = True ): - results = authed.get_subscriptions(identifiers=identifiers, refresh=refresh) + results = await authed.get_subscriptions(identifiers=identifiers, refresh=refresh) if blacklist_name: - r = authed.get_lists() + r = await authed.get_lists() if not r: return [False, []] new_results = [c for c in r if blacklist_name == c["name"]] @@ -1396,7 +1283,7 @@ def manage_subscriptions( list_users = item["users"] if int(item["usersCount"]) > 2: list_id = str(item["id"]) - list_users = authed.get_lists_users(list_id) + list_users = await authed.get_lists_users(list_id) users = list_users bl_ids = [x["username"] for x in users] results2 = results.copy() diff --git a/start_ofd.py b/start_ofd.py index cc5471ff..96fdcd35 100755 --- a/start_ofd.py +++ b/start_ofd.py @@ -3,8 +3,7 @@ import os import time import traceback -import logging - +import asyncio main_test.version_check() main_test.check_config() @@ -25,34 +24,38 @@ string, site_names = main_helper.module_chooser(domain, json_sites) # logging.basicConfig(level=logging.DEBUG, format="%(message)s") - while True: - try: - if domain: - if site_names: - site_name = domain + async def main(): + while True: + try: + if domain: + if site_names: + site_name = domain + else: + print(string) + continue else: print(string) - continue - else: - print(string) - x = input() - if x == "x": - break - x = int(x) - site_name = site_names[x] - site_name_lower = site_name.lower() - api = main_datascraper.start_datascraper(json_config, site_name_lower) - if api: - api.close_pools() - if exit_on_completion: - print("Now exiting.") - exit(0) - elif not infinite_loop: - print("Input anything to continue") + x = input() + if x == "x": + break + x = int(x) + site_name = site_names[x] + site_name_lower = site_name.lower() + api = await main_datascraper.start_datascraper(json_config, site_name_lower) + if api: + api.close_pools() + if exit_on_completion: + print("Now exiting.") + exit(0) + elif not infinite_loop: + print("Input anything to continue") + input() + elif 
loop_timeout:
+ print("Pausing scraper for " + loop_timeout + " seconds.")
+ time.sleep(int(loop_timeout))
+ except Exception as e:
+ print(traceback.format_exc())
input()
- elif loop_timeout:
- print("Pausing scraper for " + loop_timeout + " seconds.")
- time.sleep(int(loop_timeout))
- except Exception as e:
- print(traceback.format_exc())
- input()
+
+
+ asyncio.run(main())
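
A note on the new async write path: the patch calls `main_helper.write_data(download_path, data)` from coroutines such as `profile_scraper`, but the helper's body uses a plain `open()`/`write()`, so the disk write still blocks the event loop, and it returns None even though callers check `if not downloaded: continue`. Below is a minimal sketch, not part of the patch, of how the helper could offload the write to a worker thread and report success explicitly; it assumes Python 3.9+, which the patch already relies on for `list[...]` annotations and `str.removesuffix`.

import asyncio
import os


async def write_data(download_path: str, data: bytes) -> bool:
    """Write downloaded bytes without blocking the event loop.

    The actual file I/O runs in a worker thread via asyncio.to_thread
    (available since Python 3.9), so other downloads keep making progress
    while the disk write happens.
    """

    def _write() -> bool:
        # Ensure the target directory exists before writing the file.
        os.makedirs(os.path.dirname(download_path) or ".", exist_ok=True)
        with open(download_path, "wb") as f:
            f.write(data)
        return True

    return await asyncio.to_thread(_write)

Returning True keeps checks like `downloaded = await main_helper.write_data(...)` meaningful; a library such as aiofiles would be another option if adding a dependency is acceptable.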