2 次代码提交 6d51ac47c1 ... f518fefccf

作者 SHA1 备注 提交日期
  Trey Del Bonis f518fefccf client: fixed missing `&` 3 月之前
  Trey Del Bonis 20de029b39 client: added queue reader interfaces 3 月之前

+ 1 - 0
client/src/lib.rs

@@ -4,6 +4,7 @@ pub mod device_resolver;
 pub mod group_state;
 pub mod infra;
 pub mod peer_manager;
+pub mod queue;
 pub mod session;
 pub mod signer_manager;
 

+ 62 - 0
client/src/queue/async_wrapper.rs

@@ -0,0 +1,62 @@
+//! Wraps a blocking queue interface as an async queue interface.
+
+use std::sync::Arc;
+
+use super::errors::*;
+use super::reader_interface::{AsyncQueueReader, QueueReader, QueueView};
+
+pub struct AsyncQueueWrapper<Q: QueueReader> {
+    handle: tokio::runtime::Handle,
+    inner: Arc<Q>,
+}
+
+impl<Q: QueueReader> AsyncQueueWrapper<Q> {
+    pub fn new(handle: tokio::runtime::Handle, inner: Arc<Q>) -> Self {
+        Self { handle, inner }
+    }
+}
+
+impl<Q: QueueReader + Sync + Send + 'static> AsyncQueueReader for AsyncQueueWrapper<Q> {
+    fn get_view(
+        &self,
+    ) -> impl futures::prelude::Future<Output = Result<QueueView, Error>> + 'static {
+        let handle = self.handle.clone();
+        let inner = self.inner.clone();
+        async move {
+            handle
+                .spawn_blocking(move || inner.get_view())
+                .await
+                .map_err(|_| Error::LlmqPanic)?
+        }
+    }
+
+    fn get_entry(
+        &self,
+        idx: u64,
+    ) -> impl futures::prelude::Future<
+        Output = Result<Option<aspect_core::message_queue::QueueEntry>, Error>,
+    > + 'static {
+        let handle = self.handle.clone();
+        let inner = self.inner.clone();
+        async move {
+            handle
+                .spawn_blocking(move || inner.get_entry(idx))
+                .await
+                .map_err(|_| Error::LlmqPanic)?
+        }
+    }
+
+    fn consume_upto(
+        &self,
+        next_read_idx: u64,
+    ) -> impl futures::prelude::Future<Output = Result<(), Error>> + 'static {
+        let handle = self.handle.clone();
+        let inner = self.inner.clone();
+        async move {
+            handle
+                .spawn_blocking(move || inner.consume_upto(next_read_idx))
+                .await
+                .map_err(|_| Error::LlmqPanic)?
+        }
+    }
+}

+ 68 - 0
client/src/queue/direct_reader.rs

@@ -0,0 +1,68 @@
+//! Reader that interacts directly with the LLMQ datastore.
+
+use aspect_core::message_queue::{Origin, QueueEntry};
+use aspect_db::{
+    generic_msg_queue::{QueueId, ReaderId},
+    llmq::query::MqDatastore,
+};
+
+use super::errors::*;
+use super::reader_interface::*;
+
+/// Direct queue reader.
+pub struct DirectQueueReader {
+    /// The queue datastore.
+    mqds: MqDatastore,
+
+    /// The queue ID.
+    queue_id: QueueId,
+
+    /// The reader ID.
+    reader_id: ReaderId,
+}
+
+impl DirectQueueReader {
+    pub fn new(mqds: MqDatastore, queue_id: QueueId, reader_id: ReaderId) -> Self {
+        Self {
+            mqds,
+            queue_id,
+            reader_id,
+        }
+    }
+}
+
+impl QueueReader for DirectQueueReader {
+    fn get_view(&self) -> Result<QueueView, Error> {
+        let raw_qv = self
+            .mqds
+            .query_queue_view(&self.queue_id, Some(&self.reader_id))?;
+        Ok(QueueView::new(raw_qv.next_idx(), raw_qv.tail_idx()))
+    }
+
+    fn get_entry(&self, idx: u64) -> Result<Option<aspect_core::message_queue::QueueEntry>, Error> {
+        let mut msg_range =
+            self.mqds
+                .query_queue_message_range(&self.queue_id, &self.reader_id, idx, 1)?;
+
+        if msg_range.is_empty() {
+            return Ok(None);
+        }
+
+        let (idx, msg) = msg_range.remove(0);
+        let ent = QueueEntry {
+            idx,
+            timestamp: msg.timestamp(),
+            // TODO make this reflect real origin
+            origin: Origin::none(),
+            data: msg.into_data(),
+        };
+
+        Ok(Some(ent))
+    }
+
+    fn consume_upto(&self, next_read_idx: u64) -> Result<(), Error> {
+        self.mqds
+            .update_consumed_position(&self.queue_id, &self.reader_id, next_read_idx)?;
+        Ok(())
+    }
+}

+ 14 - 0
client/src/queue/errors.rs

@@ -0,0 +1,14 @@
+use aspect_db::llmq;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("llmq/db: {0}")]
+    LlmqDb(#[from] llmq::errors::DbError),
+
+    #[error("llmq routine panic")]
+    LlmqPanic,
+
+    #[error("{0}")]
+    Other(String),
+}

+ 5 - 0
client/src/queue/mod.rs

@@ -0,0 +1,5 @@
+pub mod errors;
+
+pub mod async_wrapper;
+pub mod direct_reader;
+pub mod reader_interface;

+ 58 - 0
client/src/queue/reader_interface.rs

@@ -0,0 +1,58 @@
+use std::future::Future;
+
+use aspect_core::message_queue::QueueEntry;
+
+use super::errors::Error;
+
+/// Describes the set of unconsumed entries in the queue from our perspective.
+#[derive(Copy, Clone, Debug, Eq, PartialEq)]
+pub struct QueueView {
+    /// The idx of the next entry to be read.
+    next_read_idx: u64,
+
+    /// The idx of the next entry to be written.
+    next_entry_idx: u64,
+}
+
+impl QueueView {
+    pub fn new(next_read_idx: u64, next_entry_idx: u64) -> Self {
+        Self {
+            next_read_idx,
+            next_entry_idx,
+        }
+    }
+
+    pub fn next_read_idx(&self) -> u64 {
+        self.next_read_idx
+    }
+
+    pub fn next_entry_idx(&self) -> u64 {
+        self.next_entry_idx
+    }
+}
+
+pub trait QueueReader {
+    /// Queries our view of the queue.
+    fn get_view(&self) -> Result<QueueView, Error>;
+
+    /// Gets en entry from the queue.
+    fn get_entry(&self, idx: u64) -> Result<Option<QueueEntry>, Error>;
+
+    /// Consumes entries up to some index, leaving the provided index as the next to be read.
+    fn consume_upto(&self, next_read_idx: u64) -> Result<(), Error>;
+}
+
+pub trait AsyncQueueReader {
+    /// Queries our view of the queue.
+    fn get_view(&self) -> impl Future<Output = Result<QueueView, Error>> + 'static;
+
+    /// Gets en entry from the queue.
+    fn get_entry(
+        &self,
+        idx: u64,
+    ) -> impl Future<Output = Result<Option<QueueEntry>, Error>> + 'static;
+
+    /// Consumes entries up to some index, leaving the provided index as the next to be read.
+    fn consume_upto(&self, next_read_idx: u64)
+        -> impl Future<Output = Result<(), Error>> + 'static;
+}

+ 1 - 1
client/src/session/command.rs

@@ -23,7 +23,7 @@ impl MessageData {
         }
     }
 
-    pub fn application_payload(self) -> &[u8] {
+    pub fn application_payload(&self) -> &[u8] {
         &self.application_payload
     }