Skip to content

Commit

Permalink
Merge pull request #15 from jxlil/fix/proxy_auth
Browse files: browse the repository at this point in the history
Fix/proxy auth
  • Loading branch information
jxlil authored Jul 13, 2024
2 parents (d8f03f2 + fea46d7), commit 44ee1f6
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 41 deletions.
113 changes: 73 additions & 40 deletions scrapy_impersonate/parser.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import base64
from typing import Optional, Tuple, Union
from urllib.parse import urlparse
from curl_cffi import CurlHttpVersion
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from curl_cffi import CurlHttpVersion, CurlMime
from scrapy.http import Request


Expand All @@ -11,18 +9,31 @@ def __init__(self, request: Request) -> None:
self._request = request
self._impersonate_args = request.meta.get("impersonate_args", {})

@property
def method(self) -> str:
return self._request.method

@property
def url(self) -> str:
return self._request.url

@property
def method(self) -> str:
return self._request.method
def params(self) -> Optional[Union[Dict, List, Tuple]]:
return self._impersonate_args.get("params")

@property
def data(self) -> Union[bytes, str, None]:
def data(self) -> Optional[Any]:
return self._request.body

@property
def json(self) -> Optional[dict]:
return self._impersonate_args.get("json")

@property
def headers(self) -> dict:
headers = self._request.headers.to_unicode_dict()
return dict(headers)

@property
def cookies(self) -> dict:
cookies = self._request.cookies
Expand All @@ -36,67 +47,89 @@ def cookies(self) -> dict:
return {}

@property
def headers(self) -> dict:
headers = self._request.headers.to_unicode_dict()
return dict(headers)
def files(self) -> Optional[dict]:
return self._impersonate_args.get("files")

@property
def proxies(self) -> Union[dict, None]:
proxy = self._request.meta.get("proxy")
if not proxy:
return
def auth(self) -> Optional[Tuple[str, str]]:
return self._impersonate_args.get("auth")

parsed_proxy = urlparse(proxy)
@property
def timeout(self) -> Union[float, Tuple[float, float]]:
return self._impersonate_args.get("timeout", 30.0)

proxy_scheme = parsed_proxy.scheme or "http"
proxy_netloc = parsed_proxy.netloc or parsed_proxy.path
@property
def allow_redirects(self) -> bool:
return False if self._request.meta.get("dont_redirect") else True

if proxy_auth := self.headers.get("Proxy-Authorization"):
proxy_auth = proxy_auth.replace("Basic", "").strip()
proxy_auth = base64.b64decode(proxy_auth).decode()
@property
def max_redirects(self) -> int:
return self._impersonate_args.get("max_redirects", -1)

if "@" not in proxy_netloc:
proxy_netloc = f"{proxy_auth}@{proxy_netloc}"
@property
def proxies(self) -> Optional[dict]:
return self._impersonate_args.get("proxies")

proxy = f"{proxy_scheme}://{proxy_netloc}"
return {"http": proxy, "https": proxy}
@property
def proxy(self) -> Optional[str]:
return self._request.meta.get("proxy")

@property
def allow_redirects(self) -> bool:
return False if self._request.meta.get("dont_redirect") else True
def proxy_auth(self) -> Optional[Tuple[str, str]]:
return self._impersonate_args.get("proxy_auth")

@property
def impersonate(self) -> Union[str, None]:
return self._request.meta.get("impersonate")
def verify(self) -> Optional[bool]:
return self._impersonate_args.get("verify")

@property
def params(self) -> Optional[dict]:
return self._impersonate_args.get("params")
def referer(self) -> Optional[str]:
return self._impersonate_args.get("referer")

@property
def json(self) -> Optional[dict]:
return self._impersonate_args.get("json")
def accept_encoding(self) -> str:
return self._impersonate_args.get("accept_encoding", "gzip, deflate, br")

@property
def auth(self) -> Optional[Tuple[str, str]]:
return self._impersonate_args.get("auth")
def content_callback(self) -> Optional[Callable]:
return self._impersonate_args.get("content_callback")

@property
def timeout(self) -> Union[float, Tuple[float, float]]:
return self._impersonate_args.get("timeout", 30)
def impersonate(self) -> Optional[str]:
return self._request.meta.get("impersonate")

@property
def max_redirects(self) -> int:
return self._impersonate_args.get("max_redirects", -1)
def default_headers(self) -> Optional[bool]:
return self._impersonate_args.get("default_headers")

@property
def verify(self) -> Optional[bool]:
return self._impersonate_args.get("verify")
def default_encoding(self) -> Union[str, Callable[[bytes], str]]:
return self._impersonate_args.get("default_encoding", "utf-8")

@property
def http_version(self) -> Optional[CurlHttpVersion]:
return self._impersonate_args.get("http_version")

@property
def interface(self) -> Optional[str]:
return self._impersonate_args.get("interface")

@property
def cert(self) -> Optional[Union[str, Tuple[str, str]]]:
return self._impersonate_args.get("cert")

@property
def stream(self) -> bool:
return self._impersonate_args.get("stream", False)

@property
def max_recv_speed(self) -> int:
return self._impersonate_args.get("max_recv_speed", 0)

@property
def multipart(self) -> Optional[CurlMime]:
return self._impersonate_args.get("multipart")

def as_dict(self) -> dict:
return {
property_name: getattr(self, property_name)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setup(
name="scrapy-impersonate",
version="1.2.3",
version="1.2.4",
author="Jalil SA (jxlil)",
description="Scrapy download handler that can impersonate browser fingerprints",
license="MIT",
Expand Down

0 comments on commit 44ee1f6

Please sign in to comment.