"""Helpers that keep the SQLite video catalogue, Typesense and yt-dlp in sync."""

import contextlib
import datetime
import subprocess
import json
import sqlite3

import typesense
from sentence_transformers import SentenceTransformer
from dotenv import dotenv_values

config = dotenv_values(".env")

ts_client = typesense.Client({
    'api_key': config["TYPESENSE_API_KEY"],
    'nodes': [{
        'host': 'typesense',
        'port': '8108',
        'protocol': 'http'
    }],
    'connection_timeout_seconds': 2
})

embedding_model = SentenceTransformer(
    "sentence-transformers/all-MiniLM-L12-v2", cache_folder="/data"
)

_DATABASE_PATH = "/data/database.sqlite3"
_YT_DLP = "/app/bin/yt-dlp"

# Passed to yt-dlp as a single argv entry. subprocess is invoked without a
# shell, so the value must NOT be wrapped in quote characters: they would be
# handed to yt-dlp verbatim and become part of the first/last language code.
_SUBTITLE_LANGS = "en-US,en,en-us,en-gb,en-GB"

# Large keys stripped from the yt-dlp JSON before it is stored.
_BULKY_INFO_KEYS = (
    "formats",
    "requested_formats",
    "thumbnails",
    "automatic_captions",
)


def _open_db():
    """Return a context manager yielding a connection that is closed on exit.

    ``sqlite3.Connection`` used directly in a ``with`` statement only manages
    the transaction, it does not close the connection, hence ``closing``.
    """
    return contextlib.closing(sqlite3.connect(_DATABASE_PATH))


def _sync_status_to_typesense(video_id, status):
    """Set the ``status`` field of one document in the Typesense index."""
    doc = {
        "id": video_id,
        "status": status
    }
    ts_client.collections["discovered_videos"].documents[video_id].update(doc)


def _set_video_status(video_id, from_status, to_status):
    """Move one video from ``from_status`` to ``to_status`` in DB and index."""
    with _open_db() as conn:
        conn.execute(
            """UPDATE OR FAIL discovered_videos
            SET status = ?
            WHERE status = ? AND id = ?""",
            (to_status, from_status, video_id),
        )
        conn.commit()
    _sync_status_to_typesense(video_id, to_status)


def _transition_discovered(video_ids, new_status, on_conflict):
    """Move every DISCOVERED video in ``video_ids`` to ``new_status``.

    Only rows the UPDATE actually changed are mirrored to Typesense, so videos
    that were in another state keep a search-index status matching the
    database.

    Returns the new status string.
    Raises IndexError when none of the ids referred to a DISCOVERED video.
    """
    video_ids = list(video_ids)
    placeholders = ",".join("?" * len(video_ids))
    with _open_db() as conn:
        # on_conflict is one of the fixed keywords supplied by the callers in
        # this module, never user input; the ids themselves are bound.
        rows = conn.execute(
            f"""UPDATE OR {on_conflict} discovered_videos
            SET status = ?
            WHERE status = 'DISCOVERED'
            AND id IN ({placeholders})
            RETURNING id, status""",
            [new_status, *video_ids],
        ).fetchall()
        conn.commit()

    if not rows:
        raise IndexError("no DISCOVERED video matched the given ids")

    for changed_id, _status in rows:
        _sync_status_to_typesense(changed_id, new_status)
    return rows[0][1]


def fetch_or_create_search_key():
    """Return the stored search-only Typesense API key, creating it if needed.

    The key is read from the ``typesense_keys`` table; when the table is
    empty a new key limited to ``documents:search`` is created through the
    Typesense client and persisted.
    """
    with _open_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT key FROM typesense_keys LIMIT 1")
        rows = cursor.fetchall()
        if rows:
            return rows[0][0]

        created = ts_client.keys.create({
            "description": "Search-only key.",
            "actions": ["documents:search"],
            "collections": ["*"],
        })
        key = created["value"]
        cursor.execute("INSERT INTO typesense_keys (key) VALUES (?)", (key,))
        conn.commit()
        return key


def get_video_json(video_id):
    """Return the stored info dict of ``video_id`` with its ``status`` added.

    Raises IndexError when no row with that id exists.
    """
    with _open_db() as conn:
        rows = conn.execute(
            """SELECT info, status
            FROM discovered_videos
            WHERE id = ?
            LIMIT 1""",
            (video_id,),
        ).fetchall()

    if not rows:
        raise IndexError(f"no discovered video with id {video_id!r}")

    info, status = rows[0]
    data = json.loads(info)
    data["status"] = status
    return data


def hide_videos(video_ids):
    """Mark the DISCOVERED videos among ``video_ids`` as HIDDEN.

    Returns the new status (``'HIDDEN'``).
    Raises IndexError when none of the ids was in the DISCOVERED state.
    """
    return _transition_discovered(video_ids, "HIDDEN", "IGNORE")


def request_videos(video_ids):
    """Mark the DISCOVERED videos among ``video_ids`` as REQUESTED.

    Returns the new status (``'REQUESTED'``).
    Raises IndexError when none of the ids was in the DISCOVERED state.
    """
    return _transition_discovered(video_ids, "REQUESTED", "ABORT")


def download_video(video_id):
    """Download a REQUESTED video with yt-dlp, tracking progress in its status.

    The video is moved REQUESTED -> DOWNLOADING before yt-dlp starts and
    DOWNLOADING -> DOWNLOADED once it exits with code 0. On failure the status
    is put back to REQUESTED.

    Raises IndexError when the video is not in the REQUESTED state, and
    Exception when yt-dlp exits with a non-zero code.
    """
    with _open_db() as conn:
        rows = conn.execute(
            """UPDATE OR FAIL discovered_videos
            set status = 'DOWNLOADING'
            where status = 'REQUESTED' and id = ?
            returning json_extract(info, '$.original_url')""",
            (video_id,),
        ).fetchall()
        conn.commit()

    if not rows:
        raise IndexError(f"video {video_id!r} is not in the REQUESTED state")
    original_url = rows[0][0]

    _sync_status_to_typesense(video_id, "DOWNLOADING")

    command = [
        _YT_DLP,
        "-S", "res:1080",
        "-P", "/data",
        "-o", "%(extractor)s-store/%(channel)s - %(channel_id)s/%(id)s/%(title)s.%(ext)s",
        "--write-subs",
        "--write-auto-subs",
        "--sub-langs", _SUBTITLE_LANGS,
        "--write-thumbnail",
        "--continue",
        "--embed-chapters",
        "--embed-subs",
        original_url
    ]

    try:
        returncode = subprocess.run(command).returncode
    except OSError:
        # yt-dlp could not be launched at all; release the claim so the video
        # does not stay stuck in DOWNLOADING.
        _set_video_status(video_id, "DOWNLOADING", "REQUESTED")
        raise

    if returncode != 0:
        _set_video_status(video_id, "DOWNLOADING", "REQUESTED")
        raise Exception(f"Download failed for URL: {original_url}")

    _set_video_status(video_id, "DOWNLOADING", "DOWNLOADED")


def _store_scraped_video(video):
    """Persist one scraped video and return ``(status, playlist_ids)``.

    ``status`` is the value held by the database after the upsert: DISCOVERED
    for a new row, the existing status for a video that was already known.
    """
    video_id = video["id"]
    info_json = json.dumps(video)

    with _open_db() as conn:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO discovered_videos (id, info, status) VALUES (?, ?, 'DISCOVERED') "
            "ON CONFLICT(id) DO UPDATE SET info = excluded.info "
            "RETURNING status",
            (video_id, info_json),
        )
        status = cursor.fetchall()[0][0]

        channel_id = video["channel_id"]
        playlist_id = video.get("playlist_id", None)
        if playlist_id and channel_id != playlist_id:
            # NOTE(review): yt-dlp's documented fields appear to be
            # "playlist_title"/"playlist" rather than "playlist_name"; the
            # original key is still tried first - confirm which one is right.
            playlist_name = (
                video.get("playlist_name")
                or video.get("playlist_title")
                or video.get("playlist")
            )
            cursor.execute(
                "INSERT INTO videos_in_playlists (video_id, playlist_id, name) VALUES (?, ?, ?) "
                "ON CONFLICT(video_id, playlist_id) DO NOTHING",
                (video_id, playlist_id, playlist_name),
            )
        conn.commit()

        cursor.execute(
            "SELECT DISTINCT playlist_id FROM videos_in_playlists WHERE video_id = ?",
            (video_id,),
        )
        playlist_ids = [row[0] for row in cursor.fetchall()]

    return status, playlist_ids


def scrape_url(url):
    """Scrape ``url`` with yt-dlp and index every video it reports.

    Each JSON line printed by ``yt-dlp --dump-json`` is stored in the
    ``discovered_videos`` table (bulky keys removed), linked to its playlist
    when it has one, and upserted into the Typesense collection together with
    an embedding of its title.

    Raises subprocess.CalledProcessError when yt-dlp exits with an error.
    """
    command = [
        _YT_DLP,
        "--dump-json",
        "--write-subs",
        "--sponsorblock-mark", "all",
        "--sub-langs", _SUBTITLE_LANGS,
        url
    ]
    output = subprocess.check_output(command).decode("utf-8")

    for line in output.splitlines():
        if not line.strip():
            continue
        video = json.loads(line)

        # Not every extractor emits all of these keys, so a missing one must
        # not abort the scrape.
        for key in _BULKY_INFO_KEYS:
            video.pop(key, None)

        status, playlist_ids = _store_scraped_video(video)

        embeddings = embedding_model.encode(video["fulltitle"])
        document = {
            "id": video["id"],
            "fulltitle": video["fulltitle"],
            "title_vec": embeddings.tolist(),
            "description": video["description"],
            "channel": video["channel"],
            "channel_follower_count": video["channel_follower_count"],
            "channel_id": video["channel_id"],
            "duration": video["duration"],
            "view_count": video["view_count"],
            "upload_date": int(video["upload_date"]),
            "filesize_approx": video["filesize_approx"],
            "extractor": video["extractor"],
            "thumbnail": video["thumbnail"],
            # Use the status the database holds so re-scraping a video that
            # was already requested/downloaded does not reset it in the index.
            "status": status,
            # NOTE(review): the upsert still overwrites requested_by with ""
            # for videos that already exist - confirm whether that is intended.
            "requested_by": "",
            "playlist_ids": playlist_ids,
        }
        ts_client.collections["discovered_videos"].documents.upsert(document)