-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathnicesuno.py
421 lines (403 loc) · 22.1 KB
/
nicesuno.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
# encoding:utf-8
import os
import re
import json
import time
import requests
import threading
from typing import List
from pathvalidate import sanitize_filename
import plugins
from bridge.context import ContextType
from bridge.reply import Reply, ReplyType
from common.log import logger
from plugins import *
@plugins.register(
    name="Nicesuno",
    desire_priority=90,
    hidden=False,
    desc="一款基于Suno和Suno-API创作音乐的插件。",
    version="1.3",
    author="空心菜",
)
class Nicesuno(Plugin):
    """Plugin that creates music and lyrics by calling a Suno-API HTTP server.

    A text message is handled when it starts with one of the configured
    prefixes (vocal music, instrumental music or lyrics).  The creation
    request is sent synchronously; the results are then polled, downloaded
    and delivered from a background thread.

    Attributes (all read from the plugin configuration in ``__init__``):
        suno_api_bases: list of Suno-API base URLs.
        suno_api_base: the base URL actually used (first entry), or ``None``
            when ``suno_api_bases`` is not a non-empty list.
        music_create_prefixes: prefixes that trigger vocal music creation.
        instrumental_create_prefixes: prefixes that trigger instrumental music.
        lyrics_create_prefixes: prefixes that trigger lyrics creation.
        music_output_dir: directory where downloaded audio files are stored.
        is_send_lyrics: whether the lyrics are sent before the audio file.
        is_send_covers: whether the cover image is sent after the audio file.
    """

    def __init__(self):
        """Load the configuration and register the context handler.

        The handler is registered only when both ``suno_api_bases`` and
        ``music_create_prefixes`` are non-empty lists; otherwise a warning is
        logged and the plugin stays inactive.

        Raises:
            Exception: any error raised while loading the configuration or
                creating the output directory is logged and re-raised.
        """
        super().__init__()
        try:
            # Load the plugin configuration.
            conf = super().load_config()
            # Fall back to the bundled template when no configuration exists.
            if not conf:
                logger.debug("[Nicesuno] config.json not found, config.json.template used.")
                curdir = os.path.dirname(__file__)
                config_path = os.path.join(curdir, "config.json.template")
                if os.path.exists(config_path):
                    with open(config_path, "r", encoding="utf-8") as f:
                        conf = json.load(f)
            # Neither file produced a configuration: use the built-in defaults
            # instead of failing on ``None.get``.
            conf = conf or {}
            self.suno_api_bases = conf.get("suno_api_bases", [])
            self.music_create_prefixes = conf.get("music_create_prefixes", [])
            self.instrumental_create_prefixes = conf.get("instrumental_create_prefixes", [])
            self.lyrics_create_prefixes = conf.get("lyrics_create_prefixes", [])
            self.music_output_dir = conf.get("music_output_dir", "/tmp")
            self.is_send_lyrics = conf.get("is_send_lyrics", True)
            self.is_send_covers = conf.get("is_send_covers", True)
            if not os.path.exists(self.music_output_dir):
                logger.info(f"[Nicesuno] music_output_dir={self.music_output_dir} not exists, create it.")
                os.makedirs(self.music_output_dir, exist_ok=True)
            bases_ok = bool(self.suno_api_bases) and isinstance(self.suno_api_bases, list)
            prefixes_ok = bool(self.music_create_prefixes) and isinstance(self.music_create_prefixes, list)
            # TODO: deploy several Suno-API instances and switch the Suno
            # account automatically once a quota is exhausted.
            # Only index the list after it has been validated, so an empty or
            # malformed setting no longer raises IndexError here.
            self.suno_api_base = self.suno_api_bases[0] if bases_ok else None
            if bases_ok and prefixes_ok:
                self.handlers[Event.ON_HANDLE_CONTEXT] = self.on_handle_context
                logger.info("[Nicesuno] inited")
            else:
                logger.warning("[Nicesuno] init failed because suno_api_bases or music_create_prefixes is incorrect.")
        except Exception:
            logger.error("[Nicesuno] init failed, ignored.")
            raise

    def on_handle_context(self, e_context: EventContext):
        """Dispatch a text message to music or lyrics creation.

        Messages that are not text, that carry no known prefix, or that have
        nothing after the prefix are ignored.  Any exception raised while
        creating is converted into a failure reply.

        Args:
            e_context: event context; ``e_context["context"]`` is the incoming
                message and ``e_context["reply"]`` receives the reply.
        """
        try:
            # Only text messages are handled.
            context = e_context["context"]
            if context.type != ContextType.TEXT:
                return
            content = context.content
            logger.debug(f"[Nicesuno] on_handle_context. content={content}")
            # Find the creation prefix; vocal wins over instrumental, which
            # wins over lyrics.
            make_instrumental, make_lyrics = False, False
            music_create_prefix = self._check_prefix(content, self.music_create_prefixes)
            instrumental_create_prefix = self._check_prefix(content, self.instrumental_create_prefixes)
            lyrics_create_prefix = self._check_prefix(content, self.lyrics_create_prefixes)
            if music_create_prefix:
                suno_prompt = content[len(music_create_prefix):].strip()
            elif instrumental_create_prefix:
                make_instrumental = True
                suno_prompt = content[len(instrumental_create_prefix):].strip()
            elif lyrics_create_prefix:
                make_lyrics = True
                suno_prompt = content[len(lyrics_create_prefix):].strip()
            else:
                logger.debug("[Nicesuno] content starts without any suno prefixes, ignored.")
                return
            # A prefix without any prompt is ignored.
            if not suno_prompt:
                logger.info("[Nicesuno] content starts without any suno prompts, ignored.")
                return
            # Start creating.
            if make_lyrics:
                logger.info(f"[Nicesuno] start generating lyrics, suno_prompt={suno_prompt}.")
                self._create_lyrics(e_context, suno_prompt)
            else:
                logger.info(
                    f"[Nicesuno] start generating {'instrumental' if make_instrumental else 'vocal'} music, suno_prompt={suno_prompt}.")
                self._create_music(e_context, suno_prompt, make_instrumental)
        except Exception as e:
            logger.warning(f"[Nicesuno] failed to generate music, error={e}")
            reply = Reply(ReplyType.TEXT, "抱歉!创作失败了,请稍后再试🥺")
            e_context["reply"] = reply
            e_context.action = EventAction.BREAK_PASS

    # Create music.
    def _create_music(self, e_context, suno_prompt, make_instrumental=False):
        """Request music creation and set the immediate reply.

        When the prompt contains both the title and style keywords it is
        parsed as custom mode (title / style / optional lyrics); a prompt that
        does not match the expected layout is answered with the help text.
        Otherwise the prompt is used as a free-text description.  On success a
        background thread is started to fetch and deliver the clips.

        Args:
            e_context: event context of the incoming message.
            suno_prompt: message content with the creation prefix removed.
            make_instrumental: request music without vocals.
        """
        custom_mode = False
        # Custom mode.
        if '标题' in suno_prompt and '风格' in suno_prompt:
            regex_prompt = r' *标题[::]?(?P<title>[\S ]*)\n+ *风格[::]?(?P<tags>[\S ]*)(\n+(?P<lyrics>.*))?'
            r = re.fullmatch(regex_prompt, suno_prompt, re.DOTALL)
            title = r.group('title').strip() if r and r.group('title') else None
            tags = r.group('tags').strip() if r and r.group('tags') else None
            lyrics = r.group('lyrics').strip() if r and r.group('lyrics') else None
            if r and (tags or lyrics):
                custom_mode = True
                logger.info(f"[Nicesuno] generating {'instrumental' if make_instrumental else 'vocal'} music in custom mode, title={title}, tags={tags}, lyrics={lyrics}")
                data = self._suno_generate_music_custom_mode(title, tags, lyrics, make_instrumental)
            else:
                logger.warning(f"[Nicesuno] generating {'instrumental' if make_instrumental else 'vocal'} music in custom mode failed because of wrong format, suno_prompt={suno_prompt}")
                reply = Reply(ReplyType.TEXT, self.get_help_text())
                e_context["reply"] = reply
                e_context.action = EventAction.BREAK_PASS
                return
        # Description mode.
        else:
            logger.info(f"[Nicesuno] generating {'instrumental' if make_instrumental else 'vocal'} music with description, description={suno_prompt}")
            data = self._suno_generate_music_with_description(suno_prompt, make_instrumental)

        channel = e_context["channel"]
        context = e_context["context"]
        to_user_nickname = context["msg"].to_user_nickname
        detail = data.get('detail') if data else None
        if not data:
            logger.warning("response data of _suno_generate_music is empty.")
            reply = Reply(ReplyType.TEXT, f"因为神秘原因,创作失败了😂请稍后再试...")
        # The Suno quota is exhausted.
        elif detail == 'Insufficient credits.' and custom_mode:
            logger.warning("[Nicesuno] insufficient credits in custom mode.")
            reply = Reply(ReplyType.TEXT, f"Suno老师说一天只能创作5次😂今天确实唱够了,明天11点之后再来好不好😘")
        elif detail == 'Insufficient credits.':
            # In description mode fall back to writing lyrics for the prompt.
            logger.warning("[Nicesuno] insufficient credits with description, changed to generating lyrics...")
            reply = Reply(ReplyType.TEXT, f"Suno老师说一天只能创作5次😂今天确实唱够了,{to_user_nickname}来为你写歌好不好😘")
            self._create_lyrics(e_context, suno_prompt)
        # Other errors reported by Suno-API, e.g. an expired token.
        elif detail:
            logger.warning(f"[Nicesuno] error occurred, response data={data}")
            if detail == 'Unauthorized':
                reply = Reply(ReplyType.TEXT, f"因为长期翘课,被Suno老师劝退了😂请重新找Suno老师申请入学...")
            elif detail == 'Topic too long.':
                reply = Reply(ReplyType.TEXT, f"因为废话太多,被Suno老师打回了😂请重新提交创作申请...")
            elif detail == 'Too many running jobs.':
                reply = Reply(ReplyType.TEXT, f"Suno老师说工作太忙😂请稍等片刻再创作...")
            else:
                reply = Reply(ReplyType.TEXT, f"因为{detail},创作失败了😂请稍后再试...")
        elif not data.get('clips'):
            logger.warning(f"[Nicesuno] no clips in response data, response data={data}")
            reply = Reply(ReplyType.TEXT, f"因为神秘原因,创作失败了😂请稍后再试...")
        # Fetch and deliver the music in the background.
        else:
            aids = [clip['id'] for clip in data['clips']]
            logger.debug(f"[Nicesuno] start to handle music, aids={aids}, data={data}")
            threading.Thread(target=self._handle_music, args=(channel, context, aids)).start()
            reply = Reply(ReplyType.TEXT, f"{to_user_nickname}正在为您创作音乐,请稍等☕")
        e_context["reply"] = reply
        e_context.action = EventAction.BREAK_PASS

    # Create lyrics.
    def _create_lyrics(self, e_context, suno_prompt):
        """Request lyrics creation and deliver the result from a background thread.

        Args:
            e_context: event context of the incoming message.
            suno_prompt: prompt describing the lyrics to write.

        Raises:
            Exception: when the creation request returns no data.
        """
        data = self._suno_generate_lyrics(suno_prompt)
        channel = e_context["channel"]
        context = e_context["context"]
        if not data:
            error = "response data of _suno_generate_lyrics is empty."
            raise Exception(error)
        # Fetch and send the lyrics in the background.
        lid = data['id']
        logger.debug(f"[Nicesuno] start to handle lyrics, lid={lid}, data={data}")
        threading.Thread(target=self._handle_lyric, args=(channel, context, lid, suno_prompt)).start()
        e_context.action = EventAction.BREAK_PASS

    # Download and send music.
    def _handle_music(self, channel, context, aids: List):
        """Thread entry point: deliver every clip in ``aids`` to the user.

        This runs in a background thread, so an exception cannot reach the
        message handler; it is logged and a failure text is sent instead of
        leaving the user waiting.

        Args:
            channel: channel used to send replies.
            context: context of the message that requested the music.
            aids: ids of the clips returned by the creation request.
        """
        try:
            self._deliver_music(channel, context, aids)
        except Exception as e:
            logger.error(f"[Nicesuno] failed to handle music, aids={aids}, error={e}")
            channel.send(Reply(ReplyType.TEXT, "抱歉!创作失败了,请稍后再试🥺"), context)

    def _deliver_music(self, channel, context, aids):
        """Send lyrics, audio file and cover for each clip, then the video links.

        Raises:
            Exception: when the clip information cannot be fetched.
            TimeoutError: when a clip has no audio URL after 180 seconds.
        """
        # User information.
        actual_user_nickname = context["msg"].actual_user_nickname or context["msg"].other_user_nickname
        to_user_nickname = context["msg"].to_user_nickname
        # Wait once before the very first poll.
        if aids:
            time.sleep(15)
        last_lyrics = ""
        for aid in aids:
            # Wait for the audio of this clip.
            status, data = self._poll_music_field(aid, "audio_url", 180, 5)
            if status == "failed":
                raise Exception("[Nicesuno] 获取音乐信息失败!")
            if status == "timeout":
                raise TimeoutError("[Nicesuno] 获取音乐信息超时!")
            # Parse the clip information.
            title, metadata, audio_url = data["title"], data["metadata"], data["audio_url"]
            lyrics, tags, description_prompt = metadata["prompt"], metadata["tags"], metadata['gpt_description_prompt']
            description_prompt = description_prompt if description_prompt else "自定义模式不展示"
            # Send the lyrics, skipping clips that repeat the previous lyrics.
            if not self.is_send_lyrics:
                logger.debug("[Nicesuno] 发送歌词开关关闭,不发送歌词!")
            elif lyrics == last_lyrics:
                logger.debug("[Nicesuno] 歌词和上次相同,不再重复发送歌词!")
            else:
                reply_text = f"🎻{title}🎻\n\n{lyrics}\n\n🎹风格: {tags}\n👶发起人:{actual_user_nickname}\n🍀制作人:Suno\n🎤提示词: {description_prompt}"
                logger.debug(f"[Nicesuno] 发送歌词,reply_text={reply_text}")
                last_lyrics = lyrics
                channel.send(Reply(ReplyType.TEXT, reply_text), context)
            # Download the audio.
            filename = f"{int(time.time())}-{sanitize_filename(title).replace(' ', '')[:20]}"
            audio_path = os.path.join(self.music_output_dir, f"{filename}.mp3")
            logger.debug(f"[Nicesuno] 下载音乐,audio_url={audio_url}")
            if self._download_file(audio_url, audio_path):
                # Send the audio.
                logger.debug(f"[Nicesuno] 发送音乐,audio_path={audio_path}")
                channel.send(Reply(ReplyType.FILE, audio_path), context)
            else:
                # Do not send a missing or truncated file.
                logger.warning(f"[Nicesuno] download failed, audio file not sent, audio_url={audio_url}")
            # Send the cover (best effort).
            if not self.is_send_covers:
                logger.debug("[Nicesuno] 发送封面开关关闭,不发送封面!")
            else:
                status, cover = self._poll_music_field(aid, "image_url", 60, 5)
                if status == "ok":
                    image_url = cover["image_url"]
                    logger.debug(f"[Nicesuno] 发送封面,image_url={image_url}")
                    channel.send(Reply(ReplyType.IMAGE_URL, image_url), context)
                else:
                    if status == "failed":
                        logger.warning("[Nicesuno] 获取封面信息失败!")
                    else:
                        logger.warning("[Nicesuno] 获取封面信息超时!")
                    logger.warning("[Nicesuno] 获取封面信息失败,放弃发送封面!")
        # Collect the video URLs (best effort, one entry per clip).
        video_urls = []
        for aid in aids:
            status, data = self._poll_music_field(aid, "video_url", 180, 10)
            if status == "ok":
                video_urls.append(data["video_url"])
            elif status == "failed":
                logger.warning("[Nicesuno] 获取视频地址失败!")
                video_urls.append("获取失败!")
            else:
                # Exactly one placeholder per clip; polling stops on timeout.
                logger.warning("[Nicesuno] 获取视频地址超时!")
                video_urls.append("获取超时!")
        # Tell the user that everything has been delivered.
        video_text = '\n'.join(f'视频{idx}: {url}' for idx, url in enumerate(video_urls, start=1))
        reply_text = f"{to_user_nickname}已经为您创作了音乐,请查收!以下是音乐视频:\n{video_text}"
        if context.get("isgroup", False):
            reply_text = f"@{actual_user_nickname}\n" + reply_text
        logger.debug(f"[Nicesuno] 发送查收提醒,reply_text={reply_text}")
        channel.send(Reply(ReplyType.TEXT, reply_text), context)

    def _poll_music_field(self, aid, field, timeout_seconds, interval_seconds):
        """Poll the information of clip ``aid`` until ``field`` has a value.

        Args:
            aid: clip id.
            field: key of the clip information to wait for.
            timeout_seconds: stop polling once this much time has elapsed.
            interval_seconds: pause between two polls.

        Returns:
            A ``(status, data)`` tuple.  ``status`` is ``"ok"`` when the field
            is set, ``"failed"`` when the clip information could not be
            fetched (``data`` is ``None``), or ``"timeout"``.
        """
        started_at = time.time()
        while True:
            data = self._suno_get_music(aid)
            if not data:
                return "failed", None
            if data.get(field):
                return "ok", data
            if time.time() - started_at > timeout_seconds:
                return "timeout", data
            time.sleep(interval_seconds)

    # Fetch and send lyrics.
    def _handle_lyric(self, channel, context, lid, description_prompt=""):
        """Thread entry point: wait for the lyrics ``lid`` and send them.

        Failures are logged and reported to the user because this runs in a
        background thread.

        Args:
            channel: channel used to send replies.
            context: context of the message that requested the lyrics.
            lid: id returned by the lyrics creation request.
            description_prompt: prompt shown at the end of the reply.
        """
        try:
            # User information.
            actual_user_nickname = context["msg"].actual_user_nickname or context["msg"].other_user_nickname
            # Wait until the lyrics are complete.
            start_time = time.time()
            while True:
                data = self._suno_get_lyrics(lid)
                if not data:
                    raise Exception("[Nicesuno] 获取歌词信息失败!")
                elif data["status"] == 'complete':
                    break
                elif time.time() - start_time > 120:
                    raise TimeoutError("[Nicesuno] 获取歌词信息超时!")
                time.sleep(5)
            # Send the lyrics.
            title, lyrics = data["title"], data["text"]
            reply_text = f"🎻{title}🎻\n\n{lyrics}\n\n👶发起人:{actual_user_nickname}\n🍀制作人:Suno\n🎤提示词: {description_prompt}"
            logger.debug(f"[Nicesuno] 发送歌词,reply_text={reply_text}")
            channel.send(Reply(ReplyType.TEXT, reply_text), context)
        except Exception as e:
            logger.error(f"[Nicesuno] failed to handle lyrics, lid={lid}, error={e}")
            channel.send(Reply(ReplyType.TEXT, "抱歉!创作失败了,请稍后再试🥺"), context)

    def _request_json(self, send, url, retry_count, log_name, log_context, extract=None, **kwargs):
        """Call Suno-API and return the decoded JSON body, retrying on failure.

        Args:
            send: ``requests.get`` or ``requests.post``.
            url: full request URL.
            retry_count: number of retries after the first attempt.
            log_name: name of the calling method, used in log messages.
            log_context: description of the request, used in the error log.
            extract: optional callable applied to the decoded body; an
                exception raised by it counts as a failed attempt.
            **kwargs: extra keyword arguments passed to ``send``.

        Returns:
            The decoded (and optionally extracted) body, or ``None`` when
            every attempt failed.
        """
        attempts_left = retry_count
        while attempts_left >= 0:
            try:
                response = send(url, timeout=(5, 30), **kwargs)
                if response.status_code != 200:
                    raise Exception(f"status_code is not ok, status_code={response.status_code}")
                logger.debug(f"[Nicesuno] {log_name}, response={response.text}")
                body = response.json()
                return extract(body) if extract else body
            except Exception as e:
                logger.error(f"[Nicesuno] {log_name} failed, {log_context}, error={e}")
                attempts_left -= 1
                # Pause only when another attempt follows.
                if attempts_left >= 0:
                    time.sleep(5)
        return None

    # Create music.
    def _suno_generate_music_with_description(self, description, make_instrumental=False, retry_count=0):
        """Create music from a free-text description.

        Returns:
            The decoded response, or ``None`` when the request failed.
        """
        payload = {
            "gpt_description_prompt": description,
            "make_instrumental": make_instrumental,
            "mv": "chirp-v3-0",
        }
        return self._request_json(
            requests.post,
            f"{self.suno_api_base}/generate/description-mode",
            retry_count,
            "_suno_generate_music_with_description",
            f"description={description}",
            data=json.dumps(payload),
        )

    # Create music.
    def _suno_generate_music_custom_mode(self, title=None, tags=None, lyrics=None, make_instrumental=False, retry_count=0):
        """Create music from an explicit title, style tags and lyrics.

        Returns:
            The decoded response, or ``None`` when the request failed.
        """
        payload = {
            "title": title,
            "tags": tags,
            "prompt": lyrics,
            "make_instrumental": make_instrumental,
            "mv": "chirp-v3-0",
            "continue_clip_id": None,
            "continue_at": None,
        }
        return self._request_json(
            requests.post,
            f"{self.suno_api_base}/generate",
            retry_count,
            "_suno_generate_music_custom_mode",
            f"title={title}, tags={tags}, lyrics={lyrics}",
            data=json.dumps(payload),
        )

    # Fetch music information.
    def _suno_get_music(self, aid, retry_count=3):
        """Fetch the information of clip ``aid``.

        Returns:
            The first element of the decoded response, or ``None`` when the
            request failed.
        """
        return self._request_json(
            requests.get,
            f"{self.suno_api_base}/feed/{aid}",
            retry_count,
            "_suno_get_music",
            f"aid={aid}",
            extract=lambda body: body[0],
        )

    # Create lyrics.
    def _suno_generate_lyrics(self, suno_lyric_prompt, retry_count=3):
        """Request lyrics for ``suno_lyric_prompt``.

        Returns:
            The decoded response, or ``None`` when the request failed.
        """
        payload = {
            "prompt": suno_lyric_prompt
        }
        return self._request_json(
            requests.post,
            f"{self.suno_api_base}/generate/lyrics/",
            retry_count,
            "_suno_generate_lyrics",
            f"suno_lyric_prompt={suno_lyric_prompt}",
            data=json.dumps(payload),
        )

    # Fetch lyrics information.
    def _suno_get_lyrics(self, lid, retry_count=3):
        """Fetch the state of the lyrics ``lid``.

        Returns:
            The decoded response, or ``None`` when the request failed.
        """
        return self._request_json(
            requests.get,
            f"{self.suno_api_base}/lyrics/{lid}",
            retry_count,
            "_suno_get_lyrics",
            f"lid={lid}",
        )

    # Download a file.
    def _download_file(self, file_url, file_path, retry_count=3):
        """Download ``file_url`` to ``file_path``, retrying on failure.

        Args:
            file_url: URL of the file.
            file_path: destination path; overwritten on every attempt.
            retry_count: number of retries after the first attempt.

        Returns:
            ``True`` when the file was written completely, ``False`` when
            every attempt failed.
        """
        while retry_count >= 0:
            try:
                # The timeout keeps a stalled server from blocking the thread
                # forever; ``with`` releases the streamed connection.
                with requests.get(file_url, allow_redirects=True, stream=True, timeout=(5, 30)) as response:
                    if response.status_code != 200:
                        raise Exception(f"[Nicesuno] 文件下载失败,file_url={file_url}, status_code={response.status_code}")
                    with open(file_path, "wb") as f:
                        for chunk in response.iter_content(chunk_size=1024):
                            if chunk:
                                f.write(chunk)
                return True
            except Exception as e:
                logger.error(f"[Nicesuno] 文件下载失败,file_url={file_url}, error={e}")
                retry_count -= 1
                if retry_count >= 0:
                    time.sleep(5)
        return False

    # Check whether the content starts with a creation prefix.
    def _check_prefix(self, content, prefix_list):
        """Return the first prefix in ``prefix_list`` that ``content`` starts with.

        Returns ``None`` when ``prefix_list`` is empty or nothing matches.
        """
        if not prefix_list:
            return None
        for prefix in prefix_list:
            if content.startswith(prefix):
                return prefix
        return None

    # Help text.
    def get_help_text(self, **kwargs):
        """Return the usage text shown to the user."""
        return "使用Suno创作音乐。\n1.创作声乐\n用法:唱/演唱<提示词>\n示例:唱明天会更好。\n\n2.创作器乐\n用法:演奏<提示词>\n示例:演奏明天会更好。\n\n3.创作歌词\n用法:写歌/作词<提示词>\n示例:写歌明天会更好。\n\n4.自定义模式\n用法:\n唱/演唱/演奏\n标题: <标题>\n风格: <风格1> <风格2> ...\n<歌词>\n备注:前三行必须为创作前缀、标题、风格,<标题><风格><歌词>三个值可以为空,但<风格><歌词>不可同时为空!"