Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Py2f]: Cache Fortran pointers in dictionary #616

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion model/common/src/icon4py/model/common/grid/icon.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def start_index(self, domain: h_grid.Domain) -> gtx.int32:
# special treatment because this value is not set properly in the underlying data.
return gtx.int32(0)
# ndarray.item() does not respect the dtype of the array, returns a copy of the value _as the default python type_
return gtx.int32(self._start_indices[domain.dim][domain()])
return gtx.int32(self._start_indices[domain.dim][domain()].item())

def end_index(self, domain: h_grid.Domain) -> gtx.int32:
"""
Expand Down
95 changes: 82 additions & 13 deletions tools/src/icon4pytools/py2fgen/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,32 @@

import logging
import math
import typing
from pathlib import Path
from typing import Tuple

import cffi
import gt4py.next as gtx
import numpy as np
from cffi import FFI
from numpy.typing import NDArray

from icon4pytools.common.logger import setup_logger
from icon4pytools.py2fgen.settings import xp


if typing.TYPE_CHECKING:
import cupy as cp # type: ignore

ffi = FFI() # needed for unpack and unpack_gpu functions

logger = setup_logger(__name__)


def unpack(ptr, *sizes: int) -> NDArray:
def unpack(ptr: cffi.api.FFI.CData, *sizes: int) -> NDArray:
"""
Converts a C pointer into a NumPy array to directly manipulate memory allocated in Fortran.
This function is needed for operations requiring in-place modification of CPU data, enabling
changes made in Python to reflect immediately in the original Fortran memory space.

Args:
ptr (CData): A CFFI pointer to the beginning of the data array in CPU memory. This pointer
ptr (ffi.CData): A CFFI pointer to the beginning of the data array in CPU memory. This pointer
should reference a contiguous block of memory whose total size matches the product
of the specified dimensions.
*sizes (int): Variable length argument list specifying the dimensions of the array.
Expand Down Expand Up @@ -62,7 +61,7 @@ def unpack(ptr, *sizes: int) -> NDArray:
return arr


def unpack_gpu(ptr, *sizes: int):
def unpack_gpu(ptr: cffi.api.FFI.CData, *sizes: int):
"""
Converts a C pointer into a CuPy array to directly manipulate memory allocated in Fortran.
This function is needed for operations that require in-place modification of GPU data,
Expand All @@ -89,8 +88,8 @@ def unpack_gpu(ptr, *sizes: int):
c_type = ffi.getctype(ffi.typeof(ptr).item)

dtype_map = {
"int": cp.int32,
"double": cp.float64,
"int": xp.int32,
"double": xp.float64,
}
dtype = dtype_map.get(c_type, None)
if dtype is None:
Expand All @@ -100,11 +99,11 @@ def unpack_gpu(ptr, *sizes: int):
total_size = length * itemsize

# cupy array from OpenACC device pointer
current_device = cp.cuda.Device()
current_device = xp.cuda.Device()
ptr_val = int(ffi.cast("uintptr_t", ptr))
mem = cp.cuda.UnownedMemory(ptr_val, total_size, owner=ptr, device_id=current_device.id)
memptr = cp.cuda.MemoryPointer(mem, 0)
arr = cp.ndarray(shape=sizes, dtype=dtype, memptr=memptr, order="F")
mem = xp.cuda.UnownedMemory(ptr_val, total_size, owner=ptr, device_id=current_device.id)
memptr = xp.cuda.MemoryPointer(mem, 0)
arr = xp.ndarray(shape=sizes, dtype=dtype, memptr=memptr, order="F")
return arr


Expand All @@ -123,6 +122,76 @@ def int_array_to_bool_array(int_array: NDArray) -> NDArray:
return bool_array


def unpack_and_cache_pointer(
    pointer: cffi.api.FFI.CData,
    key: Tuple[str, Tuple[int, ...]],
    sizes: list[int],
    gt_dims: list[gtx.Dimension],
    dtype: xp.dtype,
    is_uninitialized: bool,
    is_bool: bool,
    backend: str,
    cache: dict,
    xp,  # array namespace module (numpy or cupy); intentionally shadows the module-level xp
):
    """
    Unpack a Fortran pointer into a gt4py field, caching the result by key.

    On a cache miss the pointer data is unpacked (zero-copy view of the
    Fortran memory), optionally converted to a boolean array, wrapped as a
    gt4py field with the given dimensions, and stored in `cache`. On a hit
    the previously allocated field is returned without touching the pointer.

    NOTE(review): the cache key is (name, sizes) and does not include the
    pointer address — if Fortran ever passes a different buffer under the
    same name and shape, the stale cached field is returned. Presumably the
    Fortran pointers are stable across timesteps; confirm with the callers.

    Args:
        pointer: CFFI pointer to the Fortran data to be unpacked.
        key: Unique identifier for the field, a tuple of the field name
            and its dimension sizes.
        sizes: Dimension sizes used to unpack the pointer.
        gt_dims: gt4py dimensions used to allocate the field.
        dtype: Data type of the field (e.g. `numpy.float64`).
        is_uninitialized: Whether the field is unused by ICON and should be
            replaced by a minimal placeholder instead of dereferencing the
            (garbage) Fortran pointer.
        is_bool: Whether the field holds Fortran integer flags that need
            conversion to a boolean array.
        backend: Backend in use, `"GPU"` or `"CPU"`, selecting the
            unpacking routine.
        cache: Dictionary storing cached fields, keyed by `key`.
        xp: The array namespace module in use, `numpy` or `cupy`.

    Returns:
        The allocated and cached gt4py field corresponding to `key`.

    Example:
        ```python
        cached_field = unpack_and_cache_pointer(
            pointer=temp_pointer,
            key=("temperature", (n_Cell, n_K)),
            sizes=[n_Cell, n_K],
            gt_dims=[dims.CellDim, dims.KDim],
            dtype=np.float64,
            is_uninitialized=False,
            is_bool=False,
            backend="CPU",
            cache=allocated_fields,
            xp=np,
        )
        ```
    """
    if key not in cache:
        if is_uninitialized:
            # Field is filled with garbage values as it is not used by ICON:
            # allocate a minimal (1, 1, ...) placeholder of matching rank
            # instead of reading the Fortran pointer.
            unpacked = xp.ones((1,) * len(sizes), dtype=dtype, order="F")
        else:
            unpacked = unpack_gpu(pointer, *sizes) if backend == "GPU" else unpack(pointer, *sizes)

        if is_bool:
            unpacked = int_array_to_bool_array(unpacked)

        cache[key] = gtx.np_as_located_field(*gt_dims)(unpacked)

    return cache[key]


def generate_and_compile_cffi_plugin(
plugin_name: str, c_header: str, python_wrapper: str, build_path: Path, backend: str
) -> None:
Expand Down
1 change: 1 addition & 0 deletions tools/src/icon4pytools/py2fgen/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,4 @@ def parallel_run(self):
dace_orchestration = config.icon4py_dace_orchestration
device = config.device
limited_area = config.limited_area
xp = config.array_ns
79 changes: 26 additions & 53 deletions tools/src/icon4pytools/py2fgen/template.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
# Please, refer to the LICENSE file in the root directory.
# SPDX-License-Identifier: BSD-3-Clause

import inspect
from typing import Any, Sequence

from gt4py.eve import Node, datamodels
Expand All @@ -19,8 +18,7 @@
BUILTIN_TO_ISO_C_TYPE,
BUILTIN_TO_NUMPY_TYPE,
)
from icon4pytools.py2fgen.plugin import int_array_to_bool_array, unpack, unpack_gpu
from icon4pytools.py2fgen.settings import GT4PyBackend
from icon4pytools.py2fgen import settings
from icon4pytools.py2fgen.utils import flatten_and_get_unique_elts
from icon4pytools.py2fgen.wrappers import wrapper_dimension

Expand Down Expand Up @@ -102,14 +100,11 @@ class PythonWrapper(CffiPlugin):
profile: bool
limited_area: bool
cffi_decorator: str = CFFI_DECORATOR
cffi_unpack: str = inspect.getsource(unpack)
cffi_unpack_gpu: str = inspect.getsource(unpack_gpu)
int_to_bool: str = inspect.getsource(int_array_to_bool_array)
gt4py_backend: str = datamodels.field(init=False)
is_gt4py_program_present: bool = datamodels.field(init=False)

def __post_init__(self, *args: Any, **kwargs: Any) -> None:
self.gt4py_backend = GT4PyBackend[self.backend].value
self.gt4py_backend = settings.GT4PyBackend[self.backend].value
self.is_gt4py_program_present = any(func.is_gt4py_program for func in self.functions)
self.uninitialised_arrays = get_uninitialised_arrays(self.limited_area)

Expand Down Expand Up @@ -239,15 +234,11 @@ class PythonWrapperGenerator(TemplatedGenerator):
# imports for generated wrapper code
import logging
{% if _this_node.profile %}import time{% endif %}
import math
from {{ plugin_name }} import ffi
import numpy as np
{% if _this_node.backend == 'GPU' %}import cupy as cp {% endif %}
from numpy.typing import NDArray
from gt4py.next.iterator.embedded import np_as_located_field
from icon4pytools.py2fgen.settings import config
xp = config.array_ns
from icon4py.model.common.settings import xp
from icon4py.model.common import dimension as dims
from icon4pytools.py2fgen.plugin import unpack_and_cache_pointer

{% if _this_node.is_gt4py_program_present %}
# necessary imports when embedding a gt4py program directly
Expand Down Expand Up @@ -278,13 +269,8 @@ class PythonWrapperGenerator(TemplatedGenerator):
from {{ module_name }} import {{ func.name }}
{% endfor %}

{% if _this_node.backend == 'GPU' %}
{{ cffi_unpack_gpu }}
{% else %}
{{ cffi_unpack }}
{% endif %}

{{ int_to_bool }}
# holds cached fields
field_cache = {}

{% for func in _this_node.functions %}

Expand All @@ -302,67 +288,55 @@ def {{ func.name }}_wrapper(
logging.info("Python Execution Context Start")
{% endif %}

# pack, allocate and cache array fields
{% for arg in func.args %}
{% if arg.is_array %}

{% if _this_node.profile %}
cp.cuda.Stream.null.synchronize()
unpack_start_time = time.perf_counter()
{% endif %}

# Unpack pointers into Ndarrays
{% for arg in func.args %}
{% if arg.is_array %}
{%- if _this_node.debug_mode %}
msg = '{{ arg.name }} before unpacking: %s' % str({{ arg.name}})
msg = '{{ arg.name }} before unpacking: %s' % str({{ arg.name }})
logging.debug(msg)
{% endif %}

{%- if arg.name in _this_node.uninitialised_arrays -%}
{{ arg.name }} = xp.ones((1,) * {{ arg.size_args_len }}, dtype={{arg.np_type}}, order="F")
{%- else -%}
{{ arg.name }} = unpack{%- if _this_node.backend == 'GPU' -%}_gpu{%- endif -%}({{ arg.name }}, {{ ", ".join(arg.size_args) }})
{%- endif -%}

{%- if arg.d_type.name == "BOOL" %}
{{ arg.name }} = int_array_to_bool_array({{ arg.name }})
{%- endif %}
{{ arg.name }} = unpack_and_cache_pointer(
pointer={{ arg.name }},
key=("{{ arg.name }}", ({{", ".join(arg.size_args) }})),
sizes=[{{ ", ".join(arg.size_args) }}],
gt_dims=[{{ ", ".join(arg.gtdims) }}],
dtype={{ arg.np_type }},
is_uninitialized={{ "True" if arg.name in _this_node.uninitialised_arrays else "False" }},
is_bool={{ "True" if arg.d_type.name == "BOOL" else "False" }},
backend="{{ _this_node.backend }}",
cache=field_cache,
xp=xp
)

{%- if _this_node.debug_mode %}
msg = '{{ arg.name }} after unpacking: %s' % str({{ arg.name}})
msg = '{{ arg.name }} after unpacking: %s' % str({{ arg.name }})
logging.debug(msg)
msg = 'shape of {{ arg.name }} after unpacking = %s' % str({{ arg.name}}.shape)
msg = 'shape of {{ arg.name }} after unpacking = %s' % str({{ arg.name }}.shape)
logging.debug(msg)
{% endif %}
{% endif %}
{% endfor %}

{% if _this_node.profile %}
cp.cuda.Stream.null.synchronize()
unpack_end_time = time.perf_counter()
logging.critical('{{ func.name }} unpacking arrays time per timestep: %s' % str(unpack_end_time - unpack_start_time))
{% endif %}

{% if _this_node.profile %}
cp.cuda.Stream.null.synchronize()
allocate_start_time = time.perf_counter()
{% endif %}

# Allocate GT4Py Fields
{% for arg in func.args %}
{% if arg.is_array %}
{{ arg.name }} = np_as_located_field({{ ", ".join(arg.gtdims) }})({{ arg.name }})
{%- if _this_node.debug_mode %}
msg = 'shape of {{ arg.name }} after allocating as field = %s' % str({{ arg.name}}.shape)
logging.debug(msg)
msg = '{{ arg.name }} after allocating as field: %s' % str({{ arg.name }}.ndarray)
logging.debug(msg)
{% endif %}
{% endif %}
{% endfor %}

{% if _this_node.profile %}
cp.cuda.Stream.null.synchronize()
allocate_end_time = time.perf_counter()
logging.critical('{{ func.name }} allocating to gt4py fields time per timestep: %s' % str(allocate_end_time - allocate_start_time))
{% endif %}
{% endfor %}

{% if _this_node.profile %}
cp.cuda.Stream.null.synchronize()
Expand All @@ -385,7 +359,6 @@ def {{ func.name }}_wrapper(
logging.critical('{{ func.name }} function time per timestep: %s' % str(func_end_time - func_start_time))
{% endif %}


{% if _this_node.debug_mode %}
# debug info
{% for arg in func.args %}
Expand Down
Loading