"""Auto Media Remover (listing mode) Script to query a Tautulli instance and list Movies and TV Shows whose last watched ( last_played ) date is older than a specified number of days (default 365 days). It paginates through libraries, collecting movie and show items. Usage examples (PowerShell): python amr.py --url http://localhost:8181 --api-key YOUR_KEY python amr.py --url http://tautulli.local:8181 --api-key YOUR_KEY --days 400 --output old_media.csv python amr.py --url http://localhost:8181 --api-key YOUR_KEY --include-never-watched python amr.py --url http://localhost:8181 --api-key YOUR_KEY --sort size Environment variables (fallback if CLI args not provided): TAUTULLI_URL, TAUTULLI_API_KEY Note: This only lists candidates. It does NOT delete anything. References: https://github.com/Tautulli/Tautulli/wiki/Tautulli-API-Reference """ from __future__ import annotations import argparse import csv import datetime as dt import os import sys from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional try: import requests # type: ignore except ImportError: # pragma: no cover print("The 'requests' library is required. Install with: pip install requests", file=sys.stderr) sys.exit(1) # ----------------------------- Data Models ---------------------------------- @dataclass class MediaItem: rating_key: str title: str media_type: str # movie or show library_name: str last_played: Optional[dt.datetime] # UTC naive added_at: Optional[dt.datetime] year: Optional[int] size_bytes: Optional[int] = None # added: file size in bytes @property def days_since_last_played(self) -> Optional[int]: if not self.last_played: return None return (dt.datetime.utcnow() - self.last_played).days @property def size_gb(self) -> Optional[float]: if self.size_bytes is None: return None return self.size_bytes / (1024 ** 3) def to_row(self) -> List[str]: return [ self.media_type, self.library_name, self.title, str(self.year or ''), self.last_played.isoformat() if self.last_played else '', str(self.days_since_last_played) if self.days_since_last_played is not None else '', self.added_at.isoformat() if self.added_at else '', self.rating_key, str(self.size_bytes or ''), ] # ----------------------------- API Client ----------------------------------- class TautulliClient: def __init__(self, base_url: str, api_key: str, timeout: int = 30): self.base_url = base_url.rstrip('/') self.api_key = api_key self.timeout = timeout def _get(self, **params) -> Dict[str, Any]: params = {"apikey": self.api_key, **params} try: r = requests.get(f"{self.base_url}/api/v2", params=params, timeout=self.timeout) r.raise_for_status() data = r.json() except requests.RequestException as e: raise RuntimeError(f"HTTP error calling Tautulli: {e}") from e except ValueError as e: raise RuntimeError("Invalid JSON response from Tautulli") from e if data.get('response', {}).get('result') != 'success': raise RuntimeError(f"Tautulli API error: {data.get('response', {}).get('message')}") return data['response']['data'] def get_libraries(self) -> List[Dict[str, Any]]: data = self._get(cmd='get_libraries') return data # list of library dicts def iter_library_items(self, section_id: int, library_name: str, section_type: str) -> Iterable[MediaItem]: # Uses get_library_media_info with pagination length = 100 start = 0 total = None while True: data = self._get(cmd='get_library_media_info', section_id=section_id, start=start, length=length) # DataTables style items = data.get('data') or data # sometimes direct list if total is None: total = data.get('recordsTotal') or data.get('total_count') or len(items) for it in items: # Fields vary; attempt to normalize last_played_ts = it.get('last_played') or it.get('last_watched') added_at_ts = it.get('added_at') size_val = it.get('file_size') or it.get('size') or it.get('media_size') try: size_bytes = int(size_val) if size_val is not None else None except Exception: size_bytes = None def conv(ts): if not ts: return None try: return dt.datetime.utcfromtimestamp(int(ts)) except Exception: return None last_played = conv(last_played_ts) added_at = conv(added_at_ts) yield MediaItem( rating_key=str(it.get('rating_key')), title=it.get('title') or it.get('name') or 'Unknown', media_type=section_type, library_name=library_name, last_played=last_played, added_at=added_at, year=it.get('year'), size_bytes=size_bytes, ) start += length if total is not None and start >= total: break # ----------------------------- Core Logic ------------------------------------ def find_old_media(client: TautulliClient, days: int, include_never_watched: bool) -> List[MediaItem]: cutoff = dt.datetime.utcnow() - dt.timedelta(days=days) results: List[MediaItem] = [] libraries = client.get_libraries() for lib in libraries: section_type = lib.get('section_type') # movie, show, artist, photo, etc. if section_type not in ('movie', 'show'): continue name = lib.get('section_name') or lib.get('name') or f"Library {lib.get('section_id')}" section_id = lib.get('section_id') or lib.get('id') if section_id is None: continue try: for item in client.iter_library_items(section_id=int(section_id), library_name=name, section_type=section_type): if item.last_played is None: if include_never_watched: results.append(item) continue if item.last_played < cutoff: results.append(item) except Exception as e: print(f"Warning: failed to process library '{name}': {e}", file=sys.stderr) return results # ----------------------------- CLI ------------------------------------------ def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace: p = argparse.ArgumentParser(description="List Tautulli media last watched over N days ago.") p.add_argument('--url', dest='url', default=os.environ.get('TAUTULLI_URL'), help='Base URL to Tautulli (e.g. http://localhost:8181)') p.add_argument('--api-key', dest='api_key', default=os.environ.get('TAUTULLI_API_KEY'), help='Tautulli API key') p.add_argument('--days', type=int, default=365, help='Age in days since last watched (default: 365)') p.add_argument('--include-never-watched', action='store_true', help='Include items never watched (no last_played)') p.add_argument('--output', help='Optional CSV output file path') p.add_argument('--sort', choices=['title', 'days', 'last_played', 'size'], default='days', help='Sort output list') return p.parse_args(argv) def main(argv: Optional[List[str]] = None) -> int: args = parse_args(argv) if not args.url or not args.api_key: print('Error: --url and --api-key (or environment variables) are required.', file=sys.stderr) return 2 client = TautulliClient(args.url, args.api_key) try: old_media = find_old_media(client, days=args.days, include_never_watched=args.include_never_watched) except Exception as e: print(f"Failed to query Tautulli: {e}", file=sys.stderr) return 1 if args.sort == 'title': old_media.sort(key=lambda m: (m.title.lower(), m.media_type)) elif args.sort == 'last_played': old_media.sort(key=lambda m: (m.last_played or dt.datetime.min)) elif args.sort == 'size': old_media.sort(key=lambda m: (m.size_bytes or -1), reverse=True) else: # days old_media.sort(key=lambda m: (m.days_since_last_played if m.days_since_last_played is not None else -1), reverse=True) # Console output print(f"Found {len(old_media)} media items last watched over {args.days} days ago" + (" (including never watched)" if args.include_never_watched else '')) print('-' * 135) print(f"{'Type':5} {'Library':20} {'Title':40} {'Year':4} {'Last Played (UTC)':20} {'Days':5} {'Size(GB)':8}") print('-' * 135) for m in old_media: lp = m.last_played.isoformat(sep=' ')[:19] if m.last_played else 'Never' days = m.days_since_last_played if m.days_since_last_played is not None else '' size_gb = f"{m.size_gb:.2f}" if m.size_gb is not None else '' print(f"{m.media_type[:5]:5} {m.library_name[:20]:20} {m.title[:40]:40} {str(m.year or ''):4} {lp:20} {str(days):5} {size_gb:8}") # CSV output if args.output: try: with open(args.output, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['media_type', 'library', 'title', 'year', 'last_played_utc', 'days_since_last_played', 'added_at_utc', 'rating_key', 'size_bytes']) for m in old_media: writer.writerow(m.to_row()) print(f"\nCSV written to {args.output}") except OSError as e: print(f"Failed to write CSV: {e}", file=sys.stderr) return 1 return 0 if __name__ == '__main__': # pragma: no cover raise SystemExit(main())