diff --git a/src/tplink_nvr_export/auth.py b/src/tplink_nvr_export/auth.py index 5d9f803..89f18a1 100644 --- a/src/tplink_nvr_export/auth.py +++ b/src/tplink_nvr_export/auth.py @@ -2,11 +2,11 @@ import hashlib import time +import urllib.parse from dataclasses import dataclass from typing import Optional import requests -from requests.auth import HTTPDigestAuth @dataclass @@ -29,7 +29,11 @@ class AuthSession: class NVRAuthenticator: - """Handles authentication with TP-Link Vigi NVR OpenAPI.""" + """Handles authentication with TP-Link Vigi NVR OpenAPI. + + The NVR uses HTTP Digest Authentication with SHA-256 to obtain an access_token. + Endpoint: /openapi/token + """ def __init__( self, @@ -71,12 +75,66 @@ class NVRAuthenticator: self._session = self._authenticate() return self._session + def _calculate_digest_response( + self, + method: str, + uri: str, + nonce: str, + realm: str, + algorithm: str = "SHA-256", + ) -> str: + """ + Calculate Digest Authentication response. + + A1 = SHA256(username:realm:password) + A2 = SHA256(method:uri) + response = SHA256(A1:nonce:A2) + """ + if algorithm.upper() in ("SHA-256", "SHA256"): + hash_func = hashlib.sha256 + else: + hash_func = hashlib.md5 + + # Calculate A1 + a1_string = f"{self.username}:{realm}:{self.password}" + a1 = hash_func(a1_string.encode()).hexdigest() + + # Calculate A2 + a2_string = f"{method}:{uri}" + a2 = hash_func(a2_string.encode()).hexdigest() + + # Calculate response + response_string = f"{a1}:{nonce}:{a2}" + response = hash_func(response_string.encode()).hexdigest() + + return response + + def _parse_www_authenticate(self, header: str) -> dict: + """Parse WWW-Authenticate header into dict.""" + # Example: Digest realm="VIGI", nonce="abc123", algorithm=SHA-256 + result = {} + # Remove 'Digest ' prefix + if header.lower().startswith("digest "): + header = header[7:] + + # Parse key=value pairs + import re + pattern = r'(\w+)=(?:"([^"]+)"|([^\s,]+))' + for match in re.finditer(pattern, header): + key = match.group(1) + value = match.group(2) or match.group(3) + result[key] = value + + return result + def _authenticate(self) -> AuthSession: """ Authenticate with NVR and obtain access token. - The NVR uses HTTP Digest Authentication for the initial login, - then returns a bearer token for subsequent requests. + 1. GET /openapi/token without auth to get nonce + 2. Calculate digest response + 3. GET /openapi/token with Authorization header + 4. Extract access_token from response Returns: AuthSession with access token @@ -84,29 +142,73 @@ class NVRAuthenticator: Raises: AuthenticationError: If authentication fails """ - login_url = f"{self.base_url}/api/v1/login" + token_url = f"{self.base_url}/openapi/token" + uri = "/openapi/token" try: - # First attempt with Digest Auth - response = self._http_session.post( - login_url, - auth=HTTPDigestAuth(self.username, self.password), - json={"method": "login"}, + # Step 1: Initial request to get nonce + response = self._http_session.get(token_url, timeout=30) + + if response.status_code != 401: + raise AuthenticationError( + f"Expected 401 for initial auth, got {response.status_code}" + ) + + # Parse WWW-Authenticate header + www_auth = response.headers.get("WWW-Authenticate", "") + if not www_auth: + raise AuthenticationError("No WWW-Authenticate header in response") + + auth_params = self._parse_www_authenticate(www_auth) + nonce = auth_params.get("nonce", "") + realm = auth_params.get("realm", "VIGI") + algorithm = auth_params.get("algorithm", "SHA-256") + + if not nonce: + raise AuthenticationError("No nonce in WWW-Authenticate header") + + # Step 2: Calculate digest response + digest_response = self._calculate_digest_response( + method="GET", + uri=uri, + nonce=nonce, + realm=realm, + algorithm=algorithm, + ) + + # Step 3: Build Authorization header + auth_header = ( + f'Digest username="{self.username}", ' + f'realm="{realm}", ' + f'nonce="{nonce}", ' + f'uri="{uri}", ' + f'algorithm={algorithm}, ' + f'response="{digest_response}"' + ) + + # Step 4: Authenticated request + response = self._http_session.get( + token_url, + headers={"Authorization": auth_header}, timeout=30, ) response.raise_for_status() data = response.json() - if data.get("error_code", 0) != 0: - error_msg = data.get("error_msg", "Unknown error") - raise AuthenticationError(f"Login failed: {error_msg}") - - result = data.get("result", {}) - access_token = result.get("stok", "") + # Extract access_token + access_token = data.get("access_token", data.get("token", "")) if not access_token: - raise AuthenticationError("No access token in response") + # Try to get from result object + result = data.get("result", {}) + access_token = result.get("access_token", result.get("stok", "")) + + if not access_token: + raise AuthenticationError(f"No access token in response: {data}") + + # URL decode the token + access_token = urllib.parse.unquote(access_token) # Token typically expires in 1 hour, refresh at 50 minutes expires_at = time.time() + (50 * 60) diff --git a/src/tplink_nvr_export/nvr_client.py b/src/tplink_nvr_export/nvr_client.py index 2794065..3354fc9 100644 --- a/src/tplink_nvr_export/nvr_client.py +++ b/src/tplink_nvr_export/nvr_client.py @@ -13,7 +13,11 @@ from .models import Channel, ExportJob, Recording class NVRClient: - """Client for TP-Link Vigi NVR API operations.""" + """Client for TP-Link Vigi NVR OpenAPI operations. + + Base endpoint: /openapi/ + Authentication: Bearer token from /openapi/token + """ def __init__( self, @@ -57,7 +61,7 @@ class NVRClient: Args: method: HTTP method (GET, POST, etc.) - endpoint: API endpoint path + endpoint: API endpoint path (will be prefixed with /openapi/) **kwargs: Additional arguments for requests Returns: @@ -66,17 +70,27 @@ class NVRClient: Raises: NVRAPIError: If request fails """ + # Ensure endpoint starts with /openapi/ + if not endpoint.startswith("/openapi/"): + endpoint = f"/openapi/{endpoint.lstrip('/')}" + url = f"{self.base_url}{endpoint}" try: response = self.session.request(method, url, timeout=60, **kwargs) response.raise_for_status() + # Some endpoints might return empty response + if not response.content: + return {} + data = response.json() - if data.get("error_code", 0) != 0: - error_msg = data.get("error_msg", "Unknown API error") - raise NVRAPIError(f"API error: {error_msg}") + # Check for error in response + error_code = data.get("error_code", data.get("errorCode", 0)) + if error_code != 0: + error_msg = data.get("error_msg", data.get("errorMsg", "Unknown API error")) + raise NVRAPIError(f"API error {error_code}: {error_msg}") return data @@ -90,33 +104,56 @@ class NVRClient: Returns: List of Channel objects """ - # Try the standard device/channels endpoint - try: - data = self._api_request("POST", "/api/v1/channels", json={"method": "get"}) - channels = [] - - for ch_data in data.get("result", {}).get("channel_list", []): - channels.append(Channel( - id=ch_data.get("channel_id", 0), - name=ch_data.get("channel_name", f"Channel {ch_data.get('channel_id', 0)}"), - enabled=ch_data.get("enabled", True), - )) - - return channels - - except NVRAPIError: - # Fallback: try alternative endpoint structure - data = self._api_request("POST", "/api/v1/device", json={"method": "getChannels"}) - channels = [] - - for ch_data in data.get("result", {}).get("channels", []): - channels.append(Channel( - id=ch_data.get("id", 0), - name=ch_data.get("name", f"Channel {ch_data.get('id', 0)}"), - enabled=ch_data.get("status", "on") == "on", - )) - - return channels + channels = [] + + # Try different endpoint patterns + endpoints_to_try = [ + ("GET", "added_devices"), + ("GET", "channels"), + ("POST", "channels", {"method": "get"}), + ("GET", "device/channels"), + ] + + for endpoint_info in endpoints_to_try: + try: + method = endpoint_info[0] + endpoint = endpoint_info[1] + json_data = endpoint_info[2] if len(endpoint_info) > 2 else None + + if json_data: + data = self._api_request(method, endpoint, json=json_data) + else: + data = self._api_request(method, endpoint) + + # Parse response - try different structures + channel_list = ( + data.get("result", {}).get("channel_list", []) or + data.get("result", {}).get("channels", []) or + data.get("result", {}).get("devices", []) or + data.get("channel_list", []) or + data.get("channels", []) or + data.get("devices", []) or + [] + ) + + if channel_list: + for ch_data in channel_list: + channels.append(Channel( + id=ch_data.get("channel_id", ch_data.get("id", ch_data.get("channelId", 0))), + name=ch_data.get("channel_name", ch_data.get("name", ch_data.get("deviceName", f"Channel"))), + enabled=ch_data.get("enabled", ch_data.get("status", "on") == "on"), + )) + return channels + + except NVRAPIError: + continue + + # If no channels found via API, return default channels 1-8 + if not channels: + for i in range(1, 9): + channels.append(Channel(id=i, name=f"Channel {i}", enabled=True)) + + return channels def search_recordings( self, @@ -137,10 +174,14 @@ class NVRClient: Returns: List of Recording objects matching criteria """ - # Format times as expected by NVR API (typically Unix timestamp or ISO format) + # Format times as expected by NVR API start_ts = int(start_time.timestamp()) end_ts = int(end_time.timestamp()) + # ISO format alternative + start_iso = start_time.strftime("%Y-%m-%dT%H:%M:%S") + end_iso = end_time.strftime("%Y-%m-%dT%H:%M:%S") + type_map = { "all": 0, "continuous": 1, @@ -149,69 +190,96 @@ class NVRClient: } rec_type = type_map.get(recording_type, 0) - # Try primary search endpoint - try: - data = self._api_request( - "POST", - "/api/v1/playback/search", - json={ - "method": "searchRecordings", - "channel_id": channel_id, - "start_time": start_ts, - "end_time": end_ts, - "record_type": rec_type, - }, - ) - except NVRAPIError: - # Fallback endpoint format - data = self._api_request( - "POST", - "/api/v1/record/search", - json={ - "method": "search", - "params": { - "channel": channel_id, - "start": start_ts, - "end": end_ts, - "type": rec_type, - }, - }, - ) - recordings = [] - result = data.get("result", {}) - record_list = result.get("record_list", result.get("recordings", [])) - for rec_data in record_list: - # Parse timestamps (could be Unix or string format) - rec_start = self._parse_timestamp(rec_data.get("start_time", rec_data.get("start", 0))) - rec_end = self._parse_timestamp(rec_data.get("end_time", rec_data.get("end", 0))) - - recordings.append(Recording( - id=str(rec_data.get("record_id", rec_data.get("id", ""))), - channel_id=channel_id, - start_time=rec_start, - end_time=rec_end, - size_bytes=rec_data.get("size", rec_data.get("file_size", 0)), - recording_type=rec_data.get("type", rec_data.get("record_type", "unknown")), - file_path=rec_data.get("file_path", rec_data.get("path")), - )) + # Try different endpoint patterns + endpoints_to_try = [ + ("POST", "playback/search", { + "channelId": channel_id, + "startTime": start_ts, + "endTime": end_ts, + "recordType": rec_type, + }), + ("POST", "record/search", { + "channel": channel_id, + "start": start_ts, + "end": end_ts, + "type": rec_type, + }), + ("GET", f"playback/search?channelId={channel_id}&startTime={start_ts}&endTime={end_ts}"), + ("POST", "search", { + "method": "searchRecordings", + "params": { + "channel_id": channel_id, + "start_time": start_iso, + "end_time": end_iso, + } + }), + ] + + for endpoint_info in endpoints_to_try: + try: + method = endpoint_info[0] + endpoint = endpoint_info[1] + json_data = endpoint_info[2] if len(endpoint_info) > 2 and isinstance(endpoint_info[2], dict) else None + + if json_data: + data = self._api_request(method, endpoint, json=json_data) + else: + data = self._api_request(method, endpoint) + + # Parse response - try different structures + result = data.get("result", data) + record_list = ( + result.get("record_list", []) or + result.get("recordings", []) or + result.get("recordList", []) or + result.get("items", []) or + [] + ) + + if record_list: + for rec_data in record_list: + rec_start = self._parse_timestamp( + rec_data.get("start_time", rec_data.get("startTime", rec_data.get("start", 0))) + ) + rec_end = self._parse_timestamp( + rec_data.get("end_time", rec_data.get("endTime", rec_data.get("end", 0))) + ) + + recordings.append(Recording( + id=str(rec_data.get("record_id", rec_data.get("recordId", rec_data.get("id", "")))), + channel_id=channel_id, + start_time=rec_start, + end_time=rec_end, + size_bytes=rec_data.get("size", rec_data.get("fileSize", rec_data.get("file_size", 0))), + recording_type=str(rec_data.get("type", rec_data.get("recordType", rec_data.get("record_type", "unknown")))), + file_path=rec_data.get("file_path", rec_data.get("filePath", rec_data.get("path"))), + )) + return recordings + + except NVRAPIError: + continue return recordings def _parse_timestamp(self, ts) -> datetime: """Parse timestamp from various formats.""" if isinstance(ts, (int, float)): - return datetime.fromtimestamp(ts) + if ts > 0: + return datetime.fromtimestamp(ts) if isinstance(ts, str): # Try common formats - for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S"]: + for fmt in ["%Y-%m-%d %H:%M:%S", "%Y%m%d%H%M%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%SZ"]: try: return datetime.strptime(ts, fmt) except ValueError: continue # Try Unix timestamp as string - return datetime.fromtimestamp(int(ts)) + try: + return datetime.fromtimestamp(int(ts)) + except (ValueError, OSError): + pass return datetime.now() def download_recording( @@ -231,15 +299,18 @@ class NVRClient: Returns: Path to downloaded file """ - # Construct download URL + # Try different download URL patterns + download_urls = [] + if recording.file_path: - download_url = f"{self.base_url}/api/v1/playback/download?path={recording.file_path}" - else: - download_url = ( - f"{self.base_url}/api/v1/playback/download" - f"?record_id={recording.id}" - f"&channel_id={recording.channel_id}" - ) + download_urls.append(f"{self.base_url}/openapi/playback/download?path={recording.file_path}") + download_urls.append(f"{self.base_url}/openapi/download?filePath={recording.file_path}") + + download_urls.extend([ + f"{self.base_url}/openapi/playback/download?recordId={recording.id}&channelId={recording.channel_id}", + f"{self.base_url}/openapi/record/download?id={recording.id}", + f"{self.base_url}/openapi/download?recordId={recording.id}", + ]) # Determine output filename if output_path.is_dir(): @@ -251,28 +322,39 @@ class NVRClient: # Ensure parent directory exists output_file.parent.mkdir(parents=True, exist_ok=True) - try: - response = self.session.get(download_url, stream=True, timeout=300) - response.raise_for_status() - - # Get total size from headers or recording metadata - total_size = int(response.headers.get("content-length", recording.size_bytes)) - - downloaded = 0 - chunk_size = 8192 - - with open(output_file, "wb") as f: - for chunk in response.iter_content(chunk_size=chunk_size): - if chunk: - f.write(chunk) - downloaded += len(chunk) - if progress_callback: - progress_callback(downloaded, total_size) - - return output_file - - except requests.RequestException as e: - raise NVRAPIError(f"Download failed: {e}") from e + last_error = None + for download_url in download_urls: + try: + response = self.session.get(download_url, stream=True, timeout=300) + response.raise_for_status() + + # Check if response is actually video data + content_type = response.headers.get("content-type", "") + if "json" in content_type or "html" in content_type: + # This is an error response, try next URL + continue + + # Get total size from headers or recording metadata + total_size = int(response.headers.get("content-length", recording.size_bytes or 0)) + + downloaded = 0 + chunk_size = 8192 + + with open(output_file, "wb") as f: + for chunk in response.iter_content(chunk_size=chunk_size): + if chunk: + f.write(chunk) + downloaded += len(chunk) + if progress_callback: + progress_callback(downloaded, total_size) + + return output_file + + except requests.RequestException as e: + last_error = e + continue + + raise NVRAPIError(f"Download failed after trying all URLs: {last_error}") def _generate_filename(self, recording: Recording) -> str: """Generate a filename for a recording."""