Skip to content

Commit

Permalink
nv timeline semaphores (tinygrad#4464)
Browse files Browse the repository at this point in the history
* nv timeline semaphores

* nv hcq fixes
  • Loading branch information
nimlgen authored May 7, 2024
1 parent e3bb85f commit a1d350a
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 96 deletions.
76 changes: 46 additions & 30 deletions test/external/external_test_hcq.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import unittest, ctypes, struct, time
import unittest, ctypes, struct, time, array
from tinygrad import Device, Tensor, dtypes
from tinygrad.helpers import to_mv
from tinygrad.buffer import Buffer, BufferOptions
from tinygrad.engine.schedule import create_schedule
from tinygrad.runtime.ops_amd import AMDDevice, HWCopyQueue, HWPM4Queue

def _time_queue(q, d):
st = time.perf_counter()
Expand All @@ -12,24 +12,39 @@ def _time_queue(q, d):
d.timeline_value += 1
return time.perf_counter() - st

@unittest.skipUnless(Device.DEFAULT in ["NV", "AMD"], "Runs only on NV or AMD")
class TestHCQ(unittest.TestCase):
# Class-level fixture shared by every test in TestHCQ.
# It realizes tensor `a` = [0., 1.], builds `b = a + 1`, takes the last schedule item for `b`,
# obtains its runner from the device, and writes two kernel-argument blobs into the device's
# kernargs memory. It finally selects the backend-specific compute/copy queue classes.
# NOTE(review): this listing is a diff view without +/- markers; adjacent near-duplicate
# lines (e.g. the two `TestHCQ.d0 = ...` assignments) appear to be the before/after versions
# of the same statement -- confirm against the commit.
@classmethod
def setUpClass(self):
TestHCQ.d0: AMDDevice = Device["AMD"]
TestHCQ.d0 = Device[Device.DEFAULT]
#TestHCQ.d1: AMDDevice = Device["AMD:1"]
TestHCQ.a = Tensor([0.,1.], device="AMD").realize()
TestHCQ.a = Tensor([0.,1.], device=Device.DEFAULT).realize()
TestHCQ.b = self.a + 1
# only the last schedule item is used to obtain the runner
si = create_schedule([self.b.lazydata])[-1]
TestHCQ.runner = TestHCQ.d0.get_runner(*si.ast)
TestHCQ.b.lazydata.buffer.allocate()
# wow that's a lot of abstraction layers
# addr packs (b address, a address) as two unsigned 64-bit values; addr2 packs the reverse order (a, b)
TestHCQ.addr = struct.pack("QQ", TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr)
TestHCQ.addr2 = struct.pack("QQ", TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr)
ctypes.memmove(TestHCQ.d0.kernargs_ptr, TestHCQ.addr, len(TestHCQ.addr))
ctypes.memmove(TestHCQ.d0.kernargs_ptr+len(TestHCQ.addr), TestHCQ.addr2, len(TestHCQ.addr2))
TestHCQ.compute_queue = HWPM4Queue
# offset and size of the kernargs segment are taken from the compiled program object
TestHCQ.kernargs_off = TestHCQ.runner.clprg.kernargs_offset
TestHCQ.kernargs_size = TestHCQ.runner.clprg.kernargs_segment_size
# first region holds addr at kernargs_off; the second region starts kernargs_size bytes later and holds addr2
ctypes.memmove(TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_off, TestHCQ.addr, len(TestHCQ.addr))
ctypes.memmove(TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size+TestHCQ.kernargs_off, TestHCQ.addr2, len(TestHCQ.addr2))

# queue classes are imported lazily so only the selected backend's runtime module is loaded here
if Device.DEFAULT == "AMD":
from tinygrad.runtime.ops_amd import HWCopyQueue, HWPM4Queue
TestHCQ.compute_queue = HWPM4Queue
TestHCQ.copy_queue = HWCopyQueue
elif Device.DEFAULT == "NV":
from tinygrad.runtime.ops_nv import HWCopyQueue, HWComputeQueue
# NV also needs the program's constbuffer_0 copied into the start of both kernargs regions
to_mv(TestHCQ.d0.kernargs_ptr, 0x160).cast('I')[:] = array.array('I', TestHCQ.runner.clprg.constbuffer_0)
to_mv(TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, 0x160).cast('I')[:] = array.array('I', TestHCQ.runner.clprg.constbuffer_0)
TestHCQ.compute_queue = HWComputeQueue
TestHCQ.copy_queue = HWCopyQueue

def setUp(self):
TestHCQ.d0.synchronize()
TestHCQ.a.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 1))))
TestHCQ.b.lazydata.buffer.copyin(memoryview(bytearray(struct.pack("ff", 0, 0))))
TestHCQ.d0.synchronize() # wait for copyins to complete
Expand All @@ -42,7 +57,7 @@ def test_run_1000_times_one_submit(self):
q.signal(temp_signal, temp_value + 1).wait(temp_signal, temp_value + 1)
temp_value += 1

q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+len(TestHCQ.addr), TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(temp_signal, temp_value + 1).wait(temp_signal, temp_value + 1)
temp_value += 1

Expand All @@ -57,10 +72,10 @@ def test_run_1000_times(self):
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(temp_signal, 2).wait(temp_signal, 2)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+len(TestHCQ.addr), TestHCQ.runner.global_size,
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, TestHCQ.runner.global_size,
TestHCQ.runner.local_size)
for _ in range(1000):
temp_signal.value = 1
TestHCQ.d0._set_signal(temp_signal, 1)
q.submit(TestHCQ.d0)
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
Expand All @@ -72,7 +87,7 @@ def test_run_to_3(self):
q = TestHCQ.compute_queue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(temp_signal, 1).wait(temp_signal, 1)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+len(TestHCQ.addr), TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr+TestHCQ.kernargs_size, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(temp_signal, 2).wait(temp_signal, 2)
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
Expand All @@ -86,17 +101,17 @@ def test_wait_signal(self):
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
# clean up
temp_signal.value = 1
TestHCQ.d0._set_signal(temp_signal, 1)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=100)
TestHCQ.d0.timeline_value += 1

# A copy queue that waits on an unsatisfied signal must block the timeline signal that follows it.
# The queue waits for temp_signal == 1 (it is created with value 0) before signalling the timeline,
# so the first host-side wait is expected to raise RuntimeError.
# NOTE(review): adjacent near-duplicate lines below appear to be the before/after sides of the diff.
def test_wait_copy_signal(self):
temp_signal = TestHCQ.d0._get_signal(value=0)
HWCopyQueue().wait(temp_signal, value=1).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.copy_queue().wait(temp_signal, value=1).signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
with self.assertRaises(RuntimeError):
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=50)
# clean up
# setting temp_signal to 1 satisfies the queued wait, so the timeline wait below can complete
temp_signal.value = 1
TestHCQ.d0._set_signal(temp_signal, 1)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value, timeout=100)
# advance the expected timeline value for whatever submits next
TestHCQ.d0.timeline_value += 1

Expand All @@ -110,7 +125,7 @@ def test_run_normal(self):

# Submits a compute queue and a copy queue with no commands recorded.
# There are no assertions: the test passes as long as submit() returns without raising.
# NOTE(review): the two copy-queue lines appear to be the before/after sides of the diff.
def test_submit_empty_queues(self):
TestHCQ.compute_queue().submit(TestHCQ.d0)
HWCopyQueue().submit(TestHCQ.d0)
TestHCQ.copy_queue().submit(TestHCQ.d0)

def test_signal_timeout(self):
with self.assertRaises(RuntimeError):
Expand All @@ -126,7 +141,7 @@ def test_signal(self):

# Signals the device timeline from a copy queue with a value 0xff ahead of the current
# timeline_value, then waits on the host for that value to arrive.
# NOTE(review): the two copy-queue lines appear to be the before/after sides of the diff.
def test_copy_signal(self):
new_timeline_value = TestHCQ.d0.timeline_value + 0xff
HWCopyQueue().signal(TestHCQ.d0.timeline_signal, new_timeline_value).submit(TestHCQ.d0)
TestHCQ.copy_queue().signal(TestHCQ.d0.timeline_signal, new_timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, new_timeline_value)
TestHCQ.d0.timeline_value = new_timeline_value + 1 # update to not break runtime

Expand All @@ -140,12 +155,12 @@ def test_run_signal(self):
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"

def test_copy_1000_times(self):
q = HWCopyQueue()
q = TestHCQ.copy_queue()
q.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
q.copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8)
for _ in range(1000):
q.submit(TestHCQ.d0)
HWCopyQueue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.copy_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
TestHCQ.d0.timeline_value += 1
# confirm the signal didn't exceed the put value
Expand All @@ -154,7 +169,7 @@ def test_copy_1000_times(self):
assert (val:=TestHCQ.b.lazydata.buffer.as_buffer().cast("f")[1]) == 0.0, f"got val {val}"

def test_copy(self):
q = HWCopyQueue()
q = TestHCQ.copy_queue()
q.copy(TestHCQ.b.lazydata.buffer._buf.va_addr, TestHCQ.a.lazydata.buffer._buf.va_addr, 8)
q.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q.submit(TestHCQ.d0)
Expand All @@ -165,9 +180,9 @@ def test_copy(self):
def test_copy_bandwidth(self):
# THEORY: the bandwidth is low here because it's only using one SDMA queue. I suspect it's more stable like this at least.
SZ = 2_000_000_000
a = Buffer("AMD", SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
b = Buffer("AMD", SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
q = HWCopyQueue()
a = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
b = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
q = TestHCQ.copy_queue()
q.copy(a._buf.va_addr, b._buf.va_addr, SZ)
et = _time_queue(q, TestHCQ.d0)
gb_s = (SZ/1e9)/et
Expand All @@ -176,10 +191,10 @@ def test_copy_bandwidth(self):

def test_cross_device_copy_bandwidth(self):
SZ = 2_000_000_000
b = Buffer("AMD:1", SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
a = Buffer("AMD", SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
b = Buffer(f"{Device.DEFAULT}:1", SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
a = Buffer(Device.DEFAULT, SZ, dtypes.uint8, options=BufferOptions(nolru=True)).allocate()
TestHCQ.d0._gpu_map(b._buf)
q = HWCopyQueue()
q = TestHCQ.copy_queue()
q.copy(a._buf.va_addr, b._buf.va_addr, SZ)
et = _time_queue(q, TestHCQ.d0)
gb_s = (SZ/1e9)/et
Expand All @@ -188,9 +203,9 @@ def test_cross_device_copy_bandwidth(self):

def test_interleave_compute_and_copy(self):
q = TestHCQ.compute_queue()
qc = HWCopyQueue()
qc = TestHCQ.copy_queue()
q.exec(TestHCQ.runner.clprg, TestHCQ.d0.kernargs_ptr, TestHCQ.runner.global_size, TestHCQ.runner.local_size) # b = [1, 2]
q.signal(sig:=AMDDevice._get_signal(value=0), value=1)
q.signal(sig:=TestHCQ.d0._get_signal(value=0), value=1)
qc.wait(sig, value=1)
qc.copy(TestHCQ.a.lazydata.buffer._buf.va_addr, TestHCQ.b.lazydata.buffer._buf.va_addr, 8)
qc.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
Expand All @@ -202,10 +217,10 @@ def test_interleave_compute_and_copy(self):
assert (val:=TestHCQ.a.lazydata.buffer.as_buffer().cast("f")[0]) == 1.0, f"got val {val}"

def test_cross_device_signal(self):
d1 = Device["AMD:1"]
d1 = Device[f"{Device.DEFAULT}:1"]
q1 = TestHCQ.compute_queue()
q2 = TestHCQ.compute_queue()
q1.signal(sig:=AMDDevice._get_signal(value=0), value=0xfff)
q1.signal(sig:=TestHCQ.d0._get_signal(value=0), value=0xfff)
q2.wait(sig, value=0xfff)
q2.signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value)
q2.submit(TestHCQ.d0)
Expand All @@ -217,7 +232,8 @@ def test_cross_device_signal(self):
d1.timeline_value += 1

def test_timeline_signal_rollover(self):
TestHCQ.d0.timeline_value = (1 << 32) - 20 # close value to reset
# NV 64bit, AMD 32bit
TestHCQ.d0.timeline_value = (1 << 64) - 20 if Device.DEFAULT == "NV" else (1 << 32) - 20 # close value to reset
TestHCQ.compute_queue().signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1).submit(TestHCQ.d0)
TestHCQ.d0._wait_signal(TestHCQ.d0.timeline_signal, TestHCQ.d0.timeline_value - 1)

Expand Down
4 changes: 4 additions & 0 deletions tinygrad/runtime/ops_amd.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ def __init__(self, device:AMDDevice, name:str, lib:bytes):
self.group_segment_size = lib_gpu_view.cast("I")[entry_point//4]
self.private_segment_size = lib_gpu_view.cast("I")[entry_point//4 + 1]
self.kernargs_segment_size = lib_gpu_view.cast("I")[entry_point//4 + 2]
self.kernargs_offset = 0
assert self.private_segment_size <= self.device.max_private_segment_size, \
f"{self.private_segment_size=} > {self.device.max_private_segment_size=}"

Expand Down Expand Up @@ -485,6 +486,9 @@ def _gpu_free(self, mem):
libc.munmap(mem.va_addr, mem.size)
kio.free_memory_of_gpu(self.kfd, handle=mem.handle)

# Host-side write to a signal: stores `value` into the signal object's `value` attribute.
# The tests call this through the device (`d0._set_signal(sig, v)`) instead of assigning `sig.value` directly.
# The first parameter is named `self` although this is a classmethod, matching `_get_signal` below.
@classmethod
def _set_signal(self, sig, value): sig.value = value

@classmethod
def _get_signal(self, num=None, sync_event=None, value=0) -> hsa.amd_signal_t:
if num is None:
Expand Down
Loading

0 comments on commit a1d350a

Please sign in to comment.