cpg

Storing Protobufs in Postgres or MongoDB

2023-05-09 #rust

Storing and querying Protobufs in a JSON-compatible database in Rust, using serde

Protocol Buffers is a widely used binary serialization format, with bindings available in many languages.

Is there an easy way to store protobuf messages in a Postgres database so that they can be queried with SQL?

message Message {
  string name = 1;
  uint32 age = 2;
  Address address = 3;
  message Address {
    ...
  }
}

SELECT * FROM messages WHERE age > 30 AND address.country = 'CH';

Possible methods

One way is to mirror the protobuf schema (.proto) as a SQL schema. However, this requires a lot of boilerplate for reading and writing, and even backwards-compatible changes to the proto (one of the strengths of Protobuf) require migrations.

A simpler solution would be to combine:

  1. The ability to serialize Protobufs to/from JSON in a way that matches Protobuf semantics (see e.g. the Python API).
  2. Postgres’ support for JSONb.

A blog post by Dataform describes a Go implementation of this idea with MongoDB. Field names are replaced by their field numbers, given that the names are allowed to change freely.
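The renaming idea can be pictured with a small standalone sketch. The helpers `rekey_by_field_number` and `message_field_numbers` are hypothetical illustrations (they are not taken from the Dataform post), and values are kept as plain strings for brevity:

```rust
use std::collections::BTreeMap;

/// Replace field names with their protobuf field numbers (as string keys),
/// so that stored documents survive a field rename in the .proto file.
/// Hypothetical helper: fields missing from the table are dropped.
pub fn rekey_by_field_number(
    fields: &BTreeMap<String, String>,
    numbers: &BTreeMap<&str, u32>,
) -> BTreeMap<String, String> {
    fields
        .iter()
        .filter_map(|(name, value)| {
            numbers
                .get(name.as_str())
                .map(|number| (number.to_string(), value.clone()))
        })
        .collect()
}

/// Field numbers of the example `Message` above: name = 1, age = 2.
pub fn message_field_numbers() -> BTreeMap<&'static str, u32> {
    BTreeMap::from([("name", 1), ("age", 2)])
}
```

With this table, a document `{"name": ..., "age": ...}` would be stored under the keys `"1"` and `"2"`, at the cost of less readable queries.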

A quick search also reveals:

An implementation in Rust using serde

The following describes a way to implement the idea above in Rust, leveraging serde and the support of custom types in diesel. The same could be achieved with the postgres crate in lieu of diesel (which would probably make more sense given we’re not really using the ORM features…).

JSONb in diesel via serde

Assume we have a protobuf message

message Message {
  ...
}

that we codegen in Rust to

struct Message {
 ...
}

using prost.

The following slightly generalizes diesel’s JSONb support by handling arbitrary serde-compatible types T rather than only serde_json::Value.

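use std::io::Write; // for `out.write_all`

use diesel::pg::Pg;
use serde::{Deserialize, Serialize};

// Newtype wrapper mapping any serde-compatible `T` to a JSONB column.
#[derive(
    Debug, Serialize, Deserialize, diesel::expression::AsExpression, diesel::deserialize::FromSqlRow,
)]
#[diesel(sql_type = diesel::sql_types::Jsonb)]
pub struct Proto<T: std::fmt::Debug>(T);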
 
const JSONB_HEADER: u8 = 1;
impl<T> diesel::serialize::ToSql<diesel::sql_types::Jsonb, Pg>
    for Proto<T>
    where T: std::fmt::Debug + Serialize
{
    fn to_sql<'b>(
        &'b self,
        out: &mut diesel::serialize::Output<'b, '_, Pg>,
    ) -> diesel::serialize::Result {
        out.write_all(&[JSONB_HEADER])?;
        serde_json::to_writer(out, &self.0)
            .map(|_| diesel::serialize::IsNull::No)
            .map_err(Into::into)
    }
}
impl<T: std::fmt::Debug + serde::de::DeserializeOwned>
    diesel::deserialize::FromSql<diesel::sql_types::Jsonb, Pg> for Proto<T>
{
    fn from_sql(value: diesel::pg::PgValue<'_>) -> diesel::deserialize::Result<Self> {
        let bytes = value.as_bytes();
        // The first byte is the JSONB format version; reject empty or unknown payloads.
        if bytes.first() != Some(&JSONB_HEADER) {
            return Err("Unsupported JSONB encoding version".into());
        }
        serde_json::from_slice(&bytes[1..]).map_err(|_| "Invalid Json".into())
    }
}

Regarding Protobuf semantics:

Getting Serialize and Deserialize on the messages

One small issue is that prost does not derive serde::{Serialize, Deserialize} by default on the types it generates (see this issue).

The type_attribute option in prost_build allows adding the #[derive(...)] macro, but adding #[serde(default)] on fields is a bit more tricky. Indeed, field_attribute applies to structs and enums indiscriminately, and #[serde(default)] is not valid on enum variants. Instead, we achieve this with a custom attribute macro:

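use proc_macro::TokenStream;
use quote::quote;
use syn::parse::Parser; // brings `.parse(...)` on `Attribute::parse_outer` into scope

#[proc_macro_attribute]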
pub fn serde_deserialize_default(_args: TokenStream, input: TokenStream) -> TokenStream {
    let mut input: syn::Item = syn::parse(input).unwrap();
    let attr = quote! {#[serde(default)]};
    let attr = syn::Attribute::parse_outer.parse(attr.into()).unwrap();
    if let syn::Item::Struct(input) = &mut input {
        for field in &mut input.fields {
            field.attrs.extend(attr.clone());
        }
    }
    quote! {
        #[derive(serde::Serialize,serde::Deserialize)]
        #input
    }
    .into()
}
 

Then we can set

config.type_attribute(".", "#[serde_deserialize_default]");

in the prost configuration.

Table setup and example

Finally, we can set our table up as:

#[derive(Insertable, Debug, Queryable, Serialize, Deserialize)]
#[diesel(table_name = messages)]
struct Message  {
    data: Proto<proto::Message>,
}

A proto::Message can then be inserted and retrieved with:

conn.interact(move |conn| {
    diesel::insert_into(messages::table)
        .values(&[(
            messages::dsl::data.eq(Proto(message)),
        )])
        .execute(conn)
})
.await
.unwrap()?;
let message: proto::Message = conn
    .interact(|conn| messages::dsl::messages.first::<Message>(conn))
    .await
    .unwrap()?
    .data
    .0;
 

More complex queries are then achieved with raw SQL, following Postgres’ JSONb support.
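As an illustration, the pseudo-query from the introduction can be spelled with Postgres’ `->` and `->>` JSONb operators. The helpers `jsonb_text_path` and `example_query` below are hypothetical conveniences, the column name `data` is assumed from the table definition, and path segments are treated as trusted identifiers rather than user input:

```rust
/// Build a Postgres expression that extracts a nested JSONB field as text.
/// `->` descends and keeps jsonb; the final `->>` returns text.
/// Segments are assumed to be trusted field names (no quoting or escaping).
pub fn jsonb_text_path(column: &str, dotted_path: &str) -> String {
    let segments: Vec<&str> = dotted_path.split('.').collect();
    let mut expr = column.to_string();
    for (i, segment) in segments.iter().enumerate() {
        let op = if i + 1 == segments.len() { "->>" } else { "->" };
        expr.push_str(&format!("{}'{}'", op, segment));
    }
    expr
}

/// The introductory query, rewritten against a JSONB column named `data`.
pub fn example_query() -> String {
    format!(
        "SELECT * FROM messages WHERE ({})::int > 30 AND {} = 'CH'",
        jsonb_text_path("data", "age"),
        jsonb_text_path("data", "address.country"),
    )
}
```

The resulting string could then be run as raw SQL, for instance through `diesel::sql_query`.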

Indexes are supported, but only to some extent for sub-messages (see the operators that the GIN indexes support). A solution for that case is to create a materialized view and set indexes there.

The same with MongoDB

The very same idea can be used with MongoDB and the official Rust driver.

In principle, bson already works with serde::{Serialize, Deserialize}. However, it requires that map keys be strings. We again use a wrapper type to pass through serde_json, which will transparently convert the keys to strings and back.

use serde::de::Error as _; // for `D::Error::custom`
use serde::{Deserialize, Serialize};

pub struct ProtoAsJson<T> {
    pub inner: T,
}
 
impl<T> ProtoAsJson<T> {
    pub fn from(inner: T) -> ProtoAsJson<T> {
        ProtoAsJson { inner }
    }
}
 
// BSON-serialized (with keys as strings) -> JSON -> Native
impl<'de, T: Deserialize<'de>> Deserialize<'de> for ProtoAsJson<T> {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        let v: Result<serde_json::Value, D::Error> = serde_json::Value::deserialize(deserializer);
        T::deserialize(v?)
            .map(|inner| ProtoAsJson { inner })
            .map_err(|e| D::Error::custom(format!("Failed to deserialize BSON: {}", e)))
    }
}
// Native -> JSON -> BSON-serialized (with keys as strings)
impl<T: Serialize> Serialize for ProtoAsJson<T> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
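        // Report conversion failures as serializer errors instead of panicking.
        let x = serde_json::to_value(&self.inner).map_err(serde::ser::Error::custom)?;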
        x.serialize(serializer)
    }
}
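
To picture the key conversion that the detour through serde_json performs, here is a standalone sketch using only the standard library. The helpers `keys_to_strings` and `keys_from_strings` are hypothetical and only mimic the behaviour for a protobuf `map<int32, string>` field; they are not part of bson or serde_json:

```rust
use std::collections::BTreeMap;
use std::num::ParseIntError;

/// Native -> stored form: integer keys become their decimal string form.
pub fn keys_to_strings(map: &BTreeMap<i32, String>) -> BTreeMap<String, String> {
    map.iter().map(|(k, v)| (k.to_string(), v.clone())).collect()
}

/// Stored form -> native: parse each key back, failing on a non-integer key.
pub fn keys_from_strings(
    map: &BTreeMap<String, String>,
) -> Result<BTreeMap<i32, String>, ParseIntError> {
    map.iter()
        .map(|(k, v)| k.parse::<i32>().map(|k| (k, v.clone())))
        .collect()
}
```

The round trip is lossless for integer keys, which is why the wrapper can hide it from the rest of the code.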
 

A word of warning

JSON and Protobuf both have complex semantics, so the implementations above should be used with caution. For example, JSON has no representation for NaN, so NaN values end up as null in MongoDB.
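
The NaN case can be reproduced without any database. The sketch below is a hypothetical stand-in for a JSON number encoder (it is not serde_json's code): a non-finite float has no JSON literal, so it is written as `null`, and reading it back cannot recover the original value:

```rust
/// Encode a float as a JSON token; NaN and infinities have no JSON literal.
pub fn encode_json_number(x: f64) -> String {
    if x.is_finite() {
        x.to_string()
    } else {
        "null".to_string()
    }
}

/// Decode a JSON token back; `null` carries no information about the float.
pub fn decode_json_number(token: &str) -> Option<f64> {
    if token == "null" {
        None
    } else {
        token.parse().ok()
    }
}
```

A message with a NaN float field would therefore not survive a round trip unchanged, which is worth checking before relying on this storage scheme for numeric data.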