Skip to content

Commit

Permalink
Completed #20
Browse files Browse the repository at this point in the history
  • Loading branch information
thelabcat committed Dec 28, 2024
1 parent 80e0bd6 commit 8f93237
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 179 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "rumchat_actor"
version = "3.1.0"
version = "3.2.0"
keywords = ["rumble", "chat", "livestream", "bot"]
authors = [
{ name="Wilbur Jaywright", email="[email protected]" },
Expand All @@ -13,7 +13,7 @@ description = "Automatically interact with your Rumble livestream chats."
readme = "README.md"
requires-python = ">=3.8"
dependencies = [
"cocorum==2.1.0",
"cocorum",
"moviepy",
"pyautogui",
"requests",
Expand Down
15 changes: 8 additions & 7 deletions src/rumchat_actor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ def eat_some_cheese(message, actor):
#stream_id is either the base 10 or base 36 livestream ID you want the Actor to connect to, obtained from the popout chat or the Rumble Live Stream API.
#If stream_id is None but you pass api_url, the latest livestream shown on the API is chosen automatically.
#If you pass profile_dir to an existing Firefox profile directory, your sign-ins to Rumble chat for the actor will be saved.
#Otherwise, you will have to log in manuaglly each time you use the bot, or pass username and password.
actor = rumchat_actor.RumbleChatActor(stream_id = STREAM_ID)
#Pass CREDENTIALS to log in to Rumble
actor = rumchat_actor.RumbleChatActor(stream_id = STREAM_ID, username, password = CREDENTIALS)
#Register an action to be called on every message
actor.register_message_action(eat_some_cheese)
Expand Down Expand Up @@ -118,8 +117,8 @@ def __init__(self, init_message = "Hello, Rumble!", ignore_users = ["TheRumbleBo
self.password = kwargs.get("password")

#Username must not be an email
if self.username and "@" in self.username:
print("Username cannot be provided as email, must be displayed username.")
if "@" in self.username:
print("Username cannot be provided as email.")
self.username = None

#We can get the username from the Rumble Live Stream API
Expand All @@ -137,7 +136,7 @@ def __init__(self, init_message = "Hello, Rumble!", ignore_users = ["TheRumbleBo
self.password = getpass("Actor password: ")

try:
self.session_cookie = servicephp.login(self.username, self.password)
self.chat = ChatAPI(self.stream_id, self.username, self.password)
#Login failed
except AssertionError:
print("Error. Login failed with provided credentials.")
Expand All @@ -146,9 +145,11 @@ def __init__(self, init_message = "Hello, Rumble!", ignore_users = ["TheRumbleBo

first_time = False

self.chat = ChatAPI(self.stream_id, self.session_cookie)
self.chat.clear_mailbox()

#Reference the chat's servicephp for commands and stuff that might go to us for it
self.servicephp = self.chat.servicephp

#Ignore these users when processing messages
self.ignore_users = ignore_users

Expand Down
196 changes: 26 additions & 170 deletions src/rumchat_actor/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@
Functions and classes that did not fit in another module
S.D.G."""

from getpass import getpass
import queue
import threading
import time
import selenium
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from . import static, utils
from cocorum import uploadphp
from . import static

class ClipUploader():
"""Upload clips to Rumble automatically"""
Expand All @@ -23,7 +17,6 @@ def __init__(self, actor, clip_command, **kwargs):
clip_command: The clip command instance
channel_id: The name or int ID of the channel to upload to, defaults to no channel (user page)
profile_dir: The Firefox profile directory to use, defaults to burner profile
username, password: The username and password to log in with, not needed if Firefox profile is signed in, otherwise defaults to manual login
browser_head: Display a head for the Firefox process. Defaults to false."""

#Save actor
Expand All @@ -33,86 +26,11 @@ def __init__(self, actor, clip_command, **kwargs):
self.clip_command = clip_command
self.clip_command.clip_uploader = self

#Set browser profile directory if we have one
options = webdriver.FirefoxOptions()
if "profile_dir" in kwargs:
options.add_argument("-profile")
options.add_argument(kwargs["profile_dir"])

#Set browser to headless mode, unless otherwise specified
if not kwargs.get("browser_head"):
options.add_argument("--headless")

#Start the driver
self.driver = webdriver.Firefox(options)
self.driver.maximize_window() #Make sure the window covers the full screen
#self.driver.minimize_window() #Cannot do because it shrinks the window. Must be minimized manually

#Load the upload page
self.driver.get(static.URI.upload_page)

#Close the premium banner if it is there
utils.close_premium_banner(self.driver)

#Get the login credentials from arguments, or None if they were not passed
self.username = kwargs.get("username")
self.password = kwargs.get("password")

#Wait for sign in
first_time = True
while "login" in self.driver.current_url:
#First time trying to log in
if first_time:
#Get the credentials entry fields
uname_field = self.driver.find_element(By.ID, "login-username")
password_field = self.driver.find_element(By.ID, "login-password")

#Login failed
else:
print("Error. Login failed with provided credentials.")
self.username = None
self.password = None

uname_field.clear()
password_field.clear()

#A username was not passed or was incorrect
if not self.username:
#Default to the actor username
if self.actor.username:
self.username = self.actor.username
else:
self.username = input("Clip uploader username: ")

#A password was not passed or was incorrect
if not self.password:
#Default to the actor password
if self.actor.password:
self.password = self.actor.password
else:
self.password = getpass("Clip uploader password: ")

#Enter the credentials
uname_field.send_keys(self.username + Keys.RETURN)
password_field.send_keys(self.password + Keys.RETURN)

#Click the login button
#self.driver.find_element(By.CLASS_NAME, "login-button.login-form-button.round-button.bg-green")

first_time = False

#Wait for login
try:
self.wait_for_upload_elem()

#Sign in did not work
except selenium.common.exceptions.WebDriverException as e:
print(e)
print("Could not get file upload field.")
assert "login" in self.driver.current_url, "Not on login or upload page"
#Get upload system
self.uploadphp = uploadphp.UploadPHP(self.actor.servicephp)

#Channel ID to use, or None if it was not passed
self.channel_id = kwargs.get("channel_id")
self.channel_id = kwargs.get("channel_id", 0)

#List of clip filenames to upload
self.clips_to_upload = queue.Queue()
Expand All @@ -121,93 +39,28 @@ def __init__(self, actor, clip_command, **kwargs):
self.clip_uploader_thread = threading.Thread(target = self.clip_upload_loop, daemon = True)
self.clip_uploader_thread.start()

def wait_for_upload_elem(self):
"""Wait for the file upload element to appear, indicating page load"""
WebDriverWait(self.driver, static.Driver.wait_timeout).until(
EC.presence_of_element_located((By.CSS_SELECTOR, "input[type='file']")),
"ClipUploader timed out waiting for file upload field to appear",
)

def upload_clip(self, filename, complete_path):
"""Add the clip filename to the queue"""
self.clips_to_upload.put((filename, complete_path))

def __upload_clip(self, filename, complete_path):
"""Upload a clip to Rumble"""
#Load the upload page
self.driver.get(static.URI.upload_page)

#Wait for file upload field to appear
self.wait_for_upload_elem()
upload = self.uploadphp.upload_video(
file_path = complete_path,
title = f"stream {self.actor.stream_id_b10} clip {filename}",
description = "Automatic clip upload. Enjoy!",
category1 = static.Clip.Upload.category_1,
category2 = static.Clip.Upload.category_2,
channel_id = self.channel_id,
visibility = "unlisted",
)

#Select file and begin upload
file_input = self.driver.find_element(By.CSS_SELECTOR, "input[type='file']")
file_input.send_keys(complete_path)
#Announce link
self.actor.send_message("Clip uploaded to " + upload.url)

#Wait for upload to complete
upload_progress_el = self.driver.find_element(By.CLASS_NAME, "green_percent")
while (progress := int(upload_progress_el.get_attribute("style").split("width: ")[-1].split(";")[0].removesuffix("%"))) < 100:
time.sleep(1)
print("Upload progress:", progress)
print("Upload complete")

#Fill out video information
title = f"stream {self.actor.stream_id_b10} clip {filename}"
self.driver.find_element(By.ID, "title").send_keys(title)
self.driver.find_element(By.ID, "description").send_keys("Automatic clip upload. Enjoy!")
#driver.find_element(By.ID, "tags").send_keys(", ".join(TAGS_LIST))
self.driver.find_element(By.NAME, "primary-category").send_keys(static.Clip.Upload.category_1 + Keys.RETURN)
if static.Clip.Upload.category_2:
self.driver.find_element(By.NAME, "secondary-category").send_keys(static.Clip.Upload.category_2 + Keys.RETURN)

#Select channel
channel_id_fieldset = self.driver.find_element(By.ID, "channelId")
try:
if self.channel_id:
if isinstance(self.channel_id, str):
channel_id_fieldset.find_element(By.XPATH, f"//label[text()='{self.channel_id}']").click()
elif isinstance(self.channel_id, int):
channel_id_fieldset.find_element(By.XPATH, f"//label[@channel='{self.channel_id}']").click()
else:
print("Invalid channel format")
else:
print("No channel ID specified. Defaulting to User ID.")
except selenium.common.exceptions.NoSuchElementException:
print("Channel ID specified did not exist. Defaulting to User ID.")

#Set visibility
vis_options_el = self.driver.find_element(By.ID, "visibility-options")
vis_options_el.find_element(By.XPATH, "*/label[@for='visibility_unlisted']").click()

#Submit form 1
self.scroll_to_bottom()
self.driver.find_element(By.ID, "submitForm").click()

#Wait for rights checkbox, then click it
WebDriverWait(self.driver, static.Driver.wait_timeout).until(EC.element_to_be_clickable((By.XPATH, "//label[@for='crights']"))).click()

#Click terms checkbox
self.driver.find_element(By.XPATH, "//label[@for='cterms']").click()

#Submit form 2
self.scroll_to_bottom()
self.driver.find_element(By.ID, "submitForm2").click()

#Wait for form to submit
WebDriverWait(self.driver, static.Driver.wait_timeout).until(EC.visibility_of_element_located((By.XPATH, "//h3[text()='Video Upload Complete!']")))

#Get link
#video_link = self.driver.find_element(By.XPATH, f"//a[text()='View \"{TITLE}\"']").get_attribute("href")
video_link = self.driver.find_element(By.ID, "direct").get_attribute("value")
self.actor.send_message("Clip uploaded to " + video_link)
print(f"Clip {filename} published.")

def scroll_to_bottom(self):
"""Scroll all the way to the bottom of the page"""
page = self.driver.find_element(By.TAG_NAME, "html")
page.send_keys(Keys.END)
#webdriver.ActionChains(self.driver).scroll_to_element(footer).perform()

def clip_upload_loop(self):
"""Keep uploading clips while actor is alive"""
while self.actor.keep_running:
Expand All @@ -218,21 +71,24 @@ def clip_upload_loop(self):

time.sleep(1)

#Shutdown the upload driver when the actor shuts down
self.driver.quit()

class Thanker(threading.Thread):
"""Thank followers and subscribers in the chat"""
def __init__(self, actor):
"""Pass the Rumble Chat Actor"""
def __init__(self, actor, **kwargs):
"""Pass the following:
actor: The Rumble Chat Actor instance
follower_message: Message to format with follower username
Defaults to static.Thank.DefaultMessages.follower
subscriber_message: Message to format with the subscriber username
Defaults to static.Thank.DefaultMessages.subscriber"""

super().__init__(daemon = True)
self.actor = actor
self.rum_api = self.actor.rum_api
assert self.rum_api, "Thanker cannot function if actor does not have Rumble API"

#Set up default messages
self.follower_message = static.Thank.DefaultMessages.follower
self.subscriber_message = static.Thank.DefaultMessages.subscriber
self.follower_message = kwargs.get("follower_message", static.Thank.DefaultMessages.follower)
self.subscriber_message = kwargs.get("subscriber_message", static.Thank.DefaultMessages.subscriber)

#Start the thread immediately
self.start()
Expand Down

0 comments on commit 8f93237

Please sign in to comment.