|
|
|
import csv |
|
import os |
|
import os.path as osp |
|
import re |
|
from collections import defaultdict |
|
from datetime import datetime |
|
|
|
import numpy as np |
|
from mmengine import ConfigDict |
|
|
|
try: |
|
from prettytable import from_csv |
|
except ImportError: |
|
from_csv = None |
|
|
|
from opencompass.utils import model_abbr_from_cfg |
|
|
|
from .subjective_post_process import post_process_autoj, post_process_judgelm |
|
from .utils import get_judgeanswer_and_reference, get_outdir |
|
|
|
CATEGORIES = { |
|
'中文推理': ['数学计算', '逻辑推理'], |
|
'中文语言': ['基本任务', '中文理解', '综合问答', '文本写作', '角色扮演', '专业能力'], |
|
} |
|
|
|
All_Dimensions = [ |
|
'事实正确性', '满足用户需求', '安全无害', '清晰度', '逻辑性', '完备性', '创造性', '可负责程度', '逻辑连贯性', |
|
'公平与可负责程度', '丰富度', '综合得分' |
|
] |
|
|
|
MAPPING = { |
|
'事实与解释型回答': ['事实正确性', '满足用户需求', '清晰度', '完备性'], |
|
'逻辑推理型回答': ['事实正确性', '满足用户需求', '逻辑连贯性', '完备性'], |
|
'生成型回答': ['事实正确性', '满足用户需求', '逻辑连贯性', '创造性', '丰富度'], |
|
'建议型回答': ['事实正确性', '满足用户需求', '公平与可负责程度', '创造性'] |
|
} |
|
|
|
|
|
def detect_mapping(text): |
|
if '清晰度' in text and '完备性' in text: |
|
return '事实与解释型回答' |
|
elif '完备性' in text and '逻辑连贯性' in text: |
|
return '逻辑推理型回答' |
|
elif '创造性' in text and '丰富度' in text: |
|
return '生成型回答' |
|
elif '创造性' in text and '公平与可负责程度' in text: |
|
return '建议型回答' |
|
else: |
|
return None |
|
|
|
|
|
def extract_missing_rating(text, search_type): |
|
searching_keys = MAPPING[search_type] |
|
result_dict = {} |
|
for k in searching_keys: |
|
matches = re.findall(rf'{k}.*?\n', text) |
|
result_dict[k] = None |
|
for match in reversed(matches): |
|
if re.findall(r'\d{1,2}', match): |
|
result_dict[k] = int(re.findall(r'\d{1,2}', match)[-1]) |
|
break |
|
overall_number = re.findall('\d{1,2}', text) |
|
try: |
|
result_dict['综合得分'] = int(overall_number[-1]) |
|
except: |
|
return {} |
|
return result_dict |
|
|
|
|
|
def extract_rating_plus(text): |
|
pattern = r'{(.*?)}(?![^{]*{)' |
|
match = re.search(pattern, text) |
|
|
|
if match: |
|
dictionary_str = match.group(1) |
|
kv_pattern = r"'(.*?)': (\d+)" |
|
matches = re.findall(kv_pattern, dictionary_str) |
|
result_dict = {key: int(value) for key, value in matches} |
|
return result_dict |
|
else: |
|
match_type = detect_mapping(text=text) |
|
if match_type is not None: |
|
return extract_missing_rating(text=text, search_type=match_type) |
|
else: |
|
return None |
|
|
|
|
|
def extract_rating(text): |
|
pattern = r'{(.*?)}(?![^{]*{)' |
|
match = re.search(pattern, text) |
|
|
|
if match: |
|
dictionary_str = match.group(1) |
|
kv_pattern = r"'(.*?)': (\d+)" |
|
matches = re.findall(kv_pattern, dictionary_str) |
|
result_dict = {key: int(value) for key, value in matches} |
|
return result_dict |
|
else: |
|
return None |
|
|
|
|
|
def check_rating(rating, all_dimensions): |
|
for k, v in rating.items(): |
|
if isinstance(v, (int, float)) and k in all_dimensions: |
|
if v >= 0 and v <= 10: |
|
pass |
|
else: |
|
return None |
|
else: |
|
return None |
|
return rating |
|
|
|
|
|
def post_process_alignbench_plus(judgement: str, |
|
all_dimensions=All_Dimensions, |
|
possible_keys=['综合得分']): |
|
"""Input a string like below: |
|
|
|
xxx{'事实正确性': 1, '满足用户需求': 1, '清晰度': 2, '完备性': 1, '综合得分': 1}xxx, |
|
and extract each score |
|
""" |
|
|
|
def extract_score(text): |
|
keys_pattern = '|'.join(map(re.escape, possible_keys)) |
|
pattern = rf"({'|'.join(possible_keys)}): (\d+(\.\d{{1,2}})?)" |
|
match = re.search(pattern, text) |
|
if match: |
|
try: |
|
return float(match.group(1)) |
|
except ValueError: |
|
return -1 |
|
return -1 |
|
|
|
|
|
rating = extract_rating_plus(judgement) |
|
|
|
if rating is not None: |
|
score = -1 |
|
for key in possible_keys: |
|
score = rating.get(key, -1) |
|
if score != -1: |
|
break |
|
if score == -1: |
|
score = extract_score(judgement) |
|
if score >= 0 and score <= 10: |
|
pass |
|
else: |
|
score = -1 |
|
rating = check_rating(rating, all_dimensions) |
|
else: |
|
score = -1 |
|
if rating == None or score == -1: |
|
return None |
|
else: |
|
return {'rating': rating, 'score': score} |
|
|
|
|
|
def post_process_alignbench(judgement: str, |
|
all_dimensions=All_Dimensions, |
|
possible_keys=['综合得分']): |
|
"""Input a string like below: |
|
|
|
xxx{'事实正确性': 1, '满足用户需求': 1, '清晰度': 2, '完备性': 1, '综合得分': 1}xxx, |
|
and extract each score |
|
""" |
|
|
|
def extract_score(text): |
|
keys_pattern = '|'.join(map(re.escape, possible_keys)) |
|
pattern = rf"({'|'.join(possible_keys)}): (\d+(\.\d{{1,2}})?)" |
|
match = re.search(pattern, text) |
|
if match: |
|
try: |
|
return float(match.group(1)) |
|
except ValueError: |
|
return -1 |
|
return -1 |
|
|
|
judgement = judgement.replace('\n', '') |
|
rating = extract_rating(judgement) |
|
|
|
if rating is not None: |
|
score = -1 |
|
for key in possible_keys: |
|
score = rating.get(key, -1) |
|
if score != -1: |
|
break |
|
if score == -1: |
|
score = extract_score(judgement) |
|
if score >= 0 and score <= 10: |
|
pass |
|
else: |
|
score = -1 |
|
rating = check_rating(rating, all_dimensions) |
|
else: |
|
score = -1 |
|
if rating == None or score == -1: |
|
return None |
|
else: |
|
return {'rating': rating, 'score': score} |
|
|
|
|
|
def get_dimension_results(judged_answers, references, fout, fout_flag, model): |
|
dimension_ratings = defaultdict(int) |
|
dimension_counts = defaultdict(int) |
|
for ans, ref in zip(judged_answers, references): |
|
for k, v in ans['rating'].items(): |
|
if k != '综合得分' or k != 'Overall Score': |
|
dimension_ratings[k] += v |
|
dimension_counts[k] += 1 |
|
else: |
|
if k == '综合得分': |
|
dimension_ratings['综合得分'] += ans['score'] |
|
dimension_counts['综合得分'] += 1 |
|
else: |
|
dimension_ratings['Overall Score'] += ans['score'] |
|
dimension_counts['Overall Score'] += 1 |
|
|
|
dimension_avg_ratings = defaultdict(float) |
|
for dimension, total_score in dimension_ratings.items(): |
|
dimension_avg_ratings[ |
|
dimension] = total_score / dimension_counts[dimension] |
|
|
|
scores = {model: dimension_avg_ratings} |
|
rows = list(scores.keys()) |
|
columns = list(scores[rows[0]].keys()) |
|
with open(fout, 'a+', newline='') as csvfile: |
|
writer = csv.writer(csvfile) |
|
if fout_flag == 0: |
|
writer.writerow(['模型'] + columns) |
|
|
|
for row in rows: |
|
writer.writerow([row] + |
|
[scores[row][column] for column in columns]) |
|
|
|
|
|
def get_capability_results(judged_answers, |
|
references, |
|
fout, |
|
fout_flag, |
|
model, |
|
categories=CATEGORIES): |
|
capability_ratings = defaultdict(int) |
|
capability_counts = defaultdict(int) |
|
for ans, ref in zip(judged_answers, references): |
|
capability_ratings[ref['capability']] += ans['score'] |
|
capability_counts[ref['capability']] += 1 |
|
|
|
capability_avg_ratings = defaultdict(float) |
|
|
|
for capability, total_score in capability_ratings.items(): |
|
capability_avg_ratings[ |
|
capability] = total_score / capability_counts[capability] |
|
|
|
temp_list = [] |
|
total_column_num = 2 |
|
for category, sub_categories in categories.items(): |
|
total_column_num += 1 + len(sub_categories) |
|
capability_avg_ratings[category + '总分'] = np.mean([ |
|
np.mean(capability_avg_ratings[cat]) |
|
for cat in categories[category] |
|
]) |
|
temp_list.append(category + '总分') |
|
capability_avg_ratings['总分'] = 0 |
|
for temp in temp_list: |
|
capability_avg_ratings['总分'] += capability_avg_ratings[temp] |
|
capability_avg_ratings['总分'] /= len(temp_list) |
|
scores = {model: capability_avg_ratings} |
|
|
|
with open(fout, 'a+', newline='') as csvfile: |
|
writer = csv.writer(csvfile) |
|
if fout_flag == 0: |
|
num_header = [str(i) for i in range(total_column_num)] |
|
writer.writerow(num_header) |
|
|
|
header = ['模型', '总分'] |
|
for category, sub_categories in categories.items(): |
|
header.append(category) |
|
header.extend([None for _ in range(len(sub_categories))]) |
|
writer.writerow(header) |
|
|
|
sub_header = ['模型', '总分'] |
|
for category, sub_categories in categories.items(): |
|
sub_header.extend([category + '总分']) |
|
sub_header.extend(sub_categories) |
|
writer.writerow(sub_header) |
|
|
|
row = [model] |
|
row.append(scores[model]['总分']) |
|
for category, sub_categories in categories.items(): |
|
row.append(scores[model][category + '总分']) |
|
for sub_category in sub_categories: |
|
row.append(scores[model][sub_category]) |
|
writer.writerow(row) |
|
|
|
|
|
class AlignmentBenchSummarizer: |
|
"""Do the subjectivity analyze based on evaluation results. |
|
|
|
Args: |
|
config (ConfigDict): The configuration object of the evaluation task. |
|
It's expected to be filled out at runtime. |
|
""" |
|
|
|
def __init__(self, config: ConfigDict, judge_type='general') -> None: |
|
self.tasks = [] |
|
self.cfg = config |
|
self.eval_model_cfgs = self.cfg['eval']['partitioner']['models'] |
|
self.eval_model_abbrs = [ |
|
model_abbr_from_cfg(model) for model in self.eval_model_cfgs |
|
] |
|
self.judge_abbr = model_abbr_from_cfg(self.cfg['judge_model']) |
|
self.judge_type = judge_type |
|
assert self.judge_type in [ |
|
'general', 'autoj', 'judgelm', 'general_plus' |
|
] |
|
self.judge_map = { |
|
'general': post_process_alignbench, |
|
'general_plus': post_process_alignbench_plus, |
|
'autoj': post_process_autoj, |
|
'judgelm': post_process_judgelm |
|
} |
|
self.judge_function = self.judge_map[self.judge_type] |
|
self.category = CATEGORIES |
|
|
|
def summarize(self, |
|
time_str: str = datetime.now().strftime('%Y%m%d_%H%M%S')): |
|
"""Summarize the subjectivity analysis based on evaluation results. |
|
|
|
Args: |
|
time_str (str): Timestamp for file naming. |
|
|
|
Returns: |
|
pd.DataFrame: The summary results. |
|
""" |
|
dataset_cfgs = self.cfg['datasets'] |
|
output_dir, results_folder = get_outdir(self.cfg, time_str) |
|
fout_flag, fout_flag2 = 0, 0 |
|
for eval_model_abbr in self.eval_model_abbrs: |
|
subdir = eval_model_abbr + '_judged-by--' + self.judge_abbr |
|
subdir_path = os.path.join(results_folder, subdir) |
|
if os.path.isdir(subdir_path): |
|
model, judge_model = eval_model_abbr, self.judge_abbr |
|
if self.judge_type == 'general': |
|
fout = osp.join( |
|
output_dir, |
|
'judged-by--' + judge_model + '-dimension.csv') |
|
fout2 = osp.join( |
|
output_dir, |
|
'judged-by--' + judge_model + '-capability.csv') |
|
for dataset in dataset_cfgs: |
|
judged_answers, references = get_judgeanswer_and_reference( |
|
dataset, subdir_path, self.judge_function) |
|
if self.judge_type == 'general': |
|
get_dimension_results(judged_answers, references, fout, |
|
fout_flag, model) |
|
fout_flag += 1 |
|
get_capability_results(judged_answers, references, fout2, |
|
fout_flag2, model, self.category) |
|
fout_flag2 += 1 |
|
else: |
|
print(subdir_path + ' is not exist! please check!') |
|
if self.judge_type == 'general': |
|
with open(fout, 'r') as f: |
|
x = from_csv(f) |
|
print(x) |
|
with open(fout2, 'r') as f: |
|
x = from_csv(f) |
|
print(x) |
|
|