2 Commits

4 changed files with 512 additions and 133 deletions
+119 -17
View File
@@ -2,11 +2,11 @@
import hashlib
import time
import urllib.parse
from dataclasses import dataclass
from typing import Optional
import requests
from requests.auth import HTTPDigestAuth
@dataclass
@@ -29,7 +29,11 @@ class AuthSession:
class NVRAuthenticator:
"""Handles authentication with TP-Link Vigi NVR OpenAPI."""
"""Handles authentication with TP-Link Vigi NVR OpenAPI.
The NVR uses HTTP Digest Authentication with SHA-256 to obtain an access_token.
Endpoint: /openapi/token
"""
def __init__(
self,
@@ -71,12 +75,66 @@ class NVRAuthenticator:
self._session = self._authenticate()
return self._session
def _calculate_digest_response(
self,
method: str,
uri: str,
nonce: str,
realm: str,
algorithm: str = "SHA-256",
) -> str:
"""
Calculate Digest Authentication response.
A1 = SHA256(username:realm:password)
A2 = SHA256(method:uri)
response = SHA256(A1:nonce:A2)
"""
if algorithm.upper() in ("SHA-256", "SHA256"):
hash_func = hashlib.sha256
else:
hash_func = hashlib.md5
# Calculate A1
a1_string = f"{self.username}:{realm}:{self.password}"
a1 = hash_func(a1_string.encode()).hexdigest()
# Calculate A2
a2_string = f"{method}:{uri}"
a2 = hash_func(a2_string.encode()).hexdigest()
# Calculate response
response_string = f"{a1}:{nonce}:{a2}"
response = hash_func(response_string.encode()).hexdigest()
return response
def _parse_www_authenticate(self, header: str) -> dict:
"""Parse WWW-Authenticate header into dict."""
# Example: Digest realm="VIGI", nonce="abc123", algorithm=SHA-256
result = {}
# Remove 'Digest ' prefix
if header.lower().startswith("digest "):
header = header[7:]
# Parse key=value pairs
import re
pattern = r'(\w+)=(?:"([^"]+)"|([^\s,]+))'
for match in re.finditer(pattern, header):
key = match.group(1)
value = match.group(2) or match.group(3)
result[key] = value
return result
def _authenticate(self) -> AuthSession:
"""
Authenticate with NVR and obtain access token.
The NVR uses HTTP Digest Authentication for the initial login,
then returns a bearer token for subsequent requests.
1. GET /openapi/token without auth to get nonce
2. Calculate digest response
3. GET /openapi/token with Authorization header
4. Extract access_token from response
Returns:
AuthSession with access token
@@ -84,29 +142,73 @@ class NVRAuthenticator:
Raises:
AuthenticationError: If authentication fails
"""
login_url = f"{self.base_url}/api/v1/login"
token_url = f"{self.base_url}/openapi/token"
uri = "/openapi/token"
try:
# First attempt with Digest Auth
response = self._http_session.post(
login_url,
auth=HTTPDigestAuth(self.username, self.password),
json={"method": "login"},
# Step 1: Initial request to get nonce
response = self._http_session.get(token_url, timeout=30)
if response.status_code != 401:
raise AuthenticationError(
f"Expected 401 for initial auth, got {response.status_code}"
)
# Parse WWW-Authenticate header
www_auth = response.headers.get("WWW-Authenticate", "")
if not www_auth:
raise AuthenticationError("No WWW-Authenticate header in response")
auth_params = self._parse_www_authenticate(www_auth)
nonce = auth_params.get("nonce", "")
realm = auth_params.get("realm", "VIGI")
algorithm = auth_params.get("algorithm", "SHA-256")
if not nonce:
raise AuthenticationError("No nonce in WWW-Authenticate header")
# Step 2: Calculate digest response
digest_response = self._calculate_digest_response(
method="GET",
uri=uri,
nonce=nonce,
realm=realm,
algorithm=algorithm,
)
# Step 3: Build Authorization header
auth_header = (
f'Digest username="{self.username}", '
f'realm="{realm}", '
f'nonce="{nonce}", '
f'uri="{uri}", '
f'algorithm={algorithm}, '
f'response="{digest_response}"'
)
# Step 4: Authenticated request
response = self._http_session.get(
token_url,
headers={"Authorization": auth_header},
timeout=30,
)
response.raise_for_status()
data = response.json()
if data.get("error_code", 0) != 0:
error_msg = data.get("error_msg", "Unknown error")
raise AuthenticationError(f"Login failed: {error_msg}")
result = data.get("result", {})
access_token = result.get("stok", "")
# Extract access_token
access_token = data.get("access_token", data.get("token", ""))
if not access_token:
raise AuthenticationError("No access token in response")
# Try to get from result object
result = data.get("result", {})
access_token = result.get("access_token", result.get("stok", ""))
if not access_token:
raise AuthenticationError(f"No access token in response: {data}")
# URL decode the token
access_token = urllib.parse.unquote(access_token)
# Token typically expires in 1 hour, refresh at 50 minutes
expires_at = time.time() + (50 * 60)
+34 -5
View File
@@ -9,6 +9,7 @@ import click
from . import __version__
from .auth import AuthenticationError
from .debug import setup_debug_logging
from .nvr_client import NVRAPIError, NVRClient
@@ -56,15 +57,30 @@ DATETIME = DateTimeParamType()
@click.group()
@click.version_option(version=__version__, prog_name="nvr-export")
def main():
@click.option("--debug", "-d", is_flag=True, help="Enable debug logging (shows all API requests/responses)")
@click.option("--debug-file", type=click.Path(), help="Write debug log to file")
@click.pass_context
def main(ctx, debug: bool, debug_file: str):
"""TP-Link Vigi NVR Export Tool.
Export video recordings from TP-Link Vigi NVRs via OpenAPI.
Make sure OpenAPI is enabled on your NVR:
Settings > Network > OpenAPI (default port: 20443)
\b
Debug mode:
nvr-export --debug export ...
nvr-export --debug --debug-file debug.log export ...
"""
pass
ctx.ensure_object(dict)
ctx.obj['debug'] = debug
# Setup debug logging
setup_debug_logging(enabled=debug, log_file=debug_file)
if debug:
click.echo("🔧 Debug mode enabled - all API requests will be logged", err=True)
@main.command()
@@ -81,7 +97,9 @@ def main():
help="Recording type filter")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
@click.option("--quiet", "-q", is_flag=True, help="Suppress progress output")
@click.pass_context
def export(
ctx,
host: str,
port: int,
user: str,
@@ -102,9 +120,9 @@ def export(
nvr-export export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28 00:00" -e "2024-12-28 23:59" -o ./exports
# Export only motion recordings
nvr-export export -h 192.168.1.100 -u admin -c 2 \\
-s "2024-12-01" -e "2024-12-31" --type motion -o ./exports
# Export with debug logging
nvr-export --debug export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28" -e "2024-12-29" -o ./exports
"""
output_dir = Path(output)
@@ -120,6 +138,8 @@ def export(
if not recordings:
click.echo("No recordings found for the specified time range.")
if ctx.obj.get('debug'):
click.echo("💡 Tip: Check debug output above for API responses", err=True)
return
# Calculate total size
@@ -144,6 +164,9 @@ def export(
sys.exit(1)
except Exception as e:
click.echo(f"Unexpected error: {e}", err=True)
if ctx.obj.get('debug'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@@ -153,7 +176,9 @@ def export(
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
@click.pass_context
def channels(
ctx,
host: str,
port: int,
user: str,
@@ -196,7 +221,9 @@ def channels(
@click.option("--type", "rec_type", default="all",
type=click.Choice(["all", "continuous", "motion", "alarm"]))
@click.option("--no-ssl-verify", is_flag=True, default=True)
@click.pass_context
def search(
ctx,
host: str,
port: int,
user: str,
@@ -214,6 +241,8 @@ def search(
if not recordings:
click.echo("No recordings found.")
if ctx.obj.get('debug'):
click.echo("💡 Tip: Check debug output above for API responses", err=True)
return
total_size = sum(r.size_bytes for r in recordings) / (1024 * 1024)
+111
View File
@@ -0,0 +1,111 @@
"""Debug logging utilities for NVR Export."""
import json
import logging
import sys
from datetime import datetime
from typing import Any
# Create logger
logger = logging.getLogger("nvr_export")
def setup_debug_logging(enabled: bool = False, log_file: str = None) -> None:
"""
Setup debug logging.
Args:
enabled: Whether to enable debug logging
log_file: Optional file path to write logs to
"""
level = logging.DEBUG if enabled else logging.WARNING
# Clear existing handlers
logger.handlers.clear()
logger.setLevel(level)
# Console handler
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(level)
# Formatter with timestamp
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File handler if specified
if log_file:
file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
))
logger.addHandler(file_handler)
if enabled:
logger.debug("Debug logging enabled")
def log_request(method: str, url: str, headers: dict = None, body: Any = None) -> None:
"""Log an outgoing HTTP request."""
logger.debug(f">>> {method} {url}")
if headers:
# Filter out sensitive headers
safe_headers = {k: v for k, v in headers.items() if 'auth' not in k.lower()}
if safe_headers:
logger.debug(f" Headers: {safe_headers}")
if body:
try:
body_str = json.dumps(body, indent=2, default=str)
logger.debug(f" Body: {body_str}")
except (TypeError, ValueError):
logger.debug(f" Body: {body}")
def log_response(status_code: int, headers: dict = None, body: Any = None, truncate: int = 2000) -> None:
"""Log an incoming HTTP response."""
logger.debug(f"<<< Response: {status_code}")
if headers:
content_type = headers.get('content-type', 'unknown')
content_length = headers.get('content-length', 'unknown')
logger.debug(f" Content-Type: {content_type}, Length: {content_length}")
if body:
try:
if isinstance(body, (dict, list)):
body_str = json.dumps(body, indent=2, default=str)
else:
body_str = str(body)
if len(body_str) > truncate:
body_str = body_str[:truncate] + f"\n... (truncated, total {len(body_str)} chars)"
logger.debug(f" Body: {body_str}")
except (TypeError, ValueError):
logger.debug(f" Body: {body}")
def log_error(message: str, exception: Exception = None) -> None:
"""Log an error."""
if exception:
logger.error(f"{message}: {type(exception).__name__}: {exception}")
else:
logger.error(message)
def log_info(message: str) -> None:
"""Log info message."""
logger.info(message)
def log_debug(message: str) -> None:
"""Log debug message."""
logger.debug(message)
def is_debug_enabled() -> bool:
"""Check if debug logging is enabled."""
return logger.level <= logging.DEBUG
+248 -111
View File
@@ -9,11 +9,16 @@ import requests
from tqdm import tqdm
from .auth import AuthenticationError, NVRAuthenticator
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
from .models import Channel, ExportJob, Recording
class NVRClient:
"""Client for TP-Link Vigi NVR API operations."""
"""Client for TP-Link Vigi NVR OpenAPI operations.
Base endpoint: /openapi/
Authentication: Bearer token from /openapi/token
"""
def __init__(
self,
@@ -38,6 +43,7 @@ class NVRClient:
self.base_url = f"https://{host}:{port}"
self.auth = NVRAuthenticator(host, username, password, port, verify_ssl)
self._session: Optional[requests.Session] = None
log_debug(f"NVRClient initialized for {host}:{port}")
@property
def session(self) -> requests.Session:
@@ -57,7 +63,7 @@ class NVRClient:
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint path
endpoint: API endpoint path (will be prefixed with /openapi/)
**kwargs: Additional arguments for requests
Returns:
@@ -66,21 +72,44 @@ class NVRClient:
Raises:
NVRAPIError: If request fails
"""
# Ensure endpoint starts with /openapi/
if not endpoint.startswith("/openapi/"):
endpoint = f"/openapi/{endpoint.lstrip('/')}"
url = f"{self.base_url}{endpoint}"
# Log request
log_request(method, url, kwargs.get('headers'), kwargs.get('json'))
try:
response = self.session.request(method, url, timeout=60, **kwargs)
# Log response
try:
response_data = response.json() if response.content else {}
except ValueError:
response_data = {"raw": response.text[:500] if response.text else "empty"}
log_response(response.status_code, dict(response.headers), response_data)
response.raise_for_status()
# Some endpoints might return empty response
if not response.content:
return {}
data = response.json()
if data.get("error_code", 0) != 0:
error_msg = data.get("error_msg", "Unknown API error")
raise NVRAPIError(f"API error: {error_msg}")
# Check for error in response
error_code = data.get("error_code", data.get("errorCode", 0))
if error_code != 0:
error_msg = data.get("error_msg", data.get("errorMsg", "Unknown API error"))
raise NVRAPIError(f"API error {error_code}: {error_msg}")
return data
except requests.RequestException as e:
log_error(f"Request to {endpoint} failed", e)
raise NVRAPIError(f"Request failed: {e}") from e
def get_channels(self) -> list[Channel]:
@@ -90,33 +119,66 @@ class NVRClient:
Returns:
List of Channel objects
"""
# Try the standard device/channels endpoint
try:
data = self._api_request("POST", "/api/v1/channels", json={"method": "get"})
channels = []
for ch_data in data.get("result", {}).get("channel_list", []):
channels.append(Channel(
id=ch_data.get("channel_id", 0),
name=ch_data.get("channel_name", f"Channel {ch_data.get('channel_id', 0)}"),
enabled=ch_data.get("enabled", True),
))
return channels
except NVRAPIError:
# Fallback: try alternative endpoint structure
data = self._api_request("POST", "/api/v1/device", json={"method": "getChannels"})
channels = []
for ch_data in data.get("result", {}).get("channels", []):
channels.append(Channel(
id=ch_data.get("id", 0),
name=ch_data.get("name", f"Channel {ch_data.get('id', 0)}"),
enabled=ch_data.get("status", "on") == "on",
))
return channels
channels = []
# Try different endpoint patterns
endpoints_to_try = [
("GET", "added_devices"),
("GET", "channels"),
("POST", "channels", {"method": "get"}),
("GET", "device/channels"),
]
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for channels")
for endpoint_info in endpoints_to_try:
try:
method = endpoint_info[0]
endpoint = endpoint_info[1]
json_data = endpoint_info[2] if len(endpoint_info) > 2 else None
log_debug(f"Trying endpoint: {method} {endpoint}")
if json_data:
data = self._api_request(method, endpoint, json=json_data)
else:
data = self._api_request(method, endpoint)
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
# Parse response - try different structures
channel_list = (
data.get("result", {}).get("channel_list", []) or
data.get("result", {}).get("channels", []) or
data.get("result", {}).get("devices", []) or
data.get("channel_list", []) or
data.get("channels", []) or
data.get("devices", []) or
[]
)
log_debug(f"Found {len(channel_list)} channels in response")
if channel_list:
for ch_data in channel_list:
channels.append(Channel(
id=ch_data.get("channel_id", ch_data.get("id", ch_data.get("channelId", 0))),
name=ch_data.get("channel_name", ch_data.get("name", ch_data.get("deviceName", f"Channel"))),
enabled=ch_data.get("enabled", ch_data.get("status", "on") == "on"),
))
return channels
except NVRAPIError as e:
log_debug(f"Endpoint {endpoint} failed: {e}")
continue
# If no channels found via API, return default channels 1-8
log_debug("No channels found via API, returning defaults 1-8")
if not channels:
for i in range(1, 9):
channels.append(Channel(id=i, name=f"Channel {i}", enabled=True))
return channels
def search_recordings(
self,
@@ -137,10 +199,17 @@ class NVRClient:
Returns:
List of Recording objects matching criteria
"""
# Format times as expected by NVR API (typically Unix timestamp or ISO format)
# Format times as expected by NVR API
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())
# ISO format alternative
start_iso = start_time.strftime("%Y-%m-%dT%H:%M:%S")
end_iso = end_time.strftime("%Y-%m-%dT%H:%M:%S")
log_debug(f"Searching recordings: channel={channel_id}, start={start_time}, end={end_time}")
log_debug(f"Timestamps: start_ts={start_ts}, end_ts={end_ts}")
type_map = {
"all": 0,
"continuous": 1,
@@ -149,69 +218,114 @@ class NVRClient:
}
rec_type = type_map.get(recording_type, 0)
# Try primary search endpoint
try:
data = self._api_request(
"POST",
"/api/v1/playback/search",
json={
"method": "searchRecordings",
"channel_id": channel_id,
"start_time": start_ts,
"end_time": end_ts,
"record_type": rec_type,
},
)
except NVRAPIError:
# Fallback endpoint format
data = self._api_request(
"POST",
"/api/v1/record/search",
json={
"method": "search",
"params": {
"channel": channel_id,
"start": start_ts,
"end": end_ts,
"type": rec_type,
},
},
)
recordings = []
result = data.get("result", {})
record_list = result.get("record_list", result.get("recordings", []))
for rec_data in record_list:
# Parse timestamps (could be Unix or string format)
rec_start = self._parse_timestamp(rec_data.get("start_time", rec_data.get("start", 0)))
rec_end = self._parse_timestamp(rec_data.get("end_time", rec_data.get("end", 0)))
recordings.append(Recording(
id=str(rec_data.get("record_id", rec_data.get("id", ""))),
channel_id=channel_id,
start_time=rec_start,
end_time=rec_end,
size_bytes=rec_data.get("size", rec_data.get("file_size", 0)),
recording_type=rec_data.get("type", rec_data.get("record_type", "unknown")),
file_path=rec_data.get("file_path", rec_data.get("path")),
))
# Try different endpoint patterns
endpoints_to_try = [
("POST", "playback/search", {
"channelId": channel_id,
"startTime": start_ts,
"endTime": end_ts,
"recordType": rec_type,
}),
("POST", "record/search", {
"channel": channel_id,
"start": start_ts,
"end": end_ts,
"type": rec_type,
}),
("GET", f"playback/search?channelId={channel_id}&startTime={start_ts}&endTime={end_ts}"),
("POST", "search", {
"method": "searchRecordings",
"params": {
"channel_id": channel_id,
"start_time": start_iso,
"end_time": end_iso,
}
}),
]
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for recordings")
for endpoint_info in endpoints_to_try:
try:
method = endpoint_info[0]
endpoint = endpoint_info[1]
json_data = endpoint_info[2] if len(endpoint_info) > 2 and isinstance(endpoint_info[2], dict) else None
log_debug(f"Trying endpoint: {method} {endpoint}")
if json_data:
data = self._api_request(method, endpoint, json=json_data)
else:
data = self._api_request(method, endpoint)
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
# Log full response structure for debugging
if is_debug_enabled():
import json as json_module
log_debug(f"Full response: {json_module.dumps(data, indent=2, default=str)[:1000]}")
# Parse response - try different structures
result = data.get("result", data)
record_list = (
result.get("record_list", []) or
result.get("recordings", []) or
result.get("recordList", []) or
result.get("items", []) or
result.get("searchResult", []) or
result.get("list", []) or
[]
)
log_debug(f"Found {len(record_list)} recordings in response")
if record_list:
for rec_data in record_list:
log_debug(f"Recording data: {rec_data}")
rec_start = self._parse_timestamp(
rec_data.get("start_time", rec_data.get("startTime", rec_data.get("start", 0)))
)
rec_end = self._parse_timestamp(
rec_data.get("end_time", rec_data.get("endTime", rec_data.get("end", 0)))
)
recordings.append(Recording(
id=str(rec_data.get("record_id", rec_data.get("recordId", rec_data.get("id", "")))),
channel_id=channel_id,
start_time=rec_start,
end_time=rec_end,
size_bytes=rec_data.get("size", rec_data.get("fileSize", rec_data.get("file_size", 0))),
recording_type=str(rec_data.get("type", rec_data.get("recordType", rec_data.get("record_type", "unknown")))),
file_path=rec_data.get("file_path", rec_data.get("filePath", rec_data.get("path"))),
))
return recordings
except NVRAPIError as e:
log_debug(f"Endpoint {endpoint} failed: {e}")
continue
log_debug("No recordings found with any endpoint pattern")
return recordings
def _parse_timestamp(self, ts) -> datetime:
"""Parse timestamp from various formats."""
if isinstance(ts, (int, float)):
return datetime.fromtimestamp(ts)
if ts > 0:
return datetime.fromtimestamp(ts)
if isinstance(ts, str):
# Try common formats
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"]:
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"]:
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
# Try Unix timestamp as string
return datetime.fromtimestamp(int(ts))
try:
return datetime.fromtimestamp(int(ts))
except (ValueError, OSError):
pass
return datetime.now()
def download_recording(
@@ -231,15 +345,20 @@ class NVRClient:
Returns:
Path to downloaded file
"""
# Construct download URL
# Try different download URL patterns
download_urls = []
if recording.file_path:
download_url = f"{self.base_url}/api/v1/playback/download?path={recording.file_path}"
else:
download_url = (
f"{self.base_url}/api/v1/playback/download"
f"?record_id={recording.id}"
f"&channel_id={recording.channel_id}"
)
download_urls.append(f"{self.base_url}/openapi/playback/download?path={recording.file_path}")
download_urls.append(f"{self.base_url}/openapi/download?filePath={recording.file_path}")
download_urls.extend([
f"{self.base_url}/openapi/playback/download?recordId={recording.id}&channelId={recording.channel_id}",
f"{self.base_url}/openapi/record/download?id={recording.id}",
f"{self.base_url}/openapi/download?recordId={recording.id}",
])
log_debug(f"Download URLs to try: {download_urls}")
# Determine output filename
if output_path.is_dir():
@@ -251,28 +370,46 @@ class NVRClient:
# Ensure parent directory exists
output_file.parent.mkdir(parents=True, exist_ok=True)
try:
response = self.session.get(download_url, stream=True, timeout=300)
response.raise_for_status()
# Get total size from headers or recording metadata
total_size = int(response.headers.get("content-length", recording.size_bytes))
downloaded = 0
chunk_size = 8192
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
return output_file
except requests.RequestException as e:
raise NVRAPIError(f"Download failed: {e}") from e
last_error = None
for download_url in download_urls:
try:
log_debug(f"Trying download from: {download_url}")
response = self.session.get(download_url, stream=True, timeout=300)
log_debug(f"Download response: {response.status_code}, Content-Type: {response.headers.get('content-type')}")
response.raise_for_status()
# Check if response is actually video data
content_type = response.headers.get("content-type", "")
if "json" in content_type or "html" in content_type:
# This is an error response, try next URL
log_debug(f"Got non-video response, trying next URL")
continue
# Get total size from headers or recording metadata
total_size = int(response.headers.get("content-length", recording.size_bytes or 0))
downloaded = 0
chunk_size = 8192
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
return output_file
except requests.RequestException as e:
log_error(f"Download from {download_url} failed", e)
last_error = e
continue
raise NVRAPIError(f"Download failed after trying all URLs: {last_error}")
def _generate_filename(self, recording: Recording) -> str:
"""Generate a filename for a recording."""