2 Commits

Author SHA1 Message Date
MrUnknownDE 1c0508544d Add discover command to probe available API endpoints 2025-12-30 15:04:40 +01:00
MrUnknownDE 2755fcb6cf Add debug mode with --debug flag for API request/response logging 2025-12-30 14:58:23 +01:00
3 changed files with 364 additions and 7 deletions
+196 -5
View File
@@ -9,6 +9,7 @@ import click
from . import __version__
from .auth import AuthenticationError
from .debug import setup_debug_logging
from .nvr_client import NVRAPIError, NVRClient
@@ -56,15 +57,30 @@ DATETIME = DateTimeParamType()
@click.group()
@click.version_option(version=__version__, prog_name="nvr-export")
def main():
@click.option("--debug", "-d", is_flag=True, help="Enable debug logging (shows all API requests/responses)")
@click.option("--debug-file", type=click.Path(), help="Write debug log to file")
@click.pass_context
def main(ctx, debug: bool, debug_file: str):
"""TP-Link Vigi NVR Export Tool.
Export video recordings from TP-Link Vigi NVRs via OpenAPI.
Make sure OpenAPI is enabled on your NVR:
Settings > Network > OpenAPI (default port: 20443)
\b
Debug mode:
nvr-export --debug export ...
nvr-export --debug --debug-file debug.log export ...
"""
pass
ctx.ensure_object(dict)
ctx.obj['debug'] = debug
# Setup debug logging
setup_debug_logging(enabled=debug, log_file=debug_file)
if debug:
click.echo("🔧 Debug mode enabled - all API requests will be logged", err=True)
@main.command()
@@ -81,7 +97,9 @@ def main():
help="Recording type filter")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
@click.option("--quiet", "-q", is_flag=True, help="Suppress progress output")
@click.pass_context
def export(
ctx,
host: str,
port: int,
user: str,
@@ -102,9 +120,9 @@ def export(
nvr-export export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28 00:00" -e "2024-12-28 23:59" -o ./exports
# Export only motion recordings
nvr-export export -h 192.168.1.100 -u admin -c 2 \\
-s "2024-12-01" -e "2024-12-31" --type motion -o ./exports
# Export with debug logging
nvr-export --debug export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28" -e "2024-12-29" -o ./exports
"""
output_dir = Path(output)
@@ -120,6 +138,8 @@ def export(
if not recordings:
click.echo("No recordings found for the specified time range.")
if ctx.obj.get('debug'):
click.echo("💡 Tip: Check debug output above for API responses", err=True)
return
# Calculate total size
@@ -144,6 +164,9 @@ def export(
sys.exit(1)
except Exception as e:
click.echo(f"Unexpected error: {e}", err=True)
if ctx.obj.get('debug'):
import traceback
click.echo(traceback.format_exc(), err=True)
sys.exit(1)
@@ -153,7 +176,9 @@ def export(
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--no-ssl-verify", is_flag=True, default=True, help="Skip SSL certificate verification")
@click.pass_context
def channels(
ctx,
host: str,
port: int,
user: str,
@@ -196,7 +221,9 @@ def channels(
@click.option("--type", "rec_type", default="all",
type=click.Choice(["all", "continuous", "motion", "alarm"]))
@click.option("--no-ssl-verify", is_flag=True, default=True)
@click.pass_context
def search(
ctx,
host: str,
port: int,
user: str,
@@ -214,6 +241,8 @@ def search(
if not recordings:
click.echo("No recordings found.")
if ctx.obj.get('debug'):
click.echo("💡 Tip: Check debug output above for API responses", err=True)
return
total_size = sum(r.size_bytes for r in recordings) / (1024 * 1024)
@@ -238,5 +267,167 @@ def search(
sys.exit(1)
@main.command()
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--no-ssl-verify", is_flag=True, default=True)
@click.pass_context
def discover(
ctx,
host: str,
port: int,
user: str,
password: str,
no_ssl_verify: bool,
):
"""Discover available API endpoints on the NVR.
This command probes many possible API paths to find which ones
are available on your NVR. Useful for debugging and development.
"""
import requests
# Common endpoints to try
endpoints_to_probe = [
# Root and info
("GET", "/openapi"),
("GET", "/openapi/"),
("GET", "/openapi/info"),
("GET", "/openapi/version"),
("GET", "/openapi/api"),
("GET", "/openapi/swagger"),
("GET", "/openapi/docs"),
# Device/channels
("GET", "/openapi/device"),
("GET", "/openapi/devices"),
("GET", "/openapi/added_devices"),
("GET", "/openapi/channel"),
("GET", "/openapi/channels"),
("GET", "/openapi/channelList"),
("GET", "/openapi/device/info"),
("GET", "/openapi/device/list"),
("GET", "/openapi/device/channels"),
("GET", "/openapi/nvr/info"),
("GET", "/openapi/nvr/channels"),
# Recording/playback
("GET", "/openapi/record"),
("GET", "/openapi/records"),
("GET", "/openapi/recording"),
("GET", "/openapi/recordings"),
("GET", "/openapi/playback"),
("GET", "/openapi/video"),
("GET", "/openapi/videos"),
("GET", "/openapi/storage"),
("GET", "/openapi/storage/recordings"),
("GET", "/openapi/hdd"),
("GET", "/openapi/disk"),
# Search endpoints
("GET", "/openapi/search"),
("GET", "/openapi/record/list"),
("GET", "/openapi/recording/list"),
("GET", "/openapi/playback/list"),
("GET", "/openapi/video/list"),
# Live stream
("GET", "/openapi/live"),
("GET", "/openapi/stream"),
("GET", "/openapi/liveStream"),
("GET", "/openapi/rtsp"),
# Events
("GET", "/openapi/event"),
("GET", "/openapi/events"),
("GET", "/openapi/event/list"),
("GET", "/openapi/alarm"),
("GET", "/openapi/alarms"),
# Settings
("GET", "/openapi/settings"),
("GET", "/openapi/config"),
("GET", "/openapi/system"),
("GET", "/openapi/sound"),
("GET", "/openapi/network"),
# User
("GET", "/openapi/user"),
("GET", "/openapi/users"),
("GET", "/openapi/account"),
]
click.echo(f"Connecting to NVR at {host}:{port}...")
try:
from .auth import NVRAuthenticator
auth = NVRAuthenticator(host, user, password, port, verify_ssl=not no_ssl_verify)
session = auth.get_authenticated_session()
base_url = f"https://{host}:{port}"
click.echo(f"Authentication successful!")
click.echo(f"\nProbing {len(endpoints_to_probe)} endpoints...\n")
found_endpoints = []
for method, endpoint in endpoints_to_probe:
url = f"{base_url}{endpoint}"
try:
if method == "GET":
response = session.get(url, timeout=5)
else:
response = session.post(url, json={}, timeout=5)
status = response.status_code
if status == 200:
# Try to get response preview
try:
data = response.json()
preview = str(data)[:100]
except:
preview = response.text[:100] if response.text else "(empty)"
click.echo(f"{status} {method:4} {endpoint}")
click.echo(f" Response: {preview}...")
found_endpoints.append((method, endpoint, status))
elif status in (400, 401, 403, 405):
# Exists but needs different params/auth
click.echo(f"⚠️ {status} {method:4} {endpoint} (exists but access denied/bad request)")
found_endpoints.append((method, endpoint, status))
# 404 = not found, skip silently
except requests.RequestException as e:
pass # Skip connection errors
click.echo(f"\n{'='*60}")
click.echo(f"Summary: Found {len(found_endpoints)} accessible endpoints")
if found_endpoints:
click.echo("\nWorking endpoints (200 OK):")
for method, endpoint, status in found_endpoints:
if status == 200:
click.echo(f" {method} {endpoint}")
click.echo("\nEndpoints that exist but need work:")
for method, endpoint, status in found_endpoints:
if status != 200:
click.echo(f" {method} {endpoint} ({status})")
else:
click.echo("\n⚠️ No working endpoints found!")
click.echo("The NVR might use a different API structure.")
click.echo("Try checking the NVR web interface with browser DevTools.")
except AuthenticationError as e:
click.echo(f"Authentication failed: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
if __name__ == "__main__":
main()
+111
View File
@@ -0,0 +1,111 @@
"""Debug logging utilities for NVR Export."""
import json
import logging
import sys
from datetime import datetime
from typing import Any
# Create logger
logger = logging.getLogger("nvr_export")
def setup_debug_logging(enabled: bool = False, log_file: str = None) -> None:
"""
Setup debug logging.
Args:
enabled: Whether to enable debug logging
log_file: Optional file path to write logs to
"""
level = logging.DEBUG if enabled else logging.WARNING
# Clear existing handlers
logger.handlers.clear()
logger.setLevel(level)
# Console handler
console_handler = logging.StreamHandler(sys.stderr)
console_handler.setLevel(level)
# Formatter with timestamp
formatter = logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%H:%M:%S"
)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# File handler if specified
if log_file:
file_handler = logging.FileHandler(log_file, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter(
"[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
))
logger.addHandler(file_handler)
if enabled:
logger.debug("Debug logging enabled")
def log_request(method: str, url: str, headers: dict = None, body: Any = None) -> None:
"""Log an outgoing HTTP request."""
logger.debug(f">>> {method} {url}")
if headers:
# Filter out sensitive headers
safe_headers = {k: v for k, v in headers.items() if 'auth' not in k.lower()}
if safe_headers:
logger.debug(f" Headers: {safe_headers}")
if body:
try:
body_str = json.dumps(body, indent=2, default=str)
logger.debug(f" Body: {body_str}")
except (TypeError, ValueError):
logger.debug(f" Body: {body}")
def log_response(status_code: int, headers: dict = None, body: Any = None, truncate: int = 2000) -> None:
"""Log an incoming HTTP response."""
logger.debug(f"<<< Response: {status_code}")
if headers:
content_type = headers.get('content-type', 'unknown')
content_length = headers.get('content-length', 'unknown')
logger.debug(f" Content-Type: {content_type}, Length: {content_length}")
if body:
try:
if isinstance(body, (dict, list)):
body_str = json.dumps(body, indent=2, default=str)
else:
body_str = str(body)
if len(body_str) > truncate:
body_str = body_str[:truncate] + f"\n... (truncated, total {len(body_str)} chars)"
logger.debug(f" Body: {body_str}")
except (TypeError, ValueError):
logger.debug(f" Body: {body}")
def log_error(message: str, exception: Exception = None) -> None:
"""Log an error."""
if exception:
logger.error(f"{message}: {type(exception).__name__}: {exception}")
else:
logger.error(message)
def log_info(message: str) -> None:
"""Log info message."""
logger.info(message)
def log_debug(message: str) -> None:
"""Log debug message."""
logger.debug(message)
def is_debug_enabled() -> bool:
"""Check if debug logging is enabled."""
return logger.level <= logging.DEBUG
+57 -2
View File
@@ -9,6 +9,7 @@ import requests
from tqdm import tqdm
from .auth import AuthenticationError, NVRAuthenticator
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
from .models import Channel, ExportJob, Recording
@@ -42,6 +43,7 @@ class NVRClient:
self.base_url = f"https://{host}:{port}"
self.auth = NVRAuthenticator(host, username, password, port, verify_ssl)
self._session: Optional[requests.Session] = None
log_debug(f"NVRClient initialized for {host}:{port}")
@property
def session(self) -> requests.Session:
@@ -76,8 +78,20 @@ class NVRClient:
url = f"{self.base_url}{endpoint}"
# Log request
log_request(method, url, kwargs.get('headers'), kwargs.get('json'))
try:
response = self.session.request(method, url, timeout=60, **kwargs)
# Log response
try:
response_data = response.json() if response.content else {}
except ValueError:
response_data = {"raw": response.text[:500] if response.text else "empty"}
log_response(response.status_code, dict(response.headers), response_data)
response.raise_for_status()
# Some endpoints might return empty response
@@ -95,6 +109,7 @@ class NVRClient:
return data
except requests.RequestException as e:
log_error(f"Request to {endpoint} failed", e)
raise NVRAPIError(f"Request failed: {e}") from e
def get_channels(self) -> list[Channel]:
@@ -114,17 +129,23 @@ class NVRClient:
("GET", "device/channels"),
]
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for channels")
for endpoint_info in endpoints_to_try:
try:
method = endpoint_info[0]
endpoint = endpoint_info[1]
json_data = endpoint_info[2] if len(endpoint_info) > 2 else None
log_debug(f"Trying endpoint: {method} {endpoint}")
if json_data:
data = self._api_request(method, endpoint, json=json_data)
else:
data = self._api_request(method, endpoint)
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
# Parse response - try different structures
channel_list = (
data.get("result", {}).get("channel_list", []) or
@@ -136,6 +157,8 @@ class NVRClient:
[]
)
log_debug(f"Found {len(channel_list)} channels in response")
if channel_list:
for ch_data in channel_list:
channels.append(Channel(
@@ -145,10 +168,12 @@ class NVRClient:
))
return channels
except NVRAPIError:
except NVRAPIError as e:
log_debug(f"Endpoint {endpoint} failed: {e}")
continue
# If no channels found via API, return default channels 1-8
log_debug("No channels found via API, returning defaults 1-8")
if not channels:
for i in range(1, 9):
channels.append(Channel(id=i, name=f"Channel {i}", enabled=True))
@@ -182,6 +207,9 @@ class NVRClient:
start_iso = start_time.strftime("%Y-%m-%dT%H:%M:%S")
end_iso = end_time.strftime("%Y-%m-%dT%H:%M:%S")
log_debug(f"Searching recordings: channel={channel_id}, start={start_time}, end={end_time}")
log_debug(f"Timestamps: start_ts={start_ts}, end_ts={end_ts}")
type_map = {
"all": 0,
"continuous": 1,
@@ -217,17 +245,28 @@ class NVRClient:
}),
]
log_debug(f"Trying {len(endpoints_to_try)} endpoint patterns for recordings")
for endpoint_info in endpoints_to_try:
try:
method = endpoint_info[0]
endpoint = endpoint_info[1]
json_data = endpoint_info[2] if len(endpoint_info) > 2 and isinstance(endpoint_info[2], dict) else None
log_debug(f"Trying endpoint: {method} {endpoint}")
if json_data:
data = self._api_request(method, endpoint, json=json_data)
else:
data = self._api_request(method, endpoint)
log_debug(f"Response keys: {list(data.keys()) if data else 'empty'}")
# Log full response structure for debugging
if is_debug_enabled():
import json as json_module
log_debug(f"Full response: {json_module.dumps(data, indent=2, default=str)[:1000]}")
# Parse response - try different structures
result = data.get("result", data)
record_list = (
@@ -235,11 +274,16 @@ class NVRClient:
result.get("recordings", []) or
result.get("recordList", []) or
result.get("items", []) or
result.get("searchResult", []) or
result.get("list", []) or
[]
)
log_debug(f"Found {len(record_list)} recordings in response")
if record_list:
for rec_data in record_list:
log_debug(f"Recording data: {rec_data}")
rec_start = self._parse_timestamp(
rec_data.get("start_time", rec_data.get("startTime", rec_data.get("start", 0)))
)
@@ -258,9 +302,11 @@ class NVRClient:
))
return recordings
except NVRAPIError:
except NVRAPIError as e:
log_debug(f"Endpoint {endpoint} failed: {e}")
continue
log_debug("No recordings found with any endpoint pattern")
return recordings
def _parse_timestamp(self, ts) -> datetime:
@@ -312,6 +358,8 @@ class NVRClient:
f"{self.base_url}/openapi/download?recordId={recording.id}",
])
log_debug(f"Download URLs to try: {download_urls}")
# Determine output filename
if output_path.is_dir():
filename = self._generate_filename(recording)
@@ -325,13 +373,18 @@ class NVRClient:
last_error = None
for download_url in download_urls:
try:
log_debug(f"Trying download from: {download_url}")
response = self.session.get(download_url, stream=True, timeout=300)
log_debug(f"Download response: {response.status_code}, Content-Type: {response.headers.get('content-type')}")
response.raise_for_status()
# Check if response is actually video data
content_type = response.headers.get("content-type", "")
if "json" in content_type or "html" in content_type:
# This is an error response, try next URL
log_debug(f"Got non-video response, trying next URL")
continue
# Get total size from headers or recording metadata
@@ -348,9 +401,11 @@ class NVRClient:
if progress_callback:
progress_callback(downloaded, total_size)
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
return output_file
except requests.RequestException as e:
log_error(f"Download from {download_url} failed", e)
last_error = e
continue