-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Built-in Pulsar source #2237
Conversation
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #2237 +/- ##
==========================================
+ Coverage 63.91% 63.95% +0.03%
==========================================
Files 338 342 +4
Lines 41085 41203 +118
==========================================
+ Hits 26259 26350 +91
- Misses 13756 13780 +24
- Partials 1070 1073 +3 ☔ View full report in Codecov by Sentry. |
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
rust/numaflow-core/src/message.rs
Outdated
impl TryFrom<PulsarMessage> for Message { | ||
type Error = Error; | ||
|
||
fn try_from(message: PulsarMessage) -> Result<Self> { | ||
let offset = Offset::Int(IntOffset::new(message.offset, 1)); // FIXME: partition id | ||
|
||
Ok(Message { | ||
keys: vec![message.key], | ||
value: message.payload, | ||
offset: Some(offset.clone()), | ||
event_time: message.event_time, | ||
id: MessageID { | ||
vertex_name: get_vertex_name().to_string(), | ||
offset: offset.to_string(), | ||
index: 0, | ||
}, | ||
headers: message.headers, | ||
}) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this to source/pulsar.rs
type PulsarSource struct { | ||
ServerAddr string `json:"serverAddr" protobuf:"bytes,1,name=server_addr"` | ||
Topic string `json:"topic" protobuf:"bytes,2,name=topic"` | ||
ConsumerName string `json:"consumerName" protobuf:"bytes,3,name=consumerName"` | ||
SubscriptionName string `json:"subscriptionName" protobuf:"bytes,4,name=subscriptionName"` | ||
MaxUnack uint32 `json:"maxUnack,omitempty" protobuf:"bytes,5,opt,name=maxUnack"` | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add TLS and Auth too, if you prefer you can do that in a follow up PR.
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
if source.udsource.is_some() { | ||
return Ok(SourceType::UserDefined(UserDefinedConfig::default())); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if source.udsource.is_some() { | |
return Ok(SourceType::UserDefined(UserDefinedConfig::default())); | |
} | |
if source.udsource.is_some().take() { | |
return Ok(SourceType::UserDefined(UserDefinedConfig::default())); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Option.take()
returns the inner value as an option. We can't use it in if
statement.
rust/numaflow-core/src/error.rs
Outdated
impl From<numaflow_pulsar::Error> for Error { | ||
fn from(value: numaflow_pulsar::Error) -> Self { | ||
match value { | ||
numaflow_pulsar::Error::Pulsar(e) => Error::Source(e.to_string()), | ||
numaflow_pulsar::Error::UnknownOffset(_) => Error::Source(value.to_string()), | ||
numaflow_pulsar::Error::AckPendingExceeded(pending) => { | ||
Error::AckPendingExceeded(pending) | ||
} | ||
numaflow_pulsar::Error::ActorTaskTerminated(_) => { | ||
Error::ActorPatternRecv(value.to_string()) | ||
} | ||
numaflow_pulsar::Error::Other(e) => Error::Source(e), | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this to `source/pulsar.rs?
rust/numaflow-core/src/message.rs
Outdated
pub offset: u64, | ||
pub partition_idx: u16, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pub offset: u64, | |
pub partition_idx: u16, | |
pub(crate) offset: u64, | |
pub(crate) partition_idx: u16, |
@@ -207,6 +207,41 @@ pub(crate) async fn create_sink_handle( | |||
} | |||
} | |||
|
|||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove this?
@@ -14,6 +14,7 @@ pub(crate) mod user_defined; | |||
/// | |||
/// [Generator]: https://numaflow.numaproj.io/user-guide/sources/generator/ | |||
pub(crate) mod generator; | |||
pub(crate) mod pulsar; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add doc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any value in committing this file? /cc @whynowy
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Sreekanth <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add metrics in a follow up PR
Signed-off-by: Vigith Maurice <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]> Signed-off-by: Sreekanth <[email protected]> Co-authored-by: Vigith Maurice <[email protected]>
Status: Tested by running a Pipeline and a Monovertex with Pulsar as built-in source