-
-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathVRC_OSCLib.py
358 lines (263 loc) · 11.3 KB
/
VRC_OSCLib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# Copyright (c) 2013 Iris
# Released under the MIT license
# https://opensource.org/licenses/mit-license.php
# Request "pythonosc" https://pypi.org/project/python-osc/
# OSC Imput Event Name => https://docs.vrchat.com/v2022.1.1/docs/osc-as-input-controller
import time
from pythonosc import udp_client
from pythonosc.osc_message_builder import OscMessageBuilder
from unidecode import unidecode
import threading
import queue
# Variables and locks for thread safety and message queuing
send_message_lock = threading.Lock()
osc_queue = queue.Queue()
last_message_sent_time = 0
stop_flag = False
thread = None
min_time_between_messages = 1.5
def AV3_SetInt(data=0, Parameter="example", IP='127.0.0.1', PORT=9000):
Int(data, "/avatar/parameters/" + Parameter, IP, PORT)
def AV3_SetFloat(data=0.0, Parameter="example", IP='127.0.0.1', PORT=9000):
Float(data, "/avatar/parameters/" + Parameter, IP, PORT)
def AV3_SetBool(data=False, Parameter="example", IP='127.0.0.1', PORT=9000):
Bool(data, "/avatar/parameters/" + Parameter, IP, PORT)
def Control_Push(Button="example", IP='127.0.0.1', PORT=9000):
Buttons("/input/" + Button, IP, PORT)
def Control_Joystick(data=0.0, axis="example", IP='127.0.0.1', PORT=9000):
Float(data, "/input/" + axis, IP, PORT)
def RemoveNonASCII(data):
new_val = data.encode("ascii", "ignore")
return new_val.decode()
# Button
def Buttons(address="/input/example", IP='127.0.0.1', PORT=9000):
# OSC Bild
client = udp_client.UDPClient(IP, PORT)
msg = OscMessageBuilder(address=address)
msg.add_arg(1)
m = msg.build()
msgb = OscMessageBuilder(address=address)
msgb.add_arg(0)
mb = msgb.build()
# OSC Send
client.send(m)
time.sleep(0.1)
client.send(mb)
# Int
def Int(data=0, address="/input/example", IP='127.0.0.1', PORT=9000):
senddata = int(data)
# OSC Bild
client = udp_client.UDPClient(IP, PORT)
msg = OscMessageBuilder(address=address)
msg.add_arg(senddata)
m = msg.build()
# OSC Send
client.send(m)
# Float
def Float(data=0.0, address="/input/example", IP='127.0.0.1', PORT=9000):
senddata = float(data)
# OSC Bild
client = udp_client.UDPClient(IP, PORT)
msg = OscMessageBuilder(address=address)
msg.add_arg(senddata)
m = msg.build()
# OSC Send
client.send(m)
# Bool
def Bool(data=False, address="/input/Jump", IP='127.0.0.1', PORT=9000):
# OSC Bild
client = udp_client.UDPClient(IP, PORT)
msg = OscMessageBuilder(address=address)
msg.add_arg(data)
m = msg.build()
# OSC Send
client.send(m)
def set_min_time_between_messages(time_in_seconds):
global min_time_between_messages
min_time_between_messages = time_in_seconds
def _send_osc_message():
global last_message_sent_time, min_time_between_messages
#print("min_time_between_messages: " + str(min_time_between_messages))
while True:
try:
# Wait for a message to be available in the queue. This will block until a message is available.
message_data = osc_queue.get()
# Discard all but the latest message in the queue.
while not osc_queue.empty():
message_data = osc_queue.get()
current_time = time.time()
time_since_last_message = current_time - last_message_sent_time
if time_since_last_message < min_time_between_messages:
time_to_wait = min_time_between_messages - time_since_last_message
time.sleep(time_to_wait)
# Send the actual OSC message here using the original Chat function
_direct_osc_send(**message_data)
last_message_sent_time = time.time()
except Exception as e:
# This can be useful for debugging any exceptions that might occur
print(f"Error in OSC sender thread: {e}")
time.sleep(0.1)
# Start the OSC message sender thread
osc_sender_thread = threading.Thread(target=_send_osc_message)
osc_sender_thread.daemon = True
osc_sender_thread.start()
# OSC Send Command
def Message(data="example", address="/example", IP='127.0.0.1', PORT=9000):
# OSC Bild
client = udp_client.UDPClient(IP, PORT)
msg = OscMessageBuilder(address=address)
msg.add_arg(data)
m = msg.build()
# OSC Send
client.send(m)
def Chat(data="example", send=True, nofify=True, address="/chatbox/input", IP='127.0.0.1', PORT=9000, convert_ascii=False):
with send_message_lock:
# Enqueue the message for sending
osc_queue.put({
"data": data,
"send": send,
"nofify": nofify,
"address": address,
"IP": IP,
"PORT": PORT,
"convert_ascii": convert_ascii
})
# OSC Send Chat
def _direct_osc_send(data="example", send=True, nofify=True, address="/chatbox/input", IP='127.0.0.1', PORT=9000, convert_ascii=False):
# OSC Bild
client = udp_client.UDPClient(IP, PORT)
msg = OscMessageBuilder(address=address)
if convert_ascii:
msg.add_arg(unidecode(data))
else:
msg.add_arg(data)
msg.add_arg(send)
msg.add_arg(nofify)
m = msg.build()
# OSC Send
client.send(m)
def count_utf16_code_units(s):
return len(s.encode('utf-16le')) // 2
def split_words(text, chunk_size):
words = text.split()
chunks = []
current_chunk = ""
for i, word in enumerate(words):
# If a word is longer than the chunk size, split it into parts
while count_utf16_code_units(word) > chunk_size - 6:
part, word = word[:chunk_size - 6], word[chunk_size - 6:]
chunks.append(current_chunk + " " + part if current_chunk else part)
current_chunk = word
# Check if there is a previous chunk and will be a next chunk
if i != 0 and i != len(words) - 1:
# Adding 2 to account for the space that will be added, and 6 for the two dots
condition = count_utf16_code_units(current_chunk + word + " ... ...") <= chunk_size
else:
# Adding 1 to account for the space that will be added, and 3 for the dots
condition = count_utf16_code_units(current_chunk + word + " ...") <= chunk_size
if condition:
# Add the word to the current chunk
if current_chunk != "":
current_chunk += " "
current_chunk += word
else:
# The current word would make the chunk too long, so it's time to start a new chunk
chunks.append(current_chunk)
current_chunk = word
# Only add the last chunk if it's not an empty string
if current_chunk != "":
chunks.append(current_chunk)
return chunks
def sleep_while_checking_stop_flag(delay):
start_time = time.time()
while time.time() - start_time < delay:
if stop_flag:
return
time.sleep(0.1) # check every 100ms
def Chat_chunks(data="example", chunk_size=144, delay=1., initial_delay=1., nofify=True, address="/chatbox/input", ip='127.0.0.1', port=9000, convert_ascii=False):
global thread, stop_flag
# stop thread
if thread and thread.is_alive():
stop_flag = True
thread.join()
stop_flag = False
thread = threading.Thread(target=send_chunks_v2, args=(data, chunk_size, delay, initial_delay, nofify, address, ip, port, convert_ascii))
thread.start()
def Chat_scrolling_chunks(data="example", chunk_size=144, delay=1., initial_delay=1., scroll_size=1, nofify=True, address="/chatbox/input", ip='127.0.0.1', port=9000, convert_ascii=False):
global thread, stop_flag
# stop thread
if thread and thread.is_alive():
stop_flag = True
thread.join()
stop_flag = False
thread = threading.Thread(target=send_scrolling_chunks, args=(data, chunk_size, delay, initial_delay, scroll_size, nofify, address, ip, port, convert_ascii))
thread.start()
# send chat by chunks
def send_chunks(text, chunk_size=144, delay=1., initial_delay=1., nofify=True, address="/chatbox/input", ip='127.0.0.1', port=9000, convert_ascii=False):
# Convert text to list of UTF-16 code units
text_utf16 = text.encode('utf-16le')
# Check if text is shorter than chunk_size
if count_utf16_code_units(text) <= chunk_size:
Chat(text, send=True, nofify=nofify, address=address, IP=ip, PORT=port, convert_ascii=convert_ascii)
return
# Calculate the number of chunks
num_chunks = count_utf16_code_units(text) // chunk_size + (count_utf16_code_units(text) % chunk_size != 0)
for i in range(num_chunks):
if stop_flag:
break
# Get the current chunk
chunk = text[i*chunk_size:(i+1)*chunk_size]
# Convert chunk back to string
chunk = chunk.decode('utf-16le')
# Send the chunk to the API
Chat(chunk, send=True, nofify=(nofify and i == 0), address=address, IP=ip, PORT=port, convert_ascii=convert_ascii)
# Wait for the specified delay
if i == 0:
sleep_while_checking_stop_flag(initial_delay)
else:
sleep_while_checking_stop_flag(delay)
def send_chunks_v2(text, chunk_size=144, delay=1., initial_delay=1., nofify=True, address="/chatbox/input", ip='127.0.0.1', port=9000, convert_ascii=False):
# Send the full text if it fits into a single chunk
if count_utf16_code_units(text) <= chunk_size:
Chat(text, send=True, nofify=nofify, address=address, IP=ip, PORT=port, convert_ascii=convert_ascii)
return
chunks = split_words(text, chunk_size)
for i, chunk in enumerate(chunks):
if stop_flag:
break
# Add dots to indicate continuation
if i != 0:
chunk = "... " + chunk # Add a space after the dots
if i != len(chunks) - 1:
chunk = chunk + " ..." # Add a space before the dots
# Send the chunk to the API
Chat(chunk, send=True, nofify=(nofify and i == 0), address=address, IP=ip, PORT=port, convert_ascii=convert_ascii)
# Wait for the specified delay
if i == 0:
sleep_while_checking_stop_flag(initial_delay)
else:
sleep_while_checking_stop_flag(delay)
# send chat by scrolling chunks
def send_scrolling_chunks(text, chunk_size=144, delay=1., initial_delay=1., scroll_size=1, nofify=True, address="/chatbox/input", ip='127.0.0.1', port=9000, convert_ascii=False):
# Convert text to list of UTF-16 code units
text_utf16 = text.encode('utf-16le')
# Check if text is shorter than chunk_size
if count_utf16_code_units(text) <= chunk_size:
Chat(text, send=True, nofify=nofify, address=address, IP=ip, PORT=port, convert_ascii=convert_ascii)
return
# Calculate the number of chunks
num_chunks = (count_utf16_code_units(text) - chunk_size) // scroll_size + 1
for i in range(num_chunks):
if stop_flag:
break
# Get the current chunk
chunk_utf16 = text_utf16[i*scroll_size*2:(i*scroll_size*2)+chunk_size*2]
# Convert chunk back to string
chunk = chunk_utf16.decode('utf-16le')
# Send the chunk to the API
Chat(chunk, send=True, nofify=(nofify and i == 0), address=address, IP=ip, PORT=port, convert_ascii=convert_ascii)
# Wait for the specified delay
if i == 0:
sleep_while_checking_stop_flag(initial_delay)
else:
sleep_while_checking_stop_flag(delay)