feat: voice mode tts endpoint (#7294)

* 📝 (chat-view-wrapper.tsx): Refactor ChatViewWrapper component to improve code readability and maintainability
📝 (chat-input.tsx): Add functionality to set voice assistant active state when showAudioInput is true
📝 (voice-assistant.tsx): Add functionality to set voice assistant active state and scroll to bottom when closing audio input
📝 (chat-view.tsx): Update ChatView component to consider sidebarOpen and isVoiceAssistantActive states
📝 (voiceStore.ts): Add isVoiceAssistantActive state and setIsVoiceAssistantActive function to voice store
📝 (index.ts, voice.types.ts): Update types to include sidebarOpen prop in chatViewProps and isVoiceAssistantActive state in VoiceStoreType

* url change

* 🔧 (chat-input.tsx): Add new session close voice assistant functionality to chat input component
🔧 (voice-button.tsx): Update voice button to set new session close voice assistant state
🔧 (sidebar-open-view.tsx): Update sidebar open view to set new session close voice assistant state
🔧 (voiceStore.ts, voice.types.ts): Add new session close voice assistant state and setter to voice store and types

* ♻️ (chat-input.tsx): remove unused setNewSessionCloseVoiceAssistant function to clean up code and improve readability

* url

* merge

* 📝 (chat-view-wrapper.tsx): Refactor ChatViewWrapper component to improve code readability and maintainability
📝 (chat-input.tsx): Add functionality to set voice assistant active state when showAudioInput is true
📝 (voice-assistant.tsx): Add functionality to set voice assistant active state and scroll to bottom when closing audio input
📝 (chat-view.tsx): Update ChatView component to consider sidebarOpen and isVoiceAssistantActive states
📝 (voiceStore.ts): Add isVoiceAssistantActive state and setIsVoiceAssistantActive function to voice store
📝 (index.ts, voice.types.ts): Update types to include sidebarOpen prop in chatViewProps and isVoiceAssistantActive state in VoiceStoreType

* new endpoint

* 🐛 (voice_mode.py): remove unnecessary comment and update input parameter in speech creation function
🐛 (use-start-conversation.ts): update WebSocket URL to use flow_tts endpoint and add support for audio language and input audio transcription model in WebSocket session update configuration

* 📝 (voice_mode.py): add "voice" attribute with value "coral" to TTSConfig class for voice mode customization
♻️ (use-start-conversation.ts): refactor code to use "transcription_session.update" type and update session attributes based on audioSettings and audioLanguage variables

* ✨ (voice_mode.py): introduce a new 'voice' attribute with the value "coral" to specify the voice used for text-to-speech conversion in TTSConfig class

* ♻️ (voice_mode.py): remove unused 'voice' variable assignment to improve code cleanliness and readability

* ✨ (voice_mode.py): update voice parameter value to "coral" for better voice quality and clarity in TTS websocket flow.

* ✨ (voice_mode.py): add support for configuring OpenAI voice setting in TTSConfig class
🐛 (voice_mode.py): fix updating OpenAI voice setting in flow_tts_websocket function
📝 (use-start-conversation.ts): update voice settings format for flow_tts endpoint to include voice and provider information

* 🐛 (use-start-conversation.ts): fix issue with immediate startRecording function call by adding a 300ms delay to ensure proper initialization

* refactor

* refactor

* 🔧 (audio-settings-dialog.tsx): add isPlayingRef prop to SettingsVoiceModal component for managing audio playback state
🔧 (voice-assistant.tsx): pass isPlayingRef prop to VoiceAssistant component for controlling audio playback state

* [autofix.ci] apply automated fixes

* fixed TTS and ElevenLabs (11l)

* lint fix

---------

Co-authored-by: cristhianzl <cristhian.lousa@gmail.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Sebastián Estévez 2025-03-30 10:50:51 -04:00 committed by GitHub
commit d73624e862
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 614 additions and 323 deletions

View file

@ -122,6 +122,7 @@ dependencies = [
"ibm-watsonx-ai>=1.3.1",
"langchain-ibm>=0.3.8",
"opik>=1.6.3",
"openai>=1.68.2",
]
[dependency-groups]

View file

@ -2,8 +2,6 @@ import asyncio
import base64
import json
import os
# For sync queue and thread
import queue
import threading
import traceback
@ -15,12 +13,13 @@ from uuid import UUID, uuid4
import numpy as np
import requests
import sqlalchemy.exc
import sqlalchemy
import webrtcvad
import websockets
from cryptography.fernet import InvalidToken
from elevenlabs.client import ElevenLabs
from elevenlabs import ElevenLabs
from fastapi import APIRouter, BackgroundTasks, Security
from openai import OpenAI
from sqlalchemy import select
from starlette.websockets import WebSocket, WebSocketDisconnect
@ -31,8 +30,8 @@ from langflow.logging import logger
from langflow.memory import aadd_messagetables
from langflow.schema.properties import Properties
from langflow.services.auth.utils import api_key_header, api_key_query, api_key_security, get_current_user_by_jwt
from langflow.services.database.models import MessageTable
from langflow.services.database.models.flow.model import Flow
from langflow.services.database.models.message.model import MessageTable
from langflow.services.deps import get_variable_service, session_scope
from langflow.utils.voice_utils import (
BYTES_PER_24K_FRAME,
@ -44,7 +43,7 @@ router = APIRouter(prefix="/voice", tags=["Voice"])
SILENCE_THRESHOLD = 0.1
PREFIX_PADDING_MS = 100
SILENCE_DURATION_MS = 100
SILENCE_DURATION_MS = 300
AUDIO_SAMPLE_THRESHOLD = 100
SESSION_INSTRUCTIONS = """
Your instructions will be divided into three mutually exclusive sections: "Permanent", "Default", and "Additional".
@ -64,7 +63,7 @@ Your instructions will be divided into three mutually exclusive sections: "Perma
* Converse with the user to assist with their question.
* Never provide URLs in repsonses, but you may use URLs in tool calls or when processing those
URLs' content.
* Always (and I mean *always*) let the user know before you call a function that you will be
* Always (and I mean *always*) let the user know before you will be
doing so.
* Always update the user with the required information, when the function returns.
* Unless otherwise requested, only summarize the return results. Do not repeat everything.
@ -76,99 +75,66 @@ Your instructions will be divided into three mutually exclusive sections: "Perma
[ADDITIONAL] The following instructions are to be considered only "Additional"
"""
class VoiceConfig:
    """Per-session voice settings plus the default OpenAI realtime session payload."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        # ElevenLabs starts disabled; the voice/model below are the initial values.
        self.use_elevenlabs = False
        self.elevenlabs_voice = "JBFqnCBsd6RMkjVDRZzb"
        self.elevenlabs_model = "eleven_multilingual_v2"
        self.elevenlabs_client = None
        self.elevenlabs_key = None
        self.barge_in_enabled = False
        # Template session settings; handed out as copies by get_session_dict().
        self.default_openai_realtime_session = {
            "modalities": ["text", "audio"],
            "instructions": SESSION_INSTRUCTIONS,
            "voice": "echo",
            "temperature": 0.8,
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "turn_detection": {
                "type": "server_vad",
                "threshold": SILENCE_THRESHOLD,
                "prefix_padding_ms": PREFIX_PADDING_MS,
                "silence_duration_ms": SILENCE_DURATION_MS,
            },
            "input_audio_transcription": {"model": "whisper-1"},
            "tools": [],
            "tool_choice": "auto",
        }
        self.openai_realtime_session: dict[str, Any] = {}

    def get_session_dict(self):
        """Return an independent copy of the default session dictionary.

        A deep copy is used because the template contains nested containers
        (``turn_detection``, ``tools``, ...); with a shallow copy, editing those on
        the returned dict would silently rewrite the defaults for later callers.
        """
        import copy

        return copy.deepcopy(self.default_openai_realtime_session)
# --- Helper Functions ---
# Create a cache for voice configs
voice_config_cache: dict[str, VoiceConfig] = {}
async def authenticate_and_get_openai_key(client_websocket: WebSocket, session: DbSession):
"""Authenticate the user using a token or API key and retrieve the OpenAI API key.
def get_voice_config(session_id: str) -> VoiceConfig:
    """Return the cached ``VoiceConfig`` for ``session_id``, creating it on first use.

    Raises:
        ValueError: if ``session_id`` is ``None``.
    """
    if session_id is None:
        msg = "session_id cannot be None"
        raise ValueError(msg)
    try:
        return voice_config_cache[session_id]
    except KeyError:
        # First request for this session: build the config and remember it.
        created = VoiceConfig(session_id)
        voice_config_cache[session_id] = created
        return created
# Create a global dictionary to store queues for each session
message_queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
# Track active message processing tasks
message_tasks: dict[str, asyncio.Task] = {}
async def get_flow_desc_from_db(flow_id: str) -> Flow:
    """Look up a flow by id and return its ``description`` attribute.

    NOTE(review): the return annotation says ``Flow`` but the value returned is
    ``flow.description`` — confirm and correct the annotation.

    Raises:
        ValueError: if no flow matches ``flow_id`` (``UUID(flow_id)`` is evaluated first
            and may itself reject the string).
    """
    async with session_scope() as session:
        query = select(Flow).where(Flow.id == UUID(flow_id))
        flow = (await session.exec(query)).scalar_one_or_none()
        if flow:
            return flow.description
        error_message = f"Flow with id {flow_id} not found"
        raise ValueError(error_message)
def pcm16_to_float_array(pcm_data):
    """Decode raw 16-bit PCM bytes into a float32 array scaled by 1/32768."""
    int_samples = np.frombuffer(pcm_data, dtype=np.int16)
    float_samples = int_samples.astype(np.float32)
    # Dividing by 2**15 maps the int16 range onto [-1.0, 1.0).
    return float_samples / 32768.0
async def text_chunker_with_timeout(chunks, timeout=0.3):
"""Async generator that takes an async iterable (of text pieces),.
accumulates them and yields chunks without breaking sentences.
If no new text is received within 'timeout' seconds and there is
buffered text, it flushes that text.
Returns a tuple: (current_user, openai_key). If authentication fails, sends an error
message to the client and returns (None, None).
"""
token = client_websocket.cookies.get("access_token_lf")
current_user = None
if token:
current_user = await get_current_user_by_jwt(token, session)
if current_user is None:
current_user = await api_key_security(Security(api_key_query), Security(api_key_header))
if current_user is None:
await client_websocket.send_json(
{
"type": "error",
"code": "langflow_auth",
"message": "You must pass a valid Langflow token or cookie.",
}
)
return None, None
variable_service = get_variable_service()
try:
openai_key_value = await variable_service.get_variable(
user_id=current_user.id, name="OPENAI_API_KEY", field="openai_api_key", session=session
)
openai_key = openai_key_value if openai_key_value is not None else os.getenv("OPENAI_API_KEY", "")
if not openai_key or openai_key == "dummy":
await client_websocket.send_json(
{
"type": "error",
"code": "api_key_missing",
"key_name": "OPENAI_API_KEY",
"message": "OpenAI API key not found. Please set your API key as an env var or a global variable.",
}
)
return None, None
except Exception as e: # noqa: BLE001
logger.error(f"Error with API key: {e}")
logger.error(traceback.format_exc())
return None, None
return current_user, openai_key
# --- Synchronous Text Chunker ---
def sync_text_chunker(sync_queue_obj: queue.Queue, timeout: float = 0.3):
"""Synchronous generator that reads text pieces from a sync queue and yields complete chunks."""
splitters = (".", ",", "?", "!", ";", ":", "", "-", "(", ")", "[", "]", "}", " ")
buffer = ""
ait = chunks.__aiter__()
while True:
try:
text = await asyncio.wait_for(ait.__anext__(), timeout=timeout)
except asyncio.TimeoutError:
text = sync_queue_obj.get(timeout=timeout)
except queue.Empty:
if buffer:
yield buffer + " "
buffer = ""
continue
except StopAsyncIteration:
break
if text is None:
if buffer:
yield buffer + " "
@ -185,15 +151,6 @@ async def text_chunker_with_timeout(chunks, timeout=0.3):
yield buffer + " "
async def queue_generator(queue: asyncio.Queue):
    """Yield items pulled from ``queue`` until a ``None`` sentinel is received."""
    while (item := await queue.get()) is not None:
        yield item
async def handle_function_call(
websocket: WebSocket,
openai_ws: websockets.WebSocketClientProtocol,
@ -216,7 +173,6 @@ async def handle_function_call(
background_tasks=background_tasks,
current_user=current_user,
)
result = ""
async for line in response.body_iterator:
if not line:
@ -293,22 +249,254 @@ async def handle_function_call(
await openai_ws.send(json.dumps(function_output))
# --- Synchronous text chunker using a standard queue ---
def sync_text_chunker(sync_queue_obj: queue.Queue, timeout: float = 0.3):
"""Synchronous generator that reads text pieces from a sync queue.
# --- Config Classes and Caches ---
accumulates them and yields complete chunks.
class VoiceConfig:
    """Per-session voice settings plus the default OpenAI realtime session payload."""

    def __init__(self, session_id: str):
        self.session_id = session_id
        # ElevenLabs starts disabled; the voice/model below are the initial values.
        self.use_elevenlabs = False
        self.elevenlabs_voice = "JBFqnCBsd6RMkjVDRZzb"
        self.elevenlabs_model = "eleven_multilingual_v2"
        self.elevenlabs_client = None
        self.elevenlabs_key = None
        self.barge_in_enabled = False
        # Template session settings; handed out as copies by get_session_dict().
        self.default_openai_realtime_session = {
            "modalities": ["text", "audio"],
            "instructions": SESSION_INSTRUCTIONS,
            "voice": "echo",
            "temperature": 0.8,
            "input_audio_format": "pcm16",
            "output_audio_format": "pcm16",
            "turn_detection": {
                "type": "server_vad",
                "threshold": SILENCE_THRESHOLD,
                "prefix_padding_ms": PREFIX_PADDING_MS,
                "silence_duration_ms": SILENCE_DURATION_MS,
            },
            "input_audio_transcription": {"model": "whisper-1"},
            "tools": [],
            "tool_choice": "auto",
        }
        self.openai_realtime_session: dict[str, Any] = {}

    def get_session_dict(self):
        """Return an independent copy of the default session dictionary.

        A deep copy is used because the template contains nested containers
        (``turn_detection``, ``tools``, ...); with a shallow copy, editing those on
        the returned dict would silently rewrite the defaults for later callers.
        """
        import copy

        return copy.deepcopy(self.default_openai_realtime_session)
class ElevenLabsClientManager:
    """Lazily builds and caches one ElevenLabs client on the class itself.

    NOTE(review): ``_instance`` and ``_api_key`` are class attributes, so the first key
    that resolves is reused for every later caller regardless of ``user_id`` — confirm
    that sharing a single key across users is intended.
    """

    _instance = None
    _api_key = None

    @classmethod
    async def get_client(cls, user_id=None, session=None):
        """Get or create an ElevenLabs client with the API key.

        The key is read from the variable service (``ELEVENLABS_API_KEY``) and falls
        back to the environment variable of the same name when the stored value is
        unusable or empty.

        Returns:
            The cached/new client, or ``None`` when no key could be resolved.
        """
        if cls._instance is not None:
            return cls._instance

        if not cls._api_key and user_id and session:
            variable_service = get_variable_service()
            api_key = None
            try:
                api_key = await variable_service.get_variable(
                    user_id=user_id,
                    name="ELEVENLABS_API_KEY",
                    field="elevenlabs_api_key",
                    session=session,
                )
            except (InvalidToken, ValueError) as e:
                logger.error(f"Error with ElevenLabs API key: {e}")
            except (KeyError, AttributeError, sqlalchemy.exc.SQLAlchemyError) as e:
                logger.error(f"Exception getting ElevenLabs API key: {e}")
                return None

            if not api_key:
                api_key = os.getenv("ELEVENLABS_API_KEY", "")
            if not api_key:
                # Leave _api_key untouched: caching "" here would make every later call
                # skip the lookup and return None even after the user adds a key.
                logger.error("ElevenLabs API key not found")
                return None
            cls._api_key = api_key

        if cls._api_key:
            cls._instance = ElevenLabs(api_key=cls._api_key)
        return cls._instance
# Module-level cache of VoiceConfig objects keyed by session_id (filled by get_voice_config).
voice_config_cache: dict[str, VoiceConfig] = {}
def get_voice_config(session_id: str) -> VoiceConfig:
    """Return the cached ``VoiceConfig`` for ``session_id``, creating it on first use.

    Raises:
        ValueError: if ``session_id`` is ``None``.
    """
    if session_id is None:
        msg = "session_id cannot be None"
        raise ValueError(msg)
    try:
        return voice_config_cache[session_id]
    except KeyError:
        # First request for this session: build the config and remember it.
        created = VoiceConfig(session_id)
        voice_config_cache[session_id] = created
        return created
class TTSConfig:
    """Per-session settings for the transcription + text-to-speech websocket flow."""

    def __init__(self, session_id: str, openai_key: str):
        self.session_id = session_id
        # ElevenLabs starts disabled; the voice/model below are the initial values.
        self.use_elevenlabs = False
        self.elevenlabs_voice = "JBFqnCBsd6RMkjVDRZzb"
        self.elevenlabs_model = "eleven_multilingual_v2"
        self.elevenlabs_client = None
        # Template "transcription_session.update" message; handed out as copies.
        self.default_tts_session = {
            "type": "transcription_session.update",
            "session": {
                "input_audio_format": "pcm16",
                "input_audio_transcription": {
                    "model": "gpt-4o-transcribe",
                    "language": "en",
                },
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": SILENCE_THRESHOLD,
                    "prefix_padding_ms": PREFIX_PADDING_MS,
                    "silence_duration_ms": SILENCE_DURATION_MS,
                },
                "input_audio_noise_reduction": {"type": "near_field"},
                "include": [],
            },
        }
        self.tts_session: dict[str, Any] = {}
        self.oai_client = OpenAI(api_key=openai_key)
        self.openai_voice = "echo"

    def get_session_dict(self):
        """Return an independent copy of the default transcription session message.

        All real settings live one level down under ``"session"``, so a shallow copy
        would share them with the template; a deep copy keeps the defaults intact
        whatever the caller does with the result.
        """
        import copy

        return copy.deepcopy(self.default_tts_session)

    def get_openai_client(self):
        """Return the OpenAI client created for this config."""
        return self.oai_client

    def get_openai_voice(self):
        """Return the currently selected OpenAI voice name."""
        return self.openai_voice
# Module-level cache of TTSConfig objects keyed by session_id (filled by get_tts_config).
tts_config_cache: dict[str, TTSConfig] = {}
def get_tts_config(session_id: str, openai_key: str) -> TTSConfig:
    """Return the cached ``TTSConfig`` for ``session_id``, creating it on first use.

    ``openai_key`` is only consulted when a new config is created; on a cache hit the
    existing config (and the client it already holds) is returned as is.

    Raises:
        ValueError: if ``session_id`` is ``None``.
    """
    if session_id is None:
        msg = "session_id cannot be None"
        raise ValueError(msg)
    try:
        return tts_config_cache[session_id]
    except KeyError:
        created = TTSConfig(session_id, openai_key)
        tts_config_cache[session_id] = created
        return created
async def add_message_to_db(message, session, flow_id, session_id, sender, sender_name):
    """Queue a chat message for persistence, encouraging senders to alternate.

    If the previous message recorded for this flow/session came from the same
    ``sender`` (e.g. AI/AI), wait up to 5 seconds for a different sender to be
    recorded before continuing. The message is then wrapped in a ``MessageTable`` row
    and put on the per-session queue; a worker task is started if none is running.

    Args:
        message: Text of the message.
        session: Database session passed on to the queue worker.
        flow_id: Flow id, either a string or an already-built UUID.
        session_id: Chat session id.
        sender: Sender kind used for the alternation check.
        sender_name: Display name stored with the message.
    """
    queue_key = f"{flow_id}:{session_id}"
    # Same sender twice in a row: give the other party a chance to be recorded first.
    if last_sender_by_session[queue_key] == sender:
        await wait_for_sender_change(queue_key, sender, timeout=5)
    # Record this sender as the most recent one for the session.
    last_sender_by_session[queue_key] = sender

    message_obj = MessageTable(
        text=message,
        sender=sender,
        sender_name=sender_name,
        session_id=session_id,
        files=[],
        # Use the directly imported UUID class, like the rest of this module.
        flow_id=UUID(flow_id) if isinstance(flow_id, str) else flow_id,
        properties=Properties().model_dump(),
        content_blocks=[],
        category="message",
    )
    await message_queues[queue_key].put(message_obj)
    # Start a worker only when none is currently draining this queue.
    if queue_key not in message_tasks or message_tasks[queue_key].done():
        message_tasks[queue_key] = asyncio.create_task(process_message_queue(queue_key, session))
async def wait_for_sender_change(queue_key, current_sender, timeout=5):
    """Poll until a different sender is recorded for ``queue_key`` or the timeout elapses.

    The last-sender map is checked every 0.05 seconds. Nothing is returned, so the
    caller cannot tell whether the sender changed or the wait simply ran out.
    """
    poll_interval = 0.05
    elapsed = 0
    while True:
        if last_sender_by_session[queue_key] != current_sender:
            return
        if not elapsed < timeout:
            return
        await asyncio.sleep(poll_interval)
        elapsed += poll_interval
async def process_message_queue(queue_key, session):
    """Drain the message queue for ``queue_key``, saving one message at a time.

    Each message is written with ``aadd_messagetables``. A failed write is logged and
    that message is dropped so the remaining ones are still processed. The coroutine
    returns as soon as the queue is found empty after a write attempt.
    """
    try:
        more_pending = True
        while more_pending:
            queued = await message_queues[queue_key].get()
            try:
                await aadd_messagetables([queued], session)
                logger.debug(f"Added message to DB: {queued.text[:30]}...")
            except ValueError as e:
                logger.error(f"Error saving message to database (ValueError): {e}")
                logger.error(traceback.format_exc())
            except sqlalchemy.exc.SQLAlchemyError as e:
                logger.error(f"Error saving message to database (SQLAlchemyError): {e}")
                logger.error(traceback.format_exc())
            except (KeyError, AttributeError, TypeError) as e:
                # Remaining expected failure types, kept narrow on purpose.
                logger.error(f"Error saving message to database: {e}")
                logger.error(traceback.format_exc())
            finally:
                # Always acknowledge the item, whether or not the write succeeded.
                message_queues[queue_key].task_done()
            more_pending = not message_queues[queue_key].empty()
    except Exception as e:  # noqa: BLE001
        # NOTE(review): this handler catches any Exception, not just cancellation,
        # so the "was cancelled" wording may be misleading — confirm intent.
        logger.debug(f"Message queue processor for {queue_key} was cancelled: {e}")
        logger.error(traceback.format_exc())
# --- Global Queues and Message Processing ---
# All three maps are keyed by the "{flow_id}:{session_id}" string built in add_message_to_db.
# Pending MessageTable rows per key; defaultdict creates a queue on first access.
message_queues: dict[str, asyncio.Queue] = defaultdict(asyncio.Queue)
# Worker task currently (or most recently) draining the queue of each key.
message_tasks: dict[str, asyncio.Task] = {}
# Sender of the last message accepted for each key; None until the first message.
last_sender_by_session: defaultdict[str, str | None] = defaultdict(lambda: None)
async def get_flow_desc_from_db(flow_id: str) -> Flow:
    """Look up a flow by id and return its ``description`` attribute.

    NOTE(review): the return annotation says ``Flow`` but the value returned is
    ``flow.description`` — confirm and correct the annotation.

    Raises:
        ValueError: if no flow matches ``flow_id`` (``UUID(flow_id)`` is evaluated first
            and may itself reject the string).
    """
    async with session_scope() as session:
        query = select(Flow).where(Flow.id == UUID(flow_id))
        flow = (await session.exec(query)).scalar_one_or_none()
        if flow:
            return flow.description
        msg = f"Flow with id {flow_id} not found"
        raise ValueError(msg)
async def get_or_create_elevenlabs_client(user_id=None, session=None):
    """Module-level shortcut that delegates to ``ElevenLabsClientManager.get_client``."""
    client = await ElevenLabsClientManager.get_client(user_id, session)
    return client
def pcm16_to_float_array(pcm_data):
    """Decode raw 16-bit PCM bytes into a float32 array scaled by 1/32768."""
    int_samples = np.frombuffer(pcm_data, dtype=np.int16)
    float_samples = int_samples.astype(np.float32)
    # Dividing by 2**15 maps the int16 range onto [-1.0, 1.0).
    return float_samples / 32768.0
async def text_chunker_with_timeout(chunks, timeout=0.3):
splitters = (".", ",", "?", "!", ";", ":", "", "-", "(", ")", "[", "]", "}", " ")
buffer = ""
ait = chunks.__aiter__()
while True:
try:
text = sync_queue_obj.get(timeout=timeout)
except queue.Empty:
text = await asyncio.wait_for(ait.__anext__(), timeout=timeout)
except asyncio.TimeoutError:
if buffer:
yield buffer + " "
buffer = ""
continue
except StopAsyncIteration:
break
if text is None:
if buffer:
yield buffer + " "
@ -325,6 +513,30 @@ def sync_text_chunker(sync_queue_obj: queue.Queue, timeout: float = 0.3):
yield buffer + " "
async def queue_generator(queue: asyncio.Queue):
    """Yield items pulled from ``queue`` until a ``None`` sentinel is received."""
    while (item := await queue.get()) is not None:
        yield item
def create_event_logger(session_id: str):
    """Build a logging callback that reports each run of same-typed events once.

    The returned ``log_event(event, direction)`` writes a debug line only when the
    event type differs from the previous call, and keeps a running count of the
    events seen in the current run.

    Args:
        session_id: Included in every log line to identify the session.

    Returns:
        The ``log_event`` callable; it always returns ``None``.
    """
    state = {"last_event_type": None, "event_count": 0}

    def log_event(event: dict, direction: str) -> None:
        # Logging must never break message forwarding: a payload without "type"
        # (or one that is not an object at all) is treated as type None instead
        # of raising KeyError/AttributeError in the caller's loop.
        event_type = event.get("type") if isinstance(event, dict) else None
        if event_type != state["last_event_type"]:
            logger.debug(f"Event (session - {session_id}): {direction} {event_type}")
            state["last_event_type"] = event_type
            state["event_count"] = 0
        state["event_count"] = (state["event_count"] or 0) + 1

    return log_event
# --- WebSocket Endpoints for Flow-as-Tool ---
@router.websocket("/ws/flow_as_tool/{flow_id}")
async def flow_as_tool_websocket_no_session(
client_websocket: WebSocket,
@ -354,45 +566,9 @@ async def flow_as_tool_websocket(
try:
await client_websocket.accept()
voice_config = get_voice_config(session_id)
token = client_websocket.cookies.get("access_token_lf")
current_user = None
if token:
current_user = await get_current_user_by_jwt(token, session)
if current_user is None:
current_user = await api_key_security(Security(api_key_query), Security(api_key_header))
if current_user is None:
await client_websocket.send_json(
{
"type": "error",
"code": "langflow_auth",
"message": "You must pass a valid Langflow token or cookie.",
}
)
return
variable_service = get_variable_service()
try:
openai_key_value = await variable_service.get_variable(
user_id=current_user.id, name="OPENAI_API_KEY", field="openai_api_key", session=session
)
openai_key = openai_key_value if openai_key_value is not None else os.getenv("OPENAI_API_KEY", "")
if not openai_key or openai_key == "dummy":
await client_websocket.send_json(
{
"type": "error",
"code": "api_key_missing",
"key_name": "OPENAI_API_KEY",
"message": "OpenAI API key not found. Please set your API key as an env var or a "
"global variable.",
}
)
return
except Exception as e: # noqa: BLE001
logger.error(f"Error with API key: {e}")
logger.error(traceback.format_exc())
current_user, openai_key = await authenticate_and_get_openai_key(client_websocket, session)
if current_user is None or openai_key is None:
return
try:
flow_description = await get_flow_desc_from_db(flow_id)
flow_tool = {
@ -782,6 +958,181 @@ async def flow_as_tool_websocket(
vad_task.cancel()
@router.websocket("/ws/flow_tts/{flow_id}")
async def flow_tts_websocket_no_session(
    client_websocket: WebSocket,
    flow_id: str,
    background_tasks: BackgroundTasks,
    session: DbSession,
):
    """Flow TTS endpoint for clients that do not supply a session id.

    A random UUID4 string is generated as the session id and the call is handed to
    ``flow_tts_websocket`` with the other arguments unchanged.
    """
    handler_kwargs = {
        "client_websocket": client_websocket,
        "flow_id": flow_id,
        "background_tasks": background_tasks,
        "session": session,
        "session_id": str(uuid4()),
    }
    await flow_tts_websocket(**handler_kwargs)
def _end_vertex_text(event_data):
    """Return the message text nested in an ``end_vertex`` build event, or ``""`` if any level is missing."""
    node = event_data
    for key in ("data", "build_data", "data", "results", "message"):
        node = node.get(key) if isinstance(node, dict) else None
        if not node:
            return ""
    return node.get("text", "") if isinstance(node, dict) else ""


@router.websocket("/ws/flow_tts/{flow_id}/{session_id}")
async def flow_tts_websocket(
    client_websocket: WebSocket,
    flow_id: str,
    background_tasks: BackgroundTasks,
    session: DbSession,
    session_id: str,
):
    """WebSocket endpoint for direct flow text-to-speech interaction.

    After authenticating, the handler opens an OpenAI realtime transcription socket
    and runs two forwarding loops until either side stops:

    * client -> OpenAI: audio buffer events are relayed; ``langflow.elevenlabs.config``
      and ``voice.settings`` events update the per-session ``TTSConfig``.
    * OpenAI -> client: every event is relayed; each completed transcription is fed
      to the flow, build progress is streamed back as ``flow.build.progress``, and the
      flow's final text is synthesized (ElevenLabs or OpenAI) and sent as
      ``response.audio.delta`` events carrying base64 audio.
    """
    try:
        await client_websocket.accept()
        log_event = create_event_logger(session_id)

        current_user, openai_key = await authenticate_and_get_openai_key(client_websocket, session)
        if current_user is None or openai_key is None:
            return

        url = "wss://api.openai.com/v1/realtime?intent=transcription"
        headers = {
            "Authorization": f"Bearer {openai_key}",
            "OpenAI-Beta": "realtime=v1",
        }
        tts_config = get_tts_config(session_id, openai_key)

        async with websockets.connect(url, extra_headers=headers) as openai_ws:
            await openai_ws.send(json.dumps(tts_config.get_session_dict()))

            async def send_audio(audio_bytes) -> None:
                """Send one block of audio to the client as a base64 delta event."""
                base64_audio = base64.b64encode(audio_bytes).decode("utf-8")
                await client_websocket.send_json({"type": "response.audio.delta", "delta": base64_audio})

            async def run_flow(transcript) -> str:
                """Run the flow on ``transcript``, streaming progress; return its last message text."""
                input_request = InputValueRequest(
                    input_value=transcript, components=[], type="chat", session=session_id
                )
                flow_response = await build_flow_and_stream(
                    flow_id=UUID(flow_id),
                    inputs=input_request,
                    background_tasks=background_tasks,
                    current_user=current_user,
                )
                result = ""
                async for line in flow_response.body_iterator:
                    if not line:
                        continue
                    event_data = json.loads(line)
                    await client_websocket.send_json({"type": "flow.build.progress", "data": event_data})
                    if event_data.get("event") == "end_vertex":
                        # Missing intermediate keys yield "" instead of raising.
                        text = _end_vertex_text(event_data)
                        if text:
                            result = text
                return result

            async def speak(result) -> None:
                """Synthesize ``result`` with the configured provider and send the audio."""
                if tts_config.use_elevenlabs:
                    elevenlabs_client = await get_or_create_elevenlabs_client(current_user.id, session)
                    if elevenlabs_client is None:
                        # Tell the client why there is no audio instead of silently
                        # ending the whole forwarding loop.
                        await client_websocket.send_json(
                            {
                                "type": "error",
                                "code": "api_key_missing",
                                "key_name": "ELEVENLABS_API_KEY",
                                "message": "ElevenLabs API key not found. Please set your API key as an env "
                                "var or a global variable.",
                            }
                        )
                        return
                    audio_stream = elevenlabs_client.generate(
                        voice=tts_config.elevenlabs_voice,
                        output_format="pcm_24000",
                        text=result,
                        model=tts_config.elevenlabs_model,
                        voice_settings=None,
                        stream=True,
                    )
                    # The stream is a synchronous iterator; pull each chunk in a worker
                    # thread so the event loop stays responsive. A sentinel default is
                    # used because StopIteration cannot cross a thread/future boundary.
                    chunk_iter = iter(audio_stream)
                    exhausted = object()
                    while True:
                        chunk = await asyncio.to_thread(next, chunk_iter, exhausted)
                        if chunk is exhausted:
                            break
                        await send_audio(chunk)
                    return

                oai_client = tts_config.get_openai_client()
                # Synchronous HTTP call: run it off the event loop.
                speech = await asyncio.to_thread(
                    oai_client.audio.speech.create,
                    model="gpt-4o-mini-tts",
                    voice=tts_config.get_openai_voice(),
                    input=result,
                    instructions="be cheerful",
                    response_format="pcm",
                )
                await send_audio(speech.content)

            async def forward_to_openai() -> None:
                try:
                    while True:
                        message_text = await client_websocket.receive_text()
                        event = json.loads(message_text)
                        log_event(event, "Client → OpenAI")
                        event_type = event.get("type")
                        if event_type == "input_audio_buffer.append":
                            base64_data = event.get("audio", "")
                            if not base64_data:
                                continue
                            out_event = {"type": "input_audio_buffer.append", "audio": base64_data}
                            await openai_ws.send(json.dumps(out_event))
                        elif event_type == "input_audio_buffer.commit":
                            await openai_ws.send(message_text)
                        elif event_type == "langflow.elevenlabs.config":
                            logger.info(f"langflow.elevenlabs.config {event}")
                            tts_config.use_elevenlabs = event["enabled"]
                            tts_config.elevenlabs_voice = event.get("voice_id", tts_config.elevenlabs_voice)
                        elif event_type == "voice.settings":
                            # Store the voice setting
                            if event.get("voice"):
                                tts_config.openai_voice = event.get("voice")
                                logger.info(f"Updated OpenAI voice to: {tts_config.openai_voice}")
                except Exception as e:  # noqa: BLE001
                    logger.error(f"Error in WebSocket communication: {e}")

            async def forward_to_client() -> None:
                try:
                    while True:
                        data = await openai_ws.recv()
                        event = json.loads(data)
                        log_event(event, "OpenAI → Client")
                        await client_websocket.send_text(data)
                        if event.get("type") != "conversation.item.input_audio_transcription.completed":
                            continue
                        transcript = event.get("transcript")
                        if transcript is None or transcript == "":
                            continue
                        result = await run_flow(transcript)
                        if result != "":
                            await speak(result)
                except Exception as e:  # noqa: BLE001
                    logger.error(f"Error in WebSocket communication: {e}")

            tasks = [
                asyncio.create_task(forward_to_openai()),
                asyncio.create_task(forward_to_client()),
            ]
            try:
                # Either direction ending (client gone, OpenAI closed, error) makes the
                # other one pointless, so stop as soon as the first loop finishes.
                await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            except Exception as e:  # noqa: BLE001
                logger.error(f"Error in WebSocket communication: {e}")
                logger.error(traceback.format_exc())
            finally:
                for task in tasks:
                    task.cancel()
                # Let cancellations finish before the OpenAI socket is closed.
                await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:  # noqa: BLE001
        logger.error(f"WebSocket error: {e}")
        logger.error(traceback.format_exc())
        await client_websocket.close()
def extract_transcript(json_data):
    """Return the transcript of the first ``"audio"`` content item in ``json_data``.

    Content items are read from ``json_data["item"]["content"]``. An empty string is
    returned when there is no audio item, when it has no ``"transcript"`` key, or when
    the payload does not have the expected shape (the error is logged at debug level).
    """
    try:
        audio_items = (
            part for part in json_data.get("item", {}).get("content", []) if part.get("type") == "audio"
        )
        first_audio = next(audio_items, None)
        if first_audio is not None:
            return first_audio.get("transcript", "")
    except (KeyError, TypeError, AttributeError) as e:
        logger.debug(f"Error extracting transcript: {e}")
        return ""
    return ""
@router.get("/elevenlabs/voice_ids")
async def get_elevenlabs_voice_ids(
current_user: CurrentActiveUser,
@ -816,137 +1167,3 @@ async def get_elevenlabs_voice_ids(
logger.error(f"Error fetching ElevenLabs voices: {e}")
logger.error(traceback.format_exc())
return {"error": str(e)}
# Replace ElevenLabsClient class with a better implementation
class ElevenLabsClientManager:
    """Lazily builds and caches one ElevenLabs client on the class itself.

    NOTE(review): ``_instance`` and ``_api_key`` are class attributes, so the first key
    that resolves is reused for every later caller regardless of ``user_id`` — confirm
    that sharing a single key across users is intended.
    """

    _instance = None
    _api_key = None

    @classmethod
    async def get_client(cls, user_id=None, session=None):
        """Get or create an ElevenLabs client with the API key.

        The key is read from the variable service (``ELEVENLABS_API_KEY``) and falls
        back to the environment variable of the same name when the stored value is
        unusable or empty.

        Returns:
            The cached/new client, or ``None`` when no key could be resolved.
        """
        if cls._instance is not None:
            return cls._instance

        if not cls._api_key and user_id and session:
            variable_service = get_variable_service()
            api_key = None
            try:
                api_key = await variable_service.get_variable(
                    user_id=user_id,
                    name="ELEVENLABS_API_KEY",
                    field="elevenlabs_api_key",
                    session=session,
                )
            except (InvalidToken, ValueError) as e:
                logger.error(f"Error with ElevenLabs API key: {e}")
            except (KeyError, AttributeError, sqlalchemy.exc.SQLAlchemyError) as e:
                logger.error(f"Exception getting ElevenLabs API key: {e}")
                return None

            if not api_key:
                api_key = os.getenv("ELEVENLABS_API_KEY", "")
            if not api_key:
                # Leave _api_key untouched: caching "" here would make every later call
                # skip the lookup and return None even after the user adds a key.
                logger.error("ElevenLabs API key not found")
                return None
            cls._api_key = api_key

        if cls._api_key:
            cls._instance = ElevenLabs(api_key=cls._api_key)
        return cls._instance
# Update the get_or_create_elevenlabs_client function to use the new manager
async def get_or_create_elevenlabs_client(user_id=None, session=None):
    """Module-level shortcut that delegates to ``ElevenLabsClientManager.get_client``."""
    client = await ElevenLabsClientManager.get_client(user_id, session)
    return client
# Global dictionary to track the last sender for each session (identified by queue_key)
last_sender_by_session: defaultdict[str, str | None] = defaultdict(lambda: None)
async def wait_for_sender_change(queue_key, current_sender, timeout=5):
    """Poll until a different sender is recorded for ``queue_key`` or the timeout elapses.

    The last-sender map is checked every 0.05 seconds. Nothing is returned, so the
    caller cannot tell whether the sender changed or the wait simply ran out.
    """
    poll_interval = 0.05
    elapsed = 0
    while True:
        if last_sender_by_session[queue_key] != current_sender:
            return
        if not elapsed < timeout:
            return
        await asyncio.sleep(poll_interval)
        elapsed += poll_interval
async def add_message_to_db(message, session, flow_id, session_id, sender, sender_name):
    """Queue a chat message for persistence, encouraging senders to alternate.

    If the previous message recorded for this flow/session came from the same
    ``sender`` (e.g. AI/AI), wait up to 5 seconds for a different sender to be
    recorded before continuing. The message is then wrapped in a ``MessageTable`` row
    and put on the per-session queue; a worker task is started if none is running.

    Args:
        message: Text of the message.
        session: Database session passed on to the queue worker.
        flow_id: Flow id, either a string or an already-built UUID.
        session_id: Chat session id.
        sender: Sender kind used for the alternation check.
        sender_name: Display name stored with the message.
    """
    queue_key = f"{flow_id}:{session_id}"
    # Same sender twice in a row: give the other party a chance to be recorded first.
    if last_sender_by_session[queue_key] == sender:
        await wait_for_sender_change(queue_key, sender, timeout=5)
    # Record this sender as the most recent one for the session.
    last_sender_by_session[queue_key] = sender

    message_obj = MessageTable(
        text=message,
        sender=sender,
        sender_name=sender_name,
        session_id=session_id,
        files=[],
        # Use the directly imported UUID class, like the rest of this module.
        flow_id=UUID(flow_id) if isinstance(flow_id, str) else flow_id,
        properties=Properties().model_dump(),
        content_blocks=[],
        category="audio",
    )
    await message_queues[queue_key].put(message_obj)
    # Start a worker only when none is currently draining this queue.
    if queue_key not in message_tasks or message_tasks[queue_key].done():
        message_tasks[queue_key] = asyncio.create_task(process_message_queue(queue_key, session))
async def process_message_queue(queue_key, session):
    """Drain the message queue for ``queue_key``, saving one message at a time.

    Each message is written with ``aadd_messagetables``. A failed write is logged and
    that message is dropped so the remaining ones are still processed. The coroutine
    returns as soon as the queue is found empty after a write attempt.
    """
    try:
        more_pending = True
        while more_pending:
            queued = await message_queues[queue_key].get()
            try:
                await aadd_messagetables([queued], session)
                logger.debug(f"Added message to DB: {queued.text[:30]}...")
            except ValueError as e:
                logger.error(f"Error saving message to database (ValueError): {e}")
                logger.error(traceback.format_exc())
            except sqlalchemy.exc.SQLAlchemyError as e:
                logger.error(f"Error saving message to database (SQLAlchemyError): {e}")
                logger.error(traceback.format_exc())
            except (KeyError, AttributeError, TypeError) as e:
                # Remaining expected failure types, kept narrow on purpose.
                logger.error(f"Error saving message to database: {e}")
                logger.error(traceback.format_exc())
            finally:
                # Always acknowledge the item, whether or not the write succeeded.
                message_queues[queue_key].task_done()
            more_pending = not message_queues[queue_key].empty()
    except Exception as e:  # noqa: BLE001
        # NOTE(review): this handler catches any Exception, not just cancellation,
        # so the "was cancelled" wording may be misleading — confirm intent.
        logger.debug(f"Message queue processor for {queue_key} was cancelled: {e}")
        logger.error(traceback.format_exc())
def extract_transcript(json_data):
    """Return the transcript of the first ``"audio"`` content item in ``json_data``.

    Content items are read from ``json_data["item"]["content"]``. An empty string is
    returned when there is no audio item, when it has no ``"transcript"`` key, or when
    the payload does not have the expected shape (the error is logged at debug level).
    """
    try:
        audio_items = (
            part for part in json_data.get("item", {}).get("content", []) if part.get("type") == "audio"
        )
        first_audio = next(audio_items, None)
        if first_audio is not None:
            return first_audio.get("transcript", "")
    except (KeyError, TypeError, AttributeError) as e:
        logger.debug(f"Error extracting transcript: {e}")
        return ""
    return ""

View file

@ -14,6 +14,7 @@ interface MessagesQueryParams {
params?: object;
onSuccess?: (data: MessagesResponse) => void;
stopPollingOn?: (data: MessagesResponse) => boolean;
session_id?: string;
}
interface MessagesResponse {
@ -109,6 +110,7 @@ export const useGetMessagesPollingMutation = (
payload: MessagesQueryParams,
): Promise<MessagesResponse> => {
const requestId = payload.id || "default";
const sessionId = payload.session_id;
if (requestInProgressRef.current[requestId]) {
return Promise.reject("Request already in progress");
@ -118,9 +120,11 @@ export const useGetMessagesPollingMutation = (
requestInProgressRef.current[requestId] = true;
const { id, mode, excludedFields, params } = payload;
const config = {};
if (id) {
config["params"] = { flow_id: id };
}
if (params) {
config["params"] = { ...config["params"], ...params };
}

View file

@ -11,6 +11,7 @@ import { useUpdateSessionName } from "@/controllers/API/queries/messages/use-ren
import useFlowsManagerStore from "@/stores/flowsManagerStore";
import useFlowStore from "@/stores/flowStore";
import { useUtilityStore } from "@/stores/utilityStore";
import { useVoiceStore } from "@/stores/voiceStore";
import { cn } from "@/utils/utils";
import React, { useEffect, useRef, useState } from "react";
import { v5 as uuidv5 } from "uuid";
@ -117,10 +118,15 @@ export default function SessionSelector({
}
};
const setNewSessionCloseVoiceAssistant = useVoiceStore(
(state) => state.setNewSessionCloseVoiceAssistant,
);
return (
<div
data-testid="session-selector"
onClick={(e) => {
setNewSessionCloseVoiceAssistant(true);
if (isEditing) e.stopPropagation();
else toggleVisibility();
}}

View file

@ -91,30 +91,23 @@ export const ChatViewWrapper = ({
{!playgroundPage && <Separator orientation="vertical" />}
</div>
</div>
<div
className={cn(
visibleSession ? "h-[95%]" : "h-full",
sidebarOpen
? "pointer-events-none blur-sm lg:pointer-events-auto lg:blur-0"
: "",
)}
>
{messagesFetched && (
<ChatView
focusChat={sessionId}
sendMessage={sendMessage}
visibleSession={visibleSession}
closeChat={
!canvasOpen
? undefined
: () => {
setOpen(false);
}
}
playgroundPage={playgroundPage}
/>
)}
</div>
{messagesFetched && (
<ChatView
focusChat={sessionId}
sendMessage={sendMessage}
visibleSession={visibleSession}
closeChat={
!canvasOpen
? undefined
: () => {
setOpen(false);
}
}
playgroundPage={playgroundPage}
sidebarOpen={sidebarOpen}
/>
)}
</div>
);
};

View file

@ -3,6 +3,7 @@ import useFileSizeValidator from "@/shared/hooks/use-file-size-validator";
import useAlertStore from "@/stores/alertStore";
import useFlowStore from "@/stores/flowStore";
import { useUtilityStore } from "@/stores/utilityStore";
import { useVoiceStore } from "@/stores/voiceStore";
import { AnimatePresence, motion } from "framer-motion";
import { useEffect, useRef, useState } from "react";
import ShortUniqueId from "short-unique-id";
@ -41,6 +42,20 @@ export default function ChatInput({
const [showAudioInput, setShowAudioInput] = useState(false);
const setIsVoiceAssistantActive = useVoiceStore(
(state) => state.setIsVoiceAssistantActive,
);
const newSessionCloseVoiceAssistant = useVoiceStore(
(state) => state.newSessionCloseVoiceAssistant,
);
useEffect(() => {
if (showAudioInput) {
setIsVoiceAssistantActive(true);
}
}, [showAudioInput]);
useFocusOnUnlock(isBuilding, inputRef);
useAutoResizeTextArea(chatValue, inputRef);
@ -184,7 +199,7 @@ export default function ChatInput({
return (
<AnimatePresence mode="wait">
{showAudioInput ? (
{showAudioInput && !newSessionCloseVoiceAssistant ? (
<motion.div
key="voice-assistant"
initial={{ opacity: 0 }}

View file

@ -37,6 +37,7 @@ interface SettingsVoiceModalProps {
handleClickSaveOpenAIApiKey: (openaiApiKey: string) => void;
isEditingOpenAIKey: boolean;
setIsEditingOpenAIKey: (isEditingOpenAIKey: boolean) => void;
isPlayingRef: React.MutableRefObject<boolean>;
}
const SettingsVoiceModal = ({
@ -50,6 +51,7 @@ const SettingsVoiceModal = ({
handleClickSaveOpenAIApiKey,
isEditingOpenAIKey,
setIsEditingOpenAIKey,
isPlayingRef,
}: SettingsVoiceModalProps) => {
const popupRef = useRef<HTMLDivElement>(null);
const [voice, setVoice] = useState<string>("alloy");
@ -147,6 +149,7 @@ const SettingsVoiceModal = ({
};
const onOpenChangeDropdownMenu = (open: boolean) => {
isPlayingRef.current = false;
setOpen(open);
setShowSettingsModal(open, openaiApiKey, elevenLabsApiKey);
};

View file

@ -1,17 +1,25 @@
import ForwardedIconComponent from "@/components/common/genericIconComponent";
import { Button } from "@/components/ui/button";
import { ICON_STROKE_WIDTH } from "@/constants/constants";
import { useVoiceStore } from "@/stores/voiceStore";
interface VoiceButtonProps {
toggleRecording: () => void;
}
const VoiceButton = ({ toggleRecording }: VoiceButtonProps) => {
const setNewSessionCloseVoiceAssistant = useVoiceStore(
(state) => state.setNewSessionCloseVoiceAssistant,
);
return (
<>
<div>
<Button
onClick={toggleRecording}
onClick={() => {
toggleRecording();
setNewSessionCloseVoiceAssistant(false);
}}
className="btn-playground-actions group"
unstyled
data-testid="voice-button"

View file

@ -12,7 +12,8 @@ export const useStartConversation = (
const currentHost = window.location.hostname;
const currentPort = window.location.port;
const protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
const url = `${protocol}//${currentHost}:${currentPort}/api/v1/voice/ws/flow_as_tool/${flowId}/${currentSessionId}`;
const url = `${protocol}//${currentHost}:${currentPort}/api/v1/voice/ws/flow_tts/${flowId}/${currentSessionId?.toString()}`;
//const url = `${protocol}//${currentHost}:${currentPort}/api/v1/voice/ws/flow_as_tool/${flowId}/${currentSessionId?.toString()}`;
try {
if (wsRef.current?.readyState === WebSocket.CONNECTING) {
@ -26,6 +27,8 @@ export const useStartConversation = (
const audioSettings = JSON.parse(
getLocalStorage("lf_audio_settings_playground") || "{}",
);
const audioLanguage =
getLocalStorage("lf_audio_language_playground") || "en-US";
wsRef.current = new WebSocket(url);
@ -42,17 +45,20 @@ export const useStartConversation = (
: "",
}),
);
// For flow_tts endpoint, we need to use the proper session update format
if (audioSettings.provider !== "elevenlabs") {
wsRef.current.send(
JSON.stringify({
type: "session.update",
session: {
voice: audioSettings.voice,
},
type: "voice.settings",
voice: audioSettings.voice || "echo",
provider: audioSettings.provider || "openai",
}),
);
}
startRecording();
setTimeout(() => {
startRecording();
}, 300);
}
};

View file

@ -14,7 +14,6 @@ import { useMessagesStore } from "@/stores/messagesStore";
import { useUtilityStore } from "@/stores/utilityStore";
import { useVoiceStore } from "@/stores/voiceStore";
import { cn } from "@/utils/utils";
import { AxiosError } from "axios";
import { useEffect, useMemo, useRef, useState } from "react";
import IconComponent from "../../../../../../../components/common/genericIconComponent";
import SettingsVoiceModal from "./components/audio-settings/audio-settings-dialog";
@ -61,6 +60,9 @@ export function VoiceAssistant({
const analyserRef = useRef<AnalyserNode | null>(null);
const soundDetected = useVoiceStore((state) => state.soundDetected);
const setIsVoiceAssistantActive = useVoiceStore(
(state) => state.setIsVoiceAssistantActive,
);
const setSoundDetected = useVoiceStore((state) => state.setSoundDetected);
const messagesStore = useMessagesStore();
const setIsBuilding = useFlowStore((state) => state.setIsBuilding);
@ -86,6 +88,8 @@ export function VoiceAssistant({
const currentSessionId = useUtilityStore((state) => state.currentSessionId);
const setErrorData = useAlertStore((state) => state.setErrorData);
const { data: globalVariables } = useGetGlobalVariables();
const currentFlow = useFlowStore((state) => state.currentFlow);
const currentFlowId = currentFlow?.id;
const hasOpenAIAPIKey = useMemo(() => {
return (
@ -93,13 +97,6 @@ export function VoiceAssistant({
);
}, [variables, open, addKey]);
const hasElevenLabsApiKey = useMemo(() => {
return (
variables?.find((variable) => variable === "ELEVENLABS_API_KEY")
?.length! > 0
);
}, [variables, addKey, open]);
const openaiApiKey = useMemo(() => {
return variables?.find((variable) => variable === "OPENAI_API_KEY");
}, [variables, addKey]);
@ -216,7 +213,7 @@ export function VoiceAssistant({
const handleGetMessagesMutation = () => {
getMessagesMutation.mutate({
mode: "union",
id: currentSessionId,
id: currentFlowId,
});
};
@ -291,10 +288,20 @@ export function VoiceAssistant({
};
}, []);
// After a 300 ms delay, scrolls the element matching `.chat-message-div`
// (if one exists) so that its bottom edge is in view.
const scrollToBottom = () => {
  const jumpToEnd = () => {
    const messageList = document.querySelector(".chat-message-div");
    if (!messageList) return;
    messageList.scrollTop = messageList.scrollHeight;
  };
  setTimeout(jumpToEnd, 300);
};
// Closes the audio input: clears the recording flag, stops the recorder,
// hides the audio input UI, then scrolls the chat to the bottom.
const handleCloseAudioInput = () => {
  // The bare `setIsRecording;` reference that used to precede this call was a
  // no-op expression statement and has been dropped.
  setIsRecording(false);
  stopRecording();
  setShowAudioInput(false);
  scrollToBottom();
};
const handleSetShowSettingsModal = async (
@ -430,6 +437,7 @@ export function VoiceAssistant({
handleClickSaveOpenAIApiKey={handleClickSaveOpenAIApiKey}
isEditingOpenAIKey={isEditingOpenAIKey}
setIsEditingOpenAIKey={setIsEditingOpenAIKey}
isPlayingRef={isPlayingRef}
>
{hasOpenAIAPIKey ? (
<>

View file

@ -39,6 +39,7 @@ export default function ChatView({
focusChat,
closeChat,
playgroundPage,
sidebarOpen,
}: chatViewProps): JSX.Element {
const flowPool = useFlowStore((state) => state.flowPool);
const inputs = useFlowStore((state) => state.inputs);
@ -163,11 +164,19 @@ export default function ChatView({
};
const flowRunningSkeletonMemo = useMemo(() => <FlowRunningSqueleton />, []);
const soundDetected = useVoiceStore((state) => state.soundDetected);
const isVoiceAssistantActive = useVoiceStore(
(state) => state.isVoiceAssistantActive,
);
return (
<div
className="flex h-full w-full flex-col rounded-md"
className={cn(
"flex h-full w-full flex-col rounded-md",
visibleSession ? "h-[95%]" : "h-full",
sidebarOpen &&
!isVoiceAssistantActive &&
"pointer-events-none blur-sm lg:pointer-events-auto lg:blur-0",
)}
onDragOver={dragOver}
onDragEnter={dragEnter}
onDragLeave={dragLeave}

View file

@ -1,5 +1,6 @@
import ShadTooltip from "@/components/common/shadTooltipComponent";
import { Button } from "@/components/ui/button";
import { useVoiceStore } from "@/stores/voiceStore";
import IconComponent from "../../../components/common/genericIconComponent";
import { SidebarOpenViewProps } from "../types/sidebar-open-view";
import SessionSelector from "./IOFieldView/components/session-selector";
@ -13,6 +14,10 @@ export const SidebarOpenView = ({
selectedViewField,
playgroundPage,
}: SidebarOpenViewProps) => {
const setNewSessionCloseVoiceAssistant = useVoiceStore(
(state) => state.setNewSessionCloseVoiceAssistant,
);
return (
<>
<div className="flex flex-col pl-3">
@ -34,6 +39,7 @@ export const SidebarOpenView = ({
onClick={(_) => {
setvisibleSession(undefined);
setSelectedViewField(undefined);
setNewSessionCloseVoiceAssistant(true);
}}
>
<IconComponent

View file

@ -37,4 +37,10 @@ export const useVoiceStore = create<VoiceStoreType>((set, get) => ({
) => set({ openaiVoices }),
soundDetected: false,
setSoundDetected: (soundDetected: boolean) => set({ soundDetected }),
isVoiceAssistantActive: false,
setIsVoiceAssistantActive: (isVoiceAssistantActive: boolean) =>
set({ isVoiceAssistantActive }),
newSessionCloseVoiceAssistant: false,
setNewSessionCloseVoiceAssistant: (newSessionCloseVoiceAssistant: boolean) =>
set({ newSessionCloseVoiceAssistant }),
}));

View file

@ -792,6 +792,7 @@ export type IOFieldViewProps = {
export type UndrawCardComponentProps = { flow: FlowType };
export type chatViewProps = {
sidebarOpen: boolean;
sendMessage: ({
repeat,
files,

View file

@ -31,4 +31,10 @@ export type VoiceStoreType = {
) => void;
soundDetected: boolean;
setSoundDetected: (soundDetected: boolean) => void;
isVoiceAssistantActive: boolean;
setIsVoiceAssistantActive: (isVoiceAssistantActive: boolean) => void;
newSessionCloseVoiceAssistant: boolean;
setNewSessionCloseVoiceAssistant: (
newSessionCloseVoiceAssistant: boolean,
) => void;
};

8
uv.lock generated
View file

@ -4577,6 +4577,7 @@ dependencies = [
{ name = "networkx" },
{ name = "nltk" },
{ name = "numexpr" },
{ name = "openai" },
{ name = "openinference-instrumentation-langchain" },
{ name = "opensearch-py" },
{ name = "opik" },
@ -4770,6 +4771,7 @@ requires-dist = [
{ name = "nltk", specifier = "==3.9.1" },
{ name = "numexpr", specifier = "==2.10.2" },
{ name = "nv-ingest-client", marker = "extra == 'nv-ingest'", specifier = "==2025.2.7.dev0" },
{ name = "openai", specifier = ">=1.68.2" },
{ name = "openinference-instrumentation-langchain", specifier = ">=0.1.29" },
{ name = "opensearch-py", specifier = "==2.8.0" },
{ name = "opik", specifier = ">=1.6.3" },
@ -6484,7 +6486,7 @@ wheels = [
[[package]]
name = "openai"
version = "1.66.3"
version = "1.69.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
@ -6496,9 +6498,9 @@ dependencies = [
{ name = "tqdm" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a3/77/5172104ca1df35ed2ed8fb26dbc787f721c39498fc51d666c4db07756a0c/openai-1.66.3.tar.gz", hash = "sha256:8dde3aebe2d081258d4159c4cb27bdc13b5bb3f7ea2201d9bd940b9a89faf0c9", size = 397244 }
sdist = { url = "https://files.pythonhosted.org/packages/ab/99/d164612528dfb7a9b19330623daded608e75d25823b01f81e0376eb388a4/openai-1.69.0.tar.gz", hash = "sha256:7b8a10a8ff77e1ae827e5e4c8480410af2070fb68bc973d6c994cf8218f1f98d", size = 409579 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/78/5a/e20182f7b6171642d759c548daa0ba20a1d3ac10d2bd0a13fd75704a9ac3/openai-1.66.3-py3-none-any.whl", hash = "sha256:a427c920f727711877ab17c11b95f1230b27767ba7a01e5b66102945141ceca9", size = 567400 },
{ url = "https://files.pythonhosted.org/packages/b8/a4/28113be8b7bc937656aaf7b06feff7e9a5eb742ee4e405c6c48c30d879c4/openai-1.69.0-py3-none-any.whl", hash = "sha256:73c4b2ddfd050060f8d93c70367189bd891e70a5adb6d69c04c3571f4fea5627", size = 599068 },
]
[[package]]