# vocode-python/vocode/streaming/synthesizer/google_synthesizer.py
# 2023-03-28 10:29:00 -07:00
# 110 lines · 4 KiB · Python
import io
import wave
from typing import Any, Optional
from dotenv import load_dotenv
from google.cloud import texttospeech_v1beta1 as tts
from vocode.streaming.agent.bot_sentiment_analyser import BotSentiment
from vocode.streaming.models.message import BaseMessage
from vocode.streaming.synthesizer.base_synthesizer import (
BaseSynthesizer,
SynthesisResult,
encode_as_wav,
)
from vocode.streaming.models.synthesizer import GoogleSynthesizerConfig
from vocode.streaming.models.audio_encoding import AudioEncoding
from vocode.streaming.utils import convert_wav
load_dotenv()
class GoogleSynthesizer(BaseSynthesizer):
    """Text-to-speech synthesizer backed by the Google Cloud TTS API.

    Requests 24 kHz LINEAR16 audio with a fixed en-US Neural2 voice, trims
    a short span from both ends of the returned clip, then re-encodes it to
    the sample rate and encoding requested in the synthesizer config.
    """

    # Nominal seconds trimmed from the head and tail of each synthesized clip
    # (see the NOTE in create_speech about how the slice is actually applied).
    OFFSET_SECONDS = 0.5

    def __init__(self, synthesizer_config: GoogleSynthesizerConfig):
        super().__init__(synthesizer_config)
        # Google Cloud client; credentials come from the environment
        # (loaded via dotenv at module import time).
        self.client = tts.TextToSpeechClient()
        # Fixed voice selection: US English, Neural2 voice "I".
        self.voice = tts.VoiceSelectionParams(
            language_code="en-US", name="en-US-Neural2-I"
        )
        # Raw 16-bit PCM at 24 kHz, slightly sped up, with the
        # telephony effects profile applied server-side.
        self.audio_config = tts.AudioConfig(
            audio_encoding=tts.AudioEncoding.LINEAR16,
            sample_rate_hertz=24000,
            speaking_rate=1.2,
            pitch=0,
            effects_profile_id=["telephony-class-application"],
        )

    def synthesize(self, message: str) -> tts.SynthesizeSpeechResponse:
        """Perform a blocking synthesis request for ``message``.

        Returns the raw API response. SSML-mark timepointing is enabled so
        callers can align audio with positions in the source text.
        """
        synthesis_input = tts.SynthesisInput(text=message)
        return self.client.synthesize_speech(
            request=tts.SynthesizeSpeechRequest(
                input=synthesis_input,
                voice=self.voice,
                audio_config=self.audio_config,
                enable_time_pointing=[
                    tts.SynthesizeSpeechRequest.TimepointType.SSML_MARK
                ],
            )
        )

    def create_speech(
        self,
        message: BaseMessage,
        chunk_size: int,
        bot_sentiment: Optional[BotSentiment] = None,
    ) -> SynthesisResult:
        """Synthesize ``message`` and wrap the audio in a chunked result.

        Args:
            message: the message whose ``text`` is spoken.
            chunk_size: size in bytes of each yielded audio chunk.
            bot_sentiment: unused; accepted for interface compatibility.

        Returns:
            A SynthesisResult streaming fixed-size chunks of the encoded
            audio, with a cutoff callback based on total response length.

        Raises:
            ValueError: if the configured audio encoding is unsupported.
        """
        response = self.synthesize(message.text)
        output_sample_rate = response.audio_config.sample_rate_hertz
        # NOTE(review): this offset is a *sample* count but is used to slice
        # *bytes* of 16-bit PCM (2 bytes/frame), so it trims roughly
        # OFFSET_SECONDS/2 of audio plus the WAV header Google prepends to
        # LINEAR16 payloads — confirm whether that is intentional before
        # changing the math.
        real_offset = int(GoogleSynthesizer.OFFSET_SECONDS * output_sample_rate)
        output_bytes_io = io.BytesIO()
        # Use a context manager so the wave header is finalized on close
        # (the original writer was never closed).
        with wave.open(output_bytes_io, "wb") as in_memory_wav:
            in_memory_wav.setnchannels(1)
            in_memory_wav.setsampwidth(2)  # 16-bit samples
            in_memory_wav.setframerate(output_sample_rate)
            in_memory_wav.writeframes(
                response.audio_content[real_offset:-real_offset]
            )
        output_bytes_io.seek(0)

        if self.synthesizer_config.audio_encoding == AudioEncoding.LINEAR16:
            output_bytes = convert_wav(
                output_bytes_io,
                output_sample_rate=self.synthesizer_config.sampling_rate,
                output_encoding=AudioEncoding.LINEAR16,
            )
        elif self.synthesizer_config.audio_encoding == AudioEncoding.MULAW:
            output_bytes = convert_wav(
                output_bytes_io,
                output_sample_rate=self.synthesizer_config.sampling_rate,
                output_encoding=AudioEncoding.MULAW,
            )
        else:
            # Previously fell through to an UnboundLocalError; fail loudly
            # with a clear message instead.
            raise ValueError(
                f"Unsupported audio encoding: {self.synthesizer_config.audio_encoding}"
            )

        if self.synthesizer_config.should_encode_as_wav:
            output_bytes = encode_as_wav(output_bytes)

        def chunk_generator(output_bytes):
            # Yield fixed-size chunks, flagging the final one. Using `>=`
            # also marks the last chunk when the length is an exact multiple
            # of chunk_size, which the previous `>` comparison mislabeled
            # as non-final.
            for i in range(0, len(output_bytes), chunk_size):
                is_last = i + chunk_size >= len(output_bytes)
                yield SynthesisResult.ChunkResult(
                    output_bytes[i : i + chunk_size], is_last
                )

        return SynthesisResult(
            chunk_generator(output_bytes),
            lambda seconds: self.get_message_cutoff_from_total_response_length(
                message, seconds, len(output_bytes)
            ),
        )