try:
    import cn2an
except ImportError:
    print("The 'cn2an' module is not installed. Please install it using 'pip install cn2an'.")
    exit(1)

try:
    import jieba
except ImportError:
    print("The 'jieba' module is not installed. Please install it using 'pip install jieba'.")
    exit(1)

import re
import numpy as np
import wave
import jieba.posseg as pseg


def save_audio(file_name, audio, rate=24000):
    """
    保存音频文件
    :param file_name:
    :param audio:
    :param rate:
    :return:
    """
    import os
    from config import DEFAULT_DIR
    audio = (audio * 32767).astype(np.int16)

    # Make sure the default output directory exists
    if not os.path.exists(DEFAULT_DIR):
        os.makedirs(DEFAULT_DIR)
    full_path = os.path.join(DEFAULT_DIR, file_name)
    with wave.open(full_path, "w") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(rate)
        wf.writeframes(audio.tobytes())
    return full_path
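
# Hedged usage sketch (not in the original file): writing one second of a 440 Hz test
# tone; "tone.wav" is an illustrative name, and config.DEFAULT_DIR must exist or be
# creatable for save_audio() to succeed.
#     tone = np.sin(2 * np.pi * 440 * np.arange(24000) / 24000).astype(np.float32)
#     path = save_audio("tone.wav", tone, rate=24000)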


def combine_audio(wavs):
    """
    合并多段音频
    :param wavs:
    :return:
    """
    wavs = [normalize_audio(w) for w in wavs]  # 先对每段音频归一化
    combined_audio = np.concatenate(wavs, axis=1)  # 沿着时间轴合并
    return normalize_audio(combined_audio)  # 合并后再次归一化


def normalize_audio(audio):
    """
    Normalize audio array to be between -1 and 1
    :param audio: Input audio array
    :return: Normalized audio array
    """
    audio = np.clip(audio, -1, 1)
    max_val = np.max(np.abs(audio))
    if max_val > 0:
        audio = audio / max_val
    return audio


def combine_audio_with_crossfade(audio_arrays, crossfade_duration=0.1, rate=24000):
    """
    Combine audio arrays with crossfade to avoid clipping noise at the junctions.
    :param audio_arrays: List of audio arrays to combine
    :param crossfade_duration: Duration of the crossfade in seconds
    :param rate: Sample rate of the audio
    :return: Combined audio array
    """
    crossfade_samples = int(crossfade_duration * rate)
    combined_audio = np.array([], dtype=np.float32)

    for i in range(len(audio_arrays)):
        audio_arrays[i] = np.squeeze(audio_arrays[i])  # Ensure all arrays are 1D
        if i == 0:
            combined_audio = audio_arrays[i]  # Start with the first audio array
        else:
            # Apply crossfade between the end of the current combined audio and the start of the next array
            overlap = np.minimum(len(combined_audio), crossfade_samples)
            crossfade_end = combined_audio[-overlap:]
            crossfade_start = audio_arrays[i][:overlap]
            # Crossfade by linearly blending the audio samples
            t = np.linspace(0, 1, overlap)
            crossfaded = crossfade_end * (1 - t) + crossfade_start * t
            # Combine audio by replacing the end of the current combined audio with the crossfaded audio
            combined_audio[-overlap:] = crossfaded
            # Append the rest of the new array
            combined_audio = np.concatenate((combined_audio, audio_arrays[i][overlap:]))

    return combined_audio
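
# Hedged usage sketch (not in the original file): merging two 1-D segments with a
# 0.1 s crossfade at 24 kHz; the segment lengths below are illustrative only.
# The result length is len(a) + len(b) - crossfade_samples.
#     a = np.zeros(24000, dtype=np.float32)
#     b = np.full(12000, 0.5, dtype=np.float32)
#     merged = combine_audio_with_crossfade([a, b], crossfade_duration=0.1, rate=24000)
#     # len(merged) == 24000 + 12000 - 2400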


def remove_chinese_punctuation(text):
    """
    移除文本中的中文标点符号 [:;!(),【】『』「」《》-‘“’”:,;!\(\)\[\]><\-] 替换为 ,
    :param text:
    :return:
    """
    chinese_punctuation_pattern = r"[:;!(),【】『』「」《》-‘“’”:,;!\(\)\[\]><\-·]"
    text = re.sub(chinese_punctuation_pattern, ',', text)
    # 使用正则表达式将多个连续的句号替换为一个句号
    text = re.sub(r'[。,]{2,}', '。', text)
    # 删除开头和结尾的 , 号
    text = re.sub(r'^,|,$', '', text)
    return text
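
# Hedged behavior sketch (not in the original file), tracing the three substitutions above:
#     remove_chinese_punctuation("你好!(世界)")
#     # punctuation -> ,            => "你好,,世界,"
#     # collapse runs of 。/,       => "你好。世界,"
#     # strip leading/trailing ,    => "你好。世界"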

def remove_english_punctuation(text):
    """
    移除文本中的中文标点符号 [:;!(),【】『』「」《》-‘“’”:,;!\(\)\[\]><\-] 替换为 ,
    :param text:
    :return:
    """
    chinese_punctuation_pattern = r"[:;!(),【】『』「」《》-‘“’”:,;!\(\)\[\]><\-·]"
    text = re.sub(chinese_punctuation_pattern, ',', text)
    # 使用正则表达式将多个连续的句号替换为一个句号
    text = re.sub(r'[,\.]{2,}', '.', text)
    # 删除开头和结尾的 , 号
    text = re.sub(r'^,|,$', '', text)
    return text


def text_normalize(text):
    """
    对文本进行归一化处理 (PaddlePaddle版本)
    :param text:
    :return:
    """
    from zh_normalization import TextNormalizer
    # ref: https://github.com/PaddlePaddle/PaddleSpeech/tree/develop/paddlespeech/t2s/frontend/zh_normalization
    tx = TextNormalizer()
    sentences = tx.normalize(text)
    _txt = ''.join(sentences)
    return _txt


def convert_numbers_to_chinese(text):
    """
    将文本中的数字转换为中文数字 例如 123 -> 一百二十三
    :param text:
    :return:
    """
    return cn2an.transform(text, "an2cn")


def detect_language(sentence):
    # ref: https://github.com/2noise/ChatTTS/blob/main/ChatTTS/utils/infer_utils.py#L55
    chinese_char_pattern = re.compile(r'[\u4e00-\u9fff]')
    english_word_pattern = re.compile(r'\b[A-Za-z]+\b')

    chinese_chars = chinese_char_pattern.findall(sentence)
    english_words = english_word_pattern.findall(sentence)

    if len(chinese_chars) > len(english_words):
        return "zh"
    else:
        return "en"


def split_text(text, min_length=60):
    """
    将文本分割为长度不小于min_length的句子
    :param text:
    :param min_length:
    :return:
    """
    # 短句分割符号
    sentence_delimiters = re.compile(r'([。?!\.]+)')
    # 匹配多个连续的回车符 作为段落点 强制分段
    paragraph_delimiters = re.compile(r'(\s*\n\s*)+')

    paragraphs = re.split(paragraph_delimiters, text)

    result = []

    for paragraph in paragraphs:
        if not paragraph.strip():
            continue  # Skip empty paragraphs
        # Paragraphs shorter than the threshold become chunks as-is
        if len(paragraph.strip()) < min_length:
            result.append(paragraph.strip())
            continue
        # Longer paragraphs are split further by sentence
        sentences = re.split(sentence_delimiters, paragraph)
        current_sentence = ''
        for sentence in sentences:
            if re.match(sentence_delimiters, sentence):
                current_sentence += sentence.strip()
                if len(current_sentence) >= min_length:
                    result.append(current_sentence.strip())
                    current_sentence = ''
            else:
                current_sentence += sentence.strip()

        if current_sentence:
            if len(current_sentence) < min_length and len(result) > 0:
                result[-1] += current_sentence
            else:
                result.append(current_sentence)
    if detect_language(text[:1024]) == "zh":
        result = [normalize_zh(_.strip()) for _ in result if _.strip()]
    else:
        result = [normalize_en(_.strip()) for _ in result if _.strip()]
    return result
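
# Hedged usage sketch (not in the original file): with a small min_length each sentence
# becomes its own chunk, and every chunk is passed through normalize_zh().
#     split_text("今天天气不错。我们出去散步吧。", min_length=5)
#     # -> ['今天天气不错。', '我们出去散步吧。']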


def normalize_en(text):
    # Text normalization is no longer done outside ChatTTS
    # from tn.english.normalizer import Normalizer
    # normalizer = Normalizer()
    # text = normalizer.normalize(text)
    # text = remove_english_punctuation(text)
    return text


def normalize_zh(text):
    # Text normalization is no longer done outside ChatTTS
    # from tn.chinese.normalizer import Normalizer
    # normalizer = Normalizer()
    # text = normalizer.normalize(text)
    # text = remove_chinese_punctuation(text)
    text = process_ddd(text)
    return text


def batch_split(items, batch_size=5):
    """
    将items划分为大小为batch_size的批次
    :param items:
    :param batch_size:
    :return:
    """
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]
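
# Hedged behavior sketch (not in the original file):
#     batch_split([1, 2, 3, 4, 5, 6, 7], batch_size=3)
#     # -> [[1, 2, 3], [4, 5, 6], [7]]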


# Read a txt file, auto-detecting its encoding
def read_long_text(file_path):
    """
    Read a long text file, trying a list of encodings in turn.
    :param file_path: path to the file
    :return: file contents as text
    """
    # iso-8859-1 decodes any byte sequence, so it is kept last as a catch-all fallback
    encodings = ['utf-8', 'gbk', 'utf-16', 'iso-8859-1']

    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except (UnicodeDecodeError, LookupError):
            continue

    raise ValueError("Unable to detect the file encoding")


def replace_tokens(text):
    remove_tokens = ['UNK']
    for token in remove_tokens:
        text = re.sub(r'\[' + re.escape(token) + r'\]', '', text)

    # Wrap control tokens as uu<token>uu so they survive downstream text processing;
    # underscores are stripped so e.g. [uv_break] becomes 'uuuvbreakuu', which
    # restore_tokens() maps back to [uv_break].
    tokens = ['uv_break', 'laugh', 'lbreak']
    for token in tokens:
        text = re.sub(r'\[' + re.escape(token) + r'\]', f'uu{token}uu', text)
    text = text.replace('_', '')
    return text


def restore_tokens(text):
    tokens = ['uvbreak', 'laugh', 'UNK', 'lbreak']
    for token in tokens:
        text = re.sub(r'uu' + re.escape(token) + r'uu', f'[{token}]', text)
    text = text.replace('[uvbreak]', '[uv_break]')
    return text
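
# Hedged round-trip sketch (not in the original file): [uv_break] survives the
# replace/restore pair, while [UNK] is dropped by replace_tokens().
#     replace_tokens('你好[uv_break]世界[UNK]')   # -> '你好uuuvbreakuu世界'
#     restore_tokens('你好uuuvbreakuu世界')        # -> '你好[uv_break]世界'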


def process_ddd(text):
    """
    处理“地”、“得” 字的使用,都替换为“的”
    依据:地、得的使用,主要是在动词和形容词前后,本方法没有严格按照语法替换,因为时常遇到用错的情况。
    另外受 jieba 分词准确率的影响,部分情况下可能会出漏掉。例如:小红帽疑惑地问
    :param text: 输入的文本
    :return: 处理后的文本
    """
    word_list = [(word, flag) for word, flag in pseg.cut(text, use_paddle=False)]
    # print(word_list)
    processed_words = []
    for i, (word, flag) in enumerate(word_list):
        if word in ["地", "得"]:
            # Check previous and next word's flag
            # prev_flag = word_list[i - 1][1] if i > 0 else None
            # next_flag = word_list[i + 1][1] if i + 1 < len(word_list) else None

            # if prev_flag in ['v', 'a'] or next_flag in ['v', 'a']:
            if flag in ['uv', 'ud']:
                processed_words.append("的")
            else:
                processed_words.append(word)
        else:
            processed_words.append(word)

    return ''.join(processed_words)


def replace_space_between_chinese(text):
    """Remove whitespace between adjacent Chinese characters."""
    return re.sub(r'(?<=[\u4e00-\u9fff])\s+(?=[\u4e00-\u9fff])', '', text)
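
# Hedged behavior sketch (not in the original file): whitespace between Chinese characters
# is removed, while spaces around non-Chinese runs are kept.
#     replace_space_between_chinese('你 好 world 再 见')   # -> '你好 world 再见'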


if __name__ == '__main__':
    # txts = [
    #     "快速地跑过红色的大门",
    #     "笑得很开心,学得很好",
    #     "小红帽疑惑地问?",
    #     "大灰狼慌张地回答",
    #     "哦,这是为了更好地听你说话。",
    #     "大灰狼不耐烦地说:“为了更好地抱你。”",
    #     "他跑得很快,工作做得非常认真,这是他努力地结果。得到",
    # ]
    # for txt in txts:
    #     print(txt, '-->', process_ddd(txt))

    txts = [
        "电影中梁朝伟扮演的陈永仁的编号27149",
        "这块黄金重达324.75克 我们班的最高总分为583分",
        "12\~23 -1.5\~2",
        "居维埃·拉色别德①、杜梅里②、卡特法日③,"

    ]
    for txt in txts:
        print(txt, '-->', text_normalize(txt))
        # print(txt, '-->', convert_numbers_to_chinese(txt))