본문 바로가기
  • 머킹이의 머신로그
AI/코드 실습하기

파인튜닝된 모델을 활용한 RAG (Retrieval-Augmented Generation) 예제

by 머킹 2024. 8. 26.
728x90

 

안녕하세요 머킹입니다.

KULLM3 + RAG 코드가 잘돼서 예제 코드를 만들었어요.

 

1. 예제 코드
import os
import torch
import pandas as pd
from transformers import AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from rank_bm25 import BM25Okapi
from nltk.tokenize import word_tokenize
import nltk

nltk.download('punkt')

# 설정
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

# 파인튜닝된 모델 경로
model_path = "path/to/your/fine-tuned-model"

# GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 모델과 토크나이저 로드
if os.path.isdir(model_path):
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True, use_fast=False)
    model = AutoModelForCausalLM.from_pretrained(
        model_path, 
        device_map="auto",
        trust_remote_code=True,
        low_cpu_mem_usage=True
    )
else:
    print(f"Error: The directory {model_path} does not exist.")
    exit(1)

# SentenceTransformer 로드 (예제용 다국어 모델 사용)
embedding_model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2').to(device)

# CSV 파일 로드 및 문서 전처리
csv_filepath = "path/to/your/data.csv"
df = pd.read_csv(csv_filepath)

def preprocess_document(doc):
    chunks = [doc[i:i+100] for i in range(0, len(doc), 100)]
    return [chunk.strip() for chunk in chunks if chunk.strip()]

documents = []
for doc in df['your_text_column'].dropna():
    documents.extend(preprocess_document(doc))

# 문서 임베딩 계산
document_embeddings = embedding_model.encode(documents, convert_to_tensor=True)

# FAISS 인덱스 생성 및 임베딩 추가
index = faiss.IndexFlatL2(embedding_model.get_sentence_embedding_dimension())
index.add(document_embeddings.cpu().detach().numpy())

# BM25 인덱스 생성
tokenized_corpus = [word_tokenize(doc.lower()) for doc in documents]
bm25 = BM25Okapi(tokenized_corpus)

def hybrid_search(query, top_k=5):
    query_embedding = embedding_model.encode(query, convert_to_tensor=True).cpu().detach().numpy()
    distances, indices = index.search(np.array([query_embedding]), top_k)

    tokenized_query = word_tokenize(query.lower())
    bm25_scores = bm25.get_scores(tokenized_query)
    
    combined_scores = {}
    for i, score in enumerate(bm25_scores):
        if i in indices[0]:
            combined_scores[i] = 0.3 * score + 0.7 * (1 / (distances[0][list(indices[0]).index(i)] + 1))
        else:
            combined_scores[i] = 0.3 * score
    
    top_indices = sorted(combined_scores, key=combined_scores.get, reverse=True)[:top_k]
    return [documents[i] for i in top_indices]

# 대화 컨텍스트 유지용 변수
conversation_history = ""

# 질문에 대한 RAG 수행
def rag_query(query, top_k=5):
    global conversation_history
    
    retrieved_documents = hybrid_search(query, top_k)
    
    context = "\n\n".join([f"문서 {i+1}: {doc}" for i, doc in enumerate(retrieved_documents)])
    
    input_text = f"""대화 이력:
    {conversation_history}
    
    컨텍스트:
    {context}
    
    질문: {query}
    지침:
    1. 대화 이력과 컨텍스트에서 질문과 정확히 일치하는 정보를 찾아 답변하세요.
    2. 질문에서 요구하는 정확한 정보만 제공하세요.
    3. 정보가 부분적으로라도 있다면 제공하세요.
    4. 정보가 전혀 없는 경우에만 '제공된 정보에 답변이 없습니다'라고 말하세요.
    5. 추가 질문이나 지침을 생성하지 마세요.
    답변:"""
    
    inputs = tokenizer(input_text, return_tensors="pt").to(model.device)
    outputs = model.generate(
        **inputs,
        max_new_tokens=500,
        temperature=0.5,
        do_sample=True,
        num_beams=5,
        num_return_sequences=1,
        eos_token_id=tokenizer.eos_token_id
    )
    
    response = tokenizer.decode(outputs[0], skip_special_tokens=True)
    
    conversation_history += f"질문: {query}\n답변: {response}\n"
    
    return response.strip(), retrieved_documents

if __name__ == "__main__":
    print("RAG 파이프라인 설정 완료. 질문을 입력해주세요.")
    while True:
        query = input("\n질문을 입력하세요 (종료하려면 'q' 입력): ")
        if query.lower() == 'q':
            break
        response, sources = rag_query(query)
        print(f"\n답변: {response}")
        print(f"\n참고 문서 수: {len(sources)}")

print("프로그램을 종료합니다.")

 

 

2. 코드 설명

이 코드는 파인튜닝된 언어 모델과 검색 시스템을 결합하여 질문에 대한 답변을 생성하는 RAG 파이프라인을 구현합니다. 기본적인 흐름은 다음과 같습니다:

  1. 모델 로드: 파인튜닝된 언어 모델과 토크나이저를 로드합니다. 모델 경로가 존재하지 않으면 오류를 출력합니다.
  2. SentenceTransformer 로드: 문서 임베딩을 계산하기 위해 SentenceTransformer 모델을 로드합니다. 예제에서는 다국어 모델을 사용합니다.
  3. 문서 임베딩 및 인덱스 생성: 주어진 CSV 파일로부터 텍스트 데이터를 읽어들인 후, 문서의 임베딩을 계산하고 FAISS 인덱스를 생성합니다. 이 인덱스를 통해 나중에 유사한 문서를 빠르게 검색할 수 있습니다.
  4. BM25 인덱스 생성: BM25 알고리즘을 사용하여 각 문서의 토큰화된 버전을 인덱싱합니다. 이 인덱스는 검색 효율을 높이는 데 사용됩니다.
  5. Hybrid Search: 사용자의 질의에 대해 임베딩 기반 검색과 BM25 기반 검색을 결합하여 관련 문서를 찾습니다. 이 과정에서 두 가지 점수를 결합하여 최종 순위를 매깁니다.
  6. RAG 쿼리 처리: 사용자의 질의를 기반으로 문서를 검색한 후, 대화 이력과 검색된 문서를 기반으로 언어 모델이 답변을 생성합니다.

3. 코드 내 함수 및 프로세스, 모듈 설명

  • AutoTokenizer, AutoModelForCausalLM: Hugging Face의 트랜스포머 모델을 사용하여 토큰화 및 언어 모델을 로드합니다. from_pretrained 메서드를 사용하여 파인튜닝된 모델을 로드하고, GPU에 최적화된 설정을 사용합니다.
  • SentenceTransformer: 텍스트 임베딩을 생성하기 위한 모델입니다. 문서 단위의 임베딩을 계산하여, 검색에 사용할 수 있는 벡터화된 표현을 생성합니다.
  • FAISS: Facebook AI Research에서 개발한 라이브러리로, 빠른 유사도 검색을 위해 사용됩니다. 생성된 임베딩을 인덱싱하여, 사용자의 질의와 가장 유사한 문서를 효율적으로 검색할 수 있게 해줍니다.
  • BM25Okapi: 전통적인 정보 검색 알고리즘인 BM25를 구현한 Python 라이브러리입니다. 문서와 질의 간의 일치 정도를 점수화하여 검색 결과를 제공합니다.
  • hybrid_search: 임베딩 기반 검색과 BM25 검색을 결합한 함수로, 두 가지 검색 결과를 조합하여 최종 순위를 매깁니다. 검색의 정확성을 높이는 데 도움을 줍니다.
  • rag_query: 이 함수는 RAG 프로세스의 핵심으로, 질의에 대해 검색된 문서를 기반으로 언어 모델이 답변을 생성합니다. 대화 이력을 포함하여 질의에 더욱 맥락에 맞는 답변을 제공합니다.
  • conversation_history: 대화의 연속성을 유지하기 위해 사용되며, 이전 대화 내용이 포함된 상태에서 새로운 질의에 답변을 생성하도록 합니다.

 

 

처음에는 답변이 빠르지 않지만 로드하느라..

그래도 꽤 잘 찾아줘서 만족했습니다.

 

이제 vLLM을 붙여보고 싶은데... 정말 쉽지 않네요 ㅎㅎ;