10 releases

0.1.9 Sep 23, 2024
0.1.8 Jun 4, 2024
0.1.7 May 12, 2024
0.1.5 Feb 3, 2024
0.1.4 Dec 22, 2023

#814 in Database interfaces

MIT license

5MB
3K SLoC

mongor

Rust-based ODM for MongoDB, built on top of the official mongodb crate. Provides an ergonomic interface to the underlying driver, alongside a simplified testing framework that makes writing test suites more ergonomic as well.

Requirements

Installation

cargo add mongor

Documentation

This README provides a general overview and practical examples, but does not cover all available methods. Full crate documentation can be found at docs.rs.

Usage

Overview

While the crate exposes the core module, the main interface of this crate is the Model<D> struct. It is generic over D, the structure of the document you want the model to control. The examples below demonstrate the general pattern for using a model.

Basic example

use mongor::{
    core::find::FindManyCursor,
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        results::*,
        Client, Collection,
    },
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("db_name");
    let collection: Collection<Shark> = db.collection("shark");

    let model: Model<Shark> = Model::from(collection);

    // Applying an index
    model
        .apply_unique_index(doc! { "species": 1 }, Some("species_unique"), None, None)
        .await?;

    // Inserting a document
    let new_shark = Shark {
        oid: None,
        name: "Whale Shark".to_string(),
        species: "Rhincodon typus".to_string(),
        sightings: 0,
    };

    let document_oid: ObjectId = model.insert_one(new_shark, None, None).await?;

    // Updating a document
    let update_result: UpdateResult = model
        .update_one_by_oid(
            &document_oid,
            doc! {
                "$inc": {
                    "sightings": 1,
                },
            },
            None,
            None,
        )
        .await?;

    // Finding a document
    let shark: Option<Shark> = model
        .find_one_by_field_value("sightings", 1, None, None)
        .await?;

    println!("{:?}", shark);
    // Log example:
    // ```
    // Some(Shark {
    //     oid: Some(ObjectId("65712bdfb1fb166eb8cce7e5")),
    //     name: "Whale Shark",
    //     species: "Rhincodon typus",
    //     sightings: 1,
    // })
    // ```

    // Deleting a document
    let delete_result: DeleteResult = model
        .delete_one(
            doc! {
                "_id": &document_oid,
            },
            None,
            None,
        )
        .await?;

    // Inserting many
    let sharks = vec![
        Shark {
            oid: None,
            name: "Whale Shark".to_string(),
            species: "Rhincodon typus".to_string(),
            sightings: 0,
        },
        Shark {
            oid: None,
            name: "Great White".to_string(),
            species: "Carcharodon carcharias".to_string(),
            sightings: 0,
        },
    ];

    let document_oids: Vec<ObjectId> = model.insert_many(sharks, None, None).await?;

    // Cursor-ing via a `findMany` query
    let options = FindOptions::builder().sort(doc! { "name": -1 }).build();

    let mut cursor: FindManyCursor<Shark> = model
        .find_many(doc! { "sightings": 0 }, Some(options), None)
        .await?;

    while let Some(shark) = cursor.next().await? {
        // shark: Shark
    }

    // Updating many
    let update_result: UpdateResult = model
        .update_many(
            doc! { "sightings": 0 },
            doc! { "$inc": { "sightings": 5 }},
            None,
            None,
        )
        .await?;

    // Deleting many
    let delete_result: DeleteResult = model.delete_many(doc! {}, None, None).await?;

    db.drop(None).await?;

    Ok(())
}

Sessions

Official References:

https://www.mongodb.com/docs/manual/reference/method/Session/

https://www.mongodb.com/docs/manual/replication/

Overview

Note that sessions and transactions in MongoDB currently require a replica set deployment. Setting one up locally is straightforward: you run multiple mongod (MongoDB server) instances configured as members of the same replica set.

A replica set is configured from the shell with rs.initiate(), but its members must be running first. Here is an example of how to set up a replica set with three nodes.

First, create a separate data directory for each member:

mkdir -p ~/data/mongo_rp_1 &&
mkdir -p ~/data/mongo_rp_2 &&
mkdir -p ~/data/mongo_rp_3

Then, start three separate instances (each in its own terminal) under the hosts the replica set will use:

sudo mongod --port 27017 --dbpath ~/data/mongo_rp_1 --replSet rs0
---
sudo mongod --port 27018 --dbpath ~/data/mongo_rp_2 --replSet rs0
---
sudo mongod --port 27019 --dbpath ~/data/mongo_rp_3 --replSet rs0

Next, connect to the instance on the main port, say 27017 (use mongosh instead of mongo on MongoDB 6.0 and later, where the legacy shell was removed):

mongo --port 27017

Finally, call rs.initiate() to start the replica set with the specified members:

rs.initiate({
  _id: "rs0",
  members: [
    { _id: 0, host: "localhost:27017" },
    { _id: 1, host: "localhost:27018" },
    { _id: 2, host: "localhost:27019" }
  ]
});

Now, you have a replica set deployed under the name rs0 with three nodes.
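
To connect to this deployment, the client is given every member host in one connection string. The following is a minimal sketch of how such a string is assembled. The helper replica_set_uri is hypothetical and not part of mongor or the mongodb crate; replicaSet is a standard MongoDB URI option, shown here as an optional extra.

```rust
/// Hypothetical helper (not part of mongor): builds a MongoDB connection
/// string that lists every replica set member host.
/// `replica_set` optionally appends the standard `replicaSet` URI option.
fn replica_set_uri(hosts: &[&str], replica_set: Option<&str>) -> String {
    // Hosts are comma-separated after the scheme.
    let mut uri = format!("mongodb://{}/", hosts.join(","));
    if let Some(name) = replica_set {
        uri.push_str("?replicaSet=");
        uri.push_str(name);
    }
    uri
}

fn main() {
    let hosts = ["localhost:27017", "localhost:27018", "localhost:27019"];

    // Without the option: just the list of hosts.
    println!("{}", replica_set_uri(&hosts, None));

    // With the replica set name from the setup above.
    println!("{}", replica_set_uri(&hosts, Some("rs0")));
}
```

The first form matches the connection string used in the session example in this README.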

Basic Example

use mongor::{
    core::{
        find::FindManyCursor,
        session::{commit_transaction, start_session, start_transaction},
    },
    model::Model,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        results::*,
        Client, Collection,
    },
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Note: We pass the hosts of all three nodes to the client options
    let client_options =
        ClientOptions::parse("mongodb://localhost:27017,localhost:27018,localhost:27019/").await?;
    let client = Client::with_options(client_options)?;
    let db = client.database("db_name");
    let collection: Collection<Shark> = db.collection("shark");

    let model: Model<Shark> = Model::from(collection);

    let mut session = start_session(&client, None).await?;

    start_transaction(&mut session, None).await?;

    // Inserting a document with a session
    let new_shark = Shark {
        oid: None,
        name: "Whale Shark".to_string(),
        species: "Rhincodon typus".to_string(),
        sightings: 0,
    };

    let document_oid: ObjectId = model
        .insert_one(new_shark, None, Some(&mut session))
        .await?;

    commit_transaction(&mut session).await?;

    // Cursor-ing via a `findMany` query with a session
    let options = FindOptions::builder().sort(doc! { "name": -1 }).build();

    let mut cursor: FindManyCursor<Shark> = model
        .find_many(doc! { "sightings": 0 }, Some(options), Some(&mut session))
        .await?;

    while let Some(shark) = cursor.next_with_session(&mut session).await? {
        // shark: Shark
    }

    db.drop(None).await?;

    Ok(())
}

Testing

Overview

A simple testing framework is provided via the test_db module. The general use case is to create a new TestDB per test. Each TestDB spawns a new Database named with a newly generated ObjectId, which makes it easy to run a concurrent test suite, as is common with frameworks like tokio.

Then, the TestDB::run_test method takes ownership of this db and runs an isolated test closure. The Database is cleaned up (dropped) whether the test succeeds or fails.

Pre-defined assertions are provided in test_db::assert_model::AssertModel, which wraps a Model<D>, and the test_db::assert_document::* module provides general assertions regarding documents. A macro called test_error! from test_db::test_error is also provided; it acts like panic! inside the test closure. The closure returns a Future<Output = Result<(), Box<dyn std::error::Error>>>, so the ? operator can also be used to end the test.
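
The run-then-always-clean-up pattern described above can be sketched with the standard library alone. This is an illustration of the idea under stated assumptions, not the crate's implementation: ScratchDb, its fields, and its synchronous run_test are hypothetical stand-ins for TestDB, and a boolean flag stands in for dropping the database.

```rust
use std::error::Error;

type TestResult = Result<(), Box<dyn Error>>;

/// Hypothetical stand-in for a per-test database; not the crate's `TestDB`.
struct ScratchDb {
    name: String,
    dropped: bool,
}

impl ScratchDb {
    fn new(name: &str) -> Self {
        Self { name: name.to_string(), dropped: false }
    }

    /// Takes ownership, runs `test`, then cleans up whether the test
    /// returned `Ok` or `Err`. The db is handed back only so that the
    /// caller can inspect the outcome in this sketch.
    fn run_test<F>(mut self, test: F) -> (Self, TestResult)
    where
        F: FnOnce(&str) -> TestResult,
    {
        let result = test(&self.name);
        self.dropped = true; // stands in for dropping the database
        (self, result)
    }
}

fn passing(_db_name: &str) -> TestResult {
    Ok(())
}

fn failing(_db_name: &str) -> TestResult {
    // `?` ends the test early and carries the error out of the closure.
    "not a number".parse::<u32>()?;
    Ok(())
}

fn main() {
    let (db, result) = ScratchDb::new("test_a").run_test(passing);
    println!("{}: ok={} dropped={}", db.name, result.is_ok(), db.dropped);

    let (db, result) = ScratchDb::new("test_b").run_test(failing);
    println!("{}: ok={} dropped={}", db.name, result.is_ok(), db.dropped);
}
```

Because cleanup happens after the closure returns and before the result is handed back, a failing test cannot leave its database behind in this sketch. Panics are not modeled here.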

Basic Example

use mongor::mongodb::bson::{doc, oid::ObjectId};
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
pub struct Shark {
    #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
    pub oid: Option<ObjectId>,
    pub name: String,
    pub species: String,
    pub sightings: usize,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    Ok(())
}

#[cfg(test)]
mod tests {
    use mongor::{
        model::Model,
        mongodb::{
            bson::{doc, oid::ObjectId},
            options::ClientOptions,
            Client, Collection,
        },
        test_db::{assert_model::AssertModel, test_error::TestError, TestDB},
        test_error,
    };

    use crate::Shark;

    #[tokio::test]
    pub async fn example_test() {
        let client_options = ClientOptions::parse("mongodb://localhost:27017/")
            .await
            .unwrap();
        let client = Client::with_options(client_options).unwrap();

        // Opens a new isolated `Database` named with an `ObjectId`
        let test_db = TestDB::new(&client);

        let shark_collection: Collection<Shark> = test_db.collection("shark");
        let shark_model = Model::from(shark_collection);

        // Run isolated test, cleaning up the Database on failure or success
        test_db
            .run_test(
                || async {
                    let new_shark = Shark {
                        oid: None,
                        name: "Whale Shark".to_string(),
                        species: "Rhincodon typus".to_string(),
                        sightings: 0,
                    };

                    let oid = shark_model.insert_one(new_shark, None, None).await?;

                    // Using pre-defined assertions from `test_db::assert_model`,
                    // which call `test_error!` under the hood

                    let assert_shark_model = AssertModel::from(&shark_model);

                    assert_shark_model
                        .assert_exists(doc! { "name": "Whale Shark" }, None)
                        .await?
                        .assert_count(doc! {}, 1, None)
                        .await?;

                    // Using the `test_error!` macro from `test_db::test_error`,
                    // which acts like `panic!` inside the closure
                    let some_other_oid = ObjectId::new();

                    if oid == some_other_oid {
                        test_error!("Inserted shark oid {} == {}", oid, some_other_oid);
                    }

                    Ok(())
                },
                None,
            )
            .await
            .unwrap();
    }
}

GridFS

Official References:

https://www.mongodb.com/docs/manual/core/gridfs/

Overview

Enabling the crate feature grid_fs exposes the GridFs struct, a robust interface to the underlying GridFsBucket. This feature adds a single dependency, futures_util.

Many of the methods involve the concept of a revision number. Because the name field on the file document is not unique, multiple revisions of a file can exist under one name. Revisions are numbered as follows:

0 = the original stored file
1 = the first revision
2 = the second revision
etc...
-2 = the second most recent revision
-1 = the most recent revision
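
To make the numbering concrete, here is a small sketch that maps a revision number onto a position in the list of files stored under one name, ordered oldest first. The function resolve_revision is hypothetical and only models the numbering above; it is not how the crate or the driver performs the lookup.

```rust
/// Hypothetical helper: resolves a revision number against `count` stored
/// revisions of one filename, ordered oldest first. Returns the zero-based
/// position of the selected revision, or `None` if it does not exist.
fn resolve_revision(revision: i32, count: usize) -> Option<usize> {
    if revision >= 0 {
        // 0 = the original stored file, 1 = the first revision, ...
        let index = revision as usize;
        (index < count).then_some(index)
    } else {
        // -1 = the most recent revision, -2 = the second most recent, ...
        let from_end = revision.unsigned_abs() as usize;
        count.checked_sub(from_end)
    }
}

fn main() {
    // Three uploads under the same filename occupy positions 0, 1 and 2.
    let count = 3;
    for revision in [0, 1, 2, -1, -2, 3, -4] {
        println!("{revision:>2} -> {:?}", resolve_revision(revision, count));
    }
}
```

With three stored files, 0 and -3 both select the original, 2 and -1 both select the latest upload, and 3 or -4 select nothing.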

Methods for streaming (uploading and downloading) are implemented in the mongodb crate via futures_util::io::*, so if you are using tokio, you must add tokio_util as a dependency, which provides tokio_util::compat::*. This module exposes compat() and compat_write() to convert tokio async readers or writers into a Compat<_>, which can be used directly wherever futures_util::io::AsyncRead or futures_util::io::AsyncWrite is expected.

See the basic example or: https://docs.rs/tokio-util/latest/tokio_util/compat/index.html for more information.

Basic Example

use futures_util::io::Cursor;
use mongor::{
    grid_fs::GridFs,
    mongodb::{
        bson::{doc, oid::ObjectId},
        options::*,
        Client,
    },
};
use tokio::fs::*;
use tokio_util::compat::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client_options = ClientOptions::parse("mongodb://localhost:27017/").await?;
    let client = Client::with_options(client_options)?;

    let db = client.database("db_name");

    let bucket_options = GridFsBucketOptions::builder()
        .bucket_name(Some("shark_images".to_string()))
        .build();

    let grid_fs = GridFs::new(&db, Some(bucket_options));

    // Upload a file
    let file = File::open("./test_data/shark-0.png").await?;
    let revision_0_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;

    // Upload a revision of a file
    let file = File::open("./test_data/shark-1.jpg").await?;
    let revision_1_oid: ObjectId = grid_fs.upload("shark", file.compat(), None).await?;

    // Get a `core::find::FindManyCursor` of the `FilesCollectionDocument`s
    let mut cursor = grid_fs.find_many(doc! {}, None).await?;

    while let Some(file_document) = cursor.next().await? {
        // file_document: FilesCollectionDocument
    }

    // Download the most recent revision of a file
    let mut writer = Cursor::new(vec![]);
    grid_fs.download("shark", &mut writer, None).await?;

    let file_data: Vec<u8> = writer.into_inner();

    // Download a specific revision of a file
    let mut writer = Cursor::new(vec![]);
    grid_fs.download("shark", &mut writer, Some(0)).await?;

    let file_data: Vec<u8> = writer.into_inner();

    // Rename a file (all revisions, because `None` is passed as the revision)
    grid_fs
        .rename_by_filename("shark", "new_shark", None)
        .await?;

    // Delete a file (all revisions, because `None` is passed as the revision)
    grid_fs.delete_by_filename("new_shark", None).await?;

    db.drop(None).await?;

    Ok(())
}

Bug Reports

Please report bugs by creating an issue, or if there is a sufficient fix you are aware of, feel free to open a PR.

Please provide any of the following if applicable:

  • Code examples

  • Error messages

  • Steps to reproduce

  • System information

Contributing

I welcome anyone to contribute to the crate. All tests must pass, and the feature or change should make sense within the current API.

Project Status

I plan to maintain this crate for the foreseeable future.

License

MIT

See LICENSE.md for more information

Dependencies

~15–25MB
~374K SLoC