lib.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469
  1. use std::fs::{File, OpenOptions};
  2. use std::io::{BufWriter, Read, Write};
  3. use std::path::Path;
  4. use std::time::SystemTime;
  5. use std::{
  6. collections::hash_map::DefaultHasher,
  7. hash::{Hash, Hasher},
  8. sync::mpsc::{Receiver, Sender},
  9. };
  10. type SwarmTime = Option<u32>;
  11. type ProposalId = u64;
  12. type Data = Vec<u8>;
  13. struct Config {
  14. swarm_diameter: u8,
  15. response_timeout_ms: u16,
  16. neighbors: Vec<(Sender<Message>, Receiver<Message>)>,
  17. min_neighbors: u16,
  18. max_neighbors: u16,
  19. archive_dir: Option<&'static Path>,
  20. }
  21. impl Default for Config {
  22. fn default() -> Self {
  23. Config {
  24. swarm_diameter: 7,
  25. response_timeout_ms: 250,
  26. neighbors: vec![],
  27. min_neighbors: 1,
  28. max_neighbors: 100,
  29. archive_dir: None,
  30. }
  31. }
  32. }
  33. impl Config {
  34. fn swarm_diameter(mut self, d: u8) -> Self {
  35. self.swarm_diameter = d;
  36. self
  37. }
  38. fn response_timeout_ms(mut self, t: u16) -> Self {
  39. self.response_timeout_ms = t;
  40. self
  41. }
  42. fn min_neighbors(mut self, m: u16) -> Self {
  43. self.min_neighbors = m;
  44. self
  45. }
  46. fn max_neighbors(mut self, m: u16) -> Self {
  47. self.max_neighbors = m;
  48. self
  49. }
  50. fn neighbors(mut self, ns: Vec<(Sender<Message>, Receiver<Message>)>) -> Self {
  51. self.neighbors = ns;
  52. self
  53. }
  54. }
  55. #[derive(Debug, Copy, Clone, PartialEq)]
  56. enum Awarness {
  57. Unaware,
  58. Aware(ProposalId, u8),
  59. Confused,
  60. }
  61. #[derive(Debug, Clone, PartialEq)]
  62. enum Message {
  63. KeepAlive(SwarmTime, Awarness),
  64. Announcement(SwarmTime, Proposal),
  65. }
  66. #[derive(Debug, Clone, PartialEq)]
  67. struct Proposal {
  68. time: SwarmTime,
  69. data: Data,
  70. }
  71. impl Proposal {
  72. fn get_id(&self) -> u64 {
  73. let mut hasher = DefaultHasher::new();
  74. self.data.hash(&mut hasher);
  75. hasher.finish()
  76. }
  77. }
  78. struct GnomeState {
  79. swarm_time: SwarmTime,
  80. proposal_id: Option<ProposalId>,
  81. proposal: Option<Proposal>,
  82. awarness: Awarness,
  83. }
  84. impl GnomeState {
  85. fn update_proposal(&mut self, proposal: Proposal) {
  86. let new_proposal_id = proposal.get_id();
  87. if let Some(proposal_id) = self.proposal_id {
  88. if new_proposal_id < proposal_id {
  89. self.proposal_id = Some(new_proposal_id);
  90. self.proposal = Some(proposal);
  91. self.awarness = Awarness::Aware(new_proposal_id, 0)
  92. }
  93. } else {
  94. self.proposal_id = Some(new_proposal_id);
  95. self.proposal = Some(proposal);
  96. self.awarness = Awarness::Aware(new_proposal_id, 0)
  97. }
  98. }
  99. fn update_swarm_time(&mut self, swarm_time: SwarmTime) {
  100. let new_swarm_time = match swarm_time {
  101. Some(neighbor_time) => Some(neighbor_time.wrapping_add(1)),
  102. None => None,
  103. };
  104. match self.swarm_time {
  105. None => self.swarm_time = new_swarm_time,
  106. Some(old_swarm_time) => {
  107. if let Some(new_time) = new_swarm_time {
  108. self.swarm_time = Some(old_swarm_time.min(new_time));
  109. }
  110. }
  111. }
  112. }
  113. fn update_awarness(&mut self, awarness: Awarness) {
  114. match awarness {
  115. Awarness::Unaware => return,
  116. Awarness::Confused => self.awarness = Awarness::Confused,
  117. Awarness::Aware(new_proposal_id, new_radius) => match self.awarness {
  118. Awarness::Unaware => {
  119. self.awarness = Awarness::Aware(new_proposal_id, new_radius + 1);
  120. }
  121. Awarness::Confused => return,
  122. Awarness::Aware(old_proposal_id, old_radius) => {
  123. if new_proposal_id == old_proposal_id {
  124. self.awarness =
  125. Awarness::Aware(new_proposal_id, old_radius.min(new_radius + 1));
  126. return;
  127. }
  128. if new_proposal_id < old_proposal_id {
  129. self.awarness = Awarness::Aware(new_proposal_id, new_radius + 1);
  130. }
  131. }
  132. },
  133. }
  134. }
  135. }
  136. //TODO keep history of last two awarness states of each neighbor for dealing with misbehaving neighbors
  137. struct Gnome {
  138. swarm_diameter: u8,
  139. swarm_time: SwarmTime,
  140. neighbors: Vec<(Sender<Message>, Receiver<Message>)>,
  141. neighbors_limit: (u16, u16),
  142. proposal: Option<Proposal>,
  143. awarness: Awarness,
  144. reader: Box<dyn Read>,
  145. writer: Box<dyn Write>,
  146. }
  147. impl Gnome {
  148. fn new(config: Config, reader: Box<dyn Read>, writer: Box<dyn Write>) -> Self {
  149. Gnome {
  150. swarm_diameter: config.swarm_diameter,
  151. swarm_time: None,
  152. neighbors: config.neighbors,
  153. neighbors_limit: (config.min_neighbors, config.max_neighbors),
  154. proposal: None,
  155. awarness: Awarness::Unaware,
  156. reader: reader,
  157. writer: writer,
  158. }
  159. }
  160. fn prepare_message(&self) -> Message {
  161. if self.proposal.is_some() && self.awarness == Awarness::Unaware {
  162. let proposal = self.proposal.clone().unwrap();
  163. Message::Announcement(self.swarm_time, proposal)
  164. } else {
  165. Message::KeepAlive(self.swarm_time, self.awarness)
  166. }
  167. }
  168. fn update_state(&mut self, msgs: &mut Vec<Message>) {
  169. let mut new_state = GnomeState {
  170. swarm_time: None,
  171. proposal_id: None,
  172. proposal: None,
  173. awarness: self.awarness,
  174. };
  175. for msg in msgs {
  176. self.process_message(&mut new_state, msg.clone())
  177. }
  178. //TODO implement different behavior for gnomes joining a synced swarm
  179. if let Some(_swarm_time) = self.swarm_time {
  180. self.swarm_time = new_state.swarm_time;
  181. }
  182. if new_state.proposal_id.is_some() {
  183. self.proposal = new_state.proposal;
  184. }
  185. self.awarness = new_state.awarness;
  186. }
  187. fn process_message(&mut self, new_state: &mut GnomeState, m: Message) {
  188. match m {
  189. Message::Announcement(swarm_time, proposal) => {
  190. new_state.update_swarm_time(swarm_time);
  191. if self.awarness == Awarness::Unaware {
  192. new_state.update_proposal(proposal);
  193. }
  194. }
  195. Message::KeepAlive(swarm_time, awarness) => {
  196. new_state.update_swarm_time(swarm_time);
  197. }
  198. }
  199. }
  200. fn apply_proposal(proposal: Proposal) {}
  201. //TODO After implementing support for built in synchronization
  202. // allow for some KeepAlive messages to contain Data
  203. fn broadcast_state(&self) {
  204. let message = self.prepare_message();
  205. for (tx, _) in &self.neighbors {
  206. tx.send(message.clone());
  207. }
  208. }
  209. //TODO implement timeout
  210. //TODO async?
  211. fn recv_msgs(&self, msgs: &mut Vec<Message>) {
  212. msgs.clear();
  213. for (_tx, rx) in &self.neighbors {
  214. let msg = rx.recv().unwrap();
  215. msgs.push(msg);
  216. }
  217. }
  218. fn run(&mut self) {
  219. let mut received_msgs = Vec::new();
  220. loop {
  221. self.update_state(&mut received_msgs);
  222. self.broadcast_state();
  223. self.recv_msgs(&mut received_msgs);
  224. // self.exchange_data();
  225. }
  226. }
  227. }
  228. struct Log {
  229. buf_writer: BufWriter<File>,
  230. init_time: SystemTime,
  231. }
  232. impl Log {
  233. fn new(bw: BufWriter<File>) -> Self {
  234. Log {
  235. buf_writer: bw,
  236. init_time: SystemTime::now(),
  237. }
  238. }
  239. fn to_file(file: &str) -> Self {
  240. let mut bw = BufWriter::new(
  241. OpenOptions::new()
  242. .create(true)
  243. .append(true)
  244. .write(true)
  245. .open("/var/log/swarm/test.log")
  246. .unwrap(),
  247. );
  248. Log {
  249. buf_writer: bw,
  250. init_time: SystemTime::now(),
  251. }
  252. }
  253. fn info(&mut self, msg: &str) {
  254. self.write("[I]", msg);
  255. }
  256. fn error(&mut self, msg: &str) {
  257. self.write("[E]", msg);
  258. }
  259. fn debug(&mut self, msg: &str) {
  260. self.write("[D]", msg);
  261. }
  262. fn legend(&mut self) {
  263. self.info("This is an INFO message");
  264. self.error("This is an ERROR message");
  265. self.debug("This is an DEBUG message");
  266. self.buf_writer.write(&"\n".as_bytes());
  267. }
  268. fn write(&mut self, ttype: &str, msg: &str) {
  269. let elapsed = self.init_time.elapsed().unwrap();
  270. // let formatted = format!("{} {}\n", ttype, msg);
  271. let formatted = format!("{:10?} {} {}\n", elapsed, ttype, msg);
  272. self.buf_writer.write_all(formatted.as_bytes());
  273. // self.buf_writer.flush();
  274. }
  275. }
  276. impl Drop for Log {
  277. fn drop(&mut self) {
  278. self.buf_writer.flush();
  279. }
  280. }
  281. #[cfg(test)]
  282. mod tests {
  283. use std::sync::mpsc::channel;
  284. use crate::{Config, Gnome, Log, Message, Proposal};
  285. #[test]
  286. fn it_works() {
  287. let mut log = &mut Log::to_file("/var/log/swarm/test.log");
  288. log.info("Test it_works");
  289. let proposal = Proposal {
  290. time: None,
  291. data: vec![0],
  292. };
  293. let zero_hash: u64 = 15877719499815107262;
  294. assert_eq!(proposal.get_id(), zero_hash);
  295. }
  296. #[test]
  297. fn communicate() {
  298. use std::fs::File;
  299. let log = &mut Log::to_file("/var/log/swarm/test.log");
  300. log.info("Test communicate");
  301. let (tx1, rx1) = channel::<Message>();
  302. let (tx2, rx2) = channel::<Message>();
  303. let mut reader = Box::new(File::open("/home/dxtr/doo").unwrap());
  304. let mut writer = Box::new(File::create("/home/dxtr/pa").unwrap());
  305. let mut reader2 = Box::new(File::open("/home/dxtr/doo2").unwrap());
  306. let mut writer2 = Box::new(File::create("/home/dxtr/pa2").unwrap());
  307. let gnome1 = Gnome::new(
  308. Config::default().neighbors(vec![(tx1, rx2)]),
  309. reader,
  310. writer,
  311. );
  312. let gnome2 = Gnome::new(
  313. Config::default().neighbors(vec![(tx2, rx1)]),
  314. reader2,
  315. writer2,
  316. );
  317. let message = gnome1.prepare_message();
  318. gnome1.broadcast_state();
  319. let mut msgs: Vec<Message> = Vec::new();
  320. gnome2.recv_msgs(&mut msgs);
  321. assert_eq!(message, msgs[0]);
  322. }
  323. #[test]
  324. fn swarm_time_no_update() {
  325. use std::fs::File;
  326. let log = &mut Log::to_file("/var/log/swarm/test.log");
  327. log.info("Test swarm_time_no_update");
  328. let (tx1, rx1) = channel::<Message>();
  329. let (tx2, rx2) = channel::<Message>();
  330. let mut reader = Box::new(File::open("/home/dxtr/doo").unwrap());
  331. let mut writer = Box::new(File::create("/home/dxtr/pa").unwrap());
  332. let mut reader2 = Box::new(File::open("/home/dxtr/doo2").unwrap());
  333. let mut writer2 = Box::new(File::create("/home/dxtr/pa2").unwrap());
  334. let mut gnome1 = Gnome::new(
  335. Config::default().neighbors(vec![(tx1, rx2)]),
  336. reader,
  337. writer,
  338. );
  339. let mut gnome2 = Gnome::new(
  340. Config::default().neighbors(vec![(tx2, rx1)]),
  341. reader2,
  342. writer2,
  343. );
  344. // let mut gnome1 = Gnome::new(Config::default().neighbors(vec![(tx1, rx2)]));
  345. // let mut gnome2 = Gnome::new(Config::default().neighbors(vec![(tx2, rx1)]));
  346. gnome1.swarm_time = Some(1);
  347. let message = gnome1.prepare_message();
  348. gnome1.broadcast_state();
  349. let mut msgs: Vec<Message> = Vec::new();
  350. gnome2.recv_msgs(&mut msgs);
  351. gnome2.update_state(&mut msgs);
  352. assert_eq!(gnome2.swarm_time, None);
  353. }
  354. #[test]
  355. fn swarm_time_update() {
  356. use std::fs::File;
  357. let log = &mut Log::to_file("/var/log/swarm/test.log");
  358. log.info("Test swarm_time_update");
  359. let (tx1, rx1) = channel::<Message>();
  360. let (tx2, rx2) = channel::<Message>();
  361. let mut reader = Box::new(File::open("/home/dxtr/doo").unwrap());
  362. let mut writer = Box::new(File::create("/home/dxtr/pa").unwrap());
  363. let mut reader2 = Box::new(File::open("/home/dxtr/doo2").unwrap());
  364. let mut writer2 = Box::new(File::create("/home/dxtr/pa2").unwrap());
  365. let mut gnome1 = Gnome::new(
  366. Config::default().neighbors(vec![(tx1, rx2)]),
  367. reader,
  368. writer,
  369. );
  370. let mut gnome2 = Gnome::new(
  371. Config::default().neighbors(vec![(tx2, rx1)]),
  372. reader2,
  373. writer2,
  374. );
  375. // let mut gnome1 = Gnome::new(Config::default().neighbors(vec![(tx1, rx2)]));
  376. // let mut gnome2 = Gnome::new(Config::default().neighbors(vec![(tx2, rx1)]));
  377. gnome1.swarm_time = Some(10);
  378. gnome2.swarm_time = Some(1);
  379. let message = gnome1.prepare_message();
  380. gnome1.broadcast_state();
  381. let mut msgs: Vec<Message> = Vec::new();
  382. gnome2.recv_msgs(&mut msgs);
  383. gnome2.update_state(&mut msgs);
  384. assert_eq!(gnome2.swarm_time, Some(11));
  385. }
  386. #[test]
  387. fn submit_proposal() {
  388. use std::fs::File;
  389. let log = &mut Log::to_file("/var/log/swarm/test.log");
  390. log.info("Test submit_proposal");
  391. let (tx1, rx1) = channel::<Message>();
  392. let (tx2, rx2) = channel::<Message>();
  393. let mut reader = Box::new(File::open("/home/dxtr/doo").unwrap());
  394. let mut writer = Box::new(File::create("/home/dxtr/pa").unwrap());
  395. let mut reader2 = Box::new(File::open("/home/dxtr/doo2").unwrap());
  396. let mut writer2 = Box::new(File::create("/home/dxtr/pa2").unwrap());
  397. let mut gnome1 = Gnome::new(
  398. Config::default().neighbors(vec![(tx1, rx2)]),
  399. reader,
  400. writer,
  401. );
  402. let gnome2 = Gnome::new(
  403. Config::default().neighbors(vec![(tx2, rx1)]),
  404. reader2,
  405. writer2,
  406. );
  407. // let mut gnome1 = Gnome::new(Config::default().neighbors(vec![(tx1, rx2)]));
  408. // let gnome2 = Gnome::new(Config::default().neighbors(vec![(tx2, rx1)]));
  409. let proposal = Proposal {
  410. time: None,
  411. data: vec![0u8],
  412. };
  413. gnome1.proposal = Some(proposal.clone());
  414. let message = gnome1.prepare_message();
  415. gnome1.broadcast_state();
  416. let mut msgs: Vec<Message> = Vec::new();
  417. gnome2.recv_msgs(&mut msgs);
  418. let announcement = Message::Announcement(None, proposal);
  419. assert_eq!(msgs[0], announcement);
  420. }
  421. #[test]
  422. fn log() {
  423. use std::fs::File;
  424. let log = &mut Log::to_file("/var/log/swarm/test.log");
  425. let mut reader = Box::new(File::open("/home/dxtr/doo").unwrap());
  426. let mut writer = Box::new(File::create("/home/dxtr/pa").unwrap());
  427. let mut gnome1 = Gnome::new(Config::default(), reader, writer);
  428. log.error("This is ERROR log");
  429. log.debug("This is DEBUG log");
  430. log.info("This is INFO log");
  431. }
  432. }