4 Commits

5 changed files with 1179 additions and 141 deletions
+119 -17
View File
@@ -2,11 +2,11 @@
import hashlib
import time
import urllib.parse
from dataclasses import dataclass
from typing import Optional
import requests
from requests.auth import HTTPDigestAuth
@dataclass
@@ -29,7 +29,11 @@ class AuthSession:
class NVRAuthenticator:
"""Handles authentication with TP-Link Vigi NVR OpenAPI."""
"""Handles authentication with TP-Link Vigi NVR OpenAPI.
The NVR uses HTTP Digest Authentication with SHA-256 to obtain an access_token.
Endpoint: /openapi/token
"""
def __init__(
self,
@@ -71,12 +75,66 @@ class NVRAuthenticator:
self._session = self._authenticate()
return self._session
def _calculate_digest_response(
self,
method: str,
uri: str,
nonce: str,
realm: str,
algorithm: str = "SHA-256",
) -> str:
"""
Calculate Digest Authentication response.
A1 = SHA256(username:realm:password)
A2 = SHA256(method:uri)
response = SHA256(A1:nonce:A2)
"""
if algorithm.upper() in ("SHA-256", "SHA256"):
hash_func = hashlib.sha256
else:
hash_func = hashlib.md5
# Calculate A1
a1_string = f"{self.username}:{realm}:{self.password}"
a1 = hash_func(a1_string.encode()).hexdigest()
# Calculate A2
a2_string = f"{method}:{uri}"
a2 = hash_func(a2_string.encode()).hexdigest()
# Calculate response
response_string = f"{a1}:{nonce}:{a2}"
response = hash_func(response_string.encode()).hexdigest()
return response
def _parse_www_authenticate(self, header: str) -> dict:
"""Parse WWW-Authenticate header into dict."""
# Example: Digest realm="VIGI", nonce="abc123", algorithm=SHA-256
result = {}
# Remove 'Digest ' prefix
if header.lower().startswith("digest "):
header = header[7:]
# Parse key=value pairs
import re
pattern = r'(\w+)=(?:"([^"]+)"|([^\s,]+))'
for match in re.finditer(pattern, header):
key = match.group(1)
value = match.group(2) or match.group(3)
result[key] = value
return result
def _authenticate(self) -> AuthSession:
"""
Authenticate with NVR and obtain access token.
The NVR uses HTTP Digest Authentication for the initial login,
then returns a bearer token for subsequent requests.
1. GET /openapi/token without auth to get nonce
2. Calculate digest response
3. GET /openapi/token with Authorization header
4. Extract access_token from response
Returns:
AuthSession with access token
@@ -84,29 +142,73 @@ class NVRAuthenticator:
Raises:
AuthenticationError: If authentication fails
"""
login_url = f"{self.base_url}/api/v1/login"
token_url = f"{self.base_url}/openapi/token"
uri = "/openapi/token"
try:
# First attempt with Digest Auth
response = self._http_session.post(
login_url,
auth=HTTPDigestAuth(self.username, self.password),
json={"method": "login"},
# Step 1: Initial request to get nonce
response = self._http_session.get(token_url, timeout=30)
if response.status_code != 401:
raise AuthenticationError(
f"Expected 401 for initial auth, got {response.status_code}"
)
# Parse WWW-Authenticate header
www_auth = response.headers.get("WWW-Authenticate", "")
if not www_auth:
raise AuthenticationError("No WWW-Authenticate header in response")
auth_params = self._parse_www_authenticate(www_auth)
nonce = auth_params.get("nonce", "")
realm = auth_params.get("realm", "VIGI")
algorithm = auth_params.get("algorithm", "SHA-256")
if not nonce:
raise AuthenticationError("No nonce in WWW-Authenticate header")
# Step 2: Calculate digest response
digest_response = self._calculate_digest_response(
method="GET",
uri=uri,
nonce=nonce,
realm=realm,
algorithm=algorithm,
)
# Step 3: Build Authorization header
auth_header = (
f'Digest username="{self.username}", '
f'realm="{realm}", '
f'nonce="{nonce}", '
f'uri="{uri}", '
f'algorithm={algorithm}, '
f'response="{digest_response}"'
)
# Step 4: Authenticated request
response = self._http_session.get(
token_url,
headers={"Authorization": auth_header},
timeout=30,
)
response.raise_for_status()
data = response.json()
if data.get("error_code", 0) != 0:
error_msg = data.get("error_msg", "Unknown error")
raise AuthenticationError(f"Login failed: {error_msg}")
result = data.get("result", {})
access_token = result.get("stok", "")
# Extract access_token
access_token = data.get("access_token", data.get("token", ""))
if not access_token:
raise AuthenticationError("No access token in response")
# Try to get from result object
result = data.get("result", {})
access_token = result.get("access_token", result.get("stok", ""))
if not access_token:
raise AuthenticationError(f"No access token in response: {data}")
# URL decode the token
access_token = urllib.parse.unquote(access_token)
# Token typically expires in 1 hour, refresh at 50 minutes
expires_at = time.time() + (50 * 60)
+205 -13
View File
@@ -9,6 +9,7 @@ import click
from . import __version__
from .auth import AuthenticationError
from .debug import setup_debug_logging
from .nvr_client import NVRAPIError, NVRClient
@@ -56,20 +57,35 @@ DATETIME = DateTimeParamType()
@click.group()
@click.version_option(version=__version__, prog_name="nvr-export")
def main():
@click.option("--debug", "-d", is_flag=True, help="Enable debug logging (shows all API requests/responses)")
@click.option("--debug-file", type=click.Path(), help="Write debug log to file")
@click.pass_context
def main(ctx, debug: bool, debug_file: str):
"""TP-Link Vigi NVR Export Tool.
Export video recordings from TP-Link Vigi NVRs via OpenAPI.
Make sure OpenAPI is enabled on your NVR:
Settings > Network > OpenAPI (default port: 20443)
\b
Debug mode:
nvr-export --debug export ...
nvr-export --debug --debug-file debug.log export ...
"""
pass
ctx.ensure_object(dict)
ctx.obj['debug'] = debug
# Setup debug logging
setup_debug_logging(enabled=debug, log_file=debug_file)
if debug:
click.echo("🔧 Debug mode enabled - all API requests will be logged", err=True)
@main.command()
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
@click.option("--port", "-p", default=443, help="Port (default: 443 for web, 20443 for openapi)")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID (1-based)")
@@ -81,7 +97,9 @@ def main():
help="Recording type filter")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
@click.option("--quiet", "-q", is_flag=True, help="Suppress progress output")
@click.pass_context
def export(
ctx,
host: str,
port: int,
user: str,
@@ -96,23 +114,27 @@ def export(
):
"""Export recordings from NVR for a time range.
Uses the web interface API (/stok/ds) for recording access.
\b
Examples:
# Export channel 1 for a specific day
nvr-export export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28 00:00" -e "2024-12-28 23:59" -o ./exports
# Export only motion recordings
nvr-export export -h 192.168.1.100 -u admin -c 2 \\
-s "2024-12-01" -e "2024-12-31" --type motion -o ./exports
# Export with debug logging
nvr-export --debug export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28" -e "2024-12-29" -o ./exports
"""
from .web_client import WebClient, WebClientError
output_dir = Path(output)
if not quiet:
click.echo(f"Connecting to NVR at {host}:{port}...")
click.echo(f"Connecting to NVR web interface at {host}:{port}...")
try:
with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
with WebClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
if not quiet:
click.echo(f"Searching recordings: Channel {channel}, {start} to {end}")
@@ -120,6 +142,8 @@ def export(
if not recordings:
click.echo("No recordings found for the specified time range.")
if ctx.obj.get('debug'):
click.echo("💡 Tip: Check debug output above for API responses", err=True)
return
# Calculate total size
@@ -136,14 +160,14 @@ def export(
if not quiet:
click.echo(f"\nSuccessfully exported {len(downloaded)} recordings to {output_dir}")
except AuthenticationError as e:
click.echo(f"Authentication failed: {e}", err=True)
sys.exit(1)
except NVRAPIError as e:
click.echo(f"NVR API error: {e}", err=True)
except WebClientError as e:
click.echo(f"Web interface error: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Unexpected error: {e}", err=True)
if ctx.obj.get('debug'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@@ -153,7 +177,9 @@ def export(
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
@click.pass_context
def channels(
ctx,
host: str,
port: int,
user: str,
@@ -196,7 +222,9 @@ def channels(
@click.option("--type", "rec_type", default="all",
type=click.Choice(["all", "continuous", "motion", "alarm"]))
@click.option("--no-ssl-verify", is_flag=True, default=True)
@click.pass_context
def search(
ctx,
host: str,
port: int,
user: str,
@@ -214,6 +242,8 @@ def search(
if not recordings:
click.echo("No recordings found.")
if ctx.obj.get('debug'):
click.echo("💡 Tip: Check debug output above for API responses", err=True)
return
total_size = sum(r.size_bytes for r in recordings) / (1024 * 1024)
@@ -238,5 +268,167 @@ def search(
sys.exit(1)
@main.command()
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--no-ssl-verify", is_flag=True, default=True)
@click.pass_context
def discover(
ctx,
host: str,
port: int,
user: str,
password: str,
no_ssl_verify: bool,
):
"""Discover available API endpoints on the NVR.
This command probes many possible API paths to find which ones
are available on your NVR. Useful for debugging and development.
"""
import requests
# Common endpoints to try
endpoints_to_probe = [
# Root and info
("GET", "/openapi"),
("GET", "/openapi/"),
("GET", "/openapi/info"),
("GET", "/openapi/version"),
("GET", "/openapi/api"),
("GET", "/openapi/swagger"),
("GET", "/openapi/docs"),
# Device/channels
("GET", "/openapi/device"),
("GET", "/openapi/devices"),
("GET", "/openapi/added_devices"),
("GET", "/openapi/channel"),
("GET", "/openapi/channels"),
("GET", "/openapi/channelList"),
("GET", "/openapi/device/info"),
("GET", "/openapi/device/list"),
("GET", "/openapi/device/channels"),
("GET", "/openapi/nvr/info"),
("GET", "/openapi/nvr/channels"),
# Recording/playback
("GET", "/openapi/record"),
("GET", "/openapi/records"),
("GET", "/openapi/recording"),
("GET", "/openapi/recordings"),
("GET", "/openapi/playback"),
("GET", "/openapi/video"),
("GET", "/openapi/videos"),
("GET", "/openapi/storage"),
("GET", "/openapi/storage/recordings"),
("GET", "/openapi/hdd"),
("GET", "/openapi/disk"),
# Search endpoints
("GET", "/openapi/search"),
("GET", "/openapi/record/list"),
("GET", "/openapi/recording/list"),
("GET", "/openapi/playback/list"),
("GET", "/openapi/video/list"),
# Live stream
("GET", "/openapi/live"),
("GET", "/openapi/stream"),
("GET", "/openapi/liveStream"),
("GET", "/openapi/rtsp"),
# Events
("GET", "/openapi/event"),
("GET", "/openapi/events"),
("GET", "/openapi/event/list"),
("GET", "/openapi/alarm"),
("GET", "/openapi/alarms"),
# Settings
("GET", "/openapi/settings"),
("GET", "/openapi/config"),
("GET", "/openapi/system"),
("GET", "/openapi/sound"),
("GET", "/openapi/network"),
# User
("GET", "/openapi/user"),
("GET", "/openapi/users"),
("GET", "/openapi/account"),
]
click.echo(f"Connecting to NVR at {host}:{port}...")
try:
from .auth import NVRAuthenticator
auth = NVRAuthenticator(host, user, password, port, verify_ssl=not no_ssl_verify)
session = auth.get_authenticated_session()
base_url = f"https://{host}:{port}"
click.echo(f"Authentication successful!")
click.echo(f"\nProbing {len(endpoints_to_probe)} endpoints...\n")
found_endpoints = []
for method, endpoint in endpoints_to_probe:
url = f"{base_url}{endpoint}"
try:
if method == "GET":
response = session.get(url, timeout=5)
else:
response = session.post(url, json={}, timeout=5)
status = response.status_code
if status == 200:
# Try to get response preview
try:
data = response.json()
preview = str(data)[:100]
except:
preview = response.text[:100] if response.text else "(empty)"
click.echo(f"{status} {method:4} {endpoint}")
click.echo(f" Response: {preview}...")
found_endpoints.append((method, endpoint, status))
elif status in (400, 401, 403, 405):
# Exists but needs different params/auth
click.echo(f"⚠️ {status} {method:4} {endpoint} (exists but access denied/bad request)")
found_endpoints.append((method, endpoint, status))
# 404 = not found, skip silently
except requests.RequestException as e:
pass # Skip connection errors
click.echo(f"\n{'='*60}")
click.echo(f"Summary: Found {len(found_endpoints)} accessible endpoints")
if found_endpoints:
click.echo("\nWorking endpoints (200 OK):")
for method, endpoint, status in found_endpoints:
if status == 200:
click.echo(f" {method} {endpoint}")
click.echo("\nEndpoints that exist but need work:")
for method, endpoint, status in found_endpoints:
if status != 200:
click.echo(f" {method} {endpoint} ({status})")
else:
click.echo("\n⚠️ No working endpoints found!")
click.echo("The NVR might use a different API structure.")
click.echo("Try checking the NVR web interface with browser DevTools.")
except AuthenticationError as e:
click.echo(f"Authentication failed: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()
+111
View File
@@ -0,0 +1,111 @@
"""Debug logging utilities for NVR Export."""
import json
import logging
import sys
from datetime import datetime
from typing import Any
# Create logger
logger = logging.getLogger("nvr_export")
def setup_debug_logging(enabled: bool = False, log_file: str = None) -> None:
"""
Setup debug logging.
Args:
enabled: Whether to enable debug logging
log_file: Optional file path to write logs to
"""
level = logging.DEBUG if enabled else logging.WARNING
# Clear existing handlers
logger.handlers.clear()
logger.setLevel(level)
# Console handler
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(level)
# Formatter with timestamp
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File handler if specified
if log_file:
file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
))
logger.addHandler(file_handler)
if enabled:
logger.debug("Debug logging enabled")
def log_request(method: str, url: str, headers: dict = None, body: Any = None) -> None:
"""Log an outgoing HTTP request."""
logger.debug(f">>> {method} {url}")
if headers:
# Filter out sensitive headers
safe_headers = {k: v for k, v in headers.items() if 'auth' not in k.lower()}
if safe_headers:
logger.debug(f" Headers: {safe_headers}")
if body:
try:
body_str = json.dumps(body, indent=2, default=str)
logger.debug(f" Body: {body_str}")
except (TypeError, ValueError):
logger.debug(f" Body: {body}")
def log_response(status_code: int, headers: dict = None, body: Any = None, truncate: int = 2000) -> None:
"""Log an incoming HTTP response."""
logger.debug(f"<<< Response: {status_code}")
if headers:
content_type = headers.get('content-type', 'unknown')
content_length = headers.get('content-length', 'unknown')
logger.debug(f" Content-Type: {content_type}, Length: {content_length}")
if body:
try:
if isinstance(body, (dict, list)):
body_str = json.dumps(body, indent=2, default=str)
else:
body_str = str(body)
if len(body_str) > truncate:
body_str = body_str[:truncate] + f"\n... (truncated, total {len(body_str)} chars)"
logger.debug(f" Body: {body_str}")
except (TypeError, ValueError):
logger.debug(f" Body: {body}")
def log_error(message: str, exception: Exception = None) -> None:
"""Log an error."""
if exception:
logger.error(f"{message}: {type(exception).__name__}: {exception}")
else:
logger.error(message)
def log_info(message: str) -> None:
"""Log info message."""
logger.info(message)
def log_debug(message: str) -> None:
"""Log debug message."""
logger.debug(message)
def is_debug_enabled() -> bool:
"""Check if debug logging is enabled."""
return logger.level <= logging.DEBUG
+248 -111
View File
@@ -9,11 +9,16 @@ import requests
from tqdm import tqdm
from .auth import AuthenticationError, NVRAuthenticator
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
from .models import Channel, ExportJob, Recording
class NVRClient:
"""Client for TP-Link Vigi NVR API operations."""
"""Client for TP-Link Vigi NVR OpenAPI operations.
Base endpoint: /openapi/
Authentication: Bearer token from /openapi/token
"""
def __init__(
self,
@@ -38,6 +43,7 @@ class NVRClient:
self.base_url = f"https://{host}:{port}"
self.auth = NVRAuthenticator(host, username, password, port, verify_ssl)
self._session: Optional[requests.Session] = None
log_debug(f"NVRClient initialized for {host}:{port}")
@property
def session(self) -> requests.Session:
@@ -57,7 +63,7 @@ class NVRClient:
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint path
endpoint: API endpoint path (will be prefixed with /openapi/)
**kwargs: Additional arguments for requests
Returns:
@@ -66,21 +72,44 @@ class NVRClient:
Raises:
NVRAPIError: If request fails
"""
# Ensure endpoint starts with /openapi/
if not endpoint.startswith("/openapi/"):
endpoint = f"/openapi/{endpoint.lstrip('/')}"
url = f"{self.base_url}{endpoint}"
# Log request
log_request(method, url, kwargs.get('headers'), kwargs.get('json'))
try:
response = self.session.request(method, url, timeout=60, **kwargs)
# Log response
try:
response_data = response.json() if response.content else {}
except ValueError:
response_data = {"raw": response.text[:500] if response.text else "empty"}
log_response(response.status_code, dict(response.headers), response_data)
response.raise_for_status()
# Some endpoints might return empty response
if not response.content:
return {}
data = response.json()
if data.get("error_code", 0) != 0:
error_msg = data.get("error_msg", "Unknown API error")
raise NVRAPIError(f"API error: {error_msg}")
# Check for error in response
error_code = data.get("error_code", data.get("errorCode", 0))
if error_code != 0:
error_msg = data.get("error_msg", data.get("errorMsg", "Unknown API error"))
raise NVRAPIError(f"API error {error_code}: {error_msg}")
return data
except requests.RequestException as e:
log_error(f"Request to {endpoint} failed", e)
raise NVRAPIError(f"Request failed: {e}") from e
def get_channels(self) -> list[Channel]:
@@ -90,33 +119,66 @@ class NVRClient:
Returns:
List of Channel objects
"""
# Try the standard device/channels endpoint
try:
data = self._api_request("POST", "/api/v1/channels", json={"method": "get"})
channels = []
for ch_data in data.get("result", {}).get("channel_list", []):
channels.append(Channel(
id=ch_data.get("channel_id", 0),
name=ch_data.get("channel_name", f"Channel {ch_data.get('channel_id', 0)}"),
enabled=ch_data.get("enabled", True),
))
return channels
except NVRAPIError:
# Fallback: try alternative endpoint structure
data = self._api_request("POST", "/api/v1/device", json={"method": "getChannels"})
channels = []
for ch_data in data.get("result", {}).get("channels", []):
channels.append(Channel(
id=ch_data.get("id", 0),
name=ch_data.get("name", f"Channel {ch_data.get('id', 0)}"),
enabled=ch_data.get("status", "on") == "on",
))
return channels
channels = []
# Try different endpoint patterns
endpoints_to_try = [
("GET", "added_devices"),
("GET", "channels"),
("POST", "channels", {"method": "get"}),
("GET", "device/channels"),
]
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for channels")
for endpoint_info in endpoints_to_try:
try:
method = endpoint_info[0]
endpoint = endpoint_info[1]
json_data = endpoint_info[2] if len(endpoint_info) > 2 else None
log_debug(f"Trying endpoint: {method} {endpoint}")
if json_data:
data = self._api_request(method, endpoint, json=json_data)
else:
data = self._api_request(method, endpoint)
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
# Parse response - try different structures
channel_list = (
data.get("result", {}).get("channel_list", []) or
data.get("result", {}).get("channels", []) or
data.get("result", {}).get("devices", []) or
data.get("channel_list", []) or
data.get("channels", []) or
data.get("devices", []) or
[]
)
log_debug(f"Found {len(channel_list)} channels in response")
if channel_list:
for ch_data in channel_list:
channels.append(Channel(
id=ch_data.get("channel_id", ch_data.get("id", ch_data.get("channelId", 0))),
name=ch_data.get("channel_name", ch_data.get("name", ch_data.get("deviceName", f"Channel"))),
enabled=ch_data.get("enabled", ch_data.get("status", "on") == "on"),
))
return channels
except NVRAPIError as e:
log_debug(f"Endpoint {endpoint} failed: {e}")
continue
# If no channels found via API, return default channels 1-8
log_debug("No channels found via API, returning defaults 1-8")
if not channels:
for i in range(1, 9):
channels.append(Channel(id=i, name=f"Channel {i}", enabled=True))
return channels
def search_recordings(
self,
@@ -137,10 +199,17 @@ class NVRClient:
Returns:
List of Recording objects matching criteria
"""
# Format times as expected by NVR API (typically Unix timestamp or ISO format)
# Format times as expected by NVR API
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())
# ISO format alternative
start_iso = start_time.strftime("%Y-%m-%dT%H:%M:%S")
end_iso = end_time.strftime("%Y-%m-%dT%H:%M:%S")
log_debug(f"Searching recordings: channel={channel_id}, start={start_time}, end={end_time}")
log_debug(f"Timestamps: start_ts={start_ts}, end_ts={end_ts}")
type_map = {
"all": 0,
"continuous": 1,
@@ -149,69 +218,114 @@ class NVRClient:
}
rec_type = type_map.get(recording_type, 0)
# Try primary search endpoint
try:
data = self._api_request(
"POST",
"/api/v1/playback/search",
json={
"method": "searchRecordings",
"channel_id": channel_id,
"start_time": start_ts,
"end_time": end_ts,
"record_type": rec_type,
},
)
except NVRAPIError:
# Fallback endpoint format
data = self._api_request(
"POST",
"/api/v1/record/search",
json={
"method": "search",
"params": {
"channel": channel_id,
"start": start_ts,
"end": end_ts,
"type": rec_type,
},
},
)
recordings = []
result = data.get("result", {})
record_list = result.get("record_list", result.get("recordings", []))
for rec_data in record_list:
# Parse timestamps (could be Unix or string format)
rec_start = self._parse_timestamp(rec_data.get("start_time", rec_data.get("start", 0)))
rec_end = self._parse_timestamp(rec_data.get("end_time", rec_data.get("end", 0)))
recordings.append(Recording(
id=str(rec_data.get("record_id", rec_data.get("id", ""))),
channel_id=channel_id,
start_time=rec_start,
end_time=rec_end,
size_bytes=rec_data.get("size", rec_data.get("file_size", 0)),
recording_type=rec_data.get("type", rec_data.get("record_type", "unknown")),
file_path=rec_data.get("file_path", rec_data.get("path")),
))
# Try different endpoint patterns
endpoints_to_try = [
("POST", "playback/search", {
"channelId": channel_id,
"startTime": start_ts,
"endTime": end_ts,
"recordType": rec_type,
}),
("POST", "record/search", {
"channel": channel_id,
"start": start_ts,
"end": end_ts,
"type": rec_type,
}),
("GET", f"playback/search?channelId={channel_id}&startTime={start_ts}&endTime={end_ts}"),
("POST", "search", {
"method": "searchRecordings",
"params": {
"channel_id": channel_id,
"start_time": start_iso,
"end_time": end_iso,
}
}),
]
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for recordings")
for endpoint_info in endpoints_to_try:
try:
method = endpoint_info[0]
endpoint = endpoint_info[1]
json_data = endpoint_info[2] if len(endpoint_info) > 2 and isinstance(endpoint_info[2], dict) else None
log_debug(f"Trying endpoint: {method} {endpoint}")
if json_data:
data = self._api_request(method, endpoint, json=json_data)
else:
data = self._api_request(method, endpoint)
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
# Log full response structure for debugging
if is_debug_enabled():
import json as json_module
log_debug(f"Full response: {json_module.dumps(data, indent=2, default=str)[:1000]}")
# Parse response - try different structures
result = data.get("result", data)
record_list = (
result.get("record_list", []) or
result.get("recordings", []) or
result.get("recordList", []) or
result.get("items", []) or
result.get("searchResult", []) or
result.get("list", []) or
[]
)
log_debug(f"Found {len(record_list)} recordings in response")
if record_list:
for rec_data in record_list:
log_debug(f"Recording data: {rec_data}")
rec_start = self._parse_timestamp(
rec_data.get("start_time", rec_data.get("startTime", rec_data.get("start", 0)))
)
rec_end = self._parse_timestamp(
rec_data.get("end_time", rec_data.get("endTime", rec_data.get("end", 0)))
)
recordings.append(Recording(
id=str(rec_data.get("record_id", rec_data.get("recordId", rec_data.get("id", "")))),
channel_id=channel_id,
start_time=rec_start,
end_time=rec_end,
size_bytes=rec_data.get("size", rec_data.get("fileSize", rec_data.get("file_size", 0))),
recording_type=str(rec_data.get("type", rec_data.get("recordType", rec_data.get("record_type", "unknown")))),
file_path=rec_data.get("file_path", rec_data.get("filePath", rec_data.get("path"))),
))
return recordings
except NVRAPIError as e:
log_debug(f"Endpoint {endpoint} failed: {e}")
continue
log_debug("No recordings found with any endpoint pattern")
return recordings
def _parse_timestamp(self, ts) -> datetime:
"""Parse timestamp from various formats."""
if isinstance(ts, (int, float)):
return datetime.fromtimestamp(ts)
if ts > 0:
return datetime.fromtimestamp(ts)
if isinstance(ts, str):
# Try common formats
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"]:
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"]:
try:
return datetime.strptime(ts, fmt)
except ValueError:
continue
# Try Unix timestamp as string
return datetime.fromtimestamp(int(ts))
try:
return datetime.fromtimestamp(int(ts))
except (ValueError, OSError):
pass
return datetime.now()
def download_recording(
@@ -231,15 +345,20 @@ class NVRClient:
Returns:
Path to downloaded file
"""
# Construct download URL
# Try different download URL patterns
download_urls = []
if recording.file_path:
download_url = f"{self.base_url}/api/v1/playback/download?path={recording.file_path}"
else:
download_url = (
f"{self.base_url}/api/v1/playback/download"
f"?record_id={recording.id}"
f"&channel_id={recording.channel_id}"
)
download_urls.append(f"{self.base_url}/openapi/playback/download?path={recording.file_path}")
download_urls.append(f"{self.base_url}/openapi/download?filePath={recording.file_path}")
download_urls.extend([
f"{self.base_url}/openapi/playback/download?recordId={recording.id}&channelId={recording.channel_id}",
f"{self.base_url}/openapi/record/download?id={recording.id}",
f"{self.base_url}/openapi/download?recordId={recording.id}",
])
log_debug(f"Download URLs to try: {download_urls}")
# Determine output filename
if output_path.is_dir():
@@ -251,28 +370,46 @@ class NVRClient:
# Ensure parent directory exists
output_file.parent.mkdir(parents=True, exist_ok=True)
try:
response = self.session.get(download_url, stream=True, timeout=300)
response.raise_for_status()
# Get total size from headers or recording metadata
total_size = int(response.headers.get("content-length", recording.size_bytes))
downloaded = 0
chunk_size = 8192
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
return output_file
except requests.RequestException as e:
raise NVRAPIError(f"Download failed: {e}") from e
last_error = None
for download_url in download_urls:
try:
log_debug(f"Trying download from: {download_url}")
response = self.session.get(download_url, stream=True, timeout=300)
log_debug(f"Download response: {response.status_code}, Content-Type: {response.headers.get('content-type')}")
response.raise_for_status()
# Check if response is actually video data
content_type = response.headers.get("content-type", "")
if "json" in content_type or "html" in content_type:
# This is an error response, try next URL
log_debug(f"Got non-video response, trying next URL")
continue
# Get total size from headers or recording metadata
total_size = int(response.headers.get("content-length", recording.size_bytes or 0))
downloaded = 0
chunk_size = 8192
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=chunk_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
return output_file
except requests.RequestException as e:
log_error(f"Download from {download_url} failed", e)
last_error = e
continue
raise NVRAPIError(f"Download failed after trying all URLs: {last_error}")
def _generate_filename(self, recording: Recording) -> str:
"""Generate a filename for a recording."""
+496
View File
@@ -0,0 +1,496 @@
"""Web Interface API client for TP-Link Vigi NVR.
Uses the stok-based authentication for the /ds endpoint,
which provides access to recordings and playback.
"""
import hashlib
import json
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Optional
import requests
from tqdm import tqdm
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
from .models import Channel, Recording
@dataclass
class StokSession:
"""Holds stok session data."""
stok: str
expires_at: float
@property
def is_expired(self) -> bool:
"""Check if token has expired (valid ~30 min)."""
return time.time() >= self.expires_at
class WebClient:
"""Client for TP-Link Vigi NVR Web Interface API.
Uses /stok={token}/ds endpoint for all operations.
This provides access to recordings, playback, and other features
not available via the OpenAPI.
"""
def __init__(
self,
host: str,
username: str,
password: str,
port: int = 443,
verify_ssl: bool = False,
):
"""
Initialize Web client.
Args:
host: NVR IP address or hostname
username: Admin username
password: Admin password
port: Web interface port (default: 443)
verify_ssl: Whether to verify SSL certificates
"""
self.host = host
self.username = username
self.password = password
self.port = port
self.verify_ssl = verify_ssl
self.base_url = f"https://{host}" if port == 443 else f"https://{host}:{port}"
self._session: Optional[StokSession] = None
self._http_session = requests.Session()
self._http_session.verify = verify_ssl
# Suppress SSL warnings if not verifying
if not verify_ssl:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log_debug(f"WebClient initialized for {host}:{port}")
@property
def stok(self) -> str:
"""Get current stok token, refreshing if expired."""
if self._session is None or self._session.is_expired:
self._session = self._login()
return self._session.stok
def _hash_password(self, password: str) -> str:
"""Hash password for login. TP-Link uses MD5."""
return hashlib.md5(password.encode()).hexdigest()
def _login(self) -> StokSession:
"""
Login to NVR web interface and obtain stok token.
Returns:
StokSession with stok token
Raises:
WebClientError: If login fails
"""
login_url = f"{self.base_url}/"
log_debug(f"Logging in to {login_url}")
try:
# First, try standard TP-Link login format
# The login endpoint is usually POST to / with form data
login_data = {
"method": "do",
"login": {
"username": self.username,
"password": self._hash_password(self.password),
}
}
log_request("POST", login_url, body=login_data)
response = self._http_session.post(
login_url,
json=login_data,
timeout=30,
)
log_response(response.status_code, dict(response.headers),
response.json() if response.content else None)
if response.status_code == 200:
data = response.json()
stok = data.get("stok", "")
if stok:
log_debug(f"Got stok token: {stok[:20]}...")
# Token valid for ~25 minutes (refresh before 30)
expires_at = time.time() + (25 * 60)
return StokSession(stok=stok, expires_at=expires_at)
# Try alternative login format
alt_login_data = {
"username": self.username,
"password": self._hash_password(self.password),
}
log_debug("Trying alternative login format")
log_request("POST", login_url, body=alt_login_data)
response = self._http_session.post(
login_url,
data=alt_login_data,
timeout=30,
)
log_response(response.status_code, dict(response.headers),
response.text[:500] if response.text else None)
# Check for stok in response
try:
data = response.json()
stok = data.get("stok", data.get("result", {}).get("stok", ""))
if stok:
expires_at = time.time() + (25 * 60)
return StokSession(stok=stok, expires_at=expires_at)
except ValueError:
pass
raise WebClientError("Could not obtain stok token from login response")
except requests.RequestException as e:
log_error("Login failed", e)
raise WebClientError(f"Login failed: {e}") from e
def _ds_request(
self,
data: dict,
) -> dict:
"""
Make a request to the /ds endpoint.
Args:
data: Request data (method, params, etc.)
Returns:
JSON response as dict
"""
url = f"{self.base_url}/stok={self.stok}/ds"
log_request("POST", url, body=data)
try:
response = self._http_session.post(
url,
json=data,
timeout=60,
)
try:
response_data = response.json()
except ValueError:
response_data = {"raw": response.text[:500] if response.text else "empty"}
log_response(response.status_code, dict(response.headers), response_data)
response.raise_for_status()
# Check for error in response
error_code = response_data.get("error_code", 0)
if error_code != 0:
error_msg = response_data.get("error_msg", f"Error code: {error_code}")
raise WebClientError(f"API error: {error_msg}")
return response_data
except requests.RequestException as e:
log_error(f"Request to /ds failed", e)
raise WebClientError(f"Request failed: {e}") from e
def get_channels(self) -> list[Channel]:
"""Get list of camera channels."""
log_debug("Getting channels via /ds")
# Try different methods to get channel list
methods_to_try = [
{"method": "get", "channel": {"table": ["channel"]}},
{"method": "get", "device": {"name": ["device_info"]}},
{"method": "get", "channel_info": {}},
{"method": "do", "channel": {"action": "list"}},
]
for method_data in methods_to_try:
try:
data = self._ds_request(method_data)
# Try to find channels in response
channels = []
# Look for channel data in various places
channel_data = (
data.get("channel", {}).get("table", {}).get("channel", []) or
data.get("channel", []) or
data.get("channels", []) or
data.get("result", {}).get("channels", []) or
[]
)
if channel_data:
for i, ch in enumerate(channel_data):
if isinstance(ch, dict):
channels.append(Channel(
id=ch.get("id", ch.get("channel_id", i + 1)),
name=ch.get("name", ch.get("channel_name", f"Channel {i + 1}")),
enabled=ch.get("enabled", True),
))
if channels:
return channels
except WebClientError as e:
log_debug(f"Method failed: {e}")
continue
# Return default channels if API doesn't provide them
log_debug("Using default channels 1-32")
return [Channel(id=i, name=f"Channel {i}", enabled=True) for i in range(1, 33)]
def search_recordings(
self,
channel_id: int,
start_time: datetime,
end_time: datetime,
recording_type: str = "all",
) -> list[Recording]:
"""
Search for recordings in a time range.
Args:
channel_id: Camera channel ID
start_time: Start of time range
end_time: End of time range
recording_type: Type filter
Returns:
List of Recording objects
"""
start_str = start_time.strftime("%Y%m%d")
end_str = end_time.strftime("%Y%m%d")
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())
log_debug(f"Searching recordings: ch={channel_id}, {start_time} to {end_time}")
# Try different search methods
methods_to_try = [
{
"method": "get",
"playback": {
"table": ["search"],
"search": {
"channel": channel_id,
"start_date": start_str,
"end_date": end_str,
}
}
},
{
"method": "do",
"playback": {
"action": "search",
"channel": channel_id,
"start_time": start_ts,
"end_time": end_ts,
}
},
{
"method": "get",
"record": {
"table": ["search"],
"search": {
"channel_id": channel_id,
"start": start_ts,
"end": end_ts,
}
}
},
{
"method": "do",
"record": {
"action": "search",
"channel": channel_id,
"start_date": start_str,
"end_date": end_str,
}
},
]
recordings = []
for method_data in methods_to_try:
try:
log_debug(f"Trying: {json.dumps(method_data)[:100]}")
data = self._ds_request(method_data)
# Log full response for debugging
if is_debug_enabled():
log_debug(f"Full response: {json.dumps(data, indent=2, default=str)[:1500]}")
# Look for recordings in various places
record_list = (
data.get("playback", {}).get("table", {}).get("search", []) or
data.get("playback", {}).get("search", []) or
data.get("record", {}).get("table", {}).get("search", []) or
data.get("record", {}).get("search", []) or
data.get("result", {}).get("records", []) or
data.get("records", []) or
data.get("list", []) or
[]
)
log_debug(f"Found {len(record_list)} records in response")
if record_list:
for rec in record_list:
try:
rec_start = self._parse_time(rec.get("start", rec.get("start_time", 0)))
rec_end = self._parse_time(rec.get("end", rec.get("end_time", 0)))
recordings.append(Recording(
id=str(rec.get("id", rec.get("record_id", ""))),
channel_id=channel_id,
start_time=rec_start,
end_time=rec_end,
size_bytes=rec.get("size", rec.get("file_size", 0)),
recording_type=str(rec.get("type", "unknown")),
file_path=rec.get("path", rec.get("file_path")),
))
except Exception as e:
log_debug(f"Error parsing record: {e}")
continue
if recordings:
return recordings
except WebClientError as e:
log_debug(f"Method failed: {e}")
continue
return recordings
def _parse_time(self, t) -> datetime:
"""Parse time from various formats."""
if isinstance(t, (int, float)) and t > 0:
return datetime.fromtimestamp(t)
if isinstance(t, str):
for fmt in ["%Y%m%d%H%M%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"]:
try:
return datetime.strptime(t, fmt)
except ValueError:
continue
try:
return datetime.fromtimestamp(int(t))
except:
pass
return datetime.now()
def download_recording(
self,
recording: Recording,
output_path: Path,
progress_callback: Optional[Callable[[int, int], None]] = None,
) -> Path:
"""Download a recording to disk."""
# Build download URL
if recording.file_path:
download_url = f"{self.base_url}/stok={self.stok}/ds?download={recording.file_path}"
else:
download_url = f"{self.base_url}/stok={self.stok}/ds?download&channel={recording.channel_id}&id={recording.id}"
log_debug(f"Downloading from: {download_url}")
# Determine output filename
if output_path.is_dir():
start_str = recording.start_time.strftime("%Y%m%d_%H%M%S")
end_str = recording.end_time.strftime("%H%M%S")
filename = f"ch{recording.channel_id}_{start_str}-{end_str}.mp4"
output_file = output_path / filename
else:
output_file = output_path
output_file.parent.mkdir(parents=True, exist_ok=True)
try:
response = self._http_session.get(download_url, stream=True, timeout=300)
response.raise_for_status()
total_size = int(response.headers.get("content-length", recording.size_bytes or 0))
downloaded = 0
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
return output_file
except requests.RequestException as e:
log_error(f"Download failed", e)
raise WebClientError(f"Download failed: {e}") from e
def export_time_range(
self,
channel_id: int,
start_time: datetime,
end_time: datetime,
output_dir: Path,
recording_type: str = "all",
show_progress: bool = True,
) -> list[Path]:
"""Export all recordings in a time range."""
recordings = self.search_recordings(channel_id, start_time, end_time, recording_type)
if not recordings:
return []
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
downloaded_files = []
recordings_iter = tqdm(recordings, desc="Downloading", disable=not show_progress)
for recording in recordings_iter:
if show_progress:
recordings_iter.set_postfix_str(f"Ch{recording.channel_id} {recording.start_time:%H:%M}")
try:
output_file = self.download_recording(recording, output_dir)
downloaded_files.append(output_file)
except WebClientError as e:
if show_progress:
tqdm.write(f"Warning: Failed to download {recording}: {e}")
return downloaded_files
def close(self) -> None:
"""Close the HTTP session."""
self._http_session.close()
def __enter__(self) -> "WebClient":
return self
def __exit__(self, *args) -> None:
self.close()
class WebClientError(Exception):
"""Raised when web interface operations fail."""
pass