From a59969b565dc564872619fd0f309c302d6b9e9a4 Mon Sep 17 00:00:00 2001 From: Xavier Marrugat Date: Fri, 9 Feb 2024 11:19:59 +0100 Subject: [PATCH 1/5] Create unique keys in People This change helps identify already existing people and allows emails to be assigned to them. --- infohound/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infohound/models.py b/infohound/models.py index aa5102a..4f2dd36 100644 --- a/infohound/models.py +++ b/infohound/models.py @@ -16,6 +16,8 @@ class People(models.Model): url_img = models.TextField(default="https://static.thenounproject.com/png/994628-200.png") source = models.CharField(max_length=255) domain = models.ForeignKey(Domain, on_delete=models.CASCADE) + class Meta: + unique_together = (('name', 'domain'),) # TO-DO: change spoofable to allow 3 states class Emails(models.Model): @@ -103,5 +105,3 @@ class IPs(models.Model): all_info = models.TextField(null=True) is_vulnerable = models.BooleanField(null=True) domain = models.ForeignKey(Domain, on_delete=models.CASCADE) - - From 426f8edf1dbf969398f13cc87da8080387dd02ce Mon Sep 17 00:00:00 2001 From: Xavier Marrugat Date: Fri, 9 Feb 2024 11:26:09 +0100 Subject: [PATCH 2/5] Link emails to people Now it is possible to link an email (when performing email analysis) to already discovered people when the name matches. --- infohound/tool/retriever_modules/people.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/infohound/tool/retriever_modules/people.py b/infohound/tool/retriever_modules/people.py index d0148f2..e45a501 100644 --- a/infohound/tool/retriever_modules/people.py +++ b/infohound/tool/retriever_modules/people.py @@ -48,7 +48,11 @@ def findSocialProfilesByEmail(domain_id): except IntegrityError as e: pass try: - p, created = People.objects.get_or_create(name=results["name"], social_profiles=results["links"], source="Google", domain_id=domain_id) + p = People.objects.filter(name=results["name"], domain_id=domain_id) + if p.exists(): + p = p.first() + else: + p, created = People.objects.get_or_create(name=results["name"], social_profiles=results["links"], raw_metadata=results["info_dump"], url_img=results["url_img"], source="Google", domain_id=domain_id) try: u, created = Usernames.objects.get_or_create(username=email.split("@")[0], source="Google", domain_id=domain_id) usernames_data.append(email.split("@")[0]) From c35a2b8174b230f95a235c70192c081b90e46e45 Mon Sep 17 00:00:00 2001 From: Xavier Marrugat Date: Fri, 9 Feb 2024 11:33:57 +0100 Subject: [PATCH 3/5] Improved name Improved the way a person's name is obtained to avoid inconsistencies.
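In practice, the improvement is that discoverSocialMediaByDorks now takes the person's name from the structured pagemap metatags of LinkedIn results (profile:first_name / profile:last_name, plus givenname for Twitter results) instead of splitting the result title on "-" and ",". A minimal sketch of that extraction step, where extract_person_fields is a hypothetical helper name and item stands for one entry of the API's "items" array, as in the patch:

import json

def extract_person_fields(item):
    # Build the name from the structured metatags rather than the result title;
    # missing keys are reported and skipped, mirroring the patch's KeyError handling.
    try:
        metatags = item["pagemap"]["metatags"][0]
        name = f'{metatags["profile:first_name"]} {metatags["profile:last_name"]}'
        url_img = item["pagemap"]["cse_image"][0]["src"]
        return name, url_img, json.dumps(item)
    except KeyError as e:
        print(f"Error: The key '{e.args[0]}' is not present in the results.")
        return None, None, None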
--- infohound/tool/data_sources/google_data.py | 415 +++++++++++---------- 1 file changed, 213 insertions(+), 202 deletions(-) diff --git a/infohound/tool/data_sources/google_data.py b/infohound/tool/data_sources/google_data.py index 54981f7..b14bf02 100644 --- a/infohound/tool/data_sources/google_data.py +++ b/infohound/tool/data_sources/google_data.py @@ -13,213 +13,224 @@ def getUrls(query): - start = 1 - total_results = 0 - total_gathered = 0 - limit = False - results = True - info = [] - - print("Testing query: " + query) - - while results and start<100 and not limit: - payload = {"key":API_KEY,"cx":ID,"start":start,"q":query} - res = requests.get("https://www.googleapis.com/customsearch/v1",params=payload) - data = json.loads(res.text) - if "error" in data: - print(data["error"]["status"]) - limit = True - else: - if start == 1: - total_results = data["searchInformation"]["totalResults"] - if "items" in data: - for item in data["items"]: - url = item["link"] - desc = None - if "snippet" in item: - desc = item["snippet"] - info.append((url,desc,json.dumps(item))) - total_gathered = total_gathered + 1 - else: - results = False - start = start + 10 - - print("Found "+str(total_results)+" and added "+str(total_gathered)) - return (info,total_results,total_gathered,limit) - - #- vulnerable paths - #- files - #- url + start = 1 + total_results = 0 + total_gathered = 0 + limit = False + results = True + info = [] + + print("Testing query: " + query) + + while results and start<100 and not limit: + payload = {"key":API_KEY,"cx":ID,"start":start,"q":query} + res = requests.get("https://www.googleapis.com/customsearch/v1",params=payload) + data = json.loads(res.text) + if "error" in data: + print(data["error"]["status"]) + limit = True + else: + if start == 1: + total_results = data["searchInformation"]["totalResults"] + if "items" in data: + for item in data["items"]: + url = item["link"] + desc = None + if "snippet" in item: + desc = item["snippet"] + info.append((url,desc,json.dumps(item))) + total_gathered = total_gathered + 1 + else: + results = False + start = start + 10 + + print("Found "+str(total_results)+" and added "+str(total_gathered)) + return (info,total_results,total_gathered,limit) + + #- vulnerable paths + #- files + #- url def discoverPeople (query): - start = 1 - total_results = 0 - total_gathered = 0 - limit = False - results = True - people = [] - - print("Testing query: " + query) - - while results and start < 100 and not limit: - payload = {"key":API_KEY,"cx":ID,"start":start,"q":query} - res = requests.get("https://www.googleapis.com/customsearch/v1",params=payload) - data = json.loads(res.text) - if "error" in data: - print(data["error"]["status"]) - limit = True - else: - if start == 1: - total_results = data["searchInformation"]["totalResults"] - if "items" in data: - for item in data["items"]: - try: - url = item["link"] - first_name = item["pagemap"]["metatags"][0]["profile:first_name"] - last_name = item["pagemap"]["metatags"][0]["profile:last_name"] - url_img = item["pagemap"]["cse_image"][0]["src"] - name = f"{first_name} {last_name}" - people.append((name,url,json.dumps(item),url_img)) - print("Added: " + name) - total_gathered = total_gathered + 1 - except KeyError as e: - print(f"Error: The key '{e.args[0]}' is not present in the results.") - except Exception as e: - print(f"Unexpected error: {str(e)}") - else: - results = False - start = start + 10 - time.sleep(1) - - print("Found "+str(total_results)+" and added "+str(total_gathered)) - return (people) + 
start = 1 + total_results = 0 + total_gathered = 0 + limit = False + results = True + people = [] + + print("Testing query: " + query) + + while results and start < 100 and not limit: + payload = {"key":API_KEY,"cx":ID,"start":start,"q":query} + res = requests.get("https://www.googleapis.com/customsearch/v1",params=payload) + data = json.loads(res.text) + if "error" in data: + print(data["error"]["status"]) + limit = True + else: + if start == 1: + total_results = data["searchInformation"]["totalResults"] + if "items" in data: + for item in data["items"]: + try: + url = item["link"] + first_name = item["pagemap"]["metatags"][0]["profile:first_name"] + last_name = item["pagemap"]["metatags"][0]["profile:last_name"] + url_img = item["pagemap"]["cse_image"][0]["src"] + name = f"{first_name} {last_name}" + people.append((name,url,json.dumps(item),url_img)) + print("Added: " + name) + total_gathered = total_gathered + 1 + except KeyError as e: + print(f"Error: The key '{e.args[0]}' is not present in the results.") + except Exception as e: + print(f"Unexpected error: {str(e)}") + else: + results = False + start = start + 10 + time.sleep(1) + + print("Found "+str(total_results)+" and added "+str(total_gathered)) + return (people) def discoverEmails(domain): - emails = [] - start = 0 - total = 200 - num = 50 - iterations = int(total/num) - if (total%num) != 0: - iterations += 1 - url_base = f"https://www.google.com/search?q=intext:@{domain}&num={num}" - cookies = {"CONSENT": "YES+srp.gws"} - while start < iterations: - try: - url = url_base + f"&start={start}" - user_agent = infohound_utils.getUserAgents() - response = requests.get(url, - headers=user_agent[randint(0, len(user_agent)-1)], - allow_redirects=False, - cookies=cookies, - proxies=None - ) - escaped_text = response.text.encode('utf-8').decode('unicode_escape') - text = urllib.parse.unquote(html.unescape(escaped_text)) - - if response.status_code == 302 and ("htps://www.google.com/webhp" in text or "https://consent.google.com" in text): - raise GoogleCookiePolicies() - elif "detected unusual traffic" in text: - raise GoogleCaptcha() - #emails = emails + infohound_utils.extractEmails(domain, text) - for e in infohound_utils.extractEmails(domain, text): - if e not in emails: - emails.append(e) - soup = BeautifulSoup(text, "html.parser") - # h3 is the title of every result - if len(soup.find_all("h3")) < num: - break - except Exception as ex: - raise ex #It's left over... 
but it stays there - start += 1 - return emails + emails = [] + start = 0 + total = 200 + num = 50 + iterations = int(total/num) + if (total%num) != 0: + iterations += 1 + url_base = f"https://www.google.com/search?q=intext:@{domain}&num={num}" + cookies = {"CONSENT": "YES+srp.gws"} + while start < iterations: + try: + url = url_base + f"&start={start}" + user_agent = infohound_utils.getUserAgents() + response = requests.get(url, + headers=user_agent[randint(0, len(user_agent)-1)], + allow_redirects=False, + cookies=cookies, + proxies=None + ) + escaped_text = response.text.encode('utf-8').decode('unicode_escape') + text = urllib.parse.unquote(html.unescape(escaped_text)) + + if response.status_code == 302 and ("htps://www.google.com/webhp" in text or "https://consent.google.com" in text): + raise GoogleCookiePolicies() + elif "detected unusual traffic" in text: + raise GoogleCaptcha() + #emails = emails + infohound_utils.extractEmails(domain, text) + for e in infohound_utils.extractEmails(domain, text): + if e not in emails: + emails.append(e) + soup = BeautifulSoup(text, "html.parser") + # h3 is the title of every result + if len(soup.find_all("h3")) < num: + break + except Exception as ex: + raise ex #It's left over... but it stays there + start += 1 + return emails def discoverSocialMedia(domain,email): - data = {} - links = [] - name = "" - - num = 50 - username = email.split("@")[0] - scope = email.split("@")[1] - - url = f"https://www.google.com/search?q='{username}' {scope}" - cookies = {"CONSENT": "YES+","SOCS":"CAISHAgCEhJnd3NfMjAyNDAxMzEtMF9SQzQaAmVzIAEaBgiAkIuuBg"} - - try: - user_agent = infohound_utils.getUserAgents() - response = requests.get(url, - headers=user_agent[randint(0, len(user_agent)-1)], - allow_redirects=False, - cookies=cookies, - proxies=None - ) - - text = response.content - - if response.status_code == 302 and ("htps://www.google.com/webhp" in text or "https://consent.google.com" in text): - raise GoogleCookiePolicies() - elif "detected unusual traffic" in text: - raise GoogleCaptcha() - links = infohound_utils.extractSocialInfo(text) - - if links != []: - soup = BeautifulSoup(text, "html.parser") - if len(soup.find_all("h3")) >= 2: - info = soup.find_all("h3")[0].string - if "-" in info: - info = info.string.split("-")[0] - if "," in info: - info = info.split(",")[0] - name = info.strip() - - data["links"] = links - data["name"] = name - except Exception as ex: - raise ex #It's left over... 
but it stays there - return data + data = {} + links = [] + name = "" + + num = 50 + username = email.split("@")[0] + scope = email.split("@")[1] + + url = f"https://www.google.com/search?q='{username}' {scope}" + # Seems that this trick does not work any more + cookies = {"CONSENT": "YES+","SOCS":"CAISHAgCEhJnd3NfMjAyNDAxMzEtMF9SQzQaAmVzIAEaBgiAkIuuBg"} + + try: + user_agent = infohound_utils.getUserAgents() + response = requests.get(url, + headers=user_agent[randint(0, len(user_agent)-1)], + allow_redirects=False, + cookies=cookies, + proxies=None + ) + + text = response.content + + if response.status_code == 302 and ("htps://www.google.com/webhp" in text or "https://consent.google.com" in text): + raise GoogleCookiePolicies() + elif "detected unusual traffic" in text: + raise GoogleCaptcha() + links = infohound_utils.extractSocialInfo(text) + + if links != []: + soup = BeautifulSoup(text, "html.parser") + if len(soup.find_all("h3")) >= 2: + info = soup.find_all("h3")[0].string + if "-" in info: + info = info.string.split("-")[0] + if "," in info: + info = info.split(",")[0] + name = info.strip() + + data["links"] = links + data["name"] = name + except Exception as ex: + raise ex #It's left over... but it stays there + return data def discoverSocialMediaByDorks(domain,email): - data = {} - links = [] - name = "" - limit = False - - num = 50 - username = email.split("@")[0] - scope = email.split("@")[1] - - payload = {"key":API_KEY,"cx":ID,"start":1,"q":f"'{username}' {scope}"} - res = requests.get("https://www.googleapis.com/customsearch/v1",params=payload) - info = json.loads(res.content) - if "error" in info: - print(info["error"]["status"]) - limit = True - else: - if "items" in info: - for item in info["items"]: - if "linkedin" in item["link"]: - l = item["link"] - if "?" in item["link"]: - l = l.split("?")[0] - links.append(l) - name = item["title"] - if "," in name: - name = name.split(",")[0] - if "-" in name: - name = name.split("-")[0] - name = name.strip() - if "twitter" in item["link"]: - l = item["link"] - if "?" in item["link"]: - l = l.split("?")[0] - links.append(l) - if name == "": - name = item["title"].split("(")[0].strip() - data["links"] = links - data["name"] = name - print(data) - return data - - - + data = {} + links = [] + name = "" + url_img = None + info_dump = None + limit = False + + num = 50 + username = email.split("@")[0] + scope = email.split("@")[1] + + payload = {'key':API_KEY,'cx':ID,'start':1,'q':f'"{username}" {scope}'} + res = requests.get("https://www.googleapis.com/customsearch/v1",params=payload) + info = json.loads(res.content) + if "error" in info: + print(info["error"]["status"]) + limit = True + else: + if "items" in info: + found_linkedin = False + found_twitter = False + for item in info["items"]: + try: + if "linkedin" in item["link"] and not found_linkedin: + l = item["link"].split("?")[0] if "?" in item["link"] else item["link"] + first_name = item["pagemap"]["metatags"][0]["profile:first_name"] + last_name = item["pagemap"]["metatags"][0]["profile:last_name"] + name = f"{first_name} {last_name}" + url_img = item["pagemap"]["cse_image"][0]["src"] + info_dump = json.dumps(item) + links.append(l) + found_linkedin = True + if "twitter" in item["link"] and not found_twitter: + l = item["link"].split("?")[0] if "?" 
in item["link"] else item["link"] + name = item["pagemap"]["person"][0]["givenname"] + if not url_img: + url_img = item["pagemap"]["cse_image"][0]["src"] + if not info_dump: + info_dump = json.dumps(item) + links.append(l) + found_twitter = True + if found_twitter and found_linkedin: + break + except KeyError as e: + print(f"Error: The key '{e.args[0]}' is not present in the results.") + except Exception as e: + print(f"Unexpected error: {str(e)}") + + data["links"] = links + data["name"] = name + data["url_img"] = url_img + data["info_dump"] = info_dump + return data From e3b0809c464ea4683e9b6dd453a581ab9f5607da Mon Sep 17 00:00:00 2001 From: Xavier Marrugat Date: Fri, 9 Feb 2024 11:44:14 +0100 Subject: [PATCH 4/5] Minor errors From 9cfee8de94efe92ae5b3cfe01a24634b202eb446 Mon Sep 17 00:00:00 2001 From: Xavier Marrugat Date: Fri, 9 Feb 2024 11:54:41 +0100 Subject: [PATCH 5/5] Slow but sure Seems that Web Archive has limited the queries you can perform per second. So, for now, instead of downloading them asynchronously and making a lot of noise, they will be downloaded a little bit slower but ensuring no file is left behind. --- .../tool/analysis_modules/files_analysis.py | 319 +++++++++--------- 1 file changed, 155 insertions(+), 164 deletions(-) diff --git a/infohound/tool/analysis_modules/files_analysis.py b/infohound/tool/analysis_modules/files_analysis.py index bc545e1..5e0278c 100644 --- a/infohound/tool/analysis_modules/files_analysis.py +++ b/infohound/tool/analysis_modules/files_analysis.py @@ -6,6 +6,7 @@ import time import os import textract +from random import randint from django.db import IntegrityError from infohound.models import Domain,Files, Emails from infohound.tool.retriever_modules import emails as emails_utils @@ -22,137 +23,127 @@ download_direcotry = "infohound/tool/downloaded_files/" -concurrency_limit = trio.CapacityLimiter(5) +rate_limiter = trio.CapacityLimiter(2) async def download_file(url, filepath): - max_retries = 5 - for i in range(max_retries + 1): - try: - async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: - response = await client.get(url) - with open(filepath, 'wb') as f: - f.write(response.content) - print(f"File {url} downloaded!") - return True - except Exception as e: - print(e) - print(type(e)) - print(f"Download failed for {url}. Retrying... ({i}/{max_retries})") - return False + max_retries = 5 + file_url = url + for i in range(max_retries + 1): + try: + async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: + user_agent = infohound_utils.getUserAgents() + response = await client.get(file_url, headers=user_agent[randint(0, len(user_agent)-1)]) + with open(filepath, 'wb') as f: + f.write(response.content) + print(f"File {url} downloaded!") + return + except Exception as e: + print(e) + print(type(e)) + print(f"Download failed for {url}. Retrying... 
({i}/{max_retries})") + if i%1 == 0: + file_url = url.replace("https://", "http://") + else: + file_url = url + time.sleep(5) async def download_all_files(domain_id): queryset = Files.objects.filter(metadata__isnull=True,domain_id=domain_id) - async with trio.open_nursery() as nursery: - for entry in queryset.iterator(): - url = entry.url_download - filename = entry.filename - filepath = download_direcotry+filename - if not os.path.isfile(os.path.join(download_direcotry, filename)): - task = nursery.start_soon(download_file, url, filepath) + #async with trio.open_nursery() as nursery: + for entry in queryset.iterator(): + url = entry.url_download + filename = entry.filename + filepath = download_direcotry+filename + if not os.path.isfile(os.path.join(download_direcotry, filename)): + #async with rate_limiter: + #task = nursery.start_soon(download_file, url, filepath) + await download_file(url,filepath) def downloadSingleFile(url, filename): - print("Trying to download: " + filename) - retry = 0 - downloaded = False - filepath = download_direcotry+filename - while retry < 5 and not downloaded: - try: - res = requests.get(url) - open(filepath, 'wb').write(res.content) - downloaded = True - except Exception as e: - retry = retry + 1 - if not downloaded: - print(retry) - return downloaded - -""" -def downloadAllFiles(domain_id): - data = [] - queryset = Files.objects.filter(metadata__isnull=True,domain_id=domain_id) - for entry in queryset.iterator(): - url = entry.url - filename = entry.filename - downloaded = downloadSingleFile(url, filename) - print("File not downloaded: "+filename) - break -""" + print("Trying to download: " + filename) + retry = 0 + downloaded = False + filepath = download_direcotry+filename + while retry < 5 and not downloaded: + try: + res = requests.get(url) + open(filepath, 'wb').write(res.content) + downloaded = True + except Exception as e: + retry = retry + 1 + if not downloaded: + print(retry) + return downloaded + def getMetadata(domain_id): - queryset = Files.objects.filter(metadata__isnull=True,domain_id=domain_id) - for entry in queryset.iterator(): - url = entry.url - filename = entry.filename - if not os.path.isfile(os.path.join(download_direcotry, filename)): - downloaded = downloadSingleFile(url, filename) - if not downloaded: - pass - extracted = False - retry = 0 - while not extracted and retry < 5: - with exiftool.ExifToolHelper() as et: - try : - filepath = download_direcotry+filename - metadata = et.get_metadata([filepath])[0] - entry.metadata = metadata - extracted = True - entry.save() - print("metadata extracted") - except Exception as e: - print(e) - print(type(e)) - retry = retry + 1 - - + queryset = Files.objects.filter(metadata__isnull=True,domain_id=domain_id) + for entry in queryset.iterator(): + url = entry.url + filename = entry.filename + if os.path.isfile(os.path.join(download_direcotry, filename)): + extracted = False + retry = 0 + while not extracted and retry < 5: + with exiftool.ExifToolHelper() as et: + try : + filepath = download_direcotry+filename + metadata = et.get_metadata([filepath])[0] + entry.metadata = metadata + extracted = True + entry.save() + print("metadata extracted") + except Exception as e: + print(e) + print(type(e)) + retry = retry + 1 + + def getEmailsFromMetadata(domain_id): - queryset = Files.objects.filter(metadata__isnull=False,domain_id=domain_id) - for entry in queryset.iterator(): - filename = entry.filename - metadata = str(entry.metadata) - emails = emails_utils.getEmailsFromText(metadata) - for e in 
emails: - b,em = emails_utils.isValidEmail(e) - if b: - domain = Domain.objects.get(id=domain_id).domain - if domain in em: - try: - Emails.objects.get_or_create(email=em,source="Files",domain_id=domain_id) - except IntegrityError as e: - pass + queryset = Files.objects.filter(metadata__isnull=False,domain_id=domain_id) + for entry in queryset.iterator(): + filename = entry.filename + metadata = str(entry.metadata) + emails = emails_utils.getEmailsFromText(metadata) + for e in emails: + b,em = emails_utils.isValidEmail(e) + if b: + domain = Domain.objects.get(id=domain_id).domain + if domain in em: + try: + Emails.objects.get_or_create(email=em,source="Files",domain_id=domain_id) + except IntegrityError as e: + pass def getEmailsFromFilesContent(domain_id): - excluded = ["rar","zip"] - queryset = Files.objects.filter(domain_id=domain_id) - for entry in queryset.iterator(): - url = entry.url - filename = entry.filename - if not os.path.isfile(os.path.join(download_direcotry, filename)): - downloaded = downloadSingleFile(url, filename) - if not downloaded: - pass - - ext = filename.split(".")[-1:][0] - if ext not in excluded: - #text = textract.process(os.path.join(download_direcotry, filename)) - text = extract_text(os.path.join(download_direcotry, filename)) - emails = emails_utils.getEmailsFromText(text) - for e in emails: - b,em = emails_utils.isValidEmail(e) - if b: - domain = Domain.objects.get(id=domain_id).domain - if domain in em: - print("Found another email: " + em) - try: - Emails.objects.get_or_create(email=em,source="Files",domain_id=domain_id) - except IntegrityError as e: - pass - + excluded = ["rar","zip"] + queryset = Files.objects.filter(domain_id=domain_id) + for entry in queryset.iterator(): + url = entry.url + filename = entry.filename + if os.path.isfile(os.path.join(download_direcotry, filename)): + ext = filename.split(".")[-1:][0] + if ext not in excluded: + #text = textract.process(os.path.join(download_direcotry, filename)) + text = extract_text(os.path.join(download_direcotry, filename)) + emails = emails_utils.getEmailsFromText(text) + for e in emails: + b,em = emails_utils.isValidEmail(e) + if b: + domain = Domain.objects.get(id=domain_id).domain + if domain in em: + print("Found another email: " + em) + try: + Emails.objects.get_or_create(email=em,source="Files",domain_id=domain_id) + except IntegrityError as e: + pass + @@ -162,54 +153,54 @@ def getEmailsFromFilesContent(domain_id): # Currently using textract def extract_text(file_path): - text = "" - file_extension = file_path.split(".")[-1:][0] - try: - if file_extension in ["doc", "docx"]: - doc = Document(file_path) - for para in doc.paragraphs: - text += para.text + "\n" - - elif file_extension in ["ppt", "pptx", "pps", "ppsx"]: - ppt = Presentation(file_path) - for slide in ppt.slides: - for shape in slide.shapes: - if shape.has_text_frame: - text += shape.text + "\n" - - elif file_extension in ["xls", "xlsx"]: - wb = load_workbook(file_path) - for sheet in wb: - for row in sheet.iter_rows(): - for cell in row: - text += str(cell.value) + " " - - elif file_extension == "pdf": - pdf = PdfReader(file_path) - for page_num in range(len(pdf.pages)): - text += pdf.pages[page_num].extract_text() - - elif file_extension == "svg": - svg = svg2rlg(file_path) - text = str(svg) - - elif file_extension == "indd": - print("InDesign file format (.indd) is not supported in this script.") - - elif file_extension == "rdp" or file_extension == "ica": - with open(file_path, 'r') as f: - text = f.read() - - elif 
file_extension == "rar": - with rarfile.RarFile(file_path, 'r') as rar_ref: - for file in rar_ref.namelist(): - if not os.path.isdir(file): - with rar_ref.open(file) as f: - text += f.read().decode('utf-8', errors='ignore') - - else: - print("Unsupported file format.") - except Exception as e: - print("Error occured") - - return text \ No newline at end of file + text = "" + file_extension = file_path.split(".")[-1:][0] + try: + if file_extension in ["doc", "docx"]: + doc = Document(file_path) + for para in doc.paragraphs: + text += para.text + "\n" + + elif file_extension in ["ppt", "pptx", "pps", "ppsx"]: + ppt = Presentation(file_path) + for slide in ppt.slides: + for shape in slide.shapes: + if shape.has_text_frame: + text += shape.text + "\n" + + elif file_extension in ["xls", "xlsx"]: + wb = load_workbook(file_path) + for sheet in wb: + for row in sheet.iter_rows(): + for cell in row: + text += str(cell.value) + " " + + elif file_extension == "pdf": + pdf = PdfReader(file_path) + for page_num in range(len(pdf.pages)): + text += pdf.pages[page_num].extract_text() + + elif file_extension == "svg": + svg = svg2rlg(file_path) + text = str(svg) + + elif file_extension == "indd": + print("InDesign file format (.indd) is not supported in this script.") + + elif file_extension == "rdp" or file_extension == "ica": + with open(file_path, 'r') as f: + text = f.read() + + elif file_extension == "rar": + with rarfile.RarFile(file_path, 'r') as rar_ref: + for file in rar_ref.namelist(): + if not os.path.isdir(file): + with rar_ref.open(file) as f: + text += f.read().decode('utf-8', errors='ignore') + + else: + print("Unsupported file format.") + except Exception as e: + print("Error occured") + + return text