Move CLI into main package
This commit is contained in:
parent
561ce5df4a
commit
45c0edc2a2
12 changed files with 785 additions and 49 deletions
34
mimic3-http/mimic3_http/_resources.py
Normal file
34
mimic3-http/mimic3_http/_resources.py
Normal file
|
|
@ -0,0 +1,34 @@
|
|||
# Copyright 2022 Mycroft AI Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
"""Shared access to package resources"""
|
||||
import os
|
||||
import typing
|
||||
from pathlib import Path
|
||||
|
||||
# Pick up the ``files`` function from the standard library when it is
# available, otherwise from the ``importlib_resources`` backport.
try:
    import importlib.resources

    files = importlib.resources.files
except (ImportError, AttributeError):
    # Backport for Python < 3.9
    import importlib_resources  # type: ignore

    files = importlib_resources.files

# Name of the package whose resources are looked up
_PACKAGE = "mimic3_http"

# Directory of the package's resources
_PACKAGE_ROOT = typing.cast(os.PathLike, files(_PACKAGE))
_DIR = Path(_PACKAGE_ROOT)

# Version string, read from the VERSION file inside the package directory
with open(_DIR / "VERSION", "r", encoding="utf-8") as _version_file:
    __version__ = _version_file.read().strip()
|
||||
2
mimic3-tts/.gitignore
vendored
2
mimic3-tts/.gitignore
vendored
|
|
@ -12,3 +12,5 @@ htmlcov
|
|||
__pycache__/
|
||||
.mypy_cache/
|
||||
*.egg-info/
|
||||
|
||||
flycheck_*.py
|
||||
|
|
|
|||
|
|
@ -1,6 +1,15 @@
|
|||
from pathlib import Path
|
||||
|
||||
from opentts_abc import AudioResult, MarkResult
|
||||
from opentts_abc import (
|
||||
AudioResult,
|
||||
BaseResult,
|
||||
BaseToken,
|
||||
MarkResult,
|
||||
Phonemes,
|
||||
SayAs,
|
||||
Voice,
|
||||
Word,
|
||||
)
|
||||
from opentts_abc.ssml import SSMLSpeaker
|
||||
|
||||
from ._resources import __version__
|
||||
|
|
|
|||
|
|
@ -1,50 +1,536 @@
|
|||
#!/usr/bin/env python3
|
||||
# Copyright 2022 Mycroft AI Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import argparse
|
||||
import csv
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import string
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import typing
|
||||
import wave
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from queue import Queue
|
||||
|
||||
from opentts_abc.ssml import SSMLSpeaker
|
||||
from ._resources import _PACKAGE
|
||||
|
||||
from .tts import AudioResult, MarkResult, Mimic3Settings, Mimic3TextToSpeechSystem
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
settings = Mimic3Settings()
|
||||
tts = Mimic3TextToSpeechSystem(settings)
|
||||
|
||||
speaker = SSMLSpeaker(tts)
|
||||
# ssml = '<speak><voice name="el_GR/rapunzelina_low"><s><w>Το</w><w>αερόστρωμνό</w><w>μου</w><w>είναι</w><w>γεμάτο</w><w>χέλια.</w></s></voice></speak>'
|
||||
# ssml = '<speak><voice name="uk_UK/m-ailabs_low"><s><w>бажав</w></s></voice></speak>'
|
||||
# ssml = '<speak><s><w>Hello</w><w>World</w></s></speak>'
|
||||
# ssml = '<speak><s>Hello world</s></speak>'
|
||||
# ssml = '<speak><s><voice name="el_GR/rapunzelina_low"><say-as interpret-as="characters">12</say-as></voice></s></speak>'
|
||||
ssml = """
|
||||
<speak>
|
||||
<voice name="en_US/amy_low">
|
||||
Today is a test.
|
||||
This is another test.
|
||||
</voice>
|
||||
if typing.TYPE_CHECKING:
|
||||
from . import BaseResult, Mimic3TextToSpeechSystem # noqa: F401
|
||||
|
||||
|
||||
<voice name="es_ES/carlfm_low">
|
||||
<lang xml:lang="es_ES">
|
||||
Soy el <say-as interpret-as="number" format="ordinal">1</say-as>.
|
||||
</lang>
|
||||
</voice>
|
||||
</speak>
|
||||
"""
|
||||
_LOGGER = logging.getLogger(_PACKAGE)
|
||||
|
||||
|
||||
wav_file: wave.Wave_write = wave.open("out.wav", "wb")
|
||||
params_set = False
|
||||
with wav_file:
|
||||
for result in speaker.speak(ssml):
|
||||
if isinstance(result, AudioResult):
|
||||
if not params_set:
|
||||
wav_file.setframerate(result.sample_rate_hz)
|
||||
wav_file.setsampwidth(result.sample_width_bytes)
|
||||
wav_file.setnchannels(result.num_channels)
|
||||
params_set = True
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
wav_file.writeframes(result.audio_bytes)
|
||||
elif isinstance(result, MarkResult):
|
||||
print("mark", result.name)
|
||||
|
||||
@dataclass
class ResultToProcess:
    """One synthesis result waiting in the result queue, with its source text"""

    # Result object produced while speaking the line
    result: "BaseResult"

    # Input text the result was produced from
    line: str

    # Identifier that accompanied the text (empty when there was none)
    line_id: str = ""
|
||||
|
||||
|
||||
@dataclass
class CommandLineInterfaceState:
    """Mutable state shared by the command-line interface functions"""

    # Parsed command-line arguments
    args: argparse.Namespace

    # Texts to speak (from the arguments or from standard input)
    texts: typing.Optional[typing.Iterable[str]] = None

    # Stream that <mark> names are printed to
    mark_writer: typing.Optional[typing.TextIO] = None

    # Text to speech system, created by initialize_tts
    tts: typing.Optional["Mimic3TextToSpeechSystem"] = None

    # True when the text is read from standard input
    text_from_stdin: bool = False

    # Audio accumulated for a single combined output, with its WAV parameters
    all_audio: bytes = b""
    sample_rate_hz: int = 22050
    sample_width_bytes: int = 2
    num_channels: int = 1

    # Queue feeding the result thread; a None item tells the thread to stop
    result_queue: typing.Optional["Queue[typing.Optional[ResultToProcess]]"] = None
    result_thread: typing.Optional[threading.Thread] = None
|
||||
|
||||
|
||||
class OutputNaming(str, Enum):
    """Naming schemes for output file names"""

    # Name files after the spoken text
    TEXT = "text"

    # Name files after a timestamp
    TIME = "time"

    # Name files after the id given with each line
    ID = "id"
|
||||
|
||||
|
||||
class StdinFormat(str, Enum):
    """Ways of interpreting text read from standard input"""

    # Choose based on SSML state
    AUTO = "auto"

    # Each line is a separate sentence/document
    LINES = "lines"

    # Entire input is one document
    DOCUMENT = "document"
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def main():
    """Main entry point.

    Parses the command line and configures logging, then either prints the
    version, lists the available voices, or speaks the input text.  The text
    to speech system is shut down and the mark file (if one was opened) is
    closed before returning.
    """
    args = get_args()

    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)

    if args.version:
        # Print version and exit
        from . import __version__

        print(__version__)
        sys.exit(0)

    state = CommandLineInterfaceState(args=args)
    initialize_args(state)
    initialize_tts(state)

    try:
        if args.voices:
            # Print voices and exit
            print_voices(state)
        else:
            # Process user input.
            # Only prompt when the text really comes from an interactive stdin;
            # text given as arguments needs no prompt.
            if state.text_from_stdin and sys.stdin.isatty():
                print("Reading text from stdin...", file=sys.stderr)

            process_lines(state)
    finally:
        try:
            shutdown_tts(state)
        finally:
            # Close the mark file opened by initialize_args.
            # The standard streams are left open.
            mark_writer = state.mark_writer
            if (
                (mark_writer is not None)
                and (mark_writer is not sys.stdout)
                and (mark_writer is not sys.stderr)
            ):
                mark_writer.close()
|
||||
|
||||
|
||||
def _combine_until_blank_line(lines: typing.Iterable[str]) -> typing.Iterator[str]:
    """Join consecutive non-blank lines with spaces, yielding at each blank line.

    Text remaining after the last line is yielded as well, so input that does
    not end with a blank line keeps its final paragraph.
    """
    parts: typing.List[str] = []
    for line in lines:
        line = line.strip()
        if line:
            parts.append(line)
            continue

        if parts:
            yield " ".join(parts)
            parts = []

    if parts:
        yield " ".join(parts)


def initialize_args(state: CommandLineInterfaceState):
    """Prepare output locations, the mark writer, the random seed and the texts.

    Reads ``state.args`` and sets ``state.mark_writer``, ``state.texts`` and
    ``state.text_from_stdin``.  ``args.output_dir`` and ``args.mark_file`` are
    replaced with ``Path`` objects when given, and ``args.output_naming`` is
    forced to "id" when ``--csv`` is used.
    """
    import numpy as np

    args = state.args

    # Create output directory
    if args.output_dir:
        args.output_dir = Path(args.output_dir)
        args.output_dir.mkdir(parents=True, exist_ok=True)

    # Open file for writing the names from <mark> tags in SSML.
    # Each name is printed on a single line.
    if args.mark_file:
        args.mark_file = Path(args.mark_file)
        args.mark_file.parent.mkdir(parents=True, exist_ok=True)
        state.mark_writer = open(  # pylint: disable=consider-using-with
            args.mark_file, "w", encoding="utf-8"
        )
    elif args.stdout:
        # Audio is forced to stdout, so mark names go to stderr
        state.mark_writer = sys.stderr
    else:
        state.mark_writer = sys.stdout

    if args.seed is not None:
        _LOGGER.debug("Setting random seed to %s", args.seed)
        np.random.seed(args.seed)

    if args.csv:
        args.output_naming = "id"

    # Read text from stdin or arguments
    if args.text:
        # Use arguments
        state.texts = args.text
    else:
        # Use stdin
        state.text_from_stdin = True

        # Honor an explicit --stdin-format; only "auto" is decided here
        stdin_format = StdinFormat(args.stdin_format)
        if stdin_format == StdinFormat.AUTO:
            # Assume SSML input is entire document, plain text is line-based
            stdin_format = StdinFormat.DOCUMENT if args.ssml else StdinFormat.LINES

        if stdin_format == StdinFormat.DOCUMENT:
            # One big line
            state.texts = [sys.stdin.read()]
        else:
            # Multiple lines
            state.texts = sys.stdin

    assert state.texts is not None

    if args.process_on_blank_line:
        # Combine text until a blank line is encountered.
        # Good for line-wrapped books where sentences are broken up across
        # multiple lines.
        state.texts = _combine_until_blank_line(state.texts)
|
||||
|
||||
|
||||
def initialize_tts(state: CommandLineInterfaceState):
    """Create the text to speech system and start the result thread.

    When only the voice list was requested, nothing beyond creating the text
    to speech system is done.
    """
    from mimic3_tts import Mimic3Settings, Mimic3TextToSpeechSystem  # noqa: F811

    args = state.args

    tts = Mimic3TextToSpeechSystem(Mimic3Settings())
    state.tts = tts

    if args.voices:
        # Don't bother with the rest of the initialization
        return

    if args.voice:
        # Set default voice
        tts.voice = args.voice

    for voice_key in args.preload_voice or ():
        _LOGGER.debug("Preloading voice: %s", voice_key)
        tts.preload_voice(voice_key)

    state.result_queue = Queue(maxsize=args.result_queue_size)

    # Results are handled on a daemon thread running process_result
    worker = threading.Thread(target=process_result, daemon=True, args=(state,))
    state.result_thread = worker
    worker.start()
|
||||
|
||||
|
||||
def _output_file_name(naming: str, line: str, line_id: str) -> str:
    """Return the output file name (without extension) for one line of text.

    An empty string is returned when no name can be determined.
    """
    if naming == OutputNaming.TEXT:
        # Use text itself: spaces become underscores, other punctuation is removed
        name = line.strip().replace(" ", "_")
        return name.translate(
            str.maketrans("", "", string.punctuation.replace("_", ""))
        )

    if naming == OutputNaming.TIME:
        # Use timestamp
        return str(time.time())

    if naming == OutputNaming.ID:
        return line_id

    return ""


def _process_audio_result(
    state: CommandLineInterfaceState, result: typing.Any, line: str, line_id: str
):
    """Play, write, or accumulate the audio of a single result."""
    args = state.args

    if not (args.interactive or args.output_dir):
        # Combine all audio and output to stdout at the end
        state.all_audio += result.audio_bytes
        state.sample_rate_hz = result.sample_rate_hz
        state.sample_width_bytes = result.sample_width_bytes
        state.num_channels = result.num_channels
        return

    # WAV audio is only created when something needs it
    wav_bytes: typing.Optional[bytes] = None

    if args.interactive:
        if args.stdout:
            # Write audio to stdout
            sys.stdout.buffer.write(result.audio_bytes)
            sys.stdout.buffer.flush()
        else:
            # Play sound
            wav_bytes = result.to_wav_bytes()
            if wav_bytes:
                play_wav_bytes(wav_bytes)

    if args.output_dir:
        if not wav_bytes:
            wav_bytes = result.to_wav_bytes()

        file_name = _output_file_name(args.output_naming, line, line_id)
        if not file_name:
            # Raised instead of asserted so the check survives "python -O"
            raise ValueError(f"No file name for text: {line}")

        wav_path = args.output_dir / (file_name + ".wav")
        wav_path.write_bytes(wav_bytes)

        _LOGGER.debug("Wrote %s", wav_path)


def process_result(state: CommandLineInterfaceState):
    """Handle queued results until a ``None`` item is received.

    Audio results are played, written to the output directory, or collected in
    ``state.all_audio``; mark results have their name printed to
    ``state.mark_writer``.  Errors are logged rather than raised.
    """
    try:
        from mimic3_tts import AudioResult, MarkResult

        assert state.result_queue is not None

        while True:
            result_todo = state.result_queue.get()
            if result_todo is None:
                # Sentinel: no more results
                break

            try:
                result = result_todo.result

                if isinstance(result, AudioResult):
                    _process_audio_result(
                        state, result, result_todo.line, result_todo.line_id
                    )
                elif isinstance(result, MarkResult):
                    if state.mark_writer:
                        print(result.name, file=state.mark_writer)
            except Exception:
                # One failed result must not stop the remaining ones
                _LOGGER.exception("Error processing result")
    except Exception:
        _LOGGER.exception("process_result")
|
||||
|
||||
|
||||
def process_line(
    line: str,
    state: CommandLineInterfaceState,
    line_id: str = "",
):
    """Speak one line of text (or SSML) and queue every result for output."""
    from mimic3_tts import SSMLSpeaker

    assert state.tts is not None
    assert state.result_queue is not None

    tts = state.tts
    pending = state.result_queue

    if state.args.ssml:
        results = SSMLSpeaker(tts).speak(line)
    else:
        tts.begin_utterance()

        # TODO: text language
        tts.speak_text(line)

        results = tts.end_utterance()

    for result in results:
        pending.put(ResultToProcess(result=result, line=line, line_id=line_id))
|
||||
|
||||
|
||||
def _write_wav(target: typing.Any, state: CommandLineInterfaceState):
    """Write ``state.all_audio`` as a WAV file to a file name or file object."""
    wav_file: wave.Wave_write = wave.open(target, "wb")
    with wav_file:
        wav_file.setframerate(state.sample_rate_hz)
        wav_file.setsampwidth(state.sample_width_bytes)
        wav_file.setnchannels(state.num_channels)
        wav_file.writeframes(state.all_audio)


def process_lines(state: CommandLineInterfaceState):
    """Speak every text in ``state.texts`` and output the combined audio.

    Blank lines are skipped.  With "id" output naming each line has the form
    ``<id><delimiter><text>``; lines without the delimiter are logged and
    skipped.  After the result thread has finished, audio collected in
    ``state.all_audio`` is played (stdout is a terminal and --stdout was not
    given) or written to stdout as WAV.
    """
    from queue import Empty

    assert state.texts is not None

    args = state.args

    try:
        for line in state.texts:
            line_id = ""
            line = line.strip()
            if not line:
                continue

            if args.output_naming == OutputNaming.ID:
                # Line has the format id|text instead of just text
                line_id, delimiter, text = line.partition(args.id_delimiter)
                if not delimiter:
                    _LOGGER.warning(
                        "Skipping line without id delimiter %r: %s",
                        args.id_delimiter,
                        line,
                    )
                    continue

                line = text

            process_line(line, state, line_id=line_id)

    except KeyboardInterrupt:
        if state.result_queue is not None:
            # Drain audio playback queue.
            # get_nowait is used because the result thread consumes the queue
            # at the same time; a blocking get after an empty() check could
            # wait forever.
            try:
                while True:
                    state.result_queue.get_nowait()
            except Empty:
                pass
    finally:
        # Wait for raw stream to finish
        if state.result_queue is not None:
            state.result_queue.put(None)

        if state.result_thread is not None:
            print("Waiting for audio to finish...", file=sys.stderr)
            state.result_thread.join()

    # -------------------------------------------------------------------------

    # Write combined audio to stdout
    if state.all_audio:
        _LOGGER.debug("Writing WAV audio to stdout")

        if sys.stdout.isatty() and (not state.args.stdout):
            # A terminal is attached: play the audio instead of printing it
            with io.BytesIO() as wav_io:
                _write_wav(wav_io, state)
                play_wav_bytes(wav_io.getvalue())
        else:
            # Write output directly to stdout
            _write_wav(sys.stdout.buffer, state)
            sys.stdout.buffer.flush()
|
||||
|
||||
|
||||
def shutdown_tts(state: CommandLineInterfaceState):
    """Shut down the text to speech system, if any, and clear it from the state."""
    tts = state.tts
    if tts is None:
        return

    tts.shutdown()
    state.tts = None
|
||||
|
||||
|
||||
def play_wav_bytes(wav_bytes: bytes):
    """Play WAV audio using ``playsound``.

    The audio is written to a temporary file because ``playsound`` is called
    with a file name.  The file is removed afterwards.
    """
    from playsound import playsound

    # The file is written and closed before playback: a NamedTemporaryFile
    # that is still open cannot be opened a second time by name on every
    # platform (it cannot on Windows).
    with tempfile.TemporaryDirectory() as temp_dir:
        wav_path = Path(temp_dir) / "playback.wav"
        wav_path.write_bytes(wav_bytes)

        _LOGGER.debug("Playing WAV file: %s", wav_path)
        playsound(str(wav_path))
|
||||
|
||||
|
||||
def print_voices(state: CommandLineInterfaceState):
    """Print a tab-separated table of the available voices, sorted by key."""
    assert state.tts is not None

    # Collect and sort first so nothing is printed if listing the voices fails
    ordered = sorted(list(state.tts.get_voices()), key=lambda v: v.key)

    table = csv.writer(sys.stdout, delimiter="\t")
    table.writerow(("KEY", "LANGUAGE", "NAME", "DESCRIPTION", "LOCATION"))
    for info in ordered:
        row = (info.key, info.language, info.name, info.description, info.location)
        table.writerow(row)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def get_args():
    """Parse command-line arguments.

    Returns:
        The ``argparse.Namespace`` produced from ``sys.argv``.
    """
    parser = argparse.ArgumentParser(prog=_PACKAGE)
    # parser.add_argument(
    #     "--language", help="Gruut language for text input (en-us, etc.)"
    # )
    parser.add_argument(
        "text", nargs="*", help="Text to convert to speech (default: stdin)"
    )
    parser.add_argument(
        "--stdin-format",
        choices=[str(v.value) for v in StdinFormat],
        default=StdinFormat.AUTO,
        help="Format of stdin text (default: auto)",
    )
    parser.add_argument(
        "--voice",
        "-v",
        help="Name of voice (expected in <voices-dir>/<language>)",
    )
    # parser.add_argument(
    #     "--voices-dir",
    #     help="Directory with voices (format is <language>/<name_model-type>)",
    # )
    parser.add_argument("--voices", action="store_true", help="List available voices")
    parser.add_argument("--output-dir", help="Directory to write WAV file(s)")
    parser.add_argument(
        "--output-naming",
        choices=[v.value for v in OutputNaming],
        default="text",
        help="Naming scheme for output WAV files (requires --output-dir)",
    )
    parser.add_argument(
        "--id-delimiter",
        default="|",
        help="Delimiter between id and text in lines (default: |). Requires --output-naming id",
    )
    parser.add_argument(
        "--interactive",
        action="store_true",
        help="Play audio after each input line (see --play-command)",
    )
    parser.add_argument("--csv", action="store_true", help="Input format is id|text")
    parser.add_argument(
        "--mark-file",
        help="File to write mark names to as they're encountered (--ssml only)",
    )

    parser.add_argument(
        "--noise-scale",
        type=float,
        help="Noise scale [0-1], default is 0.667",
    )
    parser.add_argument(
        "--length-scale",
        type=float,
        help="Length scale (1.0 is default speed, 0.5 is 2x faster)",
    )
    parser.add_argument(
        "--noise-w",
        type=float,
        help="Variation in cadence [0-1], default is 0.8",
    )

    # Miscellaneous
    parser.add_argument(
        "--result-queue-size",
        # Converted to int because the value is used as a queue size;
        # without a type, a value given on the command line stays a string.
        type=int,
        default=5,
        help="Maximum number of sentences to maintain in output queue (default: 5)",
    )
    parser.add_argument(
        "--process-on-blank-line",
        action="store_true",
        help="Process text only after encountering a blank line",
    )
    parser.add_argument("--ssml", action="store_true", help="Input text is SSML")
    # parser.add_argument(
    #     "--optimizations",
    #     choices=["auto", "on", "off"],
    #     default="auto",
    #     help="Enable/disable Onnx optimizations (auto=disable on armv7l)",
    # )

    parser.add_argument(
        "--stdout",
        action="store_true",
        help="Force audio output to stdout even if a tty is detected",
    )
    parser.add_argument(
        "--preload-voice", action="append", help="Preload voice when starting up"
    )
    parser.add_argument("--seed", type=int, help="Set random seed (default: not set)")
    parser.add_argument("--version", action="store_true", help="Print version and exit")
    parser.add_argument(
        "--debug", action="store_true", help="Print DEBUG messages to the console"
    )

    return parser.parse_args()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
|
|||
170
mimic3-tts/mimic3_tts/download.py
Normal file
170
mimic3-tts/mimic3_tts/download.py
Normal file
|
|
@ -0,0 +1,170 @@
|
|||
# Copyright 2022 Mycroft AI Inc.
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import typing
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
from urllib.error import HTTPError
|
||||
|
||||
from xdgenvpy import XDG
|
||||
|
||||
from ._resources import _DIR, _PACKAGE
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
class VoiceDownloadError(Exception):
    """Raised when downloading a voice fails"""
|
||||
|
||||
|
||||
def _first_subdirectory(parent: Path) -> typing.Optional[Path]:
    """Return the first directory found directly inside ``parent``, or None."""
    return next((path for path in parent.iterdir() if path.is_dir()), None)


def download_voice(voices_dir: typing.Union[str, Path], link: str) -> Path:
    """Download and extract a voice (or vocoder).

    The archive at ``link`` is expected to unpack to ``<language>/<voice_name>``.
    An existing ``<voices_dir>/<language>/<voice_name>`` directory is replaced.

    Args:
        voices_dir: Directory that receives the ``<language>/<voice_name>`` tree.
        link: URL of the voice archive.

    Returns:
        Path to the installed voice directory.

    Raises:
        VoiceDownloadError: If the HTTP request fails or the archive does not
            have the expected layout.
    """
    from tqdm.auto import tqdm

    voice_name = link.split("/")[-1]
    voices_dir = Path(voices_dir)
    voices_dir.mkdir(parents=True, exist_ok=True)

    _LOGGER.debug("Downloading voice to %s from %s", voices_dir, link)

    try:
        with tempfile.TemporaryDirectory() as temp_dir_str:
            temp_dir = Path(temp_dir_str)

            # The archive is saved to a regular file and closed before it is
            # unpacked: an open NamedTemporaryFile cannot be reopened by name
            # on every platform.  The .tar.gz suffix lets unpack_archive pick
            # the format.
            archive_path = temp_dir / "voice.tar.gz"

            with urllib.request.urlopen(link) as response:
                with open(archive_path, "wb") as archive_file:
                    with tqdm(
                        unit="B",
                        unit_scale=True,
                        unit_divisor=1024,
                        miniters=1,
                        desc=voice_name,
                        total=int(response.headers.get("content-length", 0)),
                    ) as pbar:
                        chunk = response.read(4096)
                        while chunk:
                            archive_file.write(chunk)
                            pbar.update(len(chunk))
                            chunk = response.read(4096)

            # Extract
            extract_dir = temp_dir / "extracted"
            extract_dir.mkdir()
            _LOGGER.debug("Extracting %s to %s", archive_path, extract_dir)

            # NOTE(review): the archive comes from the network and its member
            # paths are not validated here; consider restricting extraction
            # (e.g. the ``filter`` argument of unpack_archive on newer Pythons).
            shutil.unpack_archive(str(archive_path), str(extract_dir))

            # Expecting <language>/<voice_name>
            lang_dir = _first_subdirectory(extract_dir)
            if lang_dir is None:
                raise VoiceDownloadError(
                    f"No language directory in voice archive from {link}"
                )

            voice_dir = _first_subdirectory(lang_dir)
            if voice_dir is None:
                raise VoiceDownloadError(
                    f"No voice directory in voice archive from {link}"
                )

            # Copy to destination
            dest_lang_dir = voices_dir / lang_dir.name
            dest_lang_dir.mkdir(parents=True, exist_ok=True)

            dest_voice_dir = dest_lang_dir / voice_dir.name
            if dest_voice_dir.is_dir():
                # Delete existing files
                shutil.rmtree(str(dest_voice_dir))

            # Move files
            _LOGGER.debug("Moving %s to %s", voice_dir, dest_voice_dir)
            shutil.move(str(voice_dir), str(dest_voice_dir))

            _LOGGER.info("Installed %s to %s", link, dest_voice_dir)

            return dest_voice_dir
    except HTTPError as e:
        _LOGGER.exception("download_voice")
        raise VoiceDownloadError(
            f"Failed to download voice {voice_name} from {link}: {e}"
        ) from e
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def main():
    """Main entry point"""
    default_voices_dir = Path(XDG().XDG_DATA_HOME) / "mimic3"

    parser = argparse.ArgumentParser(prog=f"{_PACKAGE}.download")
    parser.add_argument("--url", action="append", help="URL of voice to download")
    parser.add_argument(
        "--name",
        action="append",
        help="Name of voice to download (e.g., en_US/vctk_low)",
    )
    parser.add_argument(
        "--output-dir",
        default=default_voices_dir,
        help=f"Path to output directory (default: {default_voices_dir})",
    )
    parser.add_argument(
        "--debug", action="store_true", help="Print DEBUG messages to console"
    )
    args = parser.parse_args()

    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=log_level)

    _LOGGER.debug(args)

    args.output_dir = Path(args.output_dir)
    args.url = args.url or []
    args.name = args.name or []

    # Known voices, keyed by name
    voices_text = (_DIR / "voices.json").read_text(encoding="utf-8")
    voices_by_name = json.loads(voices_text)

    if not (args.url or args.name):
        # Print available voices and exit
        json.dump(voices_by_name, sys.stdout, indent=4, ensure_ascii=False)
        sys.exit(0)

    urls_to_download = args.url

    # Gather URLs for voices by name
    for voice_name in args.name:
        voice_info = voices_by_name.get(voice_name)
        if not voice_info:
            _LOGGER.fatal("Voice not found: %s", voice_name)
            sys.exit(1)

        urls_to_download.append(voice_info["url"])

    args.output_dir.mkdir(parents=True, exist_ok=True)

    for url in urls_to_download:
        download_voice(args.output_dir, url)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
|
@ -187,11 +187,12 @@ class Mimic3TextToSpeechSystem(TextToSpeechSystem):
|
|||
speakers.append(line)
|
||||
|
||||
yield Voice(
|
||||
key=str(voice_dir.absolute()),
|
||||
key=f"{voice_lang}/{voice_name}",
|
||||
name=voice_name,
|
||||
language=voice_lang,
|
||||
description="",
|
||||
speakers=speakers,
|
||||
location=str(voice_dir.absolute()),
|
||||
properties=properties,
|
||||
)
|
||||
|
||||
|
|
@ -380,14 +381,15 @@ class Mimic3TextToSpeechSystem(TextToSpeechSystem):
|
|||
model_dir: typing.Optional[Path] = None
|
||||
for maybe_voice in self.get_voices():
|
||||
if maybe_voice.key.endswith(voice_key):
|
||||
model_dir = Path(maybe_voice.key)
|
||||
model_dir = Path(maybe_voice.location)
|
||||
break
|
||||
|
||||
if model_dir is None:
|
||||
raise VoiceNotFoundError(voice_key)
|
||||
|
||||
# Full path to voice model directory
|
||||
canonical_key = str(model_dir.absolute())
|
||||
voice_lang = model_dir.parent.name
|
||||
voice_name = model_dir.name
|
||||
canonical_key = f"{voice_lang}/{voice_name}"
|
||||
|
||||
existing_voice = self._loaded_voices.get(canonical_key)
|
||||
if existing_voice is not None:
|
||||
|
|
|
|||
|
|
@ -6,5 +6,11 @@ ignore_missing_imports = True
|
|||
[mypy-onnxruntime.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-playsound.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-tqdm.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-xdgenvpy.*]
|
||||
ignore_missing_imports = True
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ disable=
|
|||
missing-class-docstring,
|
||||
missing-function-docstring,
|
||||
import-error,
|
||||
relative-import-beyond-top-level
|
||||
relative-beyond-top-level
|
||||
|
||||
[FORMAT]
|
||||
expected-line-ending-format=LF
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
dataclasses-json<1.0
|
||||
espeak-phonemizer>=1.0,<2.0
|
||||
gruut[en,de,es,nl,it,fr,sw]>=2.2.2,<3.0
|
||||
gruut>=2.2.2,<3.0
|
||||
numpy<2.0
|
||||
onnxruntime>=1.6,<2.0
|
||||
phonemes2ids<2.0
|
||||
opentts_abc<1.0
|
||||
phonemes2ids<2.0
|
||||
playsound~=1.3.0
|
||||
tqdm>=4,<5
|
||||
xdgenvpy>2.0,<3
|
||||
|
|
|
|||
|
|
@ -42,6 +42,25 @@ with open(version_path, "r", encoding="utf-8") as version_file:
|
|||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
# extra name => [requirements]
#
# One extra per language, each requiring gruut with that language's extra,
# so that e.g. the "de" extra installs "gruut[de]".  The mapping is passed
# to setup() as part of extras_require, which is keyed by extra name.
extras = {
    lang: [f"gruut[{lang}]"]
    for lang in (
        "de",
        "es",
        "fr",
        "it",
        "nl",
        "pt",
        "ru",
        "sv",
        "sw",
    )
}
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
setup(
|
||||
name="mimic3_tts",
|
||||
version=version,
|
||||
|
|
@ -53,7 +72,8 @@ setup(
|
|||
packages=setuptools.find_packages(),
|
||||
package_data={"mimic3_tts": ["VERSION", "py.typed"]},
|
||||
install_requires=requirements,
|
||||
extras_require={':python_version<"3.9"': ["importlib_resources"]},
|
||||
extras_require={':python_version<"3.9"': ["importlib_resources"], **extras},
|
||||
entry_points={"console_scripts": ["mimic3 = mimic3_cli.__main__:main"]},
|
||||
classifiers=[
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Intended Audience :: Developers",
|
||||
|
|
|
|||
2
opentts-abc/.gitignore
vendored
2
opentts-abc/.gitignore
vendored
|
|
@ -12,3 +12,5 @@ htmlcov
|
|||
__pycache__/
|
||||
.mypy_cache/
|
||||
*.egg-info/
|
||||
|
||||
flycheck_*.py
|
||||
|
|
|
|||
|
|
@ -156,6 +156,9 @@ class Voice:
|
|||
description: str
|
||||
"""Human-readable description of the voice"""
|
||||
|
||||
location: str
|
||||
"""File path or URI where the voice exists"""
|
||||
|
||||
speakers: typing.Optional[typing.Sequence[str]] = None
|
||||
"""List of speakers within the voice model if multi-speaker"""
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue