Skip to content

Commit

Permalink
fix u2c --ow (overwrite/replace)
Browse files Browse the repository at this point in the history
the u2c flag to overwrite files on the server became no-op in v1.13.8
  • Loading branch information
9001 committed Sep 9, 2024
1 parent 2fac2be commit 6eee601
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 23 deletions.
19 changes: 17 additions & 2 deletions copyparty/up2k.py
Original file line number Diff line number Diff line change
Expand Up @@ -2846,7 +2846,7 @@ def _handle_json(self, cj: dict[str, Any], depth: int = 1) -> dict[str, Any]:
self.log(t)
del reg[wark]

elif inc_ap != orig_ap and not data_ok:
elif inc_ap != orig_ap and not data_ok and "done" in reg[wark]:
self.log("asserting contents of %s" % (orig_ap,))
dhashes, _ = self._hashlist_from_file(orig_ap)
dwark = up2k_wark_from_hashlist(self.salt, st.st_size, dhashes)
Expand Down Expand Up @@ -3107,7 +3107,22 @@ def _untaken(self, fdir: str, job: dict[str, Any], ts: float) -> str:
fp = djoin(fdir, fname)
if job.get("replace") and bos.path.exists(fp):
self.log("replacing existing file at {}".format(fp))
wunlink(self.log, fp, self.flags.get(job["ptop"]) or {})
cur = None
ptop = job["ptop"]
vf = self.flags.get(ptop) or {}
st = bos.stat(fp)
try:
vrel = vjoin(job["prel"], fname)
xlink = bool(vf.get("xlink"))
cur, wark, _, _, _, _ = self._find_from_vpath(ptop, vrel)
self._forget_file(ptop, vrel, cur, wark, True, st.st_size, xlink)
except Exception as ex:
self.log("skipping replace-relink: %r" % (ex,))
finally:
if cur:
cur.connection.commit()

wunlink(self.log, fp, vf)

if self.args.plain_ip:
dip = ip.replace(":", ".")
Expand Down
95 changes: 74 additions & 21 deletions tests/test_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ class TestDedup(unittest.TestCase):
def setUp(self):
self.td = tu.get_ramdisk()

# (data, chash, wark)
self.files = [
(
"one",
"BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w",
"XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc",
),
(
"two",
"ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj",
"fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou",
),
]

def tearDown(self):
os.chdir(tempfile.gettempdir())
shutil.rmtree(self.td)
Expand All @@ -40,25 +54,54 @@ def cinit(self):
if self.fstab:
self.conn.hsrv.hub.up2k.fstab = self.fstab

def test_a(self):
file404 = "\nJ2EOT"
f1, f2 = self.files
fns = ("f1", "f2", "f3")
dn = "d"

self.conn = None
self.fstab = None
for e2d in [True, False]:
self.args = Cfg(v=[".::A"], a=[], e2d=e2d)
self.reset()
self.cinit()

# dupes in parallel
sfn, hs = self.do_post_hs(dn, fns[0], f1, True)
for fn in fns[1:]:
h, b = self.handshake(dn, fn, f1)
self.assertIn(" 422 Unpro", h)
self.assertIn("a different location;", b)
self.do_post_data(dn, fns[0], f1, True, sfn, hs)
if not e2d:
# dupesched is e2d only; hs into existence
for fn, data in zip(fns, (f1[0], file404, file404)):
h, b = self.curl("%s/%s" % ("d", fn))
self.assertEqual(b, data)
for fn in fns[1:]:
h, b = self.do_post_hs(dn, fn, f1, False)
for fn in fns:
h, b = self.curl("%s/%s" % ("d", fn))
self.assertEqual(b, f1[0])

if not e2d:
continue

# overwrite file
sfn, hs = self.do_post_hs(dn, fns[0], f2, True, replace=True)
self.do_post_data(dn, fns[0], f2, True, sfn, hs)
for fn, f in zip(fns, (f2, f1, f1)):
h, b = self.curl("%s/%s" % ("d", fn))
self.assertEqual(b, f[0])

def test(self):
quick = True # sufficient for regular smoketests
# quick = False

dirnames = ["d1", "d2"]
filenames = ["f1", "f2"]
files = [
(
"one",
"BfcDQQeKz2oG1CPSFyD5ZD1flTYm2IoCY23DqeeVgq6w",
"XMbpLRqVdtGmgggqjUI6uSoNMTqZVX4K6zr74XA1BRKc",
),
(
"two",
"ko1Q0eJNq3zKYs_oT83Pn8aVFgonj5G1wK8itwnYL4qj",
"fxvihWlnQIbVbUPr--TxyV41913kPLhXPD1ngXYxDfou",
),
]
# (data, chash, wark)
files = self.files

self.ctr = 336 if quick else 2016 # estimated total num uploads
self.conn = None
Expand Down Expand Up @@ -127,10 +170,13 @@ def do_tc(self, cm1, cm2, cm3, irm):
def do_post(self, dn, fn, fi, first):
print("\n\n# do_post", self.ctr, repr((dn, fn, fi, first)))
self.ctr -= 1
sfn, hs = self.do_post_hs(dn, fn, fi, first)
return self.do_post_data(dn, fn, fi, first, sfn, hs)

data, chash, wark = fi
hs = self.handshake(dn, fn, fi)
self.assertEqual(hs["wark"], wark)
def do_post_hs(self, dn, fn, fi, first, replace=False):
h, b = self.handshake(dn, fn, fi, replace=replace)
hs = json.loads(b)
self.assertEqual(hs["wark"], fi[2])

sfn = hs["name"]
if sfn == fn:
Expand All @@ -140,6 +186,10 @@ def do_post(self, dn, fn, fi, first):
if first:
raise Exception("wait what")

return sfn, hs

def do_post_data(self, dn, fn, fi, first, sfn, hs):
data, chash, wark = fi
if hs["hash"]:
self.assertEqual(hs["hash"][0], chash)
self.put_chunk(dn, wark, chash, data)
Expand All @@ -150,16 +200,18 @@ def do_post(self, dn, fn, fi, first):
self.assertEqual(b, data)
return sfn

def handshake(self, dn, fn, fi):
def handshake(self, dn, fn, fi, replace=False):
hdr = "POST /%s/ HTTP/1.1\r\nConnection: close\r\nContent-Type: text/plain\r\nContent-Length: %d\r\n\r\n"
msg = {"name": fn, "size": 3, "lmod": 1234567890, "life": 0, "hash": [fi[1]]}
if replace:
msg["replace"] = True
buf = json.dumps(msg).encode("utf-8")
buf = (hdr % (dn, len(buf))).encode("utf-8") + buf
print("HS -->", buf)
# print("HS -->", buf)
HttpCli(self.conn.setbuf(buf)).run()
ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
print("HS <--", ret)
return json.loads(ret[1])
# print("HS <--", ret)
return ret

def put_chunk(self, dn, wark, chash, data):
msg = [
Expand All @@ -173,14 +225,15 @@ def put_chunk(self, dn, wark, chash, data):
data,
]
buf = "\r\n".join(msg).encode("utf-8")
print("PUT -->", buf)
# print("PUT -->", buf)
HttpCli(self.conn.setbuf(buf)).run()
ret = self.conn.s._reply.decode("utf-8").split("\r\n\r\n", 1)
self.assertEqual(ret[1], "thank")

def curl(self, url, binary=False, meth=None):
h = "%s /%s HTTP/1.1\r\nConnection: close\r\n\r\n"
h = h % (meth or "GET", url)
# print("CURL -->", url)
HttpCli(self.conn.setbuf(h.encode("utf-8"))).run()
if binary:
h, b = self.conn.s._reply.split(b"\r\n\r\n", 1)
Expand Down

0 comments on commit 6eee601

Please sign in to comment.