# flake8: noqa: E501
import csv
import os
import os.path as osp
import re
from collections import defaultdict
from datetime import datetime
import numpy as np
from mmengine import ConfigDict
try:
    from prettytable import from_csv
except ImportError:
    from_csv = None
from opencompass.utils import model_abbr_from_cfg
from .subjective_post_process import post_process_autoj, post_process_judgelm
from .utils import get_judgeanswer_and_reference, get_outdir
CATEGORIES = {
    '中文推理': ['数学计算', '逻辑推理'],
    '中文语言': ['基本任务', '中文理解', '综合问答', '文本写作', '角色扮演', '专业能力'],
}

All_Dimensions = [
    '事实正确性', '满足用户需求', '安全无害', '清晰度', '逻辑性', '完备性', '创造性', '可负责程度', '逻辑连贯性',
    '公平与可负责程度', '丰富度', '综合得分'
]

MAPPING = {
    '事实与解释型回答': ['事实正确性', '满足用户需求', '清晰度', '完备性'],
    '逻辑推理型回答': ['事实正确性', '满足用户需求', '逻辑连贯性', '完备性'],
    '生成型回答': ['事实正确性', '满足用户需求', '逻辑连贯性', '创造性', '丰富度'],
    '建议型回答': ['事实正确性', '满足用户需求', '公平与可负责程度', '创造性']
}
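
# NOTE: the Chinese keys above are matched verbatim against judge-model
# output, so they must stay in Chinese. Rough glosses: CATEGORIES groups
# capabilities into Chinese reasoning (math, logic) and Chinese language
# tasks; MAPPING maps an answer type (factual/explanatory, logical
# reasoning, generative, advice) to the rating dimensions expected for it.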
def detect_mapping(text):
    """Infer the answer type from which rating dimensions the judge text
    mentions; return None if no known combination is found."""
    if '清晰度' in text and '完备性' in text:
        return '事实与解释型回答'
    elif '完备性' in text and '逻辑连贯性' in text:
        return '逻辑推理型回答'
    elif '创造性' in text and '丰富度' in text:
        return '生成型回答'
    elif '创造性' in text and '公平与可负责程度' in text:
        return '建议型回答'
    else:
        return None
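
# Illustrative example: a judgement that mentions both '创造性' and '丰富度'
# (and no higher-priority pair) is classified as a generative answer:
#   detect_mapping('创造性: 8\n丰富度: 7\n')  ->  '生成型回答'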
def extract_missing_rating(text, search_type):
    """Recover per-dimension ratings from free-form judge text (no {...}
    block), looking only for the dimensions expected for `search_type`."""
    searching_keys = MAPPING[search_type]
    result_dict = {}
    for k in searching_keys:
        matches = re.findall(rf'{k}.*?\n', text)
        result_dict[k] = None
        for match in reversed(matches):
            if re.findall(r'\d{1,2}', match):
                result_dict[k] = int(re.findall(r'\d{1,2}', match)[-1])
                break
    overall_number = re.findall(r'\d{1,2}', text)
    try:
        result_dict['综合得分'] = int(overall_number[-1])
    except IndexError:  # no number anywhere in the text
        return {}
    return result_dict
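
# Illustrative example: each dimension line that ends in a number is picked
# up, and the last number in the whole text becomes '综合得分':
#   extract_missing_rating('创造性: 8\n丰富度: 7\n综合得分: 8\n', '生成型回答')
#   ->  {'事实正确性': None, '满足用户需求': None, '逻辑连贯性': None,
#        '创造性': 8, '丰富度': 7, '综合得分': 8}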
def extract_rating_plus(text):
    """Extract the rating dict from the last {...} block in the judge text;
    if no block is found, fall back to per-dimension recovery via
    detect_mapping/extract_missing_rating."""
    pattern = r'{(.*?)}(?![^{]*{)'  # match the last {...} block
    match = re.search(pattern, text)
    if match:
        dictionary_str = match.group(1)
        kv_pattern = r"'(.*?)': (\d+)"
        matches = re.findall(kv_pattern, dictionary_str)
        result_dict = {key: int(value) for key, value in matches}
        return result_dict
    else:
        match_type = detect_mapping(text=text)
        if match_type is not None:
            return extract_missing_rating(text=text, search_type=match_type)
        else:
            return None
def extract_rating(text):
    """Extract the rating dict from the last {...} block in the judge text,
    or return None if no block is found."""
    pattern = r'{(.*?)}(?![^{]*{)'  # match the last {...} block
    match = re.search(pattern, text)
    if match:
        dictionary_str = match.group(1)
        kv_pattern = r"'(.*?)': (\d+)"
        matches = re.findall(kv_pattern, dictionary_str)
        result_dict = {key: int(value) for key, value in matches}
        return result_dict
    else:
        return None
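
# Illustrative example: the key/score pairs are pulled out of the last {...}
# block in the judgement:
#   extract_rating("打分如下{'事实正确性': 8, '综合得分': 7}")
#   ->  {'事实正确性': 8, '综合得分': 7}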
def check_rating(rating, all_dimensions):
    """Validate a rating dict: every key must be a known dimension and every
    value a number in [0, 10]; return the dict if valid, else None."""
    for k, v in rating.items():
        if isinstance(v, (int, float)) and k in all_dimensions:  # value must be numeric
            if not 0 <= v <= 10:
                return None
        else:
            return None
    return rating
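
# Illustrative example: unknown dimension names or values outside [0, 10]
# invalidate the whole rating:
#   check_rating({'事实正确性': 8, '综合得分': 7}, All_Dimensions)
#   ->  {'事实正确性': 8, '综合得分': 7}
#   check_rating({'事实正确性': 18}, All_Dimensions)  ->  None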
def post_process_alignbench_plus(judgement: str,
                                 all_dimensions=All_Dimensions,
                                 possible_keys=['综合得分']):
    """Extract every dimension score from a judgement string such as:
    xxx{'事实正确性': 1, '满足用户需求': 1, '清晰度': 2, '完备性': 1, '综合得分': 1}xxx.

    Unlike post_process_alignbench, this variant falls back to recovering
    dimension scores from free-form text when no {...} block is present.
    """

    def extract_score(text):
        keys_pattern = '|'.join(map(re.escape, possible_keys))
        pattern = rf"({keys_pattern}): (\d+(\.\d{{1,2}})?)"
        match = re.search(pattern, text)
        if match:
            try:
                return float(match.group(2))  # group(2) holds the numeric score
            except ValueError:
                return -1
        return -1

    # judgement = judgement.replace('\n', '')
    rating = extract_rating_plus(judgement)

    if rating is not None:
        score = -1
        for key in possible_keys:
            score = rating.get(key, -1)
            if score != -1:
                break
        if score == -1:
            score = extract_score(judgement)
        if not 0 <= score <= 10:
            score = -1
        rating = check_rating(rating, all_dimensions)
    else:
        score = -1
    if rating is None or score == -1:
        return None
    else:
        return {'rating': rating, 'score': score}
def post_process_alignbench(judgement: str,
                            all_dimensions=All_Dimensions,
                            possible_keys=['综合得分']):
    """Extract every dimension score from a judgement string such as:
    xxx{'事实正确性': 1, '满足用户需求': 1, '清晰度': 2, '完备性': 1, '综合得分': 1}xxx.
    """

    def extract_score(text):
        keys_pattern = '|'.join(map(re.escape, possible_keys))
        pattern = rf"({keys_pattern}): (\d+(\.\d{{1,2}})?)"
        match = re.search(pattern, text)
        if match:
            try:
                return float(match.group(2))  # group(2) holds the numeric score
            except ValueError:
                return -1
        return -1

    judgement = judgement.replace('\n', '')
    rating = extract_rating(judgement)

    if rating is not None:
        score = -1
        for key in possible_keys:
            score = rating.get(key, -1)
            if score != -1:
                break
        if score == -1:
            score = extract_score(judgement)
        if not 0 <= score <= 10:
            score = -1
        rating = check_rating(rating, all_dimensions)
    else:
        score = -1
    if rating is None or score == -1:
        return None
    else:
        return {'rating': rating, 'score': score}
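
# Illustrative end-to-end example for the 'general' judge type:
#   post_process_alignbench(
#       "评价如下\n{'事实正确性': 8, '满足用户需求': 7, '清晰度': 9, "
#       "'完备性': 8, '综合得分': 8}")
#   ->  {'rating': {'事实正确性': 8, '满足用户需求': 7, '清晰度': 9,
#                   '完备性': 8, '综合得分': 8}, 'score': 8}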
def get_dimension_results(judged_answers, references, fout, fout_flag, model):
    """Average every rating dimension over the judged answers and append one
    CSV row for the model."""
    dimension_ratings = defaultdict(int)
    dimension_counts = defaultdict(int)
    for ans, ref in zip(judged_answers, references):
        for k, v in ans['rating'].items():
            if k != '综合得分' and k != 'Overall Score':
                dimension_ratings[k] += v
                dimension_counts[k] += 1
            else:
                if k == '综合得分':
                    dimension_ratings['综合得分'] += ans['score']
                    dimension_counts['综合得分'] += 1
                else:
                    dimension_ratings['Overall Score'] += ans['score']
                    dimension_counts['Overall Score'] += 1

    dimension_avg_ratings = defaultdict(float)
    for dimension, total_score in dimension_ratings.items():
        dimension_avg_ratings[
            dimension] = total_score / dimension_counts[dimension]

    scores = {model: dimension_avg_ratings}
    rows = list(scores.keys())
    columns = list(scores[rows[0]].keys())
    with open(fout, 'a+', newline='') as csvfile:
        writer = csv.writer(csvfile)
        if fout_flag == 0:
            writer.writerow(['模型'] + columns)
        for row in rows:
            writer.writerow([row] +
                            [scores[row][column] for column in columns])
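
# The dimension CSV written above is flat: a '模型' (model) column followed
# by one column per rating dimension, each holding that dimension's mean.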
def get_capability_results(judged_answers,
                           references,
                           fout,
                           fout_flag,
                           model,
                           categories=CATEGORIES):
    """Average the overall score per capability, roll the capabilities up
    into category totals and a grand total, and append one CSV row for the
    model."""
    capability_ratings = defaultdict(int)
    capability_counts = defaultdict(int)
    for ans, ref in zip(judged_answers, references):
        capability_ratings[ref['capability']] += ans['score']
        capability_counts[ref['capability']] += 1

    capability_avg_ratings = defaultdict(float)
    for capability, total_score in capability_ratings.items():
        capability_avg_ratings[
            capability] = total_score / capability_counts[capability]

    temp_list = []
    total_column_num = 2
    for category, sub_categories in categories.items():
        total_column_num += 1 + len(sub_categories)
        capability_avg_ratings[category + '总分'] = np.mean(
            [capability_avg_ratings[cat] for cat in sub_categories])
        temp_list.append(category + '总分')
    capability_avg_ratings['总分'] = 0
    for temp in temp_list:
        capability_avg_ratings['总分'] += capability_avg_ratings[temp]
    capability_avg_ratings['总分'] /= len(temp_list)
    scores = {model: capability_avg_ratings}

    with open(fout, 'a+', newline='') as csvfile:
        writer = csv.writer(csvfile)
        if fout_flag == 0:
            num_header = [str(i) for i in range(total_column_num)]
            writer.writerow(num_header)

            header = ['模型', '总分']
            for category, sub_categories in categories.items():
                header.append(category)
                header.extend([None for _ in range(len(sub_categories))])
            writer.writerow(header)

            sub_header = ['模型', '总分']
            for category, sub_categories in categories.items():
                sub_header.extend([category + '总分'])
                sub_header.extend(sub_categories)
            writer.writerow(sub_header)

        row = [model]
        row.append(scores[model]['总分'])
        for category, sub_categories in categories.items():
            row.append(scores[model][category + '总分'])
            for sub_category in sub_categories:
                row.append(scores[model][sub_category])
        writer.writerow(row)
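
# The capability CSV written above carries a three-row header (a numeric
# index row, a merged-category row, then the concrete column names), e.g.:
#   模型, 总分, 中文推理总分, 数学计算, 逻辑推理, 中文语言总分, 基本任务, ...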
class AlignmentBenchSummarizer:
    """Perform subjective analysis based on evaluation results.

    Args:
        config (ConfigDict): The configuration object of the evaluation task.
            It's expected to be filled out at runtime.
    """

    def __init__(self, config: ConfigDict, judge_type='general') -> None:
        self.tasks = []
        self.cfg = config
        self.eval_model_cfgs = self.cfg['eval']['partitioner']['models']
        self.eval_model_abbrs = [
            model_abbr_from_cfg(model) for model in self.eval_model_cfgs
        ]
        self.judge_abbr = model_abbr_from_cfg(self.cfg['judge_model'])
        self.judge_type = judge_type
        assert self.judge_type in [
            'general', 'autoj', 'judgelm', 'general_plus'
        ]
        # Each judge type has its own post-processor for parsing judgements.
        self.judge_map = {
            'general': post_process_alignbench,
            'general_plus': post_process_alignbench_plus,
            'autoj': post_process_autoj,
            'judgelm': post_process_judgelm
        }
        self.judge_function = self.judge_map[self.judge_type]
        self.category = CATEGORIES
    def summarize(self,
                  time_str: str = datetime.now().strftime('%Y%m%d_%H%M%S')):
        """Summarize the subjective analysis based on evaluation results.

        Args:
            time_str (str): Timestamp for file naming.

        Returns:
            None: results are written to CSV files under the output
            directory and printed as tables.
        """
        dataset_cfgs = self.cfg['datasets']
        output_dir, results_folder = get_outdir(self.cfg, time_str)
        fout_flag, fout_flag2 = 0, 0
        for eval_model_abbr in self.eval_model_abbrs:
            subdir = eval_model_abbr + '_judged-by--' + self.judge_abbr
            subdir_path = os.path.join(results_folder, subdir)
            if os.path.isdir(subdir_path):
                model, judge_model = eval_model_abbr, self.judge_abbr
                if self.judge_type == 'general':
                    fout = osp.join(
                        output_dir,
                        'judged-by--' + judge_model + '-dimension.csv')
                fout2 = osp.join(
                    output_dir,
                    'judged-by--' + judge_model + '-capability.csv')
                for dataset in dataset_cfgs:
                    judged_answers, references = get_judgeanswer_and_reference(
                        dataset, subdir_path, self.judge_function)
                    if self.judge_type == 'general':
                        get_dimension_results(judged_answers, references,
                                              fout, fout_flag, model)
                        fout_flag += 1
                    get_capability_results(judged_answers, references, fout2,
                                           fout_flag2, model, self.category)
                    fout_flag2 += 1
            else:
                print(subdir_path + ' does not exist! Please check!')
        if from_csv is not None:  # pretty-print only if prettytable is available
            if self.judge_type == 'general':
                with open(fout, 'r') as f:
                    x = from_csv(f)
                print(x)
            with open(fout2, 'r') as f:
                x = from_csv(f)
            print(x)
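
# Minimal usage sketch (assumes `cfg` is an OpenCompass ConfigDict prepared
# elsewhere, with 'eval', 'judge_model' and 'datasets' filled in; not part of
# this file):
#   summarizer = AlignmentBenchSummarizer(cfg, judge_type='general')
#   summarizer.summarize()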