"""Camera capture pipeline: picamera2 H.264 -> ffmpeg -> HLS segments on disk.

When picamera2 cannot be imported the module runs in "mock" mode: the HLS
directory is still managed, but no camera or ffmpeg process is started.
"""

import io
import logging
import shutil
import subprocess
import threading
import time
from pathlib import Path

logger = logging.getLogger(__name__)

try:
    from picamera2 import Picamera2
    from picamera2.encoders import H264Encoder
    from picamera2.outputs import FileOutput

    PICAMERA_AVAILABLE = True
except ImportError:
    PICAMERA_AVAILABLE = False

HLS_DIR = Path("/tmp/hls")
SEGMENT_DURATION = 2  # seconds per segment
SEGMENT_COUNT = 5  # segments to keep in playlist
BITRATE = 2_000_000  # 2 Mbps — adjust for bandwidth needs


class PipeOutput(io.RawIOBase):
    """Raw, write-only stream that forwards bytes to a subprocess's stdin.

    It is meant to be wrapped in ``io.BufferedWriter``. Writes are best
    effort: if the pipe is missing, closed, or broken, the data is discarded.
    """

    def __init__(self, proc: subprocess.Popen[bytes]) -> None:
        super().__init__()
        self._proc = proc
        self._drop_logged = False

    def write(self, data: bytes) -> int:  # type: ignore[override]
        """Forward *data* to the process's stdin.

        Always returns ``len(data)``. When the pipe is unusable the data is
        dropped but still reported as consumed: a raw stream that returns 0
        makes a wrapping ``io.BufferedWriter`` retry the same bytes forever.
        """
        size = len(data)
        pipe = self._proc.stdin
        if pipe is not None and not pipe.closed:
            try:
                pipe.write(data)
            except (BrokenPipeError, ValueError):
                # BrokenPipeError: the reader went away.
                # ValueError: stdin was closed between the check and the write.
                pass
            else:
                return size
        if not self._drop_logged:
            # Log once only; this path can be hit for every encoded frame.
            self._drop_logged = True
            logger.warning("ffmpeg stdin is unavailable; dropping encoded video data")
        return size

    def writable(self) -> bool:
        return True


class Camera:
    """Owns the camera, the ffmpeg HLS muxer process and their helper threads."""

    def __init__(self) -> None:
        self._picam: Picamera2 | None = None
        self._encoder: H264Encoder | None = None
        self._ffmpeg: subprocess.Popen[bytes] | None = None
        self._output: PipeOutput | None = None
        self._ready_event = threading.Event()
        self._watch_thread: threading.Thread | None = None
        self._stderr_thread: threading.Thread | None = None
        self._stop_event = threading.Event()
        self.running = False

    def start(self) -> None:
        """Start streaming into ``HLS_DIR``; a no-op if already running.

        Raises:
            Exception: whatever ffmpeg launch or camera setup raised. Any
                partially started resources are released before re-raising.
        """
        if self.running:
            return

        # prepare HLS output directory
        if HLS_DIR.exists():
            shutil.rmtree(HLS_DIR)
        HLS_DIR.mkdir(parents=True)

        if not PICAMERA_AVAILABLE:
            logger.info("Mock camera started")
            self.running = True
            return

        # Reset the signalling state before anything that can set it is started.
        self._stop_event.clear()
        self._ready_event.clear()

        # start ffmpeg: reads raw H.264 from stdin, writes HLS segments
        ffmpeg_cmd = [
            "ffmpeg",
            "-loglevel", "warning",
            "-f", "h264",  # input is raw H.264
            "-i", "pipe:0",  # read from stdin
            "-c:v", "copy",  # no re-encoding — pass through directly
            "-hls_time", str(SEGMENT_DURATION),
            "-hls_list_size", str(SEGMENT_COUNT),
            "-hls_flags", "delete_segments+append_list",
            "-hls_segment_filename", str(HLS_DIR / "seg%03d.ts"),
            str(HLS_DIR / "stream.m3u8"),
        ]

        try:
            self._ffmpeg = subprocess.Popen(
                ffmpeg_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
            # An unread stderr pipe eventually fills up and blocks ffmpeg,
            # so drain it continuously into the logger.
            drainer = threading.Thread(
                target=self._drain_stderr, args=(self._ffmpeg,), daemon=True
            )
            drainer.start()
            self._stderr_thread = drainer

            self._output = PipeOutput(self._ffmpeg)

            # configure picamera2 with H.264 encoder
            self._picam = Picamera2()
            config = self._picam.create_video_configuration(
                main={"size": (1280, 720)},
            )
            self._picam.configure(config)

            self._encoder = H264Encoder(bitrate=BITRATE)
            buffered = io.BufferedWriter(self._output)
            self._picam.start_recording(self._encoder, FileOutput(buffered))

            # watch for the playlist to appear — signals first segment is ready
            watcher = threading.Thread(target=self._watch_playlist, daemon=True)
            watcher.start()
            self._watch_thread = watcher
        except Exception:
            logger.exception("Camera failed to start; releasing resources")
            try:
                self._release()
            except Exception:
                logger.exception("Cleanup after failed start also failed")
            raise

        self.running = True
        logger.info("Camera started — waiting for first HLS segment")

    @staticmethod
    def _drain_stderr(proc: subprocess.Popen[bytes]) -> None:
        """Forward ffmpeg's stderr lines to the logger until the pipe hits EOF."""
        stream = proc.stderr
        if stream is None:
            return
        with stream:
            for raw_line in stream:
                text = raw_line.decode("utf-8", errors="replace").rstrip()
                if text:
                    logger.warning("ffmpeg: %s", text)

    def _watch_playlist(self) -> None:
        """Signal ready once the m3u8 playlist exists and has at least one segment."""
        playlist = HLS_DIR / "stream.m3u8"
        proc = self._ffmpeg  # snapshot: teardown resets the attribute to None
        while not self._stop_event.is_set():
            try:
                content = playlist.read_text()
            except OSError:
                # Not written yet, being replaced by ffmpeg, or already
                # removed by a concurrent stop().
                content = ""
            if ".ts" in content:
                logger.info("HLS playlist ready")
                self._ready_event.set()
                return
            if proc is not None and proc.poll() is not None:
                logger.error(
                    "ffmpeg exited with code %s before producing a playlist",
                    proc.returncode,
                )
                return
            # Wait on the stop event rather than sleeping so stop() is prompt.
            self._stop_event.wait(0.25)

    def wait_until_ready(self, timeout: float = 15.0) -> bool:
        """Block until the first HLS segment is listed, or *timeout* elapses.

        Returns:
            True in mock mode, or when the playlist is ready. False on
            timeout or if the camera has been stopped.
        """
        if not PICAMERA_AVAILABLE:
            return True
        signalled = self._ready_event.wait(timeout)
        # stop() sets the ready event only to wake waiters; that is not "ready".
        return signalled and not self._stop_event.is_set()

    def stop(self) -> None:
        """Stop streaming and remove ``HLS_DIR``; a no-op if not running."""
        if not self.running:
            return
        self._release()
        logger.info("Camera stopped")

    def _release(self) -> None:
        """Tear down camera, ffmpeg, helper threads and the HLS directory.

        ffmpeg and the threads are cleaned up even if the camera shutdown
        raises; that exception still propagates afterwards.
        """
        self._stop_event.set()
        self._ready_event.set()  # unblock any waiters
        try:
            picam, self._picam = self._picam, None
            if picam is not None:
                try:
                    picam.stop_recording()
                finally:
                    picam.close()
            if self._output is not None:
                self._output.close()
        finally:
            self._output = None
            self._encoder = None
            self._reap_ffmpeg()
            self._join_helpers()
            shutil.rmtree(HLS_DIR, ignore_errors=True)
            self.running = False

    def _reap_ffmpeg(self) -> None:
        """Close ffmpeg's stdin and wait for it to exit, killing it after 5 s."""
        proc, self._ffmpeg = self._ffmpeg, None
        if proc is None:
            return
        if proc.stdin is not None:
            try:
                proc.stdin.close()  # EOF on stdin lets ffmpeg finish and exit
            except OSError:
                # ffmpeg already exited; flushing buffered data hit a dead pipe.
                pass
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()  # reap the killed process so it does not stay a zombie

    def _join_helpers(self) -> None:
        """Wait briefly for the watcher and stderr-drain threads to finish."""
        helpers = (self._watch_thread, self._stderr_thread)
        self._watch_thread = None
        self._stderr_thread = None
        for helper in helpers:
            if helper is not None and helper is not threading.current_thread():
                helper.join(timeout=2)

    @property
    def hls_dir(self) -> Path:
        """Directory where the playlist and segments are written."""
        return HLS_DIR


camera = Camera()