forked from RickDB/PlexAniSync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplexmodule.py
266 lines (216 loc) · 9.89 KB
/
plexmodule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# coding=utf-8
import logging
import re
import sys
from typing import List, Optional
from dataclasses import dataclass
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.poolmanager import PoolManager
from plexapi.myplex import MyPlexAccount
from plexapi.server import PlexServer
from plexapi.video import Episode, Season, Show
logger = logging.getLogger("PlexAniSync")
plex_settings = dict()
@dataclass
class PlexSeason:
season_number: int
watched_episodes: int
@dataclass
class PlexWatchedSeries:
title: str
title_sort: str
title_original: str
year: int
seasons: List[PlexSeason]
anilist_id: Optional[int]
class HostNameIgnoringAdapter(HTTPAdapter):
def init_poolmanager(self, connections, maxsize, block=..., **pool_kwargs):
self.poolmanager = PoolManager(num_pools=connections,
maxsize=maxsize,
block=block,
assert_hostname=False,
**pool_kwargs)
def authenticate():
method = plex_settings["authentication_method"].lower()
try:
home_user_sync = plex_settings["home_user_sync"].lower()
home_username = plex_settings["home_username"]
home_server_base_url = plex_settings["home_server_base_url"]
except Exception:
home_user_sync = "false"
home_username = ""
home_server_base_url = ""
try:
session = Session()
session.mount("https://", HostNameIgnoringAdapter())
# Direct connection
if method == "direct":
base_url = plex_settings["base_url"]
token = plex_settings["token"]
plex = PlexServer(base_url, token, session)
# Myplex connection
elif method == "myplex":
plex_server = plex_settings["server"]
plex_user = plex_settings["myplex_user"]
plex_password = plex_settings["myplex_password"]
if home_user_sync == "true":
if home_username == "":
logger.error(
"Home authentication cancelled as certain home_user settings are invalid"
)
return None
logger.warning(
f"Authenticating as admin for MyPlex home user: {home_username}"
)
plex_account = MyPlexAccount(plex_user, plex_password)
plex_server_home = PlexServer(
home_server_base_url, plex_account.authenticationToken, session
)
logger.warning("Retrieving home user information")
plex_user_account = plex_account.user(home_username)
logger.warning("Retrieving user token for MyPlex home user")
plex_user_token = plex_user_account.get_token(
plex_server_home.machineIdentifier
)
logger.warning("Retrieved user token for MyPlex home user")
plex = PlexServer(home_server_base_url, plex_user_token, session)
logger.warning("Successfully authenticated for MyPlex home user")
else:
account = MyPlexAccount(plex_user, plex_password, session=session)
plex = account.resource(plex_server).connect()
else:
logger.critical(
"[PLEX] Failed to authenticate due to invalid settings or authentication info, exiting..."
)
sys.exit(1)
return plex
except Exception:
logger.exception("Unable to authenticate to Plex Media Server")
sys.exit(1)
def get_anime_shows() -> List[Show]:
plex = authenticate()
sections = plex_settings["anime_section"].split("|")
shows: List[Show] = []
for section in sections:
try:
logger.info(f"[PLEX] Retrieving anime series from section: {section}")
shows_search = plex.library.section(section.strip()).search()
shows += shows_search
logger.info(
f"[PLEX] Found {len(shows_search)} anime series in section: {section}"
)
except BaseException:
logger.error(
f"Could not find library [{section}] on your Plex Server, check the library "
"name in AniList settings file and also verify that your library "
"name in Plex has no trailing spaces in it"
)
return shows
def get_anime_shows_filter(show_name):
shows = get_anime_shows()
shows_filtered = []
for show in shows:
show_title_clean_without_year = show.title
filter_title_clean_without_year = re.sub("[^A-Za-z0-9]+", "", show_name)
show_title_clean_without_year = re.sub(r"\(\d{4}\)", "", show_title_clean_without_year)
show_title_clean_without_year = re.sub("[^A-Za-z0-9]+", "", show_title_clean_without_year)
if (show.title.lower().strip() == show_name.lower().strip()
or show_title_clean_without_year.lower().strip() == filter_title_clean_without_year.lower().strip()):
shows_filtered.append(show)
if shows_filtered:
logger.info("[PLEX] Found matching anime series")
else:
logger.info(f"[PLEX] Did not find {show_name} in anime series")
return shows_filtered
def get_watched_shows(shows: List[Show]) -> Optional[List[PlexWatchedSeries]]:
logger.info("[PLEX] Retrieving watch count for series")
watched_series: List[PlexWatchedSeries] = []
ovas_found = 0
for show in shows:
try:
anilist_id = None
match = re.search(r"me\.sachaw\.agents\.anilist://([0-9]+)", show.guid)
if match:
anilist_id = int(match.group(1))
if hasattr(show, "seasons"):
show_seasons = show.seasons()
# ignore season 0 and unwatched seasons
show_seasons = filter(lambda season: season.seasonNumber > 0 and season.viewedLeafCount > 0, show_seasons)
seasons = []
for season in show_seasons:
season_watchcount = get_watched_episodes_for_show_season(season)
seasons.append(PlexSeason(season.seasonNumber, season_watchcount))
if seasons:
# Add year if we have one otherwise fallback
year = 1900
if show.year:
year = int(show.year)
if not hasattr(show, "titleSort"):
show.titleSort = show.title
elif show.titleSort == "":
show.titleSort = show.title
# Disable original title for now, results in false positives for yet unknown reason
# if not hasattr(show, 'originalTitle'):
# show.originalTitle = show.title
# elif show.originalTitle == '':
# show.originalTitle = show.title
show.originalTitle = show.title
watched_show = PlexWatchedSeries(
show.title.strip(),
show.titleSort.strip(),
show.originalTitle.strip(),
year,
seasons,
anilist_id
)
watched_series.append(watched_show)
# logger.info(
# 'Watched %s episodes of show: %s' % (
# episodes_watched, show.title))
else:
# Probably OVA but adding as series with 1 episode and season
# Needs proper solution later on and requires changing AniList
# class to support it properly
if hasattr(show, "isWatched") and show.isWatched:
year = 1900
if show.year:
year = int(show.year)
if not hasattr(show, "titleSort"):
show.titleSort = show.title
elif show.titleSort == "":
show.titleSort = show.title
# Disable original title for now, results in false positives for yet unknown reason
# if not hasattr(show, 'originalTitle'):
# show.originalTitle = show.title
# elif show.originalTitle == '':
# show.originalTitle = show.title
show.originalTitle = show.title
watched_show = PlexWatchedSeries(
show.title.strip(),
show.titleSort.strip(),
show.originalTitle.strip(),
year,
[PlexSeason(1, 1)],
anilist_id
)
watched_series.append(watched_show)
ovas_found += 1
except Exception:
logger.exception(f"[PLEX] Error occured during episode processing of show {show}")
logger.info(f"[PLEX] Found {len(watched_series)} watched series")
if ovas_found > 0:
logger.info(
f"[PLEX] Watched series also contained {ovas_found} releases with no episode attribute (probably movie / OVA), "
"support for this is still experimental"
)
if watched_series is not None and len(watched_series) == 0:
return None
else:
return watched_series
def get_watched_episodes_for_show_season(season: Season) -> int:
watched_episodes_of_season: List[Episode] = season.watched()
# len(watched_episodes_of_season) only works when the user didn't skip any episodes
episodes_watched = max(map(lambda e: int(e.index), watched_episodes_of_season), default=0)
logger.info(f'[PLEX] {episodes_watched} episodes watched for {season.parentTitle} season {season.seasonNumber}')
return episodes_watched