|
@@ -20,10 +20,10 @@ use tracing::*;
|
|
|
|
|
|
use aspect_core::{
|
|
|
ident::{DeviceIdent, DirPackage},
|
|
|
- message_queue::MessageQueueId,
|
|
|
+ message_queue::{MessageQueueId, Origin},
|
|
|
};
|
|
|
use aspect_db::{
|
|
|
- device_msg_queue::DeviceMqDatastore,
|
|
|
+ device_msg_queue::{DeviceMqDatastore, QueueStatus},
|
|
|
devident::{DevIdentDatastore, DirEntry},
|
|
|
generic_msg_queue::GenericMqDatastore,
|
|
|
llmq::query::MqDatastore,
|
|
@@ -48,6 +48,22 @@ pub struct HomeserverStorage {
|
|
|
}
|
|
|
|
|
|
impl HomeserverStorage {
|
|
|
+ /// Startup storage tasks. This just creates some basic databate structures
|
|
|
+ /// if they don't already exist, but we might expand this into a more developed
|
|
|
+ /// system in the future.
|
|
|
+ pub async fn exec_startup_tasks(&self) -> Result<(), StorageError> {
|
|
|
+ let mut new_queues = 0;
|
|
|
+
|
|
|
+ let notif_id = MessageQueueId::Server(DMQID_SERVER_NOTIFICATIONS.to_string());
|
|
|
+ if self.dmq_ds.create_queue(notif_id.clone()).await? {
|
|
|
+ new_queues += 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ debug!(%new_queues, "created server queues");
|
|
|
+
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+
|
|
|
/// Creates a user and any other relevant records they might need.
|
|
|
pub async fn create_user(&self, username: &str, password: &str) -> Result<(), StorageError> {
|
|
|
self.user_ds.create_user(username, password).await?;
|
|
@@ -166,6 +182,28 @@ impl HomeserverStorage {
|
|
|
}
|
|
|
Ok(pkgs)
|
|
|
}
|
|
|
+
|
|
|
+ /// Gets the ID and status of all of the queues a device has access to.
|
|
|
+ pub async fn get_device_queues(
|
|
|
+ &self,
|
|
|
+ ident: &DeviceIdent,
|
|
|
+ ) -> Result<Vec<QueueStatus>, StorageError> {
|
|
|
+ Ok(self.dmq_ds.get_device_queues(*ident).await?)
|
|
|
+ }
|
|
|
+
|
|
|
+ /// Queues a message to a device queue.
|
|
|
+ pub async fn queue_device_msg(
|
|
|
+ &self,
|
|
|
+ mqid: &MessageQueueId,
|
|
|
+ origin: &Origin,
|
|
|
+ payload: &[u8],
|
|
|
+ ) -> Result<u64, StorageError> {
|
|
|
+ let idx = self
|
|
|
+ .dmq_ds
|
|
|
+ .submit_msg_to_queue(mqid, origin, payload)
|
|
|
+ .await?;
|
|
|
+ Ok(idx)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// Opens a storage instance on top of primitives.
|