1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import sys
- import argparse
- import asyncio
- import random
- import syndicate
- from syndicate import patterns as P, actor, dataspace, turn
- import cv2 as cv
- import numpy as np
- import pyapriltags
- from syndicate.schema import sturdy
- from preserves.schema import load_schema_file
- from pprint import pprint
# Schema-derived record constructors, read from ./apriltag.bin at import time.
AprilTagProtocol = load_schema_file('./apriltag.bin').apriltag
AprilTag, Says = AprilTagProtocol.AprilTag, AprilTagProtocol.Says

# Command-line interface; defaults are shown in --help via the formatter class.
parser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    description='Opens a webcam and tracks any detected Apriltags.',
)
parser.add_argument(
    '--address',
    default='<ws "ws://localhost:9001/">',
    metavar='\'<tcp "HOST" PORT>\'',
    help='transport address of the server',
)
parser.add_argument(
    '--cap',
    default='<ref {oid: "syndicate" sig: #[acowDB2/oI+6aSEC3YIxGg==]}>',
    metavar='\'<ref ...>\'',
    help='capability for the dataspace on the server',
)
# Parsed once at module load; `args.address` and `args.cap` are read by main().
args = parser.parse_args()
def getPoints(cap, detector):
    """Grab one frame, detect tags in it and show the annotated preview.

    Args:
        cap: Capture object providing ``isOpened()`` and ``read()``; the
            latter must return a ``(ok, image)`` pair.
        detector: Object whose ``detect(gray)`` yields detections exposing
            ``center`` and ``tag_id``.

    Returns:
        A ``(keep_running, detections)`` tuple. ``keep_running`` is ``False``
        when the capture is not open, when no frame could be read, or when
        ``q`` was pressed in the preview window. ``detections`` is a list of
        ``[tag_id, x, y]`` entries with the tag centre rounded to whole
        pixels; it is empty when nothing was detected or no frame was read.
    """
    red = (0, 0, 255)  # frames are treated as BGR (see COLOR_BGR2GRAY below)
    output = []

    # Without an open capture there is no frame to process; report "stop"
    # with an empty result so the caller can always unpack the return value.
    if not cap.isOpened():
        return False, output

    ret, img = cap.read()
    # A failed grab yields no usable image; converting it would raise.
    if not ret or img is None:
        return False, output

    gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY)
    for d in detector.detect(gray):
        center = np.round(np.asarray(d.center), 0).astype('int')
        x, y = int(center[0]), int(center[1])
        cv.drawMarker(img, (x, y), red, cv.MARKER_CROSS, thickness=2)
        # Draw the id 50 px above the marker. The offset is applied to the
        # label position only, so the reported coordinates stay the centre.
        cv.putText(img, f'{d.tag_id}', (x, y - 50), cv.FONT_HERSHEY_SIMPLEX, 1.5, red, 2)
        output.append([d.tag_id, x, y])

    cv.imshow('raw', img)
    # waitKey also lets the preview window process its events.
    if cv.waitKey(1) == ord('q'):
        return False, output

    return True, output
@actor.run_system(name = 'apriltag-emitter', debug = False)
def main():
    """Connect to the dataspace and publish an AprilTag record per detection.

    Uses ``args.address`` and ``args.cap`` to connect. Once connected, opens
    capture device 0 and repeatedly calls ``getPoints`` in a worker thread,
    publishing ``AprilTag(tag_id, x, y)`` to the dataspace for every entry it
    returns, until ``getPoints`` reports that the loop should stop.
    """
    root_facet = turn.active_facet()

    @syndicate.relay.connect(args.address, sturdy.SturdyRef.decode(syndicate.parse(args.cap)))
    def on_connected(ds):
        # Stopping this facet also stops the root facet captured above.
        turn.on_stop(lambda: turn.stop(root_facet))

        cap = cv.VideoCapture(0)
        detector = pyapriltags.Detector(families='tagStandard52h13')

        @turn.linked_task()
        async def accept_input(f):
            try:
                run = True
                while run:
                    # getPoints blocks on the camera and the preview window,
                    # so it is run off the event loop thread.
                    run, d_list = await asyncio.to_thread(getPoints, cap, detector)
                    for d in d_list:
                        tag_id, x, y = int(d[0]), int(d[1]), int(d[2])
                        # turn.external receives a callable, not a result, so
                        # it may run after this loop has rebound the names.
                        # Binding the values as defaults pins this detection.
                        # NOTE(review): each detection is a new assertion and
                        # nothing in this block retracts earlier ones; confirm
                        # whether stale positions should be retracted.
                        turn.external(
                            f,
                            lambda tag_id=tag_id, x=x, y=y: turn.publish(ds, AprilTag(tag_id, x, y)),
                        )
                        print(f'Publishing: {tag_id}, {x}, {y} !')
            finally:
                # Give the camera back even if the task fails or is cancelled.
                cap.release()
|