comm.rs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // -*- coding: utf-8 -*-
  2. //
  3. // Simple CMS
  4. //
  5. // Copyright (C) 2011-2024 Michael Büsch <m@bues.ch>
  6. //
  7. // Licensed under the Apache License version 2.0
  8. // or the MIT license, at your option.
  9. // SPDX-License-Identifier: Apache-2.0 OR MIT
  10. use anyhow::{self as ah, format_err as err, Context as _};
  11. use chrono::prelude::*;
  12. use cms_ident::{CheckedIdent, CheckedIdentElem, Tail};
  13. use cms_socket::{CmsSocketConn, MsgSerde as _};
  14. use cms_socket_db::{Msg as MsgDb, SOCK_FILE as SOCK_FILE_DB};
  15. use cms_socket_post::{Msg as MsgPost, SOCK_FILE as SOCK_FILE_POST};
  16. use lru::LruCache;
  17. use std::{
  18. collections::HashMap,
  19. path::{Path, PathBuf},
  20. };
  /// Compile-time switch: when `true`, every request sent to the database or
  /// post handler daemon is printed to stdout before it is transmitted.
  const DEBUG: bool = false;

  /// Maximum number of macro bodies kept in the in-memory LRU macro cache.
  const MACRO_CACHE_SIZE: usize = 512;
  23. fn epoch_stamp(seconds: u64) -> DateTime<Utc> {
  24. DateTime::from_timestamp(seconds.try_into().unwrap_or_default(), 0).unwrap_or_default()
  25. }
  26. #[derive(Clone, Debug, Default)]
  27. pub struct CommGetPage {
  28. pub path: CheckedIdent,
  29. pub get_title: bool,
  30. pub get_data: bool,
  31. pub get_stamp: bool,
  32. pub get_redirect: bool,
  33. }
  34. #[derive(Clone, Debug, Default)]
  35. pub struct CommPage {
  36. pub title: Option<String>,
  37. pub data: Option<String>,
  38. pub stamp: Option<DateTime<Utc>>,
  39. pub redirect: Option<String>,
  40. }
  41. #[derive(Clone, Debug, Default)]
  42. pub struct CommSubPages {
  43. pub names: Vec<String>,
  44. pub nav_labels: Vec<String>,
  45. pub nav_stops: Vec<bool>,
  46. pub stamps: Vec<DateTime<Utc>>,
  47. pub prios: Vec<u64>,
  48. }
  49. #[derive(Clone, Debug, Default)]
  50. pub struct CommRunPostHandler {
  51. pub path: CheckedIdent,
  52. pub query: HashMap<String, Vec<u8>>,
  53. pub form_fields: HashMap<String, Vec<u8>>,
  54. }
  /// Result of a post handler run, as returned by
  /// [`CmsComm::run_post_handler`].
  ///
  /// All fields are passed through unchanged from the post daemon reply.
  #[derive(Debug, Clone, Default)]
  pub struct CommPostHandlerResult {
      /// Error text reported by the post handler.
      /// NOTE(review): presumably empty on success; confirm against cms-postd.
      pub error: String,
      /// Raw body produced by the post handler.
      pub body: Vec<u8>,
      /// MIME type of `body`.
      pub mime: String,
  }
  61. /// Communication with database and post handler.
  62. pub struct CmsComm {
  63. sock_path_db: PathBuf,
  64. sock_path_post: PathBuf,
  65. sock_db: Option<CmsSocketConn>,
  66. sock_post: Option<CmsSocketConn>,
  67. macro_cache: LruCache<String, String>,
  68. }
  69. impl CmsComm {
  70. pub fn new(rundir: &Path) -> Self {
  71. let sock_path_db = rundir.join(SOCK_FILE_DB);
  72. let sock_path_post = rundir.join(SOCK_FILE_POST);
  73. Self {
  74. sock_path_db,
  75. sock_path_post,
  76. sock_db: None,
  77. sock_post: None,
  78. macro_cache: LruCache::new(MACRO_CACHE_SIZE.try_into().unwrap()),
  79. }
  80. }
  81. async fn sock_db(&mut self) -> ah::Result<&mut CmsSocketConn> {
  82. if self.sock_db.is_none() {
  83. self.sock_db = Some(CmsSocketConn::connect(&self.sock_path_db).await?);
  84. }
  85. Ok(self.sock_db.as_mut().unwrap())
  86. }
  87. async fn sock_post(&mut self) -> ah::Result<&mut CmsSocketConn> {
  88. if self.sock_post.is_none() {
  89. self.sock_post = Some(CmsSocketConn::connect(&self.sock_path_post).await?);
  90. }
  91. Ok(self.sock_post.as_mut().unwrap())
  92. }
  93. async fn comm_db(&mut self, request: &MsgDb) -> ah::Result<MsgDb> {
  94. if DEBUG {
  95. println!("DB comm: {request:?}");
  96. }
  97. let sock = self.sock_db().await?;
  98. sock.send_msg(request).await?;
  99. if let Some(reply) = sock.recv_msg(MsgDb::try_msg_deserialize).await? {
  100. Ok(reply)
  101. } else {
  102. Err(err!("cms-fsd disconnected"))
  103. }
  104. }
  105. async fn comm_post(&mut self, request: &MsgPost) -> ah::Result<MsgPost> {
  106. if DEBUG {
  107. println!("Post comm: {request:?}");
  108. }
  109. let sock = self.sock_post().await?;
  110. sock.send_msg(request).await?;
  111. if let Some(reply) = sock.recv_msg(MsgPost::try_msg_deserialize).await? {
  112. Ok(reply)
  113. } else {
  114. Err(err!("cms-postd disconnected"))
  115. }
  116. }
  117. pub async fn get_db_page(&mut self, get: CommGetPage) -> ah::Result<CommPage> {
  118. let reply = self
  119. .comm_db(&MsgDb::GetPage {
  120. path: get.path.downgrade_clone(),
  121. get_title: get.get_title,
  122. get_data: get.get_data,
  123. get_stamp: get.get_stamp,
  124. get_redirect: get.get_redirect,
  125. })
  126. .await;
  127. if let Ok(MsgDb::Page {
  128. title,
  129. data,
  130. stamp,
  131. redirect,
  132. }) = reply
  133. {
  134. Ok(CommPage {
  135. title: title.and_then(|x| String::from_utf8(x).ok()),
  136. data: data.and_then(|x| String::from_utf8(x).ok()),
  137. stamp: stamp.map(epoch_stamp),
  138. redirect: redirect.and_then(|x| String::from_utf8(x).ok()),
  139. })
  140. } else {
  141. Err(err!("Page: Invalid db reply."))
  142. }
  143. }
  144. pub async fn get_db_sub_pages(&mut self, path: &CheckedIdent) -> ah::Result<CommSubPages> {
  145. let reply = self
  146. .comm_db(&MsgDb::GetSubPages {
  147. path: path.downgrade_clone(),
  148. get_nav_labels: true,
  149. get_nav_stops: true,
  150. get_stamps: true,
  151. get_prios: true,
  152. })
  153. .await;
  154. if let Ok(MsgDb::SubPages {
  155. names,
  156. nav_labels,
  157. nav_stops,
  158. stamps,
  159. prios,
  160. }) = reply
  161. {
  162. let count = names.len();
  163. if nav_labels.len() == count
  164. && nav_stops.len() == count
  165. && stamps.len() == count
  166. && prios.len() == count
  167. {
  168. Ok(CommSubPages {
  169. names: names
  170. .into_iter()
  171. .map(|x| String::from_utf8(x).unwrap_or_default())
  172. .collect(),
  173. nav_labels: nav_labels
  174. .into_iter()
  175. .map(|x| String::from_utf8(x).unwrap_or_default())
  176. .collect(),
  177. nav_stops,
  178. stamps: stamps.into_iter().map(epoch_stamp).collect(),
  179. prios,
  180. })
  181. } else {
  182. Err(err!("GetSubPages: Invalid db reply (length)."))
  183. }
  184. } else {
  185. Err(err!("GetSubPages: Invalid db reply."))
  186. }
  187. }
  188. pub async fn get_db_headers(&mut self, path: &CheckedIdent) -> ah::Result<String> {
  189. let reply = self
  190. .comm_db(&MsgDb::GetHeaders {
  191. path: path.downgrade_clone(),
  192. })
  193. .await;
  194. if let Ok(MsgDb::Headers { data }) = reply {
  195. Ok(String::from_utf8(data).context("Headers: Data is not valid UTF-8")?)
  196. } else {
  197. Err(err!("Headers: Invalid db reply."))
  198. }
  199. }
  200. pub async fn get_db_string(&mut self, name: &str) -> ah::Result<String> {
  201. let reply = self
  202. .comm_db(&MsgDb::GetString {
  203. name: name.parse().context("Invalid DB string name")?,
  204. })
  205. .await;
  206. if let Ok(MsgDb::String { data }) = reply {
  207. Ok(String::from_utf8(data).context("String: Data is not valid UTF-8")?)
  208. } else {
  209. Err(err!("String: Invalid db reply."))
  210. }
  211. }
  212. pub async fn get_db_macro(
  213. &mut self,
  214. parent: Option<&CheckedIdent>,
  215. name: &CheckedIdentElem,
  216. ) -> ah::Result<String> {
  217. let cache_name = if let Some(parent) = parent {
  218. parent.to_fs_path(Path::new(""), &Tail::One(name.clone()))
  219. } else {
  220. name.to_fs_path(Path::new(""), &Tail::None)
  221. };
  222. let cache_name = cache_name.into_os_string().into_string().unwrap();
  223. // Try to get it from the cache.
  224. if let Some(data) = self.macro_cache.get(&cache_name) {
  225. return Ok(data.clone());
  226. }
  227. let reply = self
  228. .comm_db(&MsgDb::GetMacro {
  229. parent: parent.unwrap_or(&CheckedIdent::ROOT).downgrade_clone(),
  230. name: name.downgrade_clone(),
  231. })
  232. .await;
  233. if let Ok(MsgDb::Macro { data }) = reply {
  234. let data = String::from_utf8(data).context("Macro: Data is not valid UTF-8")?;
  235. // Put it into the cache.
  236. self.macro_cache.push(cache_name, data.clone());
  237. Ok(data)
  238. } else {
  239. Err(err!("Macro: Invalid db reply."))
  240. }
  241. }
  242. pub async fn get_db_image(&mut self, name: &CheckedIdentElem) -> ah::Result<Vec<u8>> {
  243. let reply = self
  244. .comm_db(&MsgDb::GetImage {
  245. name: name.downgrade_clone(),
  246. })
  247. .await;
  248. if let Ok(MsgDb::Image { data }) = reply {
  249. Ok(data)
  250. } else {
  251. Err(err!("Image: Invalid db reply."))
  252. }
  253. }
  254. pub async fn run_post_handler(
  255. &mut self,
  256. run: CommRunPostHandler,
  257. ) -> ah::Result<CommPostHandlerResult> {
  258. let reply = self
  259. .comm_post(&MsgPost::RunPostHandler {
  260. path: run.path.downgrade_clone(),
  261. query: run.query,
  262. form_fields: run.form_fields,
  263. })
  264. .await;
  265. if let Ok(MsgPost::PostHandlerResult { error, body, mime }) = reply {
  266. Ok(CommPostHandlerResult { error, body, mime })
  267. } else {
  268. Err(err!("RunPostHandler: Invalid postd reply."))
  269. }
  270. }
  271. }
  272. // vim: ts=4 sw=4 expandtab