|
@@ -0,0 +1,68 @@
|
|
|
+//! Reader that interacts directly with the LLMQ datastore.
|
|
|
+
|
|
|
+use aspect_core::message_queue::{Origin, QueueEntry};
|
|
|
+use aspect_db::{
|
|
|
+ generic_msg_queue::{QueueId, ReaderId},
|
|
|
+ llmq::query::MqDatastore,
|
|
|
+};
|
|
|
+
|
|
|
+use super::errors::*;
|
|
|
+use super::reader_interface::*;
|
|
|
+
|
|
|
+/// Direct queue reader.
|
|
|
+pub struct DirectQueueReader {
|
|
|
+ /// The queue datastore.
|
|
|
+ mqds: MqDatastore,
|
|
|
+
|
|
|
+ /// The queue ID.
|
|
|
+ queue_id: QueueId,
|
|
|
+
|
|
|
+ /// The reader ID.
|
|
|
+ reader_id: ReaderId,
|
|
|
+}
|
|
|
+
|
|
|
+impl DirectQueueReader {
|
|
|
+ pub fn new(mqds: MqDatastore, queue_id: QueueId, reader_id: ReaderId) -> Self {
|
|
|
+ Self {
|
|
|
+ mqds,
|
|
|
+ queue_id,
|
|
|
+ reader_id,
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+impl QueueReader for DirectQueueReader {
|
|
|
+ fn get_view(&self) -> Result<QueueView, Error> {
|
|
|
+ let raw_qv = self
|
|
|
+ .mqds
|
|
|
+ .query_queue_view(&self.queue_id, Some(&self.reader_id))?;
|
|
|
+ Ok(QueueView::new(raw_qv.next_idx(), raw_qv.tail_idx()))
|
|
|
+ }
|
|
|
+
|
|
|
+ fn get_entry(&self, idx: u64) -> Result<Option<aspect_core::message_queue::QueueEntry>, Error> {
|
|
|
+ let mut msg_range =
|
|
|
+ self.mqds
|
|
|
+ .query_queue_message_range(&self.queue_id, &self.reader_id, idx, 1)?;
|
|
|
+
|
|
|
+ if msg_range.is_empty() {
|
|
|
+ return Ok(None);
|
|
|
+ }
|
|
|
+
|
|
|
+ let (idx, msg) = msg_range.remove(0);
|
|
|
+ let ent = QueueEntry {
|
|
|
+ idx,
|
|
|
+ timestamp: msg.timestamp(),
|
|
|
+ // TODO make this reflect real origin
|
|
|
+ origin: Origin::none(),
|
|
|
+ data: msg.into_data(),
|
|
|
+ };
|
|
|
+
|
|
|
+ Ok(Some(ent))
|
|
|
+ }
|
|
|
+
|
|
|
+ fn consume_upto(&self, next_read_idx: u64) -> Result<(), Error> {
|
|
|
+ self.mqds
|
|
|
+ .update_consumed_position(&self.queue_id, &self.reader_id, next_read_idx)?;
|
|
|
+ Ok(())
|
|
|
+ }
|
|
|
+}
|