Skip to content

Commit

Permalink
Slightly more robust RTSP support (#46)
Browse files Browse the repository at this point in the history
* Slightly more robust RTSP support

* Bump version
  • Loading branch information
tyler-romero authored Nov 17, 2024
1 parent c4ea1a9 commit c91c123
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 11 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,12 @@ To connect an RTSP stream from a camera or other source, you'll need the RTSP UR
Once you have the RTSP URL, pass it to the `-s` parameter:

``` shell
RTSP_URL="rtsp://username:password@camera_ip_address:554/path/to/stream"

docker run groundlight/stream \
-t api_29imEXAMPLE \
-d det_2MiD5Elu8bza7sil9l7KPpr694a \
-s "rtsp://username:password@camera_ip_address:554/path/to/stream" \
-s "${RTSP_URL}" \
-f 1
```

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "stream"
version = "0.5.1"
version = "0.5.2"
description = "Groundlight Stream Processor - Container for analyzing video using RTSP etc"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
67 changes: 58 additions & 9 deletions src/stream/grabber.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,33 @@ class RTSPFrameGrabber(FrameGrabber):
latest frame when explicitly requested.
"""

def __init__(self, stream=None):
def __init__(self, stream: str, max_fps=10, keep_connection_open=True):
self.rtsp_url = stream
self.max_fps = max_fps
self.keep_connection_open = keep_connection_open

self.lock = Lock()
self.stream = stream
self.capture = cv2.VideoCapture(self.stream)
self.run = True

self.capture = cv2.VideoCapture(self.rtsp_url)
logger.debug(f"initialized video capture with backend={self.capture.getBackendName()}")

if self.keep_connection_open:
self._open_connection()
self._init_drain_thread()

def _open_connection(self):
self.capture = cv2.VideoCapture(self.rtsp_url)
if not self.capture.isOpened():
raise ValueError(f"could not open {self.stream=}")
self.thread = Thread(target=self._drain, name="drain_thread")
self.thread.start()
raise ValueError(
f"Could not open RTSP stream: {self.rtsp_url}. Is the RTSP URL correct? Is the camera connected to the network?"
)
logger.debug(f"Initialized video capture with backend={self.capture.getBackendName()}")

def _close_connection(self):
with self.lock:
if self.capture is not None:
self.capture.release()

def grab(self):
start = time.time()
Expand All @@ -179,11 +197,42 @@ def grab(self):
logger.debug(f"read the frame in {1000*(now-start):.1f}ms")
return frame

def _grab_implementation(self) -> np.ndarray:
if not self.keep_connection_open:
self._open_connection()
try:
return self._grab_open()
finally:
self._close_connection()
else:
return self._grab_open()

def _grab_open(self) -> np.ndarray:
with self.lock:
ret, frame = self.capture.retrieve() if self.keep_connection_open else self.capture.read()
if not ret:
logger.error(f"Could not read frame from {self.capture}")
return frame

def release(self) -> None:
if self.keep_connection_open:
self.run = False # to stop the buffer drain thread
self._close_connection()

def _init_drain_thread(self):
if not self.keep_connection_open:
return # No need to drain if we're not keeping the connection open

self.drain_rate = 1 / self.max_fps
thread = Thread(target=self._drain)
thread.daemon = True
thread.start()

def _drain(self):
logger.debug("starting thread to drain the video capture buffer")
while True:
while self.run:
with self.lock:
ret = self.capture.grab() # just grab and don't decode
_ = self.capture.grab()
time.sleep(self.drain_rate)


class YouTubeFrameGrabber(FrameGrabber):
Expand Down
15 changes: 15 additions & 0 deletions src/stream/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,19 @@ def run_capture_loop( # noqa: PLR0912 PLR0913
time.sleep(actual_delay)


def print_banner(gl: Groundlight, args: argparse.Namespace) -> None:
detector = gl.get_detector(id=args.detector)
motdet = "enabled" if args.motion else "disabled"
print("==================================================")
print("Groundlight Stream Processor")
print(f" Target Detector: {detector}")
print(f" Groundlight Endpoint: {gl.endpoint}")
print(f" Whoami: {gl.whoami()}")
print(f" Frames/sec: {args.fps} (Seconds/frame: {1/args.fps:.3f})")
print(f" Motion Detection: {motdet}")
print("==================================================")


def main():
"""Main entry point - parse args and run frame capture loop"""
parser = argparse.ArgumentParser(
Expand Down Expand Up @@ -267,6 +280,8 @@ def main():
# Setup motion detection if enabled
motion_detector = MotionDetector(pct_threshold=motion_threshold) if motion_detect else None

print_banner(gl=gl, args=args)

# Main capture loop
try:
run_capture_loop(
Expand Down

0 comments on commit c91c123

Please sign in to comment.