3 Commits a6ce9a4528 ... 2ee930a8fa

Auteur SHA1 Bericht Datum
  Trey Del Bonis 2ee930a8fa homeserver: registered the MQ list protocol and added datastore to the RPC server 4 maanden geleden
  Trey Del Bonis 19e7320977 homeserver: fully integrate new storage abstraction 4 maanden geleden
  Trey Del Bonis cf41faffd8 db, homeserver: renamed queue modules to be more understandable, added beginning of opaque homeserver storage type 4 maanden geleden
10 gewijzigde bestanden met toevoegingen van 34 en 26 verwijderingen
  1. 12 12
      db/src/device_msg_queue.rs
  2. 5 5
      db/src/errors.rs
  3. 7 7
      db/src/msg_queue.rs
  4. 2 2
      db/src/lib.rs
  5. 0 0
      db/src/llmq/errors.rs
  6. 0 0
      db/src/llmq/id.rs
  7. 8 0
      db/src/llmq/mod.rs
  8. 0 0
      db/src/llmq/ops.rs
  9. 0 0
      db/src/llmq/query.rs
  10. 0 0
      db/src/mq/state.rs

+ 12 - 12
db/src/device_msg_queue.rs

@@ -13,8 +13,8 @@ use aspect_core::message_queue::{MessageQueueId, Origin, QueueEntry};
 use aspect_sqldb::{device_queue_sub, message_queue};
 use aspect_util::codec;
 
-use crate::msg_queue::{self, Msg, QueueId, ReaderId};
-use crate::{errors::*, mq};
+use crate::generic_msg_queue::{self, Msg, QueueId, ReaderId};
+use crate::{errors::*, llmq};
 
 #[derive(Clone, Debug)]
 pub struct QueueStatus {
@@ -30,7 +30,7 @@ pub struct QueueStatus {
 
 pub struct DeviceMqDatastore {
     /// Underlying wrapper around the higher level interface to the MQ data.
-    msg_queue: Arc<msg_queue::MsgQueueDatastore>,
+    msg_queue: Arc<generic_msg_queue::GenericMqDatastore>,
 
     /// Database connection for the high level device data.
     dbc: DatabaseConnection,
@@ -38,7 +38,7 @@ pub struct DeviceMqDatastore {
 
 impl DeviceMqDatastore {
     pub fn new(
-        msg_queue: Arc<msg_queue::MsgQueueDatastore>,
+        msg_queue: Arc<generic_msg_queue::GenericMqDatastore>,
         dbc: DatabaseConnection,
     ) -> Result<Self, Error> {
         Ok(Self { msg_queue, dbc })
@@ -150,8 +150,8 @@ impl DeviceMqDatastore {
                 idx,
                 timestamp: mdata.timestamp(),
                 origin: match mdata.origin() {
-                    mq::state::Origin::Device(id) => Origin::Device(*id),
-                    mq::state::Origin::None(_) => Origin::None(()),
+                    llmq::state::Origin::Device(id) => Origin::Device(*id),
+                    llmq::state::Origin::None(_) => Origin::None(()),
                 },
                 data: mdata.into_data(),
             };
@@ -359,16 +359,16 @@ fn now_ts() -> u64 {
     time::UNIX_EPOCH.elapsed().expect("db: now_ts").as_millis() as u64
 }
 
-fn conv_down_origin(o: &Origin) -> mq::state::Origin {
+fn conv_down_origin(o: &Origin) -> llmq::state::Origin {
     match o {
-        Origin::Device(id) => mq::state::Origin::Device(*id),
-        Origin::None(_) => mq::state::Origin::none(),
+        Origin::Device(id) => llmq::state::Origin::Device(*id),
+        Origin::None(_) => llmq::state::Origin::none(),
     }
 }
 
-fn conv_up_origin(o: &mq::state::Origin) -> Origin {
+fn conv_up_origin(o: &llmq::state::Origin) -> Origin {
     match o {
-        mq::state::Origin::Device(id) => Origin::Device(*id),
-        mq::state::Origin::None(_) => Origin::none(),
+        llmq::state::Origin::Device(id) => Origin::Device(*id),
+        llmq::state::Origin::None(_) => Origin::none(),
     }
 }

+ 5 - 5
db/src/errors.rs

@@ -10,7 +10,7 @@ use aspect_core::{
 };
 use aspect_util::codec::CodecError;
 
-use crate::mq;
+use crate::llmq;
 
 #[derive(Debug, Error)]
 pub enum Error {
@@ -70,11 +70,11 @@ pub enum Error {
     #[error("task '{0}' paniced")]
     BlockingTaskPanic(&'static str),
 
-    #[error("mq: {0}")]
-    Mq(#[from] mq::errors::Error),
+    #[error("llmq: {0}")]
+    Llmq(#[from] llmq::errors::Error),
 
-    #[error("mqdb: {0}")]
-    MqDb(#[from] mq::errors::DbError),
+    #[error("llmq db: {0}")]
+    LlmqDb(#[from] llmq::errors::DbError),
 
     #[error("unknown message queue '{0}'")]
     UnknownMessageQueue(MessageQueueId),

+ 7 - 7
db/src/msg_queue.rs

@@ -6,17 +6,17 @@ use tokio::task;
 use tracing::*;
 
 use crate::errors::*;
-use crate::mq;
+use crate::llmq;
 
-pub use mq::id::*;
-pub use mq::state::{Msg, Origin, QueueView};
+pub use llmq::id::*;
+pub use llmq::state::{Msg, Origin, QueueView};
 
-pub struct MsgQueueDatastore {
-    db: Arc<mq::query::MqDatastore>,
+pub struct GenericMqDatastore {
+    db: Arc<llmq::query::MqDatastore>,
 }
 
-impl MsgQueueDatastore {
-    pub fn new(db: mq::query::MqDatastore) -> Result<Self, Error> {
+impl GenericMqDatastore {
+    pub fn new(db: llmq::query::MqDatastore) -> Result<Self, Error> {
         Ok(Self { db: Arc::new(db) })
     }
 

+ 2 - 2
db/src/lib.rs

@@ -1,11 +1,11 @@
 #![feature(is_sorted)]
 
-pub mod mq;
+pub mod llmq;
 
 pub mod device_msg_queue;
 pub mod devident;
+pub mod generic_msg_queue;
 pub mod job_queue;
-pub mod msg_queue;
 pub mod state_prop;
 pub mod user;
 

db/src/mq/errors.rs → db/src/llmq/errors.rs


db/src/mq/id.rs → db/src/llmq/id.rs


+ 8 - 0
db/src/llmq/mod.rs

@@ -0,0 +1,8 @@
+//! Aspect low-level MQ.  A reasonably high performance low-level message queue
+//! store.  This might eventually be split out into a separate service we can
+//! run at higher performance or be replicated or something.
+
+pub mod errors;
+pub mod id;
+pub mod query;
+pub mod state;

db/src/mq/ops.rs → db/src/llmq/ops.rs


db/src/mq/query.rs → db/src/llmq/query.rs


+ 0 - 0
db/src/mq/state.rs


Some files were not shown because too many files changed in this diff