4 Commits 2ee930a8fa ... e56a0d3be5

Auteur SHA1 Message Date
  Trey Del Bonis e56a0d3be5 test: reworked test framework envs, added test to ensure device queues can be queried il y a 4 mois
  Trey Del Bonis 021f7d0f0e rpc/homeserver, homeserver: added a bunch of impl for querying device MQs, added RPC for submitting server messages il y a 4 mois
  Trey Del Bonis e9d1eb47dc homeserver: create server notification queue on startup il y a 4 mois
  Trey Del Bonis a1f6dbd564 test: added log settings to run-tests.sh il y a 4 mois

+ 1 - 0
homeserver/src/main.rs

@@ -73,6 +73,7 @@ async fn main_task(config: &Config) -> anyhow::Result<()> {
     // Set up rest of data storage layer.
     let sled_db = sled::open(&config.sled_path)?;
     let storage = Arc::new(storage::open_on_primitives(sled_db, dbc)?);
+    storage.exec_startup_tasks().await?;
 
     // Set up services.
     let ident_svc = IdentityServer::new(storage.clone());

+ 30 - 0
homeserver/src/rpc_server.rs

@@ -1,10 +1,15 @@
 use std::sync::Arc;
 
+use aspect_core::message_queue::{MessageQueueId, Origin};
+use futures::{FutureExt, TryFutureExt};
+use jsonrpsee::types::{ErrorCode, ErrorObject};
 use tokio::sync::{oneshot, Mutex};
+use tracing::*;
 
 use aspect_homeserver_rpc::*;
 use aspect_rpc_common::*;
 
+use crate::constants::*;
 use crate::storage::HomeserverStorage;
 
 pub struct RpcImpl {
@@ -19,6 +24,24 @@ impl RpcImpl {
             stop_sig: Mutex::new(Some(stop_sig)),
         }
     }
+
+    async fn queue_device_msg<'c>(
+        &self,
+        mqid: &MessageQueueId,
+        origin: &Origin,
+        msg: &[u8],
+    ) -> Result<u64, ErrorObject<'c>> {
+        // TODO better errors
+        let idx = self
+            .storage
+            .queue_device_msg(mqid, origin, msg)
+            .map_err(|e| {
+                warn!(err = %e, "failed to queue message");
+                ErrorObject::from(ErrorCode::InternalError)
+            })
+            .await?;
+        Ok(idx)
+    }
 }
 
 #[async_trait]
@@ -35,4 +58,11 @@ impl HomeserverRpcServer for RpcImpl {
 
         Ok(())
     }
+
+    async fn queue_server_msg(&self, msg: String) -> RpcResult<u64> {
+        let mqid = MessageQueueId::Server(DMQID_SERVER_NOTIFICATIONS.to_string());
+        Ok(self
+            .queue_device_msg(&mqid, &Origin::none(), msg.as_bytes())
+            .await?)
+    }
 }

+ 40 - 2
homeserver/src/storage.rs

@@ -20,10 +20,10 @@ use tracing::*;
 
 use aspect_core::{
     ident::{DeviceIdent, DirPackage},
-    message_queue::MessageQueueId,
+    message_queue::{MessageQueueId, Origin},
 };
 use aspect_db::{
-    device_msg_queue::DeviceMqDatastore,
+    device_msg_queue::{DeviceMqDatastore, QueueStatus},
     devident::{DevIdentDatastore, DirEntry},
     generic_msg_queue::GenericMqDatastore,
     llmq::query::MqDatastore,
@@ -48,6 +48,22 @@ pub struct HomeserverStorage {
 }
 
 impl HomeserverStorage {
+    /// Startup storage tasks.  This just creates some basic databate structures
+    /// if they don't already exist, but we might expand this into a more developed
+    /// system in the future.
+    pub async fn exec_startup_tasks(&self) -> Result<(), StorageError> {
+        let mut new_queues = 0;
+
+        let notif_id = MessageQueueId::Server(DMQID_SERVER_NOTIFICATIONS.to_string());
+        if self.dmq_ds.create_queue(notif_id.clone()).await? {
+            new_queues += 1;
+        }
+
+        debug!(%new_queues, "created server queues");
+
+        Ok(())
+    }
+
     /// Creates a user and any other relevant records they might need.
     pub async fn create_user(&self, username: &str, password: &str) -> Result<(), StorageError> {
         self.user_ds.create_user(username, password).await?;
@@ -166,6 +182,28 @@ impl HomeserverStorage {
         }
         Ok(pkgs)
     }
+
+    /// Gets the ID and status of all of the queues a device has access to.
+    pub async fn get_device_queues(
+        &self,
+        ident: &DeviceIdent,
+    ) -> Result<Vec<QueueStatus>, StorageError> {
+        Ok(self.dmq_ds.get_device_queues(*ident).await?)
+    }
+
+    /// Queues a message to a device queue.
+    pub async fn queue_device_msg(
+        &self,
+        mqid: &MessageQueueId,
+        origin: &Origin,
+        payload: &[u8],
+    ) -> Result<u64, StorageError> {
+        let idx = self
+            .dmq_ds
+            .submit_msg_to_queue(mqid, origin, payload)
+            .await?;
+        Ok(idx)
+    }
 }
 
 /// Opens a storage instance on top of primitives.

+ 31 - 1
homeserver/src/svc/device_mq.rs

@@ -1,6 +1,7 @@
 use std::future::Future;
 use std::sync::Arc;
 
+use futures::TryFutureExt;
 use tracing::*;
 
 use aspect_proto::msg_queue::*;
@@ -30,7 +31,36 @@ impl RpcHandler<MqListProto> for DeviceMqServer {
                 device_ident,
             } = request;
 
-            let resp = MqListResponse { queues: Vec::new() };
+            if self
+                .storage
+                .check_user_password(&creds.username, &creds.password)
+                .await
+                .is_err()
+            {
+                return Err(RpcError::Other("invalid password".to_owned()));
+            }
+
+            let queue_statuses = self
+                .storage
+                .get_device_queues(device_ident)
+                .map_err(RpcError::server)
+                .await?;
+
+            let resp = MqListResponse {
+                queues: queue_statuses
+                    .into_iter()
+                    .map(|qs| QueueViewState {
+                        id: qs.id,
+                        device_pos: qs.reader_pos.unwrap_or_default(),
+                        // TODO should we fix this inconsistency?
+                        queue_pos: if qs.next_message_pos > 0 {
+                            qs.next_message_pos
+                        } else {
+                            u64::MAX
+                        },
+                    })
+                    .collect(),
+            };
             Ok(resp)
         }
     }

+ 3 - 0
rpc/homeserver/src/lib.rs

@@ -14,4 +14,7 @@ pub trait HomeserverRpc {
 
     #[method(name = "stop")]
     async fn stop(&self) -> RpcResult<()>;
+
+    #[method(name = "queue_server_msg")]
+    async fn queue_server_msg(&self, msg: String) -> RpcResult<u64>;
 }

+ 34 - 3
test/entry.py

@@ -2,6 +2,7 @@
 
 import os
 import sys
+import time
 
 import flexitest
 
@@ -73,7 +74,7 @@ class ProcClientFactory(ClientFactory):
             _inject_rpc(svc)
             return svc
 
-class SimpleEnvConfig(flexitest.EnvConfig):
+class BareEnvConfig(flexitest.EnvConfig):
     """Environment that just inits some number of clients and servers."""
     def __init__(self, nserv, nclients):
         self.num_servers = nserv
@@ -94,6 +95,35 @@ class SimpleEnvConfig(flexitest.EnvConfig):
 
         return flexitest.LiveEnv(svcs)
 
+class SimpleEnvConfig(flexitest.EnvConfig):
+    """
+    Environment that creates a single server and some number of clients that are
+    all connected and registered to it.
+    """
+    def __init__(self, nclients):
+        self.num_clients = nclients
+
+    def init(self, facs: dict) -> flexitest.LiveEnv:
+        sfac = facs["server"]
+        cfac = facs["client"]
+        svcs = {}
+
+        s = sfac.create_server("serv")
+        svcs["serv"] = s
+        serv_addr = s.get_prop("p2p_addr")
+        time.sleep(1)
+
+        for i in range(self.num_clients):
+            name = "cli%s" % i
+            print("creating client", name)
+            c = cfac.create_client(name)
+            time.sleep(1)
+            svcs[name] = c
+            crpc = c.create_rpc()
+            crpc.aspc_register(serv_addr, name, name)
+
+        return flexitest.LiveEnv(svcs)
+
 def _inject_rpc(svc: flexitest.Service):
     def _create_rpc():
         return rpc.RpcClient(svc.get_prop("rpc_url"))
@@ -116,8 +146,9 @@ def main(argv):
     factories = {"server": sfac, "client": cfac}
 
     # Prepare env configs.
-    simple_env = SimpleEnvConfig(1, 2)
-    env_configs = {"simple": simple_env}
+    bare_env = BareEnvConfig(1, 2)
+    basic_env = SimpleEnvConfig(1)
+    env_configs = {"bare": bare_env, "basic": basic_env}
 
     # Set up the runtime and prepare tests.
     rt = flexitest.TestRuntime(env_configs, datadir_root, factories)

+ 3 - 6
test/fn_cs_register.py

@@ -1,9 +1,9 @@
 # Tests client ability to log in and register an account on a server.
 
-import flexitest
-
 import time
 
+import flexitest
+
 C0_USERNAME = "fooclient"
 C0_PASSWORD = "fcpassword"
 
@@ -13,7 +13,7 @@ C1_PASSWORD = "bcpassword"
 @flexitest.register
 class Test(flexitest.Test):
     def __init__(self, ctx: flexitest.InitContext):
-        ctx.set_env("simple")
+        ctx.set_env("bare")
 
     def main(self, ctx: flexitest.RunContext):
         serv0 = ctx.get_service("serv0")
@@ -57,6 +57,3 @@ class Test(flexitest.Test):
         res = c1rpc.aspc_status()
         print("c1 status", res)
         assert res["self_ref"] == C1_USERNAME, "wrong username reported in status"
-
-        # TODO move to another test
-        print(c0rpc.aspc_list_device_queues())

+ 20 - 0
test/fn_cs_server_queue.py

@@ -0,0 +1,20 @@
+# Tests client ability to query queues and read server queue messages.
+
+import time
+
+import flexitest
+
+
+@flexitest.register
+class Test(flexitest.Test):
+    def __init__(self, ctx: flexitest.InitContext):
+        ctx.set_env("basic")
+
+    def main(self, ctx: flexitest.RunContext):
+        serv = ctx.get_service("serv")
+        client = ctx.get_service("cli0")
+
+        srpc = serv.create_rpc()
+        crpc = client.create_rpc()
+
+        print("Queues:", crpc.aspc_list_device_queues())

+ 1 - 1
test/fn_s_startup.py

@@ -5,7 +5,7 @@ import flexitest
 @flexitest.register
 class Test(flexitest.Test):
     def __init__(self, ctx: flexitest.InitContext):
-        ctx.set_env("simple")
+        ctx.set_env("bare")
 
     def main(self, ctx: flexitest.RunContext):
         server = ctx.get_service("serv0")

+ 0 - 0
test/run-tests.sh


Certains fichiers n'ont pas été affichés car il y a eu trop de fichiers modifiés dans ce diff