# scrapetube.py (11 KB)
  1. import json
  2. import time
  3. from typing import Generator
  4. import requests
  5. from typing_extensions import Literal
  6. type_property_map = {
  7. "videos": "videoRenderer",
  8. "streams": "videoRenderer",
  9. "shorts": "reelWatchEndpoint"
  10. }
  11. def get_channel(
  12. channel_id: str = None,
  13. channel_url: str = None,
  14. channel_username: str = None,
  15. limit: int = None,
  16. sleep: float = 1,
  17. proxies: dict = None,
  18. sort_by: Literal["newest", "oldest", "popular"] = "newest",
  19. content_type: Literal["videos", "shorts", "streams"] = "videos",
  20. ) -> Generator[dict, None, None]:
  21. """Get videos for a channel.
  22. Parameters:
  23. channel_id (``str``, *optional*):
  24. The channel id from the channel you want to get the videos for.
  25. If you prefer to use the channel url instead, see ``channel_url`` below.
  26. channel_url (``str``, *optional*):
  27. The url to the channel you want to get the videos for.
  28. Since there is a few type's of channel url's, you can use the one you want
  29. by passing it here instead of using ``channel_id``.
  30. channel_username (``str``, *optional*):
  31. The username from the channel you want to get the videos for.
  32. Ex. ``LinusTechTips`` (without the @).
  33. If you prefer to use the channel url instead, see ``channel_url`` above.
  34. limit (``int``, *optional*):
  35. Limit the number of videos you want to get.
  36. sleep (``int``, *optional*):
  37. Seconds to sleep between API calls to youtube, in order to prevent getting blocked.
  38. Defaults to 1.
  39. proxies (``dict``, *optional*):
  40. A dictionary with the proxies you want to use. Ex:
  41. ``{'https': 'http://username:password@101.102.103.104:3128'}``
  42. sort_by (``str``, *optional*):
  43. In what order to retrieve to videos. Pass one of the following values.
  44. ``"newest"``: Get the new videos first.
  45. ``"oldest"``: Get the old videos first.
  46. ``"popular"``: Get the popular videos first. Defaults to "newest".
  47. content_type (``str``, *optional*):
  48. In order to get content type. Pass one of the following values.
  49. ``"videos"``: Videos
  50. ``"shorts"``: Shorts
  51. ``"streams"``: Streams
  52. """
  53. base_url = ""
  54. if channel_url:
  55. base_url = channel_url
  56. elif channel_id:
  57. base_url = f"https://www.youtube.com/channel/{channel_id}"
  58. elif channel_username:
  59. base_url = f"https://www.youtube.com/@{channel_username}"
  60. url = "{base_url}/{content_type}?view=0&flow=grid".format(
  61. base_url=base_url,
  62. content_type=content_type,
  63. )
  64. api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
  65. videos = get_videos(url, api_endpoint, "contents", type_property_map[content_type], limit, sleep, proxies, sort_by)
  66. for video in videos:
  67. yield video
  68. def get_playlist(
  69. playlist_id: str, limit: int = None, sleep: int = 1, proxies: dict = None
  70. ) -> Generator[dict, None, None]:
  71. """Get videos for a playlist.
  72. Parameters:
  73. playlist_id (``str``):
  74. The playlist id from the playlist you want to get the videos for.
  75. limit (``int``, *optional*):
  76. Limit the number of videos you want to get.
  77. sleep (``int``, *optional*):
  78. Seconds to sleep between API calls to youtube, in order to prevent getting blocked.
  79. Defaults to 1.
  80. proxies (``dict``, *optional*):
  81. A dictionary with the proxies you want to use. Ex:
  82. ``{'https': 'http://username:password@101.102.103.104:3128'}``
  83. """
  84. url = f"https://www.youtube.com/playlist?list={playlist_id}"
  85. api_endpoint = "https://www.youtube.com/youtubei/v1/browse"
  86. videos = get_videos(url, api_endpoint, "playlistVideoListRenderer", "playlistVideoRenderer", limit, sleep, proxies)
  87. for video in videos:
  88. yield video
  89. def get_search(
  90. query: str,
  91. limit: int = None,
  92. sleep: int = 1,
  93. sort_by: Literal["relevance", "upload_date", "view_count", "rating"] = "relevance",
  94. results_type: Literal["video", "channel", "playlist", "movie"] = "video",
  95. proxies: dict = None,
  96. ) -> Generator[dict, None, None]:
  97. """Search youtube and get videos.
  98. Parameters:
  99. query (``str``):
  100. The term you want to search for.
  101. limit (``int``, *optional*):
  102. Limit the number of videos you want to get.
  103. sleep (``int``, *optional*):
  104. Seconds to sleep between API calls to youtube, in order to prevent getting blocked.
  105. Defaults to 1.
  106. sort_by (``str``, *optional*):
  107. In what order to retrieve to videos. Pass one of the following values.
  108. ``"relevance"``: Get the new videos in order of relevance.
  109. ``"upload_date"``: Get the new videos first.
  110. ``"view_count"``: Get the popular videos first.
  111. ``"rating"``: Get videos with more likes first.
  112. Defaults to "relevance".
  113. results_type (``str``, *optional*):
  114. What type you want to search for. Pass one of the following values:
  115. ``"video"|"channel"|"playlist"|"movie"``. Defaults to "video".
  116. proxies (``dict``, *optional*):
  117. A dictionary with the proxies you want to use. Ex:
  118. ``{'https': 'http://username:password@101.102.103.104:3128'}``
  119. """
  120. sort_by_map = {
  121. "relevance": "A",
  122. "upload_date": "I",
  123. "view_count": "M",
  124. "rating": "E",
  125. }
  126. results_type_map = {
  127. "video": ["B", "videoRenderer"],
  128. "channel": ["C", "channelRenderer"],
  129. "playlist": ["D", "playlistRenderer"],
  130. "movie": ["E", "videoRenderer"],
  131. }
  132. param_string = f"CA{sort_by_map[sort_by]}SAhA{results_type_map[results_type][0]}"
  133. url = f"https://www.youtube.com/results?search_query={query}&sp={param_string}"
  134. api_endpoint = "https://www.youtube.com/youtubei/v1/search"
  135. videos = get_videos(
  136. url, api_endpoint, "contents", results_type_map[results_type][1], limit, sleep, proxies
  137. )
  138. for video in videos:
  139. yield video
  140. def get_video(
  141. id: str,
  142. ) -> dict:
  143. """Get a single video.
  144. Parameters:
  145. id (``str``):
  146. The video id from the video you want to get.
  147. """
  148. session = get_session()
  149. url = f"https://www.youtube.com/watch?v={id}"
  150. html = get_initial_data(session, url)
  151. client = json.loads(
  152. get_json_from_html(html, "INNERTUBE_CONTEXT", 2, '"}},') + '"}}'
  153. )["client"]
  154. session.headers["X-YouTube-Client-Name"] = "1"
  155. session.headers["X-YouTube-Client-Version"] = client["clientVersion"]
  156. data = json.loads(
  157. get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}"
  158. )
  159. return next(search_dict(data, "videoPrimaryInfoRenderer"))
  160. def get_videos(
  161. url: str, api_endpoint: str, selector_list: str, selector_item: str, limit: int, sleep: float, proxies: dict = None, sort_by: str = None
  162. ) -> Generator[dict, None, None]:
  163. session = get_session(proxies)
  164. is_first = True
  165. quit_it = False
  166. count = 0
  167. while True:
  168. if is_first:
  169. html = get_initial_data(session, url)
  170. client = json.loads(
  171. get_json_from_html(html, "INNERTUBE_CONTEXT", 2, '"}},') + '"}}'
  172. )["client"]
  173. api_key = get_json_from_html(html, "innertubeApiKey", 3)
  174. session.headers["X-YouTube-Client-Name"] = "1"
  175. session.headers["X-YouTube-Client-Version"] = client["clientVersion"]
  176. data = json.loads(
  177. get_json_from_html(html, "var ytInitialData = ", 0, "};") + "}"
  178. )
  179. data = next(search_dict(data, selector_list), None)
  180. next_data = get_next_data(data, sort_by)
  181. is_first = False
  182. if sort_by and sort_by != "newest":
  183. continue
  184. else:
  185. data = get_ajax_data(session, api_endpoint, api_key, next_data, client)
  186. next_data = get_next_data(data)
  187. for result in get_videos_items(data, selector_item):
  188. try:
  189. count += 1
  190. yield result
  191. if count == limit:
  192. quit_it = True
  193. break
  194. except GeneratorExit:
  195. quit_it = True
  196. break
  197. if not next_data or quit_it:
  198. break
  199. time.sleep(sleep)
  200. session.close()
  201. def get_session(proxies: dict = None) -> requests.Session:
  202. session = requests.Session()
  203. if proxies:
  204. session.proxies.update(proxies)
  205. session.headers[
  206. "User-Agent"
  207. ] = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
  208. session.headers["Accept-Language"] = "en"
  209. return session
  210. def get_initial_data(session: requests.Session, url: str) -> str:
  211. session.cookies.set("CONSENT", "YES+cb", domain=".youtube.com")
  212. response = session.get(url, params={"ucbcb":1})
  213. html = response.text
  214. return html
  215. def get_ajax_data(
  216. session: requests.Session,
  217. api_endpoint: str,
  218. api_key: str,
  219. next_data: dict,
  220. client: dict,
  221. ) -> dict:
  222. data = {
  223. "context": {"clickTracking": next_data["click_params"], "client": client},
  224. "continuation": next_data["token"],
  225. }
  226. response = session.post(api_endpoint, params={"key": api_key}, json=data)
  227. return response.json()
  228. def get_json_from_html(html: str, key: str, num_chars: int = 2, stop: str = '"') -> str:
  229. pos_begin = html.find(key) + len(key) + num_chars
  230. pos_end = html.find(stop, pos_begin)
  231. return html[pos_begin:pos_end]
  232. def get_next_data(data: dict, sort_by: str = None) -> dict:
  233. # Youtube, please don't change the order of these
  234. sort_by_map = {
  235. "newest": 0,
  236. "popular": 1,
  237. "oldest": 2,
  238. }
  239. if sort_by and sort_by != "newest":
  240. endpoint = next(
  241. search_dict(data, "feedFilterChipBarRenderer"), None)["contents"][sort_by_map[sort_by]]["chipCloudChipRenderer"]["navigationEndpoint"]
  242. else:
  243. endpoint = next(search_dict(data, "continuationEndpoint"), None)
  244. if not endpoint:
  245. return None
  246. next_data = {
  247. "token": endpoint["continuationCommand"]["token"],
  248. "click_params": {"clickTrackingParams": endpoint["clickTrackingParams"]},
  249. }
  250. return next_data
  251. def search_dict(partial: dict, search_key: str) -> Generator[dict, None, None]:
  252. stack = [partial]
  253. while stack:
  254. current_item = stack.pop(0)
  255. if isinstance(current_item, dict):
  256. for key, value in current_item.items():
  257. if key == search_key:
  258. yield value
  259. else:
  260. stack.append(value)
  261. elif isinstance(current_item, list):
  262. for value in current_item:
  263. stack.append(value)
  264. def get_videos_items(data: dict, selector: str) -> Generator[dict, None, None]:
  265. return search_dict(data, selector)