# Source code for draugr.opencv_utilities.raster_sequences.async_video_stream
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Module metadata: the author string and the module docstring (assigned explicitly through ``__doc__``).
__author__ = "Christian Heider Nielsen"
__doc__ = r"""
Created on 25-01-2021
"""
from threading import Thread
from typing import Any, Union
import cv2
# Names exported by ``from <module> import *``; only the stream wrapper is public.
__all__ = ["AsyncVideoStream"]
class AsyncVideoStream:
    """Threaded wrapper around ``cv2.VideoCapture`` with a similar interface.

    A daemon thread repeatedly calls ``read()`` on the underlying capture and
    stores the result on the instance. :meth:`read`, ``__call__`` and
    ``__next__`` hand back those stored values without touching the capture
    themselves.

    The object can be used as a context manager (``with AsyncVideoStream() as s``)
    and as an iterator over frames (``for frame in AsyncVideoStream()``).
    """

    def __init__(
        self,
        src: Union[int, str] = 0,
        thread_name: Union[str, None] = None,
        group: Any = None,
    ):
        """
        Open the capture, read one initial frame and prepare the worker thread.

        The worker thread is created here but not started; call :meth:`start`.

        :param src: source passed straight to ``cv2.VideoCapture``
        :param thread_name: name given to the worker thread
        :param group: forwarded to ``threading.Thread`` as ``group``
        """
        # Set the flag first so that cleanup code can rely on it even when one
        # of the following steps raises.
        self._stopped = False
        self._stream = cv2.VideoCapture(src)
        (self.grabbed, self.frame) = self._stream.read()
        self._thread = Thread(
            target=self.update, name=thread_name, args=(), daemon=True, group=group
        )

    def start(self) -> "AsyncVideoStream":
        """
        Start the worker thread that reads frames from the video stream.

        Safe to call more than once: a ``Thread`` may only be started a single
        time, so later calls (e.g. ``__enter__`` followed by ``__iter__``) are
        no-ops.

        :return: this instance, to allow chaining
        """
        # ``ident`` stays ``None`` until the thread has been started.
        if self._thread.ident is None:
            self._thread.start()
        return self

    def update(self) -> None:
        """Worker loop: keep storing the latest ``read()`` result until stopped."""
        # NOTE(review): a failed grab does not end the loop, so an exhausted
        # source keeps being polled until ``stop()`` is called.
        while not self._stopped:
            (self.grabbed, self.frame) = self._stream.read()

    def read(self):
        """
        Return the most recently stored result.

        :return: tuple ``(grabbed, frame)`` as last produced by the capture
        """
        return self.grabbed, self.frame

    def _join_worker(self) -> None:
        """Wait for the worker thread to finish, if it is running."""
        worker = getattr(self, "_thread", None)
        if worker is None or not worker.is_alive():
            return  # never created, never started, or already finished
        try:
            worker.join()
        except RuntimeError:
            # ``join`` refuses to join the calling thread; this happens when
            # the instance is finalised from the worker thread itself, in which
            # case there is nothing to wait for.
            pass

    def stop(self) -> None:
        """
        Stop the worker thread and release the underlying capture.

        The stop flag is raised and the worker is joined *before* the capture
        is released, so the worker never reads from a released capture.
        Blocks until the worker's in-flight ``read()`` call returns.
        """
        self._stopped = True
        self._join_worker()
        self._stream.release()

    # noinspection PyPep8Naming
    def isOpened(self):
        """
        Report whether the underlying capture is open.

        :return: result of ``isOpened()`` on the wrapped capture
        """
        return self._stream.isOpened()

    def __call__(self, *args, **kwargs):
        """Return the most recently stored frame; arguments are ignored."""
        return self.frame

    def __next__(self):
        """
        Return the most recently stored frame.

        :raises StopIteration: once the stream has been stopped or the last
            read did not yield a frame, so ``for`` loops terminate
        """
        if self._stopped or not self.grabbed:
            raise StopIteration
        return self.frame

    def __iter__(self):
        """Start the worker (if needed) and return this instance as iterator."""
        return self.start()

    def __del__(self):
        """Best-effort cleanup when the instance is garbage collected."""
        # ``__init__`` may have failed before the capture was assigned.
        if getattr(self, "_stream", None) is None:
            return
        self.stop()

    def __enter__(self):
        """Start the worker (if needed) and return this instance."""
        return self.start()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the stream on leaving the ``with`` block; exceptions propagate."""
        self.stop()
if __name__ == "__main__":
    # Manual smoke test: iterate over frames from the default source (src=0).
    stream = AsyncVideoStream()
    try:
        for _frame in stream:
            pass
    finally:
        # Release the capture even when the loop is interrupted (e.g. Ctrl-C).
        stream.stop()