Cheskel Twersky 3 years ago
parent
commit
4fd850a7f7
4 changed files with 103 additions and 92 deletions
  1. 9 11
      docs/conf.py
  2. 66 54
      scrapetube/scrapetube.py
  3. 20 21
      setup.py
  4. 8 6
      tests/test.py

+ 9 - 11
docs/conf.py

@@ -1,23 +1,21 @@
-
 import os
 import sys
-sys.path.insert(0, os.path.abspath('../'))
-from scrapetube import __version__ as release
 
-
-project = 'Scrapetube'
-copyright = '2021, Cheskel Twersky'
-author = 'Cheskel Twersky'
+sys.path.insert(0, os.path.abspath("../"))
+from scrapetube import __version__ as release
 
 
+project = "Scrapetube"
+copyright = "2021, Cheskel Twersky"
+author = "Cheskel Twersky"
 
-extensions = ['sphinx.ext.autodoc', 'sphinx.ext.coverage', 'sphinx.ext.napoleon']
 
-templates_path = ['_templates']
+extensions = ["sphinx.ext.autodoc", "sphinx.ext.coverage", "sphinx.ext.napoleon"]
 
+templates_path = ["_templates"]
 
-exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
 
+exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
 
 
-html_theme = 'sphinx_rtd_theme'
+html_theme = "sphinx_rtd_theme"

+ 66 - 54
scrapetube/scrapetube.py

@@ -11,8 +11,8 @@ def get_channel(
     channel_url: str = None,
     limit: int = None,
     sleep: int = 1,
-    sort_by: Literal['newest', 'oldest', 'popular'] = 'newest'
-    ) -> Generator[dict, None, None]:
+    sort_by: Literal["newest", "oldest", "popular"] = "newest",
+) -> Generator[dict, None, None]:
 
     """Get videos for a channel.
 
@@ -36,22 +36,20 @@ def get_channel(
             Defaults to ``"newest"``.
     """
 
-    sort_by_map = {
-        'newest': 'dd',
-        'oldest': 'da',
-        'popular': 'p'
-    }
-    url = '{url}/videos?view=0&sort={sort_by}&flow=grid'.format(
-        url= channel_url or f'https://www.youtube.com/channel/{channel_id}',
-        sort_by= sort_by_map[sort_by]
-        )
-    api_endpoint = 'https://www.youtube.com/youtubei/v1/browse'
-    videos = get_videos(url, api_endpoint, 'gridVideoRenderer', limit, sleep)
+    sort_by_map = {"newest": "dd", "oldest": "da", "popular": "p"}
+    url = "{url}/videos?view=0&sort={sort_by}&flow=grid".format(
+        url=channel_url or f"https://www.youtube.com/channel/{channel_id}",
+        sort_by=sort_by_map[sort_by],
+    )
+    api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
+    videos = get_videos(url, api_endpoint, "gridVideoRenderer", limit, sleep)
     for video in videos:
         yield video
 
 
-def get_playlist(playlist_id: str, limit: int = None, sleep: int = 1) -> Generator[dict, None, None]:
+def get_playlist(
+    playlist_id: str, limit: int = None, sleep: int = 1
+) -> Generator[dict, None, None]:
     """Get videos for a playlist.
 
     Parameters:
@@ -63,9 +61,9 @@ def get_playlist(playlist_id: str, limit: int = None, sleep: int = 1) -> Generat
             Seconds to sleep between API calls to youtube, in order to prevent getting blocked. Defaults to ``1``.
     """
 
-    url = f'https://www.youtube.com/playlist?list={playlist_id}'
-    api_endpoint = 'https://www.youtube.com/youtubei/v1/browse'
-    videos = get_videos(url, api_endpoint, 'playlistVideoRenderer', limit, sleep)
+    url = f"https://www.youtube.com/playlist?list={playlist_id}"
+    api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
+    videos = get_videos(url, api_endpoint, "playlistVideoRenderer", limit, sleep)
     for video in videos:
         yield video
 
@@ -74,9 +72,9 @@ def get_search(
     query: str,
     limit: int = None,
     sleep: int = 1,
-    sort_by: Literal['relevance', 'upload_date', 'view_count', 'rating'] = 'relevance',
-    results_type: Literal['video', 'channel', 'playlist', 'movie'] = 'video'
-    ) -> Generator[dict, None, None]:
+    sort_by: Literal["relevance", "upload_date", "view_count", "rating"] = "relevance",
+    results_type: Literal["video", "channel", "playlist", "movie"] = "video",
+) -> Generator[dict, None, None]:
 
     """Search youtube and get videos.
 
@@ -101,43 +99,51 @@ def get_search(
     """
 
     sort_by_map = {
-        'relevance': 'A',
-        'upload_date': 'I',
-        'view_count': 'M',
-        'rating': 'E'
+        "relevance": "A",
+        "upload_date": "I",
+        "view_count": "M",
+        "rating": "E",
     }
 
     results_type_map = {
-        'video': ['B', 'videoRenderer'],
-        'channel': ['C', 'channelRenderer'],
-        'playlist': ['D', 'playlistRenderer'],
-        'movie': ['E', 'videoRenderer']
+        "video": ["B", "videoRenderer"],
+        "channel": ["C", "channelRenderer"],
+        "playlist": ["D", "playlistRenderer"],
+        "movie": ["E", "videoRenderer"],
     }
 
-    param_string = f'CA{sort_by_map[sort_by]}SAhA{results_type_map[results_type][0]}'
-    url = f'https://www.youtube.com/results?search_query={query}&sp={param_string}'
-    api_endpoint = 'https://www.youtube.com/youtubei/v1/search'
-    videos = get_videos(url, api_endpoint, results_type_map[results_type][1], limit, sleep)
+    param_string = f"CA{sort_by_map[sort_by]}SAhA{results_type_map[results_type][0]}"
+    url = f"https://www.youtube.com/results?search_query={query}&sp={param_string}"
+    api_endpoint = "https://www.youtube.com/youtubei/v1/search"
+    videos = get_videos(
+        url, api_endpoint, results_type_map[results_type][1], limit, sleep
+    )
     for video in videos:
         yield video
 
 
-def get_videos(url: str, api_endpoint: str, selector: str, limit: int, sleep: int) -> Generator[dict, None, None]:
+def get_videos(
+    url: str, api_endpoint: str, selector: str, limit: int, sleep: int
+) -> Generator[dict, None, None]:
     session = requests.Session()
-    session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36'
+    session.headers[
+        "User-Agent"
+    ] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36"
     is_first = True
     quit = False
     count = 0
     while True:
         if is_first:
             html = get_initial_data(session, url)
-            client = json.loads(get_json_from_html(
-                html, 'INNERTUBE_CONTEXT', 2, '"}},') + '"}}')['client']
-            api_key = get_json_from_html(html, 'innertubeApiKey', 3)
-            session.headers['X-YouTube-Client-Name'] = '1'
-            session.headers['X-YouTube-Client-Version'] = client['clientVersion']
-            data = json.loads(get_json_from_html(
-                html, 'var ytInitialData = ', 0, '};') + '}')
+            client = json.loads(
+                get_json_from_html(html, "INNERTUBE_CONTEXT", 2, '"}},') + '"}}'
+            )["client"]
+            api_key = get_json_from_html(html, "innertubeApiKey", 3)
+            session.headers["X-YouTube-Client-Name"] = "1"
+            session.headers["X-YouTube-Client-Version"] = client["clientVersion"]
+            data = json.loads(
+                get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}"
+            )
             next_data = get_next_data(data)
             is_first = False
         else:
@@ -161,40 +167,46 @@ def get_videos(url: str, api_endpoint: str, selector: str, limit: int, sleep: in
 
     session.close()
 
+
 def get_initial_data(session: requests.Session, url: str) -> str:
     response = session.get(url)
-    if 'uxe=' in response.request.url:
-        session.cookies.set('CONSENT', 'YES+cb', domain='.youtube.com')
+    if "uxe=" in response.request.url:
+        session.cookies.set("CONSENT", "YES+cb", domain=".youtube.com")
         response = session.get(url)
 
     html = response.text
     return html
 
 
-def get_ajax_data(session: requests.Session, api_endpoint: str, api_key: str, next_data: dict, client: dict) -> dict:
+def get_ajax_data(
+    session: requests.Session,
+    api_endpoint: str,
+    api_key: str,
+    next_data: dict,
+    client: dict,
+) -> dict:
     data = {
-        "context": {
-            'clickTracking': next_data['click_params'],
-            'client': client
-        },
-        'continuation': next_data['token']
+        "context": {"clickTracking": next_data["click_params"], "client": client},
+        "continuation": next_data["token"],
     }
-    response = session.post(api_endpoint, params={'key': api_key}, json=data)
+    response = session.post(api_endpoint, params={"key": api_key}, json=data)
     return response.json()
 
 
 def get_json_from_html(html: str, key: str, num_chars: int = 2, stop: str = '"') -> str:
     pos_begin = html.find(key) + len(key) + num_chars
     pos_end = html.find(stop, pos_begin)
-    return html[pos_begin: pos_end]
+    return html[pos_begin:pos_end]
 
 
 def get_next_data(data: dict) -> dict:
-    raw_next_data = next(search_dict(data, 'continuationEndpoint'), None)
+    raw_next_data = next(search_dict(data, "continuationEndpoint"), None)
     if not raw_next_data:
         return None
-    next_data = {'token': raw_next_data['continuationCommand']['token'], 'click_params': {
-        "clickTrackingParams": raw_next_data['clickTrackingParams']}}
+    next_data = {
+        "token": raw_next_data["continuationCommand"]["token"],
+        "click_params": {"clickTrackingParams": raw_next_data["clickTrackingParams"]},
+    }
 
     return next_data
 

+ 20 - 21
setup.py

@@ -1,36 +1,35 @@
-from setuptools import setup
 import re
 
+from setuptools import setup
+
 with open("scrapetube/__init__.py", encoding="utf-8") as f:
     version = re.findall(r"__version__ = \"(.+)\"", f.read())[0]
 
-with open('README.md', encoding='utf-8') as f:
+with open("README.md", encoding="utf-8") as f:
     readme = f.read()
 
-with open('requirements.txt', encoding='utf-8') as f:
+with open("requirements.txt", encoding="utf-8") as f:
     requirements = [r.strip() for r in f]
 
 setup(
-    name = 'scrapetube',
-    version = version,
-    packages = ['scrapetube'],
-    include_package_data = True,
-    url = 'https://github.com/dermasmid/scrapetube',
-    license = 'MIT',
-    long_description = readme,
-    long_description_content_type = 'text/markdown',
-    author = 'Cheskel Twersky',
-    author_email = 'twerskycheskel@gmail.com',
-    description = 'Scrape youtube without the official youtube api and without selenium.',
-    keywords = 'youtube python channel videos search playlist list get',
-    classifiers = [
+    name="scrapetube",
+    version=version,
+    packages=["scrapetube"],
+    include_package_data=True,
+    url="https://github.com/dermasmid/scrapetube",
+    license="MIT",
+    long_description=readme,
+    long_description_content_type="text/markdown",
+    author="Cheskel Twersky",
+    author_email="twerskycheskel@gmail.com",
+    description="Scrape youtube without the official youtube api and without selenium.",
+    keywords="youtube python channel videos search playlist list get",
+    classifiers=[
         "Programming Language :: Python :: 3",
         "License :: OSI Approved :: MIT License",
         "Operating System :: OS Independent",
     ],
-    project_urls={
-        'Documentation': 'https://scrapetube.readthedocs.io/en/latest/'
-    },
-    install_requires = requirements,
-    python_requires = '>=3.6',
+    project_urls={"Documentation": "https://scrapetube.readthedocs.io/en/latest/"},
+    install_requires=requirements,
+    python_requires=">=3.6",
 )

+ 8 - 6
tests/test.py

@@ -2,17 +2,19 @@ import sys
 import os
 import platform
 
-if platform.system() == 'Windows':
-    separator = '\\'
+if platform.system() == "Windows":
+    separator = "\\"
 else:
-    separator = '/'
+    separator = "/"
 
-sys.path.insert(0, '/'.join(os.path.dirname(os.path.realpath(__file__)).split(separator)[:-1]))
+sys.path.insert(
+    0, "/".join(os.path.dirname(os.path.realpath(__file__)).split(separator)[:-1])
+)
 
 import scrapetube
 
 
-videos = scrapetube.get_channel("UC9-y-6csu5WGm29I7JiwpnA", sort_by='popular')
+videos = scrapetube.get_channel("UC9-y-6csu5WGm29I7JiwpnA", sort_by="popular")
 
 for video in videos:
-    print(video['videoId'])
+    print(video["videoId"])