import json
import os
import uuid

import chromadb
import pandas as pd
from dotenv import load_dotenv

# Pull configuration (CHROMA_DATA_PATH) from a local .env file.
load_dotenv()
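# Illustrative .env layout (the path is a placeholder; only CHROMA_DATA_PATH
# is read by this script):
#
#   CHROMA_DATA_PATH=/path/to/chroma/storage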
# Collect every CVE CSV file under the project's data directory. os.path.join
# keeps the path portable instead of hard-coding Windows-style backslashes.
csv_files = []
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
cve_csv_path = os.path.join(root_dir, 'codevulnerabilityai', 'data', 'cve')

csv_files.extend([os.path.join(cve_csv_path, f) for f in os.listdir(cve_csv_path) if f.endswith('.csv')])
# Read every column as a string so pandas does not coerce mixed-type CVE fields.
dtype_dict = {
    'Name': str,
    'Status': str,
    'Description': str,
    'References': str,
    'Phase': str,
    'Votes': str,
    'Comments': str
}
# Fail fast if the storage location is not configured; wrapping os.getenv in
# str() would silently turn a missing variable into the literal path "None".
chroma_data_path = os.getenv('CHROMA_DATA_PATH')
if not chroma_data_path:
    raise RuntimeError("CHROMA_DATA_PATH is not set; add it to your .env file.")

chroma_db_directory = "chroma_db/"

client = chromadb.PersistentClient(path=os.path.join(chroma_data_path, chroma_db_directory))
collection = client.get_or_create_collection(name="CVE")
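# With no embedding_function argument, Chroma falls back to its default
# sentence-transformers model (all-MiniLM-L6-v2 at the time of writing). A
# sketch for pinning the model explicitly, should that default ever change:
#
#   from chromadb.utils import embedding_functions
#   ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-MiniLM-L6-v2")
#   collection = client.get_or_create_collection(name="CVE", embedding_function=ef)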
# Per-batch buffers, flushed to the collection every `batch_size` rows.
documents_to_add = []
ids_to_add = []
metadata_to_add = []
documents_to_add_string = []

batch_size = 10
current_batch = 0
if csv_files:
    for csv_file in csv_files:
        print(f"Processing {csv_file}...")
        df = pd.read_csv(csv_file, on_bad_lines='skip', dtype=dtype_dict)

        if not df.empty and 'Description' in df.columns:
            for index, row in df.iterrows():
                # Each 'Name' cell appears to pack the whole CVE record as
                # semicolon-separated fields; split it back into flat metadata.
                metadata_parts = row['Name'].split(';')
                metadata = {
                    "Name": str(metadata_parts[0].strip()),
                    "Status": str(metadata_parts[1].strip()) if len(metadata_parts) > 1 else "",
                    "Description": str(metadata_parts[2].strip()) if len(metadata_parts) > 2 else "",
                    "References": str(metadata_parts[3].strip()) if len(metadata_parts) > 3 else "",
                    "Phase": str(metadata_parts[4].strip()) if len(metadata_parts) > 4 else "",
                    "Votes": str(metadata_parts[5].strip()) if len(metadata_parts) > 5 else "",
                }
                document_id = str(uuid.uuid4())
                document_content = metadata["Description"]
                document = {'id': document_id, 'content': document_content}

                documents_to_add.append(document)
                # Serialize only this document; json.dumps(documents_to_add)
                # would re-embed every earlier document into each new entry.
                documents_to_add_string.append(json.dumps(document))
                ids_to_add.append(document_id)
                metadata_to_add.append(metadata)

                current_batch += 1
                if current_batch % batch_size == 0:
                    collection.add(documents=documents_to_add_string, ids=ids_to_add, metadatas=metadata_to_add)
                    print(f"Batch {current_batch // batch_size} added to the collection.")
                    documents_to_add = []
                    ids_to_add = []
                    metadata_to_add = []
                    documents_to_add_string = []
        else:
            print(f"Skipping file {csv_file}: empty DataFrame or missing 'Description' column.")
else:
    print("No CSV files found in the directory. Skipping processing.")
# Flush any documents left over from a final partial batch.
if documents_to_add:
    print(f"Adding remaining {len(documents_to_add)} documents to the collection.")
    collection.add(documents=documents_to_add_string, ids=ids_to_add, metadatas=metadata_to_add)
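# Optional sanity check (a sketch; the query string is illustrative). Chroma
# embeds query_texts with the collection's embedding function and returns the
# nearest stored documents:
#
#   results = collection.query(query_texts=["buffer overflow in HTTP server"], n_results=3)
#   print(results["ids"], results["metadatas"])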