From d73624e862775dde3b5a97b1bc0160447b93244a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebasti=C3=A1n=20Est=C3=A9vez?= Date: Sun, 30 Mar 2025 10:50:51 -0400 Subject: [PATCH] feat: voice mode tts endpoint (#7294) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 📝 (chat-view-wrapper.tsx): Refactor ChatViewWrapper component to improve code readability and maintainability 📝 (chat-input.tsx): Add functionality to set voice assistant active state when showAudioInput is true 📝 (voice-assistant.tsx): Add functionality to set voice assistant active state and scroll to bottom when closing audio input 📝 (chat-view.tsx): Update ChatView component to consider sidebarOpen and isVoiceAssistantActive states 📝 (voiceStore.ts): Add isVoiceAssistantActive state and setIsVoiceAssistantActive function to voice store 📝 (index.ts, voice.types.ts): Update types to include sidebarOpen prop in chatViewProps and isVoiceAssistantActive state in VoiceStoreType * url change * 🔧 (chat-input.tsx): Add new session close voice assistant functionality to chat input component 🔧 (voice-button.tsx): Update voice button to set new session close voice assistant state 🔧 (sidebar-open-view.tsx): Update sidebar open view to set new session close voice assistant state 🔧 (voiceStore.ts, voice.types.ts): Add new session close voice assistant state and setter to voice store and types * ♻️ (chat-input.tsx): remove unused setNewSessionCloseVoiceAssistant function to clean up code and improve readability * url * merge * 📝 (chat-view-wrapper.tsx): Refactor ChatViewWrapper component to improve code readability and maintainability 📝 (chat-input.tsx): Add functionality to set voice assistant active state when showAudioInput is true 📝 (voice-assistant.tsx): Add functionality to set voice assistant active state and scroll to bottom when closing audio input 📝 (chat-view.tsx): Update ChatView component to consider sidebarOpen and isVoiceAssistantActive states 📝 (voiceStore.ts): Add isVoiceAssistantActive state and setIsVoiceAssistantActive function to voice store 📝 (index.ts, voice.types.ts): Update types to include sidebarOpen prop in chatViewProps and isVoiceAssistantActive state in VoiceStoreType * new endpoint * 🐛 (voice_mode.py): remove unnecessary comment and update input parameter in speech creation function 🐛 (use-start-conversation.ts): update WebSocket URL to use flow_tts endpoint and add support for audio language and input audio transcription model in WebSocket session update configuration * 📝 (voice_mode.py): add "voice" attribute with value "coral" to TTSConfig class for voice mode customization ♻️ (use-start-conversation.ts): refactor code to use "transcription_session.update" type and update session attributes based on audioSettings and audioLanguage variables * ✨ (voice_mode.py): introduce a new 'voice' attribute with the value "coral" to specify the voice used for text-to-speech conversion in TTSConfig class * ♻️ (voice_mode.py): remove unused 'voice' variable assignment to improve code cleanliness and readability * ✨ (voice_mode.py): update voice parameter value to "coral" for better voice quality and clarity in TTS websocket flow. * ✨ (voice_mode.py): add support for configuring OpenAI voice setting in TTSConfig class 🐛 (voice_mode.py): fix updating OpenAI voice setting in flow_tts_websocket function 📝 (use-start-conversation.ts): update voice settings format for flow_tts endpoint to include voice and provider information * 🐛 (use-start-conversation.ts): fix issue with immediate startRecording function call by adding a 300ms delay to ensure proper initialization * refactor * refactor * 🔧 (audio-settings-dialog.tsx): add isPlayingRef prop to SettingsVoiceModal component for managing audio playback state 🔧 (voice-assistant.tsx): pass isPlayingRef prop to VoiceAssistant component for controlling audio playback state * [autofix.ci] apply automated fixes * fixed tts and 11l * lint fix --------- Co-authored-by: cristhianzl Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- pyproject.toml | 1 + .../base/langflow/api/v1/voice_mode.py | 769 +++++++++++------- .../messages/use-get-messages-polling.ts | 4 + .../components/session-selector.tsx | 6 + .../IOModal/components/chat-view-wrapper.tsx | 41 +- .../chatView/chatInput/chat-input.tsx | 17 +- .../audio-settings/audio-settings-dialog.tsx | 3 + .../components/voice-button.tsx | 10 +- .../hooks/use-start-conversation.ts | 18 +- .../voice-assistant/voice-assistant.tsx | 28 +- .../chatView/components/chat-view.tsx | 13 +- .../IOModal/components/sidebar-open-view.tsx | 6 + src/frontend/src/stores/voiceStore.ts | 6 + src/frontend/src/types/components/index.ts | 1 + .../src/types/zustand/voice/voice.types.ts | 6 + uv.lock | 8 +- 16 files changed, 614 insertions(+), 323 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 5fc94c13c..59aa4f450 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -122,6 +122,7 @@ dependencies = [ "ibm-watsonx-ai>=1.3.1", "langchain-ibm>=0.3.8", "opik>=1.6.3", + "openai>=1.68.2", ] [dependency-groups] diff --git a/src/backend/base/langflow/api/v1/voice_mode.py b/src/backend/base/langflow/api/v1/voice_mode.py index 0ce0ba6a9..f5d4ea38b 100644 --- a/src/backend/base/langflow/api/v1/voice_mode.py +++ b/src/backend/base/langflow/api/v1/voice_mode.py @@ -2,8 +2,6 @@ import asyncio import base64 import json import os - -# For sync queue and thread import queue import threading import traceback @@ -15,12 +13,13 @@ from uuid import UUID, uuid4 import numpy as np import requests -import sqlalchemy.exc +import sqlalchemy import webrtcvad import websockets from cryptography.fernet import InvalidToken -from elevenlabs.client import ElevenLabs +from elevenlabs import ElevenLabs from fastapi import APIRouter, BackgroundTasks, Security +from openai import OpenAI from sqlalchemy import select from starlette.websockets import WebSocket, WebSocketDisconnect @@ -31,8 +30,8 @@ from langflow.logging import logger from langflow.memory import aadd_messagetables from langflow.schema.properties import Properties from langflow.services.auth.utils import api_key_header, api_key_query, api_key_security, get_current_user_by_jwt +from langflow.services.database.models import MessageTable from langflow.services.database.models.flow.model import Flow -from langflow.services.database.models.message.model import MessageTable from langflow.services.deps import get_variable_service, session_scope from langflow.utils.voice_utils import ( BYTES_PER_24K_FRAME, @@ -44,7 +43,7 @@ router = APIRouter(prefix="/voice", tags=["Voice"]) SILENCE_THRESHOLD = 0.1 PREFIX_PADDING_MS = 100 -SILENCE_DURATION_MS = 100 +SILENCE_DURATION_MS = 300 AUDIO_SAMPLE_THRESHOLD = 100 SESSION_INSTRUCTIONS = """ Your instructions will be divided into three mutually exclusive sections: "Permanent", "Default", and "Additional". @@ -64,7 +63,7 @@ Your instructions will be divided into three mutually exclusive sections: "Perma * Converse with the user to assist with their question. * Never provide URLs in repsonses, but you may use URLs in tool calls or when processing those URLs' content. -* Always (and I mean *always*) let the user know before you call a function that you will be +* Always (and I mean *always*) let the user know before you will be doing so. * Always update the user with the required information, when the function returns. * Unless otherwise requested, only summarize the return results. Do not repeat everything. @@ -76,99 +75,66 @@ Your instructions will be divided into three mutually exclusive sections: "Perma [ADDITIONAL] The following instructions are to be considered only "Additional" """ - -class VoiceConfig: - def __init__(self, session_id: str): - self.session_id = session_id - self.use_elevenlabs = False - self.elevenlabs_voice = "JBFqnCBsd6RMkjVDRZzb" - self.elevenlabs_model = "eleven_multilingual_v2" - self.elevenlabs_client = None - self.elevenlabs_key = None - self.barge_in_enabled = False - - self.default_openai_realtime_session = { - "modalities": ["text", "audio"], - "instructions": SESSION_INSTRUCTIONS, - "voice": "echo", - "temperature": 0.8, - "input_audio_format": "pcm16", - "output_audio_format": "pcm16", - "turn_detection": { - "type": "server_vad", - "threshold": SILENCE_THRESHOLD, - "prefix_padding_ms": PREFIX_PADDING_MS, - "silence_duration_ms": SILENCE_DURATION_MS, - }, - "input_audio_transcription": {"model": "whisper-1"}, - "tools": [], - "tool_choice": "auto", - } - self.openai_realtime_session: dict[str, Any] = {} - - def get_session_dict(self): - """Return a copy of the default session dictionary with current settings.""" - return dict(self.default_openai_realtime_session) +# --- Helper Functions --- -# Create a cache for voice configs -voice_config_cache: dict[str, VoiceConfig] = {} +async def authenticate_and_get_openai_key(client_websocket: WebSocket, session: DbSession): + """Authenticate the user using a token or API key and retrieve the OpenAI API key. - -def get_voice_config(session_id: str) -> VoiceConfig: - """Get or create a VoiceConfig instance for the given session_id.""" - if session_id is None: - msg = "session_id cannot be None" - raise ValueError(msg) - - if session_id not in voice_config_cache: - voice_config_cache[session_id] = VoiceConfig(session_id) - return voice_config_cache[session_id] - - -# Create a global dictionary to store queues for each session -message_queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue) -# Track active message processing tasks -message_tasks: dict[str, asyncio.Task] = {} - - -async def get_flow_desc_from_db(flow_id: str) -> Flow: - """Get flow from database.""" - async with session_scope() as session: - stmt = select(Flow).where(Flow.id == UUID(flow_id)) - result = await session.exec(stmt) - flow = result.scalar_one_or_none() - if not flow: - error_message = f"Flow with id {flow_id} not found" - raise ValueError(error_message) - return flow.description - - -def pcm16_to_float_array(pcm_data): - values = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) - return values / 32768.0 # Normalize to -1.0 to 1.0 - - -async def text_chunker_with_timeout(chunks, timeout=0.3): - """Async generator that takes an async iterable (of text pieces),. - - accumulates them and yields chunks without breaking sentences. - If no new text is received within 'timeout' seconds and there is - buffered text, it flushes that text. + Returns a tuple: (current_user, openai_key). If authentication fails, sends an error + message to the client and returns (None, None). """ + token = client_websocket.cookies.get("access_token_lf") + current_user = None + if token: + current_user = await get_current_user_by_jwt(token, session) + if current_user is None: + current_user = await api_key_security(Security(api_key_query), Security(api_key_header)) + if current_user is None: + await client_websocket.send_json( + { + "type": "error", + "code": "langflow_auth", + "message": "You must pass a valid Langflow token or cookie.", + } + ) + return None, None + variable_service = get_variable_service() + try: + openai_key_value = await variable_service.get_variable( + user_id=current_user.id, name="OPENAI_API_KEY", field="openai_api_key", session=session + ) + openai_key = openai_key_value if openai_key_value is not None else os.getenv("OPENAI_API_KEY", "") + if not openai_key or openai_key == "dummy": + await client_websocket.send_json( + { + "type": "error", + "code": "api_key_missing", + "key_name": "OPENAI_API_KEY", + "message": "OpenAI API key not found. Please set your API key as an env var or a global variable.", + } + ) + return None, None + except Exception as e: # noqa: BLE001 + logger.error(f"Error with API key: {e}") + logger.error(traceback.format_exc()) + return None, None + return current_user, openai_key + + +# --- Synchronous Text Chunker --- +def sync_text_chunker(sync_queue_obj: queue.Queue, timeout: float = 0.3): + """Synchronous generator that reads text pieces from a sync queue and yields complete chunks.""" splitters = (".", ",", "?", "!", ";", ":", "—", "-", "(", ")", "[", "]", "}", " ") buffer = "" - ait = chunks.__aiter__() while True: try: - text = await asyncio.wait_for(ait.__anext__(), timeout=timeout) - except asyncio.TimeoutError: + text = sync_queue_obj.get(timeout=timeout) + except queue.Empty: if buffer: yield buffer + " " buffer = "" continue - except StopAsyncIteration: - break if text is None: if buffer: yield buffer + " " @@ -185,15 +151,6 @@ async def text_chunker_with_timeout(chunks, timeout=0.3): yield buffer + " " -async def queue_generator(queue: asyncio.Queue): - """Async generator that yields items from a queue.""" - while True: - item = await queue.get() - if item is None: - break - yield item - - async def handle_function_call( websocket: WebSocket, openai_ws: websockets.WebSocketClientProtocol, @@ -216,7 +173,6 @@ async def handle_function_call( background_tasks=background_tasks, current_user=current_user, ) - result = "" async for line in response.body_iterator: if not line: @@ -293,22 +249,254 @@ async def handle_function_call( await openai_ws.send(json.dumps(function_output)) -# --- Synchronous text chunker using a standard queue --- -def sync_text_chunker(sync_queue_obj: queue.Queue, timeout: float = 0.3): - """Synchronous generator that reads text pieces from a sync queue. +# --- Config Classes and Caches --- - accumulates them and yields complete chunks. + +class VoiceConfig: + def __init__(self, session_id: str): + self.session_id = session_id + self.use_elevenlabs = False + self.elevenlabs_voice = "JBFqnCBsd6RMkjVDRZzb" + self.elevenlabs_model = "eleven_multilingual_v2" + self.elevenlabs_client = None + self.elevenlabs_key = None + self.barge_in_enabled = False + + self.default_openai_realtime_session = { + "modalities": ["text", "audio"], + "instructions": SESSION_INSTRUCTIONS, + "voice": "echo", + "temperature": 0.8, + "input_audio_format": "pcm16", + "output_audio_format": "pcm16", + "turn_detection": { + "type": "server_vad", + "threshold": SILENCE_THRESHOLD, + "prefix_padding_ms": PREFIX_PADDING_MS, + "silence_duration_ms": SILENCE_DURATION_MS, + }, + "input_audio_transcription": {"model": "whisper-1"}, + "tools": [], + "tool_choice": "auto", + } + self.openai_realtime_session: dict[str, Any] = {} + + def get_session_dict(self): + return dict(self.default_openai_realtime_session) + + +class ElevenLabsClientManager: + _instance = None + _api_key = None + + @classmethod + async def get_client(cls, user_id=None, session=None): + """Get or create an ElevenLabs client with the API key.""" + if cls._instance is None: + if cls._api_key is None and user_id and session: + variable_service = get_variable_service() + try: + cls._api_key = await variable_service.get_variable( + user_id=user_id, + name="ELEVENLABS_API_KEY", + field="elevenlabs_api_key", + session=session, + ) + except (InvalidToken, ValueError) as e: + logger.error(f"Error with ElevenLabs API key: {e}") + cls._api_key = os.getenv("ELEVENLABS_API_KEY", "") + if not cls._api_key: + logger.error("ElevenLabs API key not found") + return None + except (KeyError, AttributeError, sqlalchemy.exc.SQLAlchemyError) as e: + logger.error(f"Exception getting ElevenLabs API key: {e}") + return None + + if cls._api_key: + cls._instance = ElevenLabs(api_key=cls._api_key) + + return cls._instance + + +voice_config_cache: dict[str, VoiceConfig] = {} + + +def get_voice_config(session_id: str) -> VoiceConfig: + if session_id is None: + msg = "session_id cannot be None" + raise ValueError(msg) + if session_id not in voice_config_cache: + voice_config_cache[session_id] = VoiceConfig(session_id) + return voice_config_cache[session_id] + + +class TTSConfig: + def __init__(self, session_id: str, openai_key: str): + self.session_id = session_id + self.use_elevenlabs = False + self.elevenlabs_voice = "JBFqnCBsd6RMkjVDRZzb" + self.elevenlabs_model = "eleven_multilingual_v2" + self.elevenlabs_client = None + self.default_tts_session = { + "type": "transcription_session.update", + "session": { + "input_audio_format": "pcm16", + "input_audio_transcription": { + "model": "gpt-4o-transcribe", + "language": "en", + }, + "turn_detection": { + "type": "server_vad", + "threshold": SILENCE_THRESHOLD, + "prefix_padding_ms": PREFIX_PADDING_MS, + "silence_duration_ms": SILENCE_DURATION_MS, + }, + "input_audio_noise_reduction": {"type": "near_field"}, + "include": [], + }, + } + self.tts_session: dict[str, Any] = {} + self.oai_client = OpenAI(api_key=openai_key) + self.openai_voice = "echo" + + def get_session_dict(self): + return dict(self.default_tts_session) + + def get_openai_client(self): + return self.oai_client + + def get_openai_voice(self): + return self.openai_voice + + +tts_config_cache: dict[str, TTSConfig] = {} + + +def get_tts_config(session_id: str, openai_key: str) -> TTSConfig: + if session_id is None: + msg = "session_id cannot be None" + raise ValueError(msg) + if session_id not in tts_config_cache: + tts_config_cache[session_id] = TTSConfig(session_id, openai_key) + return tts_config_cache[session_id] + + +async def add_message_to_db(message, session, flow_id, session_id, sender, sender_name): + """Enforce alternating sequence by checking the last sender. + + If two consecutive messages come from the same party (e.g. AI/AI), wait briefly. """ + queue_key = f"{flow_id}:{session_id}" + + # If the incoming sender is the same as the last recorded sender, + # wait for a change (with a timeout as a fallback). + if last_sender_by_session[queue_key] == sender: + await wait_for_sender_change(queue_key, sender, timeout=5) + last_sender_by_session[queue_key] = sender + + # Now proceed to create the message + message_obj = MessageTable( + text=message, + sender=sender, + sender_name=sender_name, + session_id=session_id, + files=[], + flow_id=uuid.UUID(flow_id) if isinstance(flow_id, str) else flow_id, + properties=Properties().model_dump(), + content_blocks=[], + category="message", + ) + + await message_queues[queue_key].put(message_obj) + # Update last sender for this session + + if queue_key not in message_tasks or message_tasks[queue_key].done(): + message_tasks[queue_key] = asyncio.create_task(process_message_queue(queue_key, session)) + + +async def wait_for_sender_change(queue_key, current_sender, timeout=5): + """Wait until the last sender for this session is not the same as current_sender. + + or until the timeout expires. + """ + waited = 0 + interval = 0.05 + while last_sender_by_session[queue_key] == current_sender and waited < timeout: + await asyncio.sleep(interval) + waited += interval + + +async def process_message_queue(queue_key, session): + """Process messages from the queue one by one.""" + try: + while True: + message = await message_queues[queue_key].get() + + try: + await aadd_messagetables([message], session) + logger.debug(f"Added message to DB: {message.text[:30]}...") + except ValueError as e: + logger.error(f"Error saving message to database (ValueError): {e}") + logger.error(traceback.format_exc()) + except sqlalchemy.exc.SQLAlchemyError as e: + logger.error(f"Error saving message to database (SQLAlchemyError): {e}") + logger.error(traceback.format_exc()) + except (KeyError, AttributeError, TypeError) as e: + # More specific exceptions instead of blind Exception + logger.error(f"Error saving message to database: {e}") + logger.error(traceback.format_exc()) + finally: + message_queues[queue_key].task_done() + + if message_queues[queue_key].empty(): + break + except Exception as e: # noqa: BLE001 + logger.debug(f"Message queue processor for {queue_key} was cancelled: {e}") + logger.error(traceback.format_exc()) + + +# --- Global Queues and Message Processing --- + +message_queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue) +message_tasks: dict[str, asyncio.Task] = {} +last_sender_by_session: defaultdict[str, str | None] = defaultdict(lambda: None) + + +async def get_flow_desc_from_db(flow_id: str) -> Flow: + async with session_scope() as session: + stmt = select(Flow).where(Flow.id == UUID(flow_id)) + result = await session.exec(stmt) + flow = result.scalar_one_or_none() + if not flow: + msg = f"Flow with id {flow_id} not found" + raise ValueError(msg) + return flow.description + + +async def get_or_create_elevenlabs_client(user_id=None, session=None): + """Get or create an ElevenLabs client with the API key.""" + return await ElevenLabsClientManager.get_client(user_id, session) + + +def pcm16_to_float_array(pcm_data): + values = np.frombuffer(pcm_data, dtype=np.int16).astype(np.float32) + return values / 32768.0 + + +async def text_chunker_with_timeout(chunks, timeout=0.3): splitters = (".", ",", "?", "!", ";", ":", "—", "-", "(", ")", "[", "]", "}", " ") buffer = "" + ait = chunks.__aiter__() while True: try: - text = sync_queue_obj.get(timeout=timeout) - except queue.Empty: + text = await asyncio.wait_for(ait.__anext__(), timeout=timeout) + except asyncio.TimeoutError: if buffer: yield buffer + " " buffer = "" continue + except StopAsyncIteration: + break if text is None: if buffer: yield buffer + " " @@ -325,6 +513,30 @@ def sync_text_chunker(sync_queue_obj: queue.Queue, timeout: float = 0.3): yield buffer + " " +async def queue_generator(queue: asyncio.Queue): + while True: + item = await queue.get() + if item is None: + break + yield item + + +def create_event_logger(session_id: str): + state = {"last_event_type": None, "event_count": 0} + + def log_event(event: dict, direction: str) -> None: + event_type = event["type"] + if event_type != state["last_event_type"]: + logger.debug(f"Event (session - {session_id}): {direction} {event_type}") + state["last_event_type"] = event_type + state["event_count"] = 0 + current_count = 0 if state["event_count"] is None else state["event_count"] + state["event_count"] = current_count + 1 + + return log_event + + +# --- WebSocket Endpoints for Flow-as-Tool --- @router.websocket("/ws/flow_as_tool/{flow_id}") async def flow_as_tool_websocket_no_session( client_websocket: WebSocket, @@ -354,45 +566,9 @@ async def flow_as_tool_websocket( try: await client_websocket.accept() voice_config = get_voice_config(session_id) - token = client_websocket.cookies.get("access_token_lf") - current_user = None - if token: - current_user = await get_current_user_by_jwt(token, session) - - if current_user is None: - current_user = await api_key_security(Security(api_key_query), Security(api_key_header)) - if current_user is None: - await client_websocket.send_json( - { - "type": "error", - "code": "langflow_auth", - "message": "You must pass a valid Langflow token or cookie.", - } - ) - return - - variable_service = get_variable_service() - try: - openai_key_value = await variable_service.get_variable( - user_id=current_user.id, name="OPENAI_API_KEY", field="openai_api_key", session=session - ) - openai_key = openai_key_value if openai_key_value is not None else os.getenv("OPENAI_API_KEY", "") - if not openai_key or openai_key == "dummy": - await client_websocket.send_json( - { - "type": "error", - "code": "api_key_missing", - "key_name": "OPENAI_API_KEY", - "message": "OpenAI API key not found. Please set your API key as an env var or a " - "global variable.", - } - ) - return - except Exception as e: # noqa: BLE001 - logger.error(f"Error with API key: {e}") - logger.error(traceback.format_exc()) + current_user, openai_key = await authenticate_and_get_openai_key(client_websocket, session) + if current_user is None or openai_key is None: return - try: flow_description = await get_flow_desc_from_db(flow_id) flow_tool = { @@ -782,6 +958,181 @@ async def flow_as_tool_websocket( vad_task.cancel() +@router.websocket("/ws/flow_tts/{flow_id}") +async def flow_tts_websocket_no_session( + client_websocket: WebSocket, + flow_id: str, + background_tasks: BackgroundTasks, + session: DbSession, +): + session_id = str(uuid4()) + await flow_tts_websocket( + client_websocket=client_websocket, + flow_id=flow_id, + background_tasks=background_tasks, + session=session, + session_id=session_id, + ) + + +@router.websocket("/ws/flow_tts/{flow_id}/{session_id}") +async def flow_tts_websocket( + client_websocket: WebSocket, + flow_id: str, + background_tasks: BackgroundTasks, + session: DbSession, + session_id: str, +): + """WebSocket endpoint for direct flow text-to-speech interaction.""" + try: + await client_websocket.accept() + log_event = create_event_logger(session_id) + current_user, openai_key = await authenticate_and_get_openai_key(client_websocket, session) + if current_user is None or openai_key is None: + return + + url = "wss://api.openai.com/v1/realtime?intent=transcription" + headers = { + "Authorization": f"Bearer {openai_key}", + "OpenAI-Beta": "realtime=v1", + } + + tts_config = get_tts_config(session_id, openai_key) + async with websockets.connect(url, extra_headers=headers) as openai_ws: + tts_realtime_session = tts_config.get_session_dict() + await openai_ws.send(json.dumps(tts_realtime_session)) + + async def forward_to_openai() -> None: + try: + while True: + message_text = await client_websocket.receive_text() + event = json.loads(message_text) + log_event(event, "Client → OpenAI") + if event.get("type") == "input_audio_buffer.append": + base64_data = event.get("audio", "") + if not base64_data: + continue + out_event = {"type": "input_audio_buffer.append", "audio": base64_data} + await openai_ws.send(json.dumps(out_event)) + elif event.get("type") == "input_audio_buffer.commit": + await openai_ws.send(message_text) + elif event.get("type") == "langflow.elevenlabs.config": + logger.info(f"langflow.elevenlabs.config {event}") + tts_config.use_elevenlabs = event["enabled"] + tts_config.elevenlabs_voice = event.get("voice_id", tts_config.elevenlabs_voice) + elif event.get("type") == "voice.settings": + # Store the voice setting + if event.get("voice"): + tts_config.openai_voice = event.get("voice") + logger.info(f"Updated OpenAI voice to: {tts_config.openai_voice}") + except Exception as e: # noqa: BLE001 + logger.error(f"Error in WebSocket communication: {e}") + + async def forward_to_client() -> None: + try: + while True: + data = await openai_ws.recv() + event = json.loads(data) + log_event(event, "OpenAI → Client") + await client_websocket.send_text(data) + if event.get("type") == "conversation.item.input_audio_transcription.completed": + transcript = event.get("transcript") + if transcript is not None and transcript != "": + input_request = InputValueRequest( + input_value=transcript, components=[], type="chat", session=session_id + ) + response = await build_flow_and_stream( + flow_id=UUID(flow_id), + inputs=input_request, + background_tasks=background_tasks, + current_user=current_user, + ) + result = "" + async for line in response.body_iterator: + if not line: + continue + event_data = json.loads(line) + await client_websocket.send_json( + {"type": "flow.build.progress", "data": event_data} + ) + if event_data.get("event") == "end_vertex": + text = ( + event_data.get("data", {}) + .get("build_data", "") + .get("data", {}) + .get("results", {}) + .get("message", {}) + .get("text", "") + ) + if text: + result = text + if result != "": + if tts_config.use_elevenlabs: + elevenlabs_client = await get_or_create_elevenlabs_client( + current_user.id, session + ) + if elevenlabs_client is None: + return + audio_stream = elevenlabs_client.generate( + voice=tts_config.elevenlabs_voice, + output_format="pcm_24000", + text=result, + model=tts_config.elevenlabs_model, + voice_settings=None, + stream=True, + ) + for chunk in audio_stream: + base64_audio = base64.b64encode(chunk).decode("utf-8") + audio_event = {"type": "response.audio.delta", "delta": base64_audio} + await client_websocket.send_json(audio_event) + else: + oai_client = tts_config.get_openai_client() + voice = tts_config.get_openai_voice() + response = oai_client.audio.speech.create( + model="gpt-4o-mini-tts", + voice=voice, + input=result, # Use result instead of undefined input variable + instructions="be cheerful", + response_format="pcm", + ) + + base64_audio = base64.b64encode(response.content).decode("utf-8") + audio_event = {"type": "response.audio.delta", "delta": base64_audio} + await client_websocket.send_json(audio_event) + except Exception as e: # noqa: BLE001 + logger.error(f"Error in WebSocket communication: {e}") + + forward_to_openai_task = asyncio.create_task(forward_to_openai()) + forward_to_client_task = asyncio.create_task(forward_to_client()) + try: + await asyncio.gather( + forward_to_openai_task, + forward_to_client_task, + ) + except Exception as e: # noqa: BLE001 + logger.error(f"Error in WebSocket communication: {e}") + logger.error(traceback.format_exc()) + finally: + forward_to_openai_task.cancel() + except Exception as e: # noqa: BLE001 + logger.error(f"WebSocket error: {e}") + logger.error(traceback.format_exc()) + await client_websocket.close() + + +def extract_transcript(json_data): + try: + content_list = json_data.get("item", {}).get("content", []) + for content_item in content_list: + if content_item.get("type") == "audio": + return content_item.get("transcript", "") + except (KeyError, TypeError, AttributeError) as e: + logger.debug(f"Error extracting transcript: {e}") + return "" + else: + return "" + + @router.get("/elevenlabs/voice_ids") async def get_elevenlabs_voice_ids( current_user: CurrentActiveUser, @@ -816,137 +1167,3 @@ async def get_elevenlabs_voice_ids( logger.error(f"Error fetching ElevenLabs voices: {e}") logger.error(traceback.format_exc()) return {"error": str(e)} - - -# Replace ElevenLabsClient class with a better implementation -class ElevenLabsClientManager: - _instance = None - _api_key = None - - @classmethod - async def get_client(cls, user_id=None, session=None): - """Get or create an ElevenLabs client with the API key.""" - if cls._instance is None: - if cls._api_key is None and user_id and session: - variable_service = get_variable_service() - try: - cls._api_key = await variable_service.get_variable( - user_id=user_id, - name="ELEVENLABS_API_KEY", - field="elevenlabs_api_key", - session=session, - ) - except (InvalidToken, ValueError) as e: - logger.error(f"Error with ElevenLabs API key: {e}") - cls._api_key = os.getenv("ELEVENLABS_API_KEY", "") - if not cls._api_key: - logger.error("ElevenLabs API key not found") - return None - except (KeyError, AttributeError, sqlalchemy.exc.SQLAlchemyError) as e: - logger.error(f"Exception getting ElevenLabs API key: {e}") - return None - - if cls._api_key: - cls._instance = ElevenLabs(api_key=cls._api_key) - - return cls._instance - - -# Update the get_or_create_elevenlabs_client function to use the new manager -async def get_or_create_elevenlabs_client(user_id=None, session=None): - """Get or create an ElevenLabs client with the API key.""" - return await ElevenLabsClientManager.get_client(user_id, session) - - -# Global dictionary to track the last sender for each session (identified by queue_key) -last_sender_by_session: defaultdict[str, str | None] = defaultdict(lambda: None) - - -async def wait_for_sender_change(queue_key, current_sender, timeout=5): - """Wait until the last sender for this session is not the same as current_sender. - - or until the timeout expires. - """ - waited = 0 - interval = 0.05 - while last_sender_by_session[queue_key] == current_sender and waited < timeout: - await asyncio.sleep(interval) - waited += interval - - -async def add_message_to_db(message, session, flow_id, session_id, sender, sender_name): - """Enforce alternating sequence by checking the last sender. - - If two consecutive messages come from the same party (e.g. AI/AI), wait briefly. - """ - queue_key = f"{flow_id}:{session_id}" - - # If the incoming sender is the same as the last recorded sender, - # wait for a change (with a timeout as a fallback). - if last_sender_by_session[queue_key] == sender: - await wait_for_sender_change(queue_key, sender, timeout=5) - last_sender_by_session[queue_key] = sender - - # Now proceed to create the message - message_obj = MessageTable( - text=message, - sender=sender, - sender_name=sender_name, - session_id=session_id, - files=[], - flow_id=uuid.UUID(flow_id) if isinstance(flow_id, str) else flow_id, - properties=Properties().model_dump(), - content_blocks=[], - category="audio", - ) - - await message_queues[queue_key].put(message_obj) - # Update last sender for this session - - if queue_key not in message_tasks or message_tasks[queue_key].done(): - message_tasks[queue_key] = asyncio.create_task(process_message_queue(queue_key, session)) - - -async def process_message_queue(queue_key, session): - """Process messages from the queue one by one.""" - try: - while True: - message = await message_queues[queue_key].get() - - try: - await aadd_messagetables([message], session) - logger.debug(f"Added message to DB: {message.text[:30]}...") - except ValueError as e: - logger.error(f"Error saving message to database (ValueError): {e}") - logger.error(traceback.format_exc()) - except sqlalchemy.exc.SQLAlchemyError as e: - logger.error(f"Error saving message to database (SQLAlchemyError): {e}") - logger.error(traceback.format_exc()) - except (KeyError, AttributeError, TypeError) as e: - # More specific exceptions instead of blind Exception - logger.error(f"Error saving message to database: {e}") - logger.error(traceback.format_exc()) - finally: - message_queues[queue_key].task_done() - - if message_queues[queue_key].empty(): - break - except Exception as e: # noqa: BLE001 - logger.debug(f"Message queue processor for {queue_key} was cancelled: {e}") - logger.error(traceback.format_exc()) - - -def extract_transcript(json_data): - try: - content_list = json_data.get("item", {}).get("content", []) - - for content_item in content_list: - if content_item.get("type") == "audio": - return content_item.get("transcript", "") - # Move this to the else block - except (KeyError, TypeError, AttributeError) as e: - logger.debug(f"Error extracting transcript: {e}") - return "" - else: - # This is now properly in the else block - return "" diff --git a/src/frontend/src/controllers/API/queries/messages/use-get-messages-polling.ts b/src/frontend/src/controllers/API/queries/messages/use-get-messages-polling.ts index 8a37770df..49984771b 100644 --- a/src/frontend/src/controllers/API/queries/messages/use-get-messages-polling.ts +++ b/src/frontend/src/controllers/API/queries/messages/use-get-messages-polling.ts @@ -14,6 +14,7 @@ interface MessagesQueryParams { params?: object; onSuccess?: (data: MessagesResponse) => void; stopPollingOn?: (data: MessagesResponse) => boolean; + session_id?: string; } interface MessagesResponse { @@ -109,6 +110,7 @@ export const useGetMessagesPollingMutation = ( payload: MessagesQueryParams, ): Promise => { const requestId = payload.id || "default"; + const sessionId = payload.session_id; if (requestInProgressRef.current[requestId]) { return Promise.reject("Request already in progress"); @@ -118,9 +120,11 @@ export const useGetMessagesPollingMutation = ( requestInProgressRef.current[requestId] = true; const { id, mode, excludedFields, params } = payload; const config = {}; + if (id) { config["params"] = { flow_id: id }; } + if (params) { config["params"] = { ...config["params"], ...params }; } diff --git a/src/frontend/src/modals/IOModal/components/IOFieldView/components/session-selector.tsx b/src/frontend/src/modals/IOModal/components/IOFieldView/components/session-selector.tsx index db8450225..cb1fb7c7b 100644 --- a/src/frontend/src/modals/IOModal/components/IOFieldView/components/session-selector.tsx +++ b/src/frontend/src/modals/IOModal/components/IOFieldView/components/session-selector.tsx @@ -11,6 +11,7 @@ import { useUpdateSessionName } from "@/controllers/API/queries/messages/use-ren import useFlowsManagerStore from "@/stores/flowsManagerStore"; import useFlowStore from "@/stores/flowStore"; import { useUtilityStore } from "@/stores/utilityStore"; +import { useVoiceStore } from "@/stores/voiceStore"; import { cn } from "@/utils/utils"; import React, { useEffect, useRef, useState } from "react"; import { v5 as uuidv5 } from "uuid"; @@ -117,10 +118,15 @@ export default function SessionSelector({ } }; + const setNewSessionCloseVoiceAssistant = useVoiceStore( + (state) => state.setNewSessionCloseVoiceAssistant, + ); + return (
{ + setNewSessionCloseVoiceAssistant(true); if (isEditing) e.stopPropagation(); else toggleVisibility(); }} diff --git a/src/frontend/src/modals/IOModal/components/chat-view-wrapper.tsx b/src/frontend/src/modals/IOModal/components/chat-view-wrapper.tsx index ef3335ef8..8b68e6c26 100644 --- a/src/frontend/src/modals/IOModal/components/chat-view-wrapper.tsx +++ b/src/frontend/src/modals/IOModal/components/chat-view-wrapper.tsx @@ -91,30 +91,23 @@ export const ChatViewWrapper = ({ {!playgroundPage && }
-
- {messagesFetched && ( - { - setOpen(false); - } - } - playgroundPage={playgroundPage} - /> - )} -
+ + {messagesFetched && ( + { + setOpen(false); + } + } + playgroundPage={playgroundPage} + sidebarOpen={sidebarOpen} + /> + )} ); }; diff --git a/src/frontend/src/modals/IOModal/components/chatView/chatInput/chat-input.tsx b/src/frontend/src/modals/IOModal/components/chatView/chatInput/chat-input.tsx index 7393b9554..9c4f25b8b 100644 --- a/src/frontend/src/modals/IOModal/components/chatView/chatInput/chat-input.tsx +++ b/src/frontend/src/modals/IOModal/components/chatView/chatInput/chat-input.tsx @@ -3,6 +3,7 @@ import useFileSizeValidator from "@/shared/hooks/use-file-size-validator"; import useAlertStore from "@/stores/alertStore"; import useFlowStore from "@/stores/flowStore"; import { useUtilityStore } from "@/stores/utilityStore"; +import { useVoiceStore } from "@/stores/voiceStore"; import { AnimatePresence, motion } from "framer-motion"; import { useEffect, useRef, useState } from "react"; import ShortUniqueId from "short-unique-id"; @@ -41,6 +42,20 @@ export default function ChatInput({ const [showAudioInput, setShowAudioInput] = useState(false); + const setIsVoiceAssistantActive = useVoiceStore( + (state) => state.setIsVoiceAssistantActive, + ); + + const newSessionCloseVoiceAssistant = useVoiceStore( + (state) => state.newSessionCloseVoiceAssistant, + ); + + useEffect(() => { + if (showAudioInput) { + setIsVoiceAssistantActive(true); + } + }, [showAudioInput]); + useFocusOnUnlock(isBuilding, inputRef); useAutoResizeTextArea(chatValue, inputRef); @@ -184,7 +199,7 @@ export default function ChatInput({ return ( - {showAudioInput ? ( + {showAudioInput && !newSessionCloseVoiceAssistant ? ( void; isEditingOpenAIKey: boolean; setIsEditingOpenAIKey: (isEditingOpenAIKey: boolean) => void; + isPlayingRef: React.MutableRefObject; } const SettingsVoiceModal = ({ @@ -50,6 +51,7 @@ const SettingsVoiceModal = ({ handleClickSaveOpenAIApiKey, isEditingOpenAIKey, setIsEditingOpenAIKey, + isPlayingRef, }: SettingsVoiceModalProps) => { const popupRef = useRef(null); const [voice, setVoice] = useState("alloy"); @@ -147,6 +149,7 @@ const SettingsVoiceModal = ({ }; const onOpenChangeDropdownMenu = (open: boolean) => { + isPlayingRef.current = false; setOpen(open); setShowSettingsModal(open, openaiApiKey, elevenLabsApiKey); }; diff --git a/src/frontend/src/modals/IOModal/components/chatView/chatInput/components/voice-assistant/components/voice-button.tsx b/src/frontend/src/modals/IOModal/components/chatView/chatInput/components/voice-assistant/components/voice-button.tsx index 34f9e45e7..be7383b4c 100644 --- a/src/frontend/src/modals/IOModal/components/chatView/chatInput/components/voice-assistant/components/voice-button.tsx +++ b/src/frontend/src/modals/IOModal/components/chatView/chatInput/components/voice-assistant/components/voice-button.tsx @@ -1,17 +1,25 @@ import ForwardedIconComponent from "@/components/common/genericIconComponent"; import { Button } from "@/components/ui/button"; import { ICON_STROKE_WIDTH } from "@/constants/constants"; +import { useVoiceStore } from "@/stores/voiceStore"; interface VoiceButtonProps { toggleRecording: () => void; } const VoiceButton = ({ toggleRecording }: VoiceButtonProps) => { + const setNewSessionCloseVoiceAssistant = useVoiceStore( + (state) => state.setNewSessionCloseVoiceAssistant, + ); + return ( <>