Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
cloud run job unique names
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingrismore committed Feb 5, 2024
1 parent 4f3a60b commit 16f28cb
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
3 changes: 3 additions & 0 deletions prefect_gcp/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
```
"""

from __future__ import annotations

import json
Expand Down Expand Up @@ -59,6 +60,8 @@

from prefect_gcp.credentials import GcpCredentials

JOB_NAME_MAX_LENGTH = 30


class Job(BaseModel):
"""
Expand Down
2 changes: 2 additions & 0 deletions prefect_gcp/models/cloud_run_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
else:
from pydantic import BaseModel

JOB_V2_NAME_MAX_LENGTH = 30


class JobV2(BaseModel):
"""
Expand Down
9 changes: 7 additions & 2 deletions prefect_gcp/workers/cloud_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@
import shlex
import time
from typing import TYPE_CHECKING, Any, Dict, Optional
from uuid import uuid4

import anyio
import googleapiclient
Expand All @@ -170,7 +171,7 @@
else:
from pydantic import Field, validator

from prefect_gcp.cloud_run import Execution, Job
from prefect_gcp.cloud_run import JOB_NAME_MAX_LENGTH, Execution, Job
from prefect_gcp.credentials import GcpCredentials

if TYPE_CHECKING:
Expand Down Expand Up @@ -338,7 +339,11 @@ def _populate_name_if_not_present(self):
"""Adds the flow run name to the job if one is not already provided."""
try:
if "name" not in self.job_body["metadata"]:
self.job_body["metadata"]["name"] = self.name
base_job_name = self.name.lower()
if len(base_job_name) > JOB_NAME_MAX_LENGTH:
base_job_name = base_job_name[:JOB_NAME_MAX_LENGTH]
job_name = f"{base_job_name}-{uuid4().hex}"
self.job_body["metadata"]["name"] = job_name
except KeyError:
raise ValueError("Unable to verify name due to invalid job body template.")

Expand Down
32 changes: 17 additions & 15 deletions prefect_gcp/workers/cloud_run_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import shlex
import time
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional
from uuid import uuid4

from anyio.abc import TaskStatus
from google.api_core.client_options import ClientOptions
Expand Down Expand Up @@ -29,7 +30,12 @@
from pydantic import Field, validator

from prefect_gcp.credentials import GcpCredentials
from prefect_gcp.models.cloud_run_v2 import CloudRunJobV2Result, ExecutionV2, JobV2
from prefect_gcp.models.cloud_run_v2 import (
JOB_V2_NAME_MAX_LENGTH,
CloudRunJobV2Result,
ExecutionV2,
JobV2,
)

if TYPE_CHECKING:
from prefect.client.schemas import FlowRun
Expand Down Expand Up @@ -125,6 +131,7 @@ class CloudRunWorkerJobV2Configuration(BaseJobConfiguration):
"complete before raising an exception."
),
)
job_name: str = Field(default=None, description="The name of the Cloud Run job.")

@property
def project(self) -> str:
Expand All @@ -136,20 +143,6 @@ def project(self) -> str:
"""
return self.credentials.project

@property
def job_name(self):
"""
Returns the job name, if it does not exist, it creates it.
"""
pre_trim_cr_job_name = f"prefect-{self.name}"

if len(pre_trim_cr_job_name) > 40:
pre_trim_cr_job_name = pre_trim_cr_job_name[:40]

pre_trim_cr_job_name = pre_trim_cr_job_name.rstrip("-")

return pre_trim_cr_job_name

def prepare_for_flow_run(
self,
flow_run: "FlowRun",
Expand All @@ -175,6 +168,7 @@ def prepare_for_flow_run(
)

self._populate_env()
self._populate_name()
self._populate_or_format_command()
self._format_args_if_present()
self._populate_image_if_not_present()
Expand All @@ -194,6 +188,14 @@ def _populate_env(self):

self.job_body["template"]["template"]["containers"][0]["env"] = envs

def _populate_name(self):
"""Create a unique and valid job name."""
base_job_name = self.name.lower()
if len(base_job_name) > JOB_V2_NAME_MAX_LENGTH:
base_job_name = base_job_name[:JOB_V2_NAME_MAX_LENGTH]
job_name = f"{base_job_name}-{uuid4().hex}"
self.job_name = job_name

def _populate_image_if_not_present(self):
"""
Populates the job body with the image if not present.
Expand Down

0 comments on commit 16f28cb

Please sign in to comment.