Adding recording

This commit is contained in:
2026-05-24 14:36:26 -07:00
parent 0f8572b26e
commit 486f55f228
5 changed files with 658 additions and 64 deletions
+65 -4
View File
@@ -1,5 +1,8 @@
from __future__ import annotations
import logging
from datetime import UTC, datetime
from pathlib import Path
from flask import (
Flask,
@@ -12,8 +15,8 @@ from flask import (
)
from werkzeug.middleware.proxy_fix import ProxyFix
from src.camera import camera
from src.database import CameraEvent, CameraStatus, db
from src.camera import RECORDINGS_DIR, camera
from src.database import CameraEvent, CameraRecordingEvent, CameraStatus, db
logging.basicConfig(level=logging.WARNING)
@@ -33,7 +36,6 @@ def create_app() -> Flask:
# sync camera state with db on startup
status = CameraStatus.get()
if status.running and not camera.running:
# was running before restart — mark as stopped
CameraStatus.set_running(False)
return flask_app
@@ -50,6 +52,9 @@ def get_client_ip() -> str:
)
# ── Health ───────────────────────────────────────────────────────────────────
@app.get("/heartbeat")
def heartbeat() -> tuple[Response, int]:
return (
@@ -63,11 +68,17 @@ def heartbeat() -> tuple[Response, int]:
)
# ── Pages ────────────────────────────────────────────────────────────────────
@app.get("/")
def index() -> str:
return render_template("index.html")
# ── Stream endpoints ──────────────────────────────────────────────────────────
@app.post("/camera/start")
def camera_start() -> tuple[Response, int]:
status = CameraStatus.get()
@@ -95,6 +106,7 @@ def camera_stop() -> tuple[Response, int]:
@app.get("/camera/status")
def camera_status() -> tuple[Response, int]:
status = CameraStatus.get()
rec = camera.recording_status()
return (
jsonify(
{
@@ -103,6 +115,8 @@ def camera_status() -> tuple[Response, int]:
camera.wait_until_ready(timeout=0) if status.running else False
),
"updated_at": status.updated_at.isoformat(),
"recording": rec["recording"],
"recording_started_at": rec["started_at"],
}
),
200,
@@ -133,7 +147,6 @@ def hls_segment(filename: str) -> Response:
if not hls_dir.exists():
abort(404)
# set correct MIME types for HLS files
if filename.endswith(".m3u8"):
mimetype = "application/vnd.apple.mpegurl"
elif filename.endswith(".ts"):
@@ -144,5 +157,53 @@ def hls_segment(filename: str) -> Response:
return send_from_directory(str(hls_dir), filename, mimetype=mimetype)
# ── Recording endpoints ───────────────────────────────────────────────────────
@app.post("/camera/record/start")
def record_start() -> tuple[Response, int]:
if camera.recording:
return jsonify({"status": "already_recording"}), 200
try:
path = camera.start_recording()
CameraRecordingEvent.log("record_start", get_client_ip(), str(path))
return jsonify({"status": "recording", "path": str(path)}), 200
except RuntimeError as exc:
return jsonify({"status": "error", "message": str(exc)}), 409
@app.post("/camera/record/stop")
def record_stop() -> tuple[Response, int]:
if not camera.recording:
return jsonify({"status": "not_recording"}), 200
path = camera.stop_recording()
filename = Path(path).name if path else None
CameraRecordingEvent.log("record_stop", get_client_ip(), str(path))
return jsonify({"status": "stopped", "filename": filename}), 200
@app.get("/camera/recordings")
def list_recordings() -> tuple[Response, int]:
recordings = camera.list_recordings()
return jsonify(recordings), 200
@app.get("/camera/recordings/<filename>")
def download_recording(filename: str) -> Response:
# safety: only allow the expected filename pattern
if not filename.startswith("recording_") or not filename.endswith(".mp4"):
abort(404)
if not RECORDINGS_DIR.exists():
abort(404)
return send_from_directory(
str(RECORDINGS_DIR),
filename,
mimetype="video/mp4",
as_attachment=True,
)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000, debug=True, threaded=True)
+196 -17
View File
@@ -1,9 +1,12 @@
from __future__ import annotations
import io
import logging
import shutil
import subprocess
import threading
import time
from datetime import UTC, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
@@ -11,14 +14,15 @@ logger = logging.getLogger(__name__)
try:
from picamera2 import Picamera2
from picamera2.encoders import H264Encoder
from picamera2.outputs import FileOutput
from picamera2.encoders import H264Encoder, MJPEGEncoder
from picamera2.outputs import FileOutput, FfmpegOutput
PICAMERA_AVAILABLE = True
except ImportError:
PICAMERA_AVAILABLE = False
HLS_DIR = Path("/tmp/hls")
RECORDINGS_DIR = Path("/var/lib/birdcam/recordings")
SEGMENT_DURATION = 2 # seconds per segment
SEGMENT_COUNT = 5 # segments to keep in playlist
BITRATE = 2_000_000 # 2 Mbps — adjust for bandwidth needs
@@ -47,13 +51,21 @@ class Camera:
def __init__(self) -> None:
self._picam: Picamera2 | None = None
self._encoder: H264Encoder | None = None
self._stream_encoder: H264Encoder | None = None
self._record_encoder: H264Encoder | None = None
self._ffmpeg: subprocess.Popen[bytes] | None = None
self._record_ffmpeg: subprocess.Popen[bytes] | None = None
self._output: PipeOutput | None = None
self._ready_event = threading.Event()
self._watch_thread: threading.Thread | None = None
self._stop_event = threading.Event()
self.running = False
self.recording = False
self._current_recording_path: Path | None = None
self._recording_started_at: datetime | None = None
self._lock = threading.Lock()
# ── Streaming ────────────────────────────────────────────────────────────
def start(self) -> None:
if self.running:
@@ -65,7 +77,7 @@ class Camera:
HLS_DIR.mkdir(parents=True)
if not PICAMERA_AVAILABLE:
logger.info("Mock camera started")
logger.info("Mock camera started (picamera2 not available)")
self.running = True
return
@@ -75,11 +87,11 @@ class Camera:
"-loglevel",
"warning",
"-f",
"h264", # input is raw H.264
"h264",
"-i",
"pipe:0", # read from stdin
"pipe:0",
"-c:v",
"copy", # no re-encoding — pass through directly
"copy",
"-hls_time",
str(SEGMENT_DURATION),
"-hls_list_size",
@@ -99,7 +111,6 @@ class Camera:
)
self._output = PipeOutput(self._ffmpeg)
# configure picamera2 with H.264 encoder
self._picam = Picamera2()
config = self._picam.create_video_configuration(
main={"size": (1280, 720)},
@@ -107,19 +118,18 @@ class Camera:
self._picam.configure(config)
self._picam.set_controls(
{
"Brightness": 0.1, # -1.0 to 1.0, default 0.0
"Contrast": 1.1, # 0.0 to 32.0, default 1.0
"Saturation": 1.1, # 0.0 to 32.0, default 1.0
"Sharpness": 1.0, # 0.0 to 16.0, default 1.0
"AwbEnable": True, # auto white balance
"AeEnable": True, # auto exposure
"Brightness": 0.1,
"Contrast": 1.1,
"Saturation": 1.1,
"Sharpness": 1.0,
"AwbEnable": True,
"AeEnable": True,
}
)
self._encoder = H264Encoder(bitrate=BITRATE)
self._stream_encoder = H264Encoder(bitrate=BITRATE)
buffered = io.BufferedWriter(self._output)
self._picam.start_recording(self._encoder, FileOutput(buffered))
self._picam.start_recording(self._stream_encoder, FileOutput(buffered))
# watch for the playlist to appear — signals first segment is ready
self._stop_event.clear()
self._ready_event.clear()
self._watch_thread = threading.Thread(target=self._watch_playlist, daemon=True)
@@ -147,6 +157,11 @@ class Camera:
def stop(self) -> None:
if not self.running:
return
# stop any active recording first
if self.recording:
self.stop_recording()
self._stop_event.set()
self._ready_event.set() # unblock any waiters
@@ -174,6 +189,170 @@ class Camera:
self.running = False
logger.info("Camera stopped")
# ── Recording ────────────────────────────────────────────────────────────
def start_recording(self) -> Path:
"""
Start recording to an MP4 file. Can run independently of or
simultaneously with the HLS stream. Returns the output file path.
"""
with self._lock:
if self.recording:
raise RuntimeError("Already recording")
RECORDINGS_DIR.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
output_path = RECORDINGS_DIR / f"recording_{timestamp}.mp4"
if not PICAMERA_AVAILABLE:
# mock mode — create an empty placeholder file
output_path.touch()
self._current_recording_path = output_path
self._recording_started_at = datetime.now(UTC)
self.recording = True
logger.info("Mock recording started: %s", output_path)
return output_path
if self._picam is None:
# camera not yet streaming — start it temporarily in record-only mode
self._picam = Picamera2()
config = self._picam.create_video_configuration(
main={"size": (1280, 720)},
)
self._picam.configure(config)
self._picam.set_controls(
{
"Brightness": 0.1,
"Contrast": 1.1,
"Saturation": 1.1,
"Sharpness": 1.0,
"AwbEnable": True,
"AeEnable": True,
}
)
# pipe raw H.264 → ffmpeg → MP4 container
ffmpeg_cmd = [
"ffmpeg",
"-loglevel",
"warning",
"-f",
"h264",
"-i",
"pipe:0",
"-c:v",
"copy",
"-movflags",
"+faststart",
str(output_path),
]
self._record_ffmpeg = subprocess.Popen(
ffmpeg_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.PIPE,
)
record_pipe = PipeOutput(self._record_ffmpeg)
record_buffered = io.BufferedWriter(record_pipe)
self._record_encoder = H264Encoder(bitrate=BITRATE)
if self.running:
# camera already running for streaming — add a second encoder output
self._picam.start_encoder(
self._record_encoder,
FileOutput(record_buffered),
)
else:
# no stream active — start camera just for recording
self._picam.start_recording(
self._record_encoder,
FileOutput(record_buffered),
)
self._current_recording_path = output_path
self._recording_started_at = datetime.now(UTC)
self.recording = True
logger.info("Recording started: %s", output_path)
return output_path
def stop_recording(self) -> Path | None:
"""Stop the active recording and finalise the MP4 file."""
with self._lock:
if not self.recording:
return None
path = self._current_recording_path
if PICAMERA_AVAILABLE and self._record_encoder is not None:
if self.running:
# streaming still active — only stop the recording encoder
self._picam.stop_encoder(self._record_encoder) # type: ignore[union-attr]
else:
# recording-only mode — stop the whole camera
if self._picam:
self._picam.stop_recording()
self._picam.close()
self._picam = None
self._record_encoder = None
if self._record_ffmpeg:
if self._record_ffmpeg.stdin:
self._record_ffmpeg.stdin.close()
try:
self._record_ffmpeg.wait(timeout=10)
except subprocess.TimeoutExpired:
self._record_ffmpeg.kill()
self._record_ffmpeg = None
self.recording = False
self._current_recording_path = None
self._recording_started_at = None
logger.info("Recording stopped: %s", path)
return path
def recording_status(self) -> dict[str, object]:
"""Return current recording state for the API."""
return {
"recording": self.recording,
"path": (
str(self._current_recording_path)
if self._current_recording_path
else None
),
"started_at": (
self._recording_started_at.isoformat()
if self._recording_started_at
else None
),
}
def list_recordings(self) -> list[dict[str, object]]:
"""Return metadata for all saved recordings, newest first."""
if not RECORDINGS_DIR.exists():
return []
recordings = []
for f in sorted(RECORDINGS_DIR.glob("recording_*.mp4"), reverse=True):
stat = f.stat()
recordings.append(
{
"filename": f.name,
"size_bytes": stat.st_size,
"created_at": datetime.fromtimestamp(
stat.st_ctime, tz=UTC
).isoformat(),
"duration_seconds": None, # could be derived via ffprobe if needed
}
)
return recordings
# ── Properties ───────────────────────────────────────────────────────────
@property
def hls_dir(self) -> Path:
return HLS_DIR
+42
View File
@@ -46,6 +46,7 @@ class CameraStatus(Base):
class CameraEvent(Base):
__tablename__ = "camera_events"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
action: Mapped[str] = mapped_column(String(10), nullable=False) # 'start' | 'stop'
ip_address: Mapped[str] = mapped_column(String(45), nullable=False)
@@ -73,3 +74,44 @@ class CameraEvent(Base):
.scalars()
.all()
)
class CameraRecordingEvent(Base):
"""Audit log for recording start/stop actions."""
__tablename__ = "camera_recording_events"
id: Mapped[int] = mapped_column(Integer, primary_key=True)
action: Mapped[str] = mapped_column(
String(20), nullable=False
) # 'record_start' | 'record_stop'
ip_address: Mapped[str] = mapped_column(String(45), nullable=False)
file_path: Mapped[str] = mapped_column(String(512), nullable=False, default="")
timestamp: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(UTC),
)
@staticmethod
def log(action: str, ip_address: str, file_path: str = "") -> CameraRecordingEvent:
event = CameraRecordingEvent(
action=action,
ip_address=ip_address,
file_path=file_path,
)
db.session.add(event)
db.session.commit()
return event
@staticmethod
def recent(limit: int = 50) -> list[CameraRecordingEvent]:
return list(
db.session.execute(
db.select(CameraRecordingEvent)
.order_by(CameraRecordingEvent.timestamp.desc())
.limit(limit)
)
.scalars()
.all()
)
+262 -41
View File
@@ -44,6 +44,24 @@
justify-content: center;
}
#preview.is-recording {
outline: 2px solid #ef4444;
outline-offset: 2px;
animation: rec-pulse 2s ease-in-out infinite;
}
@keyframes rec-pulse {
0%,
100% {
outline-color: #ef4444;
}
50% {
outline-color: #7f1d1d;
}
}
#stream-video {
width: 100%;
height: 100%;
@@ -54,6 +72,10 @@
#placeholder {
color: #555;
font-size: 0.9rem;
display: flex;
flex-direction: column;
align-items: center;
gap: 0.5rem;
}
.controls {
@@ -70,7 +92,7 @@
border: none;
border-radius: 6px;
cursor: pointer;
transition: opacity 0.2s;
transition: opacity 0.2s, background 0.2s;
}
button:disabled {
@@ -88,11 +110,128 @@
color: #fff;
}
#btn-record {
background: #3f3f3f;
color: #f0f0f0;
display: flex;
align-items: center;
gap: 0.5rem;
}
#btn-record.is-recording {
background: #b91c1c;
color: #fff;
}
.rec-dot {
width: 10px;
height: 10px;
border-radius: 50%;
background: currentColor;
flex-shrink: 0;
}
#btn-record.is-recording .rec-dot {
animation: blink 1s step-start infinite;
}
@keyframes blink {
0%,
100% {
opacity: 1;
}
50% {
opacity: 0;
}
}
.badge {
font-size: 0.8rem;
padding: 0.25rem 0.75rem;
border-radius: 999px;
font-weight: 600;
letter-spacing: 0.03em;
}
.badge.live {
background: #dc2626;
color: #fff;
}
#status {
font-size: 0.85rem;
color: #888;
min-height: 1.2em;
}
#recordings-section {
width: 100%;
max-width: 720px;
}
#recordings-section h2 {
font-size: 0.9rem;
letter-spacing: 0.06em;
text-transform: uppercase;
color: #666;
margin-bottom: 0.75rem;
padding-bottom: 0.4rem;
border-bottom: 1px solid #222;
}
#recordings-list {
display: flex;
flex-direction: column;
gap: 0.4rem;
}
.recording-item {
display: flex;
align-items: center;
justify-content: space-between;
background: #1a1a1a;
border: 1px solid #2a2a2a;
border-radius: 6px;
padding: 0.5rem 0.85rem;
font-size: 0.82rem;
gap: 1rem;
}
.recording-item .rec-name {
font-family: monospace;
color: #ccc;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
flex: 1;
}
.recording-item .rec-meta {
color: #555;
white-space: nowrap;
font-size: 0.75rem;
}
.recording-item a {
color: #22c55e;
text-decoration: none;
font-size: 0.78rem;
white-space: nowrap;
transition: color 0.15s;
}
.recording-item a:hover {
color: #4ade80;
}
#recordings-empty {
color: #444;
font-size: 0.82rem;
text-align: center;
padding: 0.75rem 0;
}
</style>
</head>
@@ -101,71 +240,165 @@
<div id="preview">
<div id="placeholder">
<div class="dot" id="status-dot"></div>
<span id="placeholder-text">Checking stream...</span>
</div>
<video id="stream-video" autoplay muted playsinline></video>
</div>
<div class="controls" id="controls"></div>
<p id="status"></p>
<!-- hls.js from CDN -->
<div id="recordings-section">
<h2>Recordings</h2>
<div id="recordings-list">
<p id="recordings-empty">No recordings yet.</p>
</div>
</div>
<script src="https://cdn.jsdelivr.net/npm/hls.js@1/dist/hls.min.js"></script>
<script>
const video = document.getElementById("stream-video");
const placeholder = document.getElementById("placeholder");
const placeholderText = document.getElementById("placeholder-text");
const statusDot = document.getElementById("status-dot");
const controls = document.getElementById("controls");
const statusEl = document.getElementById("status");
const preview = document.getElementById("preview");
const recordingsList = document.getElementById("recordings-list");
let hls = null;
let pollInterval = null;
let isRecording = false;
function setStatus(msg) { statusEl.textContent = msg; }
function fmtBytes(b) {
if (b < 1024) return b + " B";
if (b < 1048576) return (b / 1024).toFixed(1) + " KB";
return (b / 1048576).toFixed(1) + " MB";
}
function fmtDate(iso) { return new Date(iso).toLocaleString(); }
// ── Recordings list ─────────────────────────────────────────────────────
async function refreshRecordings() {
const res = await fetch("/camera/recordings");
const list = await res.json();
recordingsList.innerHTML = "";
if (list.length === 0) {
recordingsList.innerHTML = '<p id="recordings-empty">No recordings yet.</p>';
return;
}
list.forEach(rec => {
const item = document.createElement("div");
item.className = "recording-item";
item.innerHTML =
'<span class="rec-name">' + rec.filename + '</span>' +
'<span class="rec-meta">' + fmtBytes(rec.size_bytes) + ' &nbsp;&middot;&nbsp; ' + fmtDate(rec.created_at) + '</span>' +
'<a href="/camera/recordings/' + rec.filename + '" download>Download</a>';
recordingsList.appendChild(item);
});
}
// ── Record button ───────────────────────────────────────────────────────
function createRecordButton() {
const btn = document.createElement("button");
btn.id = "btn-record";
const dot = document.createElement("span");
dot.className = "rec-dot";
const label = document.createElement("span");
label.className = "rec-label";
if (isRecording) {
btn.classList.add("is-recording");
label.textContent = "Stop Recording";
} else {
label.textContent = "Record";
}
btn.appendChild(dot);
btn.appendChild(label);
btn.addEventListener("click", toggleRecording);
return btn;
}
async function toggleRecording() {
const btn = document.getElementById("btn-record");
if (btn) btn.disabled = true;
if (!isRecording) {
setStatus("Starting recording...");
const res = await fetch("/camera/record/start", { method: "POST" });
const data = await res.json();
if (data.status === "recording" || data.status === "already_recording") {
isRecording = true;
setStatus("Recording");
} else {
setStatus("Failed to start recording: " + (data.message || "unknown error"));
}
} else {
setStatus("Stopping recording...");
await fetch("/camera/record/stop", { method: "POST" });
isRecording = false;
setStatus("Recording saved");
await refreshRecordings();
}
const streaming = !!document.getElementById("btn-stop");
renderControls(streaming);
}
// ── Controls ────────────────────────────────────────────────────────────
function renderControls(running) {
controls.innerHTML = "";
if (running) {
const stop = document.createElement("button");
stop.id = "btn-stop";
stop.textContent = "Stop";
stop.textContent = "Stop Stream";
stop.addEventListener("click", stopStream);
controls.appendChild(stop);
const badge = document.createElement("span");
badge.className = "badge live";
badge.textContent = "● Live";
controls.appendChild(badge);
} else {
const start = document.createElement("button");
start.id = "btn-start";
start.textContent = "Start";
start.textContent = "Start Stream";
start.addEventListener("click", startStream);
controls.appendChild(start);
}
controls.appendChild(createRecordButton());
if (running) {
const badge = document.createElement("span");
badge.className = "badge live";
badge.textContent = "Live";
controls.appendChild(badge);
}
function showOffline(message = "Stream is offline") {
if (isRecording) {
preview.classList.add("is-recording");
} else {
preview.classList.remove("is-recording");
}
}
// ── Video display ────────────────────────────────────────────────────────
function showOffline(message) {
video.style.display = "none";
video.src = "";
placeholder.style.display = "flex";
placeholderText.textContent = message;
statusDot.classList.remove("live");
placeholderText.textContent = message || "Stream is offline";
}
function showLive() {
placeholder.style.display = "none";
video.style.display = "block";
statusDot.classList.add("live");
}
// ── HLS ─────────────────────────────────────────────────────────────────
// ── HLS ─────────────────────────────────────────────────────────────────
function startHls() {
const src = "/camera/hls/stream.m3u8";
if (hls) { hls.destroy(); hls = null; }
if (Hls.isSupported()) {
@@ -179,7 +412,6 @@
});
hls.on(Hls.Events.ERROR, (event, data) => {
if (data.fatal) {
console.error("HLS fatal error:", data);
showOffline("Stream error — reloading...");
setTimeout(attachToStream, 3000);
}
@@ -201,18 +433,14 @@
video.style.display = "none";
}
// ── Status polling ───────────────────────────────────────────────────────
// ── Status polling ───────────────────────────────────────────────────────
async function fetchStatus() {
const res = await fetch("/camera/status");
return res.json();
}
function setStatus(msg) {
statusEl.textContent = msg;
}
async function waitForReady(maxAttempts = 40) {
async function waitForReady(maxAttempts) {
maxAttempts = maxAttempts || 40;
for (let i = 0; i < maxAttempts; i++) {
const data = await fetchStatus();
if (data.ready) return true;
@@ -221,9 +449,10 @@
return false;
}
// Called on page load and after stream errors — attach if already live
async function attachToStream() {
const data = await fetchStatus();
isRecording = data.recording || false;
if (data.ready) {
startHls();
renderControls(true);
@@ -259,15 +488,10 @@
if (pollInterval) { clearInterval(pollInterval); pollInterval = null; }
}
// ── Stream start / stop ──────────────────────────────────────────────────
// ── Stream start / stop ──────────────────────────────────────────────────
async function startStream() {
// guard: re-check status in case another client just started it
const current = await fetchStatus();
if (current.running || current.ready) {
await attachToStream();
return;
}
if (current.running || current.ready) { await attachToStream(); return; }
const btn = document.getElementById("btn-start");
if (btn) btn.disabled = true;
@@ -275,7 +499,6 @@
placeholderText.textContent = "Starting...";
await fetch("/camera/start", { method: "POST" });
setStatus("Waiting for first segment...");
const ready = await waitForReady();
@@ -301,14 +524,12 @@
startOfflinePoll();
}
// ── Init ────────────────────────────────────────────────────────────────
// ── Init ──────────────────────────────────────────────────────────────────
(async () => {
await attachToStream();
await refreshRecordings();
const data = await fetchStatus();
if (!data.running && !data.ready) {
startOfflinePoll();
}
if (!data.running && !data.ready) startOfflinePoll();
})();
</script>
</body>
+93 -2
View File
@@ -17,6 +17,9 @@ def client() -> Generator[FlaskClient, Any, Any]:
db.drop_all()
# ── Stream tests ─────────────────────────────────────────────────────────────
def test_heartbeat_status_code(client: FlaskClient) -> None:
response = client.get("/heartbeat")
assert response.status_code == 200
@@ -35,7 +38,7 @@ def test_camera_stop(client: FlaskClient) -> None:
assert response.get_json()["status"] in ("stopped", "already_stopped")
def test_double_start_is_idempotent(client):
def test_double_start_is_idempotent(client: FlaskClient) -> None:
client.post("/camera/start")
res = client.post("/camera/start")
assert res.get_json()["status"] == "already_running"
@@ -48,9 +51,19 @@ def test_camera_status(client: FlaskClient) -> None:
assert "running" in data
assert "ready" in data
assert "updated_at" in data
# recording fields must always be present
assert "recording" in data
assert "recording_started_at" in data
def test_camera_log(client):
def test_camera_status_includes_recording_false_by_default(client: FlaskClient) -> None:
res = client.get("/camera/status")
data = res.get_json()
assert data["recording"] is False
assert data["recording_started_at"] is None
def test_camera_log(client: FlaskClient) -> None:
client.post("/camera/start")
client.post("/camera/stop")
res = client.get("/camera/log")
@@ -72,3 +85,81 @@ def test_index_page(client: FlaskClient) -> None:
response = client.get("/")
assert response.status_code == 200
assert b"Pi Camera" in response.data
# ── Recording tests ───────────────────────────────────────────────────────────
def test_record_start_returns_200(client: FlaskClient) -> None:
res = client.post("/camera/record/start")
assert res.status_code == 200
data = res.get_json()
assert data["status"] in ("recording", "already_recording")
def test_record_stop_when_not_recording(client: FlaskClient) -> None:
res = client.post("/camera/record/stop")
assert res.status_code == 200
assert res.get_json()["status"] == "not_recording"
def test_record_start_then_stop(client: FlaskClient) -> None:
start = client.post("/camera/record/start")
assert start.get_json()["status"] == "recording"
stop = client.post("/camera/record/stop")
assert stop.status_code == 200
assert stop.get_json()["status"] == "stopped"
def test_double_record_start_is_idempotent(client: FlaskClient) -> None:
client.post("/camera/record/start")
res = client.post("/camera/record/start")
assert res.get_json()["status"] == "already_recording"
# clean up
client.post("/camera/record/stop")
def test_status_reflects_recording_state(client: FlaskClient) -> None:
client.post("/camera/record/start")
status = client.get("/camera/status").get_json()
assert status["recording"] is True
assert status["recording_started_at"] is not None
client.post("/camera/record/stop")
status = client.get("/camera/status").get_json()
assert status["recording"] is False
assert status["recording_started_at"] is None
def test_stream_and_record_simultaneously(client: FlaskClient) -> None:
"""Stream and record can be active at the same time without interference."""
client.post("/camera/start")
rec = client.post("/camera/record/start")
assert rec.get_json()["status"] in ("recording", "already_recording")
status = client.get("/camera/status").get_json()
assert status["running"] is True
assert status["recording"] is True
# stopping stream does not affect recording state report from camera object
client.post("/camera/stop")
# recording was stopped as part of camera.stop() — that is expected behaviour
# (the camera itself cleans up on stop)
def test_list_recordings_empty(client: FlaskClient) -> None:
res = client.get("/camera/recordings")
assert res.status_code == 200
assert res.get_json() == []
def test_download_recording_invalid_filename(client: FlaskClient) -> None:
# filename pattern not matching should 404
res = client.get("/camera/recordings/../../etc/passwd")
assert res.status_code in (404, 400)
def test_download_recording_wrong_prefix(client: FlaskClient) -> None:
res = client.get("/camera/recordings/evil.mp4")
assert res.status_code == 404