Skip to content

Commit

Permalink
enh: specify browser type
Browse files Browse the repository at this point in the history
  • Loading branch information
cullenwatson committed Sep 29, 2024
1 parent 836de53 commit 3855c51
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "staffspy"
version = "0.2.12"
version = "0.2.13"
description = "Staff scraper library for LinkedIn"
authors = ["Cullen Watson <[email protected]>"]
readme = "README.md"
Expand Down
71 changes: 41 additions & 30 deletions staffspy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,45 +6,54 @@
from staffspy.solvers.solver_type import SolverType
from staffspy.solvers.two_captcha import TwoCaptchaSolver
from staffspy.utils.utils import set_logger_level, logger, Login
from staffspy.utils.driver_type import DriverType, BrowserType


class LinkedInAccount:
solver_map = {
SolverType.CAPSOLVER: CapSolver,
SolverType.TWO_CAPTCHA: TwoCaptchaSolver
SolverType.TWO_CAPTCHA: TwoCaptchaSolver,
}

def __init__(
    self,
    session_file: str = None,
    username: str = None,
    password: str = None,
    log_level: int = 0,
    solver_api_key: str = None,
    solver_service: SolverType = SolverType.CAPSOLVER,
    driver_type: DriverType = None,
):
    """Store the account settings, build the captcha solver and log in.

    session_file   - passed through to Login when logging in
    username       - passed through to Login when logging in
    password       - passed through to Login when logging in
    log_level      - value handed to set_logger_level() during login()
    solver_api_key - argument given to the selected solver class
    solver_service - key into ``solver_map`` choosing the solver class
    driver_type    - optional browser/driver selection forwarded to Login

    Raises KeyError if ``solver_service`` is not a key of ``solver_map``.
    """
    # Resolve and instantiate the solver first; everything else is plain state.
    solver_cls = self.solver_map[solver_service]
    self.solver = solver_cls(solver_api_key)

    self.session_file, self.username, self.password = (
        session_file,
        username,
        password,
    )
    self.log_level = log_level
    self.driver_type = driver_type

    # Populated later: ``session`` by login(), ``linkedin_scraper`` is only
    # initialised here.
    self.session = None
    self.linkedin_scraper = None

    # Logging in is part of construction.
    self.login()

def login(self):
    """Apply the configured log level, then load a session via Login.

    The result of ``Login.load_session()`` is stored on ``self.session``.
    """
    set_logger_level(self.log_level)
    authenticator = Login(
        username=self.username,
        password=self.password,
        solver=self.solver,
        session_file=self.session_file,
        driver_type=self.driver_type,
    )
    self.session = authenticator.load_session()

def scrape_staff(
self,
company_name: str = None,
user_id: str = None,
search_term: str = None,
location: str = None,
extra_profile_data: bool = False,
max_results: int = 1000
self,
company_name: str = None,
user_id: str = None,
search_term: str = None,
location: str = None,
extra_profile_data: bool = False,
max_results: int = 1000,
) -> pd.DataFrame:
"""Scrape staff from Linkedin
company_name - name of company to find staff frame
Expand All @@ -59,7 +68,9 @@ def scrape_staff(
if not company_name:
if not user_id:
raise ValueError("Either company_name or user_id must be provided")
company_name = li_scraper.fetch_user_profile_data_from_public_id('company_id')
company_name = li_scraper.fetch_user_profile_data_from_public_id(
"company_id"
)

staff = li_scraper.scrape_staff(
company_name=company_name,
Expand All @@ -76,32 +87,32 @@ def scrape_staff(
linkedin_member_df = staff_df[staff_df["name"] == "LinkedIn Member"]
non_linkedin_member_df = staff_df[staff_df["name"] != "LinkedIn Member"]
staff_df = pd.concat([non_linkedin_member_df, linkedin_member_df])
logger.info(f"Scraped {len(staff_df)} staff members from {company_name}, with {len(linkedin_member_df)} hidden LinkedIn users")
logger.info(
f"Scraped {len(staff_df)} staff members from {company_name}, with {len(linkedin_member_df)} hidden LinkedIn users"
)
return staff_df

def scrape_users(
self,
user_ids: list[str]
) -> pd.DataFrame:
def scrape_users(self, user_ids: list[str]) -> pd.DataFrame:
"""Scrape users from Linkedin by user IDs
user_ids - list of LinkedIn user IDs
"""
li_scraper = LinkedInScraper(self.session)
li_scraper.num_staff = len(user_ids)
users = [
Staff(
id='',
search_term='manual',
id="",
search_term="manual",
profile_id=user_id,
) for user_id in user_ids
)
for user_id in user_ids
]

for i, user in enumerate(users,start=1):
user.id = li_scraper.fetch_user_profile_data_from_public_id(user.profile_id, 'user_id')
for i, user in enumerate(users, start=1):
user.id = li_scraper.fetch_user_profile_data_from_public_id(
user.profile_id, "user_id"
)
if user.id:
li_scraper.fetch_all_info_for_employee(
user, i
)
li_scraper.fetch_all_info_for_employee(user, i)

users_dicts = [user.to_dict() for user in users if user.id]
users_df = pd.DataFrame(users_dicts)
Expand Down
59 changes: 42 additions & 17 deletions staffspy/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
import pickle
import re
from datetime import datetime
from typing import Optional
from urllib.parse import quote
from dateutil.parser import parse

import requests
import tldextract
from bs4 import BeautifulSoup
from dateutil.parser import parse
from tenacity import stop_after_attempt, retry_if_exception_type, retry, RetryError

from staffspy.utils.exceptions import BlobException
from staffspy.solvers.solver import Solver
from staffspy.utils.driver_type import DriverType, BrowserType
from staffspy.utils.exceptions import BlobException

logger = logging.getLogger("StaffSpy")
logger.propagate = False
Expand Down Expand Up @@ -50,32 +52,55 @@ def create_emails(first, last, domain):
return emails


def get_webdriver(driver_type: Optional[DriverType] = None):
    """Create a Selenium WebDriver for the browser-based login.

    driver_type - optional browser selection. Its ``browser_type`` chooses
                  Chrome or Firefox; its ``executable_path``, when set, is
                  passed to the matching Selenium ``Service``. When omitted,
                  Chrome and then Firefox are tried in turn.

    Returns the started driver. Returns None when no browser could be started
    in auto-detect mode, or when ``driver_type.browser_type`` is neither
    Chrome nor Firefox.

    Raises Exception if selenium is not installed. Errors raised while
    starting an explicitly requested browser are not caught here.
    """
    try:
        from selenium import webdriver
        from selenium.webdriver.chrome.service import Service as ChromeService
        from selenium.webdriver.firefox.service import Service as FirefoxService
    except ImportError as e:
        # Chain the cause so the underlying import failure stays visible.
        raise Exception(
            "install package `pip install staffspy[browser]` to login with browser"
        ) from e

    if driver_type:
        # NOTE(review): browser types are compared through str() exactly as the
        # original did; presumably this guards against the enum being imported
        # under two different module paths - confirm before simplifying.
        launchers = {
            str(BrowserType.CHROME): (webdriver.Chrome, ChromeService),
            str(BrowserType.FIREFOX): (webdriver.Firefox, FirefoxService),
        }
        requested = str(driver_type.browser_type)
        if requested not in launchers:
            # Previously this fell through silently; leave a trace for the user.
            logger.warning(f"Unsupported browser type for selenium: {requested}")
            return None

        browser_cls, service_cls = launchers[requested]
        if driver_type.executable_path:
            service = service_cls(executable_path=driver_type.executable_path)
            return browser_cls(service=service)
        return browser_cls()

    # Auto-detect: best effort, first browser that starts wins.
    for browser_cls in (webdriver.Chrome, webdriver.Firefox):
        try:
            return browser_cls()
        except Exception as exc:
            # Deliberately broad (as in the original), but no longer silent.
            logger.debug(f"Could not start {browser_cls.__name__}: {exc}")
    return None


class Login:

def __init__(
    self,
    username: str,
    password: str,
    solver: Solver,
    session_file: str,
    driver_type: DriverType = None,
):
    """Keep the credentials, solver, session file and driver choice.

    Each argument is stored unchanged on the attribute of the same name.
    """
    self.username = username
    self.password = password
    self.solver = solver
    self.session_file = session_file
    self.driver_type = driver_type

def solve_captcha(self, session, data, payload):
url = data["challenge_url"]
Expand Down Expand Up @@ -204,7 +229,7 @@ def login_requests(self):

def login_browser(self):
"""Backup login method"""
driver = get_webdriver()
driver = get_webdriver(self.driver_type)

if driver is None:
logger.debug("No browser found for selenium")
Expand Down

0 comments on commit 3855c51

Please sign in to comment.