import asyncio import audioop import secrets from typing import Any import wave from ..models.audio_encoding import AudioEncoding def create_loop_in_thread(loop: asyncio.AbstractEventLoop, long_running_task=None): asyncio.set_event_loop(loop) if long_running_task: loop.run_until_complete(long_running_task) else: loop.run_forever() def convert_linear_audio( raw_wav: bytes, input_sample_rate=24000, output_sample_rate=8000, output_encoding=AudioEncoding.LINEAR16, output_sample_width=2, ): # downsample if input_sample_rate != output_sample_rate: raw_wav, _ = audioop.ratecv( raw_wav, 2, 1, input_sample_rate, output_sample_rate, None ) if output_encoding == AudioEncoding.LINEAR16: return raw_wav elif output_encoding == AudioEncoding.MULAW: return audioop.lin2ulaw(raw_wav, output_sample_width) def convert_wav( file: Any, output_sample_rate=8000, output_encoding=AudioEncoding.LINEAR16, ): with wave.open(file, "rb") as wav: raw_wav = wav.readframes(wav.getnframes()) return convert_linear_audio( raw_wav, input_sample_rate=wav.getframerate(), output_sample_rate=output_sample_rate, output_encoding=output_encoding, output_sample_width=wav.getsampwidth(), ) def get_chunk_size_per_second(audio_encoding: AudioEncoding, sampling_rate: int) -> int: if audio_encoding == AudioEncoding.LINEAR16: return sampling_rate * 2 elif audio_encoding == AudioEncoding.MULAW: return sampling_rate else: raise Exception("Unsupported audio encoding") def create_conversation_id() -> str: return secrets.token_urlsafe(16)