File size: 6,947 Bytes
ee531be
 
8fb0be5
ee531be
 
59f6126
 
ee531be
350d733
ee531be
350d733
94f759f
 
350d733
 
 
94f759f
 
 
 
 
350d733
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d35d295
350d733
 
 
 
 
94f759f
350d733
 
 
 
94f759f
 
 
 
 
 
 
 
 
 
350d733
 
 
 
94f759f
 
 
 
 
 
 
 
 
 
350d733
 
 
 
94f759f
 
ee531be
 
 
59f6126
 
 
 
 
ee531be
 
 
 
 
 
59f6126
ee531be
 
 
 
 
 
 
59f6126
3819293
59f6126
 
eaf3576
4380489
 
 
 
 
 
59f6126
 
361b6e2
ee531be
 
e30f74a
ee531be
 
 
 
 
 
 
 
 
 
8fb0be5
ee531be
 
 
8fb0be5
ee531be
 
 
 
 
8fb0be5
ee531be
 
 
 
 
 
8fb0be5
ee531be
 
8fb0be5
ee531be
 
 
8fb0be5
ee531be
 
 
17e6aa5
ee531be
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import requests
import base64
import os
import json
import streamlit as st
import whisperx
import torch

def convert_segments_object_to_text(data):
    """Render a whisperx diarized result as speaker-labelled text.

    For every segment in ``data['segments']``, missing per-word ``speaker``,
    ``start`` and ``end`` keys are filled in from neighbouring words (or from
    the segment-level values as a last resort), then consecutive words by the
    same speaker are merged into one line of the form
    ``SPEAKER (start : end) : word word ...`` (timestamps omitted when
    unavailable).  Note: the word dicts inside ``data`` are mutated in place
    by the fill step, matching the original behaviour.
    """
    lines = []

    def _format_line(speaker, start, end, word_list):
        # Timestamps are printed only when both ends are known.
        joined = " ".join(word_list)
        if start is not None and end is not None:
            return f'{speaker} ({start} : {end}) : {joined}'
        return f'{speaker} : {joined}'

    for segment in data['segments']:
        words = segment['words']
        seg_speaker = segment.get('speaker', None)
        seg_start = segment.get('start', None)
        seg_end = segment.get('end', None)
        last_idx = len(words) - 1

        # Fill missing keys sequentially; earlier fills are visible to later
        # words because the dicts are mutated as we go.
        for idx, info in enumerate(words):
            if 'speaker' not in info:
                if idx > 0 and 'speaker' in words[idx - 1]:
                    info['speaker'] = words[idx - 1]['speaker']
                elif idx < last_idx and 'speaker' in words[idx + 1]:
                    info['speaker'] = words[idx + 1]['speaker']
                else:
                    info['speaker'] = seg_speaker

            if 'start' not in info:
                if idx > 0 and 'end' in words[idx - 1]:
                    info['start'] = words[idx - 1]['end']
                else:
                    info['start'] = seg_start

            if 'end' not in info:
                if idx < last_idx and 'start' in words[idx + 1]:
                    info['end'] = words[idx + 1]['start']
                elif idx == last_idx:
                    info['end'] = seg_end
                else:
                    info['end'] = info['start']

        # Merge runs of words spoken by the same speaker.
        run_speaker = None
        run_start = None
        run_end = None
        run_words = []
        for info in words:
            word = info.get('word', '')
            speaker = info.get('speaker', None)

            if run_speaker is None:
                run_speaker = speaker
                run_start = info.get('start', None)

            if speaker == run_speaker:
                run_words.append(word)
                run_end = info.get('end', None)
            else:
                lines.append(_format_line(run_speaker, run_start, run_end, run_words))
                run_speaker = speaker
                run_start = info.get('start', None)
                run_end = info.get('end', None)
                run_words = [word]

        # Flush the trailing run of the segment.
        if run_words:
            lines.append(_format_line(run_speaker, run_start, run_end, run_words))

    return '\n'.join(lines)
    
# --- Streamlit UI: transcription + diarization + LLM summary ----------------
st.title('Audio Transcription App')
st.sidebar.title("Settings")
# Sidebar inputs
device = st.sidebar.selectbox("Device", ["cpu", "cuda"], index=1)
batch_size = st.sidebar.number_input("Batch Size", min_value=1, value=16)
compute_type = st.sidebar.selectbox("Compute Type", ["float16", "int8"], index=0)

# Hugging Face token; used below for the diarization pipeline.
ACCESS_TOKEN = st.secrets["HF_TOKEN"]

uploaded_file = st.file_uploader("Загрузите аудиофайл", type=["mp4", "wav", "m4a"])

if uploaded_file is not None:
    st.audio(uploaded_file)
    # Persist the upload to disk, keeping the original extension so that
    # whisperx/ffmpeg can detect the container format.
    file_extension = uploaded_file.name.split(".")[-1]
    temp_file_path = f"temp_file.{file_extension}"

    with open(temp_file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())

    try:
        with st.spinner('Транскрибируем...'):
            # Load model
            model = whisperx.load_model(os.getenv('WHISPER_MODEL_SIZE'), device, compute_type=compute_type)
            # Load and transcribe audio
            audio = whisperx.load_audio(temp_file_path)
            result = model.transcribe(audio, batch_size=batch_size, language="ru")
            print('Transcribed, now aligning')

            model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device)
            result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False)
            print('Aligned, now diarizing')

            # Reuse ACCESS_TOKEN instead of a second st.secrets["HF_TOKEN"] lookup.
            diarize_model = whisperx.DiarizationPipeline(use_auth_token=ACCESS_TOKEN, device=device)
            diarize_segments = diarize_model(audio)
            result_diar = whisperx.assign_word_speakers(diarize_segments, result)
    finally:
        # Always remove the temp file, even if transcription fails
        # (previously it was never cleaned up).
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

    st.write("Результат транскрибации:")
    transcript = convert_segments_object_to_text(result_diar)
    st.text(transcript)

    with st.spinner('Резюмируем...'):
        username = st.secrets["GIGA_USERNAME"]
        password = st.secrets["GIGA_SECRET"]

        # Build the Basic-auth credential string in Base64.
        auth_str = f'{username}:{password}'
        auth_bytes = auth_str.encode('utf-8')
        auth_base64 = base64.b64encode(auth_bytes).decode('utf-8')
        url = os.getenv('GIGA_AUTH_URL')

        headers = {
            'Authorization': f'Basic {auth_base64}',  # Basic authorization header
            'RqUID': os.getenv('GIGA_rquid'),
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json'
        }

        data = {
            'scope': os.getenv('GIGA_SCOPE')
        }

        # SECURITY: verify=False disables TLS certificate checking. Presumably
        # required for the GigaChat endpoint's certificate chain — confirm, or
        # pass the CA bundle via verify=<path> instead.
        response = requests.post(url, headers=headers, data=data, verify=False)
        access_token = response.json()['access_token']
        print('Got access token')

        url_completion = os.getenv('GIGA_COMPLETION_URL')

        # Chat-completion request: base prompt plus the diarized transcript.
        completion_payload = json.dumps({
          "model": os.getenv('GIGA_MODEL'),
          "messages": [
            {
              "role": "user",
              "content": os.getenv('GIGA_BASE_PROMPT') + transcript
            }
          ],
          "stream": False,
          "max_tokens": int(os.getenv('GIGA_MAX_TOKENS')),
        })

        headers_comp = {
          'Content-Type': 'application/json',
          'Accept': 'application/json',
          'Authorization': 'Bearer ' + access_token
        }

        # SECURITY: verify=False — see note above.
        response = requests.post(url_completion, headers=headers_comp, data=completion_payload, verify=False)
        response_data = response.json()
        answer_from_llm = response_data['choices'][0]['message']['content']

    st.write("Результат резюмирования:")
    st.text(answer_from_llm)