Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DocumentDB support #706

Merged
merged 27 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
00d166c
insert event draft
morgan1371 Sep 22, 2023
fd9d60f
abstract change event
morgan1371 Sep 23, 2023
c85df32
Added documentdb delete event
EduardoRSO Sep 23, 2023
55bce52
Added support to change event drop
EduardoRSO Sep 26, 2023
04b46b5
Added support to dropDatabase Event
EduardoRSO Oct 2, 2023
d921903
Merge branch 'awslabs:main' into feature/documentdb-insert-support
nichmorgan Oct 3, 2023
3e1a07b
- MongoDB v6.0 Change Event Fields removed
morgan1371 Oct 3, 2023
79370a3
Merge branch 'feature/documentdb-insert-support' into feature/documen…
morgan1371 Oct 3, 2023
9e63dc1
replace event support
nichmorgan Oct 9, 2023
c76c9d8
added support to invalidate event in documentdb
lucabarcelos Oct 9, 2023
2e1d493
Adding DocumentDB Rename event.
ViniciusBrisotti Oct 10, 2023
3c579b1
run cargo fmt
lucabarcelos Oct 10, 2023
1e00076
Excluding 'to' parameter
ViniciusBrisotti Oct 10, 2023
093136c
Add DocumentDB Update event
Kamorst Oct 10, 2023
81333df
fixed 'to' parameter and run cargo fmt
lucabarcelos Oct 11, 2023
d2a50a5
Refactoring 'Rename' event declaration as a single type not a commum …
ViniciusBrisotti Oct 11, 2023
dbc86b0
InsertNs renamed to DatabaseCollection for code reuse
nichmorgan Oct 12, 2023
423f196
unused field removed
nichmorgan Oct 12, 2023
c385aad
cfg fix
nichmorgan Oct 12, 2023
212b4c1
Merge branch 'feature/documentdb-support' into feature/documentdb-upd…
Oct 13, 2023
3b56c7f
Merge branch 'feature/documentdb-support' into feature/documentdb-inv…
Oct 13, 2023
9239d27
Merge branch 'feature/documentdb-support' into origin/feature/documen…
Oct 13, 2023
f81c573
fix lines
nichmorgan Oct 18, 2023
8b03bf3
Merge branch 'main' into feature/documentdb-support
nichmorgan Oct 18, 2023
6900c7c
fmt and makefile fixed
nichmorgan Oct 19, 2023
ddd61ee
Merge branch 'main' into feature/documentdb-support
nichmorgan Oct 19, 2023
26313f5
makefile reord
nichmorgan Oct 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lambda-events/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ default = [
"sns",
"sqs",
"streams",
"documentdb",
]

activemq = []
Expand Down Expand Up @@ -117,3 +118,4 @@ ses = ["chrono"]
sns = ["chrono", "serde_with"]
sqs = ["serde_with"]
streams = []
documentdb = []
44 changes: 44 additions & 0 deletions lambda-events/src/event/documentdb/events/commom_types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use serde_json::Value;

pub type AnyDocument = HashMap<String, Value>;

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DatabaseCollection {
db: String,
#[serde(default)]
coll: Option<String>,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentId {
#[serde(rename = "_data")]
pub data: String,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentKeyIdOid {
#[serde(rename = "$oid")]
pub oid: String,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentKeyId {
#[serde(rename = "_id")]
pub id: DocumentKeyIdOid,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct InnerTimestamp {
t: usize,
i: usize,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct Timestamp {
#[serde(rename = "$timestamp")]
pub timestamp: InnerTimestamp,
}
20 changes: 20 additions & 0 deletions lambda-events/src/event/documentdb/events/delete_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeDeleteEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<String>,
}
18 changes: 18 additions & 0 deletions lambda-events/src/event/documentdb/events/drop_database_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{DocumentId, Timestamp, DatabaseCollection, AnyDocument};


#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeDropDatabaseEvent {
#[serde(rename = "_id")]
id: DocumentId,
cluster_time: Timestamp,
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<String>,
}
17 changes: 17 additions & 0 deletions lambda-events/src/event/documentdb/events/drop_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeDropEvent {
#[serde(rename = "_id")]
id: DocumentId,
cluster_time: Timestamp,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<String>,
}
21 changes: 21 additions & 0 deletions lambda-events/src/event/documentdb/events/insert_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]

pub struct ChangeInsertEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<String>,
ns: DatabaseCollection,
//operation_type: String,
#[serde(default)]
txn_number: Option<AnyDocument>,
}
13 changes: 13 additions & 0 deletions lambda-events/src/event/documentdb/events/invalidate_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{DocumentId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeInvalidateEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
// operation_type: String,
}
9 changes: 9 additions & 0 deletions lambda-events/src/event/documentdb/events/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
pub mod commom_types;
pub mod delete_event;
pub mod drop_event;
pub mod drop_database_event;
pub mod insert_event;
pub mod invalidate_event;
pub mod rename_event;
pub mod replace_event;
pub mod update_event;
21 changes: 21 additions & 0 deletions lambda-events/src/event/documentdb/events/rename_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeRenameEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,

#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<AnyDocument>,
ns: DatabaseCollection,
//operation_type: String,
#[serde(default)]
txn_number: Option<String>,
to: DatabaseCollection,
}
20 changes: 20 additions & 0 deletions lambda-events/src/event/documentdb/events/replace_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DatabaseCollection, DocumentId, DocumentKeyId, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeReplaceEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(default)]
#[serde(rename = "lsid")]
ls_id: Option<String>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<AnyDocument>,
}
19 changes: 19 additions & 0 deletions lambda-events/src/event/documentdb/events/update_event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use serde::{Deserialize, Serialize};

use super::commom_types::{AnyDocument, DocumentId, DocumentKeyId, DatabaseCollection, Timestamp};

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChangeUpdateEvent {
#[serde(rename = "_id")]
id: DocumentId,
#[serde(default)]
cluster_time: Option<Timestamp>,
document_key: DocumentKeyId,
#[serde(rename = "lsid")]
ls_id: Option<String>,
ns: DatabaseCollection,
// operation_type: String,
#[serde(default)]
txn_number: Option<AnyDocument>,
}
97 changes: 97 additions & 0 deletions lambda-events/src/event/documentdb/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
pub mod events;

use self::events::{
delete_event::ChangeDeleteEvent,
drop_event::ChangeDropEvent,
insert_event::ChangeInsertEvent,
invalidate_event::ChangeInvalidateEvent,
replace_event::ChangeReplaceEvent,
update_event::ChangeUpdateEvent,
rename_event::ChangeRenameEvent,
drop_database_event::ChangeDropDatabaseEvent,
};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "operationType", rename_all = "camelCase")]
pub enum ChangeEvent {
Insert(ChangeInsertEvent),
Delete(ChangeDeleteEvent),
Drop(ChangeDropEvent),
DropDatabase(ChangeDropDatabaseEvent),
Invalidate(ChangeInvalidateEvent),
Replace(ChangeReplaceEvent),
Update(ChangeUpdateEvent),
Rename(ChangeRenameEvent),
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct DocumentDbInnerEvent {
pub event: ChangeEvent,
}

#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DocumentDbEvent {
#[serde(default)]
pub event_source_arn: Option<String>,
pub events: Vec<DocumentDbInnerEvent>,
#[serde(default)]
pub event_source: Option<String>,
}

#[cfg(test)]
#[cfg(feature = "documentdb")]
mod test {
use super::*;

pub type Event = DocumentDbEvent;

fn test_example(data: &[u8]) {
let parsed: Event = serde_json::from_slice(data).unwrap();
let output: String = serde_json::to_string(&parsed).unwrap();
let reparsed: Event = serde_json::from_slice(output.as_bytes()).unwrap();

assert_eq!(parsed, reparsed);
}

#[test]
fn example_documentdb_insert_event() {
test_example(include_bytes!( "../../fixtures/example-documentdb-insert-event.json"));
}

#[test]
fn example_documentdb_delete_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-delete-event.json"));
}

#[test]
fn example_documentdb_drop_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-drop-event.json"));
}

#[test]
fn example_documentdb_replace_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-replace-event.json"));
}

#[test]
fn example_documentdb_update_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-update-event.json"));
}

#[test]
fn example_documentdb_rename_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-rename-event.json"));
}

#[test]
fn example_documentdb_invalidate_event() {
test_example(include_bytes!("../../fixtures/example-documentdb-invalidate-event.json"));
}

#[test]
fn example_documentdb_drop_database_event(){
test_example(include_bytes!("../../fixtures/example-documentdb-drop-database-event.json"));
}
}
Loading
Loading