mirror of
https://github.com/MrUnknownDE/tplink-nvr-export.git
synced 2026-05-03 21:46:06 +02:00
Compare commits
5 Commits
v0.1.0-alpha.2
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| d8457b2339 | |||
| 63cf8afd76 | |||
| 1c0508544d | |||
| 2755fcb6cf | |||
| 73730ad2f5 |
+119
-17
@@ -2,11 +2,11 @@
|
||||
|
||||
import hashlib
|
||||
import time
|
||||
import urllib.parse
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from requests.auth import HTTPDigestAuth
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -29,7 +29,11 @@ class AuthSession:
|
||||
|
||||
|
||||
class NVRAuthenticator:
|
||||
"""Handles authentication with TP-Link Vigi NVR OpenAPI."""
|
||||
"""Handles authentication with TP-Link Vigi NVR OpenAPI.
|
||||
|
||||
The NVR uses HTTP Digest Authentication with SHA-256 to obtain an access_token.
|
||||
Endpoint: /openapi/token
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -71,12 +75,66 @@ class NVRAuthenticator:
|
||||
self._session = self._authenticate()
|
||||
return self._session
|
||||
|
||||
def _calculate_digest_response(
|
||||
self,
|
||||
method: str,
|
||||
uri: str,
|
||||
nonce: str,
|
||||
realm: str,
|
||||
algorithm: str = "SHA-256",
|
||||
) -> str:
|
||||
"""
|
||||
Calculate Digest Authentication response.
|
||||
|
||||
A1 = SHA256(username:realm:password)
|
||||
A2 = SHA256(method:uri)
|
||||
response = SHA256(A1:nonce:A2)
|
||||
"""
|
||||
if algorithm.upper() in ("SHA-256", "SHA256"):
|
||||
hash_func = hashlib.sha256
|
||||
else:
|
||||
hash_func = hashlib.md5
|
||||
|
||||
# Calculate A1
|
||||
a1_string = f"{self.username}:{realm}:{self.password}"
|
||||
a1 = hash_func(a1_string.encode()).hexdigest()
|
||||
|
||||
# Calculate A2
|
||||
a2_string = f"{method}:{uri}"
|
||||
a2 = hash_func(a2_string.encode()).hexdigest()
|
||||
|
||||
# Calculate response
|
||||
response_string = f"{a1}:{nonce}:{a2}"
|
||||
response = hash_func(response_string.encode()).hexdigest()
|
||||
|
||||
return response
|
||||
|
||||
def _parse_www_authenticate(self, header: str) -> dict:
|
||||
"""Parse WWW-Authenticate header into dict."""
|
||||
# Example: Digest realm="VIGI", nonce="abc123", algorithm=SHA-256
|
||||
result = {}
|
||||
# Remove 'Digest ' prefix
|
||||
if header.lower().startswith("digest "):
|
||||
header = header[7:]
|
||||
|
||||
# Parse key=value pairs
|
||||
import re
|
||||
pattern = r'(\w+)=(?:"([^"]+)"|([^\s,]+))'
|
||||
for match in re.finditer(pattern, header):
|
||||
key = match.group(1)
|
||||
value = match.group(2) or match.group(3)
|
||||
result[key] = value
|
||||
|
||||
return result
|
||||
|
||||
def _authenticate(self) -> AuthSession:
|
||||
"""
|
||||
Authenticate with NVR and obtain access token.
|
||||
|
||||
The NVR uses HTTP Digest Authentication for the initial login,
|
||||
then returns a bearer token for subsequent requests.
|
||||
1. GET /openapi/token without auth to get nonce
|
||||
2. Calculate digest response
|
||||
3. GET /openapi/token with Authorization header
|
||||
4. Extract access_token from response
|
||||
|
||||
Returns:
|
||||
AuthSession with access token
|
||||
@@ -84,29 +142,73 @@ class NVRAuthenticator:
|
||||
Raises:
|
||||
AuthenticationError: If authentication fails
|
||||
"""
|
||||
login_url = f"{self.base_url}/api/v1/login"
|
||||
token_url = f"{self.base_url}/openapi/token"
|
||||
uri = "/openapi/token"
|
||||
|
||||
try:
|
||||
# First attempt with Digest Auth
|
||||
response = self._http_session.post(
|
||||
login_url,
|
||||
auth=HTTPDigestAuth(self.username, self.password),
|
||||
json={"method": "login"},
|
||||
# Step 1: Initial request to get nonce
|
||||
response = self._http_session.get(token_url, timeout=30)
|
||||
|
||||
if response.status_code != 401:
|
||||
raise AuthenticationError(
|
||||
f"Expected 401 for initial auth, got {response.status_code}"
|
||||
)
|
||||
|
||||
# Parse WWW-Authenticate header
|
||||
www_auth = response.headers.get("WWW-Authenticate", "")
|
||||
if not www_auth:
|
||||
raise AuthenticationError("No WWW-Authenticate header in response")
|
||||
|
||||
auth_params = self._parse_www_authenticate(www_auth)
|
||||
nonce = auth_params.get("nonce", "")
|
||||
realm = auth_params.get("realm", "VIGI")
|
||||
algorithm = auth_params.get("algorithm", "SHA-256")
|
||||
|
||||
if not nonce:
|
||||
raise AuthenticationError("No nonce in WWW-Authenticate header")
|
||||
|
||||
# Step 2: Calculate digest response
|
||||
digest_response = self._calculate_digest_response(
|
||||
method="GET",
|
||||
uri=uri,
|
||||
nonce=nonce,
|
||||
realm=realm,
|
||||
algorithm=algorithm,
|
||||
)
|
||||
|
||||
# Step 3: Build Authorization header
|
||||
auth_header = (
|
||||
f'Digest username="{self.username}", '
|
||||
f'realm="{realm}", '
|
||||
f'nonce="{nonce}", '
|
||||
f'uri="{uri}", '
|
||||
f'algorithm={algorithm}, '
|
||||
f'response="{digest_response}"'
|
||||
)
|
||||
|
||||
# Step 4: Authenticated request
|
||||
response = self._http_session.get(
|
||||
token_url,
|
||||
headers={"Authorization": auth_header},
|
||||
timeout=30,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
|
||||
if data.get("error_code", 0) != 0:
|
||||
error_msg = data.get("error_msg", "Unknown error")
|
||||
raise AuthenticationError(f"Login failed: {error_msg}")
|
||||
|
||||
result = data.get("result", {})
|
||||
access_token = result.get("stok", "")
|
||||
# Extract access_token
|
||||
access_token = data.get("access_token", data.get("token", ""))
|
||||
|
||||
if not access_token:
|
||||
raise AuthenticationError("No access token in response")
|
||||
# Try to get from result object
|
||||
result = data.get("result", {})
|
||||
access_token = result.get("access_token", result.get("stok", ""))
|
||||
|
||||
if not access_token:
|
||||
raise AuthenticationError(f"No access token in response: {data}")
|
||||
|
||||
# URL decode the token
|
||||
access_token = urllib.parse.unquote(access_token)
|
||||
|
||||
# Token typically expires in 1 hour, refresh at 50 minutes
|
||||
expires_at = time.time() + (50 * 60)
|
||||
|
||||
+325
-13
@@ -9,6 +9,7 @@ import click
|
||||
|
||||
from . import __version__
|
||||
from .auth import AuthenticationError
|
||||
from .debug import setup_debug_logging
|
||||
from .nvr_client import NVRAPIError, NVRClient
|
||||
|
||||
|
||||
@@ -56,20 +57,35 @@ DATETIME = DateTimeParamType()
|
||||
|
||||
@click.group()
|
||||
@click.version_option(version=__version__, prog_name="nvr-export")
|
||||
def main():
|
||||
@click.option("--debug", "-d", is_flag=True, help="Enable debug logging (shows all API requests/responses)")
|
||||
@click.option("--debug-file", type=click.Path(), help="Write debug log to file")
|
||||
@click.pass_context
|
||||
def main(ctx, debug: bool, debug_file: str):
|
||||
"""TP-Link Vigi NVR Export Tool.
|
||||
|
||||
Export video recordings from TP-Link Vigi NVRs via OpenAPI.
|
||||
|
||||
Make sure OpenAPI is enabled on your NVR:
|
||||
Settings > Network > OpenAPI (default port: 20443)
|
||||
|
||||
\b
|
||||
Debug mode:
|
||||
nvr-export --debug export ...
|
||||
nvr-export --debug --debug-file debug.log export ...
|
||||
"""
|
||||
pass
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj['debug'] = debug
|
||||
|
||||
# Setup debug logging
|
||||
setup_debug_logging(enabled=debug, log_file=debug_file)
|
||||
|
||||
if debug:
|
||||
click.echo("🔧 Debug mode enabled - all API requests will be logged", err=True)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
|
||||
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
|
||||
@click.option("--port", "-p", default=443, help="Port (default: 443 for web, 20443 for openapi)")
|
||||
@click.option("--user", "-u", required=True, help="Admin username")
|
||||
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
|
||||
@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID (1-based)")
|
||||
@@ -81,7 +97,9 @@ def main():
|
||||
help="Recording type filter")
|
||||
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
|
||||
@click.option("--quiet", "-q", is_flag=True, help="Suppress progress output")
|
||||
@click.pass_context
|
||||
def export(
|
||||
ctx,
|
||||
host: str,
|
||||
port: int,
|
||||
user: str,
|
||||
@@ -96,23 +114,27 @@ def export(
|
||||
):
|
||||
"""Export recordings from NVR for a time range.
|
||||
|
||||
Uses the web interface API (/stok/ds) for recording access.
|
||||
|
||||
\b
|
||||
Examples:
|
||||
# Export channel 1 for a specific day
|
||||
nvr-export export -h 192.168.1.100 -u admin -c 1 \\
|
||||
-s "2024-12-28 00:00" -e "2024-12-28 23:59" -o ./exports
|
||||
|
||||
# Export only motion recordings
|
||||
nvr-export export -h 192.168.1.100 -u admin -c 2 \\
|
||||
-s "2024-12-01" -e "2024-12-31" --type motion -o ./exports
|
||||
# Export with debug logging
|
||||
nvr-export --debug export -h 192.168.1.100 -u admin -c 1 \\
|
||||
-s "2024-12-28" -e "2024-12-29" -o ./exports
|
||||
"""
|
||||
from .web_client import WebClient, WebClientError
|
||||
|
||||
output_dir = Path(output)
|
||||
|
||||
if not quiet:
|
||||
click.echo(f"Connecting to NVR at {host}:{port}...")
|
||||
click.echo(f"Connecting to NVR web interface at {host}:{port}...")
|
||||
|
||||
try:
|
||||
with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
|
||||
with WebClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
|
||||
if not quiet:
|
||||
click.echo(f"Searching recordings: Channel {channel}, {start} to {end}")
|
||||
|
||||
@@ -120,6 +142,8 @@ def export(
|
||||
|
||||
if not recordings:
|
||||
click.echo("No recordings found for the specified time range.")
|
||||
if ctx.obj.get('debug'):
|
||||
click.echo("💡 Tip: Check debug output above for API responses", err=True)
|
||||
return
|
||||
|
||||
# Calculate total size
|
||||
@@ -136,14 +160,14 @@ def export(
|
||||
if not quiet:
|
||||
click.echo(f"\nSuccessfully exported {len(downloaded)} recordings to {output_dir}")
|
||||
|
||||
except AuthenticationError as e:
|
||||
click.echo(f"Authentication failed: {e}", err=True)
|
||||
sys.exit(1)
|
||||
except NVRAPIError as e:
|
||||
click.echo(f"NVR API error: {e}", err=True)
|
||||
except WebClientError as e:
|
||||
click.echo(f"Web interface error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
click.echo(f"Unexpected error: {e}", err=True)
|
||||
if ctx.obj.get('debug'):
|
||||
import traceback
|
||||
click.echo(traceback.format_exc(), err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@@ -153,7 +177,9 @@ def export(
|
||||
@click.option("--user", "-u", required=True, help="Admin username")
|
||||
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
|
||||
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
|
||||
@click.pass_context
|
||||
def channels(
|
||||
ctx,
|
||||
host: str,
|
||||
port: int,
|
||||
user: str,
|
||||
@@ -196,7 +222,9 @@ def channels(
|
||||
@click.option("--type", "rec_type", default="all",
|
||||
type=click.Choice(["all", "continuous", "motion", "alarm"]))
|
||||
@click.option("--no-ssl-verify", is_flag=True, default=True)
|
||||
@click.pass_context
|
||||
def search(
|
||||
ctx,
|
||||
host: str,
|
||||
port: int,
|
||||
user: str,
|
||||
@@ -214,6 +242,8 @@ def search(
|
||||
|
||||
if not recordings:
|
||||
click.echo("No recordings found.")
|
||||
if ctx.obj.get('debug'):
|
||||
click.echo("💡 Tip: Check debug output above for API responses", err=True)
|
||||
return
|
||||
|
||||
total_size = sum(r.size_bytes for r in recordings) / (1024 * 1024)
|
||||
@@ -238,5 +268,287 @@ def search(
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@main.command()
|
||||
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
|
||||
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
|
||||
@click.option("--user", "-u", required=True, help="Admin username")
|
||||
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
|
||||
@click.option("--no-ssl-verify", is_flag=True, default=True)
|
||||
@click.pass_context
|
||||
def discover(
|
||||
ctx,
|
||||
host: str,
|
||||
port: int,
|
||||
user: str,
|
||||
password: str,
|
||||
no_ssl_verify: bool,
|
||||
):
|
||||
"""Discover available API endpoints on the NVR.
|
||||
|
||||
This command probes many possible API paths to find which ones
|
||||
are available on your NVR. Useful for debugging and development.
|
||||
"""
|
||||
import requests
|
||||
|
||||
# Common endpoints to try
|
||||
endpoints_to_probe = [
|
||||
# Root and info
|
||||
("GET", "/openapi"),
|
||||
("GET", "/openapi/"),
|
||||
("GET", "/openapi/info"),
|
||||
("GET", "/openapi/version"),
|
||||
("GET", "/openapi/api"),
|
||||
("GET", "/openapi/swagger"),
|
||||
("GET", "/openapi/docs"),
|
||||
|
||||
# Device/channels
|
||||
("GET", "/openapi/device"),
|
||||
("GET", "/openapi/devices"),
|
||||
("GET", "/openapi/added_devices"),
|
||||
("GET", "/openapi/channel"),
|
||||
("GET", "/openapi/channels"),
|
||||
("GET", "/openapi/channelList"),
|
||||
("GET", "/openapi/device/info"),
|
||||
("GET", "/openapi/device/list"),
|
||||
("GET", "/openapi/device/channels"),
|
||||
("GET", "/openapi/nvr/info"),
|
||||
("GET", "/openapi/nvr/channels"),
|
||||
|
||||
# Recording/playback
|
||||
("GET", "/openapi/record"),
|
||||
("GET", "/openapi/records"),
|
||||
("GET", "/openapi/recording"),
|
||||
("GET", "/openapi/recordings"),
|
||||
("GET", "/openapi/playback"),
|
||||
("GET", "/openapi/video"),
|
||||
("GET", "/openapi/videos"),
|
||||
("GET", "/openapi/storage"),
|
||||
("GET", "/openapi/storage/recordings"),
|
||||
("GET", "/openapi/hdd"),
|
||||
("GET", "/openapi/disk"),
|
||||
|
||||
# Search endpoints
|
||||
("GET", "/openapi/search"),
|
||||
("GET", "/openapi/record/list"),
|
||||
("GET", "/openapi/recording/list"),
|
||||
("GET", "/openapi/playback/list"),
|
||||
("GET", "/openapi/video/list"),
|
||||
|
||||
# Live stream
|
||||
("GET", "/openapi/live"),
|
||||
("GET", "/openapi/stream"),
|
||||
("GET", "/openapi/liveStream"),
|
||||
("GET", "/openapi/rtsp"),
|
||||
|
||||
# Events
|
||||
("GET", "/openapi/event"),
|
||||
("GET", "/openapi/events"),
|
||||
("GET", "/openapi/event/list"),
|
||||
("GET", "/openapi/alarm"),
|
||||
("GET", "/openapi/alarms"),
|
||||
|
||||
# Settings
|
||||
("GET", "/openapi/settings"),
|
||||
("GET", "/openapi/config"),
|
||||
("GET", "/openapi/system"),
|
||||
("GET", "/openapi/sound"),
|
||||
("GET", "/openapi/network"),
|
||||
|
||||
# User
|
||||
("GET", "/openapi/user"),
|
||||
("GET", "/openapi/users"),
|
||||
("GET", "/openapi/account"),
|
||||
]
|
||||
|
||||
click.echo(f"Connecting to NVR at {host}:{port}...")
|
||||
|
||||
try:
|
||||
from .auth import NVRAuthenticator
|
||||
|
||||
auth = NVRAuthenticator(host, user, password, port, verify_ssl=not no_ssl_verify)
|
||||
session = auth.get_authenticated_session()
|
||||
base_url = f"https://{host}:{port}"
|
||||
|
||||
click.echo(f"Authentication successful!")
|
||||
click.echo(f"\nProbing {len(endpoints_to_probe)} endpoints...\n")
|
||||
|
||||
found_endpoints = []
|
||||
|
||||
for method, endpoint in endpoints_to_probe:
|
||||
url = f"{base_url}{endpoint}"
|
||||
try:
|
||||
if method == "GET":
|
||||
response = session.get(url, timeout=5)
|
||||
else:
|
||||
response = session.post(url, json={}, timeout=5)
|
||||
|
||||
status = response.status_code
|
||||
|
||||
if status == 200:
|
||||
# Try to get response preview
|
||||
try:
|
||||
data = response.json()
|
||||
preview = str(data)[:100]
|
||||
except:
|
||||
preview = response.text[:100] if response.text else "(empty)"
|
||||
|
||||
click.echo(f"✅ {status} {method:4} {endpoint}")
|
||||
click.echo(f" Response: {preview}...")
|
||||
found_endpoints.append((method, endpoint, status))
|
||||
elif status in (400, 401, 403, 405):
|
||||
# Exists but needs different params/auth
|
||||
click.echo(f"⚠️ {status} {method:4} {endpoint} (exists but access denied/bad request)")
|
||||
found_endpoints.append((method, endpoint, status))
|
||||
# 404 = not found, skip silently
|
||||
|
||||
except requests.RequestException as e:
|
||||
pass # Skip connection errors
|
||||
|
||||
click.echo(f"\n{'='*60}")
|
||||
click.echo(f"Summary: Found {len(found_endpoints)} accessible endpoints")
|
||||
|
||||
if found_endpoints:
|
||||
click.echo("\nWorking endpoints (200 OK):")
|
||||
for method, endpoint, status in found_endpoints:
|
||||
if status == 200:
|
||||
click.echo(f" {method} {endpoint}")
|
||||
|
||||
click.echo("\nEndpoints that exist but need work:")
|
||||
for method, endpoint, status in found_endpoints:
|
||||
if status != 200:
|
||||
click.echo(f" {method} {endpoint} ({status})")
|
||||
else:
|
||||
click.echo("\n⚠️ No working endpoints found!")
|
||||
click.echo("The NVR might use a different API structure.")
|
||||
click.echo("Try checking the NVR web interface with browser DevTools.")
|
||||
|
||||
except AuthenticationError as e:
|
||||
click.echo(f"Authentication failed: {e}", err=True)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
click.echo(f"Error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
@main.command("login-test")
|
||||
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
|
||||
@click.option("--user", "-u", required=True, help="Admin username")
|
||||
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
|
||||
@click.option("--no-ssl-verify", is_flag=True, default=True)
|
||||
@click.pass_context
|
||||
def login_test(
|
||||
ctx,
|
||||
host: str,
|
||||
user: str,
|
||||
password: str,
|
||||
no_ssl_verify: bool,
|
||||
):
|
||||
"""Test different login formats to find the correct one.
|
||||
|
||||
This probes multiple login endpoints and formats to discover
|
||||
how your NVR handles authentication.
|
||||
"""
|
||||
import requests
|
||||
import hashlib
|
||||
|
||||
# Suppress SSL warnings
|
||||
if no_ssl_verify:
|
||||
import urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
session = requests.Session()
|
||||
session.verify = not no_ssl_verify
|
||||
base_url = f"https://{host}"
|
||||
|
||||
pw_md5 = hashlib.md5(password.encode()).hexdigest()
|
||||
pw_sha256 = hashlib.sha256(password.encode()).hexdigest()
|
||||
|
||||
click.echo(f"Testing login formats for {host}...")
|
||||
click.echo(f"Username: {user}")
|
||||
click.echo(f"Password MD5: {pw_md5}")
|
||||
click.echo(f"Password SHA256: {pw_sha256[:32]}...")
|
||||
click.echo("")
|
||||
|
||||
# Various login formats to try
|
||||
login_attempts = [
|
||||
# Format 1: POST / with JSON method:do login
|
||||
("POST", "/", {"method": "do", "login": {"username": user, "password": pw_md5}}, "JSON method:do login MD5"),
|
||||
("POST", "/", {"method": "do", "login": {"username": user, "password": password}}, "JSON method:do login plain"),
|
||||
|
||||
# Format 2: POST with form data
|
||||
("POST", "/", {"username": user, "password": pw_md5}, "Form data MD5"),
|
||||
("POST", "/", {"username": user, "password": password}, "Form data plain"),
|
||||
|
||||
# Format 3: Different endpoints
|
||||
("POST", "/login", {"username": user, "password": pw_md5}, "/login endpoint MD5"),
|
||||
("POST", "/api/login", {"username": user, "password": pw_md5}, "/api/login endpoint"),
|
||||
("POST", "/cgi-bin/login.cgi", {"username": user, "password": pw_md5}, "/cgi-bin/login.cgi"),
|
||||
|
||||
# Format 4: JSON-RPC style
|
||||
("POST", "/", {"jsonrpc": "2.0", "method": "login", "params": {"username": user, "password": pw_md5}}, "JSON-RPC login"),
|
||||
|
||||
# Format 5: TP-Link specific
|
||||
("POST", "/", {"method": "login", "data": {"username": user, "password": pw_md5}}, "TP-Link data format"),
|
||||
("POST", "/", {"operation": "login", "username": user, "password": pw_md5}, "operation:login format"),
|
||||
|
||||
# Format 6: Try without password hash
|
||||
("POST", "/", {"method": "do", "login": {"username": user}}, "JSON method:do username only"),
|
||||
]
|
||||
|
||||
found_stok = False
|
||||
|
||||
for method, endpoint, data, description in login_attempts:
|
||||
url = f"{base_url}{endpoint}"
|
||||
try:
|
||||
if isinstance(data, dict) and any(k in ["method", "jsonrpc", "operation"] for k in data.keys()):
|
||||
# JSON request
|
||||
response = session.post(url, json=data, timeout=10)
|
||||
else:
|
||||
# Form data request
|
||||
response = session.post(url, data=data, timeout=10)
|
||||
|
||||
try:
|
||||
result = response.json()
|
||||
result_str = str(result)[:200]
|
||||
|
||||
# Check for stok in response
|
||||
stok = None
|
||||
if isinstance(result, dict):
|
||||
stok = result.get("stok", result.get("result", {}).get("stok", None) if isinstance(result.get("result"), dict) else None)
|
||||
|
||||
if stok:
|
||||
click.echo(f"✅ SUCCESS: {description}")
|
||||
click.echo(f" URL: {url}")
|
||||
click.echo(f" Data: {data}")
|
||||
click.echo(f" STOK: {stok}")
|
||||
found_stok = True
|
||||
elif result.get("error_code", 0) == 0:
|
||||
click.echo(f"⚠️ {description} - 200 OK but no stok")
|
||||
click.echo(f" Response: {result_str}")
|
||||
else:
|
||||
error_code = result.get("error_code", "unknown")
|
||||
click.echo(f"❌ {description} - error_code: {error_code}")
|
||||
|
||||
except ValueError:
|
||||
if response.status_code == 200:
|
||||
click.echo(f"⚠️ {description} - 200 OK but not JSON")
|
||||
click.echo(f" Response: {response.text[:100]}")
|
||||
|
||||
except requests.RequestException as e:
|
||||
click.echo(f"❌ {description} - Connection error: {e}")
|
||||
|
||||
click.echo("")
|
||||
if found_stok:
|
||||
click.echo("✅ Found working login format! Use the format marked with SUCCESS.")
|
||||
else:
|
||||
click.echo("❌ No working login format found.")
|
||||
click.echo("")
|
||||
click.echo("Please capture the actual login request from your browser:")
|
||||
click.echo("1. Open browser DevTools (F12)")
|
||||
click.echo("2. Go to Network tab")
|
||||
click.echo("3. Log into NVR web interface")
|
||||
click.echo("4. Find the login request and share the Request Payload")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
"""Debug logging utilities for NVR Export."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
|
||||
# Create logger
|
||||
logger = logging.getLogger("nvr_export")
|
||||
|
||||
|
||||
def setup_debug_logging(enabled: bool = False, log_file: str = None) -> None:
|
||||
"""
|
||||
Setup debug logging.
|
||||
|
||||
Args:
|
||||
enabled: Whether to enable debug logging
|
||||
log_file: Optional file path to write logs to
|
||||
"""
|
||||
level = logging.DEBUG if enabled else logging.WARNING
|
||||
|
||||
# Clear existing handlers
|
||||
logger.handlers.clear()
|
||||
logger.setLevel(level)
|
||||
|
||||
# Console handler
|
||||
console_handler = logging.StreamHandler(sys.stderr)
|
||||
console_handler.setLevel(level)
|
||||
|
||||
# Formatter with timestamp
|
||||
formatter = logging.Formatter(
|
||||
"[%(asctime)s] %(levelname)s: %(message)s",
|
||||
datefmt="%H:%M:%S"
|
||||
)
|
||||
console_handler.setFormatter(formatter)
|
||||
logger.addHandler(console_handler)
|
||||
|
||||
# File handler if specified
|
||||
if log_file:
|
||||
file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
file_handler.setFormatter(logging.Formatter(
|
||||
"[%(asctime)s] %(levelname)s: %(message)s",
|
||||
datefmt="%Y-%m-%d %H:%M:%S"
|
||||
))
|
||||
logger.addHandler(file_handler)
|
||||
|
||||
if enabled:
|
||||
logger.debug("Debug logging enabled")
|
||||
|
||||
|
||||
def log_request(method: str, url: str, headers: dict = None, body: Any = None) -> None:
|
||||
"""Log an outgoing HTTP request."""
|
||||
logger.debug(f">>> {method} {url}")
|
||||
if headers:
|
||||
# Filter out sensitive headers
|
||||
safe_headers = {k: v for k, v in headers.items() if 'auth' not in k.lower()}
|
||||
if safe_headers:
|
||||
logger.debug(f" Headers: {safe_headers}")
|
||||
if body:
|
||||
try:
|
||||
body_str = json.dumps(body, indent=2, default=str)
|
||||
logger.debug(f" Body: {body_str}")
|
||||
except (TypeError, ValueError):
|
||||
logger.debug(f" Body: {body}")
|
||||
|
||||
|
||||
def log_response(status_code: int, headers: dict = None, body: Any = None, truncate: int = 2000) -> None:
|
||||
"""Log an incoming HTTP response."""
|
||||
logger.debug(f"<<< Response: {status_code}")
|
||||
if headers:
|
||||
content_type = headers.get('content-type', 'unknown')
|
||||
content_length = headers.get('content-length', 'unknown')
|
||||
logger.debug(f" Content-Type: {content_type}, Length: {content_length}")
|
||||
if body:
|
||||
try:
|
||||
if isinstance(body, (dict, list)):
|
||||
body_str = json.dumps(body, indent=2, default=str)
|
||||
else:
|
||||
body_str = str(body)
|
||||
|
||||
if len(body_str) > truncate:
|
||||
body_str = body_str[:truncate] + f"\n... (truncated, total {len(body_str)} chars)"
|
||||
logger.debug(f" Body: {body_str}")
|
||||
except (TypeError, ValueError):
|
||||
logger.debug(f" Body: {body}")
|
||||
|
||||
|
||||
def log_error(message: str, exception: Exception = None) -> None:
|
||||
"""Log an error."""
|
||||
if exception:
|
||||
logger.error(f"{message}: {type(exception).__name__}: {exception}")
|
||||
else:
|
||||
logger.error(message)
|
||||
|
||||
|
||||
def log_info(message: str) -> None:
|
||||
"""Log info message."""
|
||||
logger.info(message)
|
||||
|
||||
|
||||
def log_debug(message: str) -> None:
|
||||
"""Log debug message."""
|
||||
logger.debug(message)
|
||||
|
||||
|
||||
def is_debug_enabled() -> bool:
|
||||
"""Check if debug logging is enabled."""
|
||||
return logger.level <= logging.DEBUG
|
||||
+248
-111
@@ -9,11 +9,16 @@ import requests
|
||||
from tqdm import tqdm
|
||||
|
||||
from .auth import AuthenticationError, NVRAuthenticator
|
||||
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
|
||||
from .models import Channel, ExportJob, Recording
|
||||
|
||||
|
||||
class NVRClient:
|
||||
"""Client for TP-Link Vigi NVR API operations."""
|
||||
"""Client for TP-Link Vigi NVR OpenAPI operations.
|
||||
|
||||
Base endpoint: /openapi/
|
||||
Authentication: Bearer token from /openapi/token
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -38,6 +43,7 @@ class NVRClient:
|
||||
self.base_url = f"https://{host}:{port}"
|
||||
self.auth = NVRAuthenticator(host, username, password, port, verify_ssl)
|
||||
self._session: Optional[requests.Session] = None
|
||||
log_debug(f"NVRClient initialized for {host}:{port}")
|
||||
|
||||
@property
|
||||
def session(self) -> requests.Session:
|
||||
@@ -57,7 +63,7 @@ class NVRClient:
|
||||
|
||||
Args:
|
||||
method: HTTP method (GET, POST, etc.)
|
||||
endpoint: API endpoint path
|
||||
endpoint: API endpoint path (will be prefixed with /openapi/)
|
||||
**kwargs: Additional arguments for requests
|
||||
|
||||
Returns:
|
||||
@@ -66,21 +72,44 @@ class NVRClient:
|
||||
Raises:
|
||||
NVRAPIError: If request fails
|
||||
"""
|
||||
# Ensure endpoint starts with /openapi/
|
||||
if not endpoint.startswith("/openapi/"):
|
||||
endpoint = f"/openapi/{endpoint.lstrip('/')}"
|
||||
|
||||
url = f"{self.base_url}{endpoint}"
|
||||
|
||||
# Log request
|
||||
log_request(method, url, kwargs.get('headers'), kwargs.get('json'))
|
||||
|
||||
try:
|
||||
response = self.session.request(method, url, timeout=60, **kwargs)
|
||||
|
||||
# Log response
|
||||
try:
|
||||
response_data = response.json() if response.content else {}
|
||||
except ValueError:
|
||||
response_data = {"raw": response.text[:500] if response.text else "empty"}
|
||||
|
||||
log_response(response.status_code, dict(response.headers), response_data)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
# Some endpoints might return empty response
|
||||
if not response.content:
|
||||
return {}
|
||||
|
||||
data = response.json()
|
||||
|
||||
if data.get("error_code", 0) != 0:
|
||||
error_msg = data.get("error_msg", "Unknown API error")
|
||||
raise NVRAPIError(f"API error: {error_msg}")
|
||||
# Check for error in response
|
||||
error_code = data.get("error_code", data.get("errorCode", 0))
|
||||
if error_code != 0:
|
||||
error_msg = data.get("error_msg", data.get("errorMsg", "Unknown API error"))
|
||||
raise NVRAPIError(f"API error {error_code}: {error_msg}")
|
||||
|
||||
return data
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error(f"Request to {endpoint} failed", e)
|
||||
raise NVRAPIError(f"Request failed: {e}") from e
|
||||
|
||||
def get_channels(self) -> list[Channel]:
|
||||
@@ -90,33 +119,66 @@ class NVRClient:
|
||||
Returns:
|
||||
List of Channel objects
|
||||
"""
|
||||
# Try the standard device/channels endpoint
|
||||
try:
|
||||
data = self._api_request("POST", "/api/v1/channels", json={"method": "get"})
|
||||
channels = []
|
||||
|
||||
for ch_data in data.get("result", {}).get("channel_list", []):
|
||||
channels.append(Channel(
|
||||
id=ch_data.get("channel_id", 0),
|
||||
name=ch_data.get("channel_name", f"Channel {ch_data.get('channel_id', 0)}"),
|
||||
enabled=ch_data.get("enabled", True),
|
||||
))
|
||||
|
||||
return channels
|
||||
|
||||
except NVRAPIError:
|
||||
# Fallback: try alternative endpoint structure
|
||||
data = self._api_request("POST", "/api/v1/device", json={"method": "getChannels"})
|
||||
channels = []
|
||||
|
||||
for ch_data in data.get("result", {}).get("channels", []):
|
||||
channels.append(Channel(
|
||||
id=ch_data.get("id", 0),
|
||||
name=ch_data.get("name", f"Channel {ch_data.get('id', 0)}"),
|
||||
enabled=ch_data.get("status", "on") == "on",
|
||||
))
|
||||
|
||||
return channels
|
||||
channels = []
|
||||
|
||||
# Try different endpoint patterns
|
||||
endpoints_to_try = [
|
||||
("GET", "added_devices"),
|
||||
("GET", "channels"),
|
||||
("POST", "channels", {"method": "get"}),
|
||||
("GET", "device/channels"),
|
||||
]
|
||||
|
||||
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for channels")
|
||||
|
||||
for endpoint_info in endpoints_to_try:
|
||||
try:
|
||||
method = endpoint_info[0]
|
||||
endpoint = endpoint_info[1]
|
||||
json_data = endpoint_info[2] if len(endpoint_info) > 2 else None
|
||||
|
||||
log_debug(f"Trying endpoint: {method} {endpoint}")
|
||||
|
||||
if json_data:
|
||||
data = self._api_request(method, endpoint, json=json_data)
|
||||
else:
|
||||
data = self._api_request(method, endpoint)
|
||||
|
||||
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
|
||||
|
||||
# Parse response - try different structures
|
||||
channel_list = (
|
||||
data.get("result", {}).get("channel_list", []) or
|
||||
data.get("result", {}).get("channels", []) or
|
||||
data.get("result", {}).get("devices", []) or
|
||||
data.get("channel_list", []) or
|
||||
data.get("channels", []) or
|
||||
data.get("devices", []) or
|
||||
[]
|
||||
)
|
||||
|
||||
log_debug(f"Found {len(channel_list)} channels in response")
|
||||
|
||||
if channel_list:
|
||||
for ch_data in channel_list:
|
||||
channels.append(Channel(
|
||||
id=ch_data.get("channel_id", ch_data.get("id", ch_data.get("channelId", 0))),
|
||||
name=ch_data.get("channel_name", ch_data.get("name", ch_data.get("deviceName", f"Channel"))),
|
||||
enabled=ch_data.get("enabled", ch_data.get("status", "on") == "on"),
|
||||
))
|
||||
return channels
|
||||
|
||||
except NVRAPIError as e:
|
||||
log_debug(f"Endpoint {endpoint} failed: {e}")
|
||||
continue
|
||||
|
||||
# If no channels found via API, return default channels 1-8
|
||||
log_debug("No channels found via API, returning defaults 1-8")
|
||||
if not channels:
|
||||
for i in range(1, 9):
|
||||
channels.append(Channel(id=i, name=f"Channel {i}", enabled=True))
|
||||
|
||||
return channels
|
||||
|
||||
def search_recordings(
|
||||
self,
|
||||
@@ -137,10 +199,17 @@ class NVRClient:
|
||||
Returns:
|
||||
List of Recording objects matching criteria
|
||||
"""
|
||||
# Format times as expected by NVR API (typically Unix timestamp or ISO format)
|
||||
# Format times as expected by NVR API
|
||||
start_ts = int(start_time.timestamp())
|
||||
end_ts = int(end_time.timestamp())
|
||||
|
||||
# ISO format alternative
|
||||
start_iso = start_time.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
end_iso = end_time.strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
log_debug(f"Searching recordings: channel={channel_id}, start={start_time}, end={end_time}")
|
||||
log_debug(f"Timestamps: start_ts={start_ts}, end_ts={end_ts}")
|
||||
|
||||
type_map = {
|
||||
"all": 0,
|
||||
"continuous": 1,
|
||||
@@ -149,69 +218,114 @@ class NVRClient:
|
||||
}
|
||||
rec_type = type_map.get(recording_type, 0)
|
||||
|
||||
# Try primary search endpoint
|
||||
try:
|
||||
data = self._api_request(
|
||||
"POST",
|
||||
"/api/v1/playback/search",
|
||||
json={
|
||||
"method": "searchRecordings",
|
||||
"channel_id": channel_id,
|
||||
"start_time": start_ts,
|
||||
"end_time": end_ts,
|
||||
"record_type": rec_type,
|
||||
},
|
||||
)
|
||||
except NVRAPIError:
|
||||
# Fallback endpoint format
|
||||
data = self._api_request(
|
||||
"POST",
|
||||
"/api/v1/record/search",
|
||||
json={
|
||||
"method": "search",
|
||||
"params": {
|
||||
"channel": channel_id,
|
||||
"start": start_ts,
|
||||
"end": end_ts,
|
||||
"type": rec_type,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
recordings = []
|
||||
result = data.get("result", {})
|
||||
record_list = result.get("record_list", result.get("recordings", []))
|
||||
|
||||
for rec_data in record_list:
|
||||
# Parse timestamps (could be Unix or string format)
|
||||
rec_start = self._parse_timestamp(rec_data.get("start_time", rec_data.get("start", 0)))
|
||||
rec_end = self._parse_timestamp(rec_data.get("end_time", rec_data.get("end", 0)))
|
||||
|
||||
recordings.append(Recording(
|
||||
id=str(rec_data.get("record_id", rec_data.get("id", ""))),
|
||||
channel_id=channel_id,
|
||||
start_time=rec_start,
|
||||
end_time=rec_end,
|
||||
size_bytes=rec_data.get("size", rec_data.get("file_size", 0)),
|
||||
recording_type=rec_data.get("type", rec_data.get("record_type", "unknown")),
|
||||
file_path=rec_data.get("file_path", rec_data.get("path")),
|
||||
))
|
||||
# Try different endpoint patterns
|
||||
endpoints_to_try = [
|
||||
("POST", "playback/search", {
|
||||
"channelId": channel_id,
|
||||
"startTime": start_ts,
|
||||
"endTime": end_ts,
|
||||
"recordType": rec_type,
|
||||
}),
|
||||
("POST", "record/search", {
|
||||
"channel": channel_id,
|
||||
"start": start_ts,
|
||||
"end": end_ts,
|
||||
"type": rec_type,
|
||||
}),
|
||||
("GET", f"playback/search?channelId={channel_id}&startTime={start_ts}&endTime={end_ts}"),
|
||||
("POST", "search", {
|
||||
"method": "searchRecordings",
|
||||
"params": {
|
||||
"channel_id": channel_id,
|
||||
"start_time": start_iso,
|
||||
"end_time": end_iso,
|
||||
}
|
||||
}),
|
||||
]
|
||||
|
||||
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for recordings")
|
||||
|
||||
for endpoint_info in endpoints_to_try:
|
||||
try:
|
||||
method = endpoint_info[0]
|
||||
endpoint = endpoint_info[1]
|
||||
json_data = endpoint_info[2] if len(endpoint_info) > 2 and isinstance(endpoint_info[2], dict) else None
|
||||
|
||||
log_debug(f"Trying endpoint: {method} {endpoint}")
|
||||
|
||||
if json_data:
|
||||
data = self._api_request(method, endpoint, json=json_data)
|
||||
else:
|
||||
data = self._api_request(method, endpoint)
|
||||
|
||||
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
|
||||
|
||||
# Log full response structure for debugging
|
||||
if is_debug_enabled():
|
||||
import json as json_module
|
||||
log_debug(f"Full response: {json_module.dumps(data, indent=2, default=str)[:1000]}")
|
||||
|
||||
# Parse response - try different structures
|
||||
result = data.get("result", data)
|
||||
record_list = (
|
||||
result.get("record_list", []) or
|
||||
result.get("recordings", []) or
|
||||
result.get("recordList", []) or
|
||||
result.get("items", []) or
|
||||
result.get("searchResult", []) or
|
||||
result.get("list", []) or
|
||||
[]
|
||||
)
|
||||
|
||||
log_debug(f"Found {len(record_list)} recordings in response")
|
||||
|
||||
if record_list:
|
||||
for rec_data in record_list:
|
||||
log_debug(f"Recording data: {rec_data}")
|
||||
rec_start = self._parse_timestamp(
|
||||
rec_data.get("start_time", rec_data.get("startTime", rec_data.get("start", 0)))
|
||||
)
|
||||
rec_end = self._parse_timestamp(
|
||||
rec_data.get("end_time", rec_data.get("endTime", rec_data.get("end", 0)))
|
||||
)
|
||||
|
||||
recordings.append(Recording(
|
||||
id=str(rec_data.get("record_id", rec_data.get("recordId", rec_data.get("id", "")))),
|
||||
channel_id=channel_id,
|
||||
start_time=rec_start,
|
||||
end_time=rec_end,
|
||||
size_bytes=rec_data.get("size", rec_data.get("fileSize", rec_data.get("file_size", 0))),
|
||||
recording_type=str(rec_data.get("type", rec_data.get("recordType", rec_data.get("record_type", "unknown")))),
|
||||
file_path=rec_data.get("file_path", rec_data.get("filePath", rec_data.get("path"))),
|
||||
))
|
||||
return recordings
|
||||
|
||||
except NVRAPIError as e:
|
||||
log_debug(f"Endpoint {endpoint} failed: {e}")
|
||||
continue
|
||||
|
||||
log_debug("No recordings found with any endpoint pattern")
|
||||
return recordings
|
||||
|
||||
def _parse_timestamp(self, ts) -> datetime:
|
||||
"""Parse timestamp from various formats."""
|
||||
if isinstance(ts, (int, float)):
|
||||
return datetime.fromtimestamp(ts)
|
||||
if ts > 0:
|
||||
return datetime.fromtimestamp(ts)
|
||||
if isinstance(ts, str):
|
||||
# Try common formats
|
||||
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"]:
|
||||
for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"]:
|
||||
try:
|
||||
return datetime.strptime(ts, fmt)
|
||||
except ValueError:
|
||||
continue
|
||||
# Try Unix timestamp as string
|
||||
return datetime.fromtimestamp(int(ts))
|
||||
try:
|
||||
return datetime.fromtimestamp(int(ts))
|
||||
except (ValueError, OSError):
|
||||
pass
|
||||
return datetime.now()
|
||||
|
||||
def download_recording(
|
||||
@@ -231,15 +345,20 @@ class NVRClient:
|
||||
Returns:
|
||||
Path to downloaded file
|
||||
"""
|
||||
# Construct download URL
|
||||
# Try different download URL patterns
|
||||
download_urls = []
|
||||
|
||||
if recording.file_path:
|
||||
download_url = f"{self.base_url}/api/v1/playback/download?path={recording.file_path}"
|
||||
else:
|
||||
download_url = (
|
||||
f"{self.base_url}/api/v1/playback/download"
|
||||
f"?record_id={recording.id}"
|
||||
f"&channel_id={recording.channel_id}"
|
||||
)
|
||||
download_urls.append(f"{self.base_url}/openapi/playback/download?path={recording.file_path}")
|
||||
download_urls.append(f"{self.base_url}/openapi/download?filePath={recording.file_path}")
|
||||
|
||||
download_urls.extend([
|
||||
f"{self.base_url}/openapi/playback/download?recordId={recording.id}&channelId={recording.channel_id}",
|
||||
f"{self.base_url}/openapi/record/download?id={recording.id}",
|
||||
f"{self.base_url}/openapi/download?recordId={recording.id}",
|
||||
])
|
||||
|
||||
log_debug(f"Download URLs to try: {download_urls}")
|
||||
|
||||
# Determine output filename
|
||||
if output_path.is_dir():
|
||||
@@ -251,28 +370,46 @@ class NVRClient:
|
||||
# Ensure parent directory exists
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
response = self.session.get(download_url, stream=True, timeout=300)
|
||||
response.raise_for_status()
|
||||
|
||||
# Get total size from headers or recording metadata
|
||||
total_size = int(response.headers.get("content-length", recording.size_bytes))
|
||||
|
||||
downloaded = 0
|
||||
chunk_size = 8192
|
||||
|
||||
with open(output_file, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=chunk_size):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
if progress_callback:
|
||||
progress_callback(downloaded, total_size)
|
||||
|
||||
return output_file
|
||||
|
||||
except requests.RequestException as e:
|
||||
raise NVRAPIError(f"Download failed: {e}") from e
|
||||
last_error = None
|
||||
for download_url in download_urls:
|
||||
try:
|
||||
log_debug(f"Trying download from: {download_url}")
|
||||
response = self.session.get(download_url, stream=True, timeout=300)
|
||||
|
||||
log_debug(f"Download response: {response.status_code}, Content-Type: {response.headers.get('content-type')}")
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
# Check if response is actually video data
|
||||
content_type = response.headers.get("content-type", "")
|
||||
if "json" in content_type or "html" in content_type:
|
||||
# This is an error response, try next URL
|
||||
log_debug(f"Got non-video response, trying next URL")
|
||||
continue
|
||||
|
||||
# Get total size from headers or recording metadata
|
||||
total_size = int(response.headers.get("content-length", recording.size_bytes or 0))
|
||||
|
||||
downloaded = 0
|
||||
chunk_size = 8192
|
||||
|
||||
with open(output_file, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=chunk_size):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
if progress_callback:
|
||||
progress_callback(downloaded, total_size)
|
||||
|
||||
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
|
||||
return output_file
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error(f"Download from {download_url} failed", e)
|
||||
last_error = e
|
||||
continue
|
||||
|
||||
raise NVRAPIError(f"Download failed after trying all URLs: {last_error}")
|
||||
|
||||
def _generate_filename(self, recording: Recording) -> str:
|
||||
"""Generate a filename for a recording."""
|
||||
|
||||
@@ -0,0 +1,496 @@
|
||||
"""Web Interface API client for TP-Link Vigi NVR.
|
||||
|
||||
Uses the stok-based authentication for the /ds endpoint,
|
||||
which provides access to recordings and playback.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
|
||||
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
|
||||
from .models import Channel, Recording
|
||||
|
||||
|
||||
@dataclass
|
||||
class StokSession:
|
||||
"""Holds stok session data."""
|
||||
|
||||
stok: str
|
||||
expires_at: float
|
||||
|
||||
@property
|
||||
def is_expired(self) -> bool:
|
||||
"""Check if token has expired (valid ~30 min)."""
|
||||
return time.time() >= self.expires_at
|
||||
|
||||
|
||||
class WebClient:
|
||||
"""Client for TP-Link Vigi NVR Web Interface API.
|
||||
|
||||
Uses /stok={token}/ds endpoint for all operations.
|
||||
This provides access to recordings, playback, and other features
|
||||
not available via the OpenAPI.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int = 443,
|
||||
verify_ssl: bool = False,
|
||||
):
|
||||
"""
|
||||
Initialize Web client.
|
||||
|
||||
Args:
|
||||
host: NVR IP address or hostname
|
||||
username: Admin username
|
||||
password: Admin password
|
||||
port: Web interface port (default: 443)
|
||||
verify_ssl: Whether to verify SSL certificates
|
||||
"""
|
||||
self.host = host
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.port = port
|
||||
self.verify_ssl = verify_ssl
|
||||
self.base_url = f"https://{host}" if port == 443 else f"https://{host}:{port}"
|
||||
self._session: Optional[StokSession] = None
|
||||
self._http_session = requests.Session()
|
||||
self._http_session.verify = verify_ssl
|
||||
|
||||
# Suppress SSL warnings if not verifying
|
||||
if not verify_ssl:
|
||||
import urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
log_debug(f"WebClient initialized for {host}:{port}")
|
||||
|
||||
@property
|
||||
def stok(self) -> str:
|
||||
"""Get current stok token, refreshing if expired."""
|
||||
if self._session is None or self._session.is_expired:
|
||||
self._session = self._login()
|
||||
return self._session.stok
|
||||
|
||||
def _hash_password(self, password: str) -> str:
|
||||
"""Hash password for login. TP-Link uses MD5."""
|
||||
return hashlib.md5(password.encode()).hexdigest()
|
||||
|
||||
def _login(self) -> StokSession:
|
||||
"""
|
||||
Login to NVR web interface and obtain stok token.
|
||||
|
||||
Returns:
|
||||
StokSession with stok token
|
||||
|
||||
Raises:
|
||||
WebClientError: If login fails
|
||||
"""
|
||||
login_url = f"{self.base_url}/"
|
||||
|
||||
log_debug(f"Logging in to {login_url}")
|
||||
|
||||
try:
|
||||
# First, try standard TP-Link login format
|
||||
# The login endpoint is usually POST to / with form data
|
||||
login_data = {
|
||||
"method": "do",
|
||||
"login": {
|
||||
"username": self.username,
|
||||
"password": self._hash_password(self.password),
|
||||
}
|
||||
}
|
||||
|
||||
log_request("POST", login_url, body=login_data)
|
||||
|
||||
response = self._http_session.post(
|
||||
login_url,
|
||||
json=login_data,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
log_response(response.status_code, dict(response.headers),
|
||||
response.json() if response.content else None)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
stok = data.get("stok", "")
|
||||
|
||||
if stok:
|
||||
log_debug(f"Got stok token: {stok[:20]}...")
|
||||
# Token valid for ~25 minutes (refresh before 30)
|
||||
expires_at = time.time() + (25 * 60)
|
||||
return StokSession(stok=stok, expires_at=expires_at)
|
||||
|
||||
# Try alternative login format
|
||||
alt_login_data = {
|
||||
"username": self.username,
|
||||
"password": self._hash_password(self.password),
|
||||
}
|
||||
|
||||
log_debug("Trying alternative login format")
|
||||
log_request("POST", login_url, body=alt_login_data)
|
||||
|
||||
response = self._http_session.post(
|
||||
login_url,
|
||||
data=alt_login_data,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
log_response(response.status_code, dict(response.headers),
|
||||
response.text[:500] if response.text else None)
|
||||
|
||||
# Check for stok in response
|
||||
try:
|
||||
data = response.json()
|
||||
stok = data.get("stok", data.get("result", {}).get("stok", ""))
|
||||
if stok:
|
||||
expires_at = time.time() + (25 * 60)
|
||||
return StokSession(stok=stok, expires_at=expires_at)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
raise WebClientError("Could not obtain stok token from login response")
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error("Login failed", e)
|
||||
raise WebClientError(f"Login failed: {e}") from e
|
||||
|
||||
def _ds_request(
|
||||
self,
|
||||
data: dict,
|
||||
) -> dict:
|
||||
"""
|
||||
Make a request to the /ds endpoint.
|
||||
|
||||
Args:
|
||||
data: Request data (method, params, etc.)
|
||||
|
||||
Returns:
|
||||
JSON response as dict
|
||||
"""
|
||||
url = f"{self.base_url}/stok={self.stok}/ds"
|
||||
|
||||
log_request("POST", url, body=data)
|
||||
|
||||
try:
|
||||
response = self._http_session.post(
|
||||
url,
|
||||
json=data,
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
try:
|
||||
response_data = response.json()
|
||||
except ValueError:
|
||||
response_data = {"raw": response.text[:500] if response.text else "empty"}
|
||||
|
||||
log_response(response.status_code, dict(response.headers), response_data)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
# Check for error in response
|
||||
error_code = response_data.get("error_code", 0)
|
||||
if error_code != 0:
|
||||
error_msg = response_data.get("error_msg", f"Error code: {error_code}")
|
||||
raise WebClientError(f"API error: {error_msg}")
|
||||
|
||||
return response_data
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error(f"Request to /ds failed", e)
|
||||
raise WebClientError(f"Request failed: {e}") from e
|
||||
|
||||
def get_channels(self) -> list[Channel]:
|
||||
"""Get list of camera channels."""
|
||||
log_debug("Getting channels via /ds")
|
||||
|
||||
# Try different methods to get channel list
|
||||
methods_to_try = [
|
||||
{"method": "get", "channel": {"table": ["channel"]}},
|
||||
{"method": "get", "device": {"name": ["device_info"]}},
|
||||
{"method": "get", "channel_info": {}},
|
||||
{"method": "do", "channel": {"action": "list"}},
|
||||
]
|
||||
|
||||
for method_data in methods_to_try:
|
||||
try:
|
||||
data = self._ds_request(method_data)
|
||||
|
||||
# Try to find channels in response
|
||||
channels = []
|
||||
|
||||
# Look for channel data in various places
|
||||
channel_data = (
|
||||
data.get("channel", {}).get("table", {}).get("channel", []) or
|
||||
data.get("channel", []) or
|
||||
data.get("channels", []) or
|
||||
data.get("result", {}).get("channels", []) or
|
||||
[]
|
||||
)
|
||||
|
||||
if channel_data:
|
||||
for i, ch in enumerate(channel_data):
|
||||
if isinstance(ch, dict):
|
||||
channels.append(Channel(
|
||||
id=ch.get("id", ch.get("channel_id", i + 1)),
|
||||
name=ch.get("name", ch.get("channel_name", f"Channel {i + 1}")),
|
||||
enabled=ch.get("enabled", True),
|
||||
))
|
||||
|
||||
if channels:
|
||||
return channels
|
||||
|
||||
except WebClientError as e:
|
||||
log_debug(f"Method failed: {e}")
|
||||
continue
|
||||
|
||||
# Return default channels if API doesn't provide them
|
||||
log_debug("Using default channels 1-32")
|
||||
return [Channel(id=i, name=f"Channel {i}", enabled=True) for i in range(1, 33)]
|
||||
|
||||
def search_recordings(
|
||||
self,
|
||||
channel_id: int,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
recording_type: str = "all",
|
||||
) -> list[Recording]:
|
||||
"""
|
||||
Search for recordings in a time range.
|
||||
|
||||
Args:
|
||||
channel_id: Camera channel ID
|
||||
start_time: Start of time range
|
||||
end_time: End of time range
|
||||
recording_type: Type filter
|
||||
|
||||
Returns:
|
||||
List of Recording objects
|
||||
"""
|
||||
start_str = start_time.strftime("%Y%m%d")
|
||||
end_str = end_time.strftime("%Y%m%d")
|
||||
start_ts = int(start_time.timestamp())
|
||||
end_ts = int(end_time.timestamp())
|
||||
|
||||
log_debug(f"Searching recordings: ch={channel_id}, {start_time} to {end_time}")
|
||||
|
||||
# Try different search methods
|
||||
methods_to_try = [
|
||||
{
|
||||
"method": "get",
|
||||
"playback": {
|
||||
"table": ["search"],
|
||||
"search": {
|
||||
"channel": channel_id,
|
||||
"start_date": start_str,
|
||||
"end_date": end_str,
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"method": "do",
|
||||
"playback": {
|
||||
"action": "search",
|
||||
"channel": channel_id,
|
||||
"start_time": start_ts,
|
||||
"end_time": end_ts,
|
||||
}
|
||||
},
|
||||
{
|
||||
"method": "get",
|
||||
"record": {
|
||||
"table": ["search"],
|
||||
"search": {
|
||||
"channel_id": channel_id,
|
||||
"start": start_ts,
|
||||
"end": end_ts,
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"method": "do",
|
||||
"record": {
|
||||
"action": "search",
|
||||
"channel": channel_id,
|
||||
"start_date": start_str,
|
||||
"end_date": end_str,
|
||||
}
|
||||
},
|
||||
]
|
||||
|
||||
recordings = []
|
||||
|
||||
for method_data in methods_to_try:
|
||||
try:
|
||||
log_debug(f"Trying: {json.dumps(method_data)[:100]}")
|
||||
data = self._ds_request(method_data)
|
||||
|
||||
# Log full response for debugging
|
||||
if is_debug_enabled():
|
||||
log_debug(f"Full response: {json.dumps(data, indent=2, default=str)[:1500]}")
|
||||
|
||||
# Look for recordings in various places
|
||||
record_list = (
|
||||
data.get("playback", {}).get("table", {}).get("search", []) or
|
||||
data.get("playback", {}).get("search", []) or
|
||||
data.get("record", {}).get("table", {}).get("search", []) or
|
||||
data.get("record", {}).get("search", []) or
|
||||
data.get("result", {}).get("records", []) or
|
||||
data.get("records", []) or
|
||||
data.get("list", []) or
|
||||
[]
|
||||
)
|
||||
|
||||
log_debug(f"Found {len(record_list)} records in response")
|
||||
|
||||
if record_list:
|
||||
for rec in record_list:
|
||||
try:
|
||||
rec_start = self._parse_time(rec.get("start", rec.get("start_time", 0)))
|
||||
rec_end = self._parse_time(rec.get("end", rec.get("end_time", 0)))
|
||||
|
||||
recordings.append(Recording(
|
||||
id=str(rec.get("id", rec.get("record_id", ""))),
|
||||
channel_id=channel_id,
|
||||
start_time=rec_start,
|
||||
end_time=rec_end,
|
||||
size_bytes=rec.get("size", rec.get("file_size", 0)),
|
||||
recording_type=str(rec.get("type", "unknown")),
|
||||
file_path=rec.get("path", rec.get("file_path")),
|
||||
))
|
||||
except Exception as e:
|
||||
log_debug(f"Error parsing record: {e}")
|
||||
continue
|
||||
|
||||
if recordings:
|
||||
return recordings
|
||||
|
||||
except WebClientError as e:
|
||||
log_debug(f"Method failed: {e}")
|
||||
continue
|
||||
|
||||
return recordings
|
||||
|
||||
def _parse_time(self, t) -> datetime:
|
||||
"""Parse time from various formats."""
|
||||
if isinstance(t, (int, float)) and t > 0:
|
||||
return datetime.fromtimestamp(t)
|
||||
if isinstance(t, str):
|
||||
for fmt in ["%Y%m%d%H%M%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"]:
|
||||
try:
|
||||
return datetime.strptime(t, fmt)
|
||||
except ValueError:
|
||||
continue
|
||||
try:
|
||||
return datetime.fromtimestamp(int(t))
|
||||
except:
|
||||
pass
|
||||
return datetime.now()
|
||||
|
||||
def download_recording(
|
||||
self,
|
||||
recording: Recording,
|
||||
output_path: Path,
|
||||
progress_callback: Optional[Callable[[int, int], None]] = None,
|
||||
) -> Path:
|
||||
"""Download a recording to disk."""
|
||||
# Build download URL
|
||||
if recording.file_path:
|
||||
download_url = f"{self.base_url}/stok={self.stok}/ds?download={recording.file_path}"
|
||||
else:
|
||||
download_url = f"{self.base_url}/stok={self.stok}/ds?download&channel={recording.channel_id}&id={recording.id}"
|
||||
|
||||
log_debug(f"Downloading from: {download_url}")
|
||||
|
||||
# Determine output filename
|
||||
if output_path.is_dir():
|
||||
start_str = recording.start_time.strftime("%Y%m%d_%H%M%S")
|
||||
end_str = recording.end_time.strftime("%H%M%S")
|
||||
filename = f"ch{recording.channel_id}_{start_str}-{end_str}.mp4"
|
||||
output_file = output_path / filename
|
||||
else:
|
||||
output_file = output_path
|
||||
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
response = self._http_session.get(download_url, stream=True, timeout=300)
|
||||
response.raise_for_status()
|
||||
|
||||
total_size = int(response.headers.get("content-length", recording.size_bytes or 0))
|
||||
|
||||
downloaded = 0
|
||||
with open(output_file, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
if progress_callback:
|
||||
progress_callback(downloaded, total_size)
|
||||
|
||||
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
|
||||
return output_file
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error(f"Download failed", e)
|
||||
raise WebClientError(f"Download failed: {e}") from e
|
||||
|
||||
def export_time_range(
|
||||
self,
|
||||
channel_id: int,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
output_dir: Path,
|
||||
recording_type: str = "all",
|
||||
show_progress: bool = True,
|
||||
) -> list[Path]:
|
||||
"""Export all recordings in a time range."""
|
||||
recordings = self.search_recordings(channel_id, start_time, end_time, recording_type)
|
||||
|
||||
if not recordings:
|
||||
return []
|
||||
|
||||
output_dir = Path(output_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
downloaded_files = []
|
||||
recordings_iter = tqdm(recordings, desc="Downloading", disable=not show_progress)
|
||||
|
||||
for recording in recordings_iter:
|
||||
if show_progress:
|
||||
recordings_iter.set_postfix_str(f"Ch{recording.channel_id} {recording.start_time:%H:%M}")
|
||||
|
||||
try:
|
||||
output_file = self.download_recording(recording, output_dir)
|
||||
downloaded_files.append(output_file)
|
||||
except WebClientError as e:
|
||||
if show_progress:
|
||||
tqdm.write(f"Warning: Failed to download {recording}: {e}")
|
||||
|
||||
return downloaded_files
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the HTTP session."""
|
||||
self._http_session.close()
|
||||
|
||||
def __enter__(self) -> "WebClient":
|
||||
return self
|
||||
|
||||
def __exit__(self, *args) -> None:
|
||||
self.close()
|
||||
|
||||
|
||||
class WebClientError(Exception):
|
||||
"""Raised when web interface operations fail."""
|
||||
pass
|
||||
Reference in New Issue
Block a user