Madten === ```python= # cdc_rag_app_FAR.py import os import streamlit as st # from dotenv import load_dotenv from PyPDF2 import PdfReader from langchain.text_splitter import CharacterTextSplitter from langchain_community.vectorstores import FAISS from langchain.memory import ConversationBufferMemory from langchain.chains import ConversationalRetrievalChain from langchain_community.llms import HuggingFacePipeline import transformers from torch import cuda, bfloat16 from langchain_community.embeddings import HuggingFaceEmbeddings from transformers import StoppingCriteria, StoppingCriteriaList import torch from streamlit_chat import message from langchain_core.prompts import PromptTemplate from sentence_transformers import SentenceTransformer from langchain_experimental.text_splitter import SemanticChunker # from langchain.chains import RetrievalQA from langchain_community.llms import GPT4All import shutil from datetime import datetime import time from langchain_community.chat_models import ChatOpenAI from langchain_anthropic import ChatAnthropic from PIL import Image import naive_definition_chunking import warnings warnings.filterwarnings("ignore") # os.environ['CUDA_VISIBLE_DEVICES'] = '-1' torch.cuda.empty_cache() st.set_page_config(page_title="CDC Copilot Beta", page_icon= ":hospital:", layout="wide") config_control_ls = [] ##### change the avatar to chnage the avatar pic of user and bot user_input_avatar = "shapes" bot_avatar = "fun-emoji" # bot_avatar = Image.open("bot-icon.png") # user_input_avatar = "fun-emoji" # bot_avatar = "shapes" user_input_flag = False bot_response_flag= True hf_auth = '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' # os.environ["HUGGINGFACEHUB_API_TOKEN"] = hf_auth os.environ["HF_HOME"] = hf_auth claud3_auth = "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" os.environ["ANTHROPIC_API_KEY"] = claud3_auth device = f'cuda:{cuda.current_device()}' if cuda.is_available() else 'cpu' print("Available device:",device) #### max sequence_length 256 embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2" #### max sequence_length 512 # embedding_model_name = "thenlper/gte-small" chunk_size = SentenceTransformer(embedding_model_name).max_seq_length chunk_overlap = int(chunk_size/10) ## code to find the max sequence length print(f"Model's maximum sequence length: {chunk_size}") # embedding_model_name = "thenlper/gte-large" embeddings = HuggingFaceEmbeddings( model_name=embedding_model_name, model_kwargs={'device': device} ) def create_folder(folder_path): if not os.path.exists(folder_path): # Create the folder os.makedirs(folder_path) root_db_path = "Faiss_DB" create_folder(root_db_path) def get_model(model_id, temperature): try: # set quantization configuration to load large model with less GPU memory # this requires the `bitsandbytes` library bnb_config = transformers.BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type='nf4', bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=bfloat16 ) # begin initializing HF items, you need an access token model_config = transformers.AutoConfig.from_pretrained( model_id, use_auth_token = hf_auth, device_map = device ) model = transformers.AutoModelForCausalLM.from_pretrained( model_id, trust_remote_code=True, config=model_config, quantization_config=bnb_config, device_map=device, use_auth_token = hf_auth ) # enable evaluation mode to allow model inference model.eval() torch.cuda.empty_cache() tokenizer = transformers.AutoTokenizer.from_pretrained( model_id, use_auth_token = hf_auth ) # define custom stopping criteria object stop_list = ['\nHuman:', '\n```\n'] stop_token_ids = [tokenizer(x)['input_ids'] for x in stop_list] stop_token_ids = [torch.LongTensor(x).to(device) for x in stop_token_ids] class StopOnTokens(StoppingCriteria): def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool: for stop_ids in stop_token_ids: if torch.eq(input_ids[0][-len(stop_ids):], stop_ids).all(): return True return False stopping_criteria = StoppingCriteriaList([StopOnTokens()]) terminators = [ tokenizer.eos_token_id, tokenizer.convert_tokens_to_ids("<|eot_id|>") ] generate_text = transformers.pipeline( model=model, tokenizer=tokenizer, return_full_text=False, # langchain expects the full text task='text-generation', # we pass model parameters here too stopping_criteria=stopping_criteria, # without this model rambles during chat temperature=temperature, # 'randomness' of outputs, 0.0 is the min and 1.0 the max max_new_tokens=512, # max number of tokens to generate in the output repetition_penalty=1.1, # without this output begins repeating top_k = 20, top_p =0.92, batch_size=1, pad_token_id=tokenizer.eos_token_id, eos_token_id=terminators, torch_dtype=torch.float16, max_length=1024, # do_sample=True, ) llm = HuggingFacePipeline(pipeline=generate_text) return(llm) except Exception as e: # message(str(e), is_user=True, avatar_style=bot_avatar) e = "LLM Loading Error: " + str(e) st.session_state["messages"].append([str(e), bot_response_flag, bot_avatar]) def get_pdf_text(pdf_docs): text = "" for pdf in pdf_docs: pdf_reader = PdfReader(pdf) for page in pdf_reader.pages: text += page.extract_text() return text def get_vectorstore(pdf_docs, docs): today_date = datetime.today().strftime('%Y-%m-%d') today_folder = os.path.join(root_db_path, today_date) create_folder(today_folder) doc_names = '_'.join([str(i.name) for i in pdf_docs]) if "FAR" in doc_names: vectorstore_name = os.path.join(root_db_path, doc_names) else: vectorstore_name = os.path.join(today_folder, doc_names) for folder_name in os.listdir(root_db_path): if folder_name != today_date and "FAR" not in folder_name: shutil.rmtree(os.path.join(root_db_path,folder_name)) if os.path.exists(vectorstore_name): vectorstore = FAISS.load_local(vectorstore_name, embeddings, allow_dangerous_deserialization=True) print(f"Loading {doc_names} database from local") else: vectorstore = FAISS.from_documents(documents=docs, embedding=embeddings) vectorstore.save_local(vectorstore_name) print(f"Generating local database: {doc_names}") return vectorstore def get_conversation_chain(vectorstore, model_id, temperature): template = """ Use the following pieces of information and provide a brief and to-the-point answer the user's question. If you don't know the answer, just say that you don't know, don't try to make up an answer. Context: {context} Question: {question} Only return the helpful answer below and nothing else. Helpful answer: """ prompt = PromptTemplate(template=template, input_variables=['context', 'question']) model_path_dict = {"GPT4ALL: Mistral": "C:/Users/amrpatra/Downloads/gpt4all/mistral-7b-openorca.gguf2.Q4_0.gguf", "GPT4ALL: Falcon": "C:/Users/amrpatra/Downloads/gpt4all/gpt4all-falcon-newbpe-q4_0.gguf", "GPT4ALL: Gemma": "C:/Users/amrpatra/Downloads/gpt4all/gemma-7b-it-q4_0.gguf", "GPT4ALL: LLAMA2": "C:/Users/amrpatra/Downloads/gpt4all/llama-2-7b-chat.Q4_0.gguf"} # if model_id == "google/flan-t5-xxl": if model_id in ["google/flan-t5-base", 'google/flan-t5-large']: generate_text = transformers.pipeline(model=model_id, max_length = 500) llm = HuggingFacePipeline(pipeline=generate_text) elif model_id.lower().startswith("gpt4all"): model_path = model_path_dict[model_id] llm = GPT4All( model=model_path, verbose=True, max_tokens=1000) elif model_id == 'GPT-3.5': llm = ChatOpenAI( model_name = "gpt-3.5-turbo", streaming = True, verbose = True, temperature = 0 ) elif model_id == 'GPT-4': llm = ChatOpenAI( model_name = "gpt-4", streaming = True, verbose = True, temperature = 0 ) elif model_id == "Claude-3": llm = ChatAnthropic(temperature=0, model_name="claude-3-opus-20240229") else: llm = get_model(model_id, temperature) if not llm: # message("Failed to Load the Model.", is_user=True, avatar_style=bot_avatar) st.session_state["messages"].append(["Failed to Load the Model.", bot_response_flag, bot_avatar]) # llm = HuggingFaceHub(repo_id=model_id, model_kwargs = {"temperature" : temperature}) else: memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True) retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={'k': 5}) conversation_chain = ConversationalRetrievalChain.from_llm( llm = llm, retriever = retriever, memory=memory, combine_docs_chain_kwargs={"prompt": prompt}, max_tokens_limit=4000, verbose = True ) return conversation_chain def handle_userinput(user_question): global user_input_avatar, bot_avatar try: start_time = time.time() with torch.no_grad(): user_question = user_question.strip() response = st.session_state.conversation({'question': user_question}) # print(response) st.session_state.chat_history = response['chat_history'] torch.cuda.empty_cache() end_start = time.time() print(f"Response generated in {(end_start- start_time)/60} minutes") st.session_state["messages"].append([user_question, user_input_flag, user_input_avatar]) st.session_state["messages"].append([response["answer"], bot_response_flag, bot_avatar]) except Exception as e: error_msg = str(e) # error_msg = f"Error: Document Not Uploaded/Invalid Document Format\n\nOpps! It appears that you haven't uploaded any document or the uploaded document is not in a valid format." # message(error_msg, is_user=True, avatar_style=bot_avatar) st.session_state["messages"].append([error_msg, bot_response_flag, bot_avatar]) def display_message(pdf_docs, model_name, database_config): global user_input_avatar, bot_avatar, config_control_ls # if st.session_state["messages"]: # with st.container(border=True, height= 300): try: if database_config == "Complete CDC Database": if os.path.exists(os.path.join(root_db_path, "FAR.pdf")): config_info_msg = f"Hi,\nCDC Database Available.\nModel selected: {model_name}" else: config_info_msg = f"Hi,\nCDC database not found.\nPlease upload the complete CDC document.\nModel selected: {model_name}" if database_config == "Upload CDC Document": if pdf_docs: doc_names = '\n'.join([str(i.name) for i in pdf_docs]) config_info_msg = f"Hi,\nFile successfully uploaded:\n{doc_names}\n\nModel selected: {model_name}" else: config_info_msg = None if config_info_msg: if len(st.session_state["config_msg_ls"]) == 1: st.session_state["messages"].append([config_info_msg, bot_response_flag, bot_avatar]) else: if st.session_state["config_msg_ls"]: # print(st.session_state["config_msg_ls"][-1]) # print(config_info_msg) if str(st.session_state["config_msg_ls"][-1]) != str(config_info_msg): st.session_state["messages"].append([config_info_msg, bot_response_flag, bot_avatar]) st.session_state["config_msg_ls"].append(config_info_msg) except Exception as e: config_info_msg = "Congig Msg Error: " + str(e) st.session_state["messages"].append([config_info_msg, bot_response_flag, bot_avatar]) # message(config_info_msg, is_user=True, avatar_style=bot_avatar) for i, msg_list in enumerate(st.session_state["messages"]): msg = msg_list[0] flag = msg_list[1] avatar_style = msg_list[2] message(msg, is_user=flag, key=str(i), avatar_style=avatar_style) # if i % 2 == 0: # message(msg, is_user=flag, key = str(i), avatar_style= user_input_avatar) # else: # message(msg, is_user=flag, key=str(i), avatar_style=bot_avatar) def main(): global user_input_avatar, bot_avatar if len(st.session_state) == 0: st.session_state["messages"] = [] st.session_state["config_msg_ls"] = [] st.session_state["user_input"] = None if "conversation" not in st.session_state: st.session_state.conversation = None if "chat_history" not in st.session_state: st.session_state.chat_history = None st.header("CDC Copilot Beta") if st.session_state["user_input"]: handle_userinput(st.session_state["user_input"]) model_ls = [ "google/flan-t5-base", # 'google/flan-t5-large', # "meta-llama/Llama-2-7b-chat-hf", # "meta-llama/Meta-Llama-3-8B", # "meta-llama/Meta-Llama-3-8B-Instruct", # 'mistralai/Mistral-7B-Instruct-v0.2', # 'databricks/dolly-v2-3b', # "GPT4ALL: Mistral", # "GPT4ALL: Falcon", # "GPT4ALL: Gemma", # "GPT4ALL: LLAMA2", "GPT-3.5", "GPT-4", "Claude-3", ] with st.sidebar: st.subheader("Your documents") database_config_option = ["Complete CDC Database", "Upload CDC Document"] database_config = st.selectbox("Select Database Configuration:", database_config_option) if database_config == "Upload CDC Document": pdf_docs = st.file_uploader("Upload your PDFs here and select", accept_multiple_files=True) else: pdf_docs = None process_button = st.button("Process Document :hourglass:", type= "primary", key = "process_buttton") model_name = st.selectbox("Select HuggingFace Model ID:", model_ls) model_name = model_name.strip() if model_name in ["GPT-3.5", "GPT-4"]: openai_api_key = st.text_input( label = 'Please enter your OpenAI API key', value = "", placeholder = "sk-", type = "password" ) os.environ["OPENAI_API_KEY"] = openai_api_key temperature = st.number_input("Select Randomness Temperature:", min_value=0.1, max_value=1.0, value="min", step=0.1, ) if process_button: torch.cuda.empty_cache() # if not os.path.exists(os.path.join(root_db_path,"FAR.pdf")): # st.session_state["messages"].append(["Hi,\nCDC database not found.\nPlease upload the complete CDC document.", bot_response_flag, bot_avatar]) # elif not pdf_docs: # # message("Hi,\nPlease upload the document you would like to analyze using the chat interface." , is_user=True, avatar_style=bot_avatar) # st.session_state["messages"].append(["Hi,\nPlease upload the document you would like to analyze using the chat interface.", bot_response_flag, bot_avatar]) # if os.path.exists(os.path.join(root_db_path, "FAR.pdf")) or pdf_docs: with st.spinner("Processing...."): if database_config == "Complete CDC Database": if os.path.exists(os.path.join(root_db_path,"FAR.pdf")): vectorstore = FAISS.load_local(os.path.join(root_db_path,"FAR.pdf"), embeddings, allow_dangerous_deserialization=True) st.session_state.conversation = get_conversation_chain(vectorstore, model_name, temperature) print("Complete FAR retreiver slected!!") else: st.session_state["messages"].append(["Hi,\nCDC database not found.\nPlease upload the complete CDC document.", bot_response_flag, bot_avatar]) if database_config == "Upload CDC Document": if pdf_docs: doc_names = ''.join([str(i.name) for i in pdf_docs]) if "FAR.pdf" in doc_names and os.path.exists(os.path.join(root_db_path,"FAR.pdf")): vectorstore = FAISS.load_local(os.path.join(root_db_path,"FAR.pdf"), embeddings, allow_dangerous_deserialization=True) st.session_state.conversation = get_conversation_chain(vectorstore, model_name, temperature) print("Complete CDC Database Already Present!!") else: # get pdf text raw_text = get_pdf_text(pdf_docs) docs = [ doc for doc_path in pdf_docs for doc in naive_definition_chunking.parse_and_chunk(doc_path, method="pdfbox") ] vectorstore = get_vectorstore(pdf_docs, docs) st.session_state.conversation = get_conversation_chain(vectorstore, model_name, temperature) print("Retreiver generated from uploaded pdfs!!") else: st.session_state["messages"].append(["Hi,\nPlease upload the document you would like to analyze using the chat interface.", bot_response_flag, bot_avatar]) # create conversation chain # else: # st.session_state["messages"].append(["Hi,\nPlease upload the document you would like to analyze using the chat interface.", bot_response_flag, bot_avatar]) display_message(pdf_docs, model_name, database_config) st.session_state["user_input"] = None user_input = st.text_input(label = "Ask a question", placeholder = "Ask a question:", value = None, key="user_input", label_visibility="collapsed",) torch.cuda.empty_cache() if __name__ == '__main__': main() ``` ```python # naive_definition_chunking.py from functools import singledispatch import io import pathlib import subprocess import re import tempfile from typing import Iterable, Optional, Tuple, Union, Any from langchain_core.documents import Document def extract_pypdf(doc_path: str) -> Iterable[Tuple[str, int]]: pdf_reader = PdfReader(doc_path) return [ (line, pdf_reader.get_page_number(page)) for page in pdf_reader.pages for line in page.extract_text().splitlines() ] def extract_pdfbox(doc_path: str): html_tmp = tempfile.mktemp(suffix=".html") md_tmp = tempfile.mktemp(suffix=".md") subprocess.run( [ "java", "-jar", "pdfbox-app-3.0.2.jar", "export:text", "-html", "--output", html_tmp, "--input", doc_path, ] ) subprocess.run(["pandoc", html_tmp, "-t", "gfm", "-o", md_tmp]) with open(md_tmp) as fr: return [(line, None) for line in fr] def clean_line(line: str) -> str: return line.replace('"*', '"').replace('*"', '"').replace("*", " ") @singledispatch def get_lines(doc) -> Iterable[Tuple[str, Optional[int]]]: raise NotImplemented() @get_lines.register def _(doc: list) -> Iterable[Tuple[str, Optional[int]]]: for line, page in doc: for line_ in line.splitlines(): yield line_, page def find_definitions( lines: Iterable[Tuple[str, Optional[int]]], ) -> Iterable[Tuple[str, Optional[int]]]: """ A naive implementation of a definition glossary document. Notes: - This will yield false negatives in some cases. - Page number can be off, if node is large :param lines: An iterator containing text and starting page tuples :return: and iterator of tuples containing combined chunks and starting page """ p = re.compile(r"[*][\w\s]+[*]") buffer, start_page = None, None for line, page in lines: if p.match(line) and ("means" in line or line.rstrip().endswith("—")): if buffer: yield "\n".join(buffer), start_page buffer = [clean_line(line)] start_page = page elif buffer: buffer.append(clean_line(line)) if buffer: yield "\n".join(buffer), start_page def parse_and_chunk(doc_path: Union[str, Any], method: str = "pdfbox") -> Iterable[Document]: match method: case "pdfbox": extractor = extract_pdfbox case _: raise NotImplemented(f"Unknown extraction method {method}") if not isinstance(doc_path, str): # Assume UploadedFile doc_path_ = tempfile.mktemp() with open(doc_path_, "wb") as fw: fw.write(doc_path.getvalue()) doc_path = doc_path_ chunks = find_definitions(get_lines(extractor(doc_path))) for chunk, start_page in chunks: yield Document( page_content=chunk, metadata={"page": start_page, "source": doc_path} ) ``` https://pdfbox.apache.org/