-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchange_brightness.py
140 lines (117 loc) · 4.03 KB
/
change_brightness.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import os
import json
import asyncio
import subprocess
from dataclasses import dataclass, asdict
# JSON file in which the per-display brightness state is kept between runs.
# The path is relative, so it is resolved against the current working directory.
STATE_FILE = "display.state.json"
@dataclass
class DisplayInfo:
name: str
bus: int | None
id: str
max_value: int = 100
curr_value: int = -1
lowest_set_value: int = -1
min_value: int = 0
# Names of the currently loaded kernel modules: the first column of every
# non-empty `lsmod` line (the "Module" header line is included, which is
# harmless for the membership test below). `lsmod` is executed directly and
# parsed here rather than piped through awk in a shell, so a failing `lsmod`
# raises instead of being hidden behind the pipeline's exit status.
loaded_modules = [
    columns[0]
    for columns in map(str.split, subprocess.check_output(["lsmod"]).decode().splitlines())
    if columns
]

# ddcutil talks to monitors over /dev/i2c-*, which requires the i2c_dev module.
if "i2c_dev" not in loaded_modules:
    print("i2c_dev is not loaded. Loading")
    subprocess.check_call(["modprobe", "i2c_dev"])
def _parse_vcp_brightness(raw: bytes) -> tuple[int, int]:
    """Return ``(current, maximum)`` parsed from ``ddcutil getvcp 10`` output.

    The text after the first ``:`` is expected to hold two comma-separated
    ``<name> value = <int>`` pairs, e.g. ``current value = 50, max value = 100``.
    Raises ``IndexError`` or ``ValueError`` when the output has another shape.
    """
    pairs = raw.decode().strip().replace(" value ", "").split(":")[1].split(",")
    current = int(pairs[0].split("=")[1].strip())
    maximum = int(pairs[1].split("=")[1].strip())
    return current, maximum


async def single_display_update(
    key: str, dinfo: DisplayInfo, *, toggle_value: int = 70
) -> tuple[str, DisplayInfo]:
    """Toggle one display between its lowest recorded brightness and a bright level.

    Reads VCP feature 10 with ``ddcutil getvcp``, records the current and
    maximum values on ``dinfo`` and lowers ``dinfo.lowest_set_value`` if the
    current value is below it (or nothing was recorded yet). If the display is
    currently at its lowest recorded value it is set to ``toggle_value``,
    otherwise it is set back to that lowest value.

    Args:
        key: Opaque identifier handed back unchanged with the result.
        dinfo: State of the display; updated in place.
        toggle_value: Brightness applied when the display currently sits at
            its lowest recorded value.

    Returns:
        The ``(key, dinfo)`` pair that was passed in.

    Raises:
        RuntimeError: If the ``getvcp`` output cannot be parsed, for example
            because ddcutil failed and printed nothing usable.
    """
    # Address the monitor by I2C bus when one is known, else by display number.
    if dinfo.bus:
        busparams = ["--bus", str(dinfo.bus)]
    else:
        busparams = ["-d", dinfo.id]

    get_proc = await asyncio.create_subprocess_exec(
        "ddcutil",
        "getvcp",
        "10",
        *busparams,
        stdout=asyncio.subprocess.PIPE,
    )
    raw_output, _ = await get_proc.communicate()
    print(raw_output)

    try:
        curr_value, max_value = _parse_vcp_brightness(raw_output)
    except (IndexError, ValueError) as err:
        # Surface which display failed and what ddcutil actually printed
        # instead of a bare "list index out of range".
        raise RuntimeError(
            f"Could not read brightness of display {dinfo.id} "
            f"(ddcutil exit code {get_proc.returncode}): {raw_output!r}"
        ) from err

    dinfo.curr_value = curr_value
    dinfo.max_value = max_value
    if dinfo.lowest_set_value == -1 or dinfo.lowest_set_value > curr_value:
        dinfo.lowest_set_value = curr_value
    # Debug output kept from the original implementation.
    print(
        type(dinfo.curr_value),
        type(dinfo.lowest_set_value),
        dinfo.curr_value == dinfo.lowest_set_value,
    )

    if dinfo.curr_value == dinfo.lowest_set_value:
        new_val = toggle_value
    else:
        new_val = dinfo.lowest_set_value
    print(f"Setting {dinfo.id} = {new_val}")

    # Retry until ddcutil reports success; there is no upper bound on attempts.
    while True:
        set_proc = await asyncio.create_subprocess_exec(
            "ddcutil", "setvcp", "10", str(new_val), *busparams
        )
        await set_proc.communicate()
        if set_proc.returncode == 0:
            break
        await asyncio.sleep(0.5)
        print(f"Retrying due to failure on {dinfo.id} | {set_proc.returncode}")
    return key, dinfo
async def main() -> None:
    """Toggle the brightness of every detected display and persist the state.

    Loads previously saved state from ``STATE_FILE`` when it exists, runs
    ``ddcutil detect`` to find the connected displays, updates all of them
    concurrently through ``single_display_update`` and writes the resulting
    state back to ``STATE_FILE``.

    Displays are keyed by model name, so two monitors reporting the same
    model share a single entry.
    """
    loaded_displays: dict[str, DisplayInfo] = {}
    if os.path.exists(STATE_FILE):
        with open(STATE_FILE, "r") as inf:
            saved_state = json.load(inf)
        for key, fields in saved_state.items():
            # Entries without a "bus" key (presumably written by an older
            # version of this script) are loaded with bus=None.
            fields.setdefault("bus", None)
            loaded_displays[key] = DisplayInfo(**fields)
    print("Loaded:")
    print(loaded_displays)

    displays: dict[str, DisplayInfo] = {}
    # Values collected for the "Display N" section currently being parsed.
    current_display = None
    current_bus = None
    detect_output = subprocess.check_output(
        ["ddcutil", "detect", "--async", "--sleep-multiplier=0.5"]
    ).decode()
    for line in detect_output.split("\n"):
        if line.startswith("Display"):
            current_display = line.split(" ")[1]
            print(f"Have current display {current_display}")
        if "Model:" in line and current_display:
            mname = line.split("Model:")[1].strip()
            info = loaded_displays.get(mname)
            if info is None:
                info = DisplayInfo(name=mname, id=current_display, bus=current_bus)
            else:
                # Refresh both addressing fields from this detection run; the
                # saved display number and bus may be stale.
                info.bus = current_bus
                info.id = current_display
            displays[mname] = info
            # The section is complete; wait for the next "Display" line.
            current_display = None
            current_bus = None
        if "I2C bus:" in line and current_display:
            current_bus = line.split("I2C bus:")[1].strip().replace("/dev/i2c-", "")

    results = await asyncio.gather(
        *(single_display_update(key, dinfo) for key, dinfo in displays.items())
    )
    for key, dinfo in results:
        displays[key] = dinfo

    # Serialise before opening the file so that a serialisation error cannot
    # leave a truncated state file behind.
    payload = json.dumps(
        {key: asdict(dinfo) for key, dinfo in displays.items()}, indent=4
    )
    with open(STATE_FILE, "w") as of:
        of.write(payload)
# Script entry point: run main() to completion with asyncio.run().
# NOTE(review): there is no `if __name__ == "__main__":` guard, so importing
# this module also executes it.
asyncio.run(main())