"""Movie and TV library API.

Scans the configured media directories, stores metadata in a SQLite database,
and exposes FastAPI endpoints for browsing, streaming, account management,
watch-progress tracking and WebSocket-based playback sync.
"""

import json
import logging
import os
import re
import sqlite3
import time
from contextlib import closing

import jwt
import requests
from fastapi import (
    Depends,
    FastAPI,
    Header,
    HTTPException,
    Query,
    Request,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import FileResponse, StreamingResponse
from passlib.context import CryptContext
from rapidfuzz import fuzz

import media_scanner  # Import the Rust module

# Configure logging
logging.basicConfig(
    filename="scan.log",  # Log file name
    filemode="a",  # Append mode
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Configuration
MOVIES_DIR = r"Z:\plexmediaserver\movies"  # Directory containing movie files
TV_SHOWS_DIR = r"Z:\plexmediaserver\tv"  # Directory containing TV shows and episodes
DB_PATH = "movies.db"  # SQLite database file
# Get a key from http://www.omdbapi.com/ ; the environment variable overrides
# the built-in default.
OMDB_API_KEY = os.environ.get("OMDB_API_KEY", "8275d9b8")

# Authentication settings
# Use a secure secret in production! Set MOVIE_LIBRARY_SECRET_KEY to override
# the insecure default.
SECRET_KEY = os.environ.get("MOVIE_LIBRARY_SECRET_KEY", "yoursecretkey")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_SECONDS = 600

# Upper bound (seconds) for each outbound metadata request, so a stalled
# remote API cannot hang a scan forever.
REQUEST_TIMEOUT_SECONDS = 10

# Minimum rapidfuzz ratio (exclusive) for a folder name to be treated as the
# same show as a stored name.
FUZZY_MATCH_THRESHOLD = 80

# Patterns used while scanning, compiled once.
_MOVIE_DIR_RE = re.compile(r"(.+?)\s*\((\d{4})\)$")
_SEASON_RE = re.compile(r"Season\s*(\d+)", re.IGNORECASE)
_EPISODE_RE = re.compile(r"(?i)S(\d{2})E(\d{2})")
_RANGE_RE = re.compile(r"bytes=(\d*)-(\d*)")

# Set up a password context for hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Initialize FastAPI
app = FastAPI(title='Movie Library API')

# Global dictionary to store sync sessions keyed by session_id.
# Each value is a list of (websocket, media_id, media_type) tuples.
sync_sessions = {}


def _report(message, level=logging.INFO):
    """Print *message* to stdout and record it in the log at *level*."""
    print(message)
    logging.log(level, message)


def _query_dicts(query, params=()):
    """Run a SELECT and return every row as a ``{column: value}`` dict."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        cursor = conn.execute(query, params)
        columns = [col[0] for col in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]


@app.websocket("/ws/sync/{session_id}")
async def websocket_sync(
    session_id: str,
    websocket: WebSocket,
    media_id: str = Query(...),
    media_type: str = Query(...),
):
    """Relay every text message from one client to the other session members.

    The connection is registered under ``session_id`` together with the
    ``media_id``/``media_type`` query parameters and is always unregistered
    when the handler exits, whatever the reason.
    """
    await websocket.accept()
    sync_sessions.setdefault(session_id, []).append(
        (websocket, media_id, media_type)
    )
    try:
        while True:
            data = await websocket.receive_text()
            broadcast = json.dumps(
                {"media_id": media_id, "media_type": media_type, "data": data}
            )
            # Iterate over a snapshot: other handlers may add or remove
            # members while this coroutine is suspended in send_text().
            for connection, _, _ in list(sync_sessions.get(session_id, [])):
                if connection is websocket:
                    continue
                try:
                    await connection.send_text(broadcast)
                except (WebSocketDisconnect, RuntimeError, OSError) as exc:
                    # A dead peer must not terminate the sender's loop; the
                    # peer's own handler removes it from the session.
                    logging.warning(
                        "Dropping sync message for a closed peer in session %s: %s",
                        session_id,
                        exc,
                    )
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    finally:
        remaining = [
            entry
            for entry in sync_sessions.get(session_id, [])
            if entry[0] is not websocket
        ]
        if remaining:
            sync_sessions[session_id] = remaining
        else:
            sync_sessions.pop(session_id, None)


def create_access_token(data: dict, expires_delta: int = ACCESS_TOKEN_EXPIRE_SECONDS):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    ``expires_delta`` is the token lifetime in seconds from now.
    """
    to_encode = data.copy()
    to_encode["exp"] = int(time.time()) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def verify_token(token: str):
    """Decode *token* and return its payload.

    Raises:
        HTTPException: 401 when the token cannot be decoded or has expired.
    """
    try:
        # In a more complete system, you may return a user object.
        return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")


# Dependency to extract and verify token from the header
def get_current_user(authorization: str = Header(...)):
    """Validate the ``Authorization: Bearer <token>`` header; return the payload.

    Raises:
        HTTPException: 401 when the header is not a Bearer header or the
            token is rejected by ``verify_token``.
    """
    if not authorization.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Invalid authentication header")
    token = authorization[len("Bearer "):]
    return verify_token(token)


# Database helper functions
def init_db():
    """Create all tables (and the watch-progress index) if they are missing."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        c = conn.cursor()
        c.execute('''
            CREATE TABLE IF NOT EXISTS movies (
                id INTEGER PRIMARY KEY,
                filepath TEXT UNIQUE,
                title TEXT,
                year TEXT,
                rated TEXT,
                released TEXT,
                runtime TEXT,
                genre TEXT,
                director TEXT,
                writer TEXT,
                actors TEXT,
                plot TEXT,
                language TEXT,
                country TEXT,
                awards TEXT,
                poster TEXT,
                imdb_rating TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS episodes (
                id INTEGER PRIMARY KEY,
                filepath TEXT UNIQUE,
                tv_show_id INTEGER,
                season INTEGER,
                episode INTEGER,
                title TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY(tv_show_id) REFERENCES tv_shows(id)
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS tv_shows (
                id INTEGER PRIMARY KEY,
                name TEXT UNIQUE,
                rating TEXT,
                summary TEXT,
                genres TEXT,
                poster TEXT,
                created_at TEXT DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        c.execute('''
            CREATE TABLE IF NOT EXISTS accounts (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE,
                hashed_password TEXT
            )
        ''')
        # Table for tracking watch progress
        c.execute('''
            CREATE TABLE IF NOT EXISTS watch_progress (
                id INTEGER PRIMARY KEY,
                username TEXT,
                media_type TEXT,  -- 'movie' or 'episode'
                media_id INTEGER,
                last_position INTEGER,
                tv_show_id INTEGER,
                updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(username, media_type, media_id)
            )
        ''')
        # save_progress() upserts episodes with ON CONFLICT(username,
        # tv_show_id); SQLite rejects that statement unless a matching unique
        # index exists. Rows with a NULL tv_show_id (movies) never collide
        # because SQLite treats NULLs as distinct in unique indexes.
        try:
            c.execute('''
                CREATE UNIQUE INDEX IF NOT EXISTS idx_watch_progress_user_show
                ON watch_progress(username, tv_show_id)
            ''')
        except sqlite3.Error as exc:
            # E.g. a legacy table without the tv_show_id column, or existing
            # duplicate rows. Keep the rest of the schema usable.
            logging.warning("Could not create watch_progress index: %s", exc)
        conn.commit()


def movie_exists(rel_path):
    """Return True when a movie with this relative file path is stored."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            'SELECT 1 FROM movies WHERE filepath = ?', (rel_path,)
        ).fetchone()
    return row is not None


def add_movie_to_db(movie):
    """Insert an OMDb-style record; an already-known filepath is ignored.

    *movie* must contain ``'filepath'``; the capitalised OMDb keys are read
    with ``.get`` and stored as NULL when absent.
    """
    values = (
        movie['filepath'],
        movie.get('Title'),
        movie.get('Year'),
        movie.get('Rated'),
        movie.get('Released'),
        movie.get('Runtime'),
        movie.get('Genre'),
        movie.get('Director'),
        movie.get('Writer'),
        movie.get('Actors'),
        movie.get('Plot'),
        movie.get('Language'),
        movie.get('Country'),
        movie.get('Awards'),
        movie.get('Poster'),
        movie.get('imdbRating'),
    )
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''
            INSERT OR IGNORE INTO movies (
                filepath, title, year, rated, released, runtime, genre,
                director, writer, actors, plot, language, country, awards,
                poster, imdb_rating, created_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ''', values)
        conn.commit()


def fetch_movie_details(title, year=None):
    """Look up a movie on OMDb by title (and optional year).

    Returns the decoded response dict, with a placeholder poster URL when
    OMDb reports none.

    Raises:
        ValueError: when OMDb does not report a successful lookup.
        requests.RequestException: on network failure or timeout.
    """
    params = {'t': title, 'apikey': OMDB_API_KEY}
    if year:
        params['y'] = year
    response = requests.get(
        'http://www.omdbapi.com/', params=params, timeout=REQUEST_TIMEOUT_SECONDS
    )
    data = response.json()
    if data.get('Response') != 'True':
        raise ValueError(f"Movie '{title}' not found.")
    logging.info(data)
    if data.get('Poster') in (None, 'N/A'):
        data['Poster'] = 'https://placehold.co/500x750/jpg?text=No+Poster'
    return data


def tv_show_exists(show_name):
    """Return True when a show is stored under *show_name* or "the <show_name>".

    The comparison is case-insensitive and ignores surrounding whitespace.
    """
    normalized_name = show_name.strip().lower()
    alt_name = ("the " + normalized_name).strip()
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            'SELECT 1 FROM tv_shows WHERE lower(name) = ? OR lower(name) = ?',
            (normalized_name, alt_name),
        ).fetchone()
    return row is not None


def fetch_tv_show_details(show_name):
    """Look up a show via the TVMaze single-search endpoint.

    Raises:
        ValueError: when the API answers with a non-200 status.
        requests.RequestException: on network failure or timeout.
    """
    response = requests.get(
        "http://api.tvmaze.com/singlesearch/shows",
        params={'q': show_name},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise ValueError(f"TV show '{show_name}' not found via TV API.")
    return response.json()


def add_tv_show_to_db(details):
    """Insert a TVMaze-style record; an already-known show name is ignored."""
    # ``or`` guards: the API may send explicit nulls for these nested fields.
    genres = ", ".join(details.get('genres') or [])
    rating = (details.get('rating') or {}).get('average', 'N/A')
    summary = details.get('summary', '')
    image = (details.get('image') or {}).get('medium', '')
    with closing(sqlite3.connect(DB_PATH)) as conn:
        conn.execute('''
            INSERT OR IGNORE INTO tv_shows (
                name, rating, summary, genres, poster, created_at
            ) VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
        ''', (details.get('name'), rating, summary, genres, image))
        conn.commit()


def _scan_movies():
    """Add every not-yet-stored file found under MOVIES_DIR to the database."""
    _report("Scanning movies...")
    movie_files = media_scanner.scan_movies(MOVIES_DIR)
    _report(f"Found movie files: {movie_files}")
    for full_path in movie_files:
        # Preferred source of title/year is a parent folder "Title (YYYY)";
        # otherwise fall back to the bare file name with no year.
        parent = os.path.basename(os.path.dirname(full_path))
        match = _MOVIE_DIR_RE.match(parent)
        if match:
            title = match.group(1).strip()
            year = match.group(2)
        else:
            title = os.path.splitext(os.path.basename(full_path))[0]
            year = None
        rel_path = os.path.relpath(full_path, MOVIES_DIR)
        if movie_exists(rel_path):
            _report(f"Movie already exists: {title} ({year or 'n/a'})")
            continue
        try:
            details = fetch_movie_details(title, year)
            details['filepath'] = rel_path
            add_movie_to_db(details)
            _report(f"Added {title} ({year or 'n/a'}) to database.")
        except Exception as e:
            # Best effort: one failed lookup must not stop the whole scan.
            _report(f"Skipping {title}: {e}", logging.ERROR)


def _ensure_tv_show(show_name):
    """Fetch and store details for *show_name* unless it is already stored."""
    if tv_show_exists(show_name):
        _report(f"TV show already exists: {show_name}")
        return
    try:
        add_tv_show_to_db(fetch_tv_show_details(show_name))
        _report(f"Added TV show details: {show_name}")
    except Exception as e:
        # Best effort: episodes of this show are skipped later if no row
        # can be matched.
        _report(f"Skipping TV show details for {show_name}: {e}", logging.ERROR)


def _find_tv_show_id(conn, show_name):
    """Return the id of the stored show matching *show_name*, or None.

    Tries a case-insensitive exact match first, then the best fuzzy match
    whose score exceeds FUZZY_MATCH_THRESHOLD.
    """
    wanted = show_name.lower()
    row = conn.execute(
        'SELECT id, name FROM tv_shows WHERE lower(name) = ?', (wanted,)
    ).fetchone()
    if row:
        return row[0]

    best = None  # (score, id, name)
    for db_id, db_name in conn.execute('SELECT id, name FROM tv_shows'):
        if db_name is None:
            continue
        score = fuzz.ratio(db_name.lower(), wanted)
        if score > FUZZY_MATCH_THRESHOLD and (best is None or score > best[0]):
            best = (score, db_id, db_name)
    if best is None:
        return None
    similarity, tv_show_id, matched_name = best
    _report(f"Fuzzy matched '{show_name}' to '{matched_name}' with score {similarity}")
    return tv_show_id


def _scan_tv_shows():
    """Add every parsable episode file found under TV_SHOWS_DIR to the database."""
    processed_shows = set()  # Keep track of processed TV shows
    _report("Scanning TV shows...")
    tv_show_files = media_scanner.scan_tv_shows(TV_SHOWS_DIR)
    _report(f"Found TV show files: {tv_show_files}")
    for full_path in tv_show_files:
        # Expected folder structure: "Show Name/Season X/filename"
        season_path = os.path.dirname(full_path)
        show_name = os.path.basename(os.path.dirname(season_path))
        season_dir = os.path.basename(season_path)

        season_match = _SEASON_RE.search(season_dir)
        if not season_match:
            _report(
                f"Skipping {full_path}: Season number not found in directory '{season_dir}'",
                logging.WARNING,
            )
            continue
        season = int(season_match.group(1))

        # The episode number comes solely from the SxxEyy pattern in the filename.
        basename = os.path.basename(full_path)
        ep_match = _EPISODE_RE.search(basename)
        if not ep_match:
            _report(
                f"Skipping {full_path}: SxxEyy pattern not found in filename",
                logging.WARNING,
            )
            continue
        episode = int(ep_match.group(2))

        # Look up show details only once per scan.
        if show_name not in processed_shows:
            _ensure_tv_show(show_name)
            processed_shows.add(show_name)

        rel_path = os.path.relpath(full_path, TV_SHOWS_DIR)
        # Use the complete filename (without extension) as the episode title
        title = os.path.splitext(basename)[0]
        with closing(sqlite3.connect(DB_PATH)) as conn:
            tv_show_id = _find_tv_show_id(conn, show_name)
            if tv_show_id is None:
                _report(f"TV show id not found for {show_name}", logging.WARNING)
                continue
            cursor = conn.execute('''
                INSERT OR IGNORE INTO episodes
                    (filepath, tv_show_id, season, episode, title, created_at)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
            ''', (rel_path, tv_show_id, season, episode, title))
            conn.commit()
            inserted = cursor.rowcount > 0
        if inserted:
            _report(f"Added episode: {show_name} Season {season} Episode {episode}")
        else:
            # INSERT OR IGNORE skipped a filepath that is already stored.
            _report(
                f"Episode already exists: {show_name} Season {season} Episode {episode}"
            )


def scan_and_populate():
    """Initialise the database, then scan movies and TV shows into it.

    Any error is printed and logged rather than raised, so callers always
    regain control; a completion message is emitted in every case.
    """
    init_db()
    try:
        _scan_movies()
        _scan_tv_shows()
    except Exception as e:
        # Top-level boundary of the scan: report and carry on.
        _report(f"Error during scanning: {e}", logging.ERROR)
    finally:
        _report("Scan and populate completed.")


def range_streamer(file_path: str, range_header: str = None, chunk_size: int = 1024*1024):
    """Stream *file_path* as ``video/mp4``, honouring an optional Range header.

    Args:
        file_path: Path of the file to stream.
        range_header: Raw ``Range`` header value, or None for the whole file.
        chunk_size: Maximum number of bytes read and yielded at a time.

    Returns:
        A 200 StreamingResponse for the whole file when no header is given,
        otherwise a 206 response for the requested byte range. A header that
        cannot be parsed is answered with the whole file as a 206.

    Raises:
        HTTPException: 416 when the requested range lies outside the file.
    """
    file_size = os.path.getsize(file_path)

    if range_header is None:
        def iterfile():
            with open(file_path, 'rb') as f:
                while (chunk := f.read(chunk_size)):
                    yield chunk

        return StreamingResponse(
            iterfile(),
            media_type="video/mp4",
            headers={"Accept-Ranges": "bytes", "Content-Length": str(file_size)},
        )

    start = 0
    end = file_size - 1
    m = _RANGE_RE.search(range_header)
    if m:
        first, last = m.groups()
        if first:
            start = int(first)
            if last:
                # Never promise more bytes than the file holds.
                end = min(int(last), file_size - 1)
        elif last:
            # Suffix form "bytes=-N": the final N bytes of the file.
            start = max(file_size - int(last), 0)

    if start > end:
        # Also covers an empty file and a start at or beyond the file end.
        raise HTTPException(
            status_code=416,
            detail="Requested range not satisfiable",
            headers={"Content-Range": f"bytes */{file_size}"},
        )

    content_length = (end - start) + 1
    headers = {
        "Content-Range": f"bytes {start}-{end}/{file_size}",
        "Accept-Ranges": "bytes",
        "Content-Length": str(content_length),
    }

    def iter_range():
        with open(file_path, 'rb') as f:
            f.seek(start)
            remaining = content_length
            while remaining > 0:
                chunk = f.read(min(chunk_size, remaining))
                if not chunk:
                    break
                remaining -= len(chunk)
                yield chunk

    return StreamingResponse(
        iter_range(), status_code=206, headers=headers, media_type="video/mp4"
    )


def _stream_stored_file(query, item_id, base_dir, not_found_detail, request):
    """Resolve a stored relative path via *query* and stream the file."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(query, (item_id,)).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail=not_found_detail)
    file_path = os.path.join(base_dir, row[0])
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail='File not found')
    return range_streamer(file_path, request.headers.get('range'))


@app.get('/movies')
def list_movies(current_user: dict = Depends(get_current_user)):
    """Return every stored movie."""
    return _query_dicts('SELECT * FROM movies')


@app.get('/movies/{movie_id}')
def get_movie(movie_id: int, current_user: dict = Depends(get_current_user)):
    """Return one movie by id, or 404 when it does not exist."""
    rows = _query_dicts('SELECT * FROM movies WHERE id = ?', (movie_id,))
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail='Movie not found')


# NOTE(review): this endpoint does not depend on get_current_user, so it is
# reachable without a token. Confirm that this is intentional.
@app.get('/stream/{movie_id}')
def stream_movie(movie_id: int, request: Request):
    """Stream a movie file, honouring the request's Range header."""
    return _stream_stored_file(
        'SELECT filepath FROM movies WHERE id = ?',
        movie_id,
        MOVIES_DIR,
        'Movie not found',
        request,
    )


@app.get('/episodes')
def list_episodes(current_user: dict = Depends(get_current_user)):
    """Return every stored episode."""
    return _query_dicts('SELECT * FROM episodes')


@app.get('/shows')
def list_shows(current_user: dict = Depends(get_current_user)):
    """Return every stored TV show."""
    return _query_dicts('SELECT * FROM tv_shows')


@app.get('/shows/{show_id}')
def get_tv_show(show_id: int, current_user: dict = Depends(get_current_user)):
    """Return one TV show by id, or 404 when it does not exist."""
    rows = _query_dicts('SELECT * FROM tv_shows WHERE id = ?', (show_id,))
    if rows:
        return rows[0]
    raise HTTPException(status_code=404, detail="TV show not found")


@app.get('/shows/{show_id}/seasons')
def list_seasons(show_id: int, current_user: dict = Depends(get_current_user)):
    """Return the sorted season numbers that have episodes for a show."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(
            'SELECT DISTINCT season FROM episodes WHERE tv_show_id = ?', (show_id,)
        ).fetchall()
    seasons = sorted(row[0] for row in rows)
    if not seasons:
        raise HTTPException(status_code=404, detail='Seasons not found for this TV show')
    return seasons


@app.get('/shows/{show_id}/seasons/{season}/episodes')
def list_episodes_for_season(show_id: int, season: int, current_user: dict = Depends(get_current_user)):
    """Return the episodes of one season, ordered by episode number."""
    episodes = _query_dicts('''
        SELECT id, filepath, episode, title
        FROM episodes
        WHERE tv_show_id = ? AND season = ?
        ORDER BY episode
    ''', (show_id, season))
    if not episodes:
        raise HTTPException(status_code=404, detail='No episodes found for this season')
    return episodes


# NOTE(review): this endpoint does not depend on get_current_user, so it is
# reachable without a token. Confirm that this is intentional.
@app.get('/stream_episode/{episode_id}')
def stream_episode(episode_id: int, request: Request):
    """Stream an episode file, honouring the request's Range header."""
    return _stream_stored_file(
        'SELECT filepath FROM episodes WHERE id = ?',
        episode_id,
        TV_SHOWS_DIR,
        'Episode not found',
        request,
    )


@app.post('/register')
def register_account(user: dict):
    """Create an account from a ``{"username", "password"}`` payload.

    Raises:
        HTTPException: 400 when a field is missing or the username is taken.
    """
    username = user.get("username")
    password = user.get("password")
    if not username or not password:
        raise HTTPException(status_code=400, detail="Username and password required")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        taken = conn.execute(
            "SELECT id FROM accounts WHERE username = ?", (username,)
        ).fetchone()
        if taken:
            raise HTTPException(status_code=400, detail="Username already exists")
        hashed_password = pwd_context.hash(password)
        try:
            conn.execute(
                "INSERT INTO accounts (username, hashed_password) VALUES (?, ?)",
                (username, hashed_password),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # A concurrent registration won the race between SELECT and INSERT.
            raise HTTPException(status_code=400, detail="Username already exists")
    return {"message": "Account registered successfully"}


@app.post('/login')
def login_account(user: dict):
    """Check credentials and return a bearer token.

    Raises:
        HTTPException: 400 when a field is missing, 401 on bad credentials.
    """
    username = user.get("username")
    password = user.get("password")
    if not username or not password:
        raise HTTPException(status_code=400, detail="Username and password required")
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute(
            "SELECT hashed_password FROM accounts WHERE username = ?", (username,)
        ).fetchone()
    if not row or not pwd_context.verify(password, row[0]):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    token = create_access_token({"sub": username})
    return {"access_token": token, "token_type": "bearer"}


@app.post('/scan')
def scan_new_files(current_user: dict = Depends(get_current_user)):
    """Run a library scan and report whether it raised."""
    try:
        scan_and_populate()
        return {"success": True}
    except Exception as e:
        return {"success": False, "error": str(e)}


@app.get("/sessions/{session_id}")
async def get_session_details(session_id: str):
    """Return the media of the first connection in a sync session, or 404."""
    members = sync_sessions.get(session_id)
    if not members:
        raise HTTPException(status_code=404, detail="Session not found")
    _, media_id, media_type = members[0]
    return {"session_id": session_id, "media_id": media_id, "media_type": media_type}


# Endpoint to save watch progress (record timestamp when user stops watching)
@app.post('/save_progress/{media_type}/{media_id}')
def save_progress(media_type: str, media_id: int, progress: dict, current_user: dict = Depends(get_current_user)):
    """Store the caller's playback position for a movie or an episode.

    Episodes that belong to a known show are stored as one row per
    (user, show), so the row always points at the most recently watched
    episode. Everything else is stored per (user, media_type, media_id).

    Raises:
        HTTPException: 400 when the payload has no ``last_position``.
    """
    last_position = progress.get("last_position")
    if last_position is None:
        raise HTTPException(status_code=400, detail="Missing last_position in payload")

    username = current_user.get("sub")
    is_episode = media_type.lower() == "episode"
    tv_show_id = None

    with closing(sqlite3.connect(DB_PATH)) as conn:
        if is_episode:
            # Lookup the associated tv_show_id for the episode.
            row = conn.execute(
                "SELECT tv_show_id FROM episodes WHERE id = ?", (media_id,)
            ).fetchone()
            if row:
                tv_show_id = row[0]
                print(f"Found tv_show_id {tv_show_id} for episode {media_id}")
            else:
                print(f"No tv show found for episode {media_id}")

        values = (username, media_type, media_id, last_position, tv_show_id)
        if is_episode and tv_show_id is not None:
            # Relies on the unique index on (username, tv_show_id) that
            # init_db() creates.
            conn.execute('''
                INSERT INTO watch_progress
                    (username, media_type, media_id, last_position, tv_show_id, updated_at)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(username, tv_show_id) DO UPDATE SET
                    media_id = excluded.media_id,
                    last_position = excluded.last_position,
                    updated_at = CURRENT_TIMESTAMP
            ''', values)
        else:
            # For movies (or episodes without a tv_show_id), fall back to
            # unique(username, media_type, media_id)
            conn.execute('''
                INSERT INTO watch_progress
                    (username, media_type, media_id, last_position, tv_show_id, updated_at)
                VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(username, media_type, media_id) DO UPDATE SET
                    last_position = excluded.last_position,
                    tv_show_id = excluded.tv_show_id,
                    updated_at = CURRENT_TIMESTAMP
            ''', values)
        conn.commit()

    print(f"Progress saved for {media_type} ID {media_id} at position {last_position}")
    response = {
        "message": "Progress saved",
        "media_type": media_type,
        "media_id": media_id,
        "last_position": last_position,
    }
    if tv_show_id:
        response["tv_show_id"] = tv_show_id
    return response


# Endpoint to get saved watch progress so the user can resume the video
@app.get('/get_progress/{media_type}/{media_id}')
def get_progress(media_type: str, media_id: int, current_user: dict = Depends(get_current_user)):
    """Return the caller's saved position for a media item (0 when none)."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        row = conn.execute('''
            SELECT last_position FROM watch_progress
            WHERE username = ? AND media_type = ? AND media_id = ?
        ''', (current_user.get("sub"), media_type, media_id)).fetchone()
    last_position = row[0] if row else 0
    return {"media_type": media_type, "media_id": media_id, "last_position": last_position}


@app.get('/in_progress')
def list_in_progress(current_user: dict = Depends(get_current_user)):
    """Return the caller's saved positions, grouped into movies and episodes."""
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute('''
            SELECT media_type, media_id, last_position
            FROM watch_progress
            WHERE username = ?
        ''', (current_user.get("sub"),)).fetchall()

    movies = []
    episodes = []
    for media_type, media_id, last_position in rows:
        item = {"media_id": media_id, "last_position": last_position}
        kind = media_type.lower()
        if kind == "movie":
            movies.append(item)
        elif kind == "episode":
            episodes.append(item)
    result = {"movies": movies, "episodes": episodes}
    print(result)
    return result


@app.get('/episodes/{episode_id}/show')
def get_show_for_episode(episode_id: int, current_user: dict = Depends(get_current_user)):
    """Return the TV show an episode belongs to.

    Raises:
        HTTPException: 404 when the episode or its show does not exist.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # Retrieve the tv_show_id for the given episode
        result = conn.execute(
            'SELECT tv_show_id FROM episodes WHERE id = ?', (episode_id,)
        ).fetchone()
        if not result:
            raise HTTPException(status_code=404, detail="Episode not found")
        # Retrieve the TV show details using the tv_show_id
        cursor = conn.execute('SELECT * FROM tv_shows WHERE id = ?', (result[0],))
        row = cursor.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="TV show not found")
        cols = [desc[0] for desc in cursor.description]
        return dict(zip(cols, row))


if __name__ == '__main__':
    scan_and_populate()
    import uvicorn

    uvicorn.run(app, host='0.0.0.0', port=8000)