shm-actor.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. import sys
  2. import argparse
  3. import asyncio
  4. import random
  5. import syndicate
  6. from syndicate import patterns as P, actor, dataspace, turn
  7. from syndicate.schema import sturdy
  8. from preserves.schema import load_schema_file
  9. from pprint import pprint
  10. from multiprocessing import shared_memory
  # Load the schema file from the current working directory and keep its
  # `shm` namespace, which holds the protocol definitions used by this actor.
  ShmProtocol = load_schema_file('./shm.bin').shm

  # Module-level shorthands for the individual protocol definitions, so the
  # handlers can refer to them without the `ShmProtocol.` prefix.
  UseBuffer, Buffer, Read, Free, Commit = (
      ShmProtocol.UseBuffer,
      ShmProtocol.Buffer,
      ShmProtocol.Read,
      ShmProtocol.Free,
      ShmProtocol.Commit,
  )
  # Command-line interface: which server to connect to and which capability
  # to present for its dataspace. Defaults are shown in --help output via
  # ArgumentDefaultsHelpFormatter.
  parser = argparse.ArgumentParser(
      formatter_class=argparse.ArgumentDefaultsHelpFormatter,
      description='Allows other actors to open shared memory areas for IPC',
  )
  parser.add_argument(
      '--address',
      default='<ws "ws://localhost:9001/">',
      metavar='\'<tcp "HOST" PORT>\'',
      help='transport address of the server',
  )
  parser.add_argument(
      '--cap',
      default='<ref {oid: "syndicate" sig: #[acowDB2/oI+6aSEC3YIxGg==]}>',
      metavar='\'<ref ...>\'',
      help='capability for the dataspace on the server',
  )
  # Parsed at module load; `args.address` and `args.cap` are read when the
  # actor connects to the server.
  args = parser.parse_args()
  26. @actor.run_system(name = 'shm-actor', debug = False)
  27. def main():
  28. root_facet = turn.active_facet()
  29. self_ref = int(random.random() * 255)
  30. @syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
  31. def on_connected(ds):
  32. turn.on_stop(lambda: turn.stop(root_facet))
  33. @dataspace.during(ds, P.rec('UseBuffer', P.CAPTURE, P.CAPTURE))
  34. def on_use_buffer(name, size):
  35. print("opening shared memory")
  36. shm = shared_memory.SharedMemory(name, create=True, size=size)
  37. h = turn.publish(ds, Buffer(name, size))
  38. turn.send(ds, Free(name))
  39. turn.on_stop(lambda: turn.retract(h))
  40. turn.on_stop(lambda: print("cleaning up shm"))
  41. turn.on_stop(lambda: shm.close())
  42. turn.on_stop(lambda: shm.unlink())
  43. @dataspace.during(ds, P.rec('Read', P.CAPTURE))
  44. def during_commit(name):
  45. turn.on_stop(lambda: turn.send(ds, Free(name)))