Adding initial camera functions
This commit is contained in:
82
src/camera.py
Normal file
82
src/camera.py
Normal file
@@ -0,0 +1,82 @@
|
||||
import logging
|
||||
import threading
|
||||
from collections.abc import Iterator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
from picamera2 import Picamera2
|
||||
from picamera2.encoders import H264Encoder
|
||||
from picamera2.outputs import FileOutput
|
||||
|
||||
PICAMERA_AVAILABLE = True
|
||||
except ImportError:
|
||||
PICAMERA_AVAILABLE = False
|
||||
logger.warning("picamera2 not available — running in mock mode")
|
||||
|
||||
|
||||
class StreamOutput:
|
||||
"""Thread-safe buffer that holds the latest H.264 frame."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.frame: bytes = b""
|
||||
self.condition = threading.Condition()
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
with self.condition:
|
||||
self.frame = data
|
||||
self.condition.notify_all()
|
||||
|
||||
|
||||
class Camera:
|
||||
def __init__(self) -> None:
|
||||
self._picam: Picamera2 | None = None
|
||||
self._output: StreamOutput | None = None
|
||||
self._encoder: H264Encoder | None = None
|
||||
self.running = False
|
||||
|
||||
def start(self) -> None:
|
||||
if self.running:
|
||||
return
|
||||
if not PICAMERA_AVAILABLE:
|
||||
logger.info("Mock camera started")
|
||||
self.running = True
|
||||
return
|
||||
|
||||
self._picam = Picamera2()
|
||||
config = self._picam.create_video_configuration(
|
||||
main={"size": (1280, 720)},
|
||||
)
|
||||
self._picam.configure(config)
|
||||
self._output = StreamOutput()
|
||||
self._encoder = H264Encoder(bitrate=2_000_000)
|
||||
self._picam.start_recording(self._encoder, FileOutput(self._output))
|
||||
self.running = True
|
||||
logger.info("Camera started")
|
||||
|
||||
def stop(self) -> None:
|
||||
if not self.running:
|
||||
return
|
||||
if self._picam:
|
||||
self._picam.stop_recording()
|
||||
self._picam.close()
|
||||
self._picam = None
|
||||
self.running = False
|
||||
logger.info("Camera stopped")
|
||||
|
||||
def frames(self) -> Iterator[bytes]:
|
||||
"""Yield H.264 frames for multipart streaming."""
|
||||
if not PICAMERA_AVAILABLE:
|
||||
# yield a small empty frame in mock mode
|
||||
while self.running:
|
||||
yield b""
|
||||
return
|
||||
assert self._output is not None
|
||||
while self.running:
|
||||
with self._output.condition:
|
||||
self._output.condition.wait()
|
||||
frame = self._output.frame
|
||||
yield frame
|
||||
|
||||
|
||||
camera = Camera()
|
||||
Reference in New Issue
Block a user