2 Commits 3aab06a0ca ... e8b2ae5d2b

Author SHA1 Message Date
  Trey Del Bonis e8b2ae5d2b sqldb: added message queue migrations and entities 2 weeks ago
  Trey Del Bonis df8f081b2d core: reworked MQ IDs and added codec support 2 weeks ago

+ 64 - 26
core/src/message_queue.rs

@@ -2,6 +2,8 @@ use std::str::FromStr;
 
 use thiserror::Error;
 
+use aspect_util::prelude::*;
+
 use crate::ident::DeviceIdent;
 
 pub const MAX_ARG_BYTE_LEN: usize = 255;
@@ -21,19 +23,39 @@ pub enum Error {
     MalformedDeviceId(String),
 }
 
-#[derive(Clone, Debug)]
-pub enum MessageQueueId {
-    /// Device's personal queue.
-    Device(DeviceIdent),
+/// Describes a simplex channel from one device to another.
+#[derive(Clone, Debug, Hash, Eq, PartialEq)]
+pub struct D2DChannelSpec {
+    pub src: DeviceIdent,
+    pub dest: DeviceIdent,
+}
 
-    /// Device-to-device queue, source -> dest.
-    D2D(DeviceIdent, DeviceIdent),
+impl D2DChannelSpec {
+    pub fn new(src: DeviceIdent, dest: DeviceIdent) -> Self {
+        Self { src, dest }
+    }
+}
+
+aspect_util::inst_struct_codec! {
+    D2DChannelSpec {
+        src: DeviceIdent,
+        dest: DeviceIdent
+    }
+}
 
+#[derive(Clone, Debug)]
+pub enum MessageQueueId {
     /// Homeserver message queue, used for control messages.
     Server(String),
 
-    /// Generic queue type, maybe make it application programmable?
+    /// Generic queue type, maybe make it user programmable?
     Generic(String),
+
+    /// Device's personal queue.
+    Device(DeviceIdent),
+
+    /// Device-to-device queue, source -> dest.
+    D2D(D2DChannelSpec),
     // TODO channel ID, space ID?
 }
 
@@ -42,26 +64,33 @@ impl FromStr for MessageQueueId {
 
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         match s.split_once(":") {
-            Some((ty_str, arg_str)) => match ty_str {
-                "device" => {
-                    let did = DeviceIdent::from_str(arg_str)
-                        .map_err(|_| Error::MalformedDeviceId(arg_str.to_owned()))?;
-                    Ok(MessageQueueId::Device(did))
+            Some((ty_str, arg_str)) => {
+                let arg_len = arg_str.as_bytes().len();
+                if arg_len > MAX_ARG_BYTE_LEN {
+                    return Err(Error::ArgTooLong(arg_len));
                 }
-                "d2d" => match arg_str.split_once(",") {
-                    Some((lhs, rhs)) => {
-                        let src_id = DeviceIdent::from_str(lhs)
-                            .map_err(|_| Error::MalformedDeviceId(lhs.to_owned()))?;
-                        let dest_id = DeviceIdent::from_str(rhs)
-                            .map_err(|_| Error::MalformedDeviceId(rhs.to_owned()))?;
-                        Ok(MessageQueueId::D2D(src_id, dest_id))
+
+                match ty_str {
+                    "server" => Ok(MessageQueueId::Server(arg_str.to_owned())),
+                    "generic" => Ok(MessageQueueId::Generic(arg_str.to_owned())),
+                    "device" => {
+                        let did = DeviceIdent::from_str(arg_str)
+                            .map_err(|_| Error::MalformedDeviceId(arg_str.to_owned()))?;
+                        Ok(MessageQueueId::Device(did))
                     }
-                    None => Err(Error::Malformed(s.to_owned())),
-                },
-                "server" => Ok(MessageQueueId::Server(arg_str.to_owned())),
-                "generic" => Ok(MessageQueueId::Generic(arg_str.to_owned())),
-                _ => Err(Error::UnknownType(s.to_owned())),
-            },
+                    "d2d" => match arg_str.split_once(",") {
+                        Some((lhs, rhs)) => {
+                            let src_id = DeviceIdent::from_str(lhs)
+                                .map_err(|_| Error::MalformedDeviceId(lhs.to_owned()))?;
+                            let dest_id = DeviceIdent::from_str(rhs)
+                                .map_err(|_| Error::MalformedDeviceId(rhs.to_owned()))?;
+                            Ok(MessageQueueId::D2D(D2DChannelSpec::new(src_id, dest_id)))
+                        }
+                        None => Err(Error::Malformed(s.to_owned())),
+                    },
+                    _ => Err(Error::UnknownType(s.to_owned())),
+                }
+            }
             None => Err(Error::Malformed(s.to_owned())),
         }
     }
@@ -71,9 +100,18 @@ impl ToString for MessageQueueId {
     fn to_string(&self) -> String {
         match self {
             Self::Device(id) => format!("device:{id}"),
-            Self::D2D(src_id, dest_id) => format!("d2d:{src_id},{dest_id}"),
+            Self::D2D(D2DChannelSpec { src, dest }) => format!("d2d:{src},{dest}"),
             Self::Server(name) => format!("server:{name}"),
             Self::Generic(name) => format!("generic:{name}"),
         }
     }
 }
+
+aspect_util::inst_enum_codec! {
+    MessageQueueId u8 {
+        0 => Server(field String),
+        1 => Generic(field String),
+        2 => Device(field DeviceIdent),
+        3 => D2D(field D2DChannelSpec),
+    }
+}

+ 34 - 0
sqldb/migration/src/idents.rs

@@ -126,3 +126,37 @@ pub enum CompletedJob {
     /// Job data, usually codec-encoded.  This might get pruned.
     Payload,
 }
+
+/// Authoritative table for message queues, irrespective of their readers.
+#[derive(DeriveIden)]
+pub enum MessageQueue {
+    Table,
+
+    /// Low-level queue ID, used as primary key.
+    QueueId,
+
+    /// Name of the table.
+    Name,
+    // will add garbage collection settings here
+}
+
+/// Normalized table for device subscriptions to message queues.
+#[derive(DeriveIden)]
+pub enum DeviceQueueSub {
+    Table,
+
+    /// Queue subscription ID
+    Id,
+
+    /// Queue name.
+    Name,
+
+    /// Device ID of the subscription.
+    DeviceId,
+
+    /// Message queue ID, also primary key of named table.
+    QueueId,
+
+    /// Low level reader ID (should be unique to this reader).
+    ReaderId,
+}

+ 2 - 0
sqldb/migration/src/lib.rs

@@ -6,6 +6,7 @@ mod m000001_create_user_tables;
 mod m000002_create_device_tables;
 mod m000003_create_state_prop;
 mod m000004_create_job_queue;
+mod m000005_create_msg_queue_tracking;
 
 pub struct Migrator;
 
@@ -17,6 +18,7 @@ impl MigratorTrait for Migrator {
             Box::new(m000002_create_device_tables::Migration),
             Box::new(m000003_create_state_prop::Migration),
             Box::new(m000004_create_job_queue::Migration),
+            Box::new(m000005_create_msg_queue_tracking::Migration),
         ]
     }
 }

+ 95 - 0
sqldb/migration/src/m000005_create_msg_queue_tracking.rs

@@ -0,0 +1,95 @@
+use sea_orm_migration::prelude::*;
+
+use crate::idents::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .create_table(
+                Table::create()
+                    .table(MessageQueue::Table)
+                    .col(ColumnDef::new(MessageQueue::QueueId).binary().primary_key())
+                    .col(ColumnDef::new(MessageQueue::Name).text().not_null())
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_table(
+                Table::create()
+                    .table(DeviceQueueSub::Table)
+                    .col(
+                        ColumnDef::new(DeviceQueueSub::Id)
+                            .big_integer()
+                            .primary_key()
+                            .auto_increment(),
+                    )
+                    .col(ColumnDef::new(DeviceQueueSub::Name).text().not_null())
+                    .col(ColumnDef::new(DeviceQueueSub::DeviceId).binary().not_null())
+                    .col(ColumnDef::new(DeviceQueueSub::QueueId).binary().not_null())
+                    .col(ColumnDef::new(DeviceQueueSub::ReaderId).binary().not_null())
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("IDX_message_queue_on_name")
+                    .table(DeviceQueueSub::Table)
+                    .col(DeviceQueueSub::Name)
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("IDX_device_queue_sub_on_name")
+                    .table(DeviceQueueSub::Table)
+                    .col(DeviceQueueSub::Name)
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("IDX_device_queue_sub_on_device_id")
+                    .table(DeviceQueueSub::Table)
+                    .col(DeviceQueueSub::DeviceId)
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_index(
+                Index::create()
+                    .name("IDX_device_queue_sub_on_queue_id")
+                    .table(DeviceQueueSub::Table)
+                    .col(DeviceQueueSub::QueueId)
+                    .to_owned(),
+            )
+            .await?;
+
+        manager
+            .create_foreign_key(
+                ForeignKey::create()
+                    .name("FK_device_queue_sub_on_queue_id_to_message_queue")
+                    .from(DeviceQueueSub::Table, DeviceQueueSub::QueueId)
+                    .to(MessageQueue::Table, MessageQueue::QueueId)
+                    .to_owned(),
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        Ok(())
+    }
+}

+ 38 - 0
sqldb/src/device_queue_sub.rs

@@ -0,0 +1,38 @@
+//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.14
+
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
+#[sea_orm(table_name = "device_queue_sub")]
+pub struct Model {
+    #[sea_orm(primary_key)]
+    pub id: i64,
+    #[sea_orm(column_type = "Text")]
+    pub name: String,
+    #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")]
+    pub device_id: Vec<u8>,
+    #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")]
+    pub queue_id: Vec<u8>,
+    #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")]
+    pub reader_id: Vec<u8>,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(
+        belongs_to = "super::message_queue::Entity",
+        from = "Column::QueueId",
+        to = "super::message_queue::Column::QueueId",
+        on_update = "NoAction",
+        on_delete = "NoAction"
+    )]
+    MessageQueue,
+}
+
+impl Related<super::message_queue::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::MessageQueue.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}

+ 2 - 0
sqldb/src/lib.rs

@@ -4,6 +4,8 @@ pub mod prelude;
 
 pub mod completed_job;
 pub mod device;
+pub mod device_queue_sub;
+pub mod message_queue;
 pub mod pending_job;
 pub mod session;
 pub mod state_prop;

+ 30 - 0
sqldb/src/message_queue.rs

@@ -0,0 +1,30 @@
+//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.14
+
+use sea_orm::entity::prelude::*;
+
+#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)]
+#[sea_orm(table_name = "message_queue")]
+pub struct Model {
+    #[sea_orm(
+        primary_key,
+        auto_increment = false,
+        column_type = "Binary(BlobSize::Blob(None))"
+    )]
+    pub queue_id: Vec<u8>,
+    #[sea_orm(column_type = "Text")]
+    pub name: String,
+}
+
+#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
+pub enum Relation {
+    #[sea_orm(has_many = "super::device_queue_sub::Entity")]
+    DeviceQueueSub,
+}
+
+impl Related<super::device_queue_sub::Entity> for Entity {
+    fn to() -> RelationDef {
+        Relation::DeviceQueueSub.def()
+    }
+}
+
+impl ActiveModelBehavior for ActiveModel {}

+ 2 - 0
sqldb/src/prelude.rs

@@ -2,6 +2,8 @@
 
 pub use super::completed_job::Entity as CompletedJob;
 pub use super::device::Entity as Device;
+pub use super::device_queue_sub::Entity as DeviceQueueSub;
+pub use super::message_queue::Entity as MessageQueue;
 pub use super::pending_job::Entity as PendingJob;
 pub use super::session::Entity as Session;
 pub use super::state_prop::Entity as StateProp;