123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 |
- use std::fs::{File, OpenOptions};
- use std::io::{BufWriter, Read, Write};
- use std::path::Path;
- use std::time::SystemTime;
- use std::{
- collections::hash_map::DefaultHasher,
- hash::{Hash, Hasher},
- sync::mpsc::{Receiver, Sender},
- };
/// Swarm clock value carried in every message; `None` means no time is known.
type SwarmTime = Option<u32>;

/// Identifier of a proposal, as returned by `Proposal::get_id`.
type ProposalId = u64;

/// Raw byte payload of a proposal.
type Data = Vec<u8>;
- struct Config {
- swarm_diameter: u8,
- response_timeout_ms: u16,
- neighbors: Vec<(Sender<Message>, Receiver<Message>)>,
- min_neighbors: u16,
- max_neighbors: u16,
- archive_dir: Option<&'static Path>,
- }
- impl Default for Config {
- fn default() -> Self {
- Config {
- swarm_diameter: 7,
- response_timeout_ms: 250,
- neighbors: vec![],
- min_neighbors: 1,
- max_neighbors: 100,
- archive_dir: None,
- }
- }
- }
- impl Config {
- fn swarm_diameter(mut self, d: u8) -> Self {
- self.swarm_diameter = d;
- self
- }
- fn response_timeout_ms(mut self, t: u16) -> Self {
- self.response_timeout_ms = t;
- self
- }
- fn min_neighbors(mut self, m: u16) -> Self {
- self.min_neighbors = m;
- self
- }
- fn max_neighbors(mut self, m: u16) -> Self {
- self.max_neighbors = m;
- self
- }
- fn neighbors(mut self, ns: Vec<(Sender<Message>, Receiver<Message>)>) -> Self {
- self.neighbors = ns;
- self
- }
- }
- #[derive(Debug, Copy, Clone, PartialEq)]
- enum Awarness {
- Unaware,
- Aware(ProposalId, u8),
- Confused,
- }
- #[derive(Debug, Clone, PartialEq)]
- enum Message {
- KeepAlive(SwarmTime, Awarness),
- Announcement(SwarmTime, Proposal),
- }
- #[derive(Debug, Clone, PartialEq)]
- struct Proposal {
- time: SwarmTime,
- data: Data,
- }
- impl Proposal {
- fn get_id(&self) -> u64 {
- let mut hasher = DefaultHasher::new();
- self.data.hash(&mut hasher);
- hasher.finish()
- }
- }
- struct GnomeState {
- swarm_time: SwarmTime,
- proposal_id: Option<ProposalId>,
- proposal: Option<Proposal>,
- awarness: Awarness,
- }
- impl GnomeState {
- fn update_proposal(&mut self, proposal: Proposal) {
- let new_proposal_id = proposal.get_id();
- if let Some(proposal_id) = self.proposal_id {
- if new_proposal_id < proposal_id {
- self.proposal_id = Some(new_proposal_id);
- self.proposal = Some(proposal);
- self.awarness = Awarness::Aware(new_proposal_id, 0)
- }
- } else {
- self.proposal_id = Some(new_proposal_id);
- self.proposal = Some(proposal);
- self.awarness = Awarness::Aware(new_proposal_id, 0)
- }
- }
- fn update_swarm_time(&mut self, swarm_time: SwarmTime) {
- let new_swarm_time = match swarm_time {
- Some(neighbor_time) => Some(neighbor_time.wrapping_add(1)),
- None => None,
- };
- match self.swarm_time {
- None => self.swarm_time = new_swarm_time,
- Some(old_swarm_time) => {
- if let Some(new_time) = new_swarm_time {
- self.swarm_time = Some(old_swarm_time.min(new_time));
- }
- }
- }
- }
- fn update_awarness(&mut self, awarness: Awarness) {
- match awarness {
- Awarness::Unaware => return,
- Awarness::Confused => self.awarness = Awarness::Confused,
- Awarness::Aware(new_proposal_id, new_radius) => match self.awarness {
- Awarness::Unaware => {
- self.awarness = Awarness::Aware(new_proposal_id, new_radius + 1);
- }
- Awarness::Confused => return,
- Awarness::Aware(old_proposal_id, old_radius) => {
- if new_proposal_id == old_proposal_id {
- self.awarness =
- Awarness::Aware(new_proposal_id, old_radius.min(new_radius + 1));
- return;
- }
- if new_proposal_id < old_proposal_id {
- self.awarness = Awarness::Aware(new_proposal_id, new_radius + 1);
- }
- }
- },
- }
- }
- }
- //TODO keep history of last two awarness states of each neighbor for dealing with misbehaving neighbors
- struct Gnome {
- swarm_diameter: u8,
- swarm_time: SwarmTime,
- neighbors: Vec<(Sender<Message>, Receiver<Message>)>,
- neighbors_limit: (u16, u16),
- proposal: Option<Proposal>,
- awarness: Awarness,
- reader: Box<dyn Read>,
- writer: Box<dyn Write>,
- }
- impl Gnome {
- fn new(config: Config, reader: Box<dyn Read>, writer: Box<dyn Write>) -> Self {
- Gnome {
- swarm_diameter: config.swarm_diameter,
- swarm_time: None,
- neighbors: config.neighbors,
- neighbors_limit: (config.min_neighbors, config.max_neighbors),
- proposal: None,
- awarness: Awarness::Unaware,
- reader: reader,
- writer: writer,
- }
- }
- fn prepare_message(&self) -> Message {
- if self.proposal.is_some() && self.awarness == Awarness::Unaware {
- let proposal = self.proposal.clone().unwrap();
- Message::Announcement(self.swarm_time, proposal)
- } else {
- Message::KeepAlive(self.swarm_time, self.awarness)
- }
- }
- fn update_state(&mut self, msgs: &mut Vec<Message>) {
- let mut new_state = GnomeState {
- swarm_time: None,
- proposal_id: None,
- proposal: None,
- awarness: self.awarness,
- };
- for msg in msgs {
- self.process_message(&mut new_state, msg.clone())
- }
- //TODO implement different behavior for gnomes joining a synced swarm
- if let Some(_swarm_time) = self.swarm_time {
- self.swarm_time = new_state.swarm_time;
- }
- if new_state.proposal_id.is_some() {
- self.proposal = new_state.proposal;
- }
- self.awarness = new_state.awarness;
- }
- fn process_message(&mut self, new_state: &mut GnomeState, m: Message) {
- match m {
- Message::Announcement(swarm_time, proposal) => {
- new_state.update_swarm_time(swarm_time);
- if self.awarness == Awarness::Unaware {
- new_state.update_proposal(proposal);
- }
- }
- Message::KeepAlive(swarm_time, awarness) => {
- new_state.update_swarm_time(swarm_time);
- }
- }
- }
- fn apply_proposal(proposal: Proposal) {}
- //TODO After implementing support for built in synchronization
- // allow for some KeepAlive messages to contain Data
- fn broadcast_state(&self) {
- let message = self.prepare_message();
- for (tx, _) in &self.neighbors {
- tx.send(message.clone());
- }
- }
- //TODO implement timeout
- //TODO async?
- fn recv_msgs(&self, msgs: &mut Vec<Message>) {
- msgs.clear();
- for (_tx, rx) in &self.neighbors {
- let msg = rx.recv().unwrap();
- msgs.push(msg);
- }
- }
- fn run(&mut self) {
- let mut received_msgs = Vec::new();
- loop {
- self.update_state(&mut received_msgs);
- self.broadcast_state();
- self.recv_msgs(&mut received_msgs);
- // self.exchange_data();
- }
- }
- }
/// Minimal buffered file logger that prefixes every line with the time
/// elapsed since the logger was created and a severity tag.
struct Log {
    /// Buffered handle of the log file.
    buf_writer: BufWriter<File>,
    /// Creation time of the logger; line timestamps are relative to it.
    init_time: SystemTime,
}

impl Log {
    /// Wraps an already opened, buffered file and starts the clock.
    fn new(bw: BufWriter<File>) -> Self {
        Log {
            buf_writer: bw,
            init_time: SystemTime::now(),
        }
    }

    /// Opens `file` for appending, creating it if missing, and logs to it.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be opened or created.
    fn to_file(file: &str) -> Self {
        // The path argument used to be ignored in favour of a hard-coded one.
        let handle = OpenOptions::new()
            .create(true)
            .append(true)
            .open(file)
            .unwrap_or_else(|err| panic!("cannot open log file {}: {}", file, err));
        Self::new(BufWriter::new(handle))
    }

    /// Logs `msg` tagged `[I]`.
    fn info(&mut self, msg: &str) {
        self.write("[I]", msg);
    }

    /// Logs `msg` tagged `[E]`.
    fn error(&mut self, msg: &str) {
        self.write("[E]", msg);
    }

    /// Logs `msg` tagged `[D]`.
    fn debug(&mut self, msg: &str) {
        self.write("[D]", msg);
    }

    /// Writes one sample line per severity followed by an empty line.
    fn legend(&mut self) {
        self.info("This is an INFO message");
        self.error("This is an ERROR message");
        self.debug("This is an DEBUG message");
        // `write_all` rather than `write`: a bare `write` may stop short.
        // Logging is best effort, so a failure is deliberately dropped.
        let _ = self.buf_writer.write_all(b"\n");
    }

    /// Appends one line of the form `<elapsed> <ttype> <msg>` to the buffer.
    fn write(&mut self, ttype: &str, msg: &str) {
        // `elapsed` fails when the system clock was moved back past
        // `init_time`; log a zero duration then instead of panicking.
        let elapsed = self.init_time.elapsed().unwrap_or_default();
        // Logging is best effort: a failed write must not take the caller
        // down, and this signature has no way to report it.
        let _ = writeln!(self.buf_writer, "{:10?} {} {}", elapsed, ttype, msg);
    }
}

impl Drop for Log {
    /// Flushes buffered lines when the logger goes away.
    fn drop(&mut self) {
        // Errors cannot be propagated out of `drop`.
        let _ = self.buf_writer.flush();
    }
}
#[cfg(test)]
mod tests {
    use std::io;
    use std::sync::mpsc::{channel, Receiver, Sender};

    use crate::{Config, Gnome, Log, Message, Proposal};

    /// Opens the shared test log under the platform's temporary directory so
    /// the tests do not need a pre-created, machine-specific log directory.
    fn test_log() -> Log {
        let path = std::env::temp_dir().join("swarm_test.log");
        Log::to_file(path.to_str().expect("temp dir path is not valid UTF-8"))
    }

    /// Builds a gnome with default settings and the given neighbors, backed by
    /// an empty reader and a discarding writer instead of files on disk.
    fn gnome_with(neighbors: Vec<(Sender<Message>, Receiver<Message>)>) -> Gnome {
        Gnome::new(
            Config::default().neighbors(neighbors),
            Box::new(io::empty()),
            Box::new(io::sink()),
        )
    }

    /// Returns two gnomes wired to each other: whatever one broadcasts, the
    /// other receives.
    fn connected_pair() -> (Gnome, Gnome) {
        let (tx1, rx1) = channel::<Message>();
        let (tx2, rx2) = channel::<Message>();
        (gnome_with(vec![(tx1, rx2)]), gnome_with(vec![(tx2, rx1)]))
    }

    #[test]
    fn it_works() {
        test_log().info("Test it_works");
        let proposal = Proposal {
            time: None,
            data: vec![0],
        };
        // Pins the current `DefaultHasher` output for a single zero byte.
        let zero_hash: u64 = 15877719499815107262;
        assert_eq!(proposal.get_id(), zero_hash);
    }

    #[test]
    fn communicate() {
        test_log().info("Test communicate");
        let (gnome1, gnome2) = connected_pair();
        let message = gnome1.prepare_message();
        gnome1.broadcast_state();
        let mut msgs: Vec<Message> = Vec::new();
        gnome2.recv_msgs(&mut msgs);
        assert_eq!(message, msgs[0]);
    }

    #[test]
    fn swarm_time_no_update() {
        test_log().info("Test swarm_time_no_update");
        let (mut gnome1, mut gnome2) = connected_pair();
        gnome1.swarm_time = Some(1);
        gnome1.broadcast_state();
        let mut msgs: Vec<Message> = Vec::new();
        gnome2.recv_msgs(&mut msgs);
        gnome2.update_state(&mut msgs);
        // A gnome without a swarm time of its own does not adopt one.
        assert_eq!(gnome2.swarm_time, None);
    }

    #[test]
    fn swarm_time_update() {
        test_log().info("Test swarm_time_update");
        let (mut gnome1, mut gnome2) = connected_pair();
        gnome1.swarm_time = Some(10);
        gnome2.swarm_time = Some(1);
        gnome1.broadcast_state();
        let mut msgs: Vec<Message> = Vec::new();
        gnome2.recv_msgs(&mut msgs);
        gnome2.update_state(&mut msgs);
        assert_eq!(gnome2.swarm_time, Some(11));
    }

    #[test]
    fn submit_proposal() {
        test_log().info("Test submit_proposal");
        let (mut gnome1, gnome2) = connected_pair();
        let proposal = Proposal {
            time: None,
            data: vec![0u8],
        };
        gnome1.proposal = Some(proposal.clone());
        gnome1.broadcast_state();
        let mut msgs: Vec<Message> = Vec::new();
        gnome2.recv_msgs(&mut msgs);
        let announcement = Message::Announcement(None, proposal);
        assert_eq!(msgs[0], announcement);
    }

    #[test]
    fn log() {
        let mut log = test_log();
        log.error("This is ERROR log");
        log.debug("This is DEBUG log");
        log.info("This is INFO log");
    }
}
|