mirror of
https://github.com/MrUnknownDE/tplink-nvr-export.git
synced 2026-04-18 14:23:47 +02:00
Add WebClient for web interface API with stok authentication
This commit is contained in:
@@ -85,7 +85,7 @@ def main(ctx, debug: bool, debug_file: str):
|
||||
|
||||
@main.command()
|
||||
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
|
||||
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
|
||||
@click.option("--port", "-p", default=443, help="Port (default: 443 for web, 20443 for openapi)")
|
||||
@click.option("--user", "-u", required=True, help="Admin username")
|
||||
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
|
||||
@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID (1-based)")
|
||||
@@ -114,6 +114,8 @@ def export(
|
||||
):
|
||||
"""Export recordings from NVR for a time range.
|
||||
|
||||
Uses the web interface API (/stok/ds) for recording access.
|
||||
|
||||
\b
|
||||
Examples:
|
||||
# Export channel 1 for a specific day
|
||||
@@ -124,13 +126,15 @@ def export(
|
||||
nvr-export --debug export -h 192.168.1.100 -u admin -c 1 \\
|
||||
-s "2024-12-28" -e "2024-12-29" -o ./exports
|
||||
"""
|
||||
from .web_client import WebClient, WebClientError
|
||||
|
||||
output_dir = Path(output)
|
||||
|
||||
if not quiet:
|
||||
click.echo(f"Connecting to NVR at {host}:{port}...")
|
||||
click.echo(f"Connecting to NVR web interface at {host}:{port}...")
|
||||
|
||||
try:
|
||||
with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
|
||||
with WebClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
|
||||
if not quiet:
|
||||
click.echo(f"Searching recordings: Channel {channel}, {start} to {end}")
|
||||
|
||||
@@ -156,11 +160,8 @@ def export(
|
||||
if not quiet:
|
||||
click.echo(f"\nSuccessfully exported {len(downloaded)} recordings to {output_dir}")
|
||||
|
||||
except AuthenticationError as e:
|
||||
click.echo(f"Authentication failed: {e}", err=True)
|
||||
sys.exit(1)
|
||||
except NVRAPIError as e:
|
||||
click.echo(f"NVR API error: {e}", err=True)
|
||||
except WebClientError as e:
|
||||
click.echo(f"Web interface error: {e}", err=True)
|
||||
sys.exit(1)
|
||||
except Exception as e:
|
||||
click.echo(f"Unexpected error: {e}", err=True)
|
||||
|
||||
496
src/tplink_nvr_export/web_client.py
Normal file
496
src/tplink_nvr_export/web_client.py
Normal file
@@ -0,0 +1,496 @@
|
||||
"""Web Interface API client for TP-Link Vigi NVR.
|
||||
|
||||
Uses the stok-based authentication for the /ds endpoint,
|
||||
which provides access to recordings and playback.
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
import requests
|
||||
from tqdm import tqdm
|
||||
|
||||
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
|
||||
from .models import Channel, Recording
|
||||
|
||||
|
||||
@dataclass
|
||||
class StokSession:
|
||||
"""Holds stok session data."""
|
||||
|
||||
stok: str
|
||||
expires_at: float
|
||||
|
||||
@property
|
||||
def is_expired(self) -> bool:
|
||||
"""Check if token has expired (valid ~30 min)."""
|
||||
return time.time() >= self.expires_at
|
||||
|
||||
|
||||
class WebClient:
|
||||
"""Client for TP-Link Vigi NVR Web Interface API.
|
||||
|
||||
Uses /stok={token}/ds endpoint for all operations.
|
||||
This provides access to recordings, playback, and other features
|
||||
not available via the OpenAPI.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str,
|
||||
username: str,
|
||||
password: str,
|
||||
port: int = 443,
|
||||
verify_ssl: bool = False,
|
||||
):
|
||||
"""
|
||||
Initialize Web client.
|
||||
|
||||
Args:
|
||||
host: NVR IP address or hostname
|
||||
username: Admin username
|
||||
password: Admin password
|
||||
port: Web interface port (default: 443)
|
||||
verify_ssl: Whether to verify SSL certificates
|
||||
"""
|
||||
self.host = host
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.port = port
|
||||
self.verify_ssl = verify_ssl
|
||||
self.base_url = f"https://{host}" if port == 443 else f"https://{host}:{port}"
|
||||
self._session: Optional[StokSession] = None
|
||||
self._http_session = requests.Session()
|
||||
self._http_session.verify = verify_ssl
|
||||
|
||||
# Suppress SSL warnings if not verifying
|
||||
if not verify_ssl:
|
||||
import urllib3
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
log_debug(f"WebClient initialized for {host}:{port}")
|
||||
|
||||
@property
|
||||
def stok(self) -> str:
|
||||
"""Get current stok token, refreshing if expired."""
|
||||
if self._session is None or self._session.is_expired:
|
||||
self._session = self._login()
|
||||
return self._session.stok
|
||||
|
||||
def _hash_password(self, password: str) -> str:
|
||||
"""Hash password for login. TP-Link uses MD5."""
|
||||
return hashlib.md5(password.encode()).hexdigest()
|
||||
|
||||
def _login(self) -> StokSession:
|
||||
"""
|
||||
Login to NVR web interface and obtain stok token.
|
||||
|
||||
Returns:
|
||||
StokSession with stok token
|
||||
|
||||
Raises:
|
||||
WebClientError: If login fails
|
||||
"""
|
||||
login_url = f"{self.base_url}/"
|
||||
|
||||
log_debug(f"Logging in to {login_url}")
|
||||
|
||||
try:
|
||||
# First, try standard TP-Link login format
|
||||
# The login endpoint is usually POST to / with form data
|
||||
login_data = {
|
||||
"method": "do",
|
||||
"login": {
|
||||
"username": self.username,
|
||||
"password": self._hash_password(self.password),
|
||||
}
|
||||
}
|
||||
|
||||
log_request("POST", login_url, body=login_data)
|
||||
|
||||
response = self._http_session.post(
|
||||
login_url,
|
||||
json=login_data,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
log_response(response.status_code, dict(response.headers),
|
||||
response.json() if response.content else None)
|
||||
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
stok = data.get("stok", "")
|
||||
|
||||
if stok:
|
||||
log_debug(f"Got stok token: {stok[:20]}...")
|
||||
# Token valid for ~25 minutes (refresh before 30)
|
||||
expires_at = time.time() + (25 * 60)
|
||||
return StokSession(stok=stok, expires_at=expires_at)
|
||||
|
||||
# Try alternative login format
|
||||
alt_login_data = {
|
||||
"username": self.username,
|
||||
"password": self._hash_password(self.password),
|
||||
}
|
||||
|
||||
log_debug("Trying alternative login format")
|
||||
log_request("POST", login_url, body=alt_login_data)
|
||||
|
||||
response = self._http_session.post(
|
||||
login_url,
|
||||
data=alt_login_data,
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
log_response(response.status_code, dict(response.headers),
|
||||
response.text[:500] if response.text else None)
|
||||
|
||||
# Check for stok in response
|
||||
try:
|
||||
data = response.json()
|
||||
stok = data.get("stok", data.get("result", {}).get("stok", ""))
|
||||
if stok:
|
||||
expires_at = time.time() + (25 * 60)
|
||||
return StokSession(stok=stok, expires_at=expires_at)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
raise WebClientError("Could not obtain stok token from login response")
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error("Login failed", e)
|
||||
raise WebClientError(f"Login failed: {e}") from e
|
||||
|
||||
def _ds_request(
|
||||
self,
|
||||
data: dict,
|
||||
) -> dict:
|
||||
"""
|
||||
Make a request to the /ds endpoint.
|
||||
|
||||
Args:
|
||||
data: Request data (method, params, etc.)
|
||||
|
||||
Returns:
|
||||
JSON response as dict
|
||||
"""
|
||||
url = f"{self.base_url}/stok={self.stok}/ds"
|
||||
|
||||
log_request("POST", url, body=data)
|
||||
|
||||
try:
|
||||
response = self._http_session.post(
|
||||
url,
|
||||
json=data,
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
try:
|
||||
response_data = response.json()
|
||||
except ValueError:
|
||||
response_data = {"raw": response.text[:500] if response.text else "empty"}
|
||||
|
||||
log_response(response.status_code, dict(response.headers), response_data)
|
||||
|
||||
response.raise_for_status()
|
||||
|
||||
# Check for error in response
|
||||
error_code = response_data.get("error_code", 0)
|
||||
if error_code != 0:
|
||||
error_msg = response_data.get("error_msg", f"Error code: {error_code}")
|
||||
raise WebClientError(f"API error: {error_msg}")
|
||||
|
||||
return response_data
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error(f"Request to /ds failed", e)
|
||||
raise WebClientError(f"Request failed: {e}") from e
|
||||
|
||||
def get_channels(self) -> list[Channel]:
|
||||
"""Get list of camera channels."""
|
||||
log_debug("Getting channels via /ds")
|
||||
|
||||
# Try different methods to get channel list
|
||||
methods_to_try = [
|
||||
{"method": "get", "channel": {"table": ["channel"]}},
|
||||
{"method": "get", "device": {"name": ["device_info"]}},
|
||||
{"method": "get", "channel_info": {}},
|
||||
{"method": "do", "channel": {"action": "list"}},
|
||||
]
|
||||
|
||||
for method_data in methods_to_try:
|
||||
try:
|
||||
data = self._ds_request(method_data)
|
||||
|
||||
# Try to find channels in response
|
||||
channels = []
|
||||
|
||||
# Look for channel data in various places
|
||||
channel_data = (
|
||||
data.get("channel", {}).get("table", {}).get("channel", []) or
|
||||
data.get("channel", []) or
|
||||
data.get("channels", []) or
|
||||
data.get("result", {}).get("channels", []) or
|
||||
[]
|
||||
)
|
||||
|
||||
if channel_data:
|
||||
for i, ch in enumerate(channel_data):
|
||||
if isinstance(ch, dict):
|
||||
channels.append(Channel(
|
||||
id=ch.get("id", ch.get("channel_id", i + 1)),
|
||||
name=ch.get("name", ch.get("channel_name", f"Channel {i + 1}")),
|
||||
enabled=ch.get("enabled", True),
|
||||
))
|
||||
|
||||
if channels:
|
||||
return channels
|
||||
|
||||
except WebClientError as e:
|
||||
log_debug(f"Method failed: {e}")
|
||||
continue
|
||||
|
||||
# Return default channels if API doesn't provide them
|
||||
log_debug("Using default channels 1-32")
|
||||
return [Channel(id=i, name=f"Channel {i}", enabled=True) for i in range(1, 33)]
|
||||
|
||||
def search_recordings(
|
||||
self,
|
||||
channel_id: int,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
recording_type: str = "all",
|
||||
) -> list[Recording]:
|
||||
"""
|
||||
Search for recordings in a time range.
|
||||
|
||||
Args:
|
||||
channel_id: Camera channel ID
|
||||
start_time: Start of time range
|
||||
end_time: End of time range
|
||||
recording_type: Type filter
|
||||
|
||||
Returns:
|
||||
List of Recording objects
|
||||
"""
|
||||
start_str = start_time.strftime("%Y%m%d")
|
||||
end_str = end_time.strftime("%Y%m%d")
|
||||
start_ts = int(start_time.timestamp())
|
||||
end_ts = int(end_time.timestamp())
|
||||
|
||||
log_debug(f"Searching recordings: ch={channel_id}, {start_time} to {end_time}")
|
||||
|
||||
# Try different search methods
|
||||
methods_to_try = [
|
||||
{
|
||||
"method": "get",
|
||||
"playback": {
|
||||
"table": ["search"],
|
||||
"search": {
|
||||
"channel": channel_id,
|
||||
"start_date": start_str,
|
||||
"end_date": end_str,
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"method": "do",
|
||||
"playback": {
|
||||
"action": "search",
|
||||
"channel": channel_id,
|
||||
"start_time": start_ts,
|
||||
"end_time": end_ts,
|
||||
}
|
||||
},
|
||||
{
|
||||
"method": "get",
|
||||
"record": {
|
||||
"table": ["search"],
|
||||
"search": {
|
||||
"channel_id": channel_id,
|
||||
"start": start_ts,
|
||||
"end": end_ts,
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"method": "do",
|
||||
"record": {
|
||||
"action": "search",
|
||||
"channel": channel_id,
|
||||
"start_date": start_str,
|
||||
"end_date": end_str,
|
||||
}
|
||||
},
|
||||
]
|
||||
|
||||
recordings = []
|
||||
|
||||
for method_data in methods_to_try:
|
||||
try:
|
||||
log_debug(f"Trying: {json.dumps(method_data)[:100]}")
|
||||
data = self._ds_request(method_data)
|
||||
|
||||
# Log full response for debugging
|
||||
if is_debug_enabled():
|
||||
log_debug(f"Full response: {json.dumps(data, indent=2, default=str)[:1500]}")
|
||||
|
||||
# Look for recordings in various places
|
||||
record_list = (
|
||||
data.get("playback", {}).get("table", {}).get("search", []) or
|
||||
data.get("playback", {}).get("search", []) or
|
||||
data.get("record", {}).get("table", {}).get("search", []) or
|
||||
data.get("record", {}).get("search", []) or
|
||||
data.get("result", {}).get("records", []) or
|
||||
data.get("records", []) or
|
||||
data.get("list", []) or
|
||||
[]
|
||||
)
|
||||
|
||||
log_debug(f"Found {len(record_list)} records in response")
|
||||
|
||||
if record_list:
|
||||
for rec in record_list:
|
||||
try:
|
||||
rec_start = self._parse_time(rec.get("start", rec.get("start_time", 0)))
|
||||
rec_end = self._parse_time(rec.get("end", rec.get("end_time", 0)))
|
||||
|
||||
recordings.append(Recording(
|
||||
id=str(rec.get("id", rec.get("record_id", ""))),
|
||||
channel_id=channel_id,
|
||||
start_time=rec_start,
|
||||
end_time=rec_end,
|
||||
size_bytes=rec.get("size", rec.get("file_size", 0)),
|
||||
recording_type=str(rec.get("type", "unknown")),
|
||||
file_path=rec.get("path", rec.get("file_path")),
|
||||
))
|
||||
except Exception as e:
|
||||
log_debug(f"Error parsing record: {e}")
|
||||
continue
|
||||
|
||||
if recordings:
|
||||
return recordings
|
||||
|
||||
except WebClientError as e:
|
||||
log_debug(f"Method failed: {e}")
|
||||
continue
|
||||
|
||||
return recordings
|
||||
|
||||
def _parse_time(self, t) -> datetime:
|
||||
"""Parse time from various formats."""
|
||||
if isinstance(t, (int, float)) and t > 0:
|
||||
return datetime.fromtimestamp(t)
|
||||
if isinstance(t, str):
|
||||
for fmt in ["%Y%m%d%H%M%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"]:
|
||||
try:
|
||||
return datetime.strptime(t, fmt)
|
||||
except ValueError:
|
||||
continue
|
||||
try:
|
||||
return datetime.fromtimestamp(int(t))
|
||||
except:
|
||||
pass
|
||||
return datetime.now()
|
||||
|
||||
def download_recording(
|
||||
self,
|
||||
recording: Recording,
|
||||
output_path: Path,
|
||||
progress_callback: Optional[Callable[[int, int], None]] = None,
|
||||
) -> Path:
|
||||
"""Download a recording to disk."""
|
||||
# Build download URL
|
||||
if recording.file_path:
|
||||
download_url = f"{self.base_url}/stok={self.stok}/ds?download={recording.file_path}"
|
||||
else:
|
||||
download_url = f"{self.base_url}/stok={self.stok}/ds?download&channel={recording.channel_id}&id={recording.id}"
|
||||
|
||||
log_debug(f"Downloading from: {download_url}")
|
||||
|
||||
# Determine output filename
|
||||
if output_path.is_dir():
|
||||
start_str = recording.start_time.strftime("%Y%m%d_%H%M%S")
|
||||
end_str = recording.end_time.strftime("%H%M%S")
|
||||
filename = f"ch{recording.channel_id}_{start_str}-{end_str}.mp4"
|
||||
output_file = output_path / filename
|
||||
else:
|
||||
output_file = output_path
|
||||
|
||||
output_file.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
response = self._http_session.get(download_url, stream=True, timeout=300)
|
||||
response.raise_for_status()
|
||||
|
||||
total_size = int(response.headers.get("content-length", recording.size_bytes or 0))
|
||||
|
||||
downloaded = 0
|
||||
with open(output_file, "wb") as f:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
if chunk:
|
||||
f.write(chunk)
|
||||
downloaded += len(chunk)
|
||||
if progress_callback:
|
||||
progress_callback(downloaded, total_size)
|
||||
|
||||
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
|
||||
return output_file
|
||||
|
||||
except requests.RequestException as e:
|
||||
log_error(f"Download failed", e)
|
||||
raise WebClientError(f"Download failed: {e}") from e
|
||||
|
||||
def export_time_range(
|
||||
self,
|
||||
channel_id: int,
|
||||
start_time: datetime,
|
||||
end_time: datetime,
|
||||
output_dir: Path,
|
||||
recording_type: str = "all",
|
||||
show_progress: bool = True,
|
||||
) -> list[Path]:
|
||||
"""Export all recordings in a time range."""
|
||||
recordings = self.search_recordings(channel_id, start_time, end_time, recording_type)
|
||||
|
||||
if not recordings:
|
||||
return []
|
||||
|
||||
output_dir = Path(output_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
downloaded_files = []
|
||||
recordings_iter = tqdm(recordings, desc="Downloading", disable=not show_progress)
|
||||
|
||||
for recording in recordings_iter:
|
||||
if show_progress:
|
||||
recordings_iter.set_postfix_str(f"Ch{recording.channel_id} {recording.start_time:%H:%M}")
|
||||
|
||||
try:
|
||||
output_file = self.download_recording(recording, output_dir)
|
||||
downloaded_files.append(output_file)
|
||||
except WebClientError as e:
|
||||
if show_progress:
|
||||
tqdm.write(f"Warning: Failed to download {recording}: {e}")
|
||||
|
||||
return downloaded_files
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the HTTP session."""
|
||||
self._http_session.close()
|
||||
|
||||
def __enter__(self) -> "WebClient":
|
||||
return self
|
||||
|
||||
def __exit__(self, *args) -> None:
|
||||
self.close()
|
||||
|
||||
|
||||
class WebClientError(Exception):
|
||||
"""Raised when web interface operations fail."""
|
||||
pass
|
||||
Reference in New Issue
Block a user