import os import streamlit as st import whisperx import torch from utils import convert_segments_object_to_text, check_password, convert_segments_object_to_text_simple from gigiachat_requests import get_access_token, get_completion_from_gigachat, get_number_of_tokens from openai_requests import get_completion_from_openai if check_password(): st.title('Audio Transcription App') st.sidebar.title("Settings") device = os.getenv('DEVICE') batch_size = int(os.getenv('BATCH_SIZE')) compute_type = os.getenv('COMPUTE_TYPE') initial_base_prompt = os.getenv('BASE_PROMPT') initial_processing_prompt = os.getenv('PROCCESS_PROMPT') llm = st.sidebar.selectbox("LLM", ["GigaChat", "Chat GPT"], index=0) base_prompt = st.sidebar.text_area("Промпт для резюмирования", value=initial_base_prompt) max_tokens_summary = st.sidebar.number_input("Максимальное количество токенов при резюмировании", min_value=1, value=1024) enable_processing = st.sidebar.checkbox("Добавить обработку транскрибации", value=False) processing_prompt = st.sidebar.text_area("Промпт для обработки транскрибации", value=initial_processing_prompt) ACCESS_TOKEN = st.secrets["HF_TOKEN"] uploaded_file = st.file_uploader("Загрузите аудиофайл", type=["mp4", "wav", "m4a"]) if uploaded_file is not None: file_name = uploaded_file.name if 'file_name' not in st.session_state or st.session_state.file_name != file_name: st.session_state.transcript = '' st.session_state.file_name = file_name print(st.session_state.file_name) print(st.session_state.transcript) print(st.session_state.file_name) print(st.session_state.transcript) st.audio(uploaded_file) file_extension = uploaded_file.name.split(".")[-1] # Получаем расширение файла temp_file_path = f"temp_file.{file_extension}" # Создаем временное имя файла с правильным расширением with open(temp_file_path, "wb") as f: f.write(uploaded_file.getbuffer()) print(st.session_state.transcript) if 'transcript' not in st.session_state or st.session_state.transcript == '': with st.spinner('Транскрибируем...'): # Load model model = whisperx.load_model(os.getenv('WHISPER_MODEL_SIZE'), device, compute_type=compute_type) # Load and transcribe audio audio = whisperx.load_audio(temp_file_path) result = model.transcribe(audio, batch_size=batch_size, language="ru") print('Transcribed, now aligning') model_a, metadata = whisperx.load_align_model(language_code=result["language"], device=device) result = whisperx.align(result["segments"], model_a, metadata, audio, device, return_char_alignments=False) print('Aligned, now diarizing') diarize_model = whisperx.DiarizationPipeline(use_auth_token=st.secrets["HF_TOKEN"], device=device) diarize_segments = diarize_model(audio) result_diar = whisperx.assign_word_speakers(diarize_segments, result) transcript = convert_segments_object_to_text_simple(result_diar) st.session_state.transcript = transcript else: transcript = st.session_state.transcript st.write("Результат транскрибации:") st.text(transcript) if (llm == 'GigaChat'): access_token = get_access_token() if (enable_processing): with st.spinner('Обрабатываем транскрибацию...'): if (llm == 'GigaChat'): number_of_tokens = get_number_of_tokens(transcript, access_token) print('Количество токенов в транскрибации: ' + str(number_of_tokens)) transcript = get_completion_from_gigachat(processing_prompt + transcript, number_of_tokens + 1000, access_token) elif (llm == 'Chat GPT'): transcript = get_completion_from_openai(processing_prompt + transcript) st.write("Результат обработки:") st.text(transcript) with st.spinner('Резюмируем...'): if (llm == 'GigaChat'): summary_answer = get_completion_from_gigachat(base_prompt + transcript, max_tokens_summary, access_token) elif (llm == 'Chat GPT'): summary_answer = get_completion_from_openai(base_prompt + transcript, max_tokens_summary) st.write("Результат резюмирования:") st.text(summary_answer)