- import datetime
- import subprocess
- import json
- import sqlite3
- import typesense
- from sentence_transformers import SentenceTransformer
- from dotenv import dotenv_values
# Settings are loaded from the ".env" file in the working directory.
config = dotenv_values(".env")

# Shared Typesense client: a single node named "typesense", reached over
# plain HTTP on port 8108, with a 2 second connection timeout.
ts_client = typesense.Client(
    {
        "api_key": config["TYPESENSE_API_KEY"],
        "nodes": [
            {"host": "typesense", "port": "8108", "protocol": "http"},
        ],
        "connection_timeout_seconds": 2,
    }
)

# Sentence embedding model used to vectorise video titles; "/data" is passed
# as the model cache folder.
embedding_model = SentenceTransformer(
    "sentence-transformers/all-MiniLM-L12-v2",
    cache_folder="/data",
)
def fetch_or_create_search_key():
    """Return the search-only Typesense API key, creating it on first use.

    The key is looked up in the ``typesense_keys`` table of the SQLite
    database at ``/data/database.sqlite3``. When the table has no row, a new
    key restricted to the ``documents:search`` action on all collections is
    requested from Typesense, stored in the table, and returned.

    Returns:
        The key value read from, or just inserted into, ``typesense_keys``.
    """
    conn = sqlite3.connect("/data/database.sqlite3")
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT key FROM typesense_keys LIMIT 1")
        row = cursor.fetchone()
        if row is not None:
            return row[0]

        created = ts_client.keys.create(
            {
                "description": "Search-only key.",
                "actions": ["documents:search"],
                "collections": ["*"],
            }
        )
        key = created["value"]
        cursor.execute("INSERT INTO typesense_keys (key) VALUES (?)", (key,))
        conn.commit()
        return key
    finally:
        # Always release the connection, including when the query or the
        # Typesense request raises.
        conn.close()
def get_video_json(video_id):
    """Load the stored metadata of one discovered video.

    Args:
        video_id: Value matched against ``discovered_videos.id``.

    Returns:
        The JSON-decoded ``info`` column, with the row's ``status`` column
        added under the ``"status"`` key.

    Raises:
        IndexError: If no row with that id exists. The exception type matches
            what the previous ``fetchall()[0]`` lookup raised, so existing
            callers keep working.
    """
    conn = sqlite3.connect("/data/database.sqlite3")
    try:
        cursor = conn.cursor()
        cursor.execute(
            """SELECT info, status FROM discovered_videos
            WHERE id = ? LIMIT 1""",
            (video_id,),
        )
        row = cursor.fetchone()
    finally:
        # Close the connection even when the query fails.
        conn.close()

    if row is None:
        raise IndexError(f"No discovered video with id {video_id!r}")

    info, status = row
    data = json.loads(info)
    data["status"] = status
    return data
def hide_videos(video_ids):
    """Mark the given videos as ``HIDDEN`` in SQLite and in Typesense.

    Args:
        video_ids: Iterable of ``discovered_videos.id`` values.

    Returns:
        The status read back from the first updated row (``'HIDDEN'``), or
        ``None`` when ``video_ids`` is empty or matches no row.
    """
    ids = list(video_ids)
    if not ids:
        # Nothing to update; avoid issuing an "IN ()" statement.
        return None

    placeholders = ",".join(["?"] * len(ids))
    conn = sqlite3.connect("/data/database.sqlite3")
    try:
        cursor = conn.cursor()
        # The filter must be a WHERE clause. Written as
        # "SET status = 'HIDDEN' AND id IN (...)" the whole boolean expression
        # is assigned to status, for every row in the table.
        cursor.execute(
            f"""UPDATE OR IGNORE discovered_videos
            SET status = 'HIDDEN'
            WHERE id IN ({placeholders})
            RETURNING id, status""",
            ids,
        )
        updated = cursor.fetchall()
        conn.commit()
    finally:
        conn.close()

    # Mirror the change only for rows that were actually updated, so the
    # search index follows the database.
    for video_id, status in updated:
        ts_client.collections["discovered_videos"].documents[video_id].update(
            {"id": video_id, "status": status}
        )

    return updated[0][1] if updated else None
def request_videos(video_ids):
    """Mark the given videos as ``REQUESTED`` in SQLite and in Typesense.

    Args:
        video_ids: Iterable of ``discovered_videos.id`` values.

    Returns:
        The status read back from the first updated row (``'REQUESTED'``), or
        ``None`` when ``video_ids`` is empty or matches no row.
    """
    ids = list(video_ids)
    if not ids:
        # Nothing to update; avoid issuing an "IN ()" statement.
        return None

    placeholders = ",".join(["?"] * len(ids))
    conn = sqlite3.connect("/data/database.sqlite3")
    try:
        cursor = conn.cursor()
        # The filter must be a WHERE clause. Written as
        # "SET status = 'REQUESTED' AND id IN (...)" the whole boolean
        # expression is assigned to status, for every row in the table.
        cursor.execute(
            f"""UPDATE OR ABORT discovered_videos
            SET status = 'REQUESTED'
            WHERE id IN ({placeholders})
            RETURNING id, status""",
            ids,
        )
        updated = cursor.fetchall()
        conn.commit()
    finally:
        conn.close()

    # Mirror the change only for rows that were actually updated, so the
    # search index follows the database.
    for video_id, status in updated:
        ts_client.collections["discovered_videos"].documents[video_id].update(
            {"id": video_id, "status": status}
        )

    return updated[0][1] if updated else None
def _mark_video_status(conn, video_id, status):
    """Set one video's status in SQLite, commit, and mirror it to Typesense."""
    conn.execute(
        "UPDATE OR FAIL discovered_videos SET status = ? WHERE id = ?",
        (status, video_id),
    )
    conn.commit()
    ts_client.collections["discovered_videos"].documents[video_id].update(
        {"id": video_id, "status": status}
    )


def download_video(video_id):
    """Download a requested video with yt-dlp and track its status.

    The row is moved from ``REQUESTED`` to ``DOWNLOADING``, yt-dlp is run on
    the ``original_url`` stored in the row's ``info`` JSON, and the status is
    then set to ``DOWNLOADED`` on success or back to ``REQUESTED`` on failure.
    Each status change is also written to the Typesense document.

    Args:
        video_id: Value matched against ``discovered_videos.id``.

    Raises:
        IndexError: If no row with this id is in the ``REQUESTED`` state. The
            exception type matches what the previous ``fetchall()[0]`` lookup
            raised.
        Exception: If yt-dlp exits with a non-zero return code.
        OSError: If yt-dlp cannot be started; the status is reset first.
    """
    conn = sqlite3.connect("/data/database.sqlite3")
    try:
        cursor = conn.cursor()
        # Only a row that is currently REQUESTED may move to DOWNLOADING.
        cursor.execute(
            """UPDATE OR FAIL discovered_videos
            SET status = 'DOWNLOADING'
            WHERE status = 'REQUESTED'
            AND id = ?
            RETURNING json_extract(info, '$.original_url')""",
            (video_id,),
        )
        claimed = cursor.fetchall()
        if not claimed:
            raise IndexError(f"Video {video_id!r} is not in REQUESTED state")
        original_url = claimed[0][0]
        conn.commit()

        ts_client.collections["discovered_videos"].documents[video_id].update(
            {"id": video_id, "status": "DOWNLOADING"}
        )

        command = [
            "/app/bin/yt-dlp",
            "-S",
            "res:1080",
            "-P",
            "/data",
            "-o",
            "%(extractor)s-store/%(channel)s - %(channel_id)s/%(id)s/%(title)s.%(ext)s",
            "--write-subs",
            "--write-auto-subs",
            "--sub-langs",
            # No shell is involved, so the value must not carry literal
            # quote characters; they would become part of the first and last
            # language codes.
            "en-US,en,en-us,en-gb,en-GB",
            "--write-thumbnail",
            "--continue",
            "--embed-chapters",
            "--embed-subs",
            original_url,
        ]

        try:
            returncode = subprocess.run(command).returncode
        except OSError:
            # yt-dlp could not be launched; do not leave the row DOWNLOADING.
            _mark_video_status(conn, video_id, "REQUESTED")
            raise

        if returncode != 0:
            # Put the video back in the queue so it can be retried.
            _mark_video_status(conn, video_id, "REQUESTED")
            raise Exception(f"Download failed for URL: {original_url}")

        _mark_video_status(conn, video_id, "DOWNLOADED")
    finally:
        conn.close()
def scrape_url(url):
    """Scrape metadata for ``url`` with yt-dlp and index every video found.

    yt-dlp is run with ``--dump-json``; each non-blank output line is parsed
    as one video's JSON. For every video the function:

    * drops the ``formats``, ``requested_formats``, ``thumbnails`` and
      ``automatic_captions`` entries before storing the JSON,
    * inserts the row into ``discovered_videos`` with status ``DISCOVERED``,
      or refreshes only ``info`` when the id already exists,
    * records playlist membership in ``videos_in_playlists`` when the video
      has a ``playlist_id`` that differs from its ``channel_id``,
    * upserts a Typesense document that includes an embedding of the title.

    Args:
        url: URL handed to yt-dlp.

    Raises:
        subprocess.CalledProcessError: If yt-dlp exits with a non-zero code.
        KeyError: If a video's JSON lacks a field used for the row or the
            search document.
    """
    command = [
        "/app/bin/yt-dlp",
        "--dump-json",
        "--write-subs",
        "--sponsorblock-mark",
        "all",
        "--sub-langs",
        # No shell is involved, so the value must not carry literal quote
        # characters; they would become part of the language codes.
        "en-US,en,en-us,en-gb,en-GB",
        url,
    ]
    output = subprocess.check_output(command).decode("utf-8")

    bulky_keys = ("formats", "requested_formats", "thumbnails", "automatic_captions")

    conn = sqlite3.connect("/data/database.sqlite3")
    try:
        cursor = conn.cursor()
        for line in output.splitlines():
            if not line.strip():
                continue

            video = json.loads(line)
            # pop() instead of del: not every entry carries all of these keys.
            for bulky_key in bulky_keys:
                video.pop(bulky_key, None)

            video_id = video["id"]
            cursor.execute(
                "INSERT INTO discovered_videos (id, info, status) VALUES (?, ?, 'DISCOVERED') ON CONFLICT(id) DO UPDATE SET info = excluded.info",
                (video_id, json.dumps(video)),
            )

            channel_id = video["channel_id"]
            playlist_id = video.get("playlist_id")
            if playlist_id and channel_id != playlist_id:
                # NOTE(review): "playlist_name" may not be a key yt-dlp emits;
                # fall back to "playlist_title" / "playlist" - confirm.
                playlist_name = (
                    video.get("playlist_name")
                    or video.get("playlist_title")
                    or video.get("playlist")
                )
                cursor.execute(
                    "INSERT INTO videos_in_playlists (video_id, playlist_id, name) VALUES (?, ?, ?) ON CONFLICT(video_id, playlist_id) DO NOTHING",
                    (video_id, playlist_id, playlist_name),
                )
            # Commit per video so earlier videos stay stored if a later one fails.
            conn.commit()

            # An existing row keeps its status on conflict, so read it back
            # rather than assuming DISCOVERED for the search document.
            cursor.execute(
                "SELECT status FROM discovered_videos WHERE id = ?", (video_id,)
            )
            status = cursor.fetchone()[0]

            cursor.execute(
                "SELECT DISTINCT playlist_id FROM videos_in_playlists WHERE video_id = ?",
                (video_id,),
            )
            playlist_ids = [row[0] for row in cursor.fetchall()]

            embeddings = embedding_model.encode(video["fulltitle"])
            document = {
                "id": video["id"],
                "fulltitle": video["fulltitle"],
                "title_vec": embeddings.tolist(),
                "description": video["description"],
                "channel": video["channel"],
                "channel_follower_count": video["channel_follower_count"],
                "channel_id": video["channel_id"],
                "duration": video["duration"],
                "view_count": video["view_count"],
                "upload_date": int(video["upload_date"]),
                "filesize_approx": video["filesize_approx"],
                "extractor": video["extractor"],
                "thumbnail": video["thumbnail"],
                "status": status,
                "requested_by": "",
                "playlist_ids": playlist_ids,
            }
            ts_client.collections["discovered_videos"].documents.upsert(document)
    finally:
        # One connection serves the whole run and is always released.
        conn.close()