6 releases

0.1.5 Feb 3, 2024
0.1.4 Dec 22, 2023

MIT license

mongor

Rust-based ODM for MongoDB, built on top of the official mongodb crate. It provides an ergonomic interface to the underlying driver, alongside a simplified testing framework that makes writing test suites more ergonomic as well.

Requirements

Installation

cargo add mongor

Documentation

This README provides a general overview and practical examples, but does not cover every available method. Full crate documentation can be found on docs.rs.

Usage

Overview

While the crate exposes the core module, the main interface of this crate is the Model<D> struct. This struct is generic over D, the type that represents the structure of the documents you want the model to manage. The examples below demonstrate the general pattern for using a model, along with more ergonomic ways to perform atomic transactions and build a test suite.

Basic example

use mongor::{
    core::find::FindManyCursor,
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        results::*,
        Client, Collection,
    },
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("db_name");
    let collection: Collection<Shark> = db.collection("shark");

    let model: Model<Shark> = Model::from(collection);

    // Applying an index
    model
        .apply_unique_index(doc! { "species": 1 }, Some("species_unique"), None, None)
        .await?;

    // Inserting a document
    let new_shark = Shark {
        oid: None,
        name: "Whale Shark".to_string(),
        species: "Rhincodon typus".to_string(),
        sightings: 0,
    };

    let document_oid: ObjectId = model.insert_one(new_shark, None, None).await?;

    // Updating a document
    let update_result: UpdateResult = model
        .update_one_by_oid(
            &document_oid,
            doc! {
                "$inc": {
                    "sightings": 1,
                },
            },
            None,
            None,
        )
        .await?;

    // Finding a document
    let shark: Option<Shark> = model
        .find_one_by_field_value("sightings", 1, None, None)
        .await?;

    println!("{:?}", shark);
    // Log example:
    // ```
    // Some(Shark {
    //     oid: Some(ObjectId("65712bdfb1fb166eb8cce7e5")),
    //     name: "Whale Shark",
    //     species: "Rhincodon typus",
    //     sightings: 1,
    // })
    // ```

    // Deleting a document
    let delete_result: DeleteResult = model
        .delete_one(
            doc! {
                "_id": &document_oid,
            },
            None,
            None,
        )
        .await?;

    // Inserting many
    let sharks = vec![
        Shark {
            oid: None,
            name: "Whale Shark".to_string(),
            species: "Rhincodon typus".to_string(),
            sightings: 0,
        },
        Shark {
            oid: None,
            name: "Great White".to_string(),
            species: "Carcharodon carcharias".to_string(),
            sightings: 0,
        },
    ];

    let document_oids: Vec<ObjectId> = model.insert_many(sharks, None, None).await?;

    // Cursor-ing via a `findMany` query
    let options = FindOptions::builder().sort(doc! { "name": -1 }).build();

    let mut cursor: FindManyCursor<Shark> = model
        .find_many(doc! { "sightings": 0 }, Some(options), None)
        .await?;

    while let Some(shark) = cursor.next().await? {
        // shark: Shark
    }

    // Updating many
    let update_result: UpdateResult = model
        .update_many(
            doc! { "sightings": 0 },
            doc! { "$inc": { "sightings": 5 }},
            None,
            None,
        )
        .await?;

    // Deleting many
    let delete_result: DeleteResult = model.delete_many(doc! {}, None, None).await?;

    db.drop(None).await?;

    Ok(())
}

Sessions

Official References:

https://www.mongodb.com/docs/manual/reference/method/Session/

https://www.mongodb.com/docs/manual/replication/

Overview

Please note that sessions and transactions in MongoDB currently require a replica set deployment. Setting one up is simple: you just need to run multiple mongod (MongoDB server) instances configured as a replica set.

A replica set is started by launching the mongod instances and then calling the shell method rs.initiate(). Here is an example of how to set up a replica set with three nodes.

First, create a separate data directory for each node:

mkdir -p ~/data/mongo_rp_1 &&
mkdir -p ~/data/mongo_rp_2 &&
mkdir -p ~/data/mongo_rp_3

Then, start three separate instances, each in its own terminal, on the hosts the replica set will use:

sudo mongod --port 27017 --dbpath ~/data/mongo_rp_1 --replSet rs0
---
sudo mongod --port 27018 --dbpath ~/data/mongo_rp_2 --replSet rs0
---
sudo mongod --port 27019 --dbpath ~/data/mongo_rp_3 --replSet rs0

Next, connect to the instance on the main port, say 27017 (on newer MongoDB installations the shell is named mongosh instead of mongo):

mongo --port 27017

Finally, call rs.initiate() to start the replica set with the specified members:

rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" },
    { _id: 1, host: "localhost:27018" },
    { _id: 2, host: "localhost:27019" }
  ]
});

Now you have a replica set deployed under the name rs0 with three nodes.
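
The hosts configured above are what the client later connects to. As a minimal sketch, the connection string can be assembled from the host list. The helper name replica_set_uri is hypothetical and not part of mongor. Appending the replicaSet option is an assumption on my part: it is a standard MongoDB connection string option, but the examples in this README connect without it.

```rust
/// Hypothetical helper (not part of mongor): builds a MongoDB connection
/// string that lists every replica set member and names the set.
/// `hosts` are "host:port" pairs, `replica_set` is the set name, e.g. "rs0".
fn replica_set_uri(hosts: &[&str], replica_set: &str) -> String {
    format!("mongodb://{}/?replicaSet={}", hosts.join(","), replica_set)
}

fn main() {
    let uri = replica_set_uri(
        &["localhost:27017", "localhost:27018", "localhost:27019"],
        "rs0",
    );

    // The resulting string can be passed to `ClientOptions::parse`.
    println!("{uri}");
}
```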

Basic Example

use mongor::{
    core::{
        find::FindManyCursor,
        session::{commit_transaction, start_session, start_transaction},
    },
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        results::*,
        Client, Collection,
    },
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Note: We pass the hosts of all three nodes to the client options
    let client_options =
        ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("db_name");
    let collection: Collection<Shark> = db.collection("shark");

    let model: Model<Shark> = Model::from(collection);

    let mut session = start_session(&client, None).await?;

    start_transaction(&mut session, None).await?;

    // Inserting a document with a session
    let new_shark = Shark {
        oid: None,
        name: "Whale Shark".to_string(),
        species: "Rhincodon typus".to_string(),
        sightings: 0,
    };

    let document_oid: ObjectId = model
        .insert_one(new_shark, None, Some(&mut session))
        .await?;

    commit_transaction(&mut session).await?;

    // Cursor-ing via a `findMany` query with a session
    let options = FindOptions::builder().sort(doc! { "name": -1 }).build();

    let mut cursor: FindManyCursor<Shark> = model
        .find_many(doc! { "sightings": 0 }, Some(options), Some(&mut session))
        .await?;

    while let Some(shark) = cursor.next_with_session(&mut session).await? {
        // shark: Shark
    }

    db.drop(None).await?;

    Ok(())
}

Atomic Transactions

A function called run_atomic_transaction is provided in core::atomic. It offers a shorthand for running an async closure whose database operations are all protected by a Session and a single transaction. If the closure returns an error, it automatically attempts to abort the transaction; if the closure returns Ok(()), it attempts to commit the transaction. This gives a more ergonomic way to structure operations that must be atomic.

Note: This method uses std::sync::Mutex in its implementation. I made this decision to keep the crate as agnostic and lightweight as possible, but if the need arises, open a PR or an issue to add a feature that enables a version using tokio::sync::Mutex.
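
To make the commit-on-Ok, abort-on-Err behaviour concrete, here is a small in-memory sketch of the pattern. It is not how mongor implements run_atomic_transaction: ToySession and run_atomic are hypothetical names, the code is synchronous, and nothing touches a database. It only assumes what is described above, namely a session shared through Arc<Mutex<_>> and a closure returning a Result.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Toy stand-in for a session: writes are staged and only become
/// visible in `committed` once the transaction commits.
#[derive(Default)]
struct ToySession {
    committed: HashMap<String, u32>,
    staged: Vec<(String, u32)>,
}

impl ToySession {
    fn insert(&mut self, key: &str, value: u32) {
        self.staged.push((key.to_string(), value));
    }

    fn commit(&mut self) {
        for (key, value) in self.staged.drain(..) {
            self.committed.insert(key, value);
        }
    }

    fn abort(&mut self) {
        self.staged.clear();
    }
}

/// Runs `body` with the shared session, then commits if it returned
/// `Ok(())` and aborts (discarding staged writes) if it returned `Err`.
fn run_atomic<F>(session: Arc<Mutex<ToySession>>, body: F) -> Result<(), String>
where
    F: FnOnce(Arc<Mutex<ToySession>>) -> Result<(), String>,
{
    let result = body(Arc::clone(&session));
    let mut guard = session.lock().map_err(|err| err.to_string())?;
    match result {
        Ok(()) => {
            guard.commit();
            Ok(())
        }
        Err(err) => {
            guard.abort();
            Err(err)
        }
    }
}

fn main() {
    let session = Arc::new(Mutex::new(ToySession::default()));

    // The closure fails after staging a write, so nothing is committed.
    let failed = run_atomic(Arc::clone(&session), |shared| {
        let mut guard = shared.lock().map_err(|err| err.to_string())?;
        guard.insert("shark", 1);
        Err("some error".to_string())
    });
    println!("failed: {}", failed.is_err());
    println!("committed: {}", session.lock().unwrap().committed.len());

    // The closure succeeds, so both staged writes are committed together.
    let succeeded = run_atomic(Arc::clone(&session), |shared| {
        let mut guard = shared.lock().map_err(|err| err.to_string())?;
        guard.insert("shark", 1);
        guard.insert("researcher", 1);
        Ok(())
    });
    println!("succeeded: {}", succeeded.is_ok());
    println!("committed: {}", session.lock().unwrap().committed.len());
}
```

The lock is taken inside the closure and released when the closure returns, which is why the wrapper can lock the same mutex again to commit or abort.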

Example:

use mongor::{
    core::{atomic::run_atomic_transaction, session::start_session},
    error::Error,
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        Client, ClientSession, Collection,
    },
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Researcher {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub sighted: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Note: We pass the hosts of all three nodes to the client options
    let client_options =
        ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("db_name");

    let shark_collection: Collection<Shark> = db.collection("shark");
    let researcher_collection: Collection<Researcher> = db.collection("researcher");

    let shark_model: Model<Shark> = Model::from(shark_collection);
    let researcher_model: Model<Researcher> = Model::from(researcher_collection);

    let session = start_session(&client, None).await?;
    let shared_session = Arc::new(Mutex::new(session));

    let shark_model_ref = &shark_model;
    let researcher_model_ref = &researcher_model;

    run_atomic_transaction(
        |shared_session: Arc<Mutex<ClientSession>>| async move {
            let mut session_lock = shared_session
                .lock()
                // Note: Here I am using `mongor::error::Error`, but you can use any error
                // type that implements `std::error::Error` inside this closure to use `?`
                .map_err(|err| Error::SharedSessionLockFailure(err.to_string()))?;

            let new_shark = Shark {
                oid: None,
                name: "Whale Shark".to_string(),
                species: "Rhincodon typus".to_string(),
                sightings: 0,
            };

            let new_researcher = Researcher {
                oid: None,
                name: "John Doe".to_string(),
                sighted: vec![],
            };

            let shark_oid: ObjectId = shark_model_ref
                .insert_one(new_shark, None, Some(&mut session_lock))
                .await?;

            let researcher_oid: ObjectId = researcher_model_ref
                .insert_one(new_researcher, None, Some(&mut session_lock))
                .await?;

            researcher_model_ref
                .update_one_by_oid(
                    &researcher_oid,
                    doc! {
                        "$push": {
                            "sighted": "Whale Shark",
                        },
                    },
                    None,
                    Some(&mut session_lock),
                )
                .await?;

            shark_model_ref
                .update_one_by_oid(
                    &shark_oid,
                    doc! {
                        "$inc": {
                            "sightings": 1,
                        },
                    },
                    None,
                    Some(&mut session_lock),
                )
                .await?;

            Ok(())
        },
        shared_session,
        None,
    )
    .await?;

    let shark = shark_model.find_one(doc! {}, None, None).await?;
    let researcher = researcher_model.find_one(doc! {}, None, None).await?;

    println!("{:?}", shark);
    // Example log:
    // Some(Shark {
    //     oid: Some(ObjectId("657226d179aa103ff4cf05ab")),
    //     name: "Whale Shark",
    //     species: "Rhincodon typus",
    //     sightings: 1,
    // })

    println!("{:?}", researcher);
    // Example log:
    // Some(Researcher {
    //     oid: Some(ObjectId("657226d179aa103ff4cf05ac")),
    //     name: "John Doe",
    //     sighted: ["Whale Shark"],
    // })


    db.drop(None).await?;

    Ok(())
}

Now, let's modify the closure to fail on purpose. You will notice that the operations inside the closure are not applied.

use mongor::{
    core::{atomic::run_atomic_transaction, session::start_session},
    error::Error,
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        Client, ClientSession, Collection,
    },
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Researcher {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub sighted: Vec<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Note: We pass the hosts of all three nodes to the client options
    let client_options =
        ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
    let client = Client::with_options(client_options)?;

    let db = client.database("db_name");

    let shark_collection: Collection<Shark> = db.collection("shark");
    let researcher_collection: Collection<Researcher> = db.collection("researcher");

    let shark_model: Model<Shark> = Model::from(shark_collection);
    let researcher_model: Model<Researcher> = Model::from(researcher_collection);

    let session = start_session(&client, None).await?;
    let shared_session = Arc::new(Mutex::new(session));

    let shark_model_ref = &shark_model;
    let researcher_model_ref = &researcher_model;

    let _ = run_atomic_transaction(
        |shared_session: Arc<Mutex<ClientSession>>| async move {
            let mut session_lock = shared_session
                .lock()
                // Note: Here I am using `mongor::error::Error`, but you can use any error
                // type that implements `std::error::Error` inside this closure to use `?`
                .map_err(|err| Error::SharedSessionLockFailure(err.to_string()))?;

            let new_shark = Shark {
                oid: None,
                name: "Whale Shark".to_string(),
                species: "Rhincodon typus".to_string(),
                sightings: 0,
            };

            let new_researcher = Researcher {
                oid: None,
                name: "John Doe".to_string(),
                sighted: vec![],
            };

            let shark_oid: ObjectId = shark_model_ref
                .insert_one(new_shark, None, Some(&mut session_lock))
                .await?;

            let researcher_oid: ObjectId = researcher_model_ref
                .insert_one(new_researcher, None, Some(&mut session_lock))
                .await?;

            // Purposefully erroring after insertions
            Err(Error::internal("some error"))?;

            Ok(())
        },
        shared_session,
        None,
    )
    .await;

    let shark = shark_model.find_one(doc! {}, None, None).await?;
    let researcher = researcher_model.find_one(doc! {}, None, None).await?;

    // Note: the insertions did not occur as the "transaction" as a whole failed.
    println!("{:?}", shark);
    // Logs:
    // `None`

    println!("{:?}", researcher);
    // Logs:
    // `None`

    db.drop(None).await?;

    Ok(())
}

Testing

Overview

A simple testing framework is provided via the test_db module. The general use case is to create a new TestDB per test, which spawns a new Database named with a newly generated ObjectId. This makes it easy to run a concurrent test suite, which is common with frameworks like tokio.

Then, a method called TestDB::run_test takes ownership of this db and runs an isolated test closure, after which the Database is cleaned up (dropped) whether the test succeeds or fails.

Pre-defined assertions are provided in test_db::assert_model::AssertModel, which wraps a Model<D>, and in the module test_db::assert_document::*, which provides general assertions about documents. A macro called test_error! from test_db::test_error is also provided; it acts like panic! inside the test closure. The closure returns a Future<Output = Result<(), Box<dyn std::error::Error>>>, so the ? operator can also be used to end the test.
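
The isolate-then-always-clean-up idea described above can be sketched without a database. The following is an illustration only, not the TestDB implementation: ToyTestDb is a hypothetical type, a process-wide counter stands in for the generated ObjectId, and the cleanup closure stands in for dropping the Database.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

// Stand-in for a freshly generated ObjectId: a process-wide counter.
static NEXT_ID: AtomicUsize = AtomicUsize::new(0);

/// Toy stand-in for a per-test database that only carries a unique name.
struct ToyTestDb {
    name: String,
}

impl ToyTestDb {
    fn new() -> Self {
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        ToyTestDb {
            name: format!("test_db_{id}"),
        }
    }

    /// Takes ownership of the db, runs `test`, then always runs `cleanup`
    /// before handing back the test's result.
    fn run_test<T, C>(self, test: T, cleanup: C) -> Result<(), String>
    where
        T: FnOnce(&str) -> Result<(), String>,
        C: FnOnce(&str),
    {
        let result = test(&self.name);
        cleanup(&self.name);
        result
    }
}

fn main() {
    let mut dropped: Vec<String> = Vec::new();

    let db = ToyTestDb::new();
    let result = db.run_test(
        // A failing test body: cleanup still runs afterwards.
        |_name| Err("assertion failed".to_string()),
        |name| dropped.push(name.to_string()),
    );

    println!("test failed: {}", result.is_err());
    println!("databases cleaned up: {}", dropped.len());
}
```

Because each instance gets its own name, tests running concurrently never share state, and because cleanup runs before the result is returned, a failing test does not leave a database behind.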

Basic Example

use mongor::mongodb::bson::{doc, oid::ObjectId};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

#[cfg(test)]
mod tests {
    use mongor::{
        model::Model,
        mongodb::{
            bson::{doc, oid::ObjectId},
            options::ClientOptions,
            Client, Collection,
        },
        test_db::{assert_model::AssertModel, test_error::TestError, TestDB},
        test_error,
    };

    use crate::Shark;

    #[tokio::test]
    pub async fn example_test() {
        let client_options = ClientOptions::parse("mongodb://localhost:27017/")
            .await
            .unwrap();
        let client = Client::with_options(client_options).unwrap();

        // Opens a new isolated `Database` named with a `ObjectId`
        let test_db = TestDB::new(&client);

        let shark_collection: Collection<Shark> = test_db.collection("shark");
        let shark_model = Model::from(shark_collection);

        // Run isolated test, cleaning up the Database on failure or success
        test_db
            .run_test(
                || async {
                    let new_shark = Shark {
                        oid: None,
                        name: "Whale Shark".to_string(),
                        species: "Rhincodon typus".to_string(),
                        sightings: 0,
                    };

                    let oid = shark_model.insert_one(new_shark, None, None).await?;

                    // Using pre-defined assertions from `test_db::assert_model`,
                    // which call `test_error!` under the hood

                    let assert_shark_model = AssertModel::from(&shark_model);

                    assert_shark_model
                        .assert_exists(doc! { "name": "Whale Shark" }, None)
                        .await?
                        .assert_count(doc! {}, 1, None)
                        .await?;

                    // Using the `test_error!` macro from `test_db::test_error`,
                    // which acts like `panic!` inside the closure
                    let some_other_oid = ObjectId::new();

                    if oid == some_other_oid {
                        test_error!("Inserted shark oid {} == {}", oid, some_other_oid);
                    }

                    Ok(())
                },
                None,
            )
            .await
            .unwrap();
    }
}

GridFS

Official References:

https://www.mongodb.com/docs/manual/core/gridfs/

Overview

Enabling the crate feature grid_fs exposes the GridFs struct, a robust interface to the underlying GridFsBucket. This feature adds a single dependency, futures_util.

Many of the methods involve the concept of a revision number. Because the name field on the file document is not unique, a file can have multiple revisions, which are numbered as follows:

0 = the original stored file
1 = the first revision
2 = the second revision
etc...
-2 = the second most recent revision
-1 = the most recent revision
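
The numbering above can be read as an index into a file's revisions ordered from oldest to newest, with negative numbers counting back from the end. A minimal sketch of that mapping follows. The function resolve_revision is hypothetical and only illustrates the convention; it is not how mongor or the driver resolves revisions internally.

```rust
/// Hypothetical helper: maps a revision number to a zero-based index into
/// a list of `total` revisions ordered oldest to newest.
/// Returns `None` when the revision does not exist.
fn resolve_revision(revision: i32, total: usize) -> Option<usize> {
    let total = i64::try_from(total).ok()?;
    let revision = i64::from(revision);

    // Non-negative revisions count from the original file,
    // negative revisions count back from the most recent one.
    let index = if revision >= 0 {
        revision
    } else {
        total + revision
    };

    if (0..total).contains(&index) {
        usize::try_from(index).ok()
    } else {
        None
    }
}

fn main() {
    // With three stored revisions of the same filename:
    for revision in [0, 1, 2, -1, -2, 3, -4] {
        println!("{revision:>2} -> {:?}", resolve_revision(revision, 3));
    }
}
```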

Streaming methods (uploading and downloading) are implemented in the mongodb crate via futures_util::io::*, so if you are using tokio, you must add the tokio_util dependency, which provides tokio_util::compat::*. This module exposes compat() and compat_write() to convert tokio async readers or writers into a Compat<_>, which can be used directly wherever futures_util::io::AsyncRead or futures_util::io::AsyncWrite is expected.

See the basic example or: https://docs.rs/tokio-util/latest/tokio_util/compat/index.html for more information.

Basic Example

use futures_util::io::Cursor;
use mongor::{
    grid_fs::GridFs,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        Client,
    },
};
use tokio::fs::*;
use tokio_util::compat::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
    let client = Client::with_options(client_options)?;

    let db = client.database("db_name");

    let bucket_options = GridFsBucketOptions::builder()
        .bucket_name(Some("shark_images".to_string()))
        .build();

    let grid_fs = GridFs::new(&db, Some(bucket_options));

    // Upload a file
    let file = File::open("./test_data/shark-0.png").await?;
    let revision_0_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;

    // Upload a revision of a file
    let file = File::open("./test_data/shark-1.jpg").await?;
    let revision_1_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;

    // Get a `core::find::FindManyCursor` of the `FilesCollectionDocument`s
    let mut cursor = grid_fs.find_many(doc! {}, None).await?;

    while let Some(file_document) = cursor.next().await? {
        // file_document: FilesCollectionDocument
    }

    // Download the most recent revision of a file
    let mut writer = Cursor::new(vec![]);
    grid_fs.download("shark", &mut writer, None).await?;

    let file_data: Vec<u8> = writer.into_inner();

    // Download a specific revision of a file
    let mut writer = Cursor::new(vec![]);
    grid_fs.download("shark", &mut writer, Some(0)).await?;

    let file_data: Vec<u8> = writer.into_inner();

    // Rename a file (all revisions, since None was passed as the revision)
    grid_fs
        .rename_by_filename("shark", "new_shark", None)
        .await?;

    // Delete a file (all revisions, since None was passed as the revision)
    grid_fs.delete_by_filename("new_shark", None).await?;

    db.drop(None).await?;

    Ok(())
}

Bug Reports

Please report bugs by creating an issue. If you are aware of a suitable fix, feel free to open a PR, but please follow the Contributing guidelines below.

To report a bug, it must be directly related to this crate, and you must provide as much information as possible, such as:

  • Code examples

  • Error messages

  • Steps to reproduce

  • System information (If applicable)

Feature requests

If you feel something is missing, or want some variation of the current crate that would require dependencies other than mongodb and serde (for example, GridFs requires futures_util, so it is a separate feature), please create an issue with the request and explain why it should be part of this crate rather than a third-party crate. I plan to implement a mongor-extras crate for additional functionality with a more defined scope, and wish to keep this crate as agnostic and flexible as possible.

Contributing

I welcome anyone to contribute to the crate. But I do have some general requirements:

  • The change should keep the crate agnostic to use cases, i.e. where possible, methods should allow the use of Sessions and options from mongodb::options::* instead of custom options, and exposed closures, like those in run_test or run_atomic_transaction, should interface well publicly with generic types or trait objects like dyn std::error::Error.

  • Any additional or modified methods require unit testing with 100% test coverage, that should be placed in the tests module.

  • Any change that adds in additional dependencies should be created as a separate feature.

  • All current unit tests must pass, i.e. run cargo test and confirm that every test passes.

  • Add your name and/or handle to CONTRIBUTORS.md if not already present, as well as to the Authors section in the header comment of the file.

  • If adding in a new dependency, please update License::Third Party in this README to correspond with their licensing.

If your change meets these guidelines, feel free to open a PR.

Project Status

I plan to maintain this crate for the foreseeable future. The crate API is subject to change, although I anticipate simplifications and additions rather than deletions or major modifications of the current feature set.

License

MIT

See LICENSE.md for more information

Third Party

This crate is built on top of:
