Skip to content

Commit

Permalink
Revert rclone (path fixed)
Browse files Browse the repository at this point in the history
Signed-off-by: anasty17 <[email protected]>
  • Loading branch information
anasty17 committed Jan 4, 2024
1 parent d3f5931 commit 007740b
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 59 deletions.
24 changes: 21 additions & 3 deletions bot/helper/mirror_utils/download_utils/rclone_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,27 @@ async def add_rclone_download(listener, path):
remote, listener.link = listener.link.split(":", 1)
listener.link = listener.link.strip("/")

cmd1 = f"rclone lsjson --fast-list --stat --no-mimetype --no-modtime --config {config_path} '{remote}:{listener.link}'"
cmd2 = f"rclone size --fast-list --json --config {config_path} '{remote}:{listener.link}'"
res1, res2 = await gather(cmd_exec(cmd1, shell=True), cmd_exec(cmd2, shell=True))
cmd1 = [
"rclone",
"lsjson",
"--fast-list",
"--stat",
"--no-mimetype",
"--no-modtime",
"--config",
config_path,
f"{remote}:{listener.link}",
]
cmd2 = [
"rclone",
"size",
"--fast-list",
"--json",
"--config",
config_path,
f"{remote}:{listener.link}",
]
res1, res2 = await gather(cmd_exec(cmd1), cmd_exec(cmd2))
if res1[2] != res2[2] != 0:
if res1[2] != -9:
err = res1[1] or res2[1]
Expand Down
4 changes: 2 additions & 2 deletions bot/helper/mirror_utils/rclone_utils/list.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,10 @@ async def get_path(self, itype=""):
self.item_type == itype
elif self.list_status == "rcu":
self.item_type == "--dirs-only"
cmd = f"rclone lsjson {self.item_type} --fast-list --no-mimetype --no-modtime --config {self.config_path} '{self.remote}{self.path}'"
cmd = ["rclone", "lsjson", self.item_type, "--fast-list", "--no-mimetype", "--no-modtime", "--config", self.config_path, f"{self.remote}{self.path}"]
if self.is_cancelled:
return
res, err, code = await cmd_exec(cmd, shell=True)
res, err, code = await cmd_exec(cmd)
if code not in [0, -9]:
LOGGER.error(
f"While rclone listing. Path: {self.remote}{self.path}. Stderr: {err}"
Expand Down
25 changes: 20 additions & 5 deletions bot/helper/mirror_utils/rclone_utils/serve.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from asyncio import create_subprocess_shell
from asyncio import create_subprocess_exec
from aiofiles.os import path as aiopath
from aiofiles import open as aiopen
from configparser import ConfigParser
Expand Down Expand Up @@ -34,11 +34,26 @@ async def rclone_serve_booter():
RcloneServe.clear()
except:
pass
cmd = f"rclone serve http --config rclone.conf --no-modtime combine: --addr :{config_dict['RCLONE_SERVE_PORT']}"
cmd += " --vfs-cache-mode full --vfs-cache-max-age 1m0s --buffer-size 64M"
cmd = [
"rclone",
"serve",
"http",
"--config",
"rclone.conf",
"--no-modtime",
"combine:",
"--addr",
f":{config_dict['RCLONE_SERVE_PORT']}",
"--vfs-cache-mode",
"full",
"--vfs-cache-max-age",
"1m0s",
"--buffer-size",
"64M",
]
if (user := config_dict["RCLONE_SERVE_USER"]) and (
pswd := config_dict["RCLONE_SERVE_PASS"]
):
cmd += f' --user "{user}" --pass "{pswd}"'
rcs = await create_subprocess_shell(cmd)
cmd.extend(("--user", user, "--pass", pswd))
rcs = await create_subprocess_exec(*cmd)
RcloneServe.append(rcs)
98 changes: 63 additions & 35 deletions bot/helper/mirror_utils/rclone_utils/transfer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from asyncio import create_subprocess_shell, gather
from asyncio import create_subprocess_exec, gather
from asyncio.subprocess import PIPE
from re import findall as re_findall
from json import loads
Expand Down Expand Up @@ -106,8 +106,8 @@ async def _create_rc_sa(self, remote, remote_opts):
await f.write(text)
return sa_conf_file

async def _start_download(self, cmd, remote_type, spath):
self._proc = await create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
async def _start_download(self, cmd, remote_type):
self._proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
_, return_code = await gather(self._progress(), self._proc.wait())

if self._is_cancelled:
Expand All @@ -133,11 +133,10 @@ async def _start_download(self, cmd, remote_type, spath):
):
if self._sa_count < self._sa_number:
remote = self._switchServiceAccount()
npath = f"{remote}:{spath.split(':', 1)[1]}"
cmd.replace(spath, npath)
cmd[6] = f"{remote}:{cmd[6].split(':', 1)[1]}"
if self._is_cancelled:
return
return await self._start_download(cmd, remote_type, npath)
return await self._start_download(cmd, remote_type)
else:
LOGGER.info(
f"Reached maximum number of service accounts switching, which is {self._sa_count}"
Expand Down Expand Up @@ -169,19 +168,20 @@ async def download(self, remote, config_path, path):
remote = f"sa{self._sa_index:03}"
LOGGER.info(f"Download with service account {remote}")

spath = f"{remote}:{self._listener.link}"
cmd = self._getUpdatedCommand(config_path, spath, path, "copy")
cmd = self._getUpdatedCommand(
config_path, f"{remote}:{self._listener.link}", path, "copy"
)

if (
remote_type == "drive"
and not config_dict["RCLONE_FLAGS"]
and not self._listener.rcFlags
):
cmd += " --drive-acknowledge-abuse"
cmd.append("--drive-acknowledge-abuse")
elif remote_type != "drive":
cmd += " --retries-sleep 3s"
cmd.extend(("--retries-sleep", "3s"))

await self._start_download(cmd, remote_type, spath)
await self._start_download(cmd, remote_type)

async def _get_gdrive_link(self, config_path, remote, rc_path, mime_type):
if mime_type == "Folder":
Expand All @@ -195,8 +195,17 @@ async def _get_gdrive_link(self, config_path, remote, rc_path, mime_type):
epath = f"{remote}:{rc_path}{self._listener.name}"
destination = epath

cmd = f"rclone lsjson --fast-list --no-mimetype --no-modtime --config {config_path} '{epath}'"
res, err, code = await cmd_exec(cmd, shell=True)
cmd = [
"rclone",
"lsjson",
"--fast-list",
"--no-mimetype",
"--no-modtime",
"--config",
config_path,
epath,
]
res, err, code = await cmd_exec(cmd)

if code == 0:
result = loads(res)
Expand All @@ -215,8 +224,8 @@ async def _get_gdrive_link(self, config_path, remote, rc_path, mime_type):
link = ""
return link, destination

async def _start_upload(self, cmd, remote_type, spath):
self._proc = await create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
async def _start_upload(self, cmd, remote_type):
self._proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
_, return_code = await gather(self._progress(), self._proc.wait())

if self._is_cancelled:
Expand All @@ -241,12 +250,11 @@ async def _start_upload(self, cmd, remote_type, spath):
):
if self._sa_count < self._sa_number:
remote = self._switchServiceAccount()
npath = f"{remote}:{spath.split(':', 1)[1]}"
cmd.replace(spath, npath)
cmd[7] = f"{remote}:{cmd[7].split(':', 1)[1]}"
return (
False
if self._is_cancelled
else await self._start_upload(cmd, remote_type, npath)
else await self._start_upload(cmd, remote_type)
)
else:
LOGGER.info(
Expand Down Expand Up @@ -309,8 +317,9 @@ async def upload(self, path, size):
LOGGER.info(f"Upload with service account {fremote}")

method = "move" if not self._listener.seed or self._listener.newDir else "copy"
spath = f"{fremote}:{rc_path}"
cmd = self._getUpdatedCommand(fconfig_path, path, spath, method)
cmd = self._getUpdatedCommand(
fconfig_path, path, f"{fremote}:{rc_path}", method
)
if (
remote_type == "drive"
and not config_dict["RCLONE_FLAGS"]
Expand All @@ -320,7 +329,7 @@ async def upload(self, path, size):
elif remote_type != "drive":
cmd += " --retries-sleep 3s"

result = await self._start_upload(cmd, remote_type, spath)
result = await self._start_upload(cmd, remote_type)
if not result:
return

Expand All @@ -336,8 +345,8 @@ async def upload(self, path, size):
else:
destination = f"{oremote}:{self._listener.name}"

cmd = f'rclone link --config {oconfig_path} "{destination}"'
res, err, code = await cmd_exec(cmd, shell=True)
cmd = ["rclone", "link", "--config", oconfig_path, destination]
res, err, code = await cmd_exec(cmd)

if code == 0:
link = res
Expand Down Expand Up @@ -374,13 +383,15 @@ async def clone(self, config_path, src_remote, src_path, mime_type):
)
if not self._listener.rcFlags and not config_dict["RCLONE_FLAGS"]:
if src_remote_type == "drive" and dst_remote_type != "drive":
cmd += " --drive-acknowledge-abuse"
cmd.append("--drive-acknowledge-abuse")
elif dst_remote_type == "drive" and src_remote_type != "drive":
cmd += " --drive-chunk-size 64M --drive-upload-cutoff 32M"
cmd.extend(
("--drive-chunk-size", "64M", "--drive-upload-cutoff", "32M")
)
elif src_remote_type == "drive":
cmd += " --tpslimit 3 --transfers 3"
cmd.extend(("--tpslimit", "3", "--transfers", "3"))

self._proc = await create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
self._proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE)
_, return_code = await gather(self._progress(), self._proc.wait())

if self._is_cancelled:
Expand All @@ -405,8 +416,8 @@ async def clone(self, config_path, src_remote, src_path, mime_type):
f"/{self._listener.name}" if dst_path else self._listener.name
)

cmd = f"rclone link --config {config_path} '{destination}'"
res, err, code = await cmd_exec(cmd, shell=True)
cmd = ["rclone", "link", "--config", config_path, destination]
res, err, code = await cmd_exec(cmd)

if self._is_cancelled:
return None, None
Expand All @@ -417,21 +428,38 @@ async def clone(self, config_path, src_remote, src_path, mime_type):
LOGGER.error(
f"while getting link. Path: {destination} | Stderr: {err}"
)
await self._listener.onUploadError(err[:4000])
return None, None
return None, destination

def _getUpdatedCommand(self, config_path, source, destination, method):
ext = "*.{" + ",".join(self._listener.extension_filter) + "}"
cmd = f"rclone {method} --fast-list --config {config_path} -P '{source}' '{destination}' --exclude '{ext}'"
cmd += " --ignore-case --low-level-retries 1 -M --log-file rlog.txt --log-level DEBUG"
cmd = [
"rclone",
method,
"--fast-list",
"--config",
config_path,
"-P",
source,
destination,
"--exclude",
ext,
"--ignore-case",
"--low-level-retries",
"1",
"-M",
"--log-file",
"rlog.txt",
"--log-level",
"DEBUG",
]
if rcflags := self._listener.rcFlags or config_dict["RCLONE_FLAGS"]:
rcflags = rcflags.split("|")
for flag in rcflags:
if ":" in flag:
key, value = map(str.strip, flag.split(":", 1))
cmd += f' {key} "{value}"'
cmd.extend((key, value))
elif len(flag) > 0:
cmd += f" {flag.strip()}"
cmd.append(flag.strip())
return cmd

@staticmethod
Expand Down
60 changes: 46 additions & 14 deletions bot/modules/clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,17 @@ async def _proceedToClone(self):
remote, src_path = self.link.split(":", 1)
src_path = src_path.strip("/")

cmd = f'rclone lsjson --fast-list --stat --no-modtime --config {config_path} "{remote}:{src_path}"'
res = await cmd_exec(cmd, shell=True)
cmd = [
"rclone",
"lsjson",
"--fast-list",
"--stat",
"--no-modtime",
"--config",
config_path,
f"{remote}:{src_path}",
]
res = await cmd_exec(cmd)
if res[2] != 0:
if res[2] != -9:
msg = f"Error: While getting rclone stat. Path: {remote}:{src_path}. Stderr: {res[1][:4000]}"
Expand All @@ -183,9 +192,8 @@ async def _proceedToClone(self):
rstat = loads(res[0])
if rstat["IsDir"]:
self.name = src_path.rsplit("/", 1)[-1] if src_path else remote
self.upDest += (
self.name if self.upDest.endswith(":") else f"/{self.name}"
)
self.upDest += self.name if self.upDest.endswith(":") else f"/{self.name}"

mime_type = "Folder"
else:
self.name = src_path.rsplit("/", 1)[-1]
Expand All @@ -205,18 +213,42 @@ async def _proceedToClone(self):
flink, destination = await RCTransfer.clone(
config_path, remote, src_path, mime_type
)
if not flink:
if not destination:
return
LOGGER.info(f"Cloning Done: {self.name}")
cmd1 = f'rclone lsf --fast-list -R --files-only --config {config_path} "{destination}"'
cmd2 = f'rclone lsf --fast-list -R --dirs-only --config {config_path} "{destination}"'
cmd3 = (
f'rclone size --fast-list --json --config {config_path} "{destination}"'
)
cmd1 = [
"rclone",
"lsf",
"--fast-list",
"-R",
"--files-only",
"--config",
config_path,
destination,
]
cmd2 = [
"rclone",
"lsf",
"--fast-list",
"-R",
"--dirs-only",
"--config",
config_path,
destination,
]
cmd3 = [
"rclone",
"size",
"--fast-list",
"--json",
"--config",
config_path,
destination,
]
res1, res2, res3 = await gather(
cmd_exec(cmd1, shell=True),
cmd_exec(cmd2, shell=True),
cmd_exec(cmd3, shell=True),
cmd_exec(cmd1),
cmd_exec(cmd2),
cmd_exec(cmd3),
)
if res1[2] != res2[2] != res3[2] != 0:
if res1[2] == -9:
Expand Down

0 comments on commit 007740b

Please sign in to comment.