6 releases
- 0.1.5 (Feb 3, 2024)
- 0.1.4 (Dec 22, 2023)
mongor
Rust-based ODM for MongoDB, built on top of the official mongodb crate. It provides an ergonomic interface to the underlying driver, alongside a simplified testing framework that makes writing test suites more ergonomic as well.
Table of Contents
- Requirements
- Installation
- Documentation
- Usage
- Bug Reports
- Feature Requests
- Contributing
- Project Status
- License
Requirements
- MongoDB deployment (version 3.6+)
- Rust (version 1.6+)
Installation
cargo add mongor
Documentation
This README provides a general overview and practical examples, but does not cover every available method. Full crate documentation can be found at docs.rs.
Usage
Overview
While the crate exposes the core module, the main interface designed for this crate is the Model<D> struct. This struct is generic over D, the type that represents the structure of the documents you want the model to control. The examples below demonstrate the general pattern for using a model, along with more ergonomic ways to perform atomic transactions and build a test suite.
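To make the role of the type parameter concrete, here is a toy sketch that needs no database. It is not how mongor is implemented: `InMemoryModel`, its methods, and the simplified `Shark` struct are hypothetical names invented for illustration. It only shows how one generic parameter fixes the document type for every operation on the wrapper.

```rust
/// Toy, in-memory stand-in for a typed model (hypothetical, not part of
/// `mongor`). The type parameter `D` plays the same role as in `Model<D>`:
/// it fixes the document type for every method.
struct InMemoryModel<D> {
    documents: Vec<(u64, D)>,
    next_id: u64,
}

impl<D: Clone> InMemoryModel<D> {
    fn new() -> Self {
        InMemoryModel {
            documents: Vec::new(),
            next_id: 0,
        }
    }

    /// Stores a document and returns the identifier assigned to it.
    fn insert_one(&mut self, document: D) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.documents.push((id, document));
        id
    }

    /// Returns a copy of the first document matching `filter`, if any.
    fn find_one<F>(&self, filter: F) -> Option<D>
    where
        F: Fn(&D) -> bool,
    {
        self.documents
            .iter()
            .find(|(_, document)| filter(document))
            .map(|(_, document)| document.clone())
    }
}

/// Simplified document type used only by this sketch.
#[derive(Clone, Debug, PartialEq)]
struct Shark {
    name: String,
    sightings: u32,
}
```

With this sketch, an `InMemoryModel<Shark>` only accepts and returns `Shark` values, which is the same compile-time guarantee the real `Model<Shark>` gives in the examples that follow.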
Basic example
use mongor::{
core::find::FindManyCursor,
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
results::*,
Client, Collection,
},
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let collection: Collection<Shark> = db.collection("shark");
let model: Model<Shark> = Model::from(collection);
// Applying an index
model
.apply_unique_index(doc! { "species": 1 }, Some("species_unique"), None, None)
.await?;
// Inserting a document
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let document_oid: ObjectId = model.insert_one(new_shark, None, None).await?;
// Updating a document
let update_result: UpdateResult = model
.update_one_by_oid(
&document_oid,
doc! {
"$inc": {
"sightings": 1,
},
},
None,
None,
)
.await?;
// Finding a document
let shark: Option<Shark> = model
.find_one_by_field_value("sightings", 1, None, None)
.await?;
println!("{:?}", shark);
// Log example:
// ```
// Some(Shark {
// oid: Some(ObjectId("65712bdfb1fb166eb8cce7e5")),
// name: "Whale Shark",
// species: "Rhincodon typus",
// sightings: 1,
// })
// ```
// Deleting a document
let delete_result: DeleteResult = model
.delete_one(
doc! {
"_id": &document_oid,
},
None,
None,
)
.await?;
// Inserting many
let sharks = vec![
Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
},
Shark {
oid: None,
name: "Great White".to_string(),
species: "Carcharodon carcharias".to_string(),
sightings: 0,
},
];
let document_oids: Vec<ObjectId> = model.insert_many(sharks, None, None).await?;
// Cursor-ing via a `findMany` query
let options = FindOptions::builder().sort(doc! { "name": -1 }).build();
let mut cursor: FindManyCursor<Shark> = model
.find_many(doc! { "sightings": 0 }, Some(options), None)
.await?;
while let Some(shark) = cursor.next().await? {
// shark: Shark
}
// Updating many
let update_result: UpdateResult = model
.update_many(
doc! { "sightings": 0 },
doc! { "$inc": { "sightings": 5 }},
None,
None,
)
.await?;
// Deleting many
let delete_result: DeleteResult = model.delete_many(doc! {}, None, None).await?;
db.drop(None).await?;
Ok(())
}
Sessions
Official References:
https://www.mongodb.com/docs/manual/reference/method/Session/
https://www.mongodb.com/docs/manual/replication/
Overview
Please note that using sessions and transactions with MongoDB currently requires a replica set deployment. Setting one up locally is straightforward: you run multiple mongod (MongoDB server) instances and join them into a set with the shell method rs.initiate(). Here is an example of how to set up a replica set with three nodes.
First, create a separate data directory for each member:
mkdir -p ~/data/mongo_rp_1 &&
mkdir -p ~/data/mongo_rp_2 &&
mkdir -p ~/data/mongo_rp_3
Then, start three separate instances, each in its own terminal and all with the same replica set name:
sudo mongod --port 27017 --dbpath ~/data/mongo_rp_1 --replSet rs0
---
sudo mongod --port 27018 --dbpath ~/data/mongo_rp_2 --replSet rs0
---
sudo mongod --port 27019 --dbpath ~/data/mongo_rp_3 --replSet rs0
Next, connect to the instance on the main port, say 27017 (recent MongoDB releases ship the shell as mongosh rather than mongo):
mongo --port 27017
Finally, call rs.initiate() to start the replica set with the specified members:
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "localhost:27017" },
{ _id: 1, host: "localhost:27018" },
{ _id: 2, host: "localhost:27019" }
]
});
Now you have a replica set deployed under the name rs0 with three nodes.
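The session examples in this README connect by listing every member's host in one connection string. As a minimal sketch of how such a string can be composed from a host list, consider the helper below. The function name `replica_set_uri` is hypothetical and not part of mongor. The optional `replicaSet` query parameter is standard MongoDB connection-string syntax, but it is an addition here: the examples in this README do not use it.

```rust
/// Hypothetical helper (not part of `mongor`): joins replica set member hosts
/// into a single MongoDB connection string.
fn replica_set_uri(hosts: &[&str], replica_set: Option<&str>) -> String {
    // Hosts are comma-separated after the scheme, e.g. `a:27017,b:27018`.
    let mut uri = format!("mongodb://{}/", hosts.join(","));
    // Optionally name the replica set so the driver can verify membership.
    if let Some(name) = replica_set {
        uri.push_str("?replicaSet=");
        uri.push_str(name);
    }
    uri
}
```

Calling it with the three local hosts and `None` yields the same string that is passed to `ClientOptions::parse` in the session examples.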
Basic Example
use mongor::{
core::{
find::FindManyCursor,
session::{commit_transaction, start_session, start_transaction},
},
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
results::*,
Client, Collection,
},
};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: We pass the hosts of all three nodes to the client options
let client_options =
ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let collection: Collection<Shark> = db.collection("shark");
let model: Model<Shark> = Model::from(collection);
let mut session = start_session(&client, None).await?;
start_transaction(&mut session, None).await?;
// Inserting a document with a session
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let document_oid: ObjectId = model
.insert_one(new_shark, None, Some(&mut session))
.await?;
commit_transaction(&mut session).await?;
// Cursor-ing via a `findMany` query with a session
let options = FindOptions::builder().sort(doc! { "name": -1 }).build();
let mut cursor: FindManyCursor<Shark> = model
.find_many(doc! { "sightings": 0 }, Some(options), Some(&mut session))
.await?;
while let Some(shark) = cursor.next_with_session(&mut session).await? {
// shark: Shark
}
db.drop(None).await?;
Ok(())
}
Atomic Transactions
A method called run_atomic_transaction is provided in core::atomic. It is a shorthand for running an async closure whose database operations are all protected by a Session and a single transaction. If the closure returns an error, the method automatically attempts to abort the transaction; if the closure returns Ok(()), it attempts to commit the transaction. This gives a more ergonomic way to structure operations that must be atomic.
Note: This method uses std::sync::Mutex in its implementation. I made this decision to keep the crate as agnostic and lightweight as possible, but if the need presents itself, open a PR or an issue to add a feature that enables a version with tokio::sync::Mutex.
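The commit-or-abort behaviour can be pictured with a small synchronous mock that needs no database. This is only a sketch of the control flow under stated assumptions, not mongor's implementation: `MockSession`, `run_atomic`, and `committed_after` are hypothetical names, the closure is synchronous rather than async, and staged writes are plain strings.

```rust
use std::sync::{Arc, Mutex};

/// Stand-in for a database session (hypothetical): writes are staged in
/// `pending` and only become visible in `committed` on commit.
#[derive(Debug, Default)]
struct MockSession {
    pending: Vec<String>,
    committed: Vec<String>,
}

/// Hypothetical, synchronous analogue of the flow described above:
/// run `body`, commit the staged writes on `Ok`, discard them on `Err`.
fn run_atomic<F>(session: Arc<Mutex<MockSession>>, body: F) -> Result<(), String>
where
    F: FnOnce(Arc<Mutex<MockSession>>) -> Result<(), String>,
{
    let outcome = body(Arc::clone(&session));
    // A poisoned lock is turned into an ordinary error instead of a panic.
    let mut guard = session.lock().map_err(|err| err.to_string())?;
    match outcome {
        Ok(()) => {
            // Commit: move everything staged by the closure into `committed`.
            let staged = std::mem::take(&mut guard.pending);
            guard.committed.extend(staged);
            Ok(())
        }
        Err(err) => {
            // Abort: drop everything staged by the closure.
            guard.pending.clear();
            Err(err)
        }
    }
}

/// Stages two writes, optionally fails, and reports what ended up committed.
fn committed_after(fail: bool) -> Vec<String> {
    let session = Arc::new(Mutex::new(MockSession::default()));
    let _ = run_atomic(Arc::clone(&session), |shared| {
        let mut guard = shared.lock().map_err(|err| err.to_string())?;
        guard.pending.push("insert shark".to_string());
        guard.pending.push("insert researcher".to_string());
        if fail {
            return Err("some error".to_string());
        }
        Ok(())
    });
    let guard = session.lock().unwrap();
    guard.committed.clone()
}
```

In this mock, `committed_after(false)` returns both staged writes, while `committed_after(true)` returns an empty list because the failing closure causes everything to be discarded.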
Example:
use mongor::{
core::{atomic::run_atomic_transaction, session::start_session},
error::Error,
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
Client, ClientSession, Collection,
},
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Researcher {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub sighted: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: We pass the hosts of all three nodes to the client options
let client_options =
ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let shark_collection: Collection<Shark> = db.collection("shark");
let researcher_collection: Collection<Researcher> = db.collection("researcher");
let shark_model: Model<Shark> = Model::from(shark_collection);
let researcher_model: Model<Researcher> = Model::from(researcher_collection);
let session = start_session(&client, None).await?;
let shared_session = Arc::new(Mutex::new(session));
let shark_model_ref = &shark_model;
let researcher_model_ref = &researcher_model;
run_atomic_transaction(
|shared_session: Arc<Mutex<ClientSession>>| async move {
let mut session_lock = shared_session
.lock()
// Note: Here I am using `mongor::error::Error`, but you can use any error
// type that implements `dyn std::error::Error` inside this closure to use `?`
.map_err(|err| Error::SharedSessionLockFailure(err.to_string()))?;
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let new_researcher = Researcher {
oid: None,
name: "John Doe".to_string(),
sighted: vec![],
};
let shark_oid: ObjectId = shark_model_ref
.insert_one(new_shark, None, Some(&mut session_lock))
.await?;
let researcher_oid: ObjectId = researcher_model_ref
.insert_one(new_researcher, None, Some(&mut session_lock))
.await?;
researcher_model_ref
.update_one_by_oid(
&researcher_oid,
doc! {
"$push": {
"sighted": "Whale Shark",
},
},
None,
Some(&mut session_lock),
)
.await?;
shark_model_ref
.update_one_by_oid(
&shark_oid,
doc! {
"$inc": {
"sightings": 1,
},
},
None,
Some(&mut session_lock),
)
.await?;
Ok(())
},
shared_session,
None,
)
.await?;
let shark = shark_model.find_one(doc! {}, None, None).await?;
let researcher = researcher_model.find_one(doc! {}, None, None).await?;
println!("{:?}", shark);
// Example log:
// Some(Shark {
// oid: Some(ObjectId("657226d179aa103ff4cf05ab")),
// name: "Whale Shark",
// species: "Rhincodon typus",
// sightings: 1,
// })
println!("{:?}", researcher);
// Example log:
// Some(Researcher {
// oid: Some(ObjectId("657226d179aa103ff4cf05ac")),
// name: "John Doe",
// sighted: ["Whale Shark"],
// })
db.drop(None).await?;
Ok(())
}
Now, let's modify the closure to fail on purpose. You will notice that the operations inside the closure are rolled back and nothing is persisted.
use mongor::{
core::{atomic::run_atomic_transaction, session::start_session},
error::Error,
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
Client, ClientSession, Collection,
},
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Researcher {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub sighted: Vec<String>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Note: We pass the hosts of all three nodes to the client options
let client_options =
ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let shark_collection: Collection<Shark> = db.collection("shark");
let researcher_collection: Collection<Researcher> = db.collection("researcher");
let shark_model: Model<Shark> = Model::from(shark_collection);
let researcher_model: Model<Researcher> = Model::from(researcher_collection);
let session = start_session(&client, None).await?;
let shared_session = Arc::new(Mutex::new(session));
let shark_model_ref = &shark_model;
let researcher_model_ref = &researcher_model;
let _ = run_atomic_transaction(
|shared_session: Arc<Mutex<ClientSession>>| async move {
let mut session_lock = shared_session
.lock()
// Note: Here I am using `mongor::error::Error`, but you can use any error
// type that implements `dyn std::error::Error` inside this closure to use `?`
.map_err(|err| Error::SharedSessionLockFailure(err.to_string()))?;
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let new_researcher = Researcher {
oid: None,
name: "John Doe".to_string(),
sighted: vec![],
};
let shark_oid: ObjectId = shark_model_ref
.insert_one(new_shark, None, Some(&mut session_lock))
.await?;
let researcher_oid: ObjectId = researcher_model_ref
.insert_one(new_researcher, None, Some(&mut session_lock))
.await?;
// Purposefully erroring after insertions
Err(Error::internal("some error"))?;
Ok(())
},
shared_session,
None,
)
.await;
let shark = shark_model.find_one(doc! {}, None, None).await?;
let researcher = researcher_model.find_one(doc! {}, None, None).await?;
// Note: the insertions did not occur as the "transaction" as a whole failed.
println!("{:?}", shark);
// Logs:
// `None`
println!("{:?}", researcher);
// Logs:
// `None`
db.drop(None).await?;
Ok(())
}
Testing
Overview
A simple testing framework is provided via the test_db module. The general use case is to create a new TestDB per test, which spawns a new Database named with a newly generated ObjectId. This makes it easy to run a concurrent test suite, which is common with frameworks like tokio.
Then, a method called TestDB::run_test is provided to take ownership of this database and run an isolated test closure; the Database is cleaned up (dropped) whether the test succeeds or fails.
Pre-defined assertions are provided in test_db::assert_model::AssertModel, which wraps a Model<D>, and in the module test_db::assert_document::*, which provides general assertions regarding documents. A macro called test_error! from test_db::test_error is also provided; it acts like panic! inside this test closure. The closure returns a Future<Output = Result<(), Box<dyn std::error::Error>>>, so the ? operator can also be used to end the test.
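The two ideas behind this design, a unique database name per test and cleanup regardless of the outcome, can be sketched without a MongoDB connection. The code below is a hypothetical, synchronous illustration and not the crate's implementation: `ScratchDb`, its `run_test` signature, the counter-based naming (the real TestDB uses an ObjectId), and the `dropped_names` helper are all assumptions made for the example.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Process-wide counter used to hand out distinct database names.
static NEXT_ID: AtomicU64 = AtomicU64::new(0);

/// Hypothetical stand-in for a per-test database handle.
struct ScratchDb {
    name: String,
}

impl ScratchDb {
    /// Each call yields a distinct name, so concurrent tests never share state.
    fn new() -> Self {
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        ScratchDb {
            name: format!("test_db_{}", id),
        }
    }

    /// Runs `test`, then always invokes `cleanup` with the database name,
    /// whether the test returned `Ok` or `Err`. Panics are not caught here.
    fn run_test<T, C>(self, test: T, mut cleanup: C) -> Result<(), String>
    where
        T: FnOnce(&str) -> Result<(), String>,
        C: FnMut(&str),
    {
        let outcome = test(&self.name);
        cleanup(&self.name);
        outcome
    }
}

/// Runs one passing or failing test and records which databases were cleaned up.
fn dropped_names(fail: bool) -> (Result<(), String>, Vec<String>) {
    let mut dropped = Vec::new();
    let result = ScratchDb::new().run_test(
        |_name| {
            if fail {
                Err("assertion failed".to_string())
            } else {
                Ok(())
            }
        },
        |name| dropped.push(name.to_string()),
    );
    (result, dropped)
}
```

Both the passing and the failing run record exactly one cleaned-up database, and the two names differ, which is what allows tests to run concurrently without interfering with each other.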
Basic Example
use mongor::mongodb::bson::{doc, oid::ObjectId};
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub oid: Option<ObjectId>,
pub name: String,
pub species: String,
pub sightings: usize,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
#[cfg(test)]
mod tests {
use mongor::{
model::Model,
mongodb::{
bson::{doc, oid::ObjectId},
options::ClientOptions,
Client, Collection,
},
test_db::{assert_model::AssertModel, test_error::TestError, TestDB},
test_error,
};
use crate::Shark;
#[tokio::test]
pub async fn example_test() {
let client_options = ClientOptions::parse("mongodb://localhost:27017/")
.await
.unwrap();
let client = Client::with_options(client_options).unwrap();
// Opens a new isolated `Database` named with a `ObjectId`
let test_db = TestDB::new(&client);
let shark_collection: Collection<Shark> = test_db.collection("shark");
let shark_model = Model::from(shark_collection);
// Run isolated test, cleaning up the Database on failure or success
test_db
.run_test(
|| async {
let new_shark = Shark {
oid: None,
name: "Whale Shark".to_string(),
species: "Rhincodon typus".to_string(),
sightings: 0,
};
let oid = shark_model.insert_one(new_shark, None, None).await?;
// Using pre-defined assertions from `test_db::assert_model`,
// which call `test_error!` under the hood
let assert_shark_model = AssertModel::from(&shark_model);
assert_shark_model
.assert_exists(doc! { "name": "Whale Shark" }, None)
.await?
.assert_count(doc! {}, 1, None)
.await?;
// Using the `test_error!` macro from `test_db::test_error`,
// which acts like `panic!` inside the closure
let some_other_oid = ObjectId::new();
if oid == some_other_oid {
test_error!("Inserted shark oid {} == {}", oid, some_other_oid);
}
Ok(())
},
None,
)
.await
.unwrap();
}
}
GridFS
Official References:
https://www.mongodb.com/docs/manual/core/gridfs/
Overview
Enabling the crate feature grid_fs exposes the GridFs struct, which is a robust interface to the underlying GridFsBucket. This feature adds a single dependency, futures_util.
Many of the methods involve the concept of a revision number. Because the filename field on the file document is not unique, multiple revisions of a file can be stored under the same name. Revisions are numbered as follows:
0 = the original stored file
1 = the first revision
2 = the second revision
etc...
-2 = the second most recent revision
-1 = the most recent revision
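The numbering scheme can be read as an index into the list of files stored under one name, ordered from oldest to newest. The sketch below shows one way to express that mapping. `resolve_revision` is a hypothetical helper written for illustration, not a function exposed by mongor or the mongodb crate.

```rust
/// Hypothetical helper: maps a revision number onto an index into a list of
/// stored files ordered from oldest (index 0) to newest.
/// Returns `None` when no such revision exists.
fn resolve_revision(revision: i32, stored: usize) -> Option<usize> {
    if revision >= 0 {
        // 0 is the original file, 1 the first revision, and so on.
        let index = revision as usize;
        if index < stored {
            Some(index)
        } else {
            None
        }
    } else {
        // -1 is the newest file, -2 the one before it, and so on.
        let from_end = revision.unsigned_abs() as usize;
        stored.checked_sub(from_end)
    }
}
```

With three files stored under the same name, revision 0 resolves to index 0, revision -1 to index 2, and revision -2 to index 1; revisions 3 and -4 do not exist and resolve to `None`.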
Methods regarding streaming (uploading and downloading) are implemented in the mongodb crate via futures_util::io::*, so if you are using tokio, you must add the dependency tokio_util with its compat feature enabled, which provides tokio_util::compat::*. This module exposes compat() and compat_write() to convert tokio async readers or writers into a Compat<_>, which can be used directly with futures_util::io::AsyncRead and futures_util::io::AsyncWrite.
See the basic example or https://docs.rs/tokio-util/latest/tokio_util/compat/index.html for more information.
Basic Example
use futures_util::io::Cursor;
use mongor::{
grid_fs::GridFs,
mongodb::{
bson::{doc, oid::ObjectId},
options::*,
Client,
},
};
use tokio::fs::*;
use tokio_util::compat::*;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
let client = Client::with_options(client_options)?;
let db = client.database("db_name");
let bucket_options = GridFsBucketOptions::builder()
.bucket_name(Some("shark_images".to_string()))
.build();
let grid_fs = GridFs::new(&db, Some(bucket_options));
// Upload a file
let file = File::open("./test_data/shark-0.png").await?;
let revision_0_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;
// Upload a revision of a file
let file = File::open("./test_data/shark-1.jpg").await?;
let revision_1_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;
// Get a `core::find::FindManyCursor` of the `FilesCollectionDocument`s
let mut cursor = grid_fs.find_many(doc! {}, None).await?;
while let Some(file_document) = cursor.next().await? {
// file_document: FilesCollectionDocument
}
// Download the most recent revision of a file
let mut writer = Cursor::new(vec![]);
grid_fs.download("shark", &mut writer, None).await?;
let file_data: Vec<u8> = writer.into_inner();
// Download a specific revision of a file
let mut writer = Cursor::new(vec![]);
grid_fs.download("shark", &mut writer, Some(0)).await?;
let file_data: Vec<u8> = writer.into_inner();
// Rename a file (All revisions as None was provided to revision)
grid_fs
.rename_by_filename("shark", "new_shark", None)
.await?;
// Delete a file (All revisions as None was provided to revision)
grid_fs.delete_by_filename("new_shark", None).await?;
db.drop(None).await?;
Ok(())
}
Bug Reports
Please report bugs by creating an issue, or if you are aware of a sufficient fix, feel free to open a PR, but please follow the Contributing guidelines below.
To report a bug, it must be directly related to this crate, and you must provide as much information as possible, such as:
- Code examples
- Error messages
- Steps to reproduce
- System information (if applicable)
Feature Requests
If you feel there is something missing, or want some variation of the current crate that would require dependencies other than mongodb and serde (for example, GridFs requires futures_util, so it is a separate feature), please create an issue with the request and discuss why you feel it should be part of this crate rather than a third-party crate. I plan to implement a mongor-extras crate for additional functionality with a more defined scope, and wish to keep this crate as agnostic and flexible as possible.
Contributing
I welcome anyone to contribute to the crate, but I do have some general requirements:
- The change should keep the crate agnostic to use cases. I.e., where possible, methods should allow the use of Sessions and of options from mongodb::options::* instead of custom options, and exposed closures, as seen in run_test or run_atomic_transaction, should interface well publicly with generic types or trait objects like dyn std::error::Error.
- Any additional or modified methods require unit testing with 100% test coverage; the tests should be placed in the tests module.
- Any change that adds additional dependencies should be created as a separate feature.
- All current unit tests must pass, i.e. run cargo test and all tests should pass.
- Add your name and/or handle to CONTRIBUTORS.md if not already present, as well as to the Authors section of the header comment for the file.
- If adding a new dependency, please update License::Third Party in this README to correspond with its licensing.
If your change meets these guidelines, feel free to open a PR.
Project Status
I plan to maintain this crate for the foreseeable future. The crate API is subject to change, although I would anticipate simplifications and additions rather than deletions or major modifications of the current feature set.
License
MIT
See LICENSE.md for more information.
Third Party
This crate is built on top of:
- The mongodb crate, which is licensed under Apache License 2.0.
- The serde crate, which is licensed under either Apache License Version 2.0 or MIT license.
- The futures_util crate (used in the grid_fs feature), which is licensed under either Apache License Version 2.0 or MIT license.
- The tokio crate (used in internal testing), which is licensed under MIT.
- The tokio_util crate (used in internal testing), which is licensed under MIT.