Skip to content

Commit

Permalink
feat: improve topic creation api
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Dec 12, 2024
1 parent 2c01e0e commit b74cd35
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
python-version: ["39", "310", "311", "312"]
env:
CIBW_SKIP: "cp36-* pp* *-win32"
CIBW_ARCHS_MACOS: x86_64 universal2
CIBW_ARCHS_MACOS: x86_64 universal2 arm64
CIBW_ARCHS_LINUX: auto aarch64
CIBW_BEFORE_ALL_LINUX: "{package}/tools/cibw_before_all_linux.sh"
CIBW_BUILD_VERBOSITY: 1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
python-version: ["38", "39", "310", "311", "312"]
env:
CIBW_SKIP: "cp36-* pp* *-win32"
CIBW_ARCHS_MACOS: x86_64 universal2
CIBW_ARCHS_MACOS: x86_64 universal2 arm64
CIBW_ARCHS_LINUX: auto aarch64
CIBW_BEFORE_ALL_LINUX: "{package}/tools/cibw_before_all_linux.sh"
CIBW_BUILD_VERBOSITY: 1
Expand Down
20 changes: 13 additions & 7 deletions fluvio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@

from ._fluvio_python import Error as FluviorError # noqa: F401

from .new_topic import NewTopic, CompressionType, TopicMode
from .record import Record, RecordMetadata
from .specs import (
# support types
Expand All @@ -97,6 +98,11 @@
"Fluvio",
"FluvioConfig",
"FluvioAdmin",
# new_topic
"NewTopic",
"CompressionType",
"TopicMode",
# record
"Record",
"RecordMetadata",
"Offset",
Expand Down Expand Up @@ -731,17 +737,17 @@ def connect():
def connect_with_config(config: FluvioConfig):
return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner))

def create_topic(self, topic: str):

def create_topic(self, topic: str, spec: typing.Optional[TopicSpec] = None):
partitions = 1
replication = 1
ignore_rack = True
spec = _TopicSpec.new_computed(partitions, replication, ignore_rack)
dry_run = False
return self._inner.create_topic(topic, dry_run, spec)

def create_topic_spec(self, topic: str, dry_run: bool, spec: TopicSpec):
return self._inner.create_topic(topic, dry_run, spec._inner)
spec_inner = (
spec._inner
if spec is not None
else _TopicSpec.new_computed(partitions, replication, ignore_rack)
)
return self._inner.create_topic(topic, dry_run, spec_inner)

def create_topic_with_config(
self, topic: str, req: CommonCreateRequest, spec: TopicSpec
Expand Down
123 changes: 123 additions & 0 deletions fluvio/new_topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from dataclasses import dataclass, replace
from typing import Optional, List, Union
from enum import Enum
from .specs import TopicSpec, PartitionMap
from humanfriendly import parse_timespan, parse_size


class TopicMode(Enum):
    """Replica layout strategy selected for a new topic.

    ``NewTopic.build()`` uses this value to pick which ``TopicSpec``
    constructor to call.
    """

    # Built through TopicSpec.new_mirror().
    MIRROR = "mirror"
    # Built through TopicSpec.new_assigned() from an explicit partition map list.
    ASSIGNED = "assigned"
    # Built through TopicSpec.new_computed() from partition/replication counts.
    COMPUTED = "computed"


class CompressionType(Enum):
    """Compression setting that can be attached to a topic.

    The member's string ``value`` is what ``NewTopic.build()`` hands to
    ``TopicSpec.set_compression_type``.
    """

    NONE = "none"
    GZIP = "gzip"
    SNAPPY = "snappy"
    LZ4 = "lz4"
    ANY = "any"
    ZSTD = "zstd"


@dataclass(frozen=True)
class NewTopic:
    """Immutable, chainable builder for a ``TopicSpec``.

    Every ``with_*`` / ``as_*`` method returns a *new* ``NewTopic`` created
    with ``dataclasses.replace`` and leaves the receiver untouched, so calls
    can be chained. ``build()`` converts the accumulated settings into a
    ``TopicSpec``.
    """

    # Which TopicSpec constructor build() will use.
    mode: TopicMode = TopicMode.COMPUTED
    # Partition count, used only for computed topics.
    partitions: int = 1
    # Replication factor, used only for computed topics.
    replications: int = 1
    # Forwarded as the third argument of TopicSpec.new_computed().
    ignore_rack: bool = False
    # Explicit partition maps, used only for assigned topics.
    replica_assignment: Optional[List[PartitionMap]] = None
    # Retention time; strings are converted to whole seconds on assignment.
    retention_time: Optional[int] = None
    # Segment size; strings are converted to bytes on assignment.
    segment_size: Optional[int] = None
    # Compression setting; None leaves the spec's compression untouched.
    compression_type: Optional[CompressionType] = None
    # Maximum partition size; strings are converted to bytes on assignment.
    max_partition_size: Optional[int] = None
    # Forwarded to TopicSpec.set_system().
    system: bool = False

    @classmethod
    def create(cls) -> "NewTopic":
        """Return a ``NewTopic`` populated with the default settings."""
        return cls()

    @staticmethod
    def _size_to_bytes(size: Union[str, int]) -> int:
        """Return *size* as an integer, parsing strings with ``parse_size``.

        Integers are passed through unchanged. Strings are parsed with
        ``binary=True`` so that units are interpreted as powers of 1024.
        """
        if isinstance(size, int):
            return size
        return parse_size(size, binary=True)

    def with_assignment(self, replica_assignment: List[PartitionMap]) -> "NewTopic":
        """Switch to assigned mode using the given partition maps."""
        return replace(
            self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment
        )

    def as_mirror_topic(self) -> "NewTopic":
        """Switch to mirror mode."""
        return replace(self, mode=TopicMode.MIRROR)

    def with_partitions(self, partitions: int) -> "NewTopic":
        """Set the partition count.

        Raises:
            ValueError: if *partitions* is not a positive integer.
        """
        # Reject zero as well as negatives so the check matches the message.
        if partitions < 1:
            raise ValueError("Partitions must be a positive integer")
        return replace(self, partitions=partitions)

    def with_replication(self, replication: int) -> "NewTopic":
        """Set the replication factor.

        Raises:
            ValueError: if *replication* is not a positive integer.
        """
        # Reject zero as well as negatives so the check matches the message.
        if replication < 1:
            raise ValueError("Replication factor must be a positive integer")
        return replace(self, replications=replication)

    def with_ignore_rack(self, ignore: bool = True) -> "NewTopic":
        """Set the ``ignore_rack`` flag passed to computed topics."""
        return replace(self, ignore_rack=ignore)

    def with_compression(self, compression: CompressionType) -> "NewTopic":
        """Set the compression type."""
        return replace(self, compression_type=compression)

    def with_retention_time(self, retention_time: Union[str, int]) -> "NewTopic":
        """Set the retention time.

        An ``int`` is stored as given. A string such as ``"1h"`` is parsed
        with ``humanfriendly.parse_timespan`` and stored as whole seconds.
        """
        if isinstance(retention_time, int):
            return replace(self, retention_time=retention_time)

        # parse_timespan returns a float; the field (and the setter that
        # build() calls) is typed as int, so drop the fractional part here.
        parsed_seconds = int(parse_timespan(retention_time))
        return replace(self, retention_time=parsed_seconds)

    def with_segment_size(self, size: Union[str, int]) -> "NewTopic":
        """Set the segment size from an ``int`` or a string such as ``"10M"``."""
        return replace(self, segment_size=self._size_to_bytes(size))

    def with_max_partition_size(self, size: Union[str, int]) -> "NewTopic":
        """Set the max partition size from an ``int`` or a string such as ``"1Gb"``."""
        return replace(self, max_partition_size=self._size_to_bytes(size))

    def as_system_topic(self, is_system: bool = True) -> "NewTopic":
        """Mark (or unmark) the topic as an internal system topic."""
        return replace(self, system=is_system)

    def build(self) -> TopicSpec:
        """Build a ``TopicSpec`` from the current configuration."""
        # NOTE: assigned mode with a missing or empty replica_assignment
        # falls through to the computed branch below.
        if self.mode == TopicMode.ASSIGNED and self.replica_assignment:
            spec = TopicSpec.new_assigned(self.replica_assignment)
        elif self.mode == TopicMode.MIRROR:
            spec = TopicSpec.new_mirror()
        else:
            spec = TopicSpec.new_computed(
                self.partitions, self.replications, self.ignore_rack
            )

        spec.set_system(self.system)

        if self.retention_time is not None:
            spec.set_retention_time(self.retention_time)

        # Storage is configured in a single call; either value may be None.
        if self.max_partition_size is not None or self.segment_size is not None:
            spec.set_storage(self.max_partition_size, self.segment_size)

        if self.compression_type is not None:
            spec.set_compression_type(self.compression_type.value)

        return spec
28 changes: 25 additions & 3 deletions fluvio/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,36 @@ class TopicSpec:
def __init__(self, inner: _TopicSpec):
self._inner = inner

@classmethod
def new(cls):
return cls(_TopicSpec.new_computed(1, 1, True))

@classmethod
def new_assigned(cls, partition_maps: typing.List[PartitionMap]):
partition_maps = [x._inner for x in partition_maps]
return cls(_TopicSpec.new_computed(partition_maps))
return cls(_TopicSpec.new_assigned(partition_maps))

@classmethod
def new_computed(cls, partitions: int, replications: int, ignore: bool):
return cls(_TopicSpec.new_computed(partitions, replications, ignore))

@classmethod
def new_computed(cls, partitions: int, replication: int, ignore: bool):
return cls(_TopicSpec.new_computed(partitions, replication, ignore))
def new_mirror(cls):
return cls(_TopicSpec.new_mirror())

def set_storage(
self, partition_size: typing.Optional[int], segment: typing.Optional[int]
):
self._inner.set_storage(partition_size, segment)

def set_system(self, system: bool):
self._inner.set_system(system)

def set_retention_time(self, retention: int):
self._inner.set_retation_time(retention)

def set_compression_type(self, compression: str):
self._inner.set_compression_type(compression)


class CommonCreateRequest:
Expand Down
84 changes: 84 additions & 0 deletions fluvio/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from typing import Optional, Union
import re


def parse_time_to_seconds(time_str: Union[str, int, None]) -> Optional[int]:
    """
    Convert a time string to seconds.

    Supports formats like:
    - '1d' (1 day)
    - '12h' (12 hours)
    - '30m' (30 minutes)
    - '45s' (45 seconds)
    - '3600' (direct seconds)
    - an ``int`` (returned unchanged)
    - None (returned unchanged)

    Surrounding whitespace is ignored and the unit is case-insensitive.

    Raises:
        ValueError: if the string is neither a plain number nor a number
            followed by one of the units ``d``, ``h``, ``m``, ``s``.
    """
    if time_str is None:
        return None

    if isinstance(time_str, int):
        return time_str

    # Remove whitespace and normalise the unit's case
    time_str = time_str.strip().lower()

    # Direct integer seconds. isdecimal() is used instead of isdigit()
    # because isdigit() also accepts characters such as superscript digits
    # that int() cannot convert; those must reach the format error below.
    if time_str.isdecimal():
        return int(time_str)

    # Pattern to match number + unit
    match = re.match(r"^(\d+)([dhms])$", time_str)
    if not match:
        raise ValueError(f"Invalid time format: {time_str}")

    value = int(match.group(1))
    unit = match.group(2)

    multipliers = {
        "d": 86400,  # days
        "h": 3600,  # hours
        "m": 60,  # minutes
        "s": 1,  # seconds
    }

    return value * multipliers[unit]


def parse_byte_size(size_str: Union[str, int, None]) -> Optional[int]:
    """
    Convert a byte size string to integer bytes.

    Supports formats like:
    - '1T' (1 Terabyte)
    - '1G' (1 Gigabyte)
    - '500M' (500 Megabytes)
    - '1024K' (1024 Kilobytes)
    - '1024' (direct bytes)
    - an ``int`` (returned unchanged)
    - None (returned unchanged)

    Units are powers of 1024. Surrounding whitespace is ignored and the
    unit is case-insensitive.

    Raises:
        ValueError: if the string is neither a plain number nor a number
            followed by one of the units ``K``, ``M``, ``G``, ``T``.
    """
    if size_str is None:
        return None

    if isinstance(size_str, int):
        return size_str

    # Remove whitespace and normalise the unit's case
    size_str = size_str.strip().upper()

    # Direct integer bytes. isdecimal() is used instead of isdigit()
    # because isdigit() also accepts characters such as superscript digits
    # that int() cannot convert; those must reach the format error below.
    if size_str.isdecimal():
        return int(size_str)

    # Pattern to match number + unit
    match = re.match(r"^(\d+)([KMGT])$", size_str)
    if not match:
        raise ValueError(f"Invalid size format: {size_str}")

    value = int(match.group(1))
    unit = match.group(2)

    multipliers = {
        "K": 1024,  # Kilobytes
        "M": 1024 * 1024,  # Megabytes
        "G": 1024 * 1024 * 1024,  # Gigabytes
        "T": 1024 * 1024 * 1024 * 1024,  # Terabytes
    }

    return value * multipliers[unit]
13 changes: 10 additions & 3 deletions integration-tests/test_fluvio_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from string import ascii_lowercase
from fluvio import Fluvio, Offset, ConsumerConfig, SmartModuleKind, FluvioConfig
from fluvio import FluvioAdmin, TopicSpec
from fluvio import FluvioAdmin, TopicSpec, NewTopic


def create_smartmodule(sm_name, sm_path):
Expand Down Expand Up @@ -706,10 +706,17 @@ def setUp(self):

def test_admin_topic(self):
fluvio_admin = FluvioAdmin.connect()
topic_spec = TopicSpec.new_computed(3, 1, False)

# create topic
fluvio_admin.create_topic_spec(self.topic, False, topic_spec)
topic_spec = (
NewTopic.create()
.with_max_partition_size("1Gb")
.with_retention_time(3600)
.with_segment_size("10M")
.build()
)
print(topic_spec)
fluvio_admin.create_topic(self.topic, topic_spec)

# watch topic
stream = fluvio_admin.watch_topic()
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ black==24.8.0
semantic-version==2.10.0
setuptools-rust==1.10.1
toml==0.10.2
humanfriendly==10.0
4 changes: 4 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from setuptools import setup, find_namespace_packages
from setuptools_rust import Binding, RustExtension, Strip

from pathlib import Path

# Load the dependency list from the requirements.txt that sits next to this
# script, so the build works regardless of the current working directory.
# Blank lines and comment lines are dropped because they are not valid
# requirement specifiers.
# NOTE(review): requirements.txt also lists build/dev tools (e.g. black,
# setuptools-rust); consider a dedicated runtime list for install_requires.
_requirements_file = Path(__file__).resolve().parent / "requirements.txt"
requirements = [
    line.strip()
    for line in _requirements_file.read_text(encoding="utf-8").splitlines()
    if line.strip() and not line.strip().startswith("#")
]

setup(
name="fluvio",
version="0.17.0",
Expand Down Expand Up @@ -43,6 +46,7 @@
)
],
packages=["fluvio"],
install_requires=requirements,
# rust extensions are not zip safe, just like C-extensions.
zip_safe=False,
)
Loading

0 comments on commit b74cd35

Please sign in to comment.