diff --git a/.github/workflows/check_code_quality.yml b/.github/workflows/check_code_quality.yml
new file mode 100644
index 0000000..1f13295
--- /dev/null
+++ b/.github/workflows/check_code_quality.yml
@@ -0,0 +1,62 @@
+name: Check Code Quality
+
+on:
+ pull_request:
+ branches:
+ - develop
+ - main
+
+jobs:
+ format:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v4
+ with:
+ ref: ${{ github.head_ref }}
+
+ - name: Set up Python
+ uses: actions/setup-python@v5
+ with:
+ python-version: '3.10'
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+
+ - name: Run formatter
+ run: make format
+
+ - name: Commit changes
+ run: |
+ if [ -n "$(git status --porcelain)" ]; then
+ git config --local user.email "github-actions[bot]@users.noreply.github.com"
+ git config --local user.name "github-actions[bot]"
+ git add .
+ git commit -m "🎨 Format code with isort and black"
+ git push
+ fi
+
+ lint:
+ needs: format # run after the format job completes
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: ['3.10', '3.11', '3.12', '3.13']
+ fail-fast: false
+
+ steps:
+ - uses: actions/checkout@v4
+ with:
+ ref: ${{ github.head_ref }}
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v5
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install dependencies
+ run: |
+ python -m pip install --upgrade pip
+
+ - name: Run linter
+ run: make lint
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 5e90246..c830e48 100644
--- a/Makefile
+++ b/Makefile
@@ -32,6 +32,6 @@ format:
lint:
pip install -e .[lint]
python -m flake8 $(FORMAT_DIR)
- # python -m mypy $(FORMAT_DIR) --install-types --non-interactive
+ python -m mypy $(FORMAT_DIR) --install-types --non-interactive
PHONY: build release release_from-develop format commit
\ No newline at end of file
diff --git a/src/wikidot/common/exceptions.py b/src/wikidot/common/exceptions.py
index 4d8dae1..a6353d4 100644
--- a/src/wikidot/common/exceptions.py
+++ b/src/wikidot/common/exceptions.py
@@ -108,3 +108,15 @@ class ForbiddenException(WikidotException):
def __init__(self, message):
super().__init__(message)
+
+
+# ---
+# Processing errors
+# ---
+
+
+class NoElementException(WikidotException):
+ """要素が存在しないときの例外"""
+
+ def __init__(self, message):
+ super().__init__(message)
diff --git a/src/wikidot/connector/ajax.py b/src/wikidot/connector/ajax.py
index db15784..e79e389 100644
--- a/src/wikidot/connector/ajax.py
+++ b/src/wikidot/connector/ajax.py
@@ -166,7 +166,7 @@ def request(
return_exceptions: bool = False,
site_name: str | None = None,
site_ssl_supported: bool | None = None,
- ) -> tuple[BaseException | Any]:
+ ) -> tuple[httpx.Response | Exception]:
"""ajax-module-connector.phpへのリクエストを行う
Parameters
@@ -215,6 +215,7 @@ async def _request(_body: dict[str, Any]) -> httpx.Response:
# リクエスト実行
try:
+ response = None
# Semaphoreで同時実行数制御
async with semaphore_instance:
async with httpx.AsyncClient() as client:
@@ -236,18 +237,21 @@ async def _request(_body: dict[str, Any]) -> httpx.Response:
retry_count += 1
# リトライ回数上限に達した場合は例外送出
- if retry_count >= self.config.attempt_limit:
+ if retry_count > self.config.attempt_limit:
wd_logger.error(
- f"AMC is respond HTTP error code: {response.status_code} -> {_body}"
+ f"AMC is respond HTTP error code: "
+ f"{response.status_code if response is not None else 'timeout'} -> {_body}"
)
raise AMCHttpStatusCodeException(
- f"AMC is respond HTTP error code: {response.status_code}",
- response.status_code,
+ f"AMC is respond HTTP error code: "
+ f"{response.status_code if response is not None else 'timeout'} -> {_body}",
+ response.status_code if response is not None else 999,
) from e
# 間隔を空けてリトライ
wd_logger.info(
- f"AMC is respond status: {response.status_code} (retry: {retry_count}) -> {_body}"
+ f"AMC is respond status: {response.status_code if response is not None else 'timeout'} "
+ f"(retry: {retry_count}) -> {_body}"
)
await asyncio.sleep(self.config.retry_interval)
continue
diff --git a/src/wikidot/module/auth.py b/src/wikidot/module/auth.py
index 6021bea..90c2b97 100644
--- a/src/wikidot/module/auth.py
+++ b/src/wikidot/module/auth.py
@@ -2,10 +2,10 @@
import httpx
-from wikidot.common.exceptions import SessionCreateException
+from ..common.exceptions import SessionCreateException
if TYPE_CHECKING:
- from wikidot.module.client import Client
+ from .client import Client
class HTTPAuthentication:
diff --git a/src/wikidot/module/client.py b/src/wikidot/module/client.py
index c4b7738..abbde2f 100644
--- a/src/wikidot/module/client.py
+++ b/src/wikidot/module/client.py
@@ -1,22 +1,22 @@
-from wikidot.common import wd_logger
-from wikidot.common.exceptions import LoginRequiredException
-from wikidot.connector.ajax import AjaxModuleConnectorClient, AjaxModuleConnectorConfig
-from wikidot.module.auth import HTTPAuthentication
-from wikidot.module.private_message import (
+from ..common import wd_logger
+from ..common.exceptions import LoginRequiredException
+from ..connector.ajax import AjaxModuleConnectorClient, AjaxModuleConnectorConfig
+from .auth import HTTPAuthentication
+from .private_message import (
PrivateMessage,
PrivateMessageCollection,
PrivateMessageInbox,
PrivateMessageSentBox,
)
-from wikidot.module.site import Site
-from wikidot.module.user import User, UserCollection
+from .site import Site
+from .user import AbstractUser, User, UserCollection
class ClientUserMethods:
def __init__(self, client: "Client"):
self.client = client
- def get(self, name: str, raise_when_not_found: bool = False) -> User:
+ def get(self, name: str, raise_when_not_found: bool = False) -> "AbstractUser":
"""ユーザー名からユーザーオブジェクトを取得する
Parameters
@@ -37,7 +37,7 @@ def get(self, name: str, raise_when_not_found: bool = False) -> User:
def get_bulk(
self, names: list[str], raise_when_not_found: bool = False
- ) -> list[User]:
+ ) -> UserCollection:
"""ユーザー名からユーザーオブジェクトを取得する
Parameters
diff --git a/src/wikidot/module/forum.py b/src/wikidot/module/forum.py
deleted file mode 100644
index 9fc3d91..0000000
--- a/src/wikidot/module/forum.py
+++ /dev/null
@@ -1,62 +0,0 @@
-from dataclasses import dataclass
-from typing import TYPE_CHECKING
-
-from wikidot.module.forum_category import ForumCategory, ForumCategoryCollection
-from wikidot.module.forum_group import ForumGroupCollection
-from wikidot.module.forum_thread import ForumThread
-
-if TYPE_CHECKING:
- from wikidot.module.site import Site
-
-
-class ForumCategoryMethods:
- def __init__(self, forum: "Forum") -> None:
- self.forum = forum
-
- def get(self, id: int):
- category = ForumCategory(
- site=self.forum.site,
- id=id,
- forum=self.forum,
- )
- return category.update()
-
-
-class ForumThreadMethods:
- def __init__(self, forum: "Forum") -> None:
- self.forum = forum
-
- def get(self, id: int):
- thread = ForumThread(
- site=self.forum.site,
- id=id,
- forum=self.forum,
- )
- return thread.update()
-
-
-@dataclass
-class Forum:
- site: "Site"
- name = "Forum"
- _groups: "ForumGroupCollection" = None
- _categories: "ForumCategoryCollection" = None
-
- def __post_init__(self):
- self.category = ForumCategoryMethods(self)
- self.thread = ForumThreadMethods(self)
-
- def get_url(self):
- return f"{self.site.get_url}/forum/start"
-
- @property
- def groups(self):
- if self._groups is None:
- ForumGroupCollection.get_groups(self.site, self)
- return self._groups
-
- @property
- def categories(self):
- if self._categories is None:
- ForumCategoryCollection.get_categories(self.site, self)
- return self._categories
diff --git a/src/wikidot/module/forum_category.py b/src/wikidot/module/forum_category.py
index 53a4af0..a57579b 100644
--- a/src/wikidot/module/forum_category.py
+++ b/src/wikidot/module/forum_category.py
@@ -1,197 +1,112 @@
import re
from collections.abc import Iterator
from dataclasses import dataclass
-from datetime import datetime
from typing import TYPE_CHECKING, Optional
from bs4 import BeautifulSoup
-from wikidot.module.forum_post import ForumPost
-from wikidot.module.forum_thread import ForumThread, ForumThreadCollection
-from wikidot.util.parser import odate as odate_parser
-from wikidot.util.parser import user as user_parser
+from ..common.exceptions import NoElementException
+from .forum_thread import ForumThreadCollection
if TYPE_CHECKING:
- from wikidot.module.forum import Forum
- from wikidot.module.forum_group import ForumGroup
- from wikidot.module.site import Site
+ from .site import Site
class ForumCategoryCollection(list["ForumCategory"]):
- def __init__(self, forum: "Forum", categories: list["ForumCategory"]):
- super().__init__(categories)
- self.forum = forum
+ def __init__(
+ self,
+ site: Optional["Site"] = None,
+ categories: Optional[list["ForumCategory"]] = None,
+ ):
+ super().__init__(categories or [])
+
+ if site is not None:
+ self.site = site
+ else:
+ self.site = self[0].site
def __iter__(self) -> Iterator["ForumCategory"]:
return super().__iter__()
@staticmethod
- def get_categories(site: "Site", forum: "Forum"):
+ def acquire_all(site: "Site"):
categories = []
- for group in forum.groups:
- categories.extend(group.categories)
-
- forum._categories = ForumCategoryCollection(site, categories)
-
- def find(self, id: int = None, title: str = None) -> Optional["ForumCategory"]:
- for category in self:
- if (id is None or category.id == id) and (
- title is None or category.title
- ) == title:
- return category
+ response = site.amc_request(
+ [{"moduleName": "forum/ForumStartModule", "hidden": "true"}]
+ )[0]
- @staticmethod
- def _acquire_update(forum: "Forum", categories: list["ForumCategory"]):
- if len(categories) == 0:
- return categories
-
- responses = forum.site.amc_request(
- [
- {
- "c": category.id,
- "moduleName": "forum/ForumViewCategoryModule",
- }
- for category in categories
- ]
- )
- for category, response in zip(categories, responses):
- html = BeautifulSoup(response.json()["body"], "lxml")
- statistics = html.select_one("div.statistics").text
- description = html.select_one("div.description-block").text.strip()
- info = re.search(
- r"([ \S]*) /\s+([ \S]*)", html.select_one("div.forum-breadcrumbs").text
+ body = response.json()["body"]
+ html = BeautifulSoup(body, "lxml")
+
+ for row in html.select("table tr.head~tr"):
+ name_elem = row.select_one("td.name")
+ if name_elem is None:
+ raise NoElementException("Name element is not found.")
+ name_link_elem = name_elem.select_one("a")
+ if name_link_elem is None:
+ raise NoElementException("Name link element is not found.")
+ name_link_href = name_link_elem.get("href")
+ if name_link_href is None:
+ raise NoElementException("Name link href is not found.")
+ thread_count_elem = row.select_one("td.threads")
+ if thread_count_elem is None:
+ raise NoElementException("Thread count element is not found.")
+ post_count_elem = row.select_one("td.posts")
+ if post_count_elem is None:
+ raise NoElementException("Post count element is not found.")
+ category_id_match = re.search(r"c-(\d+)", str(name_link_href))
+ if category_id_match is None:
+ raise NoElementException("Category ID is not found.")
+ category_id_str = category_id_match.group(1)
+ title_elem = name_elem.select_one("a")
+ if title_elem is None:
+ raise NoElementException("Title element is not found.")
+ description_elem = name_elem.select_one("div.description")
+ if description_elem is None:
+ raise NoElementException("Description element is not found.")
+
+ category = ForumCategory(
+ site=site,
+ id=int(category_id_str),
+ title=title_elem.text,
+ description=description_elem.text,
+ threads_count=int(thread_count_elem.text),
+ posts_count=int(post_count_elem.text),
)
- counts = re.findall(r"\d+", statistics)
-
- if category.posts_counts != int(counts[1]):
- category.last = None
- category.description = re.search(r"[ \S]*$", description).group()
- category.threads_counts, category.posts_counts = counts
- category.group = category.forum.groups.find(info.group(1))
- category.title = info.group(2)
- if (pagerno := html.select_one("span.pager-no")) is None:
- category.pagerno = 1
- else:
- category.pagerno = int(re.search(r"of (\d+)", pagerno.text).group(1))
- return categories
+ categories.append(category)
- def update(self):
- return ForumCategoryCollection._acquire_update(self.forum, self)
+ return ForumCategoryCollection(site=site, categories=categories)
@dataclass
class ForumCategory:
site: "Site"
id: int
- forum: "Forum"
- title: str = None
- description: str = None
- group: "ForumGroup" = None
- threads_counts: int = None
- posts_counts: int = None
- pagerno: int = None
- _last_thread_id: int = None
- _last_post_id: int = None
- _last: "ForumPost" = None
-
- def get_url(self):
- return f"{self.site.get_url}/forum/c-{self.id}"
-
- def update(self):
- return ForumCategoryCollection(self.forum, [self]).update()[0]
-
- @property
- def last(self):
- if self._last_thread_id is not None and self._last_post_id is not None:
- if self._last is None:
- self._last = self.forum.thread.get(self._last_thread_id).get(
- self._last_post_id
- )
- return self._last
-
- @last.setter
- def last(self, value: "ForumPost"):
- self._last = value
-
- @property
- def threads(self):
- client = self.site.client
- self.update()
- responses = self.site.amc_request(
- [
- {
- "p": no + 1,
- "c": self.id,
- "moduleName": "forum/ForumViewCategoryModule",
- }
- for no in range(self.pagerno)
- ]
+ title: str
+ description: str
+ threads_count: int
+ posts_count: int
+ _threads: Optional[ForumThreadCollection] = None
+
+ def __str__(self):
+ return (
+ f"ForumCategory(id={self.id}, "
+ f"title={self.title}, description={self.description}, "
+ f"threads_count={self.threads_count}, posts_count={self.posts_count})"
)
- threads = []
-
- for response in responses:
- html = BeautifulSoup(response.json()["body"], "lxml")
- for info in html.select("table.table tr.head~tr"):
- title = info.select_one("div.title a")
- thread_id = re.search(r"t-(\d+)", title.get("href")).group(1)
- description = info.select_one("div.description")
- user = info.select_one("span.printuser")
- odate = info.select_one("span.odate")
- posts_count = info.select_one("td.posts")
- last_id = info.select_one("td.last>a")
- if last_id is None:
- post_id = None
- else:
- post_id = int(
- re.search(r"post-(\d+)", last_id.get("href")).group(1)
- )
-
- thread = ForumThread(
- site=self.site,
- id=thread_id,
- forum=self.forum,
- title=title.text,
- description=description.text.strip(),
- created_by=user_parser(client, user),
- created_at=odate_parser(odate),
- posts_counts=int(posts_count.text),
- _last_post_id=post_id,
- )
-
- threads.append(thread)
-
- return ForumThreadCollection(self, threads)
-
- def new_thread(self, title: str, source: str, description: str = ""):
- client = self.site.client
- client.login_check()
-
- response = self.site.amc_request(
- [
- {
- "category_id": self.id,
- "title": title,
- "description": description,
- "source": source,
- "action": "ForumAction",
- "event": "newThread",
- }
- ]
- )[0]
-
- body = response.json()
-
- return ForumThread(
- site=self.site,
- id=body["threadId"],
- forum=self.forum,
- category=self,
- title=title,
- description=description,
- created_by=client.user.get(client.username),
- created_at=datetime.fromtimestamp(body["CURRENT_TIMESTAMP"]),
- posts_counts=1,
- )
+ @property
+ def threads(self) -> ForumThreadCollection:
+ if self._threads is None:
+ self._threads = ForumThreadCollection.acquire_all(self)
+ return self._threads
+
+ @threads.setter
+ def threads(self, value):
+ self._threads = value
+
+ def reload_threads(self):
+ self._threads = ForumThreadCollection.acquire_all(self)
+ return self._threads
diff --git a/src/wikidot/module/forum_group.py b/src/wikidot/module/forum_group.py
deleted file mode 100644
index 7470de6..0000000
--- a/src/wikidot/module/forum_group.py
+++ /dev/null
@@ -1,104 +0,0 @@
-import re
-from collections.abc import Iterator
-from dataclasses import dataclass
-from typing import TYPE_CHECKING, Optional
-
-from bs4 import BeautifulSoup
-
-from wikidot.module.forum_category import ForumCategory, ForumCategoryCollection
-
-if TYPE_CHECKING:
- from wikidot.module.forum import Forum
- from wikidot.module.site import Site
-
-
-class ForumGroupCollection(list["ForumGroup"]):
- def __init__(self, forum: "Forum", groups: list["ForumGroup"]):
- super().__init__(groups)
- self.forum = forum
-
- def __iter__(self) -> Iterator["ForumGroup"]:
- return super().__iter__()
-
- @staticmethod
- def get_groups(site: "Site", forum: "Forum"):
- groups = []
-
- response = site.amc_request(
- [{"moduleName": "forum/ForumStartModule", "hidden": "true"}]
- )[0]
- body = response.json()["body"]
- html = BeautifulSoup(body, "lxml")
-
- for group_info in html.select("div.forum-group"):
- group = ForumGroup(
- site=site,
- forum=forum,
- title=group_info.select_one("div.title").text,
- description=group_info.select_one("div.description").text,
- )
-
- categories = []
-
- for info in group_info.select("table tr.head~tr"):
- name = info.select_one("td.name")
- thread_count = info.select_one("td.threads")
- post_count = info.select_one("td.posts")
- last_id = info.select_one("td.last>a")
- if last_id is None:
- thread_id, post_id = None, None
- else:
- thread_id, post_id = re.search(
- r"t-(\d+).+post-(\d+)", last_id.get("href")
- ).groups()
- thread_id, post_id = int(thread_id), int(post_id)
-
- category = ForumCategory(
- site=site,
- id=int(
- re.search(r"c-(\d+)", name.select_one("a").get("href")).group(1)
- ),
- description=name.select_one("div.description").text,
- forum=forum,
- title=name.select_one("a").text,
- group=group,
- threads_counts=thread_count,
- posts_counts=post_count,
- _last_thread_id=thread_id,
- _last_post_id=post_id,
- )
-
- categories.append(category)
-
- group.categories = ForumCategoryCollection(site, categories)
-
- groups.append(group)
-
- forum._groups = ForumGroupCollection(site, groups)
-
- def find(
- self, title: str = None, description: str = None
- ) -> Optional["ForumGroup"]:
- for group in self:
- if (title is None or group.title == title) and (
- description is None or group.description == description
- ):
- return group
-
- def findall(self, title: str = None, description: str = None) -> list["ForumGroup"]:
- groups = []
- for group in self:
- if (title is None or group.title == title) and (
- description is None or group.description == description
- ):
- groups.append(group)
- return groups
-
-
-@dataclass
-class ForumGroup:
- site: "Site"
- forum: "Forum"
- title: str
- description: str
- categories: ForumGroupCollection = None
diff --git a/src/wikidot/module/forum_post.py b/src/wikidot/module/forum_post.py
deleted file mode 100644
index a6e00cb..0000000
--- a/src/wikidot/module/forum_post.py
+++ /dev/null
@@ -1,211 +0,0 @@
-from collections.abc import Iterator
-from dataclasses import dataclass
-from datetime import datetime
-from typing import TYPE_CHECKING, Optional
-
-from bs4 import BeautifulSoup
-
-from wikidot.common import exceptions
-
-if TYPE_CHECKING:
- from wikidot.module.forum import Forum
- from wikidot.module.forum_thread import ForumThread
- from wikidot.module.site import Site
- from wikidot.module.user import AbstractUser
-
-
-class ForumPostCollection(list["ForumPost"]):
- def __init__(self, thread: "ForumThread", posts: list["ForumPost"]):
- super().__init__(posts)
- self.thread = thread
-
- def __iter__(self) -> Iterator["ForumPost"]:
- return super().__iter__()
-
- def find(self, target_id: int) -> Optional["ForumPost"]:
- for post in self:
- if target_id == post.id:
- return post
-
- @staticmethod
- def _acquire_parent_post(thread: "ForumThread", posts: list["ForumPost"]):
- if len(posts) == 0:
- return posts
-
- for post in posts:
- post._parent = thread.get(post.parent_id)
-
- return posts
-
- def get_parent_post(self):
- return ForumPostCollection._acquire_parent_post(self.thread, self)
-
- @staticmethod
- def _acquire_post_info(thread: "ForumThread", posts: list["ForumPost"]):
- if len(posts) == 0:
- return posts
-
- responses = thread.site.amc_request(
- [
- {
- "postId": post.id,
- "threadId": thread.id,
- "moduleName": "forum/sub/ForumEditPostFormModule",
- }
- for post in posts
- ]
- )
-
- for post, response in zip(posts, responses):
- html = BeautifulSoup(response.json()["body"], "lxml")
-
- title = html.select_one("input#np-title").text.strip()
- source = html.select_one("textarea#np-text").text.strip()
- post._title = title
- post._source = source
-
- return posts
-
- def get_post_info(self):
- return ForumPostCollection._acquire_post_info(self.thread, self)
-
-
-@dataclass
-class ForumPost:
- site: "Site"
- id: int
- forum: "Forum"
- thread: "ForumThread" = None
- parent_id: int = None
- created_by: "AbstractUser" = None
- created_at: datetime = None
- edited_by: "AbstractUser" = None
- edited_at: datetime = None
- source_text: str = None
- source_ele: BeautifulSoup = None
- _parent: "ForumPost" = None
- _title: str = None
- _source: str = None
-
- def reply(self, title: str = "", source: str = ""):
- client = self.site.client
- client.login_check()
- if source == "":
- raise exceptions.UnexpectedException("Post body can not be left empty.")
-
- response = self.site.amc_request(
- [
- {
- "parentId": self.id,
- "title": title,
- "source": source,
- "action": "ForumAction",
- "event": "savePost",
- }
- ]
- )[0]
- body = response.json()
-
- return ForumPost(
- site=self.site,
- id=int(body["postId"]),
- forum=self.forum,
- title=title,
- source=source,
- thread=self.thread,
- parent_id=self.id,
- created_by=client.user.get(client.username),
- created_at=body["CURRENT_TIMESTAMP"],
- )
-
- def get_url(self):
- return f"{self.thread.get_url()}#post-{self.id}"
-
- @property
- def parent(self):
- if self._parent is None:
- ForumPostCollection(self.thread, [self]).get_parent_post()
- return self._parent
-
- @parent.setter
- def parent(self, value: "ForumPost"):
- self._parent = value
-
- @property
- def title(self):
- if self._title is None:
- ForumPostCollection(self.thread, [self]).get_post_info()
- return self._title
-
- @title.setter
- def title(self, value: str):
- self._title = value
-
- @property
- def source(self):
- if self._source is None:
- ForumPostCollection(self.thread, [self]).get_post_info()
- return self._source
-
- @source.setter
- def source(self, value: str):
- self._source = value
-
- def edit(self, title: str = None, source: str = None):
- client = self.site.client
- client.login_check()
-
- if title is None and source is None:
- return self
-
- if source == "":
- raise exceptions.UnexpectedException("Post source can not be left empty.")
- try:
- response = self.site.amc_request(
- [
- {
- "postId": self.id,
- "threadId": self.thread.id,
- "moduleName": "forum/sub/ForumEditPostFormModule",
- }
- ]
- )[0]
- html = BeautifulSoup(response.json()["body"], "lxml")
- current_id = int(html.select("form#edit-post-form>input")[1].get("value"))
-
- response = self.site.amc_request(
- [
- {
- "postId": self.id,
- "currentRevisionId": current_id,
- "title": title if title is not None else self.title,
- "source": source if source is not None else self.source,
- "action": "ForumAction",
- "event": "saveEditPost",
- "moduleName": "Empty",
- }
- ]
- )[0]
- except exceptions.WikidotStatusCodeException:
- return self
-
- body = response.json()
- self.edited_by = client.user.get(client.username)
- self.edited_at = datetime.fromtimestamp(body["CURRENT_TIMESTAMP"])
- self.title = title if title is not None else self.title
- self.source = source if source is not None else self.source
-
- return self
-
- def destroy(self):
- self.site.client.login_check()
- self.site.amc_request(
- [
- {
- "postId": self.id,
- "action": "ForumAction",
- "event": "deletePost",
- "moduleName": "Empty",
- }
- ]
- )
diff --git a/src/wikidot/module/forum_thread.py b/src/wikidot/module/forum_thread.py
index 4c14e70..e94973b 100644
--- a/src/wikidot/module/forum_thread.py
+++ b/src/wikidot/module/forum_thread.py
@@ -2,332 +2,149 @@
from collections.abc import Iterator
from dataclasses import dataclass
from datetime import datetime
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional
from bs4 import BeautifulSoup
-from wikidot.common import exceptions
-from wikidot.module.forum_post import ForumPost, ForumPostCollection
-from wikidot.util.parser import odate as odate_parser
-from wikidot.util.parser import user as user_parser
+from ..common.exceptions import NoElementException
+from ..util.parser import odate as odate_parser
+from ..util.parser import user as user_parser
if TYPE_CHECKING:
- from wikidot.module.forum import Forum
- from wikidot.module.forum_category import ForumCategory
- from wikidot.module.page import Page
- from wikidot.module.site import Site
- from wikidot.module.user import AbstractUser
+ from .forum_category import ForumCategory
+ from .site import Site
+ from .user import AbstractUser
class ForumThreadCollection(list["ForumThread"]):
- def __init__(self, forum: "Forum", threads: list["ForumThread"] = None):
+ def __init__(
+ self,
+ site: Optional["Site"] = None,
+ threads: Optional[list["ForumThread"]] = None,
+ ):
super().__init__(threads or [])
- self.forum = forum
+
+ if site is not None:
+ self.site = site
+ else:
+ self.site = self[0].category.site
def __iter__(self) -> Iterator["ForumThread"]:
return super().__iter__()
- def _acquire_update(forum: "Forum", threads: list["ForumThread"]):
- if len(threads) == 0:
- return threads
-
- client = forum.site.client
- responses = forum.site.amc_request(
- [
- {"t": thread.id, "moduleName": "forum/ForumViewThreadModule"}
- for thread in threads
- ]
- )
-
- for thread, response in zip(threads, responses):
- html = BeautifulSoup(response.json()["body"], "lxml")
- statistics = html.select_one("div.statistics")
- user = statistics.select_one("span.printuser")
- odate = statistics.select_one("span.odate")
- category_url = html.select("div.forum-breadcrumbs a")[1].get("href")
- category_id = re.search(r"c-(\d+)", category_url).group(1)
- title = html.select_one("div.forum-breadcrumbs").text.strip()
- counts = int(re.findall(r"\n.+\D(\d+)", statistics.text)[-1])
-
- thread.title = re.search(r"»([ \S]*)$", title).group(1).strip()
- thread.category = thread.forum.category.get(int(category_id))
- if html.select_one("div.description-block div.head") is None:
- thread.description = ""
- else:
- description = html.select_one("div.description-block").text.strip()
- thread.description = re.search(r"[ \S]+$", description).group()
- if thread.posts_counts != counts:
- thread.last = None
- thread.posts_counts = counts
- thread.created_by = user_parser(client, user)
- thread.created_at = odate_parser(odate)
- if (pagerno := html.select_one("span.pager-no")) is None:
- thread.pagerno = 1
- else:
- thread.pagerno = int(re.search(r"of (\d+)", pagerno.text).group(1))
- if (page_ele := html.select_one("div.description-block>a")) is not None:
- thread.page = thread.site.page.get(page_ele.get("href")[1:])
- thread.page.discuss = thread
+ @staticmethod
+ def _parse(category: "ForumCategory", html: BeautifulSoup) -> list["ForumThread"]:
+ threads = []
+ for info in html.select("table.table tr.head~tr"):
+ title = info.select_one("div.title a")
+ if title is None:
+ raise NoElementException("Title element is not found.")
+
+ title_href = title.get("href")
+ if title_href is None:
+ raise NoElementException("Title href is not found.")
+
+ thread_id_match = re.search(r"t-(\d+)", str(title_href))
+ if thread_id_match is None:
+ raise NoElementException("Thread ID is not found.")
+
+ thread_id = int(thread_id_match.group(1))
+
+ description_elem = info.select_one("div.description")
+ user_elem = info.select_one("span.printuser")
+ odate_elem = info.select_one("span.odate")
+ posts_count_elem = info.select_one("td.posts")
+
+ if description_elem is None:
+ raise NoElementException("Description element is not found.")
+ if user_elem is None:
+ raise NoElementException("User element is not found.")
+ if odate_elem is None:
+ raise NoElementException("Odate element is not found.")
+ if posts_count_elem is None:
+ raise NoElementException("Posts count element is not found.")
+
+ thread = ForumThread(
+ _category=category,
+ id=int(thread_id),
+ title=title.text,
+ description=description_elem.text,
+ created_by=user_parser(category.site.client, user_elem),
+ created_at=odate_parser(odate_elem),
+ post_count=int(posts_count_elem.text),
+ )
+
+ threads.append(thread)
return threads
- def update(self):
- return ForumThreadCollection._acquire_update(self.forum, self)
-
-
-@dataclass
-class ForumThread:
- site: "Site"
- id: int
- forum: "Forum"
- category: "ForumCategory" = None
- title: str = None
- description: str = None
- created_by: "AbstractUser" = None
- created_at: datetime = None
- posts_counts: int = None
- page: "Page" = None
- pagerno: int = None
- _last_post_id: int = None
- _last: "ForumPost" = None
-
- @property
- def last(self):
- if self._last_post_id is not None:
- if self._last is None:
- self.update()
- self._last = self.get(self._last_post_id)
- return self._last
-
- @last.setter
- def last(self, value: "ForumPost"):
- self._last = value
-
- @property
- def posts(self) -> ForumPostCollection:
- client = self.site.client
- responses = self.site.amc_request(
- [
- {
- "pagerNo": no + 1,
- "t": self.id,
- "order": "",
- "moduleName": "forum/ForumViewThreadPostsModule",
- }
- for no in range(self.pagerno)
- ]
- )
-
- posts = []
-
- for response in responses:
- html = BeautifulSoup(response.json()["body"], "lxml")
- for post in html.select("div.post"):
- cuser = post.select_one("div.info span.printuser")
- codate = post.select_one("div.info span.odate")
- if (parent := post.parent.get("id")) != "thread-container-posts":
- parent_id = int(re.search(r"fpc-(\d+)", parent).group(1))
- else:
- parent_id = None
- euser = post.select_one("div.changes span.printuser")
- eodate = post.select_one("div.changes span.odate a")
-
- posts.append(
- ForumPost(
- site=self.site,
- id=int(re.search(r"post-(\d+)", post.get("id")).group(1)),
- forum=self.forum,
- thread=self,
- _title=post.select_one("div.title").text.strip(),
- parent_id=parent_id,
- created_by=user_parser(client, cuser),
- created_at=odate_parser(codate),
- edited_by=(
- client.user.get(euser.text) if euser is not None else None
- ),
- edited_at=odate_parser(eodate) if eodate is not None else None,
- source_ele=post.select_one("div.content"),
- source_text=post.select_one("div.content").text.strip(),
- )
- )
-
- return ForumPostCollection(self, posts)
-
- def get_url(self) -> str:
- return f"{self.site.get_url()}/forum/t-{self.id}"
-
- def update(self) -> "ForumThread":
- return ForumThreadCollection(self.forum, [self]).update()[0]
-
- def edit(self, title: str = None, description: str = None):
- self.site.client.login_check()
- if title == "":
- raise exceptions.UnexpectedException("Title can not be left empty.")
-
- if self.page is not None:
- raise exceptions.UnexpectedException("Page's discussion can not be edited.")
-
- if title is None and description is None:
- return self
+ @staticmethod
+ def acquire_all(category: "ForumCategory") -> "ForumThreadCollection":
+ threads = []
- self.site.amc_request(
+ first_response = category.site.amc_request(
[
{
- "threadId": self.id,
- "title": self.title if title is None else title,
- "description": (
- self.description if description is None else description
- ),
- "action": "ForumAction",
- "event": "saveThreadMeta",
- "moduleName": "Empty",
- }
- ]
- )
-
- self.title = self.title if title is None else title
- self.description = self.description if description is None else description
-
- return self
-
- def move_to(self, category_id: int):
- self.site.client.login_check()
- self.site.amc_request(
- [
- {
- "categoryId": category_id,
- "threadId": self.id,
- "action": "ForumAction",
- "event": "moveThread",
- "moduleName": "Empty",
- }
- ]
- )
-
- def lock(self):
- self.site.client.login_check()
- self.site.amc_request(
- [
- {
- "threadId": self.id,
- "block": "true",
- "action": "ForumAction",
- "event": "saveBlock",
- "moduleName": "Empty",
- }
- ]
- )
-
- return self
-
- def unlock(self):
- self.site.client.login_check()
- self.site.amc_request(
- [
- {
- "threadId": self.id,
- "action": "ForumAction",
- "event": "saveBlock",
- "moduleName": "Empty",
- }
- ]
- )
-
- return self
-
- def is_locked(self):
- self.site.client.login_check()
- response = self.site.amc_request(
- [
- {
- "threadId": self.id,
- "moduleName": "forum/sub/ForumEditThreadBlockModule",
+ "p": 1,
+ "c": category.id,
+ "moduleName": "forum/ForumViewCategoryModule",
}
]
)[0]
- html = BeautifulSoup(response.json()["body"], "lxml")
- checked = html.select_one("input.checkbox").get("checked")
+ first_body = first_response.json()["body"]
+ first_html = BeautifulSoup(first_body, "lxml")
- return checked is not None
+ threads.extend(ForumThreadCollection._parse(category, first_html))
- def stick(self):
- self.site.client.login_check()
- self.site.amc_request(
- [
- {
- "threadId": self.id,
- "sticky": "true",
- "action": "ForumAction",
- "event": "saveSticky",
- "moduleName": "Empty",
- }
- ]
- )
+ # look for a pager (pagination control)
+ pager = first_html.select_one("div.pager")
+ if pager is None:
+ return ForumThreadCollection(site=category.site, threads=threads)
- return self
+ last_page = int(pager.select("a")[-2].text)
+ if last_page == 1:
+ return ForumThreadCollection(site=category.site, threads=threads)
- def unstick(self):
- self.site.client.login_check()
- self.site.amc_request(
+ responses = category.site.amc_request(
[
{
- "threadId": self.id,
- "action": "ForumAction",
- "event": "saveSticky",
- "moduleName": "Empty",
+ "p": page,
+ "c": category.id,
+ "moduleName": "forum/ForumViewCategoryModule",
}
+ for page in range(2, last_page + 1)
]
)
- return self
-
- def is_sticked(self):
- self.site.client.login_check()
- response = self.site.amc_request(
- [
- {
- "threadId": self.id,
- "moduleName": "forum/sub/ForumEditThreadStickinessModule",
- }
- ]
- )[0]
-
- html = BeautifulSoup(response.json()["body"], "lxml")
- checked = html.select_one("input.checkbox").get("checked")
-
- return checked is not None
+ for response in responses:
+ body = response.json()["body"]
+ html = BeautifulSoup(body, "lxml")
+ threads.extend(ForumThreadCollection._parse(category, html))
- def new_post(self, title: str = "", source: str = "", parent_id: int = ""):
- client = self.site.client
- client.login_check()
- if source == "":
- raise exceptions.UnexpectedException("Post body can not be left empty.")
+ return ForumThreadCollection(site=category.site, threads=threads)
- response = self.site.amc_request(
- [
- {
- "parentId": parent_id,
- "title": title,
- "source": source,
- "action": "ForumAction",
- "event": "savePost",
- }
- ]
- )
- body = response.json()
- return ForumPost(
- site=self.site,
- id=int(body["postId"]),
- forum=self.forum,
- title=title,
- source=source,
- thread=self,
- parent_id=parent_id if parent_id == "" else None,
- created_by=client.user.get(client.username),
- created_at=datetime.fromtimestamp(body["CURRENT_TIMESTAMP"]),
+@dataclass
+class ForumThread:
+ id: int
+ title: str
+ description: str
+ created_by: "AbstractUser"
+ created_at: datetime
+ post_count: int
+ _category: Optional["ForumCategory"] = None
+
+ def __str__(self):
+ return (
+ f"ForumThread(id={self.id}, "
+ f"title={self.title}, description={self.description}, "
+ f"created_by={self.created_by}, created_at={self.created_at}, "
+ f"post_count={self.post_count})"
)
- def get(self, post_id: int):
- return self.posts.find(post_id)
+ @property
+ def category(self) -> "ForumCategory":
+ if self._category is None:
+ raise ValueError("Category is not set.")
+ return self._category
diff --git a/src/wikidot/module/page.py b/src/wikidot/module/page.py
index a27c659..bd1700f 100644
--- a/src/wikidot/module/page.py
+++ b/src/wikidot/module/page.py
@@ -4,20 +4,20 @@
from datetime import datetime
from typing import TYPE_CHECKING, Any, Optional, Union
+import httpx
from bs4 import BeautifulSoup
-from wikidot.common import exceptions
-from wikidot.module.forum_thread import ForumThread
-from wikidot.module.page_revision import PageRevision, PageRevisionCollection
-from wikidot.module.page_source import PageSource
-from wikidot.module.page_votes import PageVote, PageVoteCollection
-from wikidot.util.parser import odate as odate_parser
-from wikidot.util.parser import user as user_parser
-from wikidot.util.requestutil import RequestUtil
+from ..common import exceptions
+from ..util.parser import odate as odate_parser
+from ..util.parser import user as user_parser
+from ..util.requestutil import RequestUtil
+from .page_revision import PageRevision, PageRevisionCollection
+from .page_source import PageSource
+from .page_votes import PageVote, PageVoteCollection
if TYPE_CHECKING:
- from wikidot.module.site import Site
- from wikidot.module.user import User
+ from .site import Site
+ from .user import User
DEFAULT_MODULE_BODY = [
"fullname", # ページのフルネーム(str)
@@ -79,7 +79,9 @@ def as_dict(self) -> dict[str, Any]:
class PageCollection(list["Page"]):
- def __init__(self, site: "Site" = None, pages: list["Page"] = None):
+ def __init__(
+ self, site: Optional["Site"] = None, pages: Optional[list["Page"]] = None
+ ):
super().__init__(pages or [])
if site is not None:
@@ -105,11 +107,14 @@ def _parse(site: "Site", html_body: BeautifulSoup):
# 各値を取得
for set_element in page_element.select("span.set"):
- key = set_element.select_one("span.name").text.strip()
+ key_element = set_element.select_one("span.name")
+ if key_element is None:
+ raise exceptions.NoElementException("Cannot find key element")
+ key = key_element.text.strip()
value_element = set_element.select_one("span.value")
if value_element is None:
- value = None
+ value: Any = None
elif key in ["created_at", "updated_at", "commented_at"]:
odate_element = value_element.select_one("span.odate")
@@ -210,17 +215,19 @@ def search_pages(site: "Site", query: SearchPagesQuery = SearchPagesQuery()):
# pagerが存在する
if first_page_html_body.select_one("div.pager") is not None:
# span.target[-2] > a から最大ページ数を取得
- total = int(
- first_page_html_body.select("div.pager span.target")[-2]
- .select_one("a")
- .text
- )
+ last_pager_element = first_page_html_body.select("div.pager span.target")[
+ -2
+ ]
+ last_pager_link_element = last_pager_element.select_one("a")
+ if last_pager_link_element is None:
+ raise exceptions.NoElementException("Cannot find last pager link")
+ total = int(last_pager_link_element.text.strip())
if total > 1:
request_bodies = []
for i in range(1, total):
_query_dict = query_dict.copy()
- _query_dict["offset"] = i * query.perPage
+ _query_dict["offset"] = i * (query.perPage or 250)
request_bodies.append(_query_dict)
responses = site.amc_request(request_bodies)
@@ -258,6 +265,10 @@ def _acquire_page_ids(site: "Site", pages: list["Page"]):
# "WIKIREQUEST.info.pageId = xxx;"の値をidに設定
for index, response in enumerate(responses):
+ if not isinstance(response, httpx.Response):
+ raise exceptions.UnexpectedException(
+ f"Unexpected response type: {type(response)}"
+ )
source = response.text
id_match = re.search(r"WIKIREQUEST\.info\.pageId = (\d+);", source)
@@ -286,12 +297,11 @@ def _acquire_page_sources(site: "Site", pages: list["Page"]):
for page, responses in zip(pages, responses):
body = responses.json()["body"]
- source = (
- BeautifulSoup(body, "lxml")
- .select_one("div.page-source")
- .text.strip()
- .removeprefix("\t")
- )
+ html = BeautifulSoup(body, "lxml")
+ source_element = html.select_one("div.page-source")
+ if source_element is None:
+ raise exceptions.NoElementException("Cannot find source element")
+ source = source_element.text.strip().removeprefix("\t")
page.source = PageSource(page, source)
return pages
@@ -322,14 +332,24 @@ def _acquire_page_revisions(site: "Site", pages: list["Page"]):
for rev_element in body_html.select(
"table.page-history > tr[id^=revision-row-]"
):
- rev_id = int(rev_element["id"].removeprefix("revision-row-"))
+ rev_id = int(str(rev_element["id"]).removeprefix("revision-row-"))
tds = rev_element.select("td")
rev_no = int(tds[0].text.strip().removesuffix("."))
- created_by = user_parser(
- page.site.client, tds[4].select_one("span.printuser")
- )
- created_at = odate_parser(tds[5].select_one("span.odate"))
+ created_by_elem = tds[4].select_one("span.printuser")
+ if created_by_elem is None:
+ raise exceptions.NoElementException(
+ "Cannot find created by element"
+ )
+ created_by = user_parser(page.site.client, created_by_elem)
+
+ created_at_elem = tds[5].select_one("span.odate")
+ if created_at_elem is None:
+ raise exceptions.NoElementException(
+ "Cannot find created at element"
+ )
+ created_at = odate_parser(created_at_elem)
+
comment = tds[6].text.strip()
revs.append(
@@ -342,7 +362,7 @@ def _acquire_page_revisions(site: "Site", pages: list["Page"]):
comment=comment,
)
)
- page.revisions = revs
+ page.revisions = PageRevisionCollection(page, revs)
return pages
@@ -373,46 +393,22 @@ def _acquire_page_votes(site: "Site", pages: list["Page"]):
users = [user_parser(site.client, user_elem) for user_elem in user_elems]
values = []
for value in value_elems:
- value = value.text.strip()
- if value == "+":
+ _v = value.text.strip()
+ if _v == "+":
values.append(1)
- elif value == "-":
+ elif _v == "-":
values.append(-1)
else:
- values.append(int(value))
+ values.append(int(_v))
votes = [PageVote(page, user, vote) for user, vote in zip(users, values)]
- page._votes = PageVoteCollection(page.site, votes)
+ page._votes = PageVoteCollection(page, votes)
return pages
def get_page_votes(self):
return PageCollection._acquire_page_votes(self.site, self)
- def _acquire_page_discuss(site: "Site", pages: list["Page"]):
- target_pages = [page for page in pages if not page.is_discuss_acquired()]
-
- if len(target_pages) == 0:
- return pages
-
- responses = site.amc_request(
- [
- {
- "action": "ForumAction",
- "event": "createPageDiscussionThread",
- "page_id": page.id,
- "moduleName": "Empty",
- }
- for page in target_pages
- ]
- )
-
- for page, response in zip(pages, responses):
- page._discuss = ForumThread(site, response.json()["thread_id"], page=page)
-
- def get_page_discuss(self):
- return PageCollection._acquire_page_discuss(self.site, self)
-
@dataclass
class Page:
@@ -483,26 +479,12 @@ class Page:
updated_by: "User"
updated_at: datetime
commented_by: Optional["User"]
- commented_at: datetime
- _id: int = None
+ commented_at: Optional[datetime]
+ _id: Optional[int] = None
_source: Optional[PageSource] = None
- _revisions: list["PageRevision"] = None
- _votes: PageVoteCollection = None
- _discuss: ForumThread = None
-
- @property
- def discuss(self):
- if self._discuss is None:
- PageCollection(self.site, [self]).get_page_discuss()
- self._discuss.update()
- return self._discuss
-
- @discuss.setter
- def discuss(self, value: ForumThread):
- self._discuss = value
-
- def is_discuss_acquired(self) -> bool:
- return self._discuss is not None
+ _revisions: Optional[PageRevisionCollection] = None
+ _votes: Optional[PageVoteCollection] = None
+ _metas: Optional[dict[str, str]] = None
def get_url(self) -> str:
return f"{self.site.get_url()}/{self.fullname}"
@@ -516,8 +498,12 @@ def id(self) -> int:
int
ページID
"""
- if self._id is None:
+ if not self.is_id_acquired():
PageCollection(self.site, [self]).get_page_ids()
+
+ if self._id is None:
+ raise exceptions.NotFoundException("Cannot find page id")
+
return self._id
@id.setter
@@ -531,6 +517,10 @@ def is_id_acquired(self) -> bool:
def source(self) -> PageSource:
if self._source is None:
PageCollection(self.site, [self]).get_page_sources()
+
+ if self._source is None:
+ raise exceptions.NotFoundException("Cannot find page source")
+
return self._source
@source.setter
@@ -538,16 +528,17 @@ def source(self, value: PageSource):
self._source = value
@property
- def revisions(self) -> PageRevisionCollection["PageRevision"]:
+ def revisions(self) -> PageRevisionCollection:
if self._revisions is None:
PageCollection(self.site, [self]).get_page_revisions()
return PageRevisionCollection(self, self._revisions)
@revisions.setter
- def revisions(
- self, value: list["PageRevision"] | PageRevisionCollection["PageRevision"]
- ):
- self._revisions = value
+ def revisions(self, value: list["PageRevision"] | PageRevisionCollection):
+ if isinstance(value, list):
+ self._revisions = PageRevisionCollection(self, value)
+ else:
+ self._revisions = value
@property
def latest_revision(self) -> PageRevision:
@@ -562,6 +553,10 @@ def latest_revision(self) -> PageRevision:
def votes(self) -> PageVoteCollection:
if self._votes is None:
PageCollection(self.site, [self]).get_page_votes()
+
+ if self._votes is None:
+ raise exceptions.NotFoundException("Cannot find page votes")
+
return self._votes
@votes.setter
@@ -581,54 +576,67 @@ def destroy(self):
]
)
- def get_metas(self) -> dict[str, str]:
- response = self.site.amc_request(
- [
- {
- "pageId": self.id,
- "moduleName": "edit/EditMetaModule",
- }
- ]
- )
+ @property
+ def metas(self) -> dict[str, str]:
+ if self._metas is None:
+ response = self.site.amc_request(
+ [
+ {
+ "pageId": self.id,
+ "moduleName": "edit/EditMetaModule",
+ }
+ ]
+ )
- # レスポンス解析
- body = response[0].json()["body"]
+ # レスポンス解析
+ body = response[0].json()["body"]
- # を正規表現で取得
- metas = {}
- for meta in re.findall(r'<meta name="([^"]+)" content="([^"]+)"/>', body):
- metas[meta[0]] = meta[1]
+ # を正規表現で取得
+ metas = {}
+ for meta in re.findall(
+ r'<meta name="([^"]+)" content="([^"]+)"/>', body
+ ):
+ metas[meta[0]] = meta[1]
- return metas
+ self._metas = metas
- def set_meta(self, name: str, value: str):
- self.site.client.login_check()
- self.site.amc_request(
- [
- {
- "metaName": name,
- "metaContent": value,
- "action": "WikiPageAction",
- "event": "saveMetaTag",
- "pageId": self.id,
- "moduleName": "edit/EditMetaModule",
- }
- ]
- )
+ return self._metas
- def delete_meta(self, name: str):
+ @metas.setter
+ def metas(self, value: dict[str, str]):
self.site.client.login_check()
- self.site.amc_request(
- [
- {
- "metaName": name,
- "action": "WikiPageAction",
- "event": "deleteMetaTag",
- "pageId": self.id,
- "moduleName": "edit/EditMetaModule",
- }
- ]
- )
+ current_metas = self.metas
+ deleted_metas = {k: v for k, v in current_metas.items() if k not in value}
+ added_metas = {k: v for k, v in value.items() if k not in current_metas}
+
+ for name, content in deleted_metas.items():
+ self.site.amc_request(
+ [
+ {
+ "metaName": name,
+ "action": "WikiPageAction",
+ "event": "deleteMetaTag",
+ "pageId": self.id,
+ "moduleName": "edit/EditMetaModule",
+ }
+ ]
+ )
+
+ for name, content in added_metas.items():
+ self.site.amc_request(
+ [
+ {
+ "metaName": name,
+ "metaContent": content,
+ "action": "WikiPageAction",
+ "event": "saveMetaTag",
+ "pageId": self.id,
+ "moduleName": "edit/EditMetaModule",
+ }
+ ]
+ )
+
+ self._metas = value
@staticmethod
def create_or_edit(
@@ -707,9 +715,9 @@ def create_or_edit(
def edit(
self,
- title: str = None,
- source: str = None,
- comment: str = None,
+ title: Optional[str] = None,
+ source: Optional[str] = None,
+ comment: Optional[str] = None,
force_edit: bool = False,
) -> "Page":
# Noneならそのままにする
@@ -727,13 +735,12 @@ def edit(
force_edit,
)
- def set_tags(self, tags: list[str]):
- # TODO: setter/getterにする
+ def commit_tags(self):
self.site.client.login_check()
self.site.amc_request(
[
{
- "tags": " ".join(tags),
+ "tags": " ".join(self.tags),
"action": "WikiPageAction",
"event": "saveTags",
"pageId": self.id,
@@ -741,3 +748,4 @@ def set_tags(self, tags: list[str]):
}
]
)
+ return self
diff --git a/src/wikidot/module/page_revision.py b/src/wikidot/module/page_revision.py
index 730aa03..1d45e84 100644
--- a/src/wikidot/module/page_revision.py
+++ b/src/wikidot/module/page_revision.py
@@ -5,17 +5,22 @@
from bs4 import BeautifulSoup
-from wikidot.module.page_source import PageSource
+from ..common.exceptions import NoElementException
+from .page_source import PageSource
if TYPE_CHECKING:
- from wikidot.module.page import Page
- from wikidot.module.user import AbstractUser
+ from .page import Page
+ from .user import AbstractUser
class PageRevisionCollection(list["PageRevision"]):
- def __init__(self, page: "Page" = None, revisions: list["PageRevision"] = None):
+ def __init__(
+ self,
+ page: Optional["Page"] = None,
+ revisions: Optional[list["PageRevision"]] = None,
+ ):
super().__init__(revisions or [])
- self.page = page or revisions[0].page
+ self.page = page or self[0].page if len(self) > 0 else None
def __iter__(self) -> Iterator["PageRevision"]:
return super().__iter__()
@@ -39,9 +44,12 @@ def _acquire_sources(page, revisions: list["PageRevision"]):
for revision, response in zip(target_revisions, responses):
body = response.json()["body"]
body_html = BeautifulSoup(body, "lxml")
+ wiki_text_elem = body_html.select_one("div.page-source")
+ if wiki_text_elem is None:
+ raise NoElementException("Wiki text element not found")
revision.source = PageSource(
page=page,
- wiki_text=body_html.select_one("div.page-source").text.strip(),
+ wiki_text=wiki_text_elem.text.strip(),
)
return revisions
@@ -100,7 +108,7 @@ def is_html_acquired(self) -> bool:
return self._html is not None
@property
- def source(self) -> "PageSource":
+ def source(self) -> Optional["PageSource"]:
if not self.is_source_acquired():
PageRevisionCollection(self.page, [self]).get_sources()
return self._source
@@ -110,7 +118,7 @@ def source(self, value: "PageSource"):
self._source = value
@property
- def html(self) -> str:
+ def html(self) -> Optional[str]:
if not self.is_html_acquired():
PageRevisionCollection(self.page, [self]).get_htmls()
return self._html
diff --git a/src/wikidot/module/page_source.py b/src/wikidot/module/page_source.py
index 12acac7..d8079f0 100644
--- a/src/wikidot/module/page_source.py
+++ b/src/wikidot/module/page_source.py
@@ -2,7 +2,7 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
- from wikidot.module.page import Page
+ from .page import Page
@dataclass
diff --git a/src/wikidot/module/page_votes.py b/src/wikidot/module/page_votes.py
index aeafc3f..ce5d75b 100644
--- a/src/wikidot/module/page_votes.py
+++ b/src/wikidot/module/page_votes.py
@@ -3,8 +3,8 @@
from typing import TYPE_CHECKING
if TYPE_CHECKING:
- from wikidot.module.page import Page
- from wikidot.module.user import AbstractUser
+ from .page import Page
+ from .user import AbstractUser
class PageVoteCollection(list["PageVote"]):
diff --git a/src/wikidot/module/private_message.py b/src/wikidot/module/private_message.py
index 2cf8f9e..ad6843e 100644
--- a/src/wikidot/module/private_message.py
+++ b/src/wikidot/module/private_message.py
@@ -1,19 +1,19 @@
from collections.abc import Iterator
from dataclasses import dataclass
from datetime import datetime
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
import httpx
from bs4 import BeautifulSoup, ResultSet, Tag
-from wikidot.common import exceptions
-from wikidot.common.decorators import login_required
-from wikidot.util.parser import odate as odate_parser
-from wikidot.util.parser import user as user_parser
+from ..common import exceptions
+from ..common.decorators import login_required
+from ..util.parser import odate as odate_parser
+from ..util.parser import user as user_parser
if TYPE_CHECKING:
- from wikidot.module.client import Client
- from wikidot.module.user import AbstractUser, User
+ from .client import Client
+ from .user import AbstractUser, User
class PrivateMessageCollection(list["PrivateMessage"]):
@@ -52,7 +52,7 @@ def from_ids(
}
)
- responses: [httpx.Response | Exception] = client.amc_client.request(
+ responses: tuple[httpx.Response | Exception] = client.amc_client.request(
bodies, return_exceptions=True
)
@@ -72,17 +72,23 @@ def from_ids(
sender, recipient = html.select("div.pmessage div.header span.printuser")
+ subject_element = html.select_one("div.pmessage div.header span.subject")
+ body_element = html.select_one("div.pmessage div.body")
+ odate_element = html.select_one("div.header span.odate")
+
messages.append(
PrivateMessage(
client=client,
id=message_ids[index],
sender=user_parser(client, sender),
recipient=user_parser(client, recipient),
- subject=html.select_one(
- "div.pmessage div.header span.subject"
- ).get_text(),
- body=html.select_one("div.pmessage div.body").get_text(),
- created_at=odate_parser(html.select_one("div.header span.odate")),
+ subject=subject_element.get_text() if subject_element else "",
+ body=body_element.get_text() if body_element else "",
+ created_at=(
+ odate_parser(odate_element)
+ if odate_element
+ else datetime.fromtimestamp(0)
+ ),
)
)
@@ -106,7 +112,9 @@ def _acquire(client: "Client", module_name: str):
受信箱のメッセージ
"""
# pager取得
- response = client.amc_client.request([{"moduleName": module_name}])[0]
+ response: httpx.Response = cast(
+ httpx.Response, client.amc_client.request([{"moduleName": module_name}])[0]
+ )
html = BeautifulSoup(response.json()["body"], "lxml")
# pagerの最後から2番目の要素を取得
@@ -121,11 +129,12 @@ def _acquire(client: "Client", module_name: str):
for page in range(1, max_page + 1)
]
- responses: [httpx.Response | Exception] = client.amc_client.request(
- bodies, return_exceptions=False
+ responses: tuple[httpx.Response] = cast(
+ tuple[httpx.Response],
+ client.amc_client.request(bodies, return_exceptions=False),
)
else:
- responses = [response]
+ responses = (response,)
message_ids = []
for response in responses:
@@ -133,7 +142,7 @@ def _acquire(client: "Client", module_name: str):
# tr.messageのdata-href末尾の数字を取得
message_ids.extend(
[
- int(tr["data-href"].split("/")[-1])
+ int(str(tr["data-href"]).split("/")[-1])
for tr in html.select("tr.message")
]
)
diff --git a/src/wikidot/module/site.py b/src/wikidot/module/site.py
index 63fedc8..797cfc8 100644
--- a/src/wikidot/module/site.py
+++ b/src/wikidot/module/site.py
@@ -4,15 +4,15 @@
import httpx
-from wikidot.common import exceptions
-from wikidot.common.decorators import login_required
-from wikidot.module.forum import Forum
-from wikidot.module.page import Page, PageCollection, SearchPagesQuery
-from wikidot.module.site_application import SiteApplication
+from ..common import exceptions
+from ..common.decorators import login_required
+from .forum_category import ForumCategoryCollection
+from .page import Page, PageCollection, SearchPagesQuery
+from .site_application import SiteApplication
if TYPE_CHECKING:
- from wikidot.module.client import Client
- from wikidot.module.user import User
+ from .client import Client
+ from .user import User
class SitePagesMethods:
@@ -101,6 +101,15 @@ def create(
)
+class SiteForumMethods:
+ def __init__(self, site: "Site"):
+ self.site = site
+
+ def categories(self) -> "ForumCategoryCollection":
+ """フォーラムのカテゴリを取得する"""
+ return ForumCategoryCollection.acquire_all(self.site)
+
+
@dataclass
class Site:
"""サイトオブジェクト
@@ -137,7 +146,7 @@ class Site:
def __post_init__(self):
self.pages = SitePagesMethods(self)
self.page = SitePageMethods(self)
- self.forum = Forum(self)
+ self.forum = SiteForumMethods(self)
def __str__(self):
return f"Site(id={self.id}, title={self.title}, unix_name={self.unix_name})"
diff --git a/src/wikidot/module/site_application.py b/src/wikidot/module/site_application.py
index f17011d..09c967c 100644
--- a/src/wikidot/module/site_application.py
+++ b/src/wikidot/module/site_application.py
@@ -3,13 +3,13 @@
from bs4 import BeautifulSoup
-from wikidot.common import exceptions
-from wikidot.common.decorators import login_required
-from wikidot.util.parser import user as user_parser
+from ..common import exceptions
+from ..common.decorators import login_required
+from ..util.parser import user as user_parser
if TYPE_CHECKING:
- from wikidot.module.site import Site
- from wikidot.module.user import AbstractUser
+ from ..module.site import Site
+ from ..module.user import AbstractUser
@dataclass
diff --git a/src/wikidot/module/user.py b/src/wikidot/module/user.py
index ab19add..2c80d8a 100644
--- a/src/wikidot/module/user.py
+++ b/src/wikidot/module/user.py
@@ -4,12 +4,12 @@
from bs4 import BeautifulSoup
-from wikidot.common.exceptions import NotFoundException
-from wikidot.util.requestutil import RequestUtil
-from wikidot.util.stringutil import StringUtil
+from ..common.exceptions import NoElementException, NotFoundException
+from ..util.requestutil import RequestUtil
+from ..util.stringutil import StringUtil
if TYPE_CHECKING:
- from wikidot.module.client import Client
+ from .client import Client
class UserCollection(list["AbstractUser"]):
@@ -64,12 +64,16 @@ def from_names(
continue
# id取得
- user_id = int(
- html.select_one("a.btn.btn-default.btn-xs")["href"].split("/")[-1]
- )
+ user_id_elem = html.select_one("a.btn.btn-default.btn-xs")
+ if user_id_elem is None:
+ raise NoElementException("User ID element not found")
+ user_id = int(str(user_id_elem["href"]).split("/")[-1])
# name取得
- name = html.select_one("h1.profile-title").get_text(strip=True)
+ name_elem = html.select_one("h1.profile-title")
+ if name_elem is None:
+ raise NoElementException("User name element not found")
+ name = name_elem.get_text(strip=True)
# avatar_url取得
avatar_url = f"https://www.wikidot.com/avatar.php?userid={user_id}"
@@ -143,12 +147,12 @@ class User(AbstractUser):
# name: str | None
# unix_name: str | None
# avatar_url: str | None
- ip: None = None
+ ip: str | None = None
@staticmethod
def from_name(
client: "Client", name: str, raise_when_not_found: bool = False
- ) -> "User":
+ ) -> "AbstractUser":
"""ユーザー名からユーザーオブジェクトを取得する
Parameters
@@ -189,12 +193,11 @@ class DeletedUser(AbstractUser):
ユーザーのIPアドレス(取得できないためNone)
"""
- # client: 'Client'
- # id: int | None
- name: str = "account deleted"
- unix_name: str = "account_deleted"
- avatar_url: None = None
- ip: None = None
+ id: int | None = None
+ name: str | None = "account deleted"
+ unix_name: str | None = "account_deleted"
+ avatar_url: str | None = None
+ ip: str | None = None
@dataclass
@@ -217,12 +220,11 @@ class AnonymousUser(AbstractUser):
ユーザーのIPアドレス
"""
- # client: 'Client'
- id: None = None
- name: str = "Anonymous"
- unix_name: str = "anonymous"
- avatar_url: None = None
- # ip: None = None
+ id: int | None = None
+ name: str | None = "Anonymous"
+ unix_name: str | None = "anonymous"
+ avatar_url: str | None = None
+ ip: str | None = None
@dataclass
@@ -245,12 +247,11 @@ class GuestUser(AbstractUser):
ユーザーのIPアドレス(取得できないためNone)
"""
- # client: 'Client'
- id: None = None
- # name: str | None
- unix_name: None = None
+ id: int | None = None
+ name: str | None = None
+ unix_name: str | None = None
avatar_url: str | None = None
- ip: None = None
+ ip: str | None = None
@dataclass
@@ -273,9 +274,8 @@ class WikidotUser(AbstractUser):
ユーザーのIPアドレス(取得できないためNone)
"""
- # client: 'Client'
- id: None = None
- name: str = "Wikidot"
- unix_name: str = "wikidot"
- avatar_url: None = None
- ip: None = None
+ id: int | None = None
+ name: str | None = "Wikidot"
+ unix_name: str | None = "wikidot"
+ avatar_url: str | None = None
+ ip: str | None = None
diff --git a/src/wikidot/util/parser/user.py b/src/wikidot/util/parser/user.py
index 674ffb9..734e208 100644
--- a/src/wikidot/util/parser/user.py
+++ b/src/wikidot/util/parser/user.py
@@ -2,7 +2,7 @@
import bs4
-from wikidot.module import user
+from ...module import user
if TYPE_CHECKING:
from wikidot.module.client import Client
@@ -25,52 +25,51 @@ def user_parse(client: "Client", elem: bs4.Tag) -> user.AbstractUser:
User | DeletedUser | AnonymousUser | GuestUser | WikidotUser のいずれか
"""
- if "class" in elem.attrs and "deleted" in elem["class"]:
- return user.DeletedUser(client=client, id=int(elem["data-id"]))
+ if ("class" in elem.attrs and "deleted" in elem["class"]) or (
+ isinstance(elem, str) and elem.strip() == "(user deleted)"
+ ):
+ if isinstance(elem, str):
+ return user.DeletedUser(client=client, id=0)
+ else:
+ return user.DeletedUser(client=client, id=int(str(elem["data-id"])))
- elif "class" in elem.attrs and "anonymous" in elem["class"]:
- ip = (
- elem.find("span", class_="ip")
- .get_text()
- .replace("(", "")
- .replace(")", "")
- .strip()
- )
- return user.AnonymousUser(client=client, ip=ip)
+ if not isinstance(elem, bs4.Tag):
+ raise ValueError("elem must be bs4.Tag except DeletedUser")
- # TODO: [[user ukwhatn]]構文をパースできなくなる(aが1つしかない)ので、一度無効化 -> GuestUserの例を探して実装を戻す
- # elif len(elem.find_all("a", recursive=False)) == 1:
- # return user.GuestUser(
- # name=elem.get_text().strip()
- # )
+ if "class" in elem.attrs and "anonymous" in elem["class"]:
+ ip_elem = elem.find("span", class_="ip")
+ if ip_elem is None:
+ return user.AnonymousUser(client=client)
+ ip = ip_elem.get_text().replace("(", "").replace(")", "").strip()
+ return user.AnonymousUser(client=client, ip=ip)
# Gravatar URLを持つ場合はGuestUserとする
- elif elem.find("img") and "gravatar.com" in elem.find("img")["src"]:
- avatar_url = elem.find("img")["src"]
+ img_elem = elem.find("img")
+ if isinstance(img_elem, bs4.Tag) and "gravatar.com" in img_elem["src"]:
+ avatar_url = img_elem["src"]
guest_name = elem.get_text().strip().split(" ")[0]
return user.GuestUser(
client=client,
name=guest_name,
- avatar_url=avatar_url
+ avatar_url=str(avatar_url) if avatar_url else None,
)
- elif elem.get_text() == "Wikidot":
+ if elem.get_text() == "Wikidot":
return user.WikidotUser(client=client)
- else:
- _user = elem.find_all("a")[-1]
- user_name = _user.get_text()
- user_unix = str(_user["href"]).replace("http://www.wikidot.com/user:info/", "")
- user_id = int(
- str(_user["onclick"])
- .replace("WIKIDOT.page.listeners.userInfo(", "")
- .replace("); return false;", "")
- )
+ _user = elem.find_all("a")[-1]
+ user_name = _user.get_text()
+ user_unix = str(_user["href"]).replace("http://www.wikidot.com/user:info/", "")
+ user_id = int(
+ str(_user["onclick"])
+ .replace("WIKIDOT.page.listeners.userInfo(", "")
+ .replace("); return false;", "")
+ )
- return user.User(
- client=client,
- id=user_id,
- name=user_name,
- unix_name=user_unix,
- avatar_url=f"http://www.wikidot.com/avatar.php?userid={user_id}",
- )
+ return user.User(
+ client=client,
+ id=user_id,
+ name=user_name,
+ unix_name=user_unix,
+ avatar_url=f"http://www.wikidot.com/avatar.php?userid={user_id}",
+ )