#!/usr/bin/env python3 import cgi import html.parser import http.cookiejar import json import os import re import shutil import subprocess import sys import time import urllib.error import urllib.parse import urllib.request MOZILLA_RELEASE_URL = "https://www.mozilla.org/en-US/firefox/releases/" USER_AGENT_TEMPLATE = "Mozilla/5.0 (X11; Linux x86_64; rv:83.0) Gecko/20100101 Firefox/%s" MIMETYPES = { "video/mp4": "mp4", "video/x-flv": "flv", "video/3gpp": "3gp", } QUALITIES = { "hd1080": 5, "hd720": 4, "large": 3, "medium": 2, "small": 1, } class VideoUnavailable(Exception): pass class NotYouTube(Exception): pass def print_form(url="", msg=""): script_url = "https://%s%s" % (os.environ["HTTP_HOST"], os.environ["REQUEST_URI"]) sys.stdout.write("Content-Type: text/html\r\n\r\n") sys.stdout.write(""" delx.net.au - YouTube Scraper

delx.net.au - YouTube Scraper

{0}

This page will let you easily download YouTube videos to watch offline. It will automatically grab the highest quality version.

Tip! Use this bookmarklet: YouTube Download to easily download videos. Right-click the link and add it to bookmarks, then when you're looking at a YouTube page select that bookmark from your browser's bookmarks menu to download the video straight away.

""".replace("{0}", msg).replace("{1}", url).replace("{2}", script_url)) cookiejar = http.cookiejar.CookieJar() urlopener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookiejar)) referrer = "" user_agent = None def urlopen(url, offset=None): global user_agent if not user_agent: page = MozillaReleasesPageParser() with urllib.request.urlopen(MOZILLA_RELEASE_URL) as f: page.feed(f.read().decode("utf-8")) page.close() user_agent = USER_AGENT_TEMPLATE % page.latest_release if url.startswith("//"): url = "https:" + url if not url.startswith("http://") and not url.startswith("https://"): url = "https://www.youtube.com" + url global referrer req = urllib.request.Request(url) if not referrer: referrer = url else: req.add_header("Referer", referrer) req.add_header("User-Agent", user_agent) if offset: req.add_header("Range", "bytes=%d-" % offset) res = urlopener.open(req) content_range = res.getheader("Content-Range") if content_range: tokens = content_range.split() assert tokens[0] == "bytes" start = int(tokens[1].split("-")[0]) assert start == offset return res def validate_url(url): parsed_url = urllib.parse.urlparse(url) scheme_ok = parsed_url.scheme == "https" host = parsed_url.netloc.lstrip("www.").lstrip("m.") host_ok = host in ["youtube.com", "youtu.be"] if scheme_ok and host_ok: return else: raise NotYouTube() def load_parse_url(url, parser): f = urlopen(url) parser.feed(f.read().decode("utf-8")) parser.close() f.close() def append_to_qs(url, params): r = list(urllib.parse.urlsplit(url)) qs = urllib.parse.parse_qs(r[3]) qs.update(params) r[3] = urllib.parse.urlencode(qs, True) url = urllib.parse.urlunsplit(r) return url def get_player_config(scripts): config_strings = [ ("ytcfg.set({\"", 2, "});", 1), ("ytInitialPlayerResponse = {\"", 2, "};", 1), ] player_config = {} for script in scripts: for line in script.split("\n"): for s1, off1, s2, off2 in config_strings: if s1 in line: p1 = line.find(s1) + len(s1) - off1 p2 = line.find(s2, p1) + off2 if p1 >= 0 and p2 > 0: player_config.update(json.loads(line[p1:p2])) return player_config def extract_js(script): PREFIX = "var _yt_player={};(function(g){var window=this;" SUFFIX = ";})(_yt_player);\n" assert script.startswith(PREFIX) assert script.endswith(SUFFIX) return script[len(PREFIX):-len(SUFFIX)] def find_cipher_func(script): FUNC_NAME = R"([a-zA-Z0-9$]+)" DECODE_URI_COMPONENT = R"(\(decodeURIComponent)?" FUNC_PARAMS = R"(\([a-zA-Z,\.]+\.s\))" TERMINATOR = R"[,;\)]" PATTERN = FUNC_NAME + DECODE_URI_COMPONENT + FUNC_PARAMS + TERMINATOR match = re.search(PATTERN, script) func_name = match.groups()[0] return func_name def construct_url_from_cipher_result(cipher_result): for k, v in cipher_result.items(): if isinstance(v, str) and v.startswith("https://"): temp_url = v break else: raise Exception("Could not find URL-like string in cipher result!") for k, v in cipher_result.items(): if isinstance(v, dict): params = {} for k2, v2 in v.items(): params[k2] = urllib.parse.unquote(v2) return append_to_qs(temp_url, params) else: raise Exception("Could not find params-like structure in cipher result!") def decode_cipher_url(js_url, cipher): cipher = urllib.parse.parse_qs(cipher) args = [ cipher["url"][0], cipher["sp"][0], cipher["s"][0], ] f = urlopen(js_url) script = f.read().decode("utf-8") f.close() cipher_func_name = find_cipher_func(script) params = { "cipher_func_name": cipher_func_name, "args": json.dumps(args), "code": json.dumps(extract_js(script)), } p = subprocess.Popen( "node", shell=True, close_fds=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE ) js_decode_script = (""" const vm = require('vm'); const fakeGlobal = {}; fakeGlobal.window = fakeGlobal; fakeGlobal.location = { hash: '', host: 'www.youtube.com', hostname: 'www.youtube.com', href: 'https://www.youtube.com', origin: 'https://www.youtube.com', pathname: '/', protocol: 'https:' }; fakeGlobal.history = { pushState: function(){} }; fakeGlobal.document = { location: fakeGlobal.location }; fakeGlobal.document = {}; fakeGlobal.navigator = { userAgent: '' }; fakeGlobal.XMLHttpRequest = class XMLHttpRequest {}; fakeGlobal.matchMedia = () => ({matches: () => {}, media: ''}); fakeGlobal.result = null; fakeGlobal.g = function(){}; // this is _yt_player fakeGlobal.TimeRanges = function(){}; const code_string = %(code)s + ';'; const exec_string = 'result = %(cipher_func_name)s(...%(args)s);'; vm.runInNewContext(code_string + exec_string, fakeGlobal); console.log(JSON.stringify(fakeGlobal.result)); """ % params) p.stdin.write(js_decode_script.encode("utf-8")) p.stdin.close() result = json.load(p.stdout) if p.wait() != 0: raise Exception("js failed to execute: %d" % p.returncode) result_url = construct_url_from_cipher_result(result) return result_url def get_best_video(player_config): formats = player_config["streamingData"]["formats"] best_url = None best_quality = None best_extension = None for format_data in formats: mimetype = format_data["mimeType"].split(";")[0] quality = format_data["quality"] if quality not in QUALITIES: continue if mimetype not in MIMETYPES: continue extension = MIMETYPES[mimetype] quality = QUALITIES.get(quality, -1) if best_quality is not None and quality < best_quality: continue if "signatureCipher" in format_data: js_url = player_config["PLAYER_JS_URL"] video_url = decode_cipher_url(js_url, format_data["signatureCipher"]) else: video_url = format_data["url"] best_url = video_url best_quality = quality best_extension = extension return best_url, best_extension def sanitize_filename(filename): return ( re.sub("\s+", " ", filename.strip()) .replace("\\", "-") .replace("/", "-") .replace("\0", " ") ) def get_video_url(page): player_config = get_player_config(page.scripts) if not player_config: raise VideoUnavailable(page.unavailable_message or "Could not find video URL") video_url, extension = get_best_video(player_config) if not video_url: return None, None title = player_config["videoDetails"].get("title", None) if not title: title = "Unknown title" filename = sanitize_filename(title) + "." + extension return video_url, filename class YouTubeVideoPageParser(html.parser.HTMLParser): def __init__(self): super().__init__() self.unavailable_message = None self.scripts = [] def handle_starttag(self, tag, attrs): attrs = dict(attrs) self._handle_unavailable_message(tag, attrs) self._handle_script(tag, attrs) def handle_endtag(self, tag): self.handle_data = self._ignore_data def _ignore_data(self, _): pass def _handle_unavailable_message(self, tag, attrs): if attrs.get("id", None) == "unavailable-message": self.handle_data = self._handle_unavailable_message_data def _handle_unavailable_message_data(self, data): self.unavailable_message = data.strip() def _handle_script(self, tag, attrs): if tag == "script": self.handle_data = self._handle_script_data def _handle_script_data(self, data): if data: self.scripts.append(data) class MozillaReleasesPageParser(html.parser.HTMLParser): def __init__(self): super().__init__() self.latest_release = "1.0" def handle_starttag(self, tag, attrs): attrs = dict(attrs) if attrs.get("data-latest-firefox", None): self.latest_release = attrs.get("data-latest-firefox", None) def write_video(filename, video_data): quoted_filename = urllib.parse.quote(filename.encode("utf-8")) sys.stdout.buffer.write( b"Content-Disposition: attachment; filename*=UTF-8''{0}\r\n" .replace(b"{0}", quoted_filename.encode("utf-8")) ) sys.stdout.buffer.write( b"Content-Length: {0}\r\n" .replace(b"{0}", video_data.getheader("Content-Length").encode("utf-8")) ) sys.stdout.buffer.write(b"\r\n") shutil.copyfileobj(video_data, sys.stdout.buffer) video_data.close() def cgimain(): args = cgi.parse() try: url = args["url"][0] except: print_form(url="https://www.youtube.com/watch?v=FOOBAR") return try: page = YouTubeVideoPageParser() validate_url(url) with urlopen(url) as f: page.feed(f.read().decode("utf-8")) page.close() video_url, filename = get_video_url(page) video_data = urlopen(video_url) except VideoUnavailable as e: print_form( url=url, msg="

Sorry, there was an error: %s

" % cgi.escape(e.args[0]) ) except NotYouTube: print_form( url=url, msg="

Sorry, that does not look like a YouTube page!

" ) except Exception as e: print_form( url=url, msg="

Sorry, there was an unknown error.

" ) return write_video(filename, video_data) def pp_size(size): suffixes = ["", "KiB", "MiB", "GiB"] for i, suffix in enumerate(suffixes): if size < 1024: break size /= 1024 return "%.2f %s" % (size, suffix) def copy_with_progress(content_length, infile, outfile): def print_status(): rate = 0 if now != last_ts: rate = last_bytes_read / (now - last_ts) sys.stdout.write("\33[2K\r") sys.stdout.write("%s / %s (%s/sec)" % ( pp_size(bytes_read), pp_size(content_length), pp_size(rate), )) sys.stdout.flush() last_ts = 0 last_bytes_read = 0 bytes_read = 0 while True: now = time.time() if now - last_ts > 0.5: print_status() last_ts = now last_bytes_read = 0 buf = infile.read(32768) if not buf: break outfile.write(buf) last_bytes_read += len(buf) bytes_read += len(buf) # Newline at the end print_status() print() def main(): try: url = sys.argv[1] except: print("Usage: %s https://youtube.com/watch?v=FOOBAR" % sys.argv[0], file=sys.stderr) sys.exit(1) page = YouTubeVideoPageParser() with urlopen(url) as f: page.feed(f.read().decode("utf-8")) page.close() video_url, filename = get_video_url(page) print("Downloading", filename) outfile = open(filename, "ab") offset = outfile.tell() if offset > 0: print("Resuming download from", pp_size(offset)) total_size = None while True: try: video_data = urlopen(video_url, offset) except urllib.error.HTTPError as e: if e.code == 416: print("File is complete!") break else: raise content_length = int(video_data.getheader("Content-Length")) if total_size is None: total_size = content_length try: copy_with_progress(content_length, video_data, outfile) except IOError as e: print() video_data.close() if outfile.tell() != total_size: old_offset = offset offset = outfile.tell() if old_offset == offset: time.sleep(1) print("Restarting download from", pp_size(offset)) else: break outfile.close() if __name__ == "__main__": if "SCRIPT_NAME" in os.environ: cgimain() else: try: main() except KeyboardInterrupt: print("\nExiting...") sys.exit(1)