"""Orchestrates a full-duplex voice conversation between a human and a bot.

Wires together a transcriber (speech -> text), an agent (text -> text), and a
synthesizer (text -> speech), streaming synthesized audio to an output device.
Synthesis runs on a dedicated event loop in its own thread so that it can be
interrupted (barge-in) while the main loop keeps consuming transcriptions.
"""

import asyncio
import queue
from typing import Optional, Any
import logging
import threading
import time
import random

from vocode.streaming.agent.bot_sentiment_analyser import (
    BotSentimentAnalyser,
)
from vocode.streaming.factory import (
    create_agent,
    create_synthesizer,
    create_transcriber,
)
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.output_device.base_output_device import BaseOutputDevice
from vocode.streaming.utils.goodbye_model import GoodbyeModel
from vocode.streaming.utils.transcript import Transcript
from vocode.streaming.models.agent import (
    FillerAudioConfig,
    FILLER_AUDIO_DEFAULT_SILENCE_THRESHOLD_SECONDS,
)
from vocode.streaming.models.synthesizer import (
    TrackBotSentimentConfig,
)
from vocode.streaming.constants import (
    TEXT_TO_SPEECH_CHUNK_SIZE_SECONDS,
    PER_CHUNK_ALLOWANCE_SECONDS,
    ALLOWED_IDLE_TIME,
)
from vocode.streaming.agent.base_agent import BaseAgent
from vocode.streaming.synthesizer.base_synthesizer import (
    BaseSynthesizer,
    SynthesisResult,
    FillerAudio,
)
from vocode.streaming.utils import (
    create_conversation_id,
    create_loop_in_thread,
    get_chunk_size_per_second,
)
from vocode.streaming.transcriber.base_transcriber import (
    Transcription,
    BaseTranscriber,
)


class StreamingConversation:
    """A streaming voice conversation.

    Lifecycle: construct, ``await start()``, feed audio via ``receive_audio``,
    and eventually ``terminate()`` (from the main thread). Transcriptions
    arrive via the ``on_transcription_response`` callback registered with the
    transcriber; bot speech is scheduled onto ``synthesizer_event_loop`` (a
    separate thread) so the main loop can interrupt it mid-utterance.
    """

    def __init__(
        self,
        output_device: BaseOutputDevice,
        transcriber: BaseTranscriber,
        agent: BaseAgent,
        synthesizer: BaseSynthesizer,
        conversation_id: Optional[str] = None,
        per_chunk_allowance_seconds: int = PER_CHUNK_ALLOWANCE_SECONDS,
        logger: Optional[logging.Logger] = None,
    ):
        """Wire up the pipeline components.

        Args:
            output_device: sink for synthesized audio chunks.
            transcriber: speech-to-text source; its response callback is bound
                to ``on_transcription_response`` here.
            agent: produces bot responses from transcribed human speech.
            synthesizer: text-to-speech engine.
            conversation_id: optional external id; generated if omitted.
            per_chunk_allowance_seconds: slack subtracted from each chunk's
                sleep so playback stays ahead of synthesis.
                NOTE(review): annotated ``int`` but the default constant is
                presumably a fractional number of seconds — confirm.
            logger: optional logger; falls back to a module-level logger.
        """
        self.id = conversation_id or create_conversation_id()
        self.logger = logger or logging.getLogger(__name__)
        self.output_device = output_device
        self.transcriber = transcriber
        # Transcriptions are pushed to us via this callback.
        self.transcriber.set_on_response(self.on_transcription_response)
        self.transcriber_task = None
        self.agent = agent
        self.synthesizer = synthesizer
        # Dedicated event loop + thread for synthesis/playback, so speech can
        # be produced and interrupted independently of the main event loop.
        # (create_loop_in_thread presumably runs this loop until stop() —
        # confirm against vocode.streaming.utils.)
        self.synthesizer_event_loop = asyncio.new_event_loop()
        self.synthesizer_thread = threading.Thread(
            name="synthesizer",
            target=create_loop_in_thread,
            args=(self.synthesizer_event_loop,),
        )
        self.per_chunk_allowance_seconds = per_chunk_allowance_seconds
        self.transcript = Transcript()
        self.bot_sentiment = None
        if self.synthesizer.get_synthesizer_config().track_bot_sentiment_in_voice:
            # The config field may be a bare ``True`` (use defaults) or a full
            # TrackBotSentimentConfig instance.
            if isinstance(
                self.synthesizer.get_synthesizer_config().track_bot_sentiment_in_voice,
                bool,
            ):
                self.track_bot_sentiment_config = TrackBotSentimentConfig()
            else:
                self.track_bot_sentiment_config = (
                    self.synthesizer.get_synthesizer_config().track_bot_sentiment_in_voice
                )
            self.bot_sentiment_analyser = BotSentimentAnalyser(
                emotions=self.track_bot_sentiment_config.emotions
            )
        # NOTE: goodbye_model only exists when the config enables it; other
        # code paths must check the same config flag before touching it.
        if self.agent.get_agent_config().end_conversation_on_goodbye:
            self.goodbye_model = GoodbyeModel()
        self.is_human_speaking = False
        self.active = False
        self.current_synthesis_task = None
        # Whether the human is allowed to barge in on the current utterance.
        self.is_current_synthesis_interruptable = False
        # One stop event per in-flight synthesis; interrupt_all_synthesis
        # drains this queue and sets any that are still unset.
        self.stop_events: queue.Queue[threading.Event] = queue.Queue()
        self.last_action_timestamp = time.time()
        self.check_for_idle_task = None
        self.track_bot_sentiment_task = None
        # Filler-audio bookkeeping: set once the first filler chunk is sent,
        # so response sending knows whether it must wait for filler to finish.
        self.should_wait_for_filler_audio_done_event = False
        self.current_filler_audio_done_event: Optional[threading.Event] = None
        self.current_filler_seconds_per_chunk: int = 0
        self.current_transcription_is_interrupt: bool = False

    async def start(self):
        """Start all components and mark the conversation active.

        Raises:
            Exception: if the transcriber fails to report ready.
        """
        self.transcriber_task = asyncio.create_task(self.transcriber.run())
        is_ready = await self.transcriber.ready()
        if not is_ready:
            raise Exception("Transcriber startup failed")
        self.synthesizer_thread.start()
        if self.agent.get_agent_config().send_filler_audio:
            # Config may be a bare truthy flag or a full FillerAudioConfig.
            filler_audio_config = (
                self.agent.get_agent_config().send_filler_audio
                if isinstance(
                    self.agent.get_agent_config().send_filler_audio, FillerAudioConfig
                )
                else FillerAudioConfig()
            )
            self.synthesizer.set_filler_audios(filler_audio_config)
        self.agent.start()
        if self.agent.get_agent_config().initial_message:
            # Record and speak the greeting; it is never interruptable
            # (second argument False).
            self.transcript.add_bot_message(
                self.agent.get_agent_config().initial_message.text
            )
            if self.synthesizer.get_synthesizer_config().track_bot_sentiment_in_voice:
                self.update_bot_sentiment()
            self.send_message_to_stream_nonblocking(
                self.agent.get_agent_config().initial_message, False
            )
        self.active = True
        if self.synthesizer.get_synthesizer_config().track_bot_sentiment_in_voice:
            self.track_bot_sentiment_task = asyncio.create_task(
                self.track_bot_sentiment()
            )
        self.check_for_idle_task = asyncio.create_task(self.check_for_idle())

    async def check_for_idle(self):
        """Terminate the conversation if nothing has happened for too long."""
        while self.is_active():
            if time.time() - self.last_action_timestamp > (
                self.agent.get_agent_config().allowed_idle_time_seconds
                or ALLOWED_IDLE_TIME
            ):
                self.logger.debug("Conversation idle for too long, terminating")
                self.mark_terminated()
                return
            await asyncio.sleep(15)

    async def track_bot_sentiment(self):
        """Poll the transcript and refresh bot sentiment when it changes."""
        prev_transcript = None
        while self.is_active():
            await asyncio.sleep(1)
            if self.transcript.to_string() != prev_transcript:
                self.update_bot_sentiment()
                prev_transcript = self.transcript.to_string()

    def update_bot_sentiment(self):
        """Re-analyse the full transcript and store the resulting sentiment."""
        new_bot_sentiment = self.bot_sentiment_analyser.analyse(
            self.transcript.to_string()
        )
        # Only overwrite when an emotion was actually detected.
        if new_bot_sentiment.emotion:
            self.logger.debug("Bot sentiment: %s", new_bot_sentiment)
            self.bot_sentiment = new_bot_sentiment

    def receive_audio(self, chunk: bytes):
        """Forward a raw audio chunk from the caller to the transcriber."""
        self.transcriber.send_audio(chunk)

    async def send_messages_to_stream_async(
        self,
        messages,
        should_allow_human_to_cut_off_bot: bool,
        wait_for_filler_audio: bool = False,
    ) -> None:
        """Stream a sequence of agent messages to the output as speech.

        Producer/consumer: this coroutine feeds ``messages`` (presumably an
        iterable/generator of strings — confirm against the agent interface)
        into a queue, while ``send_to_call`` — scheduled on the synthesizer
        event loop — synthesizes and plays them. Returns None; the
        (response_buffer, cut_off) pair is returned by the inner coroutine,
        whose future is not awaited here.

        Args:
            messages: agent-generated message texts to speak, in order.
            should_allow_human_to_cut_off_bot: whether barge-in is permitted.
            wait_for_filler_audio: if True, interrupt and drain any filler
                audio before the first real message.
        """
        messages_queue = queue.Queue()
        messages_done = threading.Event()
        speech_cut_off = threading.Event()
        seconds_per_chunk = TEXT_TO_SPEECH_CHUNK_SIZE_SECONDS
        chunk_size = (
            get_chunk_size_per_second(
                self.synthesizer.get_synthesizer_config().audio_encoding,
                self.synthesizer.get_synthesizer_config().sampling_rate,
            )
            * seconds_per_chunk
        )

        async def send_to_call():
            # Runs on the synthesizer event loop: drains messages_queue,
            # synthesizes each message, and plays it until done or cut off.
            response_buffer = ""
            cut_off = False
            self.is_current_synthesis_interruptable = should_allow_human_to_cut_off_bot
            while True:
                try:
                    message: BaseMessage = messages_queue.get_nowait()
                except queue.Empty:
                    if messages_done.is_set():
                        break
                    else:
                        # Yield to the loop while waiting for the producer.
                        await asyncio.sleep(0)
                        continue
                stop_event = self.enqueue_stop_event()
                synthesis_result = self.synthesizer.create_speech(
                    message, chunk_size, bot_sentiment=self.bot_sentiment
                )
                message_sent, cut_off = await self.send_speech_to_output(
                    message.text,
                    synthesis_result,
                    stop_event,
                    seconds_per_chunk,
                )
                self.logger.debug("Message sent: {}".format(message_sent))
                response_buffer = f"{response_buffer} {message_sent}"
                if cut_off:
                    # Tell the producer to stop enqueueing further messages.
                    speech_cut_off.set()
                    break
                await asyncio.sleep(0)
            if cut_off:
                self.agent.update_last_bot_message_on_cut_off(response_buffer)
            self.transcript.add_bot_message(response_buffer)
            return response_buffer, cut_off

        asyncio.run_coroutine_threadsafe(send_to_call(), self.synthesizer_event_loop)
        messages_generated = 0
        for i, message in enumerate(messages):
            messages_generated += 1
            if i == 0:
                # Real speech is about to start: stop any filler audio first.
                if wait_for_filler_audio:
                    self.interrupt_all_synthesis()
                    self.wait_for_filler_audio_to_finish()
            if speech_cut_off.is_set():
                break
            messages_queue.put_nowait(BaseMessage(text=message))
            await asyncio.sleep(0)
        if messages_generated == 0:
            self.logger.debug("Agent generated no messages")
            # Nothing will be spoken, so cancel filler audio outright.
            if wait_for_filler_audio:
                self.interrupt_all_synthesis()
        messages_done.set()

    def send_message_to_stream_nonblocking(
        self,
        message: BaseMessage,
        should_allow_human_to_cut_off_bot: bool,
    ):
        """Fire-and-forget: schedule speech for ``message`` on the synthesizer loop.

        NOTE(review): the ``should_allow_human_to_cut_off_bot`` argument is
        ignored — the agent config's ``allow_agent_to_be_cut_off`` is passed
        through instead. Confirm whether that is intentional.
        """
        asyncio.run_coroutine_threadsafe(
            self.send_message_to_stream_async(
                message,
                self.agent.get_agent_config().allow_agent_to_be_cut_off,
            ),
            self.synthesizer_event_loop,
        )

    async def send_message_to_stream_async(
        self,
        message: BaseMessage,
        should_allow_human_to_cut_off_bot: bool,
    ) -> tuple[str, bool]:
        """Synthesize and play a single message.

        Returns:
            ``(message_sent, cut_off)``: the text actually spoken (an estimate
            when interrupted) and whether the human cut the bot off.
        """
        self.is_current_synthesis_interruptable = should_allow_human_to_cut_off_bot
        stop_event = self.enqueue_stop_event()
        self.logger.debug("Synthesizing speech for message")
        seconds_per_chunk = TEXT_TO_SPEECH_CHUNK_SIZE_SECONDS
        chunk_size = (
            get_chunk_size_per_second(
                self.synthesizer.get_synthesizer_config().audio_encoding,
                self.synthesizer.get_synthesizer_config().sampling_rate,
            )
            * seconds_per_chunk
        )
        synthesis_result = self.synthesizer.create_speech(
            message, chunk_size, bot_sentiment=self.bot_sentiment
        )
        message_sent, cut_off = await self.send_speech_to_output(
            message.text,
            synthesis_result,
            stop_event,
            seconds_per_chunk,
        )
        self.logger.debug("Message sent: {}".format(message_sent))
        if cut_off:
            self.agent.update_last_bot_message_on_cut_off(message_sent)
        self.transcript.add_bot_message(message_sent)
        return message_sent, cut_off

    def warmup_synthesizer(self):
        """Prime the synthesizer (e.g. open connections) ahead of first use."""
        self.synthesizer.ready_synthesizer()

    # returns an estimate of what was sent up to, and a flag if the message was cut off
    async def send_speech_to_output(
        self,
        message,
        synthesis_result: SynthesisResult,
        stop_event: threading.Event,
        seconds_per_chunk: int,
        is_filler_audio: bool = False,
    ):
        """Stream synthesized audio chunks to the output device in real time.

        Paces playback by sleeping roughly one chunk-duration per chunk
        (minus wall-clock send time and the per-chunk allowance). Checks
        ``stop_event`` before each chunk so barge-in takes effect at chunk
        granularity.

        Returns:
            ``(message_sent, cut_off)`` — ``message_sent`` is the full text,
            or an estimated prefix suffixed with ``-`` when interrupted.
        """
        message_sent = message
        cut_off = False
        chunk_size = seconds_per_chunk * get_chunk_size_per_second(
            self.synthesizer.get_synthesizer_config().audio_encoding,
            self.synthesizer.get_synthesizer_config().sampling_rate,
        )
        for i, chunk_result in enumerate(synthesis_result.chunk_generator):
            start_time = time.time()
            speech_length_seconds = seconds_per_chunk * (
                len(chunk_result.chunk) / chunk_size
            )
            if stop_event.is_set():
                # Estimate how many seconds of speech actually played, and
                # ask the synthesis result for the matching text prefix.
                seconds = i * seconds_per_chunk
                self.logger.debug(
                    "Interrupted, stopping text to speech after {} chunks".format(i)
                )
                message_sent = f"{synthesis_result.get_message_up_to(seconds)}-"
                cut_off = True
                break
            if i == 0:
                if is_filler_audio:
                    # First filler chunk went out: responses must now wait
                    # for the filler done-event before speaking.
                    self.should_wait_for_filler_audio_done_event = True
            await self.output_device.send_async(chunk_result.chunk)
            end_time = time.time()
            # Sleep the remainder of the chunk's real-time duration so we
            # don't flood the output device; never sleep a negative amount.
            await asyncio.sleep(
                max(
                    speech_length_seconds
                    - (end_time - start_time)
                    - self.per_chunk_allowance_seconds,
                    0,
                )
            )
            self.logger.debug(
                "Sent chunk {} with size {}".format(i, len(chunk_result.chunk))
            )
            self.last_action_timestamp = time.time()
        # clears it off the stop events queue
        # (marks the event set so interrupt_all_synthesis won't count this
        # already-finished synthesis as an interruption)
        if not stop_event.is_set():
            stop_event.set()
        return message_sent, cut_off

    async def on_transcription_response(self, transcription: Transcription):
        """Transcriber callback: handle interim and final transcriptions.

        On the first (interim) transcription of an utterance, interrupts any
        in-flight interruptable synthesis; then forwards to
        ``handle_transcription`` (which only acts on final transcriptions).
        """
        self.last_action_timestamp = time.time()
        if transcription.is_final:
            self.logger.debug(
                "Got transcription: {}, confidence: {}".format(
                    transcription.message, transcription.confidence
                )
            )
        if not self.is_human_speaking:
            # send interrupt
            self.current_transcription_is_interrupt = False
            if self.is_current_synthesis_interruptable:
                self.logger.debug("sending interrupt")
                self.current_transcription_is_interrupt = self.interrupt_all_synthesis()
            self.logger.debug("Human started speaking")
        transcription.is_interrupt = self.current_transcription_is_interrupt
        # Human is considered speaking until a final transcription arrives.
        self.is_human_speaking = not transcription.is_final
        return await self.handle_transcription(transcription)

    def enqueue_stop_event(self):
        """Create, register, and return a stop event for a new synthesis."""
        stop_event = threading.Event()
        self.stop_events.put_nowait(stop_event)
        return stop_event

    def interrupt_all_synthesis(self):
        """Returns true if any synthesis was interrupted"""
        num_interrupts = 0
        # Drain the queue; already-set events belong to finished syntheses
        # and are not counted as interruptions.
        while True:
            try:
                stop_event = self.stop_events.get_nowait()
                if not stop_event.is_set():
                    self.logger.debug("Interrupting synthesis")
                    stop_event.set()
                    num_interrupts += 1
            except queue.Empty:
                break
        return num_interrupts > 0

    async def send_filler_audio_to_output(
        self,
        filler_audio: FillerAudio,
        stop_event: threading.Event,
        done_event: threading.Event,
    ):
        """Play a filler phrase after a short silence, then set ``done_event``.

        Runs on the synthesizer event loop. The silence threshold gives the
        real response a chance to arrive first; an interrupt during the sleep
        will cut the filler off via ``stop_event``.
        """
        filler_synthesis_result = filler_audio.create_synthesis_result()
        self.is_current_synthesis_interruptable = filler_audio.is_interruptable
        if isinstance(
            self.agent.get_agent_config().send_filler_audio, FillerAudioConfig
        ):
            silence_threshold = (
                self.agent.get_agent_config().send_filler_audio.silence_threshold_seconds
            )
        else:
            silence_threshold = FILLER_AUDIO_DEFAULT_SILENCE_THRESHOLD_SECONDS
        await asyncio.sleep(silence_threshold)
        self.logger.debug("Sending filler audio to output")
        await self.send_speech_to_output(
            filler_audio.message.text,
            filler_synthesis_result,
            stop_event,
            filler_audio.seconds_per_chunk,
            is_filler_audio=True,
        )
        done_event.set()

    def wait_for_filler_audio_to_finish(self):
        """Block until in-flight filler audio finishes its current chunk.

        NOTE(review): this blocks the calling thread on a threading.Event —
        when called from a coroutine it stalls that event loop for up to
        ``current_filler_seconds_per_chunk`` seconds; confirm this is
        acceptable here.
        """
        if not self.should_wait_for_filler_audio_done_event:
            self.logger.debug(
                "Not waiting for filler audio to finish since we didn't send any chunks"
            )
            return
        self.should_wait_for_filler_audio_done_event = False
        if (
            self.current_filler_audio_done_event
            and not self.current_filler_audio_done_event.is_set()
        ):
            self.logger.debug("Waiting for filler audio to finish")
            # this should guarantee that filler audio finishes, since it has to be on its last chunk
            if not self.current_filler_audio_done_event.wait(
                self.current_filler_seconds_per_chunk
            ):
                self.logger.debug("Filler audio did not finish")

    async def handle_transcription(self, transcription: Transcription):
        """React to a final transcription: filler audio, agent response, goodbye.

        No-op for non-final transcriptions. Kicks off optional goodbye
        detection and filler audio in the background, then either streams
        generated responses or speaks a single agent reply.
        """
        if transcription.is_final:
            self.transcript.add_human_message(transcription.message)
            goodbye_detected_task = None
            if self.agent.get_agent_config().end_conversation_on_goodbye:
                # goodbye_model exists because the same flag gated its
                # construction in __init__.
                goodbye_detected_task = asyncio.create_task(
                    self.goodbye_model.is_goodbye(transcription.message)
                )
            if self.agent.get_agent_config().send_filler_audio:
                self.logger.debug("Sending filler audio")
                if self.synthesizer.filler_audios:
                    filler_audio = random.choice(self.synthesizer.filler_audios)
                    self.logger.debug(f"Chose {filler_audio.message.text}")
                    self.current_filler_audio_done_event = threading.Event()
                    self.current_filler_seconds_per_chunk = (
                        filler_audio.seconds_per_chunk
                    )
                    stop_event = self.enqueue_stop_event()
                    asyncio.run_coroutine_threadsafe(
                        self.send_filler_audio_to_output(
                            filler_audio,
                            stop_event,
                            done_event=self.current_filler_audio_done_event,
                        ),
                        self.synthesizer_event_loop,
                    )
                else:
                    self.logger.debug("No filler audio available for synthesizer")
            self.logger.debug("Generating response for transcription")
            if self.agent.get_agent_config().generate_responses:
                # Streaming agent: responses arrive incrementally.
                responses = self.agent.generate_response(
                    transcription.message, is_interrupt=transcription.is_interrupt
                )
                await self.send_messages_to_stream_async(
                    responses,
                    self.agent.get_agent_config().allow_agent_to_be_cut_off,
                    wait_for_filler_audio=self.agent.get_agent_config().send_filler_audio,
                )
            else:
                # Blocking agent: one reply plus a stop flag.
                response, should_stop = self.agent.respond(
                    transcription.message, is_interrupt=transcription.is_interrupt
                )
                if self.agent.get_agent_config().send_filler_audio:
                    self.interrupt_all_synthesis()
                    self.wait_for_filler_audio_to_finish()
                if should_stop:
                    self.logger.debug("Agent requested to stop")
                    self.mark_terminated()
                    return
                if response:
                    self.send_message_to_stream_nonblocking(
                        BaseMessage(text=response),
                        self.agent.get_agent_config().allow_agent_to_be_cut_off,
                    )
                else:
                    self.logger.debug("No response generated")
            if goodbye_detected_task:
                # Give goodbye detection a brief window; don't hold up the turn.
                try:
                    goodbye_detected = await asyncio.wait_for(
                        goodbye_detected_task, 0.1
                    )
                    if goodbye_detected:
                        self.logger.debug("Goodbye detected, ending conversation")
                        self.mark_terminated()
                        return
                except asyncio.TimeoutError:
                    self.logger.debug("Goodbye detection timed out")

    def mark_terminated(self):
        """Flip the active flag; background loops observe it and wind down."""
        self.active = False

    # must be called from the main thread
    def terminate(self):
        """Tear down all tasks, threads, and components of the conversation."""
        self.mark_terminated()
        if self.check_for_idle_task:
            self.logger.debug("Terminating check_for_idle Task")
            self.check_for_idle_task.cancel()
        if self.track_bot_sentiment_task:
            self.logger.debug("Terminating track_bot_sentiment Task")
            self.track_bot_sentiment_task.cancel()
        self.logger.debug("Terminating agent")
        self.agent.terminate()
        self.logger.debug("Terminating speech transcriber")
        self.transcriber.terminate()
        self.logger.debug("Terminating synthesizer event loop")
        # Stop the loop from its own thread, then join the thread.
        self.synthesizer_event_loop.call_soon_threadsafe(
            self.synthesizer_event_loop.stop
        )
        self.logger.debug("Terminating synthesizer thread")
        if self.synthesizer_thread.is_alive():
            self.synthesizer_thread.join()
        self.logger.debug("Terminating transcriber task")
        self.transcriber_task.cancel()
        self.logger.debug("Successfully terminated")

    def is_active(self):
        """Whether the conversation is still running."""
        return self.active