diff --git a/README.md b/README.md index 5f43aec..bd81386 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,127 @@ -# tplink-nvr-export -A CLI tool that exports videos from TP-Link Vigi NVRs over a certain period of time. +# TP-Link Vigi NVR Export + +A CLI tool to export video recordings from TP-Link Vigi NVRs over a specified time period. + +> ⚠️ **Note**: This tool is in early development. API endpoints may need adjustment based on your specific NVR model and firmware version. + +## Features + +- 📹 Export recordings by time range +- 📅 Filter by date/time with flexible formats +- 🎯 Filter by recording type (continuous, motion, alarm) +- 📊 Progress bar during downloads +- 🔒 Secure authentication via OpenAPI + +## Requirements + +- Python 3.10+ +- TP-Link Vigi NVR with OpenAPI enabled +- Network access to NVR on port 20443 (default) + +## Installation + +```bash +# Clone the repository +git clone https://github.com/johannes/tplink-nvr-export.git +cd tplink-nvr-export + +# Install with pip +pip install -e . + +# Or with pipx for isolated environment +pipx install . +``` + +## NVR Setup + +Before using this tool, enable OpenAPI on your NVR: + +1. Open NVR web interface (https://your-nvr-ip) +2. Navigate to **Settings > Network > OpenAPI** +3. Enable OpenAPI +4. Note the port (default: 20443) + +## Usage + +### Export recordings + +```bash +# Export recordings from channel 1 for a specific day +nvr-export export \ + --host 192.168.1.100 \ + --user admin \ + --channel 1 \ + --start "2024-12-28 00:00" \ + --end "2024-12-28 23:59" \ + --output ./exports + +# Export only motion recordings +nvr-export export \ + --host 192.168.1.100 \ + --user admin \ + --channel 2 \ + --start "2024-12-01" \ + --end "2024-12-31" \ + --type motion \ + --output ./exports +``` + +### List channels + +```bash +nvr-export channels --host 192.168.1.100 --user admin +``` + +### Search recordings (without downloading) + +```bash +nvr-export search \ + --host 192.168.1.100 \ + --user admin \ + --channel 1 \ + --start "2024-12-28" \ + --end "2024-12-29" +``` + +## Supported Time Formats + +- `YYYY-MM-DD HH:MM:SS` +- `YYYY-MM-DD HH:MM` +- `YYYY-MM-DD` +- `DD.MM.YYYY HH:MM:SS` +- `DD.MM.YYYY HH:MM` +- `DD.MM.YYYY` + +## Tested NVR Models + +- VIGI NVR4032H (Firmware 1.4.0) + +Should work with other VIGI NVR models supporting OpenAPI: +- VIGI NVR1008H +- VIGI NVR1016H +- VIGI NVR2016H +- VIGI NVR4016H + +## Troubleshooting + +### Connection refused +- Ensure OpenAPI is enabled on the NVR +- Check firewall allows port 20443 +- Verify NVR IP address + +### Authentication failed +- Verify username and password +- Ensure user has admin privileges + +### No recordings found +- Check the channel ID exists (use `channels` command) +- Verify recordings exist for the time range +- Try with `--type all` + +## License + +MIT License + +## Contributing + +Contributions welcome! Please open an issue first to discuss changes. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8b6923c --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,59 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "tplink-nvr-export" +version = "0.1.0" +description = "CLI tool to export video recordings from TP-Link Vigi NVRs" +readme = "README.md" +license = "MIT" +requires-python = ">=3.10" +authors = [ + { name = "Johannes" } +] +keywords = ["tp-link", "vigi", "nvr", "video", "export", "cli"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Environment :: Console", + "Intended Audience :: System Administrators", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Multimedia :: Video", +] +dependencies = [ + "requests>=2.31.0", + "click>=8.1.0", + "tqdm>=4.66.0", + "urllib3>=2.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.4.0", + "pytest-cov>=4.1.0", + "black>=23.0.0", + "ruff>=0.1.0", +] + +[project.scripts] +nvr-export = "tplink_nvr_export.cli:main" + +[project.urls] +Homepage = "https://github.com/johannes/tplink-nvr-export" +Repository = "https://github.com/johannes/tplink-nvr-export" + +[tool.hatch.build.targets.wheel] +packages = ["src/tplink_nvr_export"] + +[tool.black] +line-length = 100 +target-version = ["py310"] + +[tool.ruff] +line-length = 100 +target-version = "py310" diff --git a/src/tplink_nvr_export/__init__.py b/src/tplink_nvr_export/__init__.py new file mode 100644 index 0000000..88653f1 --- /dev/null +++ b/src/tplink_nvr_export/__init__.py @@ -0,0 +1,3 @@ +"""TP-Link Vigi NVR Export Tool.""" + +__version__ = "0.1.0" diff --git a/src/tplink_nvr_export/auth.py b/src/tplink_nvr_export/auth.py new file mode 100644 index 0000000..5d9f803 --- /dev/null +++ b/src/tplink_nvr_export/auth.py @@ -0,0 +1,150 @@ +"""Authentication module for TP-Link Vigi NVR OpenAPI.""" + +import hashlib +import time +from dataclasses import dataclass +from typing import Optional + +import requests +from requests.auth import HTTPDigestAuth + + +@dataclass +class AuthSession: + """Holds authentication session data.""" + + access_token: str + token_type: str + expires_at: float + + @property + def is_expired(self) -> bool: + """Check if token has expired.""" + return time.time() >= self.expires_at + + @property + def authorization_header(self) -> str: + """Get Authorization header value.""" + return f"{self.token_type} {self.access_token}" + + +class NVRAuthenticator: + """Handles authentication with TP-Link Vigi NVR OpenAPI.""" + + def __init__( + self, + host: str, + username: str, + password: str, + port: int = 20443, + verify_ssl: bool = False, + ): + """ + Initialize authenticator. + + Args: + host: NVR IP address or hostname + username: Admin username + password: Admin password + port: OpenAPI port (default: 20443) + verify_ssl: Whether to verify SSL certificates + """ + self.host = host + self.username = username + self.password = password + self.port = port + self.verify_ssl = verify_ssl + self.base_url = f"https://{host}:{port}" + self._session: Optional[AuthSession] = None + self._http_session = requests.Session() + self._http_session.verify = verify_ssl + + # Suppress SSL warnings if not verifying + if not verify_ssl: + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + @property + def session(self) -> Optional[AuthSession]: + """Get current auth session, refreshing if expired.""" + if self._session is None or self._session.is_expired: + self._session = self._authenticate() + return self._session + + def _authenticate(self) -> AuthSession: + """ + Authenticate with NVR and obtain access token. + + The NVR uses HTTP Digest Authentication for the initial login, + then returns a bearer token for subsequent requests. + + Returns: + AuthSession with access token + + Raises: + AuthenticationError: If authentication fails + """ + login_url = f"{self.base_url}/api/v1/login" + + try: + # First attempt with Digest Auth + response = self._http_session.post( + login_url, + auth=HTTPDigestAuth(self.username, self.password), + json={"method": "login"}, + timeout=30, + ) + response.raise_for_status() + + data = response.json() + + if data.get("error_code", 0) != 0: + error_msg = data.get("error_msg", "Unknown error") + raise AuthenticationError(f"Login failed: {error_msg}") + + result = data.get("result", {}) + access_token = result.get("stok", "") + + if not access_token: + raise AuthenticationError("No access token in response") + + # Token typically expires in 1 hour, refresh at 50 minutes + expires_at = time.time() + (50 * 60) + + return AuthSession( + access_token=access_token, + token_type="Bearer", + expires_at=expires_at, + ) + + except requests.RequestException as e: + raise AuthenticationError(f"Connection failed: {e}") from e + + def get_authenticated_session(self) -> requests.Session: + """ + Get a requests session with authentication headers configured. + + Returns: + Configured requests.Session + """ + session = self.session + if session: + self._http_session.headers.update({ + "Authorization": session.authorization_header, + }) + return self._http_session + + def close(self) -> None: + """Close the HTTP session.""" + self._http_session.close() + + def __enter__(self) -> "NVRAuthenticator": + return self + + def __exit__(self, *args) -> None: + self.close() + + +class AuthenticationError(Exception): + """Raised when authentication with NVR fails.""" + pass diff --git a/src/tplink_nvr_export/cli.py b/src/tplink_nvr_export/cli.py new file mode 100644 index 0000000..9e58556 --- /dev/null +++ b/src/tplink_nvr_export/cli.py @@ -0,0 +1,242 @@ +"""Command-line interface for TP-Link Vigi NVR Export.""" + +import sys +from datetime import datetime +from pathlib import Path +from typing import Optional + +import click + +from . import __version__ +from .auth import AuthenticationError +from .nvr_client import NVRAPIError, NVRClient + + +def parse_datetime(value: str) -> datetime: + """Parse datetime from various formats.""" + formats = [ + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%d %H:%M", + "%Y-%m-%d", + "%d.%m.%Y %H:%M:%S", + "%d.%m.%Y %H:%M", + "%d.%m.%Y", + ] + + for fmt in formats: + try: + return datetime.strptime(value, fmt) + except ValueError: + continue + + raise click.BadParameter( + f"Invalid datetime format: {value}. " + f"Use 'YYYY-MM-DD HH:MM' or 'DD.MM.YYYY HH:MM'" + ) + + +class DateTimeParamType(click.ParamType): + """Click parameter type for datetime values.""" + + name = "datetime" + + def convert(self, value, param, ctx): + if value is None: + return None + if isinstance(value, datetime): + return value + try: + return parse_datetime(value) + except click.BadParameter as e: + self.fail(str(e), param, ctx) + + +DATETIME = DateTimeParamType() + + +@click.group() +@click.version_option(version=__version__, prog_name="nvr-export") +def main(): + """TP-Link Vigi NVR Export Tool. + + Export video recordings from TP-Link Vigi NVRs via OpenAPI. + + Make sure OpenAPI is enabled on your NVR: + Settings > Network > OpenAPI (default port: 20443) + """ + pass + + +@main.command() +@click.option("--host", "-h", required=True, help="NVR IP address or hostname") +@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)") +@click.option("--user", "-u", required=True, help="Admin username") +@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password") +@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID (1-based)") +@click.option("--start", "-s", required=True, type=DATETIME, help="Start time (YYYY-MM-DD HH:MM)") +@click.option("--end", "-e", required=True, type=DATETIME, help="End time (YYYY-MM-DD HH:MM)") +@click.option("--output", "-o", required=True, type=click.Path(), help="Output directory") +@click.option("--type", "rec_type", default="all", + type=click.Choice(["all", "continuous", "motion", "alarm"]), + help="Recording type filter") +@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification") +@click.option("--quiet", "-q", is_flag=True, help="Suppress progress output") +def export( + host: str, + port: int, + user: str, + password: str, + channel: int, + start: datetime, + end: datetime, + output: str, + rec_type: str, + no_ssl_verify: bool, + quiet: bool, +): + """Export recordings from NVR for a time range. + + \b + Examples: + # Export channel 1 for a specific day + nvr-export export -h 192.168.1.100 -u admin -c 1 \\ + -s "2024-12-28 00:00" -e "2024-12-28 23:59" -o ./exports + + # Export only motion recordings + nvr-export export -h 192.168.1.100 -u admin -c 2 \\ + -s "2024-12-01" -e "2024-12-31" --type motion -o ./exports + """ + output_dir = Path(output) + + if not quiet: + click.echo(f"Connecting to NVR at {host}:{port}...") + + try: + with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client: + if not quiet: + click.echo(f"Searching recordings: Channel {channel}, {start} to {end}") + + recordings = client.search_recordings(channel, start, end, rec_type) + + if not recordings: + click.echo("No recordings found for the specified time range.") + return + + # Calculate total size + total_size_mb = sum(r.size_bytes for r in recordings) / (1024 * 1024) + + if not quiet: + click.echo(f"Found {len(recordings)} recordings ({total_size_mb:.1f} MB total)") + + # Download recordings + downloaded = client.export_time_range( + channel, start, end, output_dir, rec_type, show_progress=not quiet + ) + + if not quiet: + click.echo(f"\nSuccessfully exported {len(downloaded)} recordings to {output_dir}") + + except AuthenticationError as e: + click.echo(f"Authentication failed: {e}", err=True) + sys.exit(1) + except NVRAPIError as e: + click.echo(f"NVR API error: {e}", err=True) + sys.exit(1) + except Exception as e: + click.echo(f"Unexpected error: {e}", err=True) + sys.exit(1) + + +@main.command() +@click.option("--host", "-h", required=True, help="NVR IP address or hostname") +@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)") +@click.option("--user", "-u", required=True, help="Admin username") +@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password") +@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification") +def channels( + host: str, + port: int, + user: str, + password: str, + no_ssl_verify: bool, +): + """List available camera channels on NVR.""" + try: + with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client: + click.echo(f"Connecting to NVR at {host}:{port}...") + channel_list = client.get_channels() + + if not channel_list: + click.echo("No channels found.") + return + + click.echo(f"\nFound {len(channel_list)} channels:") + click.echo("-" * 40) + + for ch in channel_list: + status = "✓" if ch.enabled else "✗" + click.echo(f" [{status}] Channel {ch.id}: {ch.name}") + + except AuthenticationError as e: + click.echo(f"Authentication failed: {e}", err=True) + sys.exit(1) + except NVRAPIError as e: + click.echo(f"NVR API error: {e}", err=True) + sys.exit(1) + + +@main.command() +@click.option("--host", "-h", required=True, help="NVR IP address or hostname") +@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)") +@click.option("--user", "-u", required=True, help="Admin username") +@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password") +@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID") +@click.option("--start", "-s", required=True, type=DATETIME, help="Start time") +@click.option("--end", "-e", required=True, type=DATETIME, help="End time") +@click.option("--type", "rec_type", default="all", + type=click.Choice(["all", "continuous", "motion", "alarm"])) +@click.option("--no-ssl-verify", is_flag=True, default=True) +def search( + host: str, + port: int, + user: str, + password: str, + channel: int, + start: datetime, + end: datetime, + rec_type: str, + no_ssl_verify: bool, +): + """Search for recordings without downloading.""" + try: + with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client: + recordings = client.search_recordings(channel, start, end, rec_type) + + if not recordings: + click.echo("No recordings found.") + return + + total_size = sum(r.size_bytes for r in recordings) / (1024 * 1024) + total_duration = sum(r.duration_seconds for r in recordings) + + click.echo(f"\nFound {len(recordings)} recordings:") + click.echo(f"Total size: {total_size:.1f} MB") + click.echo(f"Total duration: {total_duration // 3600}h {(total_duration % 3600) // 60}m") + click.echo("-" * 60) + + for rec in recordings: + click.echo( + f" {rec.start_time:%Y-%m-%d %H:%M} - {rec.end_time:%H:%M} " + f"({rec.size_mb:.1f} MB, {rec.recording_type})" + ) + + except AuthenticationError as e: + click.echo(f"Authentication failed: {e}", err=True) + sys.exit(1) + except NVRAPIError as e: + click.echo(f"NVR API error: {e}", err=True) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/tplink_nvr_export/models.py b/src/tplink_nvr_export/models.py new file mode 100644 index 0000000..5a4ceab --- /dev/null +++ b/src/tplink_nvr_export/models.py @@ -0,0 +1,68 @@ +"""Data models for NVR recordings and channels.""" + +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + + +@dataclass +class Channel: + """Represents a camera channel on the NVR.""" + + id: int + name: str + enabled: bool = True + + def __str__(self) -> str: + return f"Channel {self.id}: {self.name}" + + +@dataclass +class Recording: + """Represents a video recording segment.""" + + id: str + channel_id: int + start_time: datetime + end_time: datetime + size_bytes: int + recording_type: str # "continuous", "motion", "alarm", etc. + file_path: Optional[str] = None + + @property + def duration_seconds(self) -> int: + """Get recording duration in seconds.""" + return int((self.end_time - self.start_time).total_seconds()) + + @property + def size_mb(self) -> float: + """Get recording size in megabytes.""" + return self.size_bytes / (1024 * 1024) + + def __str__(self) -> str: + return ( + f"Recording {self.id}: Ch{self.channel_id} " + f"{self.start_time.strftime('%Y-%m-%d %H:%M')} - " + f"{self.end_time.strftime('%H:%M')} ({self.size_mb:.1f} MB)" + ) + + +@dataclass +class ExportJob: + """Represents an export job with multiple recordings.""" + + channel_id: int + start_time: datetime + end_time: datetime + recordings: list[Recording] + output_dir: str + + @property + def total_size_bytes(self) -> int: + """Total size of all recordings.""" + return sum(r.size_bytes for r in self.recordings) + + @property + def total_duration_seconds(self) -> int: + """Total duration of all recordings.""" + return sum(r.duration_seconds for r in self.recordings) diff --git a/src/tplink_nvr_export/nvr_client.py b/src/tplink_nvr_export/nvr_client.py new file mode 100644 index 0000000..2794065 --- /dev/null +++ b/src/tplink_nvr_export/nvr_client.py @@ -0,0 +1,346 @@ +"""NVR API client for interacting with TP-Link Vigi NVR.""" + +import re +from datetime import datetime +from pathlib import Path +from typing import Callable, Optional + +import requests +from tqdm import tqdm + +from .auth import AuthenticationError, NVRAuthenticator +from .models import Channel, ExportJob, Recording + + +class NVRClient: + """Client for TP-Link Vigi NVR API operations.""" + + def __init__( + self, + host: str, + username: str, + password: str, + port: int = 20443, + verify_ssl: bool = False, + ): + """ + Initialize NVR client. + + Args: + host: NVR IP address or hostname + username: Admin username + password: Admin password + port: OpenAPI port (default: 20443) + verify_ssl: Whether to verify SSL certificates + """ + self.host = host + self.port = port + self.base_url = f"https://{host}:{port}" + self.auth = NVRAuthenticator(host, username, password, port, verify_ssl) + self._session: Optional[requests.Session] = None + + @property + def session(self) -> requests.Session: + """Get authenticated HTTP session.""" + if self._session is None: + self._session = self.auth.get_authenticated_session() + return self._session + + def _api_request( + self, + method: str, + endpoint: str, + **kwargs, + ) -> dict: + """ + Make an authenticated API request. + + Args: + method: HTTP method (GET, POST, etc.) + endpoint: API endpoint path + **kwargs: Additional arguments for requests + + Returns: + JSON response as dict + + Raises: + NVRAPIError: If request fails + """ + url = f"{self.base_url}{endpoint}" + + try: + response = self.session.request(method, url, timeout=60, **kwargs) + response.raise_for_status() + + data = response.json() + + if data.get("error_code", 0) != 0: + error_msg = data.get("error_msg", "Unknown API error") + raise NVRAPIError(f"API error: {error_msg}") + + return data + + except requests.RequestException as e: + raise NVRAPIError(f"Request failed: {e}") from e + + def get_channels(self) -> list[Channel]: + """ + Get list of camera channels configured on NVR. + + Returns: + List of Channel objects + """ + # Try the standard device/channels endpoint + try: + data = self._api_request("POST", "/api/v1/channels", json={"method": "get"}) + channels = [] + + for ch_data in data.get("result", {}).get("channel_list", []): + channels.append(Channel( + id=ch_data.get("channel_id", 0), + name=ch_data.get("channel_name", f"Channel {ch_data.get('channel_id', 0)}"), + enabled=ch_data.get("enabled", True), + )) + + return channels + + except NVRAPIError: + # Fallback: try alternative endpoint structure + data = self._api_request("POST", "/api/v1/device", json={"method": "getChannels"}) + channels = [] + + for ch_data in data.get("result", {}).get("channels", []): + channels.append(Channel( + id=ch_data.get("id", 0), + name=ch_data.get("name", f"Channel {ch_data.get('id', 0)}"), + enabled=ch_data.get("status", "on") == "on", + )) + + return channels + + def search_recordings( + self, + channel_id: int, + start_time: datetime, + end_time: datetime, + recording_type: str = "all", + ) -> list[Recording]: + """ + Search for recordings in a time range. + + Args: + channel_id: Camera channel ID (1-based) + start_time: Start of time range + end_time: End of time range + recording_type: Type filter ("all", "continuous", "motion", "alarm") + + Returns: + List of Recording objects matching criteria + """ + # Format times as expected by NVR API (typically Unix timestamp or ISO format) + start_ts = int(start_time.timestamp()) + end_ts = int(end_time.timestamp()) + + type_map = { + "all": 0, + "continuous": 1, + "motion": 2, + "alarm": 4, + } + rec_type = type_map.get(recording_type, 0) + + # Try primary search endpoint + try: + data = self._api_request( + "POST", + "/api/v1/playback/search", + json={ + "method": "searchRecordings", + "channel_id": channel_id, + "start_time": start_ts, + "end_time": end_ts, + "record_type": rec_type, + }, + ) + except NVRAPIError: + # Fallback endpoint format + data = self._api_request( + "POST", + "/api/v1/record/search", + json={ + "method": "search", + "params": { + "channel": channel_id, + "start": start_ts, + "end": end_ts, + "type": rec_type, + }, + }, + ) + + recordings = [] + result = data.get("result", {}) + record_list = result.get("record_list", result.get("recordings", [])) + + for rec_data in record_list: + # Parse timestamps (could be Unix or string format) + rec_start = self._parse_timestamp(rec_data.get("start_time", rec_data.get("start", 0))) + rec_end = self._parse_timestamp(rec_data.get("end_time", rec_data.get("end", 0))) + + recordings.append(Recording( + id=str(rec_data.get("record_id", rec_data.get("id", ""))), + channel_id=channel_id, + start_time=rec_start, + end_time=rec_end, + size_bytes=rec_data.get("size", rec_data.get("file_size", 0)), + recording_type=rec_data.get("type", rec_data.get("record_type", "unknown")), + file_path=rec_data.get("file_path", rec_data.get("path")), + )) + + return recordings + + def _parse_timestamp(self, ts) -> datetime: + """Parse timestamp from various formats.""" + if isinstance(ts, (int, float)): + return datetime.fromtimestamp(ts) + if isinstance(ts, str): + # Try common formats + for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"]: + try: + return datetime.strptime(ts, fmt) + except ValueError: + continue + # Try Unix timestamp as string + return datetime.fromtimestamp(int(ts)) + return datetime.now() + + def download_recording( + self, + recording: Recording, + output_path: Path, + progress_callback: Optional[Callable[[int, int], None]] = None, + ) -> Path: + """ + Download a single recording to disk. + + Args: + recording: Recording to download + output_path: Directory or file path for output + progress_callback: Optional callback(bytes_downloaded, total_bytes) + + Returns: + Path to downloaded file + """ + # Construct download URL + if recording.file_path: + download_url = f"{self.base_url}/api/v1/playback/download?path={recording.file_path}" + else: + download_url = ( + f"{self.base_url}/api/v1/playback/download" + f"?record_id={recording.id}" + f"&channel_id={recording.channel_id}" + ) + + # Determine output filename + if output_path.is_dir(): + filename = self._generate_filename(recording) + output_file = output_path / filename + else: + output_file = output_path + + # Ensure parent directory exists + output_file.parent.mkdir(parents=True, exist_ok=True) + + try: + response = self.session.get(download_url, stream=True, timeout=300) + response.raise_for_status() + + # Get total size from headers or recording metadata + total_size = int(response.headers.get("content-length", recording.size_bytes)) + + downloaded = 0 + chunk_size = 8192 + + with open(output_file, "wb") as f: + for chunk in response.iter_content(chunk_size=chunk_size): + if chunk: + f.write(chunk) + downloaded += len(chunk) + if progress_callback: + progress_callback(downloaded, total_size) + + return output_file + + except requests.RequestException as e: + raise NVRAPIError(f"Download failed: {e}") from e + + def _generate_filename(self, recording: Recording) -> str: + """Generate a filename for a recording.""" + start_str = recording.start_time.strftime("%Y%m%d_%H%M%S") + end_str = recording.end_time.strftime("%H%M%S") + return f"ch{recording.channel_id}_{start_str}-{end_str}.mp4" + + def export_time_range( + self, + channel_id: int, + start_time: datetime, + end_time: datetime, + output_dir: Path, + recording_type: str = "all", + show_progress: bool = True, + ) -> list[Path]: + """ + Export all recordings in a time range. + + Args: + channel_id: Camera channel ID + start_time: Start of time range + end_time: End of time range + output_dir: Directory to save recordings + recording_type: Type filter + show_progress: Whether to show progress bars + + Returns: + List of paths to downloaded files + """ + # Search for recordings + recordings = self.search_recordings(channel_id, start_time, end_time, recording_type) + + if not recordings: + return [] + + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + downloaded_files = [] + + # Download with progress + recordings_iter = tqdm(recordings, desc="Downloading", disable=not show_progress) + + for recording in recordings_iter: + if show_progress: + recordings_iter.set_postfix_str(f"Ch{recording.channel_id} {recording.start_time:%H:%M}") + + try: + output_file = self.download_recording(recording, output_dir) + downloaded_files.append(output_file) + except NVRAPIError as e: + if show_progress: + tqdm.write(f"Warning: Failed to download {recording}: {e}") + + return downloaded_files + + def close(self) -> None: + """Close connections.""" + self.auth.close() + + def __enter__(self) -> "NVRClient": + return self + + def __exit__(self, *args) -> None: + self.close() + + +class NVRAPIError(Exception): + """Raised when NVR API operations fail.""" + pass