diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b949d79..1319e3d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: python-version: ["39", "310", "311", "312"] env: CIBW_SKIP: "cp36-* pp* *-win32" - CIBW_ARCHS_MACOS: x86_64 universal2 + CIBW_ARCHS_MACOS: x86_64 universal2 arm64 CIBW_ARCHS_LINUX: auto aarch64 CIBW_BEFORE_ALL_LINUX: "{package}/tools/cibw_before_all_linux.sh" CIBW_BUILD_VERBOSITY: 1 diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 711301d..d2c3fa5 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -15,7 +15,7 @@ jobs: python-version: ["38", "39", "310", "311", "312"] env: CIBW_SKIP: "cp36-* pp* *-win32" - CIBW_ARCHS_MACOS: x86_64 universal2 + CIBW_ARCHS_MACOS: x86_64 universal2 arm64 CIBW_ARCHS_LINUX: auto aarch64 CIBW_BEFORE_ALL_LINUX: "{package}/tools/cibw_before_all_linux.sh" CIBW_BUILD_VERBOSITY: 1 diff --git a/fluvio/__init__.py b/fluvio/__init__.py index 41b0bfd..06a8c0d 100644 --- a/fluvio/__init__.py +++ b/fluvio/__init__.py @@ -72,6 +72,7 @@ from ._fluvio_python import Error as FluviorError # noqa: F401 +from .new_topic import NewTopic, CompressionType, TopicMode from .record import Record, RecordMetadata from .specs import ( # support types @@ -97,6 +98,11 @@ "Fluvio", "FluvioConfig", "FluvioAdmin", + # new_topic + "NewTopic", + "CompressionType", + "TopicMode", + # record "Record", "RecordMetadata", "Offset", @@ -731,17 +737,17 @@ def connect(): def connect_with_config(config: FluvioConfig): return FluvioAdmin(_FluvioAdmin.connect_with_config(config._inner)) - def create_topic(self, topic: str): - + def create_topic(self, topic: str, spec: typing.Optional[TopicSpec] = None): partitions = 1 replication = 1 ignore_rack = True - spec = _TopicSpec.new_computed(partitions, replication, ignore_rack) dry_run = False - return self._inner.create_topic(topic, dry_run, spec) - - def create_topic_spec(self, topic: str, dry_run: bool, spec: TopicSpec): - return self._inner.create_topic(topic, dry_run, spec._inner) + spec_inner = ( + spec._inner + if spec is not None + else _TopicSpec.new_computed(partitions, replication, ignore_rack) + ) + return self._inner.create_topic(topic, dry_run, spec_inner) def create_topic_with_config( self, topic: str, req: CommonCreateRequest, spec: TopicSpec diff --git a/fluvio/new_topic.py b/fluvio/new_topic.py new file mode 100644 index 0000000..d468d83 --- /dev/null +++ b/fluvio/new_topic.py @@ -0,0 +1,123 @@ +from dataclasses import dataclass, replace +from typing import Optional, List, Union +from enum import Enum +from .specs import TopicSpec, PartitionMap +from humanfriendly import parse_timespan, parse_size + + +class TopicMode(Enum): + MIRROR = "mirror" + ASSIGNED = "assigned" + COMPUTED = "computed" + + +class CompressionType(Enum): + NONE = "none" + GZIP = "gzip" + SNAPPY = "snappy" + LZ4 = "lz4" + ANY = "any" + ZSTD = "zstd" + + +@dataclass(frozen=True) +class NewTopic: + mode: TopicMode = TopicMode.COMPUTED + partitions: int = 1 + replications: int = 1 + ignore_rack: bool = False + replica_assignment: Optional[List[PartitionMap]] = None + retention_time: Optional[int] = None + segment_size: Optional[int] = None + compression_type: Optional[CompressionType] = None + max_partition_size: Optional[int] = None + system: bool = False + + @classmethod + def create(cls) -> "NewTopic": + """Alternative constructor method""" + return cls() + + def with_assignment(self, replica_assignment: List[PartitionMap]) -> "NewTopic": + """Set the assigned replica configuration""" + return replace( + self, mode=TopicMode.ASSIGNED, replica_assignment=replica_assignment + ) + + def as_mirror_topic(self) -> "NewTopic": + """Set as a mirror topic""" + return replace(self, mode=TopicMode.MIRROR) + + def with_partitions(self, partitions: int) -> "NewTopic": + """Set the specified partitions""" + if partitions < 0: + raise ValueError("Partitions must be a positive integer") + return replace(self, partitions=partitions) + + def with_replication(self, replication: int) -> "NewTopic": + """Set the specified replication factor""" + if replication < 0: + raise ValueError("Replication factor must be a positive integer") + return replace(self, replications=replication) + + def with_ignore_rack(self, ignore: bool = True) -> "NewTopic": + """Set the rack ignore setting""" + return replace(self, ignore_rack=ignore) + + def with_compression(self, compression: CompressionType) -> "NewTopic": + """Set the specified compression type""" + return replace(self, compression_type=compression) + + def with_retention_time(self, retention_time: Union[str, int]) -> "NewTopic": + """Set the specified retention time""" + + if isinstance(retention_time, int): + return replace(self, retention_time=retention_time) + + parsed_time = parse_timespan(retention_time) + return replace(self, retention_time=parsed_time) + + def with_segment_size(self, size: Union[str, int]) -> "NewTopic": + """Set the specified segment size""" + if isinstance(size, int): + return replace(self, segment_size=size) + + parsed_size = parse_size(size, True) + return replace(self, segment_size=parsed_size) + + def with_max_partition_size(self, size: Union[str, int]) -> "NewTopic": + """Set the specified max partition size""" + if isinstance(size, int): + return replace(self, max_partition_size=size) + + parsed_size = parse_size(size, True) + return replace(self, max_partition_size=parsed_size) + + def as_system_topic(self, is_system: bool = True) -> "NewTopic": + """Set the topic as an internal system topic""" + return replace(self, system=is_system) + + def build(self) -> TopicSpec: + """Build the TopicSpec based on the current configuration""" + # Similar implementation to the original build method + if self.mode == TopicMode.ASSIGNED and self.replica_assignment: + spec = TopicSpec.new_assigned(self.replica_assignment) + elif self.mode == TopicMode.MIRROR: + spec = TopicSpec.new_mirror() + else: + spec = TopicSpec.new_computed( + self.partitions, self.replications, self.ignore_rack + ) + + spec.set_system(self.system) + + if self.retention_time is not None: + spec.set_retention_time(self.retention_time) + + if self.max_partition_size is not None or self.segment_size is not None: + spec.set_storage(self.max_partition_size, self.segment_size) + + if self.compression_type is not None: + spec.set_compression_type(self.compression_type.value) + + return spec diff --git a/fluvio/specs.py b/fluvio/specs.py index 244c490..cca287c 100644 --- a/fluvio/specs.py +++ b/fluvio/specs.py @@ -32,14 +32,36 @@ class TopicSpec: def __init__(self, inner: _TopicSpec): self._inner = inner + @classmethod + def new(cls): + return cls(_TopicSpec.new_computed(1, 1, True)) + @classmethod def new_assigned(cls, partition_maps: typing.List[PartitionMap]): partition_maps = [x._inner for x in partition_maps] - return cls(_TopicSpec.new_computed(partition_maps)) + return cls(_TopicSpec.new_assigned(partition_maps)) + + @classmethod + def new_computed(cls, partitions: int, replications: int, ignore: bool): + return cls(_TopicSpec.new_computed(partitions, replications, ignore)) @classmethod - def new_computed(cls, partitions: int, replication: int, ignore: bool): - return cls(_TopicSpec.new_computed(partitions, replication, ignore)) + def new_mirror(cls): + return cls(_TopicSpec.new_mirror()) + + def set_storage( + self, partition_size: typing.Optional[int], segment: typing.Optional[int] + ): + self._inner.set_storage(partition_size, segment) + + def set_system(self, system: bool): + self._inner.set_system(system) + + def set_retention_time(self, retention: int): + self._inner.set_retation_time(retention) + + def set_compression_type(self, compression: str): + self._inner.set_compression_type(compression) class CommonCreateRequest: diff --git a/fluvio/utils.py b/fluvio/utils.py new file mode 100644 index 0000000..e404ac2 --- /dev/null +++ b/fluvio/utils.py @@ -0,0 +1,84 @@ +from typing import Optional, Union +import re + + +def parse_time_to_seconds(time_str: Union[str, int, None]) -> Optional[int]: + """ + Convert time string to seconds. + Supports formats like: + - '1d' (1 day) + - '12h' (12 hours) + - '30m' (30 minutes) + - '3600' (direct seconds) + - None + """ + if time_str is None: + return None + + if isinstance(time_str, int): + return time_str + + # Remove whitespace + time_str = time_str.strip().lower() + + # Direct integer seconds + if time_str.isdigit(): + return int(time_str) + + # Pattern to match number + unit + match = re.match(r"^(\d+)([dhms])$", time_str) + if not match: + raise ValueError(f"Invalid time format: {time_str}") + + value = int(match.group(1)) + unit = match.group(2) + + multipliers = { + "d": 86400, # days + "h": 3600, # hours + "m": 60, # minutes + "s": 1, # seconds + } + + return value * multipliers[unit] + + +def parse_byte_size(size_str: Union[str, int, None]) -> Optional[int]: + """ + Convert byte size string to integer bytes. + Supports formats like: + - '1G' (1 Gigabyte) + - '500M' (500 Megabytes) + - '1024K' (1024 Kilobytes) + - '1024' (direct bytes) + - None + """ + if size_str is None: + return None + + if isinstance(size_str, int): + return size_str + + # Remove whitespace + size_str = size_str.strip().upper() + + # Direct integer bytes + if size_str.isdigit(): + return int(size_str) + + # Pattern to match number + unit + match = re.match(r"^(\d+)([KMGT])$", size_str) + if not match: + raise ValueError(f"Invalid size format: {size_str}") + + value = int(match.group(1)) + unit = match.group(2) + + multipliers = { + "K": 1024, # Kilobytes + "M": 1024 * 1024, # Megabytes + "G": 1024 * 1024 * 1024, # Gigabytes + "T": 1024 * 1024 * 1024 * 1024, # Terabytes + } + + return value * multipliers[unit] diff --git a/integration-tests/test_fluvio_python.py b/integration-tests/test_fluvio_python.py index 8c92b46..85d9496 100644 --- a/integration-tests/test_fluvio_python.py +++ b/integration-tests/test_fluvio_python.py @@ -7,7 +7,7 @@ from string import ascii_lowercase from fluvio import Fluvio, Offset, ConsumerConfig, SmartModuleKind, FluvioConfig -from fluvio import FluvioAdmin, TopicSpec +from fluvio import FluvioAdmin, TopicSpec, NewTopic def create_smartmodule(sm_name, sm_path): @@ -706,10 +706,17 @@ def setUp(self): def test_admin_topic(self): fluvio_admin = FluvioAdmin.connect() - topic_spec = TopicSpec.new_computed(3, 1, False) # create topic - fluvio_admin.create_topic_spec(self.topic, False, topic_spec) + topic_spec = ( + NewTopic.create() + .with_max_partition_size("1Gb") + .with_retention_time(3600) + .with_segment_size("10M") + .build() + ) + print(topic_spec) + fluvio_admin.create_topic(self.topic, topic_spec) # watch topic stream = fluvio_admin.watch_topic() diff --git a/requirements.txt b/requirements.txt index 0791429..e0419ec 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,3 +7,4 @@ black==24.8.0 semantic-version==2.10.0 setuptools-rust==1.10.1 toml==0.10.2 +humanfriendly==10.0 diff --git a/setup.py b/setup.py index 47668b6..404df0c 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,9 @@ from setuptools import setup, find_namespace_packages from setuptools_rust import Binding, RustExtension, Strip +with open('requirements.txt') as f: + requirements = f.read().splitlines() + setup( name="fluvio", version="0.17.0", @@ -43,6 +46,7 @@ ) ], packages=["fluvio"], + install_requires=requirements, # rust extensions are not zip safe, just like C-extensions. zip_safe=False, ) diff --git a/src/lib.rs b/src/lib.rs index f66ed74..d900e42 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,8 +35,11 @@ use fluvio_sc_schema::objects::{ MetadataUpdate as NativeMetadataUpdate, WatchResponse as NativeWatchResponse, }; use fluvio_sc_schema::smartmodule::SmartModuleSpec as NativeSmartModuleSpec; -use fluvio_sc_schema::topic::PartitionMaps as NativePartitionMaps; -use fluvio_types::PartitionId; +use fluvio_sc_schema::topic::{ + CleanupPolicy, CompressionAlgorithm, Deduplication, HomeMirrorConfig, MirrorConfig, + PartitionMaps as NativePartitionMaps, ReplicaSpec, SegmentBasedPolicy, TopicStorageConfig, +}; +use fluvio_types::{compression, PartitionId}; use fluvio_types::{ IgnoreRackAssignment as NativeIgnoreRackAssignment, PartitionCount as NativePartitionCount, PartitionId as NativePartitionId, ReplicationFactor as NativeReplicationFactor, @@ -53,7 +56,6 @@ use tracing::info; use url::Host; mod cloud; -// use crate::error::FluvioError; mod consumer; mod error; mod produce_output; @@ -1111,6 +1113,58 @@ impl TopicSpec { inner: NativeTopicSpec::new_computed(partitions, replications, ignore), } } + + #[staticmethod] + pub fn new_mirror() -> TopicSpec { + let mut home_mirror = HomeMirrorConfig::from(vec![]); + //TODO: when update to 0.13.0 + //home_mirror.source = home_to_remote; + let mirror_map = MirrorConfig::Home(home_mirror); + TopicSpec { + inner: NativeTopicSpec::new_mirror(mirror_map), + } + } + + pub fn set_system(&mut self, system: bool) { + self.inner.set_system(system); + } + + pub fn set_retation_time(&mut self, time_in_seconds: i64) { + self.inner + .set_cleanup_policy(CleanupPolicy::Segment(SegmentBasedPolicy { + time_in_seconds: time_in_seconds as u32, + })); + } + + pub fn set_storage(&mut self, max_partition_size: Option, segment_size: Option) { + let mut storage = TopicStorageConfig::default(); + + if let Some(segment_size) = segment_size { + storage.segment_size = Some(segment_size as u32); + } + + if let Some(max_partition_size) = max_partition_size { + storage.max_partition_size = Some(max_partition_size as u64); + } + + self.inner.set_storage(storage); + } + + pub fn set_compression_type(&mut self, compression: String) { + if compression == "none" { + self.inner.set_compression_type(CompressionAlgorithm::None); + } else if compression == "gzip" { + self.inner.set_compression_type(CompressionAlgorithm::Gzip); + } else if compression == "snappy" { + self.inner + .set_compression_type(CompressionAlgorithm::Snappy); + } else if compression == "lz4" { + self.inner.set_compression_type(CompressionAlgorithm::Lz4); + } else if compression == "zstd" { + self.inner.set_compression_type(CompressionAlgorithm::Zstd); + } + self.inner.set_compression_type(CompressionAlgorithm::Any); + } } fn into_native_partition_maps(maps: Vec) -> Vec { diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_new_topic.py b/tests/test_new_topic.py new file mode 100644 index 0000000..6477eee --- /dev/null +++ b/tests/test_new_topic.py @@ -0,0 +1,160 @@ +import unittest + +from fluvio import NewTopic, TopicMode, CompressionType + +class TestNewTopic(unittest.TestCase): + def test_default_creation(self): + """Test default topic creation""" + topic = NewTopic.create() + + self.assertEqual(topic.mode, TopicMode.COMPUTED) + self.assertEqual(topic.partitions, 1) + self.assertEqual(topic.replications, 1) + self.assertFalse(topic.ignore_rack) + self.assertIsNone(topic.replica_assignment) + self.assertIsNone(topic.retention_time) + self.assertIsNone(topic.segment_size) + self.assertIsNone(topic.compression_type) + self.assertIsNone(topic.max_partition_size) + self.assertFalse(topic.system) + + def test_topic_immutability(self): + """Ensure topic instances are immutable""" + topic = NewTopic.create() + + # These should create new instances, not modify the original + topic2 = topic.with_partitions(3) + topic3 = topic.with_replication(2) + + self.assertIsNot(topic, topic2) + self.assertIsNot(topic, topic3) + self.assertEqual(topic.partitions, 1) # Original remains unchanged + self.assertEqual(topic2.partitions, 3) + self.assertEqual(topic3.replications, 2) + + def test_topic_with_methods(self): + """Test all with_* methods create correct instances""" + topic = (NewTopic.create() + .with_partitions(3) + .with_replication(2) + .with_ignore_rack() + .with_retention_time(86400) + .with_segment_size(1024) + .with_compression(CompressionType.GZIP) + .with_max_partition_size(2048) + .as_system_topic() + ) + + self.assertEqual(topic.partitions, 3) + self.assertEqual(topic.replications, 2) + self.assertTrue(topic.ignore_rack) + self.assertEqual(topic.retention_time, 86400) + self.assertEqual(topic.segment_size, 1024) + self.assertEqual(topic.compression_type, CompressionType.GZIP) + self.assertEqual(topic.max_partition_size, 2048) + self.assertTrue(topic.system) + + def test_system_topic(self): + """Test system topic configuration""" + # Default not system + default_topic = NewTopic.create() + self.assertFalse(default_topic.system) + + # Explicitly set as system + system_topic = default_topic.as_system_topic() + self.assertTrue(system_topic.system) + + # Optional parameter for non-system + non_system_topic = default_topic.as_system_topic(False) + self.assertFalse(non_system_topic.system) + + def test_compression_types(self): + """Test all compression types can be set""" + for compression in CompressionType: + topic = NewTopic.create().with_compression(compression) + self.assertEqual(topic.compression_type, compression) + + def test_chaining_multiple_configurations(self): + """Test complex chaining of configuration methods""" + topic = (NewTopic.create() + .with_partitions(5) + .with_replication(3) + .as_mirror_topic() + .with_retention_time(172800) # 2 days + .as_system_topic() + ) + + self.assertEqual(topic.mode, TopicMode.MIRROR) + self.assertEqual(topic.partitions, 5) + self.assertEqual(topic.replications, 3) + self.assertEqual(topic.retention_time, 172800) + self.assertTrue(topic.system) + + def test_with_time_and_size_parsing(self): + """Test creating topics with parsed time and size""" + topic = (NewTopic.create() + .with_retention_time('1d') + .with_segment_size('500M') + .with_max_partition_size('1G') + ) + + self.assertEqual(topic.retention_time, 86400) # 1 day in seconds + self.assertEqual(topic.segment_size, 500 * 1024 * 1024) # 500 MB in bytes + self.assertEqual(topic.max_partition_size, 1024 * 1024 * 1024) # 1 GB in bytes + + topic = (NewTopic.create() + .with_retention_time('2m') + .with_segment_size('500mb') + .with_max_partition_size('1gb') + ) + + self.assertEqual(topic.retention_time, 2 * 60) # 2 minutes in seconds + self.assertEqual(topic.segment_size, 500 * 1024 * 1024) + self.assertEqual(topic.max_partition_size, 1024 * 1024 * 1024) + + topic = (NewTopic.create() + .with_retention_time('3h') + .with_segment_size('500MiB') + .with_max_partition_size('1GiB') + ) + + self.assertEqual(topic.retention_time, 3 * 3600) # 3 hours in seconds + self.assertEqual(topic.segment_size, 500 * 1024 * 1024) + self.assertEqual(topic.max_partition_size, 1024 * 1024 * 1024) + + + topic = (NewTopic.create() + .with_retention_time('4s') + .with_segment_size('500MB') + .with_max_partition_size('1GB') + ) + + self.assertEqual(topic.retention_time, 4) # 4 seconds + self.assertEqual(topic.segment_size, 500 * 1024 * 1024) + self.assertEqual(topic.max_partition_size, 1024 * 1024 * 1024) + + + def test_with_numeric_inputs(self): + """Test creating topics with numeric inputs""" + topic = (NewTopic.create() + .with_retention_time(3600) + .with_segment_size(1024 * 1024) + .with_max_partition_size(2 * 1024 * 1024 * 1024) + ) + + # Verify numeric inputs + self.assertEqual(topic.retention_time, 3600) + self.assertEqual(topic.segment_size, 1024 * 1024) + self.assertEqual(topic.max_partition_size, 2 * 1024 * 1024 * 1024) + + def test_invalid_configurations(self): + """Test handling of potentially invalid configurations""" + with self.assertRaises(ValueError): + NewTopic.create().with_partitions(-1) + + with self.assertRaises(ValueError): + NewTopic.create().with_replication(-1) + + +if __name__ == '__main__': + unittest.main()