File size: 3,821 Bytes
390fec3
 
 
 
9f68a6e
390fec3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a93e42f
124547b
 
 
a93e42f
124547b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a93e42f
390fec3
 
 
a93e42f
390fec3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
89ce027
 
a93e42f
89ce027
 
 
a93e42f
53dde00
89ce027
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import base64
import json
import requests
import streamlit as st

def get_access_token():
    """Fetch an access token from the auth endpoint using HTTP Basic credentials.

    The username and password are read from ``st.secrets`` (keys
    ``GIGA_USERNAME`` and ``GIGA_SECRET``). The endpoint URL, the ``RqUID``
    header value and the requested scope are read from the environment
    variables ``GIGA_AUTH_URL``, ``GIGA_rquid`` and ``GIGA_SCOPE``.

    Returns:
        The value of the ``access_token`` field of the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
        KeyError: If the JSON response has no ``access_token`` field.
    """
    username = st.secrets["GIGA_USERNAME"]
    password = st.secrets["GIGA_SECRET"]

    # Build the Base64-encoded "username:password" value for Basic auth.
    credentials = f'{username}:{password}'.encode('utf-8')
    auth_base64 = base64.b64encode(credentials).decode('utf-8')

    url = os.getenv('GIGA_AUTH_URL')
    headers = {
        'Authorization': f'Basic {auth_base64}',  # Basic auth credentials
        'RqUID': os.getenv('GIGA_rquid'),
        'Content-Type': 'application/x-www-form-urlencoded',
        'Accept': 'application/json'
    }
    data = {
        'scope': os.getenv('GIGA_SCOPE')
    }

    # NOTE(review): verify=False disables TLS certificate verification and no
    # timeout is set, so the call can block indefinitely — confirm both are
    # intentional.
    response = requests.post(url, headers=headers, data=data, verify=False)
    # Surface a rejected request as an HTTP error rather than as a KeyError
    # on the missing 'access_token' field further down.
    response.raise_for_status()
    access_token = response.json()['access_token']
    print('Got access token')
    return access_token

def get_number_of_tokens(prompt, access_token, model):
    """Ask the token-counting endpoint how many tokens ``prompt`` takes.

    The request is POSTed as JSON to the URL in the ``GIGA_TOKENS_URL``
    environment variable, with ``prompt`` sent as the single element of the
    ``input`` list.

    Args:
        prompt: Text whose token count is requested.
        access_token: Bearer token placed in the ``Authorization`` header.
        model: Model name passed through in the ``model`` field.

    Returns:
        The ``tokens`` field of the first element of the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
    """
    url_completion = os.getenv('GIGA_TOKENS_URL')

    payload = json.dumps({
        "model": model,
        "input": [
            prompt
        ],
    })

    headers_comp = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'Bearer ' + access_token
    }

    # NOTE(review): verify=False disables TLS certificate verification and no
    # timeout is set — confirm both are intentional.
    response = requests.post(url_completion, headers=headers_comp, data=payload, verify=False)
    # Fail with a clear HTTP error instead of indexing into an error payload.
    response.raise_for_status()
    response_data = response.json()

    return response_data[0]['tokens']

def get_completion_from_gigachat(prompt, max_tokens, access_token, model):
    """Request a single non-streaming chat completion for ``prompt``.

    The request is POSTed as JSON to the URL in the ``GIGA_COMPLETION_URL``
    environment variable, with ``prompt`` sent as one ``user`` message and
    ``stream`` set to ``False``.

    Args:
        prompt: Content of the single user message.
        max_tokens: Value passed through in the ``max_tokens`` field.
        access_token: Bearer token placed in the ``Authorization`` header.
        model: Model name passed through in the ``model`` field.

    Returns:
        The ``content`` of the ``message`` in the first entry of ``choices``
        in the JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
    """
    url_completion = os.getenv('GIGA_COMPLETION_URL')

    payload = json.dumps({
        "model": model,
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ],
        "stream": False,
        "max_tokens": max_tokens,
    })

    headers_comp = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'Bearer ' + access_token
    }

    # NOTE(review): verify=False disables TLS certificate verification and no
    # timeout is set — confirm both are intentional.
    response = requests.post(url_completion, headers=headers_comp, data=payload, verify=False)
    # Fail with a clear HTTP error instead of a KeyError on 'choices'.
    response.raise_for_status()
    response_data = response.json()

    return response_data['choices'][0]['message']['content']

def process_transcribation_with_gigachat(prompt, transcript, max_tokens, access_token, model):
    """Stream a chat completion for ``prompt + transcript`` into the Streamlit page.

    The request is POSTed as JSON to the URL in the ``GIGA_COMPLETION_URL``
    environment variable with ``stream`` set to ``True``. A heading is written
    with ``st.write`` and an ``st.empty()`` placeholder is re-rendered with
    the accumulated text each time a new content fragment arrives.

    Args:
        prompt: Instruction text; ``transcript`` is appended to it directly,
            with no separator.
        transcript: Text appended to ``prompt`` to form the user message.
        max_tokens: Value passed through in the ``max_tokens`` field.
        access_token: Bearer token placed in the ``Authorization`` header.
        model: Model name passed through in the ``model`` field.

    Returns:
        The concatenation of every ``delta.content`` fragment received.

    Raises:
        requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
    """
    url_completion = os.getenv('GIGA_COMPLETION_URL')

    payload = json.dumps({
        "model": model,
        "max_tokens": max_tokens,
        "messages": [
            {
                "role": "user",
                "content": prompt + transcript
            }
        ],
        "stream": True,
    })

    headers_comp = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'Bearer ' + access_token
    }

    output_text = ''
    st.write("Результат обработки:")
    text_container = st.empty()

    # NOTE(review): verify=False disables TLS certificate verification and no
    # timeout is set — confirm both are intentional.
    # The context manager closes the streamed response even when an exception
    # interrupts iteration.
    with requests.post(url_completion, headers=headers_comp, data=payload,
                       verify=False, stream=True) as response:
        # Without this check an HTTP error body would be skipped line by line
        # as undecodable JSON and the function would silently return ''.
        response.raise_for_status()

        for line in response.iter_lines():
            if not line:
                continue

            decoded_line = line.decode('utf-8')
            # Strip the "data:" prefix when the line carries one.
            if decoded_line.startswith('data:'):
                decoded_line = decoded_line[len('data:'):].strip()

            try:
                data = json.loads(decoded_line)
            except json.JSONDecodeError:
                # Lines that are not JSON are skipped deliberately.
                continue

            if "choices" not in data:
                continue
            for choice in data["choices"]:
                if "delta" in choice and "content" in choice["delta"]:
                    output_text += choice["delta"]["content"]
                    # Re-render the placeholder with everything received so far.
                    text_container.text(output_text)

    return output_text