2 Commits

Author SHA1 Message Date
MrUnknownDE d8457b2339 Add login-test command to probe different login formats 2025-12-30 15:23:40 +01:00
MrUnknownDE 63cf8afd76 Add WebClient for web interface API with stok authentication 2025-12-30 15:17:17 +01:00
2 changed files with 625 additions and 8 deletions
+129 -8
View File
@@ -85,7 +85,7 @@ def main(ctx, debug: bool, debug_file: str):
@main.command()
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--port", "-p", default=20443, help="OpenAPI port (default: 20443)")
@click.option("--port", "-p", default=443, help="Port (default: 443 for web, 20443 for openapi)")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--channel", "-c", required=True, type=int, help="Camera channel ID (1-based)")
@@ -114,6 +114,8 @@ def export(
):
"""Export recordings from NVR for a time range.
Uses the web interface API (/stok/ds) for recording access.
\b
Examples:
# Export channel 1 for a specific day
@@ -124,13 +126,15 @@ def export(
nvr-export --debug export -h 192.168.1.100 -u admin -c 1 \\
-s "2024-12-28" -e "2024-12-29" -o ./exports
"""
from .web_client import WebClient, WebClientError
output_dir = Path(output)
if not quiet:
click.echo(f"Connecting to NVR at {host}:{port}...")
click.echo(f"Connecting to NVR web interface at {host}:{port}...")
try:
with NVRClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
with WebClient(host, user, password, port, verify_ssl=not no_ssl_verify) as client:
if not quiet:
click.echo(f"Searching recordings: Channel {channel}, {start} to {end}")
@@ -156,11 +160,8 @@ def export(
if not quiet:
click.echo(f"\nSuccessfully exported {len(downloaded)} recordings to {output_dir}")
except AuthenticationError as e:
click.echo(f"Authentication failed: {e}", err=True)
sys.exit(1)
except NVRAPIError as e:
click.echo(f"NVR API error: {e}", err=True)
except WebClientError as e:
click.echo(f"Web interface error: {e}", err=True)
sys.exit(1)
except Exception as e:
click.echo(f"Unexpected error: {e}", err=True)
@@ -429,5 +430,125 @@ def discover(
sys.exit(1)
@main.command("login-test")
@click.option("--host", "-h", required=True, help="NVR IP address or hostname")
@click.option("--user", "-u", required=True, help="Admin username")
@click.option("--password", "-P", required=True, prompt=True, hide_input=True, help="Admin password")
@click.option("--no-ssl-verify", is_flag=True, default=True)
@click.pass_context
def login_test(
ctx,
host: str,
user: str,
password: str,
no_ssl_verify: bool,
):
"""Test different login formats to find the correct one.
This probes multiple login endpoints and formats to discover
how your NVR handles authentication.
"""
import requests
import hashlib
# Suppress SSL warnings
if no_ssl_verify:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
session = requests.Session()
session.verify = not no_ssl_verify
base_url = f"https://{host}"
pw_md5 = hashlib.md5(password.encode()).hexdigest()
pw_sha256 = hashlib.sha256(password.encode()).hexdigest()
click.echo(f"Testing login formats for {host}...")
click.echo(f"Username: {user}")
click.echo(f"Password MD5: {pw_md5}")
click.echo(f"Password SHA256: {pw_sha256[:32]}...")
click.echo("")
# Various login formats to try
login_attempts = [
# Format 1: POST / with JSON method:do login
("POST", "/", {"method": "do", "login": {"username": user, "password": pw_md5}}, "JSON method:do login MD5"),
("POST", "/", {"method": "do", "login": {"username": user, "password": password}}, "JSON method:do login plain"),
# Format 2: POST with form data
("POST", "/", {"username": user, "password": pw_md5}, "Form data MD5"),
("POST", "/", {"username": user, "password": password}, "Form data plain"),
# Format 3: Different endpoints
("POST", "/login", {"username": user, "password": pw_md5}, "/login endpoint MD5"),
("POST", "/api/login", {"username": user, "password": pw_md5}, "/api/login endpoint"),
("POST", "/cgi-bin/login.cgi", {"username": user, "password": pw_md5}, "/cgi-bin/login.cgi"),
# Format 4: JSON-RPC style
("POST", "/", {"jsonrpc": "2.0", "method": "login", "params": {"username": user, "password": pw_md5}}, "JSON-RPC login"),
# Format 5: TP-Link specific
("POST", "/", {"method": "login", "data": {"username": user, "password": pw_md5}}, "TP-Link data format"),
("POST", "/", {"operation": "login", "username": user, "password": pw_md5}, "operation:login format"),
# Format 6: Try without password hash
("POST", "/", {"method": "do", "login": {"username": user}}, "JSON method:do username only"),
]
found_stok = False
for method, endpoint, data, description in login_attempts:
url = f"{base_url}{endpoint}"
try:
if isinstance(data, dict) and any(k in ["method", "jsonrpc", "operation"] for k in data.keys()):
# JSON request
response = session.post(url, json=data, timeout=10)
else:
# Form data request
response = session.post(url, data=data, timeout=10)
try:
result = response.json()
result_str = str(result)[:200]
# Check for stok in response
stok = None
if isinstance(result, dict):
stok = result.get("stok", result.get("result", {}).get("stok", None) if isinstance(result.get("result"), dict) else None)
if stok:
click.echo(f"✅ SUCCESS: {description}")
click.echo(f" URL: {url}")
click.echo(f" Data: {data}")
click.echo(f" STOK: {stok}")
found_stok = True
elif result.get("error_code", 0) == 0:
click.echo(f"⚠️ {description} - 200 OK but no stok")
click.echo(f" Response: {result_str}")
else:
error_code = result.get("error_code", "unknown")
click.echo(f"{description} - error_code: {error_code}")
except ValueError:
if response.status_code == 200:
click.echo(f"⚠️ {description} - 200 OK but not JSON")
click.echo(f" Response: {response.text[:100]}")
except requests.RequestException as e:
click.echo(f"{description} - Connection error: {e}")
click.echo("")
if found_stok:
click.echo("✅ Found working login format! Use the format marked with SUCCESS.")
else:
click.echo("❌ No working login format found.")
click.echo("")
click.echo("Please capture the actual login request from your browser:")
click.echo("1. Open browser DevTools (F12)")
click.echo("2. Go to Network tab")
click.echo("3. Log into NVR web interface")
click.echo("4. Find the login request and share the Request Payload")
if __name__ == "__main__":
main()
+496
View File
@@ -0,0 +1,496 @@
"""Web Interface API client for TP-Link Vigi NVR.
Uses the stok-based authentication for the /ds endpoint,
which provides access to recordings and playback.
"""
import hashlib
import json
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Optional
import requests
from tqdm import tqdm
from .debug import log_debug, log_error, log_request, log_response, is_debug_enabled
from .models import Channel, Recording
@dataclass
class StokSession:
"""Holds stok session data."""
stok: str
expires_at: float
@property
def is_expired(self) -> bool:
"""Check if token has expired (valid ~30 min)."""
return time.time() >= self.expires_at
class WebClient:
"""Client for TP-Link Vigi NVR Web Interface API.
Uses /stok={token}/ds endpoint for all operations.
This provides access to recordings, playback, and other features
not available via the OpenAPI.
"""
def __init__(
self,
host: str,
username: str,
password: str,
port: int = 443,
verify_ssl: bool = False,
):
"""
Initialize Web client.
Args:
host: NVR IP address or hostname
username: Admin username
password: Admin password
port: Web interface port (default: 443)
verify_ssl: Whether to verify SSL certificates
"""
self.host = host
self.username = username
self.password = password
self.port = port
self.verify_ssl = verify_ssl
self.base_url = f"https://{host}" if port == 443 else f"https://{host}:{port}"
self._session: Optional[StokSession] = None
self._http_session = requests.Session()
self._http_session.verify = verify_ssl
# Suppress SSL warnings if not verifying
if not verify_ssl:
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
log_debug(f"WebClient initialized for {host}:{port}")
@property
def stok(self) -> str:
"""Get current stok token, refreshing if expired."""
if self._session is None or self._session.is_expired:
self._session = self._login()
return self._session.stok
def _hash_password(self, password: str) -> str:
"""Hash password for login. TP-Link uses MD5."""
return hashlib.md5(password.encode()).hexdigest()
def _login(self) -> StokSession:
"""
Login to NVR web interface and obtain stok token.
Returns:
StokSession with stok token
Raises:
WebClientError: If login fails
"""
login_url = f"{self.base_url}/"
log_debug(f"Logging in to {login_url}")
try:
# First, try standard TP-Link login format
# The login endpoint is usually POST to / with form data
login_data = {
"method": "do",
"login": {
"username": self.username,
"password": self._hash_password(self.password),
}
}
log_request("POST", login_url, body=login_data)
response = self._http_session.post(
login_url,
json=login_data,
timeout=30,
)
log_response(response.status_code, dict(response.headers),
response.json() if response.content else None)
if response.status_code == 200:
data = response.json()
stok = data.get("stok", "")
if stok:
log_debug(f"Got stok token: {stok[:20]}...")
# Token valid for ~25 minutes (refresh before 30)
expires_at = time.time() + (25 * 60)
return StokSession(stok=stok, expires_at=expires_at)
# Try alternative login format
alt_login_data = {
"username": self.username,
"password": self._hash_password(self.password),
}
log_debug("Trying alternative login format")
log_request("POST", login_url, body=alt_login_data)
response = self._http_session.post(
login_url,
data=alt_login_data,
timeout=30,
)
log_response(response.status_code, dict(response.headers),
response.text[:500] if response.text else None)
# Check for stok in response
try:
data = response.json()
stok = data.get("stok", data.get("result", {}).get("stok", ""))
if stok:
expires_at = time.time() + (25 * 60)
return StokSession(stok=stok, expires_at=expires_at)
except ValueError:
pass
raise WebClientError("Could not obtain stok token from login response")
except requests.RequestException as e:
log_error("Login failed", e)
raise WebClientError(f"Login failed: {e}") from e
def _ds_request(
self,
data: dict,
) -> dict:
"""
Make a request to the /ds endpoint.
Args:
data: Request data (method, params, etc.)
Returns:
JSON response as dict
"""
url = f"{self.base_url}/stok={self.stok}/ds"
log_request("POST", url, body=data)
try:
response = self._http_session.post(
url,
json=data,
timeout=60,
)
try:
response_data = response.json()
except ValueError:
response_data = {"raw": response.text[:500] if response.text else "empty"}
log_response(response.status_code, dict(response.headers), response_data)
response.raise_for_status()
# Check for error in response
error_code = response_data.get("error_code", 0)
if error_code != 0:
error_msg = response_data.get("error_msg", f"Error code: {error_code}")
raise WebClientError(f"API error: {error_msg}")
return response_data
except requests.RequestException as e:
log_error(f"Request to /ds failed", e)
raise WebClientError(f"Request failed: {e}") from e
def get_channels(self) -> list[Channel]:
"""Get list of camera channels."""
log_debug("Getting channels via /ds")
# Try different methods to get channel list
methods_to_try = [
{"method": "get", "channel": {"table": ["channel"]}},
{"method": "get", "device": {"name": ["device_info"]}},
{"method": "get", "channel_info": {}},
{"method": "do", "channel": {"action": "list"}},
]
for method_data in methods_to_try:
try:
data = self._ds_request(method_data)
# Try to find channels in response
channels = []
# Look for channel data in various places
channel_data = (
data.get("channel", {}).get("table", {}).get("channel", []) or
data.get("channel", []) or
data.get("channels", []) or
data.get("result", {}).get("channels", []) or
[]
)
if channel_data:
for i, ch in enumerate(channel_data):
if isinstance(ch, dict):
channels.append(Channel(
id=ch.get("id", ch.get("channel_id", i + 1)),
name=ch.get("name", ch.get("channel_name", f"Channel {i + 1}")),
enabled=ch.get("enabled", True),
))
if channels:
return channels
except WebClientError as e:
log_debug(f"Method failed: {e}")
continue
# Return default channels if API doesn't provide them
log_debug("Using default channels 1-32")
return [Channel(id=i, name=f"Channel {i}", enabled=True) for i in range(1, 33)]
def search_recordings(
self,
channel_id: int,
start_time: datetime,
end_time: datetime,
recording_type: str = "all",
) -> list[Recording]:
"""
Search for recordings in a time range.
Args:
channel_id: Camera channel ID
start_time: Start of time range
end_time: End of time range
recording_type: Type filter
Returns:
List of Recording objects
"""
start_str = start_time.strftime("%Y%m%d")
end_str = end_time.strftime("%Y%m%d")
start_ts = int(start_time.timestamp())
end_ts = int(end_time.timestamp())
log_debug(f"Searching recordings: ch={channel_id}, {start_time} to {end_time}")
# Try different search methods
methods_to_try = [
{
"method": "get",
"playback": {
"table": ["search"],
"search": {
"channel": channel_id,
"start_date": start_str,
"end_date": end_str,
}
}
},
{
"method": "do",
"playback": {
"action": "search",
"channel": channel_id,
"start_time": start_ts,
"end_time": end_ts,
}
},
{
"method": "get",
"record": {
"table": ["search"],
"search": {
"channel_id": channel_id,
"start": start_ts,
"end": end_ts,
}
}
},
{
"method": "do",
"record": {
"action": "search",
"channel": channel_id,
"start_date": start_str,
"end_date": end_str,
}
},
]
recordings = []
for method_data in methods_to_try:
try:
log_debug(f"Trying: {json.dumps(method_data)[:100]}")
data = self._ds_request(method_data)
# Log full response for debugging
if is_debug_enabled():
log_debug(f"Full response: {json.dumps(data, indent=2, default=str)[:1500]}")
# Look for recordings in various places
record_list = (
data.get("playback", {}).get("table", {}).get("search", []) or
data.get("playback", {}).get("search", []) or
data.get("record", {}).get("table", {}).get("search", []) or
data.get("record", {}).get("search", []) or
data.get("result", {}).get("records", []) or
data.get("records", []) or
data.get("list", []) or
[]
)
log_debug(f"Found {len(record_list)} records in response")
if record_list:
for rec in record_list:
try:
rec_start = self._parse_time(rec.get("start", rec.get("start_time", 0)))
rec_end = self._parse_time(rec.get("end", rec.get("end_time", 0)))
recordings.append(Recording(
id=str(rec.get("id", rec.get("record_id", ""))),
channel_id=channel_id,
start_time=rec_start,
end_time=rec_end,
size_bytes=rec.get("size", rec.get("file_size", 0)),
recording_type=str(rec.get("type", "unknown")),
file_path=rec.get("path", rec.get("file_path")),
))
except Exception as e:
log_debug(f"Error parsing record: {e}")
continue
if recordings:
return recordings
except WebClientError as e:
log_debug(f"Method failed: {e}")
continue
return recordings
def _parse_time(self, t) -> datetime:
"""Parse time from various formats."""
if isinstance(t, (int, float)) and t > 0:
return datetime.fromtimestamp(t)
if isinstance(t, str):
for fmt in ["%Y%m%d%H%M%S", "%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"]:
try:
return datetime.strptime(t, fmt)
except ValueError:
continue
try:
return datetime.fromtimestamp(int(t))
except:
pass
return datetime.now()
def download_recording(
self,
recording: Recording,
output_path: Path,
progress_callback: Optional[Callable[[int, int], None]] = None,
) -> Path:
"""Download a recording to disk."""
# Build download URL
if recording.file_path:
download_url = f"{self.base_url}/stok={self.stok}/ds?download={recording.file_path}"
else:
download_url = f"{self.base_url}/stok={self.stok}/ds?download&channel={recording.channel_id}&id={recording.id}"
log_debug(f"Downloading from: {download_url}")
# Determine output filename
if output_path.is_dir():
start_str = recording.start_time.strftime("%Y%m%d_%H%M%S")
end_str = recording.end_time.strftime("%H%M%S")
filename = f"ch{recording.channel_id}_{start_str}-{end_str}.mp4"
output_file = output_path / filename
else:
output_file = output_path
output_file.parent.mkdir(parents=True, exist_ok=True)
try:
response = self._http_session.get(download_url, stream=True, timeout=300)
response.raise_for_status()
total_size = int(response.headers.get("content-length", recording.size_bytes or 0))
downloaded = 0
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if progress_callback:
progress_callback(downloaded, total_size)
log_debug(f"Downloaded {downloaded} bytes to {output_file}")
return output_file
except requests.RequestException as e:
log_error(f"Download failed", e)
raise WebClientError(f"Download failed: {e}") from e
def export_time_range(
self,
channel_id: int,
start_time: datetime,
end_time: datetime,
output_dir: Path,
recording_type: str = "all",
show_progress: bool = True,
) -> list[Path]:
"""Export all recordings in a time range."""
recordings = self.search_recordings(channel_id, start_time, end_time, recording_type)
if not recordings:
return []
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
downloaded_files = []
recordings_iter = tqdm(recordings, desc="Downloading", disable=not show_progress)
for recording in recordings_iter:
if show_progress:
recordings_iter.set_postfix_str(f"Ch{recording.channel_id} {recording.start_time:%H:%M}")
try:
output_file = self.download_recording(recording, output_dir)
downloaded_files.append(output_file)
except WebClientError as e:
if show_progress:
tqdm.write(f"Warning: Failed to download {recording}: {e}")
return downloaded_files
def close(self) -> None:
"""Close the HTTP session."""
self._http_session.close()
def __enter__(self) -> "WebClient":
return self
def __exit__(self, *args) -> None:
self.close()
class WebClientError(Exception):
"""Raised when web interface operations fail."""
pass