The MongoDB connector is one of the Conduit plugins. It provides both a source and a destination MongoDB connector.
- Go 1.23+
- MongoDB replica set (at least single-node) or sharded cluster with the WiredTiger storage engine
- Docker
- (optional) golangci-lint v1.55.2
Run `make build`.
Run `make install-tools` to install all the required tools.
Run `make test` to run all the unit tests and `make test-integration` to run all the
integration tests, which require Docker to be installed and running. The command
will handle starting and stopping the Docker containers for you.
The MongoDB Source Connector connects to a MongoDB instance with the provided `uri`, `db`
and `collection` and starts creating records for each change detected in the
collection.
Upon starting, the Source takes a snapshot of a given collection in the database, then switches into CDC mode. In CDC mode, the plugin reads events from a Change Stream. In order for this to work correctly, your MongoDB instance must meet the criteria specified on the official website.
When the connector first starts, snapshot mode is enabled. The connector reads
all documents of a collection in batches using cursor-based pagination,
limiting each batch to `batchSize` documents. The connector stores the last processed
value of the `orderingField` in a position, so the snapshot process can
be paused and resumed without losing data. Once all documents in that initial
snapshot are read, the connector switches into CDC mode.
This behavior is enabled by default, but can be turned off by adding
`"snapshot": false` to the Source configuration.
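The snapshot pagination described above can be illustrated with a small, self-contained sketch. It does not use the MongoDB driver: the `doc` type, the in-memory slice, and the `nextBatch` and `snapshot` helpers are hypothetical stand-ins, and the integer `orderKey` plays the role of the ordering field value stored in the position. It assumes a positive batch size.

```go
package main

import (
	"fmt"
	"sort"
)

// doc is a hypothetical stand-in for a MongoDB document; orderKey plays the
// role of the ordering field value.
type doc struct {
	orderKey int
	body     string
}

// nextBatch returns up to batchSize documents whose orderKey is greater than
// lastKey. docs must already be sorted by orderKey in ascending order.
func nextBatch(docs []doc, lastKey, batchSize int) []doc {
	// Find the first document strictly after the stored position.
	start := sort.Search(len(docs), func(i int) bool { return docs[i].orderKey > lastKey })
	end := start + batchSize
	if end > len(docs) {
		end = len(docs)
	}
	return docs[start:end]
}

// snapshot reads all documents after the given position in batches and
// returns the documents read together with the final position.
func snapshot(docs []doc, position, batchSize int) ([]doc, int) {
	var out []doc
	for {
		batch := nextBatch(docs, position, batchSize)
		if len(batch) == 0 {
			return out, position // nothing left: the snapshot is complete
		}
		out = append(out, batch...)
		// Remember the last processed ordering value so the read can resume here.
		position = batch[len(batch)-1].orderKey
	}
}

func main() {
	docs := []doc{{1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}, {5, "e"}}
	// Resume from a stored position of 2 with a batch size of 2.
	read, pos := snapshot(docs, 2, 2)
	fmt.Println(len(read), pos)
}
```

Because each batch is selected by "greater than the last seen value" rather than by an offset, restarting from a stored position does not re-read or skip documents, provided the ordering field is unique and increasing.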
The connector implements CDC features for MongoDB by using a Change Stream that
listens to changes in the configured collection. Every detected change is
converted into a record and returned in the call to `Read`. If there is no
available record when `Read` is called, the connector returns the
`sdk.ErrBackoffRetry` error.
The connector stores the `resumeToken` of every Change Stream event in a position,
so the CDC process is resumable.
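The non-blocking read and the resume-token position can be sketched as follows. This is a simplified model, not the connector's code: `errBackoffRetry` is a local stand-in for `sdk.ErrBackoffRetry`, and the `event`, `record` and `source` types are hypothetical, with a buffered channel simulating the Change Stream.

```go
package main

import (
	"errors"
	"fmt"
)

// errBackoffRetry is a local stand-in for sdk.ErrBackoffRetry.
var errBackoffRetry = errors.New("backoff retry")

// event is a hypothetical change stream event carrying its resume token.
type event struct {
	resumeToken string
	payload     string
}

// record is a simplified record whose position is the event's resume token.
type record struct {
	position string
	payload  string
}

// source buffers change events delivered by a simulated change stream.
type source struct {
	events chan event
}

// read returns the next buffered change as a record. When no event is
// available it returns errBackoffRetry instead of blocking.
func (s *source) read() (record, error) {
	select {
	case ev := <-s.events:
		return record{position: ev.resumeToken, payload: ev.payload}, nil
	default:
		return record{}, errBackoffRetry
	}
}

func main() {
	s := &source{events: make(chan event, 1)}
	s.events <- event{resumeToken: "token-1", payload: "insert"}

	rec, err := s.read()
	fmt.Println(rec.position, err)

	// The buffer is now empty, so the caller is asked to back off and retry.
	_, err = s.read()
	fmt.Println(errors.Is(err, errBackoffRetry))
}
```

Storing the token on every record means that, after a restart, the stream can be reopened from the last acknowledged position rather than from the beginning.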
Warning
Azure CosmosDB for MongoDB has very limited support for Change Streams, so they cannot be used for CDC. If CDC is not possible, as is the case with CosmosDB, the connector only supports detecting insert operations by polling for new documents.
| name | description | required | default |
|---|---|---|---|
| `uri` | The connection string. The URI can contain host names, IPv4/IPv6 literals, or an SRV record. | false | `mongodb://localhost:27017` |
| `db` | The name of a database the connector must work with. | true | |
| `collection` | The name of a collection the connector must read from. | true | |
| `auth.username` | The username. | false | |
| `auth.password` | The user's password. | false | |
| `auth.db` | The name of a database that contains the user's authentication data. | false | `admin` |
| `auth.mechanism` | The authentication mechanism. The available values are `SCRAM-SHA-256`, `SCRAM-SHA-1`, `MONGODB-CR`, `MONGODB-AWS`, `MONGODB-X509`. | false | The default mechanism, which depends on your MongoDB server version. |
| `auth.tls.caFile` | The path to either a single or a bundle of certificate authorities to trust when making a TLS connection. | false | |
| `auth.tls.certificateKeyFile` | The path to the client certificate file or the client private key file. | false | |
| `batchSize` | The size of a document batch. | false | `1000` |
| `snapshot` | Determines whether the connector takes a snapshot of the entire collection before starting CDC mode. | false | `true` |
| `orderingField` | The name of a field that is used for ordering collection documents when capturing a snapshot. | false | `_id` |
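To make the required and default columns concrete, here is a minimal sketch of how a flat key/value configuration could be resolved against them. The `resolveSourceConfig` helper and the two package-level variables are hypothetical and only mirror the table; the connector's actual parsing and validation may differ.

```go
package main

import (
	"fmt"
	"sort"
)

// sourceDefaults mirrors the "default" column of the Source table.
var sourceDefaults = map[string]string{
	"uri":           "mongodb://localhost:27017",
	"auth.db":       "admin",
	"batchSize":     "1000",
	"snapshot":      "true",
	"orderingField": "_id",
}

// sourceRequired lists the parameters marked as required in the table.
var sourceRequired = []string{"db", "collection"}

// resolveSourceConfig overlays user settings on the defaults and reports
// any required parameter that is still missing.
func resolveSourceConfig(user map[string]string) (map[string]string, error) {
	cfg := make(map[string]string, len(sourceDefaults)+len(user))
	for k, v := range sourceDefaults {
		cfg[k] = v
	}
	for k, v := range user {
		cfg[k] = v
	}

	var missing []string
	for _, k := range sourceRequired {
		if cfg[k] == "" {
			missing = append(missing, k)
		}
	}
	if len(missing) > 0 {
		sort.Strings(missing)
		return nil, fmt.Errorf("missing required parameters: %v", missing)
	}
	return cfg, nil
}

func main() {
	cfg, err := resolveSourceConfig(map[string]string{
		"db":         "shop",   // example value
		"collection": "orders", // example value
		"snapshot":   "false",  // skip the initial snapshot
	})
	fmt.Println(cfg["uri"], cfg["snapshot"], err)

	_, err = resolveSourceConfig(map[string]string{"db": "shop"})
	fmt.Println(err)
}
```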
The connector always uses the `_id` field as a key.
If the `_id` field is a `bson.ObjectID`, the connector converts it to a string when
transferring a record to a destination; otherwise, it leaves it unchanged.
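The key handling above can be sketched without the driver. In this example `objectID` is a local 12-byte stand-in for `bson.ObjectID`, and `keyValue` is a hypothetical helper; the hexadecimal string form is an assumption about how the ID is rendered.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// objectID is a local stand-in for bson.ObjectID (a 12-byte value).
type objectID [12]byte

// hexString returns the 24-character hexadecimal form of the ID.
func (id objectID) hexString() string {
	return hex.EncodeToString(id[:])
}

// keyValue converts an objectID to its hex string and leaves every other
// type of _id value unchanged.
func keyValue(id any) any {
	if oid, ok := id.(objectID); ok {
		return oid.hexString()
	}
	return id
}

func main() {
	fmt.Println(keyValue(objectID{0x50, 0x7f})) // ObjectID becomes a string
	fmt.Println(keyValue("user-42"))            // strings pass through
	fmt.Println(keyValue(42))                   // so do numbers
}
```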
The MongoDB Destination takes an `opencdc.Record` and parses it into a valid
MongoDB query. The Destination is designed to handle different payloads and
keys. Because of this, each record is individually parsed and written.
If a record contains a `mongo.collection` property in its metadata, it will be
written to that collection; otherwise, it will fall back to the collection
configured in the connector. Thus, a Destination can support multiple
collections in the same connector, as long as the user has proper access to
those collections.
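The routing rule above amounts to a lookup with a fallback. The sketch below uses a plain `map[string]string` for the record metadata; `targetCollection` is a hypothetical helper, and treating an empty value like a missing one is an assumption.

```go
package main

import "fmt"

// metadataCollectionKey is the metadata property named in the text above.
const metadataCollectionKey = "mongo.collection"

// targetCollection returns the collection named in the record metadata,
// falling back to the collection configured in the connector.
// Assumption: an empty metadata value is treated like a missing one.
func targetCollection(metadata map[string]string, configured string) string {
	if name, ok := metadata[metadataCollectionKey]; ok && name != "" {
		return name
	}
	return configured
}

func main() {
	withOverride := map[string]string{"mongo.collection": "orders_archive"}
	fmt.Println(targetCollection(withOverride, "orders"))
	fmt.Println(targetCollection(nil, "orders"))
}
```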
| name | description | required | default |
|---|---|---|---|
| `uri` | The connection string. The URI can contain host names, IPv4/IPv6 literals, or an SRV record. | false | `mongodb://localhost:27017` |
| `db` | The name of a database the connector must work with. | true | |
| `collection` | The name of a collection the connector must write to. | true | |
| `auth.username` | The username. | false | |
| `auth.password` | The user's password. | false | |
| `auth.db` | The name of a database that contains the user's authentication data. | false | `admin` |
| `auth.mechanism` | The authentication mechanism. The available values are `SCRAM-SHA-256`, `SCRAM-SHA-1`, `MONGODB-CR`, `MONGODB-AWS`, `MONGODB-X509`. | false | The default mechanism, which depends on your MongoDB server version. |
| `auth.tls.caFile` | The path to either a single or a bundle of certificate authorities to trust when making a TLS connection. | false | |
| `auth.tls.certificateKeyFile` | The path to the client certificate file or the client private key file. | false | |
The connector uses all keys from an `opencdc.Record` when updating and deleting
documents.
If the `_id` field can be converted to a `bson.ObjectID`, the connector converts
it; otherwise, it uses it as is.
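The conversion rule for `_id` on the Destination side can be sketched as follows. As before, `objectID` is a local 12-byte stand-in for `bson.ObjectID` and `toFilterID` is a hypothetical helper. The sketch assumes that "can be converted" means the value is a 24-character hexadecimal string.

```go
package main

import (
	"encoding/hex"
	"fmt"
)

// objectID is a local stand-in for bson.ObjectID (a 12-byte value).
type objectID [12]byte

// toFilterID converts a 24-character hex string into an objectID and
// returns any other value unchanged.
func toFilterID(id any) any {
	s, ok := id.(string)
	if !ok || len(s) != 24 {
		return id
	}
	raw, err := hex.DecodeString(s)
	if err != nil {
		return id // not valid hex: use the value as is
	}
	var oid objectID
	copy(oid[:], raw)
	return oid
}

func main() {
	fmt.Printf("%T\n", toFilterID("507f1f77bcf86cd799439011"))
	fmt.Printf("%T\n", toFilterID("user-42"))
}
```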