-
-
Notifications
You must be signed in to change notification settings - Fork 172
/
main.py
196 lines (176 loc) · 6.88 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from __future__ import annotations
# import an additional thing for proper PyInstaller freeze support
from multiprocessing import freeze_support
if __name__ == "__main__":
freeze_support()
import io
import sys
import signal
import asyncio
import logging
import argparse
import warnings
import traceback
import tkinter as tk
from tkinter import messagebox
from typing import IO, NoReturn
import truststore
truststore.inject_into_ssl()
from translate import _
from twitch import Twitch
from settings import Settings
from version import __version__
from exceptions import CaptchaRequired
from utils import lock_file, resource_path, set_root_icon
from constants import LOGGING_LEVELS, SELF_PATH, FILE_FORMATTER, LOG_PATH, LOCK_PATH
warnings.simplefilter("default", ResourceWarning)
if sys.version_info < (3, 10):
raise RuntimeError("Python 3.10 or higher is required")
class Parser(argparse.ArgumentParser):
    """
    Argument parser that reports its output through a message box.

    Everything the parser tries to print is collected in an in-memory text buffer,
    and the buffer contents are displayed in a dialog when the parser exits.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # accumulates every message passed to _print_message
        self._message: io.StringIO = io.StringIO()

    def _print_message(self, message: str, file: IO[str] | None = None) -> None:
        """Append the message to the internal buffer; the 'file' argument is ignored."""
        buffer = self._message
        buffer.write(message)

    def exit(self, status: int = 0, message: str | None = None) -> NoReturn:
        """
        Exit through the base implementation, then show the captured output in a dialog.

        The dialog is displayed from a 'finally' clause, so it's shown
        even though the base class call doesn't return normally.
        """
        try:
            # the base implementation stores the message (if any) and calls sys.exit
            super().exit(status, message)
        finally:
            captured = self._message.getvalue()
            messagebox.showerror("Argument Parser Error", captured)
class ParsedArgs(argparse.Namespace):
    """Namespace receiving the parsed command line arguments, plus derived logging levels."""

    # raw values stored here by the argument parser
    _verbose: int
    _debug_ws: bool
    _debug_gql: bool
    log: bool
    tray: bool
    dump: bool

    def _debug_level(self, forced: bool) -> int:
        """
        Pick the logging level for one of the debug-able sub-loggers.

        If the debug flag is True, return DEBUG.
        If the main logging level is DEBUG, return INFO to avoid seeing raw messages.
        Otherwise, return NOTSET to inherit the global logging level.
        """
        if forced:
            return logging.DEBUG
        if self._verbose >= 4:
            return logging.INFO
        return logging.NOTSET

    # TODO: replace int with union of literal values once typeshed updates
    @property
    def logging_level(self) -> int:
        """Main logging level, looked up by the verbosity count capped at 4."""
        verbosity = min(self._verbose, 4)
        return LOGGING_LEVELS[verbosity]

    @property
    def debug_ws(self) -> int:
        """Logging level derived from the --debug-ws flag and the verbosity count."""
        return self._debug_level(self._debug_ws)

    @property
    def debug_gql(self) -> int:
        """Logging level derived from the --debug-gql flag and the verbosity count."""
        return self._debug_level(self._debug_gql)
# handle input parameters
# NOTE: the parser shows its output via a message box,
# so a dummy, invisible root window has to exist while it runs
root = tk.Tk()
root.overrideredirect(True)
root.withdraw()
set_root_icon(root, resource_path("icons/pickaxe.ico"))
root.update()

parser = Parser(
    prog=SELF_PATH.name,
    description="A program that allows you to mine timed drops on Twitch.",
)
parser.add_argument("--version", action="version", version=f"v{__version__}")
parser.add_argument("-v", dest="_verbose", action="count", default=0)
# simple on/off switches
for switch in ("--tray", "--log", "--dump"):
    parser.add_argument(switch, action="store_true")
# undocumented debug args - hidden from the help output
for debug_target in ("ws", "gql"):
    parser.add_argument(
        f"--debug-{debug_target}",
        dest=f"_debug_{debug_target}",
        action="store_true",
        help=argparse.SUPPRESS,
    )
# parse_args fills in and returns the namespace object it's given
args = ParsedArgs()
parser.parse_args(namespace=args)

# load settings
try:
    settings = Settings(args)
except Exception:
    # show the full traceback to the user, then exit with status 4
    messagebox.showerror(
        "Settings error",
        f"There was an error while loading the settings file:\n\n{traceback.format_exc()}"
    )
    raise SystemExit(4)

# dummy window isn't needed anymore
root.destroy()
# get rid of unneeded objects
del root, parser, switch, debug_target
# logging setup used by the client run below
def _configure_logging() -> None:
    """Configure the root and application loggers according to the loaded settings."""
    if settings.logging_level > logging.DEBUG:
        # attach a NullHandler to the root logger, effectively ignoring all logging calls
        # that aren't ours. This is skipped only if the main logging level is DEBUG or lower.
        logging.getLogger().addHandler(logging.NullHandler())
    app_logger = logging.getLogger("TwitchDrops")
    app_logger.setLevel(settings.logging_level)
    if settings.log:
        # additionally write the application log into a file
        file_handler = logging.FileHandler(LOG_PATH)
        file_handler.setFormatter(FILE_FORMATTER)
        app_logger.addHandler(file_handler)
    logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql)
    logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws)


# client run
async def main():
    """
    Run the Twitch client, wait for its window to get closed, then exit the process.

    The process exit status is 0 after a clean run,
    or 1 if the client run ended with an exception.
    """
    # set language
    try:
        _.set_language(settings.language)
    except ValueError:
        # this language doesn't exist - stick to English
        pass

    # handle logging stuff
    _configure_logging()

    exit_status = 0
    client = Twitch(settings)
    loop = asyncio.get_running_loop()
    # signal handlers are installed on Linux only
    on_linux = sys.platform == "linux"
    handled_signals = (signal.SIGINT, signal.SIGTERM)
    if on_linux:
        for signum in handled_signals:
            loop.add_signal_handler(signum, lambda *ignored: client.gui.close())
    try:
        await client.run()
    except CaptchaRequired:
        exit_status = 1
        client.prevent_close()
        client.print(_("error", "captcha"))
    except Exception:
        exit_status = 1
        client.prevent_close()
        client.print("Fatal error encountered:\n")
        client.print(traceback.format_exc())
    finally:
        if on_linux:
            for signum in handled_signals:
                loop.remove_signal_handler(signum)
        client.print(_("gui", "status", "exiting"))
        await client.shutdown()

    if not client.gui.close_requested:
        # user didn't request the closure
        client.gui.tray.change_icon("error")
        client.print(_("status", "terminated"))
        client.gui.status.update(_("gui", "status", "terminated"))
        # notify the user about the closure
        client.gui.grab_attention(sound=True)
    await client.gui.wait_until_closed()
    # save the application state
    # NOTE: this has to happen after wait_until_closed, because the user can alter
    # some settings between the app termination and closing the window
    client.save(force=True)
    client.gui.stop()
    client.gui.close_window()
    sys.exit(exit_status)
# use lock_file to check if we're not already running
# NOTE: the lock is acquired before entering the 'try' block on purpose. If lock_file
# itself raised inside of it, the 'finally' clause would reference an unassigned 'file',
# and the resulting NameError would mask the original exception.
success, file = lock_file(LOCK_PATH)
try:
    if not success:
        # already running - exit
        sys.exit(3)
    asyncio.run(main())
finally:
    # the file returned by lock_file gets closed on every exit path
    file.close()