fix tensor memory #421

Draft
wants to merge 4 commits into base: main

89 changes: 89 additions & 0 deletions python/test/test_api.py
@@ -37,6 +37,9 @@
import pytest
import tritonserver

# import objgraph


try:
import cupy
except ImportError:
@@ -272,6 +275,10 @@ def test_allocate_on_gpu_and_reshape(self):
self.assertEqual(torch_fp32_tensor.nbytes, 200)


import gc
from collections import Counter


class TensorTests(unittest.TestCase):
@pytest.mark.skipif(cupy is None, reason="Skipping gpu memory, cupy not installed")
def test_cpu_to_gpu(self):
@@ -315,6 +322,88 @@ def test_tensor_from_numpy(self):
numpy.testing.assert_array_equal(torch_tensor.numpy(), cpu_array)
self.assertEqual(torch_tensor.data_ptr(), cpu_array.ctypes.data)

def test_cpu_memory_leak(self):
gc.collect()
objects_before = gc.get_objects()
for index in range(30):
tensor = numpy.ones(2**27)
dl_pack_tensor = tritonserver.Tensor.from_dlpack(tensor)
array = numpy.from_dlpack(dl_pack_tensor)
# print(index, index * tensor.size * tensor.itemsize)
del array
del dl_pack_tensor
del tensor
print(index)

# NOTE: if gc.collect() is called here,
# no tensors are leaked - indicating a circular reference
# gc.collect()

# Note:
# Originally, gc.collect() had no effect on reclaiming memory;
# with the changes in this PR, uncommenting the line below
# forces all tensors to be reclaimed and the test passes.
# It shouldn't be needed, though.

# gc.collect()
objects_after = gc.get_objects()
print(len(objects_after) - len(objects_before))
new_objects = [type(x) for x in objects_after[len(objects_before) :]]
tensor_objects = [
x for x in objects_after if isinstance(x, tritonserver.Tensor)
]
if tensor_objects:
print("Tensor objects")
print(len(tensor_objects))
print(type(tensor_objects[-1].memory_buffer.owner))

# chain = objgraph.find_backref_chain(
# tensor_objects[-1], objgraph.is_proper_module
# )
# print(len(chain))
# print(chain)
print(Counter(new_objects))

assert len(tensor_objects) == 0, "Leaked Objects"

def test_gpu_memory_leak(self):
gc.collect()
objects_before = gc.get_objects()
for index in range(50):
tensor = cupy.ones(2**27)
dl_pack_tensor = tritonserver.Tensor.from_dlpack(tensor)
array = cupy.from_dlpack(dl_pack_tensor)
# print(index, index * tensor.size * tensor.itemsize)
del array
del dl_pack_tensor
del tensor
print(index)

# NOTE: if gc.collect() is called here,
# no tensors are leaked - indicating a circular reference
# gc.collect()

# Note:
# Originally, gc.collect() had no effect on reclaiming memory;
# with the changes in this PR, uncommenting the line below
# forces all tensors to be reclaimed and the test passes.
# It shouldn't be needed, though.

# gc.collect()
objects_after = gc.get_objects()
print(len(objects_after) - len(objects_before))
new_objects = [type(x) for x in objects_after[len(objects_before) :]]
tensor_objects = [
x for x in objects_after if isinstance(x, tritonserver.Tensor)
]
if tensor_objects:
print(type(tensor_objects[-1].memory_buffer.owner))

print(Counter(new_objects))

assert len(tensor_objects) == 0, "Leaked Objects"


class ServerTests(unittest.TestCase):
def setup_method(self, method):
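An aside on the circular-reference notes in the tests above. The sketch below is illustrative and not part of the PR; it shows why an object that holds the only remaining strong reference to itself is invisible to pure reference counting and is reclaimed only by the cycle collector, which matches the observation that uncommenting gc.collect() makes the tests pass.

import gc


class Exporter:
    """Illustrative stand-in for an exported tensor object."""

    def __init__(self):
        # Plays the role of the Py_IncRef on the tensor in
        # _set_dlpack_manager_ctx (see _tensor.py below): the export
        # keeps a strong reference back to the exporter itself.
        self.manager_ctx = self


e = Exporter()
del e  # reference counting cannot free this: it still references itself
print(gc.collect())  # nonzero: only the cycle collector reclaims it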
73 changes: 64 additions & 9 deletions python/tritonserver/_api/_tensor.py
@@ -54,10 +54,15 @@
from tritonserver._c.triton_bindings import TRITONSERVER_MemoryType as MemoryType
from tritonserver._c.triton_bindings import UnsupportedError

# import objgraph


DeviceOrMemoryType = (
tuple[MemoryType, int] | MemoryType | tuple[DLDeviceType, int] | str
)

import sys

try:
import cupy
except ImportError:
@@ -214,26 +219,40 @@ def __dlpack__(self, *, stream=None):
Any
A DLPack-compatible object representing the tensor.
"""

self._sync_on_requested_stream(stream)

## Debug note: the managed tensor struct is allocated with malloc
dl_managed_tensor = Tensor._create_managed_tensor()

dl_managed_tensor.dl_tensor.data = self.data_ptr
dl_managed_tensor.dl_tensor.device = DLDevice(
TRITON_MEMORY_TYPE_TO_DLPACK_DEVICE_TYPE[self.memory_type],
self.memory_type_id,
)

dl_managed_tensor.dl_tensor.dtype = TRITON_TO_DLPACK_DTYPE[self.data_type]
dl_managed_tensor.dl_tensor.ndim = len(self.shape)
dl_managed_tensor.dl_tensor.shape = (ctypes.c_int64 * len(self.shape))(
*self.shape
)
print("storing shape", self.shape)

## Original issue: the shape array was created here as a ctypes
## array object, but could not be freed correctly:
##
## dl_managed_tensor.dl_tensor.shape = (ctypes.c_int64 * len(self.shape))(
## *self.shape
## )

## Now the shape array is allocated with malloc instead
dl_managed_tensor.dl_tensor.shape = Tensor._create_shape_array(self.shape)

## Debug note: strides is a null pointer, which DLPack defines as a
## compact, row-major layout
dl_managed_tensor.dl_tensor.strides = ctypes.POINTER(ctypes.c_int64)()
dl_managed_tensor.dl_tensor.byte_offset = 0
dl_managed_tensor.deleter = Tensor._managed_tensor_deleter

## Debug note: this method increments this Tensor's reference count
## and points manager_ctx at it; the matching Py_DecRef happens in
## _managed_tensor_deleter

self._set_dlpack_manager_ctx(dl_managed_tensor)

pycapsule = ctypes.pythonapi.PyCapsule_New(
ctypes.byref(dl_managed_tensor),
c_str_dltensor,
@@ -603,6 +622,16 @@ def _from_numpy(obj: numpy.ndarray | numpy.generic) -> Tensor:

return Tensor(data_type, shape, memory_buffer)

@staticmethod
def _create_shape_array(shape):
array_type = ctypes.c_int64 * len(shape)
size = ctypes.c_size_t(ctypes.sizeof(array_type))
# declare the return type so the pointer is not truncated to a
# C int on 64-bit platforms
ctypes.pythonapi.PyMem_RawMalloc.restype = ctypes.c_void_p
address = ctypes.pythonapi.PyMem_RawMalloc(size)
array = array_type.from_address(address)
for index in range(len(shape)):
array[index] = shape[index]
return array

@staticmethod
def _create_managed_tensor():
size = ctypes.c_size_t(ctypes.sizeof(DLManagedTensor))
@@ -612,20 +641,39 @@ def _create_managed_tensor():
@staticmethod
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _managed_tensor_deleter(handle: int) -> None:
# DEBUG print("managed tensor deleter!",flush=True)

dl_managed_tensor = DLManagedTensor.from_address(handle)
tensor_obj_ptr = ctypes.cast(
dl_managed_tensor.manager_ctx, ctypes.POINTER(ctypes.py_object)
)
tensor_obj = tensor_obj_ptr.contents

print(dl_managed_tensor.dl_tensor.shape[0])

# DEBUG Note: free the malloc'd shape array
ctypes.pythonapi.PyMem_RawFree(dl_managed_tensor.dl_tensor.shape)

## Original - caused a memory leak; the shape is no longer a
## Python object, so there is nothing to DecRef:
## shape_obj = ctypes.py_object(dl_managed_tensor.dl_tensor.shape)
## ctypes.pythonapi.Py_DecRef(shape_obj)

# DEBUG Note: decrement the reference to the original tensor object
ctypes.pythonapi.Py_DecRef(tensor_obj)

# DEBUG Note: free the managed tensor

ctypes.pythonapi.PyMem_RawFree(handle)

# DEBUG chain = objgraph.find_backref_chain(tensor_obj, objgraph.is_proper_module)
# DEBUG print(len(chain))
# DEBUG print([type(x) for x in chain])

@staticmethod
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def _pycapsule_deleter(handle: ctypes.c_void_p) -> None:
try:
# DEBUG print("capsule deleter!",flush=True)
pycapsule: ctypes.py_object = ctypes.cast(handle, ctypes.py_object)
if ctypes.pythonapi.PyCapsule_IsValid(pycapsule, c_str_dltensor):
dl_managed_tensor = ctypes.pythonapi.PyCapsule_GetPointer(
@@ -643,9 +691,16 @@ def _set_dlpack_manager_ctx(self, dl_managed_tensor):
tensor_obj = ctypes.py_object(self)
tensor_obj_ptr = ctypes.pointer(tensor_obj)
dl_managed_tensor.manager_ctx = ctypes.cast(tensor_obj_ptr, ctypes.c_void_p)
# paired with the Py_DecRef in _managed_tensor_deleter
ctypes.pythonapi.Py_IncRef(tensor_obj)

## Original issue:
## the IncRef on the shape object below caused the tensor object
## to never be garbage collected.
##
## Removing the IncRef alone caused the shape to be corrupted,
## because the ctypes shape array was freed while the DLPack
## consumer still held its address.
## The current solution allocates the shape with malloc instead.

## shape_obj = ctypes.py_object(dl_managed_tensor.dl_tensor.shape)
## ctypes.pythonapi.Py_IncRef(shape_obj)

_from_converters: ClassVar[dict[type, Callable[[Any], Tensor]]] = dict(
{numpy.ndarray: _from_numpy, numpy.generic: _from_numpy, list: _from_list},
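For reference, a standalone sketch of the shape-lifetime problem described in the comments above, and of the malloc-based fix. It is not part of the PR: the restype/argtypes declarations are an extra safety assumption (the PR calls these functions with ctypes defaults), and all names are illustrative.

import ctypes

# Declare signatures so the returned pointer is not truncated to a C int
# on 64-bit platforms.
ctypes.pythonapi.PyMem_RawMalloc.restype = ctypes.c_void_p
ctypes.pythonapi.PyMem_RawMalloc.argtypes = [ctypes.c_size_t]
ctypes.pythonapi.PyMem_RawFree.restype = None
ctypes.pythonapi.PyMem_RawFree.argtypes = [ctypes.c_void_p]

# The problem: a ctypes array is a Python object that owns its buffer.
# Exporting its address without an extra reference leaves a dangling
# pointer once the object is collected (the "corrupted" shape), while
# holding an extra Py_IncRef, as the original code did, kept objects
# alive indefinitely (the leak the tests above catch).
shape_arr = (ctypes.c_int64 * 3)(2, 3, 4)
dangling = ctypes.addressof(shape_arr)
del shape_arr  # `dangling` now points at freed memory

# The fix: give the shape a manually managed lifetime, independent of
# Python reference counting, and pair the free in the DLPack deleter.
n = 3
raw = ctypes.pythonapi.PyMem_RawMalloc(ctypes.sizeof(ctypes.c_int64) * n)
view = (ctypes.c_int64 * n).from_address(raw)  # a view; does not own raw
view[:] = [2, 3, 4]
print(list(view))  # [2, 3, 4]
ctypes.pythonapi.PyMem_RawFree(raw)  # as _managed_tensor_deleter does

The design point: the DLPack consumer holds a raw C pointer with no notion of Python object lifetimes, so the shape buffer must outlive any Python wrapper and be released exactly once, in the deleter.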