#event-streaming #streaming-parser #representation #composable #cesr #group

parside

Parser for Composable Event Streaming Representation (CESR)

3 unstable releases

0.2.0 May 1, 2023
0.1.3 Jan 20, 2023
0.1.1 Jan 17, 2023

#3 in #cesr

Download history 64/week @ 2024-02-18 46/week @ 2024-02-25 134/week @ 2024-03-03 38/week @ 2024-03-10 34/week @ 2024-03-17 53/week @ 2024-03-24 42/week @ 2024-03-31 13/week @ 2024-04-07 9/week @ 2024-04-14 15/week @ 2024-04-21

79 downloads per month

Apache-2.0

96KB
2K SLoC

parside

parside codecov

Parser library for Composable Event Streaming Representation (CESR).

This library is currently under construction.

Example

Example parsing code (top-level ACDC, KEL, and TEL parsing):

    fn verify_acdc(&self, creder: &Creder, quadlets: &AttachedMaterialQuadlets, deep: Option<bool>) -> Result<bool> {
        if creder.status()?.is_none() {
            return err!(Error::Validation);
        };

        let vcid = creder.said()?;
        let schema = creder.schema()?;
        let prov = creder.chains()?;

        let event = self.store.get_latest_transaction_event(&vcid)?;
        let state = Serder::new_with_raw(event.as_bytes())?;
        let dtnow = chrono::Utc::now();
        let dte = chrono::DateTime::parse_from_rfc3339(&state.ked()["dt"].to_string()?)?.with_timezone(&chrono::Utc);
        if (dtnow - dte).num_seconds() > Self::DEFAULT_CREDENTIAL_EXPIRY_SECONDS {
            return err!(Error::Validation);
        }

        // added brv here for safety even though unimplemented
        if [Ilkage::rev, Ilkage::brv].contains(&state.ked()[Ids::t].to_string()?.as_str()) {
            return err!(Error::Validation);
        }

        schema_cache().verify(&schema, std::str::from_utf8(&creder.raw())?)?;

        let mut rooted = false;

        for group in quadlets.value() {
            match group {
                CesrGroup::SadPathSigVariant { value } => {
                    for sad_path_sig in value.value() {
                        if sad_path_sig.pather.bext()? != "-" {
                            continue;
                        }
           
                        rooted = true;
           
                        let event = self.store.get_key_event(&sad_path_sig.prefixer.qb64()?, sad_path_sig.seqner.sn()? as u32)?;
                        let serder = Serder::new_with_raw(event.as_bytes())?;
                        if serder.said()? != sad_path_sig.saider.qb64()? {
                            return err!(Error::Verification)
                        }
           
                        let verfers = serder.verfers()?;
                        let mut sigers = vec![];
                        for controller_idx_sig in sad_path_sig.sigers.value() {
                            let siger = &controller_idx_sig.siger;
                            if !sigers.contains(siger) {
                                sigers.push(siger.clone())
                            }
                        }
           
                        let mut verified_indices = vec![];
                        for siger in sigers {
                            if siger.index() as usize > verfers.len() {
                                return err!(Error::Verification);
                            }
           
                            if verfers[siger.index() as usize].verify(&siger.raw(), &creder.raw())? {
                                verified_indices.push(siger.index());
                            }
                        }
           
                        if let Some(tholder) = serder.tholder()? {
                            if !tholder.satisfy(&verified_indices)? {
                                return err!(Error::Verification);
                            }
                        } else {
                            return err!(Error::Verification);
                        }
                    }                        
                },
                _ => return err!(Error::Decoding)
            }
        }

        if !rooted {
            return err!(Error::Verification);
        }

        let edges = if prov.to_map().is_ok() {
            vec![prov]
        } else if prov.to_vec().is_ok() {
            prov.to_vec()?
        } else {
            return err!(Error::Verification);
        };

        for edge in &edges {
            for (label, node) in edge.to_map()? {
                if [Ids::d, "o"].contains(&label.as_str()) {
                    continue;
                }

                let node_said = &node["n"].to_string()?;
                let message = self.store.get_acdc(&node_said)?;

                // here we need to default to true
                if deep.unwrap_or(true) {
                    let pacdc = Creder::new_with_raw(message.as_bytes())?;

                    let (_, message_list) = MessageList::from_stream_bytes(message[pacdc.raw().len()..].as_bytes())?;
                    let group = message_list.messages[0].cesr_group()?;

                    match group {
                        CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                            self.verify_acdc(&pacdc, value, deep)?;
                        },
                        _ => return err!(Error::Decoding)
                    }
                }
            }
        }

        let result = self.store.get_acdc(&vcid);
        let existing = result.is_ok();
        if existing {
            let message = result.unwrap();
            let eacdc = Creder::new_with_raw(message.as_bytes())?;

            // this seems very bad, it means something is in the database that shouldn't be there. how did it get there?
            if vcid != eacdc.said()? {
                return err!(Error::Programmer);
            }
        }

        println!("succesfully verified acdc {vcid}");

        Ok(existing)
    }

    fn verify_key_event(&self, serder: &Serder, quadlets: &AttachedMaterialQuadlets, deep: Option<bool>) -> Result<bool> {
        let pre = serder.pre()?;
        let ked = serder.ked();

        // see if code is supported
        let prefixer = Prefixer::new_with_qb64(&pre)?;

        let sn = serder.sn()?;
        let ilk = ked["t"].to_string()?;
        let said = serder.said()?;

        let mut existing = false;

        let (verfers, tholder) = if serder.est()? {
            let tholder = if let Some(tholder) = serder.tholder()? {
                tholder
            } else {
                return err!(Error::Decoding);
            };

            (serder.verfers()?, tholder)
        } else {
            let (raw, _) = self.store.get_latest_establishment_event_as_of_sn(&pre, sn as u32 - 1)?;
            let serder = Serder::new_with_raw(raw.as_bytes())?;

            let tholder = if let Some(tholder) = serder.tholder()? {
                tholder
            } else {
                return err!(Error::Decoding);
            };

            (serder.verfers()?, tholder)
        };

        let mut verified_indices = vec![0u32; 0];
        for group in quadlets.value() {
            match group {
                CesrGroup::ControllerIdxSigsVariant { value } => {
                    for controller_idx_sig in value.value() {
                        let siger = &controller_idx_sig.siger;

                        if siger.index() as usize > verfers.len() {
                            return err!(Error::Verification);
                        }
   
                        if verfers[siger.index() as usize].verify(&siger.raw(), &serder.raw())? {
                            verified_indices.push(siger.index());
                        }
                    }
                }
                _ => return err!(Error::Decoding)
            }
        }

        if !tholder.satisfy(&verified_indices)? {
            return err!(Error::Verification);
        }

        let labels = match ilk.as_str() {
            Ilkage::icp => &Self::ICP_LABELS,
            Ilkage::rot => &Self::ROT_LABELS,
            Ilkage::ixn => &Self::IXN_LABELS,
            _ => return err!(Error::Decoding),
        };
        if !Self::verify_ked_labels(&ked, labels)? {
            return err!(Error::Validation);
        }

        let inceptive = &ilk == Ilkage::icp;
        let key_event_count = self.store.count_key_events(&pre)?;
        if inceptive {
            if !prefixer.verify(&serder.ked(), Some(true))? {
                return err!(Error::Verification);
            }

            if sn != 0 {
                // must be 0
                return err!(Error::Decoding);
            }

            if said != serder.pre()? {
                return err!(Error::Verification);
            }

            if key_event_count != 0 {
                existing = true;
            }
        } else {
            if !serder.saider().verify(&serder.ked(), Some(inceptive), Some(true), None, None, None)? {
                return err!(Error::Verification);
            }
   
            if sn < 1 {
                return err!(Error::Validation);
            }

            let sno = key_event_count as u128;

            if sn > sno {
                // escrow here
                return err!(Error::OutOfOrder);
            }

            if sn != sno {
                existing = true;
            }

            // this sn implementation will become a problem at around 4 billion events
            let event = self.store.get_key_event(&pre, sn as u32 - 1)?;
            let pserder = Serder::new_with_raw(event.as_bytes())?;
            if pserder.said()? != serder.ked()["p"].to_string()? {
                return err!(Error::Verification);
            }

            if deep.unwrap_or(false) {
                let (_, message_list) = MessageList::from_stream_bytes(event[pserder.raw().len()..].as_bytes())?;
                let group = message_list.messages[0].cesr_group()?;

                match group {
                    CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                        self.verify_key_event(&pserder, value, deep)?;
                    },
                    _ => return err!(Error::Decoding)
                }
            }
        }

        if existing {
            let event = self.store.get_key_event(&pre, sn as u32)?;
            let eserder = Serder::new_with_raw(event.as_bytes())?;

            // this seems very bad, it means something is in the database that shouldn't be there. how did it get there?
            if said != eserder.said()? {
                return err!(Error::Programmer);
            }
        }

        println!("succesfully verified key event [{pre}, {said}]");
       
        Ok(existing)
    }

    fn verify_transaction_event(&self, serder: &Serder, seal_source_couples: &SealSourceCouples, deep: Option<bool>) -> Result<bool> {
        let pre = serder.pre()?;
        let ked = serder.ked();

        // see if code is supported
        let prefixer = Prefixer::new_with_qb64(&pre)?;

        let sn = serder.sn()?;
        let ilk = ked["t"].to_string()?;
        let said = serder.said()?;
        let ri = Self::extract_registry_from_serder(&ilk, &serder)?;

        let mut existing = false;

        if seal_source_couples.value.len() != 1 {
            return err!(Error::Decoding);
        }
        let source_saider = &seal_source_couples.value()[0].saider;
        let source_seqner = &seal_source_couples.value()[0].seqner;

        let labels = match ilk.as_str() {
            Ilkage::vcp => &Self::VCP_LABELS,
            Ilkage::iss => &Self::ISS_LABELS,
            Ilkage::rev => &Self::REV_LABELS,
            _ => return err!(Error::Decoding),
        };
        if !Self::verify_ked_labels(&ked, labels)? {
            return err!(Error::Validation);
        }

        let inceptive = &ilk == Ilkage::vcp || &ilk == Ilkage::iss;
        let transaction_event_count = self.store.count_transaction_events(&pre)?;

        let apre = match ilk.as_str() {
            Ilkage::vcp => {
                if !prefixer.verify(&ked, Some(true))? {
                    return err!(Error::Verification);
                }

                ked["ii"].to_string()?
            },
            Ilkage::iss | Ilkage::rev => {
                if !serder.saider().verify(&ked, Some(false), Some(true), None, None, None)? {
                    return err!(Error::Verification);
                }

                let rievent = self.store.get_transaction_event(&ri, 0)?;
                let riserder = Serder::new_with_raw(rievent.as_bytes())?;

                riserder.ked()["ii"].to_string()?
            },
            _ => return err!(Error::Decoding)
        };

        if !self.verify_anchor(&apre, source_seqner, source_saider, serder, deep)? {
            return err!(Error::Verification);
        }

        if inceptive {
            if sn != 0 {
                // must be 0
                return err!(Error::Decoding);
            }

            if transaction_event_count != 0 {
                existing = true;
            }
        } else {
            if sn < 1 {
                return err!(Error::Validation);
            }

            let sno = transaction_event_count as u128;

            if sn > sno {
                // escrow here
                return err!(Error::OutOfOrder);
            }

            if sn != sno {
                existing = true;
            }

            // this sn implementation will become a problem at around 4 billion events
            let event = self.store.get_transaction_event(&pre, sn as u32 - 1)?;
            let pserder = Serder::new_with_raw(event.as_bytes())?;

            if deep.unwrap_or(false) {
                let (_, message_list) = MessageList::from_stream_bytes(event[pserder.raw().len()..].as_bytes())?;
                let group = message_list.messages[0].cesr_group()?;

                match group {
                    CesrGroup::SealSourceCouplesVariant { value } => {
                        self.verify_transaction_event(&pserder, value, deep)?;
                    },
                    _ => return err!(Error::Decoding)
                }
            }

            if pserder.said()? != serder.ked()["p"].to_string()? {
                return err!(Error::Verification);
            }
        }

        if existing {
            let event = self.store.get_transaction_event(&pre, sn as u32)?;
            let eserder = Serder::new_with_raw(event.as_bytes())?;

            // this seems very bad, it means something is in the database that shouldn't be there. how did it get there?
            if said != eserder.said()? {
                return err!(Error::Programmer);
            }
        }

        println!("succesfully verified transaction event [{pre}, {said}]");

        Ok(existing)
    }

    pub fn ingest_messages(&mut self, messages: &str, deep: Option<bool>) -> Result<()> {
        let (_, message_list) = MessageList::from_stream_bytes(messages.as_bytes())?;
        let mut messages = message_list.messages.iter();

        loop {
            let sadder = messages.next();
            if let Some(sadder) = sadder {
                let payload = sadder.payload()?;
                let raw_string = payload.value.to_string();
                let raw_message = raw_string.as_bytes();
                let result = cesride::common::sniff(raw_message)?;

                if result.ident == Identage::KERI {
                    let serder = Serder::new_with_raw(raw_message)?;

                    let message = messages.next();
                    if let Some(message) = message {
                        let group = message.cesr_group()?;
                        match serder.ked()["t"].to_string()?.as_str() {
                            Ilkage::icp | Ilkage::rot | Ilkage::ixn => {                
                                match group {
                                    CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                                        let existing = self.verify_key_event(&serder, value, deep)?;
                                        if !existing {
                                            let event = String::from_utf8(serder.raw())? + &group.qb64()?;
                                            self.store.insert_key_event(&serder.pre()?, &event)?;
                                        }
                                    },
                                    _ => return err!(Error::Decoding), // we only accept pipelined input at present
                                }
                            },
                            Ilkage::vcp | Ilkage::iss | Ilkage::rev => {
                                match group {
                                    CesrGroup::SealSourceCouplesVariant { value } => {
                                        let existing = self.verify_transaction_event(&serder, value, deep)?;
                                        if !existing {
                                            let event = String::from_utf8(serder.raw())? + &group.qb64()?;
                                            self.store.insert_transaction_event(&serder.pre()?, &event)?;
                                        }
                                    },
                                    _ => return err!(Error::Decoding),
                                }
                            }
                            _ => return err!(Error::Decoding),
                        }
                    } else {
                        return err!(Error::Decoding);
                    }
                } else if result.ident == Identage::ACDC {
                    let creder = Creder::new_with_raw(raw_message)?;

                    let message = messages.next();
                    if let Some(message) = message {
                        let group = message.cesr_group()?;
                        match group {
                            CesrGroup::AttachedMaterialQuadletsVariant { value } => {
                                let existing = self.verify_acdc(&creder, value, deep)?;
                                if !existing {
                                    let acdc = String::from_utf8(creder.raw())? + &group.qb64()?;
                                    self.store.insert_acdc(&creder.said()?, &acdc)?;
                                }
                            },
                            _ => return err!(Error::Decoding), // we only accept pipelined input at present
                        };
                    } else {
                        return err!(Error::Decoding);
                    }
                } else {
                    return err!(Error::Decoding)
                }
            } else {
                break;
            }
        }

        self.encrypt_and_cache_storage()
    }

Community

Parside work currently resides alongside the cesride work.

Bi-weekly Meeting

Important Reference Material

Dependencies

~13–22MB
~319K SLoC