From e4a185847111915968b028b539ceaa1dab6ee38c Mon Sep 17 00:00:00 2001 From: George Hotz <72895+geohot@users.noreply.github.com> Date: Sat, 6 Apr 2024 08:58:18 -0700 Subject: [PATCH] revert command queue (#4097) --- openpilot/compile2.py | 3 +- tinygrad/engine/commandqueue.py | 113 -------------------------------- tinygrad/engine/realize.py | 44 +++++++++++-- tinygrad/renderer/assembly.py | 2 +- 4 files changed, 42 insertions(+), 120 deletions(-) delete mode 100644 tinygrad/engine/commandqueue.py diff --git a/openpilot/compile2.py b/openpilot/compile2.py index da3cde157837..a28a7d4175b4 100644 --- a/openpilot/compile2.py +++ b/openpilot/compile2.py @@ -15,7 +15,6 @@ from tinygrad.dtype import ImageDType from tinygrad.helpers import partition, Context, fetch, getenv, DEBUG from tinygrad.engine.realize import run_schedule -from tinygrad.engine.commandqueue import CommandQueue from tinygrad.engine.schedule import create_schedule from tinygrad.ops import LoadOps, ScheduleItem Device.DEFAULT = "GPU" @@ -89,7 +88,7 @@ def test_vs_onnx(onnx_data, schedule:Optional[List[ScheduleItem]], inputs:Dict[s # run code (all buffers have been allocated) GlobalCounters.reset() output = schedule[-1].outputs[0] - CommandQueue(schedule)() + run_schedule(schedule) new_tinygrad_out = np.frombuffer(output.as_buffer(), dtype=output.dtype.np) np.testing.assert_allclose(new_torch_out.reshape(new_tinygrad_out.shape), new_tinygrad_out, atol=1e-4, rtol=1e-2) diff --git a/tinygrad/engine/commandqueue.py b/tinygrad/engine/commandqueue.py deleted file mode 100644 index b3e6d89411ef..000000000000 --- a/tinygrad/engine/commandqueue.py +++ /dev/null @@ -1,113 +0,0 @@ -# NOTE: this will replace jit.py, realize.py, and a lot of the boilerplate in each graph executor -from __future__ import annotations -from typing import List, Dict, Union, DefaultDict -from collections import defaultdict -from dataclasses import dataclass -from tinygrad.helpers import colored, cpu_time_execution, DEBUG -from tinygrad.ops import ScheduleItem, LoadOps, BufferOps -from tinygrad.shape.symbolic import Variable -from tinygrad.device import Buffer, JITRunner, Device, BufferXfer, BufferCopy, update_stats - -class CustomOp(JITRunner): - def __init__(self, fxn): - self.fxn = fxn - super().__init__() - def __call__(self, rawbufs:List[Buffer], var_vals:Dict[Variable, int], wait=False, jit=False): self.fxn(*rawbufs) - -# NOTE: two syncitems aren't the same if they are in different places in the queue -@dataclass(eq=False) -class SyncItem: - device: str - waiters: int = 0 - def __repr__(self): return f"SyncItem({self.device}, waiters={self.waiters}, {id(self)})" - -@dataclass(frozen=True) -class WaitItem: - sync: SyncItem - -@dataclass(frozen=True) -class CopyItem: - output: Buffer - input: Buffer - -# this will interface with HWCommandQueue to replace Graph -class CommandQueue: - def __init__(self, schedule:List[ScheduleItem]): - self.q: DefaultDict[str, List[Union[ScheduleItem, CopyItem, SyncItem, WaitItem]]] = defaultdict(list) - - def add_sync_item(device:str): - if not len(self.q[device]) or not isinstance(sync_item:=self.q[device][-1], SyncItem): - sync_item = SyncItem(device) - self.q[device].append(sync_item) - return sync_item - - def add_wait_item(device:str, syncitem:SyncItem): - # if you are adding this right after a first sync, delete this one - if len(self.q[device]) and isinstance(wi:=self.q[device][-1], WaitItem) and wi.sync.device == syncitem.device: - self.q[device] = self.q[device][:-1] - wi.sync.waiters -= 1 - if wi.sync.waiters == 0: self.q[wi.sync.device].remove(wi.sync) - if (wi:=WaitItem(syncitem)) not in self.q[device]: - syncitem.waiters += 1 - self.q[device].append(wi) - - while len(schedule): - si = schedule.pop(0) - assert len(set(x.device for x in si.outputs+si.inputs)) == 1 or (si.ast[0].op is LoadOps.COPY and len(si.outputs) == 1) - queue = self.q[si.outputs[0].device] - - if si.ast[0].op is LoadOps.COPY: - # TODO: add back copy device - copy_device = si.outputs[0].device #+"-copy" - add_wait_item(copy_device, add_sync_item(si.inputs[0].device)) - self.q[copy_device].append(CopyItem(si.outputs[0], si.inputs[0])) - #add_wait_item(si.outputs[0].device, add_sync_item(copy_device)) - continue - - # NOTE: LoadOps.EMPTY and LoadOps.CUSTOM are making it here - queue.append(si) - - def __call__(self): - active_queues = list(self.q.keys()) - waiting_queues: DefaultDict[SyncItem, List[str]] = defaultdict(list) - seen_sids = set() - while len(active_queues): - device = active_queues.pop(0) - if not len(self.q[device]): continue - si = self.q[device].pop(0) - #print(device, si, active_queues, seen_sids) - if isinstance(si, SyncItem): - # don't sync if there's other options - if all(isinstance(self.q[x][0], SyncItem) for x in active_queues if len(self.q[x])): - et = cpu_time_execution(Device[device].synchronize, enable=DEBUG>=2) - update_stats(colored("synchronize", "RED"), 0, 0, {}, et, 1, device=device) - if si in waiting_queues: - active_queues += waiting_queues[si] - waiting_queues[si].clear() - seen_sids.add(si) - else: - # put it back - self.q[device] = [si] + self.q[device] - elif isinstance(si, WaitItem): - if si.sync not in seen_sids: - waiting_queues[si.sync].append(device) - continue - elif isinstance(si, CopyItem): - si.output.allocate() - fxn = BufferXfer() if hasattr(Device[si.output.device].allocator, 'transfer') and \ - si.output.device.split(":")[0] == si.input.device.split(":")[0] else BufferCopy() - fxn.exec([si.output, si.input]) - elif isinstance(si, ScheduleItem): - for out in si.outputs: - if not hasattr(out, "_buf") and not (out.device.startswith("DISK") and si.ast[0].op is BufferOps.STORE): out.allocate() - if si.ast[0].op is not LoadOps.EMPTY: - if si.ast[0].op is LoadOps.CUSTOM: - runner:JITRunner = CustomOp(si.ast[0].arg) - elif si.ast[0].op is BufferOps.STORE: - runner = Device[si.outputs[0].device].get_runner(*si.ast) - else: raise RuntimeError(f"unknown type {si}") - runner.exec(list(si.outputs+si.inputs), si.var_vals) - else: - update_stats(colored(f"empty {si.outputs[0].size:10d} {si.outputs[0].dtype}", "yellow"), 0, 0, {}, None, 1, device=si.outputs[0].device) - else: raise RuntimeError(f"unknown type {si}") - active_queues.append(device) diff --git a/tinygrad/engine/realize.py b/tinygrad/engine/realize.py index 5bd70dfea681..26e92bce0ac0 100644 --- a/tinygrad/engine/realize.py +++ b/tinygrad/engine/realize.py @@ -1,5 +1,41 @@ -from typing import List -from tinygrad.ops import ScheduleItem -from tinygrad.engine.commandqueue import CommandQueue +from typing import List, Dict, Optional +from tinygrad.helpers import getenv, colored +from tinygrad.ops import ScheduleItem, BufferOps, LoadOps +from tinygrad.device import JITRunner, Device, BufferCopy, BufferXfer, update_stats +from tinygrad.buffer import Buffer +from tinygrad.shape.symbolic import Variable -def run_schedule(schedule:List[ScheduleItem]): CommandQueue(schedule)() +class CustomOp(JITRunner): + def __init__(self, fxn): + self.fxn = fxn + super().__init__() + def __call__(self, rawbufs:List[Buffer], var_vals:Dict[Variable, int], wait=False, jit=False): self.fxn(*rawbufs) + +def lower_schedule_item(si:ScheduleItem) -> Optional[JITRunner]: + assert len(set(x.device for x in si.outputs+si.inputs)) == 1 or si.ast[0].op is LoadOps.COPY + if si.ast[0].op is BufferOps.STORE: return Device[si.outputs[0].device].get_runner(*si.ast) + assert len(si.ast) == 1 and len(si.outputs) == 1, "only ASTRunner supports multioutput" + out, ast = si.outputs[0], si.ast[0] + if ast.op is LoadOps.COPY: + if hasattr(Device[out.device].allocator, 'transfer') and out.device.split(":")[0] == si.inputs[0].device.split(":")[0]: return BufferXfer() + return BufferCopy() + if ast.op is LoadOps.CUSTOM: return CustomOp(ast.arg) + return None + +logops = open(getenv("LOGOPS", ""), "a") if getenv("LOGOPS", "") else None +def run_schedule(schedule:List[ScheduleItem]): + while len(schedule): + si = schedule.pop(0) + if logops and si.ast[0].op not in LoadOps and not any(i.device.startswith("DISK:") for i in si.inputs): logops.write(str(si.ast)+"\n") + + # get the program + prg = lower_schedule_item(si) + + for out in si.outputs: + # we don't have an output buffer, we have to create it, and create to max size if it has symbolic shape + if out.size > 0 and not (out.device.startswith("DISK") and si.ast[0].op is BufferOps.STORE) and not hasattr(out, "_buf"): out.allocate() + + # run the function (put it in JIT) + real_buffers = [x for x in si.outputs+si.inputs if x.size != 0] + if prg: prg.exec(real_buffers, si.var_vals) + elif (out:=si.outputs[0]).size > 0: update_stats(colored(f"empty {out.size:10d} {out.dtype}", "yellow"), 0, 0, {}, None, 1, device=out.device) diff --git a/tinygrad/renderer/assembly.py b/tinygrad/renderer/assembly.py index 475a060e8baa..0835cc2c0ddb 100644 --- a/tinygrad/renderer/assembly.py +++ b/tinygrad/renderer/assembly.py @@ -70,7 +70,7 @@ def uops_to_asm(lang:AssemblyLanguage, function_name:str, uops:UOpGraph) -> str: lambda root,x,y: UOp(root.uop, root.dtype, (UOp(UOps.ALU, dtypes.bool, (x,), UnaryOps.NEG), y), BinaryOps.MUL)), ({"__name__": "root", "uop": UOps.ALU, "arg": BinaryOps.ADD, "dtype": set([dtypes.float16, dtypes.bfloat16, dtypes.float32, dtypes.float64]), "vin": [{"__name__": "non_muls"}, {"__name__": "muls", "uop": UOps.ALU, "arg": BinaryOps.MUL}]}, - lambda root, muls, non_muls : UOp(UOps.ALU, root.dtype, muls.vin + (non_muls,), TernaryOps.MULACC)), + lambda root, muls, non_muls: UOp(UOps.ALU, root.dtype, muls.vin + (non_muls,), TernaryOps.MULACC)), *[({"__name__": "x", "uop": UOps.ALU, "dtype": dtypes.half, "arg": op}, lambda x: UOp(UOps.CAST, dtypes.half, (UOp(x.uop, dtypes.float32, tuple([UOp(UOps.CAST, dtypes.float32, (vv,)) for vv in x.vin]), x.arg),))) for op in lang.asm_for_op.keys() if op not in lang.supports_half],