feat: worker on the GPU (#119)
salma-aneo authored Aug 29, 2024
2 parents ab04120 + 4e46c03 commit e1d1a09
Showing 9 changed files with 824 additions and 0 deletions.
2 changes: 2 additions & 0 deletions python/hello-world-gpu/client-requirements.txt
@@ -0,0 +1,2 @@
armonik
numpy
187 changes: 187 additions & 0 deletions python/hello-world-gpu/client.py
@@ -0,0 +1,187 @@
import argparse
import logging
import grpc
from datetime import timedelta
from armonik.client import ArmoniKResults, ArmoniKSessions, ArmoniKTasks, ArmoniKEvents
from armonik.common import TaskDefinition, TaskOptions
import numpy as np
from common import NameIdDict, NumpyArraySerializer

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


def run(endpoint: str, partition: str, size: int, seed: int) -> None:
    """
    Connects to the ArmoniK control plane via a gRPC channel and performs a series of tasks.

    Args:
        endpoint: The endpoint for the connection to the ArmoniK control plane.
        partition: The name of the partition to which tasks are submitted.
        size: The size of the two vectors.
        seed: A seed value used for initializing random number generation.

    Example:
        run("172.24.55.197:5001", "default", 1000000, 47)
    """
    # Create gRPC channel to connect with the ArmoniK control plane
    with grpc.insecure_channel(endpoint) as channel:
        # Create client for task submission
        task_client = ArmoniKTasks(channel)

        # Create client for result creation
        result_client = ArmoniKResults(channel)

        # Create client for session creation
        sessions_client = ArmoniKSessions(channel)

        # Create client for events listening
        events_client = ArmoniKEvents(channel)

        # Default task options used by each task unless overridden at submission
        task_options = TaskOptions(
            max_duration=timedelta(hours=1),  # Duration of 1 hour
            max_retries=2,
            priority=1,
            partition_id=partition,
        )

        # Request session creation with the default task options and the partitions allowed for the session
        session_id = sessions_client.create_session(
            task_options, partition_ids=[partition]
        )
        logger.info("Session created", extra={"context": {"sessionId": session_id}})

        # Create the result metadata and keep the ids for task submission
        results = result_client.create_results_metadata(
            result_names=["array1", "array2", "output", "payload"],
            session_id=session_id,
        )

        # Get the result ids
        array1_id = results["array1"].result_id
        array2_id = results["array2"].result_id
        output_id = results["output"].result_id
        payload_id = results["payload"].result_id

        # Input arrays: set the seed for reproducibility
        np.random.seed(seed)
        a_host = np.random.rand(size).astype(np.float32)
        b_host = np.random.rand(size).astype(np.float32)

        # Upload the input data for the results created above
        result_client.upload_result_data(
            result_id=array1_id,
            session_id=session_id,
            result_data=NumpyArraySerializer(a_host).serialize(),
        )
        result_client.upload_result_data(
            result_id=array2_id,
            session_id=session_id,
            result_data=NumpyArraySerializer(b_host).serialize(),
        )
        logger.info("Data uploaded")

        # Create a NameIdDict instance mapping result names to their ids,
        # together with the kernel launch configuration
        name_id_mapping = NameIdDict(
            {
                "array1": array1_id,
                "array2": array2_id,
                "output": output_id,
                "threads_per_block": 1024,
                "blocks_per_grid": (size + (1024 - 1)) // 1024,
            }
        )

        # Serialize the instance
        serialized_name_id_mapping = name_id_mapping.serialize()

        result_client.upload_result_data(
            result_id=payload_id,
            session_id=session_id,
            result_data=serialized_name_id_mapping,
        )
        logger.info("Payload uploaded")

        # Submit the task with its payload and result ids
        task_client.submit_tasks(
            session_id=session_id,
            tasks=[
                TaskDefinition(
                    data_dependencies=[array1_id, array2_id],
                    expected_output_ids=[output_id],
                    payload_id=payload_id,
                )
            ],
        )
        logger.info("Tasks submitted")

        # Wait for the task to end and the result to be available
        try:
            events_client.wait_for_result_availability(
                result_ids=[output_id], session_id=session_id
            )
        except Exception as e:
            logger.error("An error occurred", extra={"context": {"error": e}})

        # Download the result
        try:
            serialized_result = result_client.download_result_data(
                output_id, session_id
            )
            final_result = NumpyArraySerializer.deserialize(serialized_result).array
            logger.info(
                "Result ready",
                extra={"context": {"resultId": output_id, "data": final_result}},
            )
        except Exception as e:
            logger.error("An error occurred", extra={"context": {"error": e}})

        logger.info("End Connection!")


def main() -> None:
    """
    Parses command-line arguments and runs the Hello World demo for ArmoniK.

    Example:
        python client.py --endpoint 127.0.0.1:5001 --partition default --size 1000000
    """
    parser = argparse.ArgumentParser(
        description="Hello World demo for ArmoniK on the GPU.\nIt sends a task to ArmoniK in the given partition. The task receives two vectors as input and computes their element-wise sum on the GPU as its result. Then, the client retrieves and prints that result.\nThe ArmoniK endpoint location is provided through --endpoint."
    )
    parser.add_argument(
        "--endpoint",
        type=str,
        default="127.0.0.1:5001",
        help="Endpoint for the connection to the ArmoniK control plane.",
    )
    parser.add_argument(
        "--partition",
        type=str,
        default="default",
        help="Name of the partition to which tasks are submitted.",
    )
    parser.add_argument(
        "--size",
        type=int,
        default=1000000,
        help="The size of the vectors.",
    )
    parser.add_argument(
        "--seed",
        type=int,
        default=47,
        help="Seed for generating random vectors.",
    )
    parsed_args = parser.parse_args()
    run(parsed_args.endpoint, parsed_args.partition, parsed_args.size, parsed_args.seed)


if __name__ == "__main__":
    main()
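
A note on the payload built above: blocks_per_grid = (size + (1024 - 1)) // 1024 is integer ceiling division, so there are always enough blocks of 1024 threads to cover every element of the vectors. A quick sanity check with the default size (illustrative values only):

size = 1_000_000
threads_per_block = 1024
blocks_per_grid = (size + (threads_per_block - 1)) // threads_per_block
assert blocks_per_grid * threads_per_block >= size
print(blocks_per_grid)  # 977 blocks, covering 1,000,448 threads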
86 changes: 86 additions & 0 deletions python/hello-world-gpu/common.py
@@ -0,0 +1,86 @@
from typing import Dict, Union
import json
import numpy as np


class NameIdDict:
    def __init__(self, data: Dict[str, Union[str, int]]):
        """
        Initializes a NameIdDict instance.

        This class serves as a container for a dictionary whose keys are result names
        and whose values are their associated IDs, alongside integer kernel launch
        parameters such as threads_per_block and blocks_per_grid.

        Args:
            data: Dictionary with result names as keys and their associated IDs
                (or integer launch parameters) as values.
        """
        self.data = data

    def serialize(self) -> bytes:
        """
        Serializes the dictionary to a JSON-encoded byte array.

        This method converts the dictionary to a JSON string and then encodes it to bytes.

        Returns:
            bytes: The serialized dictionary as a byte array.
        """
        return json.dumps(self.data).encode("utf-8")

    @classmethod
    def deserialize(cls, payload: bytes) -> "NameIdDict":
        """
        Deserializes bytes into a NameIdDict instance.

        This method decodes the byte array to a JSON string and then loads it into a dictionary
        to create a NameIdDict instance.

        Args:
            payload (bytes): The serialized data as bytes.

        Returns:
            NameIdDict: An instance of NameIdDict created from the serialized data.
        """
        return cls(json.loads(payload.decode("utf-8")))


class NumpyArraySerializer:
    def __init__(self, array: np.ndarray):
        """
        Initializes a NumpyArraySerializer instance.

        This class serves as a container for a numpy array of type float32.

        Args:
            array: numpy array to be serialized and deserialized.
        """
        self.array = array

    def serialize(self) -> bytes:
        """
        Serializes the numpy array to its raw byte representation.

        This method dumps the array buffer directly with numpy's tobytes(),
        without any JSON encoding.

        Returns:
            bytes: The serialized numpy array as a byte array.
        """
        return self.array.tobytes()

    @classmethod
    def deserialize(cls, payload: bytes) -> "NumpyArraySerializer":
        """
        Deserializes bytes into a NumpyArraySerializer instance.

        This method reads the raw buffer back into a float32 numpy array
        with numpy's frombuffer().

        Args:
            payload (bytes): The serialized data as bytes.

        Returns:
            NumpyArraySerializer: An instance of NumpyArraySerializer created from the serialized data.
        """
        array = np.frombuffer(payload, dtype=np.float32)
        return cls(array)
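
A minimal round-trip sketch of how these two helpers fit together (assuming common.py is on the import path; the id value is illustrative):

import numpy as np
from common import NameIdDict, NumpyArraySerializer

mapping = NameIdDict({"array1": "some-result-id", "threads_per_block": 1024})
assert NameIdDict.deserialize(mapping.serialize()).data == mapping.data

arr = np.arange(4, dtype=np.float32)
restored = NumpyArraySerializer.deserialize(NumpyArraySerializer(arr).serialize()).array
assert np.array_equal(arr, restored)

Note that deserialize always reads the buffer as float32, so only float32 arrays round-trip correctly.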
22 changes: 22 additions & 0 deletions python/hello-world-gpu/dockerfile
@@ -0,0 +1,22 @@
FROM nvidia/cuda:12.3.1-devel-ubuntu20.04

RUN apt-get update && apt-get install -y \
        python3 \
        python3-pip \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

COPY worker-requirements.txt ./
RUN pip3 install --no-cache-dir -r worker-requirements.txt

RUN groupadd --gid 5000 armonikuser && \
    useradd --home-dir /home/armonikuser --create-home --uid 5000 --gid 5000 --shell /bin/sh --skel /dev/null armonikuser && \
    mkdir /cache && chown armonikuser: /cache
USER armonikuser
ENV PYTHONUNBUFFERED=1

WORKDIR /app
COPY common.py /app
COPY worker.py /app
ENTRYPOINT ["python3", "worker.py"]



23 changes: 23 additions & 0 deletions python/hello-world-gpu/dockerfile.aws
@@ -0,0 +1,23 @@
FROM nvidia/cuda:12.3.2-devel-ubuntu20.04

RUN apt-get update && apt-get install -y \
        python3 \
        python3-pip \
        sudo \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

COPY worker-requirements.txt ./

RUN pip3 install --no-cache-dir -r worker-requirements.txt

RUN groupadd --gid 5000 armonikuser && \
    useradd --home-dir /home/armonikuser --create-home --uid 5000 --gid 5000 --shell /bin/sh --skel /dev/null armonikuser && \
    mkdir /cache && chown armonikuser: /cache
ENV PYTHONUNBUFFERED=1

WORKDIR /app
COPY common.py /app
COPY worker.py /app
COPY init.sh /
RUN chmod +x /init.sh
ENTRYPOINT ["/init.sh"]
4 changes: 4 additions & 0 deletions python/hello-world-gpu/init.sh
@@ -0,0 +1,4 @@
#!/bin/bash
# Point the .so.1 sonames at the driver-versioned NVIDIA libraries present on
# the node, so the CUDA runtime can resolve libnvidia-ml and libcuda.
ln -sf /usr/lib/x86_64-linux-gnu/libnvidia-ml.so.535.183.01 /usr/lib/x86_64-linux-gnu/libnvidia-ml.so.1
ln -sf /usr/lib/x86_64-linux-gnu/libcuda.so.545.23.08 /usr/lib/x86_64-linux-gnu/libcuda.so.1
# Drop privileges and start the worker as the unprivileged armonikuser.
sudo -E -u armonikuser python3 /app/worker.py
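
The remaining files of the commit (including worker.py and worker-requirements.txt, which the Dockerfiles reference) are not expanded in this view. As a purely hypothetical sketch, a vector-add kernel consuming the threads_per_block and blocks_per_grid fields from the payload could look like this with Numba CUDA (an assumption about the implementation; the actual worker code is not shown here):

import numpy as np
from numba import cuda

@cuda.jit
def vector_add(a, b, out):
    # One thread per element; guard against the tail of the last block.
    i = cuda.grid(1)
    if i < out.size:
        out[i] = a[i] + b[i]

def gpu_sum(a_host, b_host, threads_per_block, blocks_per_grid):
    # Copy inputs to the device, launch the kernel, and copy the result back.
    a_dev = cuda.to_device(a_host)
    b_dev = cuda.to_device(b_host)
    out_dev = cuda.device_array_like(a_host)
    vector_add[blocks_per_grid, threads_per_block](a_dev, b_dev, out_dev)
    return out_dev.copy_to_host()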