feat: Add CLI tool to export recordings from TP-Link Vigi NVRs.

This commit is contained in:
MrUnknownDE
2025-12-30 14:10:08 +01:00
parent aa4c9947f3
commit e8a77bfff6
7 changed files with 995 additions and 2 deletions

129
README.md
View File

@@ -1,2 +1,127 @@
# tplink-nvr-export
A CLI tool that exports videos from TP-Link Vigi NVRs over a certain period of time.
# TP-Link Vigi NVR Export
A CLI tool to export video recordings from TP-Link Vigi NVRs over a specified time period.
> ⚠️ **Note**: This tool is in early development. API endpoints may need adjustment based on your specific NVR model and firmware version.
## Features
- 📹 Export recordings by time range
- 📅 Filter by date/time with flexible formats
- 🎯 Filter by recording type (continuous, motion, alarm)
- 📊 Progress bar during downloads
- 🔒 Secure authentication via OpenAPI
## Requirements
- Python 3.10+
- TP-Link Vigi NVR with OpenAPI enabled
- Network access to NVR on port 20443 (default)
## Installation
```bash
# Clone the repository
git clone https://github.com/johannes/tplink-nvr-export.git
cd tplink-nvr-export
# Install with pip
pip install -e .
# Or with pipx for isolated environment
pipx install .
```
## NVR Setup
Before using this tool, enable OpenAPI on your NVR:
1. Open NVR web interface (https://your-nvr-ip)
2. Navigate to **Settings > Network > OpenAPI**
3. Enable OpenAPI
4. Note the port (default: 20443)
## Usage
### Export recordings
```bash
# Export recordings from channel 1 for a specific day
nvr-export export \
--host 192.168.1.100 \
--user admin \
--channel 1 \
--start "2024-12-28 00:00" \
--end "2024-12-28 23:59" \
--output ./exports
# Export only motion recordings
nvr-export export \
--host 192.168.1.100 \
--user admin \
--channel 2 \
--start "2024-12-01" \
--end "2024-12-31" \
--type motion \
--output ./exports
```
### List channels
```bash
nvr-export channels --host 192.168.1.100 --user admin
```
### Search recordings (without downloading)
```bash
nvr-export search \
--host 192.168.1.100 \
--user admin \
--channel 1 \
--start "2024-12-28" \
--end "2024-12-29"
```
## Supported Time Formats
- `YYYY-MM-DD HH:MM:SS`
- `YYYY-MM-DD HH:MM`
- `YYYY-MM-DD`
- `DD.MM.YYYY HH:MM:SS`
- `DD.MM.YYYY HH:MM`
- `DD.MM.YYYY`
## Tested NVR Models
- VIGI NVR4032H (Firmware 1.4.0)
Should work with other VIGI NVR models supporting OpenAPI:
- VIGI NVR1008H
- VIGI NVR1016H
- VIGI NVR2016H
- VIGI NVR4016H
## Troubleshooting
### Connection refused
- Ensure OpenAPI is enabled on the NVR
- Check firewall allows port 20443
- Verify NVR IP address
### Authentication failed
- Verify username and password
- Ensure user has admin privileges
### No recordings found
- Check the channel ID exists (use `channels` command)
- Verify recordings exist for the time range
- Try with `--type all`
## License
MIT License
## Contributing
Contributions welcome! Please open an issue first to discuss changes.

59
pyproject.toml Normal file
View File

@@ -0,0 +1,59 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "tplink-nvr-export"
version = "0.1.0"
description = "CLI tool to export video recordings from TP-Link Vigi NVRs"
readme = "README.md"
license = "MIT"
requires-python = ">=3.10"
authors = [
{ name = "Johannes" }
]
keywords = ["tp-link", "vigi", "nvr", "video", "export", "cli"]
classifiers = [
"Development Status :: 3 - Alpha",
"Environment :: Console",
"Intended Audience :: System Administrators",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Multimedia :: Video",
]
dependencies = [
"requests>=2.31.0",
"click>=8.1.0",
"tqdm>=4.66.0",
"urllib3>=2.0.0",
]
[project.optional-dependencies]
dev = [
"pytest>=7.4.0",
"pytest-cov>=4.1.0",
"black>=23.0.0",
"ruff>=0.1.0",
]
[project.scripts]
nvr-export = "tplink_nvr_export.cli:main"
[project.urls]
Homepage = "https://github.com/johannes/tplink-nvr-export"
Repository = "https://github.com/johannes/tplink-nvr-export"
[tool.hatch.build.targets.wheel]
packages = ["src/tplink_nvr_export"]
[tool.black]
line-length = 100
target-version = ["py310"]
[tool.ruff]
line-length = 100
target-version = "py310"

View File

@@ -0,0 +1,3 @@
"""TP-Link Vigi NVR Export Tool."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,150 @@
"""Authentication module for TP-Link Vigi NVR OpenAPI."""
import hashlib
import time
from dataclasses import dataclass
from typing import Optional
import requests
from requests.auth import HTTPDigestAuth
@dataclass
class AuthSession:
"""Holds authentication session data."""
access_token: str
token_type: str
expires_at: float
@property
def is_expired(self) -> bool:
"""Check if token has expired."""
return time.time() >= self.expires_at
@property
def authorization_header(self) -> str:
"""Get Authorization header value."""
return f"{self.token_type} {self.access_token}"
class NVRAuthenticator:
"""Handles authentication with TP-Link Vigi NVR OpenAPI."""
def __init__(
self,
host: str,
username: str,
password: str,
port: int = 20443,
verify_ssl: bool = False,
):
"""
Initialize authenticator.
Args:
host: NVR IP address or hostname
username: Admin username
password: Admin password
port: OpenAPI port (default: 20443)
verify_ssl: Whether to verify SSL certificates
"""
self.host = host
self.username = username
self.password = password
self.port = port
self.verify_ssl = verify_ssl
self.base_url = f"https://{host}:{port}"
self._session: Optional[AuthSession] = None
self._http_session = requests.Session()
self._http_session.verify = verify_ssl
# Suppress SSL warnings if not verifying
if not verify_ssl:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@property
def session(self) -> Optional[AuthSession]:
"""Get current auth session, refreshing if expired."""
if self._session is None or self._session.is_expired:
self._session = self._authenticate()
return self._session
def _authenticate(self) -> AuthSession:
"""
Authenticate with NVR and obtain access token.
The NVR uses HTTP Digest Authentication for the initial login,
then returns a bearer token for subsequent requests.
Returns:
AuthSession with access token
Raises:
AuthenticationError: If authentication fails
"""
login_url = f"{self.base_url}/api/v1/login"
try:
# First attempt with Digest Auth
response = self._http_session.post(
login_url,
auth=HTTPDigestAuth(self.username, self.password),
json={"method": "login"},
timeout=30,
)
response.raise_for_status()
data = response.json()
if data.get("error_code", 0) != 0:
error_msg = data.get("error_msg", "Unknown error")
raise AuthenticationError(f"Login failed: {error_msg}")
result = data.get("result", {})
access_token = result.get("stok", "")
if not access_token:
raise AuthenticationError("No access token in response")
# Token typically expires in 1 hour, refresh at 50 minutes
expires_at = time.time() + (50 * 60)
return AuthSession(
access_token=access_token,
token_type="Bearer",
expires_at=expires_at,
)
except requests.RequestException as e:
raise AuthenticationError(f"Connection failed: {e}") from e
def get_authenticated_session(self) -> requests.Session:
"""
Get a requests session with authentication headers configured.
Returns:
Configured requests.Session
"""
session = self.session
if session:
self._http_session.headers.update({
"Authorization": session.authorization_header,
})
return self._http_session
def close(self) -> None:
"""Close the HTTP session."""
self._http_session.close()
def __enter__(self) -> "NVRAuthenticator":
return self
def __exit__(self, *args) -> None:
self.close()
class AuthenticationError(Exception):
"""Raised when authentication with NVR fails."""
pass

View File

@@ -0,0 +1,242 @@
"""Command-line interface for TP-Link Vigi NVR Export."""
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional
import click
from . import __version__
from .auth import AuthenticationError
from .nvr_client import NVRAPIError, NVRClient
def parse_datetime(value: str) -> datetime:
"""Parse datetime from various formats."""
formats = [
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%Y-%m-%d",
"%d.%m.%Y %H:%M:%S",
"%d.%m.%Y %H:%M",
"%d.%m.%Y",
]
for fmt in formats:
try:
return datetime.strptime(value, fmt)
except ValueError:
continue
raise click.BadParameter(
f"Invalid datetime format: {value}. "
f"Use 'YYYY-MM-DD HH:MM' or 'DD.MM.YYYY HH:MM'"
)
class DateTimeParamType(click.ParamType):
"""Click parameter type for datetime values."""
name = "datetime"
def convert(self, value, param, ctx):
if value is None:
return None
if isinstance(value, datetime):
return value
try:
return parse_datetime(value)
except click.BadParameter as e:
self.fail(str(e), param, ctx)
DATETIME = DateTimeParamType()
@click.group()
@click.version_option(version=__version__, prog_name="nvr-export")
def main():
"""TP-Link Vigi NVR Export Tool.
Export video recordings from TP-Link Vigi NVRs via OpenAPI.
Make sure OpenAPI is enabled on your NVR:
Settings > Network > OpenAPI (default port: 20443)
"""
pass
@main.command()
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID (1-based)")
@click.option("--start", "-s", required=True, type=DATETIME, help="Start time (YYYY-MM-DD HH:MM)")
@click.option("--end", "-e", required=True, type=DATETIME, help="End time (YYYY-MM-DD HH:MM)")
@click.option("--output", "-o", required=True, type=click.Path(), help="Output directory")
@click.option("--type", "rec_type", default="all",
type=click.Choice(["all", "continuous", "motion", "alarm"]),
help="Recording type filter")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
@click.option("--quiet", "-q", is_flag=True, help="Suppress progress output")
def export(
host: str,
port: int,
user: str,
password: str,
channel: int,
start: datetime,
end: datetime,
output: str,
rec_type: str,
no_ssl_verify: bool,
quiet: bool,
):
"""Export recordings from NVR for a time range.
\b
Examples:
# Export channel 1 for a specific day
nvr-export export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28 00:00" -e "2024-12-28 23:59" -o ./exports
# Export only motion recordings
nvr-export export -h 192.168.1.100 -u admin -c 2 \\
-s "2024-12-01" -e "2024-12-31" --type motion -o ./exports
"""
output_dir = Path(output)
if not quiet:
click.echo(f"Connecting to NVR at {host}:{port}...")
try:
with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
if not quiet:
click.echo(f"Searching recordings: Channel {channel}, {start} to {end}")
recordings = client.search_recordings(channel, start, end, rec_type)
if not recordings:
click.echo("No recordings found for the specified time range.")
return
# Calculate total size
total_size_mb = sum(r.size_bytes for r in recordings) / (1024 * 1024)
if not quiet:
click.echo(f"Found {len(recordings)} recordings ({total_size_mb:.1f} MB total)")
# Download recordings
downloaded = client.export_time_range(
channel, start, end, output_dir, rec_type, show_progress=not quiet
)
if not quiet:
click.echo(f"\nSuccessfully exported {len(downloaded)} recordings to {output_dir}")
except AuthenticationError as e:
click.echo(f"Authentication failed: {e}", err=True)
sys.exit(1)
except NVRAPIError as e:
click.echo(f"NVR API error: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Unexpected error: {e}", err=True)
sys.exit(1)
@main.command()
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
def channels(
host: str,
port: int,
user: str,
password: str,
no_ssl_verify: bool,
):
"""List available camera channels on NVR."""
try:
with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
click.echo(f"Connecting to NVR at {host}:{port}...")
channel_list = client.get_channels()
if not channel_list:
click.echo("No channels found.")
return
click.echo(f"\nFound {len(channel_list)} channels:")
click.echo("-" * 40)
for ch in channel_list:
status = "" if ch.enabled else ""
click.echo(f" [{status}] Channel {ch.id}: {ch.name}")
except AuthenticationError as e:
click.echo(f"Authentication failed: {e}", err=True)
sys.exit(1)
except NVRAPIError as e:
click.echo(f"NVR API error: {e}", err=True)
sys.exit(1)
@main.command()
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID")
@click.option("--start", "-s", required=True, type=DATETIME, help="Start time")
@click.option("--end", "-e", required=True, type=DATETIME, help="End time")
@click.option("--type", "rec_type", default="all",
type=click.Choice(["all", "continuous", "motion", "alarm"]))
@click.option("--no-ssl-verify", is_flag=True, default=True)
def search(
host: str,
port: int,
user: str,
password: str,
channel: int,
start: datetime,
end: datetime,
rec_type: str,
no_ssl_verify: bool,
):
"""Search for recordings without downloading."""
try:
with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
recordings = client.search_recordings(channel, start, end, rec_type)
if not recordings:
click.echo("No recordings found.")
return
total_size = sum(r.size_bytes for r in recordings) / (1024 * 1024)
total_duration = sum(r.duration_seconds for r in recordings)
click.echo(f"\nFound {len(recordings)} recordings:")
click.echo(f"Total size: {total_size:.1f} MB")
click.echo(f"Total duration: {total_duration // 3600}h {(total_duration % 3600) // 60}m")
click.echo("-" * 60)
for rec in recordings:
click.echo(
f" {rec.start_time:%Y-%m-%d %H:%M} - {rec.end_time:%H:%M} "
f"({rec.size_mb:.1f} MB, {rec.recording_type})"
)
except AuthenticationError as e:
click.echo(f"Authentication failed: {e}", err=True)
sys.exit(1)
except NVRAPIError as e:
click.echo(f"NVR API error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,68 @@
"""Data models for NVR recordings and channels."""
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
@dataclass
class Channel:
"""Represents a camera channel on the NVR."""
id: int
name: str
enabled: bool = True
def __str__(self) -> str:
return f"Channel {self.id}: {self.name}"
@dataclass
class Recording:
"""Represents a video recording segment."""
id: str
channel_id: int
start_time: datetime
end_time: datetime
size_bytes: int
recording_type: str # "continuous", "motion", "alarm", etc.
file_path: Optional[str] = None
@property
def duration_seconds(self) -> int:
"""Get recording duration in seconds."""
return int((self.end_time - self.start_time).total_seconds())
@property
def size_mb(self) -> float:
"""Get recording size in megabytes."""
return self.size_bytes / (1024 * 1024)
def __str__(self) -> str:
return (
f"Recording {self.id}: Ch{self.channel_id} "
f"{self.start_time.strftime('%Y-%m-%d %H:%M')} - "
f"{self.end_time.strftime('%H:%M')} ({self.size_mb:.1f} MB)"
)
@dataclass
class ExportJob:
"""Represents an export job with multiple recordings."""
channel_id: int
start_time: datetime
end_time: datetime
recordings: list[Recording]
output_dir: str
@property
def total_size_bytes(self) -> int:
"""Total size of all recordings."""
return sum(r.size_bytes for r in self.recordings)
@property
def total_duration_seconds(self) -> int:
"""Total duration of all recordings."""
return sum(r.duration_seconds for r in self.recordings)

View File

@@ -0,0 +1,346 @@
"""NVR API client for interacting with TP-Link Vigi NVR."""
import re
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
import requests
from tqdm import tqdm
from .auth import AuthenticationError, NVRAuthenticator
from .models import Channel, ExportJob, Recording
class NVRClient:
"""Client for TP-Link Vigi NVR API operations."""
def __init__(
self,
host: str,
username: str,
password: str,
port: int = 20443,
verify_ssl: bool = False,
):
"""
Initialize NVR client.
Args:
host: NVR IP address or hostname
username: Admin username
password: Admin password
port: OpenAPI port (default: 20443)
verify_ssl: Whether to verify SSL certificates
"""
self.host = host
self.port = port
self.base_url = f"https://{host}:{port}"
self.auth = NVRAuthenticator(host, username, password, port, verify_ssl)
self._session: Optional[requests.Session] = None
@property
def session(self) -> requests.Session:
"""Get authenticated HTTP session."""
if self._session is None:
self._session = self.auth.get_authenticated_session()
return self._session
def _api_request(
self,
method: str,
endpoint: str,
**kwargs,
) -> dict:
"""
Make an authenticated API request.
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint path
**kwargs: Additional arguments for requests
Returns:
JSON response as dict
Raises:
NVRAPIError: If request fails
"""
url = f"{self.base_url}{endpoint}"
try:
response = self.session.request(method, url, timeout=60, **kwargs)
response.raise_for_status()
data = response.json()
if data.get("error_code", 0) != 0:
error_msg = data.get("error_msg", "Unknown API error")
raise NVRAPIError(f"API error: {error_msg}")
return data
except requests.RequestException as e:
raise NVRAPIError(f"Request failed: {e}") from e
def get_channels(self) -> list[Channel]:
"""
Get list of camera channels configured on NVR.
Returns:
List of Channel objects
"""
# Try the standard device/channels endpoint
try:
data = self._api_request("POST", "/api/v1/channels", json={"method": "get"})
channels = []
for ch_data in data.get("result", {}).get("channel_list", []):
channels.append(Channel(
id=ch_data.get("channel_id", 0),
name=ch_data.get("channel_name", f"Channel {ch_data.get('channel_id', 0)}"),
enabled=ch_data.get("enabled", True),
))
return channels
except NVRAPIError:
# Fallback: try alternative endpoint structure
data = self._api_request("POST", "/api/v1/device", json={"method": "getChannels"})
channels = []
for ch_data in data.get("result", {}).get("channels", []):
channels.append(Channel(
id=ch_data.get("id", 0),
name=ch_data.get("name", f"Channel {ch_data.get('id', 0)}"),
enabled=ch_data.get("status", "on") == "on",
))
return channels
def search_recordings(
self,
channel_id: int,
start_time: datetime,
end_time: datetime,
recording_type: str = "all",
) -> list[Recording]:
"""
Search for recordings in a time range.
Args:
channel_id: Camera channel ID (1-based)
start_time: Start of time range
end_time: End of time range
recording_type: Type filter ("all", "continuous", "motion", "alarm")
Returns:
List of Recording objects matching criteria
"""
# Format times as expected by NVR API (typically Unix timestamp or ISO format)
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())
type_map = {
"all": 0,
"continuous": 1,
"motion": 2,
"alarm": 4,
}
rec_type = type_map.get(recording_type, 0)
# Try primary search endpoint
try:
data = self._api_request(
"POST",
"/api/v1/playback/search",
json={
"method": "searchRecordings",
"channel_id": channel_id,
"start_time": start_ts,
"end_time": end_ts,
"record_type": rec_type,
},
)
except NVRAPIError:
# Fallback endpoint format
data = self._api_request(
"POST",
"/api/v1/record/search",
json={
"method": "search",
"params": {
"channel": channel_id,
"start": start_ts,
"end": end_ts,
"type": rec_type,
},
},
)
recordings = []
result = data.get("result", {})
record_list = result.get("record_list", result.get("recordings", []))
for rec_data in record_list:
# Parse timestamps (could be Unix or string format)
rec_start = self._parse_timestamp(rec_data.get("start_time", rec_data.get("start", 0)))
rec_end = self._parse_timestamp(rec_data.get("end_time", rec_data.get("end", 0)))
recordings.append(Recording(
id=str(rec_data.get("record_id", rec_data.get("id", ""))),
channel_id=channel_id,
start_time=rec_start,
end_time=rec_end,
size_bytes=rec_data.get("size", rec_data.get("file_size", 0)),
recording_type=rec_data.get("type", rec_data.get("record_type", "unknown")),
file_path=rec_data.get("file_path", rec_data.get("path")),
))
return recordings
def _parse_timestamp(self, ts) -> datetime:
"""Parse timestamp from various formats."""
if isinstance(ts, (int, float)):
return datetime.fromtimestamp(ts)
if isinstance(ts, str):
# Try common formats
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"]:
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
# Try Unix timestamp as string
return datetime.fromtimestamp(int(ts))
return datetime.now()
def download_recording(
self,
recording: Recording,
output_path: Path,
progress_callback: Optional[Callable[[int, int], None]] = None,
) -> Path:
"""
Download a single recording to disk.
Args:
recording: Recording to download
output_path: Directory or file path for output
progress_callback: Optional callback(bytes_downloaded, total_bytes)
Returns:
Path to downloaded file
"""
# Construct download URL
if recording.file_path:
download_url = f"{self.base_url}/api/v1/playback/download?path={recording.file_path}"
else:
download_url = (
f"{self.base_url}/api/v1/playback/download"
f"?record_id={recording.id}"
f"&channel_id={recording.channel_id}"
)
# Determine output filename
if output_path.is_dir():
filename = self._generate_filename(recording)
output_file = output_path / filename
else:
output_file = output_path
# Ensure parent directory exists
output_file.parent.mkdir(parents=True, exist_ok=True)
try:
response = self.session.get(download_url, stream=True, timeout=300)
response.raise_for_status()
# Get total size from headers or recording metadata
total_size = int(response.headers.get("content-length", recording.size_bytes))
downloaded = 0
chunk_size = 8192
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
return output_file
except requests.RequestException as e:
raise NVRAPIError(f"Download failed: {e}") from e
def _generate_filename(self, recording: Recording) -> str:
"""Generate a filename for a recording."""
start_str = recording.start_time.strftime("%Y%m%d_%H%M%S")
end_str = recording.end_time.strftime("%H%M%S")
return f"ch{recording.channel_id}_{start_str}-{end_str}.mp4"
def export_time_range(
self,
channel_id: int,
start_time: datetime,
end_time: datetime,
output_dir: Path,
recording_type: str = "all",
show_progress: bool = True,
) -> list[Path]:
"""
Export all recordings in a time range.
Args:
channel_id: Camera channel ID
start_time: Start of time range
end_time: End of time range
output_dir: Directory to save recordings
recording_type: Type filter
show_progress: Whether to show progress bars
Returns:
List of paths to downloaded files
"""
# Search for recordings
recordings = self.search_recordings(channel_id, start_time, end_time, recording_type)
if not recordings:
return []
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
downloaded_files = []
# Download with progress
recordings_iter = tqdm(recordings, desc="Downloading", disable=not show_progress)
for recording in recordings_iter:
if show_progress:
recordings_iter.set_postfix_str(f"Ch{recording.channel_id} {recording.start_time:%H:%M}")
try:
output_file = self.download_recording(recording, output_dir)
downloaded_files.append(output_file)
except NVRAPIError as e:
if show_progress:
tqdm.write(f"Warning: Failed to download {recording}: {e}")
return downloaded_files
def close(self) -> None:
"""Close connections."""
self.auth.close()
def __enter__(self) -> "NVRClient":
return self
def __exit__(self, *args) -> None:
self.close()
class NVRAPIError(Exception):
"""Raised when NVR API operations fail."""
pass