import json
import os
import uuid

import chromadb
import pandas as pd
from chromadb.utils import embedding_functions
from dotenv import load_dotenv
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC as Pinecone

# Pull PINECONE_API_KEY and CHROMA_DATA_PATH from a local .env file.
load_dotenv()

# Embedding function for the Chroma collection. A causal LM cannot serve as a
# Chroma embedding function, so this uses Chroma's OllamaEmbeddingFunction and
# assumes a local Ollama instance is serving nomic-embed-text (the v1.5 GGUF
# build referenced by the original model file) at its default endpoint.
ollama_ef = embedding_functions.OllamaEmbeddingFunction(
    url="http://localhost:11434/api/embeddings",
    model_name="nomic-embed-text",
)
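
# Illustrative sanity check (not part of the pipeline): nomic-embed-text
# produces 768-dimensional vectors, so this should print 768.
#
#     print(len(ollama_ef(["sanity check"])[0]))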

# Gather every CVE CSV file under <repo root>/data/cve.
csv_files = []
root_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
cve_csv_path = os.path.join(root_dir, 'data', 'cve')

csv_files.extend([os.path.join(cve_csv_path, f) for f in os.listdir(cve_csv_path) if f.endswith('.csv')])

# Force every CVE column to str so pandas does not infer mixed dtypes
# across chunks of large export files.
dtype_dict = {
    'Name': str,
    'Status': str,
    'Description': str,
    'References': str,
    'Phase': str,
    'Votes': str,
    'Comments': str,
}

# Pinecone client for the vulnerability index (see index_name below).
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

# Persistent Chroma store rooted at CHROMA_DATA_PATH; falls back to the
# current directory when the variable is unset.
chroma_data_path = os.getenv('CHROMA_DATA_PATH', '.')
chroma_db_directory = "chroma_db/"

client = chromadb.PersistentClient(path=os.path.join(chroma_data_path, chroma_db_directory))

collection = client.get_or_create_collection(name="CVE", embedding_function=ollama_ef)

index_name = "code-vulnerability-ai"
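
# The Pinecone index itself is not created in this script. A minimal,
# illustrative creation call (assuming 768-dim nomic-embed-text vectors,
# cosine similarity, and an AWS us-east-1 serverless spec) would be:
#
#     if index_name not in pc.list_indexes().names():
#         pc.create_index(
#             name=index_name,
#             dimension=768,
#             metric="cosine",
#             spec=ServerlessSpec(cloud="aws", region="us-east-1"),
#         )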

# Per-batch accumulators, flushed to the collection every `batch_size` rows.
documents_to_add = []
ids_to_add = []
metadata_to_add = []
documents_to_add_string = []

batch_size = 10
current_batch = 0

if csv_files:
    for csv_file in csv_files:
        print(f"Processing {csv_file}...")
        # Skip malformed rows instead of aborting on the first bad line.
        df = pd.read_csv(csv_file, on_bad_lines='skip', dtype=dtype_dict)

        if not df.empty and 'Description' in df.columns:
            for _, row in df.iterrows():
                # Each record arrives packed into the 'Name' field as
                # semicolon-separated values; unpack it into CVE metadata.
                metadata_parts = str(row['Name']).split(';')
                metadata = {
                    "Name": str(metadata_parts[0].strip()),
                    "Status": str(metadata_parts[1].strip()) if len(metadata_parts) > 1 else "",
                    "Description": str(metadata_parts[2].strip()) if len(metadata_parts) > 2 else "",
                    "References": str(metadata_parts[3].strip()) if len(metadata_parts) > 3 else "",
                    "Phase": str(metadata_parts[4].strip()) if len(metadata_parts) > 4 else "",
                    "Votes": str(metadata_parts[5].strip()) if len(metadata_parts) > 5 else "",
                }
                document_id = str(uuid.uuid4())
                document_content = metadata["Description"]
                document = {'id': document_id, 'content': document_content}

                documents_to_add.append(document)
                # Serialise only the current document; dumping the growing
                # documents_to_add list here would nest every earlier record
                # inside each new entry.
                documents_to_add_string.append(json.dumps(document))
                ids_to_add.append(document_id)
                metadata_to_add.append(metadata)

                current_batch += 1
                if current_batch % batch_size == 0:
                    collection.add(documents=documents_to_add_string, ids=ids_to_add, metadatas=metadata_to_add)
                    documents_to_add = []
                    ids_to_add = []
                    metadata_to_add = []
                    documents_to_add_string = []
                    print(f"Batch {current_batch // batch_size} added to the collection.")
        else:
            print(f"Skipping {csv_file}: empty DataFrame or missing 'Description' column.")
else:
    print("No CSV files found in the directory. Skipping processing.")

# Flush whatever is left over from a final partial batch.
if documents_to_add:
    print(f"Adding remaining {len(documents_to_add)} documents to the collection.")
    collection.add(documents=documents_to_add_string, ids=ids_to_add, metadatas=metadata_to_add)
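
# Illustrative usage (assumed, not part of the ingestion itself): once the
# CVE records are loaded, the collection supports semantic lookups, e.g.
#
#     results = collection.query(
#         query_texts=["remote code execution via crafted HTTP request"],
#         n_results=3,
#     )
#     print(results["metadatas"])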