1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- import sys
- import argparse
- import asyncio
- import random
- import syndicate
- from syndicate import patterns as P, actor, dataspace, turn
- from syndicate.schema import sturdy
- from preserves.schema import load_schema_file
- from pprint import pprint
- from multiprocessing import shared_memory
# Load the compiled schema bundle from the current working directory and
# take its `shm` namespace.
ShmProtocol = load_schema_file('./shm.bin').shm

# Short module-level aliases for the members of the shm schema namespace.
Buffer = ShmProtocol.Buffer
UseBuffer = ShmProtocol.UseBuffer
Commit = ShmProtocol.Commit
Read = ShmProtocol.Read
Free = ShmProtocol.Free
# Command-line interface: where the server is and which capability to use.
parser = argparse.ArgumentParser(
    description='Allows other actors to open shared memory areas for IPC',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
    '--address',
    metavar='\'<tcp "HOST" PORT>\'',
    help='transport address of the server',
    default='<ws "ws://localhost:9001/">',
)
parser.add_argument(
    '--cap',
    metavar='\'<ref ...>\'',
    help='capability for the dataspace on the server',
    default='<ref {oid: "syndicate" sig: #[acowDB2/oI+6aSEC3YIxGg==]}>',
)
# Parsed at import time; `args.address` and `args.cap` are read by main().
args = parser.parse_args()
@actor.run_system(name='shm-actor', debug=False)
def main():
    """Connect to the dataspace and manage shared-memory buffers on request.

    Uses the module-level ``args.address`` and ``args.cap`` to connect through
    the relay. For each ``UseBuffer(name, size)`` record matched in the
    dataspace, a shared-memory segment of that name and size is created, a
    ``Buffer(name, size)`` record is published and a ``Free(name)`` message is
    sent. Stop callbacks retract the publication, then close and unlink the
    segment.
    """
    root_facet = turn.active_facet()

    @syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
    def on_connected(ds):
        # Tie the lifetime of the whole actor to this connection: when this
        # scope stops, stop the root facet as well.
        turn.on_stop(lambda: turn.stop(root_facet))

        @dataspace.during(ds, P.rec('UseBuffer', P.CAPTURE, P.CAPTURE))
        def on_use_buffer(name, size):
            print("opening shared memory")
            # create=True: raises FileExistsError if a segment with this name
            # already exists.
            shm = shared_memory.SharedMemory(name, create=True, size=size)
            h = turn.publish(ds, Buffer(name, size))
            # Announce the newly created buffer as free.
            turn.send(ds, Free(name))
            # Stop callbacks, registered in the order retract -> log ->
            # close -> unlink.
            turn.on_stop(lambda: turn.retract(h))
            turn.on_stop(lambda: print("cleaning up shm"))
            turn.on_stop(lambda: shm.close())
            turn.on_stop(lambda: shm.unlink())

        # NOTE(review): the original indentation was lost, so it is unclear
        # whether this handler was a sibling of on_use_buffer (as written
        # here) or nested inside it -- confirm against the upstream source.
        # NOTE(review): the function is named during_commit but matches
        # 'Read' records -- confirm which is intended.
        @dataspace.during(ds, P.rec('Read', P.CAPTURE))
        def during_commit(name):
            # When the matched Read record's scope stops, send Free for that
            # buffer name.
            turn.on_stop(lambda: turn.send(ds, Free(name)))
|