feat: Introduced YouTube bundle with essential components (#5415)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: cristhianzl <cristhian.lousa@gmail.com>
Co-authored-by: anovazzi1 <otavio2204@gmail.com>
This commit is contained in:
Raphael Valdetaro 2025-01-20 15:49:54 -03:00 committed by GitHub
commit 94d192ff5d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 1272 additions and 259 deletions

View file

@ -28,7 +28,6 @@ from .wikipedia_api import WikipediaAPIComponent
from .wolfram_alpha_api import WolframAlphaAPIComponent
from .yahoo import YfinanceComponent
from .yahoo_finance import YfinanceToolComponent
from .youtube_transcripts import YouTubeTranscriptsComponent
with warnings.catch_warnings():
warnings.simplefilter("ignore", LangChainDeprecationWarning)
@ -64,5 +63,4 @@ __all__ = [
"WolframAlphaAPIComponent",
"YfinanceComponent",
"YfinanceToolComponent",
"YouTubeTranscriptsComponent",
]

View file

@ -1,244 +0,0 @@
from langchain_community.document_loaders import YoutubeLoader
from langchain_community.document_loaders.youtube import TranscriptFormat
from langflow.custom import Component
from langflow.inputs import DropdownInput, IntInput, MultilineInput
from langflow.schema import Message
from langflow.template import Output
class YouTubeTranscriptsComponent(Component):
    """A component that extracts spoken content from YouTube videos as transcripts."""

    display_name: str = "YouTube Transcripts"
    description: str = "Extracts spoken content from YouTube videos as transcripts."
    icon: str = "YouTube"
    name = "YouTubeTranscripts"

    inputs = [
        MultilineInput(
            name="url",
            display_name="Video URL",
            info="Enter the YouTube video URL to get transcripts from.",
            tool_mode=True,
            required=True,
        ),
        DropdownInput(
            name="transcript_format",
            display_name="Transcript Format",
            options=["text", "chunks"],
            value="text",
            info="The format of the transcripts. Either 'text' for a single output or 'chunks' for timestamped chunks.",
            advanced=True,
        ),
        IntInput(
            name="chunk_size_seconds",
            display_name="Chunk Size (seconds)",
            value=60,
            advanced=True,
            info="The size of each transcript chunk in seconds. Only applicable when "
            "'Transcript Format' is set to 'chunks'.",
        ),
        DropdownInput(
            name="language",
            display_name="Language",
            # Language codes offered by the loader; includes regional variants
            # (e.g. zh-HK, en-US) alongside base ISO codes.
            options=[
                "af", "ak", "sq", "am", "ar", "hy", "as", "ay", "az", "bn",
                "eu", "be", "bho", "bs", "bg", "my", "ca", "ceb", "zh", "zh-HK",
                "zh-CN", "zh-SG", "zh-TW", "zh-Hans", "zh-Hant", "hak-TW", "nan-TW", "co", "hr", "cs",
                "da", "dv", "nl", "en", "en-US", "eo", "et", "ee", "fil", "fi",
                "fr", "gl", "lg", "ka", "de", "el", "gn", "gu", "ht", "ha",
                "haw", "iw", "hi", "hmn", "hu", "is", "ig", "id", "ga", "it",
                "ja", "jv", "kn", "kk", "km", "rw", "ko", "kri", "ku", "ky",
                "lo", "la", "lv", "ln", "lt", "lb", "mk", "mg", "ms", "ml",
                "mt", "mi", "mr", "mn", "ne", "nso", "no", "ny", "or", "om",
                "ps", "fa", "pl", "pt", "pa", "qu", "ro", "ru", "sm", "sa",
                "gd", "sr", "sn", "sd", "si", "sk", "sl", "so", "st", "es",
                "su", "sw", "sv", "tg", "ta", "tt", "te", "th", "ti", "ts",
                "tr", "tk", "uk", "ur", "ug", "uz", "vi", "cy", "fy", "xh",
                "yi", "yo", "zu",
            ],
            value="en",
            info=(
                "Specify to make sure the transcripts are retrieved in your desired language. Defaults to English: 'en'"
            ),
        ),
        DropdownInput(
            name="translation",
            display_name="Translation Language",
            advanced=True,
            options=["", "en", "es", "fr", "de", "it", "pt", "ru", "ja", "ko", "hi", "ar", "id"],
            info="Translate the transcripts to the specified language. Leave empty for no translation.",
        ),
    ]

    outputs = [
        Output(name="transcripts", display_name="Transcription", method="build_youtube_transcripts"),
    ]

    def build_youtube_transcripts(self) -> Message:
        """Extracts transcripts from a YouTube video URL.

        Returns:
            Message: The transcripts of the video as a text string. If 'transcript_format'
            is 'text', the transcripts are returned as a single continuous string. If
            'transcript_format' is 'chunks', the transcripts are returned as a string
            with timestamped segments.

        Raises:
            Exception: Returns an error message if transcript retrieval fails.
        """
        try:
            # Attempt to load transcripts in the specified language, fallback to any available language
            languages = [self.language] if self.language else None
            loader = YoutubeLoader.from_youtube_url(
                self.url,
                transcript_format=TranscriptFormat.TEXT
                if self.transcript_format == "text"
                else TranscriptFormat.CHUNKS,
                # chunk_size_seconds is ignored by the loader in TEXT mode.
                chunk_size_seconds=self.chunk_size_seconds,
                language=languages,
                translation=self.translation or None,
            )
            transcripts = loader.load()
            if self.transcript_format == "text":
                # Extract only the page_content from the Document
                result = transcripts[0].page_content
                return Message(text=result)
            # For chunks, format the output with timestamps
            formatted_chunks = []
            for doc in transcripts:
                # start_seconds metadata is provided per chunk by the loader;
                # render it as MM:SS ahead of the chunk text.
                start_seconds = int(doc.metadata["start_seconds"])
                start_minutes = start_seconds // 60
                start_seconds %= 60
                timestamp = f"{start_minutes:02d}:{start_seconds:02d}"
                formatted_chunks.append(f"{timestamp} {doc.page_content}")
            result = "\n".join(formatted_chunks)
            return Message(text=result)
        except Exception as exc:  # noqa: BLE001
            # Errors are surfaced as a Message rather than raised so the flow keeps running.
            error_msg = f"Failed to get YouTube transcripts: {exc!s}"
            return Message(text=error_msg)

View file

@ -0,0 +1,17 @@
from .channel import YouTubeChannelComponent
from .comments import YouTubeCommentsComponent
from .playlist import YouTubePlaylistComponent
from .search import YouTubeSearchComponent
from .trending import YouTubeTrendingComponent
from .video_details import YouTubeVideoDetailsComponent
from .youtube_transcripts import YouTubeTranscriptsComponent

# Public API of the YouTube component bundle, kept in alphabetical order.
__all__ = [
    "YouTubeChannelComponent",
    "YouTubeCommentsComponent",
    "YouTubePlaylistComponent",
    "YouTubeSearchComponent",
    "YouTubeTranscriptsComponent",
    "YouTubeTrendingComponent",
    "YouTubeVideoDetailsComponent",
]

View file

@ -0,0 +1,227 @@
from typing import Any
from urllib.error import HTTPError
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from langflow.custom import Component
from langflow.inputs import BoolInput, MessageTextInput, SecretStrInput
from langflow.schema import DataFrame
from langflow.template import Output
class YouTubeChannelComponent(Component):
    """A component that retrieves detailed information about YouTube channels."""

    display_name: str = "YouTube Channel"
    description: str = "Retrieves detailed information and statistics about YouTube channels as a DataFrame."
    icon: str = "YouTube"

    # Constants
    CHANNEL_ID_LENGTH = 24  # Canonical channel IDs are 24 chars and start with "UC".
    QUOTA_EXCEEDED_STATUS = 403
    NOT_FOUND_STATUS = 404
    MAX_PLAYLIST_RESULTS = 10

    inputs = [
        MessageTextInput(
            name="channel_url",
            display_name="Channel URL or ID",
            info="The URL or ID of the YouTube channel.",
            tool_mode=True,
            required=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="YouTube API Key",
            info="Your YouTube Data API key.",
            required=True,
        ),
        BoolInput(
            name="include_statistics",
            display_name="Include Statistics",
            value=True,
            info="Include channel statistics (views, subscribers, videos).",
        ),
        BoolInput(
            name="include_branding",
            display_name="Include Branding",
            value=True,
            info="Include channel branding settings (banner, thumbnails).",
            advanced=True,
        ),
        BoolInput(
            name="include_playlists",
            display_name="Include Playlists",
            value=False,
            info="Include channel's public playlists.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="channel_df", display_name="Channel Info", method="get_channel_info"),
    ]

    def _extract_channel_id(self, channel_url: str) -> str:
        """Extracts the channel ID from various YouTube channel URL formats.

        Supports raw channel IDs plus /channel/, /c/, /user/ and /@handle URLs.
        Returns the input unchanged when no known pattern matches.
        """
        import re

        # A raw "UC..." ID of canonical length needs no parsing.
        if channel_url.startswith("UC") and len(channel_url) == self.CHANNEL_ID_LENGTH:
            return channel_url

        patterns = {
            "custom_url": r"youtube\.com\/c\/([^\/\n?]+)",
            "channel_id": r"youtube\.com\/channel\/([^\/\n?]+)",
            "user": r"youtube\.com\/user\/([^\/\n?]+)",
            "handle": r"youtube\.com\/@([^\/\n?]+)",
        }

        for pattern_type, pattern in patterns.items():
            match = re.search(pattern, channel_url)
            if match:
                if pattern_type == "channel_id":
                    # /channel/ URLs embed the ID directly.
                    return match.group(1)
                # Custom URLs, usernames and handles must be resolved via the API.
                return self._get_channel_id_by_name(match.group(1), pattern_type)

        return channel_url

    def _get_channel_id_by_name(self, channel_name: str, identifier_type: str) -> str:
        """Gets the channel ID using the channel name, custom URL, or handle.

        Args:
            channel_name: The identifier extracted from the channel URL.
            identifier_type: One of "custom_url", "user" or "handle".

        Returns:
            The resolved channel ID.

        Raises:
            ValueError: If no channel matches the given name.
            RuntimeError: If the YouTube API call fails.
        """
        youtube = None
        try:
            youtube = build("youtube", "v3", developerKey=self.api_key)
            if identifier_type == "handle":
                channel_name = channel_name.lstrip("@")
            response = youtube.search().list(part="id", q=channel_name, type="channel", maxResults=1).execute()
        except (HttpError, HTTPError) as e:
            error_msg = f"YouTube API error while getting channel ID: {e!s}"
            raise RuntimeError(error_msg) from e
        except Exception as e:
            error_msg = f"Unexpected error while getting channel ID: {e!s}"
            raise ValueError(error_msg) from e
        finally:
            if youtube:
                youtube.close()

        # Raised outside the try block so it is not re-wrapped by the
        # generic "Unexpected error" handler above.
        if response["items"]:
            return response["items"][0]["id"]["channelId"]
        error_msg = f"Could not find channel ID for: {channel_name}"
        raise ValueError(error_msg)

    def _get_channel_playlists(self, youtube: Any, channel_id: str) -> list[dict[str, Any]]:
        """Gets the public playlists for a channel.

        Returns up to MAX_PLAYLIST_RESULTS playlist dicts, or a single-element
        list containing an "error" key if the API call fails.
        """
        try:
            playlists_response = (
                youtube.playlists()
                .list(
                    part="snippet,contentDetails",
                    channelId=channel_id,
                    maxResults=self.MAX_PLAYLIST_RESULTS,
                )
                .execute()
            )
        except (HttpError, HTTPError) as e:
            return [{"error": str(e)}]

        return [
            {
                "playlist_title": item["snippet"]["title"],
                "playlist_description": item["snippet"]["description"],
                "playlist_id": item["id"],
                "playlist_video_count": item["contentDetails"]["itemCount"],
                "playlist_published_at": item["snippet"]["publishedAt"],
                "playlist_thumbnail_url": item["snippet"]["thumbnails"]["default"]["url"],
            }
            for item in playlists_response.get("items", [])
        ]

    def get_channel_info(self) -> DataFrame:
        """Retrieves channel information and returns it as a DataFrame.

        Errors are returned as a single-row DataFrame with an "error" column
        rather than raised, so downstream components keep working.
        """
        youtube = None
        try:
            # Get channel ID and initialize YouTube API client
            channel_id = self._extract_channel_id(self.channel_url)
            youtube = build("youtube", "v3", developerKey=self.api_key)

            # Only request the parts the user asked for to save API quota.
            parts = ["snippet", "contentDetails"]
            if self.include_statistics:
                parts.append("statistics")
            if self.include_branding:
                parts.append("brandingSettings")

            channel_response = youtube.channels().list(part=",".join(parts), id=channel_id).execute()
            if not channel_response["items"]:
                return DataFrame(pd.DataFrame({"error": ["Channel not found"]}))
            channel_info = channel_response["items"][0]

            # Build basic channel data (single-row, hence the one-element lists).
            channel_data = {
                "title": [channel_info["snippet"]["title"]],
                "description": [channel_info["snippet"]["description"]],
                "custom_url": [channel_info["snippet"].get("customUrl", "")],
                "published_at": [channel_info["snippet"]["publishedAt"]],
                "country": [channel_info["snippet"].get("country", "Not specified")],
                "channel_id": [channel_id],
            }

            # One column per available thumbnail size.
            for size, thumb in channel_info["snippet"]["thumbnails"].items():
                channel_data[f"thumbnail_{size}"] = [thumb["url"]]

            # Add statistics if requested
            if self.include_statistics:
                stats = channel_info["statistics"]
                channel_data.update(
                    {
                        "view_count": [int(stats.get("viewCount", 0))],
                        "subscriber_count": [int(stats.get("subscriberCount", 0))],
                        "hidden_subscriber_count": [stats.get("hiddenSubscriberCount", False)],
                        "video_count": [int(stats.get("videoCount", 0))],
                    }
                )

            # Add branding if requested
            if self.include_branding:
                branding = channel_info.get("brandingSettings", {})
                channel_data.update(
                    {
                        "brand_title": [branding.get("channel", {}).get("title", "")],
                        "brand_description": [branding.get("channel", {}).get("description", "")],
                        "brand_keywords": [branding.get("channel", {}).get("keywords", "")],
                        "brand_banner_url": [branding.get("image", {}).get("bannerExternalUrl", "")],
                    }
                )

            channel_df = pd.DataFrame(channel_data)

            # Add playlists if requested: repeat the channel row once per
            # playlist and attach the playlist columns alongside it.
            if self.include_playlists:
                playlists = self._get_channel_playlists(youtube, channel_id)
                if playlists and "error" not in playlists[0]:
                    playlists_df = pd.DataFrame(playlists)
                    channel_df = pd.concat([channel_df] * len(playlists_df), ignore_index=True)
                    for column in playlists_df.columns:
                        channel_df[column] = playlists_df[column].to_numpy()

            return DataFrame(channel_df)

        except Exception as e:  # noqa: BLE001 - surface any failure as an error row
            return DataFrame(pd.DataFrame({"error": [str(e)]}))
        finally:
            if youtube:
                youtube.close()

View file

@ -0,0 +1,231 @@
from contextlib import contextmanager
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from langflow.custom import Component
from langflow.inputs import BoolInput, DropdownInput, IntInput, MessageTextInput, SecretStrInput
from langflow.schema import DataFrame
from langflow.template import Output
class YouTubeCommentsComponent(Component):
    """A component that retrieves comments from YouTube videos."""

    display_name: str = "YouTube Comments"
    description: str = "Retrieves and analyzes comments from YouTube videos."
    icon: str = "YouTube"

    # Constants
    COMMENTS_DISABLED_STATUS = 403  # Also returned when the API quota is exhausted.
    NOT_FOUND_STATUS = 404
    API_MAX_RESULTS = 100  # Hard per-page cap imposed by the YouTube Data API.

    inputs = [
        MessageTextInput(
            name="video_url",
            display_name="Video URL",
            info="The URL of the YouTube video to get comments from.",
            tool_mode=True,
            required=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="YouTube API Key",
            info="Your YouTube Data API key.",
            required=True,
        ),
        IntInput(
            name="max_results",
            display_name="Max Results",
            value=20,
            info="The maximum number of comments to return.",
        ),
        DropdownInput(
            name="sort_by",
            display_name="Sort By",
            options=["time", "relevance"],
            value="relevance",
            info="Sort comments by time or relevance.",
        ),
        BoolInput(
            name="include_replies",
            display_name="Include Replies",
            value=False,
            info="Whether to include replies to comments.",
            advanced=True,
        ),
        BoolInput(
            name="include_metrics",
            display_name="Include Metrics",
            value=True,
            info="Include metrics like like count and reply count.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="comments", display_name="Comments", method="get_video_comments"),
    ]

    def _extract_video_id(self, video_url: str) -> str:
        """Extracts the video ID from a YouTube URL.

        Supports watch, youtu.be, embed and shorts URLs; returns the stripped
        input unchanged when no pattern matches (e.g. a bare video ID).
        """
        import re

        # Dots are escaped so they match only a literal "." in the host name.
        patterns = [
            r"(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/)([^&\n?#]+)",
            r"youtube\.com\/shorts\/([^&\n?#]+)",
        ]
        for pattern in patterns:
            match = re.search(pattern, video_url)
            if match:
                return match.group(1)
        return video_url.strip()

    def _process_reply(self, reply: dict, parent_id: str, *, include_metrics: bool = True) -> dict:
        """Process a single reply comment into a flat row dict."""
        reply_snippet = reply["snippet"]
        reply_data = {
            "comment_id": reply["id"],
            "parent_comment_id": parent_id,
            "author": reply_snippet["authorDisplayName"],
            "text": reply_snippet["textDisplay"],
            "published_at": reply_snippet["publishedAt"],
            "is_reply": True,
        }

        if include_metrics:
            reply_data["like_count"] = reply_snippet["likeCount"]
            reply_data["reply_count"] = 0  # Replies can't have replies

        return reply_data

    def _process_comment(
        self, item: dict, *, include_metrics: bool = True, include_replies: bool = False
    ) -> list[dict]:
        """Process a single comment thread into row dicts.

        Returns the top-level comment first, followed by its replies when
        include_replies is True and the API response carried them.
        """
        comment = item["snippet"]["topLevelComment"]["snippet"]
        comment_id = item["snippet"]["topLevelComment"]["id"]

        # Basic comment data
        processed_comments = [
            {
                "comment_id": comment_id,
                "parent_comment_id": "",  # Empty for top-level comments
                "author": comment["authorDisplayName"],
                "author_channel_url": comment.get("authorChannelUrl", ""),
                "text": comment["textDisplay"],
                "published_at": comment["publishedAt"],
                "updated_at": comment["updatedAt"],
                "is_reply": False,
            }
        ]

        # Add metrics if requested
        if include_metrics:
            processed_comments[0].update(
                {
                    "like_count": comment["likeCount"],
                    "reply_count": item["snippet"]["totalReplyCount"],
                }
            )

        # Add replies if requested; "replies" may be absent even when
        # totalReplyCount is positive, hence the membership check.
        if include_replies and item["snippet"]["totalReplyCount"] > 0 and "replies" in item:
            for reply in item["replies"]["comments"]:
                reply_data = self._process_reply(reply, parent_id=comment_id, include_metrics=include_metrics)
                processed_comments.append(reply_data)

        return processed_comments

    @contextmanager
    def youtube_client(self):
        """Context manager for YouTube API client."""
        client = build("youtube", "v3", developerKey=self.api_key)
        try:
            yield client
        finally:
            client.close()

    def get_video_comments(self) -> DataFrame:
        """Retrieves comments from a YouTube video and returns as DataFrame.

        Pagination continues until max_results comment threads have been
        collected (replies do not count toward the limit) or the API runs
        out of pages. API failures are returned as an error DataFrame.
        """
        try:
            # Extract video ID from URL
            video_id = self._extract_video_id(self.video_url)

            # Use context manager for YouTube API client
            with self.youtube_client() as youtube:
                comments_data = []
                results_count = 0

                request = youtube.commentThreads().list(
                    part="snippet,replies",
                    videoId=video_id,
                    maxResults=min(self.API_MAX_RESULTS, self.max_results),
                    order=self.sort_by,
                    textFormat="plainText",
                )

                while request and results_count < self.max_results:
                    response = request.execute()

                    for item in response.get("items", []):
                        if results_count >= self.max_results:
                            break
                        comments = self._process_comment(
                            item, include_metrics=self.include_metrics, include_replies=self.include_replies
                        )
                        comments_data.extend(comments)
                        results_count += 1  # Counts threads, not individual replies.

                    # Get the next page if available and needed
                    if "nextPageToken" in response and results_count < self.max_results:
                        request = youtube.commentThreads().list(
                            part="snippet,replies",
                            videoId=video_id,
                            maxResults=min(self.API_MAX_RESULTS, self.max_results - results_count),
                            order=self.sort_by,
                            textFormat="plainText",
                            pageToken=response["nextPageToken"],
                        )
                    else:
                        request = None

                # Guard the empty case: selecting column_order from an empty
                # DataFrame would raise an uncaught KeyError below.
                if not comments_data:
                    return DataFrame(pd.DataFrame({"error": ["No comments found for this video."]}))

                comments_df = pd.DataFrame(comments_data)

                # Add video metadata
                comments_df["video_id"] = video_id
                comments_df["video_url"] = self.video_url

                # Sort columns for better organization
                column_order = [
                    "video_id",
                    "video_url",
                    "comment_id",
                    "parent_comment_id",
                    "is_reply",
                    "author",
                    "author_channel_url",
                    "text",
                    "published_at",
                    "updated_at",
                ]
                if self.include_metrics:
                    column_order.extend(["like_count", "reply_count"])

                comments_df = comments_df[column_order]
                return DataFrame(comments_df)

        except HttpError as e:
            error_message = f"YouTube API error: {e!s}"
            if e.resp.status == self.COMMENTS_DISABLED_STATUS:
                error_message = "Comments are disabled for this video or API quota exceeded."
            elif e.resp.status == self.NOT_FOUND_STATUS:
                error_message = "Video not found."
            return DataFrame(pd.DataFrame({"error": [error_message]}))

View file

@ -0,0 +1,32 @@
from pytube import Playlist # Ensure you have pytube installed
from langflow.custom import Component
from langflow.inputs import MessageTextInput
from langflow.schema import Data, DataFrame
from langflow.template import Output
class YouTubePlaylistComponent(Component):
    """Extracts every video URL from a YouTube playlist into a DataFrame."""

    display_name = "Youtube Playlist"
    description = "Extracts all video URLs from a YouTube playlist."
    icon = "YouTube"

    inputs = [
        MessageTextInput(
            name="playlist_url",
            display_name="Playlist URL",
            info="URL of the YouTube playlist.",
            required=True,
        ),
    ]

    outputs = [
        Output(display_name="Video URLs", name="video_urls", method="extract_video_urls"),
    ]

    def extract_video_urls(self) -> DataFrame:
        """Resolve the playlist and return one Data row per video watch URL."""
        resolved_playlist = Playlist(self.playlist_url)
        rows = [Data(data={"video_url": entry.watch_url}) for entry in resolved_playlist.videos]
        return DataFrame(rows)

View file

@ -0,0 +1,120 @@
from contextlib import contextmanager
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from langflow.custom import Component
from langflow.inputs import BoolInput, DropdownInput, IntInput, MessageTextInput, SecretStrInput
from langflow.schema import DataFrame
from langflow.template import Output
class YouTubeSearchComponent(Component):
    """A component that searches YouTube videos."""

    display_name: str = "YouTube Search"
    description: str = "Searches YouTube videos based on query."
    icon: str = "YouTube"

    inputs = [
        MessageTextInput(
            name="query",
            display_name="Search Query",
            info="The search query to look for on YouTube.",
            tool_mode=True,
            required=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="YouTube API Key",
            info="Your YouTube Data API key.",
            required=True,
        ),
        IntInput(
            name="max_results",
            display_name="Max Results",
            value=10,
            info="The maximum number of results to return.",
        ),
        DropdownInput(
            name="order",
            display_name="Sort Order",
            options=["relevance", "date", "rating", "title", "viewCount"],
            value="relevance",
            info="Sort order for the search results.",
        ),
        BoolInput(
            name="include_metadata",
            display_name="Include Metadata",
            value=True,
            info="Include video metadata like description and statistics.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="results", display_name="Search Results", method="search_videos"),
    ]

    @contextmanager
    def youtube_client(self):
        """Context manager for YouTube API client."""
        client = build("youtube", "v3", developerKey=self.api_key)
        try:
            yield client
        finally:
            client.close()

    def search_videos(self) -> DataFrame:
        """Searches YouTube videos and returns results as DataFrame.

        One row per hit; when include_metadata is True an extra videos.list
        call per hit adds statistics and duration. API failures come back as
        a single-row error DataFrame.
        """
        try:
            with self.youtube_client() as youtube:
                search_response = (
                    youtube.search()
                    .list(
                        q=self.query,
                        part="id,snippet",
                        maxResults=self.max_results,
                        order=self.order,
                        type="video",
                    )
                    .execute()
                )

                results = []
                for search_result in search_response.get("items", []):
                    video_id = search_result["id"]["videoId"]
                    snippet = search_result["snippet"]

                    result = {
                        "video_id": video_id,
                        "title": snippet["title"],
                        "description": snippet["description"],
                        "published_at": snippet["publishedAt"],
                        "channel_title": snippet["channelTitle"],
                        "thumbnail_url": snippet["thumbnails"]["default"]["url"],
                    }

                    if self.include_metadata:
                        # Get video details for additional metadata
                        video_response = youtube.videos().list(part="statistics,contentDetails", id=video_id).execute()
                        if video_response.get("items"):
                            video_details = video_response["items"][0]
                            stats = video_details.get("statistics", {})
                            result.update(
                                {
                                    # .get() guards against counters the API omits
                                    # (e.g. hidden likes, disabled comments, live streams).
                                    "view_count": int(stats.get("viewCount", 0)),
                                    "like_count": int(stats.get("likeCount", 0)),
                                    "comment_count": int(stats.get("commentCount", 0)),
                                    "duration": video_details["contentDetails"]["duration"],
                                }
                            )

                    results.append(result)

                return DataFrame(pd.DataFrame(results))

        except HttpError as e:
            error_message = f"YouTube API error: {e!s}"
            return DataFrame(pd.DataFrame({"error": [error_message]}))

View file

@ -0,0 +1,286 @@
from contextlib import contextmanager
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from langflow.custom import Component
from langflow.inputs import BoolInput, DropdownInput, IntInput, SecretStrInput
from langflow.schema import DataFrame
from langflow.template import Output
HTTP_FORBIDDEN = 403
HTTP_NOT_FOUND = 404
MAX_API_RESULTS = 50
class YouTubeTrendingComponent(Component):
    """A component that retrieves trending videos from YouTube."""

    display_name: str = "YouTube Trending"
    description: str = "Retrieves trending videos from YouTube with filtering options."
    icon: str = "YouTube"

    # Dictionary of country codes and names
    COUNTRY_CODES = {
        "Global": "US",  # Default to US for global
        "United States": "US",
        "Brazil": "BR",
        "United Kingdom": "GB",
        "India": "IN",
        "Japan": "JP",
        "South Korea": "KR",
        "Germany": "DE",
        "France": "FR",
        "Canada": "CA",
        "Australia": "AU",
        "Spain": "ES",
        "Italy": "IT",
        "Mexico": "MX",
        "Russia": "RU",
        "Netherlands": "NL",
        "Poland": "PL",
        "Argentina": "AR",
    }

    # Dictionary of video categories (YouTube category IDs)
    VIDEO_CATEGORIES = {
        "All": "0",
        "Film & Animation": "1",
        "Autos & Vehicles": "2",
        "Music": "10",
        "Pets & Animals": "15",
        "Sports": "17",
        "Travel & Events": "19",
        "Gaming": "20",
        "People & Blogs": "22",
        "Comedy": "23",
        "Entertainment": "24",
        "News & Politics": "25",
        "Education": "27",
        "Science & Technology": "28",
        "Nonprofits & Activism": "29",
    }

    inputs = [
        SecretStrInput(
            name="api_key",
            display_name="YouTube API Key",
            info="Your YouTube Data API key.",
            required=True,
        ),
        DropdownInput(
            name="region",
            display_name="Region",
            options=list(COUNTRY_CODES.keys()),
            value="Global",
            info="The region to get trending videos from.",
        ),
        DropdownInput(
            name="category",
            display_name="Category",
            options=list(VIDEO_CATEGORIES.keys()),
            value="All",
            info="The category of videos to retrieve.",
        ),
        IntInput(
            name="max_results",
            display_name="Max Results",
            value=10,
            info="Maximum number of trending videos to return (1-50).",
        ),
        BoolInput(
            name="include_statistics",
            display_name="Include Statistics",
            value=True,
            info="Include video statistics (views, likes, comments).",
        ),
        BoolInput(
            name="include_content_details",
            display_name="Include Content Details",
            value=True,
            info="Include video duration and quality info.",
            advanced=True,
        ),
        BoolInput(
            name="include_thumbnails",
            display_name="Include Thumbnails",
            value=True,
            info="Include video thumbnail URLs.",
            advanced=True,
        ),
    ]

    outputs = [
        Output(name="trending_videos", display_name="Trending Videos", method="get_trending_videos"),
    ]

    # Declared for type checkers; the value comes from the IntInput above.
    max_results: int

    def _format_duration(self, duration: str) -> str:
        """Formats an ISO 8601 duration (e.g. 'PT1H2M3S') to 'HH:MM:SS' or 'MM:SS'."""
        import re

        def _component(unit: str) -> int:
            # Each unit appears at most once in an ISO 8601 duration.
            match = re.search(rf"(\d+){unit}", duration)
            return int(match.group(1)) if match else 0

        hours = _component("H")
        minutes = _component("M")
        seconds = _component("S")

        if hours > 0:
            return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
        return f"{minutes:02d}:{seconds:02d}"

    @contextmanager
    def youtube_client(self):
        """Context manager for YouTube API client."""
        client = build("youtube", "v3", developerKey=self.api_key)
        try:
            yield client
        finally:
            client.close()

    def get_trending_videos(self) -> DataFrame:
        """Retrieves trending videos from YouTube and returns as DataFrame.

        max_results is clamped to the API's 1-50 range. API failures are
        returned as a single-row error DataFrame.
        """
        try:
            # Clamp max_results to the API-supported range.
            if not 1 <= self.max_results <= MAX_API_RESULTS:
                self.max_results = min(max(1, self.max_results), MAX_API_RESULTS)

            # Use context manager for YouTube API client
            with self.youtube_client() as youtube:
                region_code = self.COUNTRY_CODES[self.region]

                # Only request the parts the user asked for to save API quota.
                parts = ["snippet"]
                if self.include_statistics:
                    parts.append("statistics")
                if self.include_content_details:
                    parts.append("contentDetails")

                request_params = {
                    "part": ",".join(parts),
                    "chart": "mostPopular",
                    "regionCode": region_code,
                    "maxResults": self.max_results,
                }

                # Add category filter if not "All"
                if self.category != "All":
                    request_params["videoCategoryId"] = self.VIDEO_CATEGORIES[self.category]

                # Get trending videos
                request = youtube.videos().list(**request_params)
                response = request.execute()

                videos_data = []
                for item in response.get("items", []):
                    video_data = {
                        "video_id": item["id"],
                        "title": item["snippet"]["title"],
                        "description": item["snippet"]["description"],
                        "channel_id": item["snippet"]["channelId"],
                        "channel_title": item["snippet"]["channelTitle"],
                        "published_at": item["snippet"]["publishedAt"],
                        "url": f"https://www.youtube.com/watch?v={item['id']}",
                        "region": self.region,
                        "category": self.category,
                    }

                    # Add thumbnails if requested
                    if self.include_thumbnails:
                        for size, thumb in item["snippet"]["thumbnails"].items():
                            video_data[f"thumbnail_{size}_url"] = thumb["url"]
                            video_data[f"thumbnail_{size}_width"] = thumb.get("width", 0)
                            video_data[f"thumbnail_{size}_height"] = thumb.get("height", 0)

                    # Add statistics if requested
                    if self.include_statistics and "statistics" in item:
                        video_data.update(
                            {
                                "view_count": int(item["statistics"].get("viewCount", 0)),
                                "like_count": int(item["statistics"].get("likeCount", 0)),
                                "comment_count": int(item["statistics"].get("commentCount", 0)),
                            }
                        )

                    # Add content details if requested
                    if self.include_content_details and "contentDetails" in item:
                        content_details = item["contentDetails"]
                        video_data.update(
                            {
                                "duration": self._format_duration(content_details["duration"]),
                                "definition": content_details.get("definition", "hd").upper(),
                                "has_captions": content_details.get("caption", "false") == "true",
                                "licensed_content": content_details.get("licensedContent", False),
                                "projection": content_details.get("projection", "rectangular"),
                            }
                        )

                    videos_data.append(video_data)

                # Convert to DataFrame
                videos_df = pd.DataFrame(videos_data)

                # Guard the empty case: the column reordering below would raise
                # a KeyError on a frame with no columns.
                if videos_df.empty:
                    return DataFrame(videos_df)

                # Organize columns
                column_order = [
                    "video_id",
                    "title",
                    "channel_id",
                    "channel_title",
                    "category",
                    "region",
                    "published_at",
                    "url",
                    "description",
                ]
                if self.include_statistics:
                    column_order.extend(["view_count", "like_count", "comment_count"])
                if self.include_content_details:
                    column_order.extend(["duration", "definition", "has_captions", "licensed_content", "projection"])

                # Add thumbnail columns at the end if included
                if self.include_thumbnails:
                    thumbnail_cols = [col for col in videos_df.columns if col.startswith("thumbnail_")]
                    column_order.extend(sorted(thumbnail_cols))

                # Reorder columns, including any that might not be in column_order
                remaining_cols = [col for col in videos_df.columns if col not in column_order]
                videos_df = videos_df[column_order + remaining_cols]

                return DataFrame(videos_df)

        except HttpError as e:
            error_message = f"YouTube API error: {e}"
            if e.resp.status == HTTP_FORBIDDEN:
                error_message = "API quota exceeded or access forbidden."
            elif e.resp.status == HTTP_NOT_FOUND:
                error_message = "Resource not found."
            return DataFrame(pd.DataFrame({"error": [error_message]}))
        except Exception as e:  # noqa: BLE001 - surface any failure as an error row
            import logging

            logging.exception("An unexpected error occurred:")
            return DataFrame(pd.DataFrame({"error": [str(e)]}))

View file

@ -0,0 +1,263 @@
from contextlib import contextmanager
import googleapiclient
import pandas as pd
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from langflow.custom import Component
from langflow.inputs import BoolInput, MessageTextInput, SecretStrInput
from langflow.schema import DataFrame
from langflow.template import Output
class YouTubeVideoDetailsComponent(Component):
"""A component that retrieves detailed information about YouTube videos."""
display_name: str = "YouTube Video Details"
description: str = "Retrieves detailed information and statistics about YouTube videos."
icon: str = "YouTube"
inputs = [
MessageTextInput(
name="video_url",
display_name="Video URL",
info="The URL of the YouTube video.",
tool_mode=True,
required=True,
),
SecretStrInput(
name="api_key",
display_name="YouTube API Key",
info="Your YouTube Data API key.",
required=True,
),
BoolInput(
name="include_statistics",
display_name="Include Statistics",
value=True,
info="Include video statistics (views, likes, comments).",
),
BoolInput(
name="include_content_details",
display_name="Include Content Details",
value=True,
info="Include video duration, quality, and age restriction info.",
advanced=True,
),
BoolInput(
name="include_tags",
display_name="Include Tags",
value=True,
info="Include video tags and keywords.",
advanced=True,
),
BoolInput(
name="include_thumbnails",
display_name="Include Thumbnails",
value=True,
info="Include video thumbnail URLs in different resolutions.",
advanced=True,
),
]
outputs = [
Output(name="video_data", display_name="Video Data", method="get_video_details"),
]
API_FORBIDDEN = 403
VIDEO_NOT_FOUND = 404
@contextmanager
def youtube_client(self):
    """Context manager that builds a YouTube API client and closes it on exit."""
    service = build("youtube", "v3", developerKey=self.api_key)
    try:
        yield service
    finally:
        service.close()
def _extract_video_id(self, video_url: str) -> str:
    """Extracts the video ID from a YouTube URL.

    Supports watch, youtu.be, embed and shorts URLs; returns the stripped
    input unchanged when no pattern matches (e.g. a bare video ID).
    """
    import re

    # Dots are escaped so they match only a literal "." in the host name.
    patterns = [
        r"(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/)([^&\n?#]+)",
        r"youtube\.com\/shorts\/([^&\n?#]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, video_url)
        if match:
            return match.group(1)
    return video_url.strip()
def _format_duration(self, duration: str) -> str:
"""Formats the ISO 8601 duration to a readable format."""
import re
hours = 0
minutes = 0
seconds = 0
hours_match = re.search(r"(\d+)H", duration)
minutes_match = re.search(r"(\d+)M", duration)
seconds_match = re.search(r"(\d+)S", duration)
if hours_match:
hours = int(hours_match.group(1))
if minutes_match:
minutes = int(minutes_match.group(1))
if seconds_match:
seconds = int(seconds_match.group(1))
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
return f"{minutes:02d}:{seconds:02d}"
def get_video_details(self) -> DataFrame:
"""Retrieves detailed information about a YouTube video and returns as DataFrame."""
try:
with self.youtube_client() as youtube:
# Extract video ID
video_id = self._extract_video_id(self.video_url)
# Prepare parts for the API request
parts = ["snippet"]
if self.include_statistics:
parts.append("statistics")
if self.include_content_details:
parts.append("contentDetails")
# Get video information
video_response = youtube.videos().list(part=",".join(parts), id=video_id).execute()
if not video_response["items"]:
return DataFrame(pd.DataFrame({"error": ["Video not found"]}))
video_info = video_response["items"][0]
snippet = video_info["snippet"]
# Build video data dictionary
video_data = {
"video_id": [video_id],
"url": [f"https://www.youtube.com/watch?v={video_id}"],
"title": [snippet["title"]],
"description": [snippet["description"]],
"published_at": [snippet["publishedAt"]],
"channel_id": [snippet["channelId"]],
"channel_title": [snippet["channelTitle"]],
"category_id": [snippet.get("categoryId", "Unknown")],
"live_broadcast_content": [snippet.get("liveBroadcastContent", "none")],
}
# Add thumbnails if requested
if self.include_thumbnails:
for size, thumb in snippet["thumbnails"].items():
video_data[f"thumbnail_{size}_url"] = [thumb["url"]]
video_data[f"thumbnail_{size}_width"] = [thumb.get("width", 0)]
video_data[f"thumbnail_{size}_height"] = [thumb.get("height", 0)]
# Add tags if requested
if self.include_tags and "tags" in snippet:
video_data["tags"] = [", ".join(snippet["tags"])]
video_data["tags_count"] = [len(snippet["tags"])]
# Add statistics if requested
if self.include_statistics and "statistics" in video_info:
stats = video_info["statistics"]
video_data.update(
{
"view_count": [int(stats.get("viewCount", 0))],
"like_count": [int(stats.get("likeCount", 0))],
"favorite_count": [int(stats.get("favoriteCount", 0))],
"comment_count": [int(stats.get("commentCount", 0))],
}
)
# Add content details if requested
if self.include_content_details and "contentDetails" in video_info:
content_details = video_info["contentDetails"]
video_data.update(
{
"duration": [self._format_duration(content_details["duration"])],
"dimension": [content_details.get("dimension", "2d")],
"definition": [content_details.get("definition", "hd").upper()],
"has_captions": [content_details.get("caption", "false") == "true"],
"licensed_content": [content_details.get("licensedContent", False)],
"projection": [content_details.get("projection", "rectangular")],
"has_custom_thumbnails": [content_details.get("hasCustomThumbnail", False)],
}
)
# Add content rating if available
if "contentRating" in content_details:
rating_info = content_details["contentRating"]
video_data["content_rating"] = [str(rating_info)]
# Create DataFrame with organized columns
video_df = pd.DataFrame(video_data)
# Organize columns in logical groups
basic_cols = [
"video_id",
"title",
"url",
"channel_id",
"channel_title",
"published_at",
"category_id",
"live_broadcast_content",
"description",
]
stat_cols = ["view_count", "like_count", "favorite_count", "comment_count"]
content_cols = [
"duration",
"dimension",
"definition",
"has_captions",
"licensed_content",
"projection",
"has_custom_thumbnails",
"content_rating",
]
tag_cols = ["tags", "tags_count"]
thumb_cols = [col for col in video_df.columns if col.startswith("thumbnail_")]
# Reorder columns based on what's included
ordered_cols = basic_cols[:]
if self.include_statistics:
ordered_cols.extend([col for col in stat_cols if col in video_df.columns])
if self.include_content_details:
ordered_cols.extend([col for col in content_cols if col in video_df.columns])
if self.include_tags:
ordered_cols.extend([col for col in tag_cols if col in video_df.columns])
if self.include_thumbnails:
ordered_cols.extend(sorted(thumb_cols))
# Add any remaining columns
remaining_cols = [col for col in video_df.columns if col not in ordered_cols]
ordered_cols.extend(remaining_cols)
return DataFrame(video_df[ordered_cols])
except (HttpError, googleapiclient.errors.HttpError) as e:
error_message = f"YouTube API error: {e!s}"
if e.resp.status == self.API_FORBIDDEN:
error_message = "API quota exceeded or access forbidden."
elif e.resp.status == self.VIDEO_NOT_FOUND:
error_message = "Video not found."
return DataFrame(pd.DataFrame({"error": [error_message]}))
except KeyError as e:
return DataFrame(pd.DataFrame({"error": [str(e)]}))

View file

@ -0,0 +1,85 @@
import pandas as pd
import youtube_transcript_api
from langchain_community.document_loaders import YoutubeLoader
from langchain_community.document_loaders.youtube import TranscriptFormat
from langflow.custom import Component
from langflow.inputs import DropdownInput, IntInput, MultilineInput
from langflow.schema import DataFrame, Message
from langflow.template import Output
class YouTubeTranscriptsComponent(Component):
    """A component that extracts spoken content from YouTube videos as transcripts.

    Offers two outputs: a DataFrame of timestamped chunks, and a single
    continuous-text Message. Transcript loading is delegated to LangChain's
    ``YoutubeLoader``.
    """

    display_name: str = "YouTube Transcripts"
    description: str = "Extracts spoken content from YouTube videos with both DataFrame and text output options."
    icon: str = "YouTube"
    name = "YouTubeTranscripts"

    inputs = [
        MultilineInput(
            name="url",
            display_name="Video URL",
            info="Enter the YouTube video URL to get transcripts from.",
            tool_mode=True,
            required=True,
        ),
        IntInput(
            name="chunk_size_seconds",
            display_name="Chunk Size (seconds)",
            value=60,
            info="The size of each transcript chunk in seconds.",
        ),
        DropdownInput(
            name="translation",
            display_name="Translation Language",
            advanced=True,
            options=["", "en", "es", "fr", "de", "it", "pt", "ru", "ja", "ko", "hi", "ar", "id"],
            info="Translate the transcripts to the specified language. Leave empty for no translation.",
        ),
    ]

    outputs = [
        Output(name="dataframe", display_name="Chunks", method="get_dataframe_output"),
        Output(name="message", display_name="Transcript", method="get_message_output"),
    ]

    def _load_transcripts(self, *, as_chunks: bool = True):
        """Internal method to load transcripts from YouTube.

        Args:
            as_chunks: When True, request timestamped chunks of
                ``chunk_size_seconds``; otherwise a single text document.
        """
        loader = YoutubeLoader.from_youtube_url(
            self.url,
            transcript_format=TranscriptFormat.CHUNKS if as_chunks else TranscriptFormat.TEXT,
            chunk_size_seconds=self.chunk_size_seconds,
            # The empty dropdown option means "no translation".
            translation=self.translation or None,
        )
        return loader.load()

    def get_dataframe_output(self) -> DataFrame:
        """Provides transcript output as a DataFrame with timestamp and text columns."""
        try:
            transcripts = self._load_transcripts(as_chunks=True)

            # Convert each chunk's start offset into an MM:SS timestamp row.
            data = []
            for doc in transcripts:
                start_seconds = int(doc.metadata["start_seconds"])
                start_minutes = start_seconds // 60
                start_seconds %= 60
                timestamp = f"{start_minutes:02d}:{start_seconds:02d}"
                data.append({"timestamp": timestamp, "text": doc.page_content})

            return DataFrame(pd.DataFrame(data))
        except (youtube_transcript_api.TranscriptsDisabled, youtube_transcript_api.NoTranscriptFound) as exc:
            return DataFrame(pd.DataFrame({"error": [f"Failed to get YouTube transcripts: {exc!s}"]}))

    def get_message_output(self) -> Message:
        """Provides transcript output as continuous text."""
        try:
            transcripts = self._load_transcripts(as_chunks=False)
            # Guard against an empty result: transcripts[0] would raise
            # IndexError, which the except clause below does not catch.
            if not transcripts:
                return Message(text="Failed to get YouTube transcripts: no transcript found.")
            return Message(text=transcripts[0].page_content)
        except (youtube_transcript_api.TranscriptsDisabled, youtube_transcript_api.NoTranscriptFound) as exc:
            error_msg = f"Failed to get YouTube transcripts: {exc!s}"
            return Message(text=error_msg)

View file

@ -742,6 +742,8 @@ export const BUNDLES_SIDEBAR_FOLDER_NAMES = [
"assemblyai",
"LangWatch",
"langwatch",
"Youtube",
"youtube",
];
export const AUTHORIZED_DUPLICATE_REQUESTS = [

View file

@ -535,6 +535,7 @@ export const SIDEBAR_BUNDLES = [
{ display_name: "Git", name: "git", icon: "GitLoader" },
{ display_name: "Confluence", name: "confluence", icon: "Confluence" },
{ display_name: "Mem0", name: "mem0", icon: "Mem0" },
{ display_name: "Youtube", name: "youtube", icon: "Youtube" },
];
export const categoryIcons = {

View file

@ -11,15 +11,11 @@ test(
await page.getByTestId("sidebar-search-input").click();
await page.getByTestId("sidebar-search-input").fill("youtube");
await page.waitForSelector('[id="toolsYouTube Transcripts"]', {
timeout: 3000,
});
await page.waitForTimeout(2000);
await page.getByTestId("youtubeYouTube Transcripts").hover();
await page.getByTestId("add-component-button-youtube-transcripts").click();
await page
.locator('//*[@id="toolsYouTube Transcripts"]')
.dragTo(page.locator('//*[@id="react-flow-id"]'));
await page.mouse.up();
await page.mouse.down();
await page.getByTestId("fit_view").click();
let outdatedComponents = await page
@ -35,16 +31,15 @@ test(
.getByTestId("textarea_str_url")
.fill("https://www.youtube.com/watch?v=VqhCQZaH4Vs");
await page.getByTestId("fit_view").click();
await page.getByTestId("button_run_youtube transcripts").click();
await page.waitForSelector("text=built successfully", { timeout: 30000 });
await page.getByTestId("output-inspection-transcription").first().click();
await page.waitForSelector("text=built successfully", { timeout: 300000 });
await page.getByTestId("output-inspection-transcript").first().click();
await page.waitForSelector("text=Component Output", { timeout: 30000 });
await page.getByRole("gridcell").first().click();
const value = await page.getByPlaceholder("Empty").inputValue();
expect(value.length).toBeGreaterThan(10);
},