Madten
===
```python=
# cdc_rag_app_FAR.py
import os
import streamlit as st
# from dotenv import load_dotenv
from PyPDF2 import PdfReader
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import FAISS
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_community.llms import HuggingFacePipeline
import transformers
from torch import cuda, bfloat16
from langchain_community.embeddings import HuggingFaceEmbeddings
from transformers import StoppingCriteria, StoppingCriteriaList
import torch
from streamlit_chat import message
from langchain_core.prompts import PromptTemplate
from sentence_transformers import SentenceTransformer
from langchain_experimental.text_splitter import SemanticChunker
# from langchain.chains import RetrievalQA
from langchain_community.llms import GPT4All
import shutil
from datetime import datetime
import time
from langchain_community.chat_models import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from PIL import Image
import naive_definition_chunking
import warnings
warnings.filterwarnings("ignore")
# os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
torch.cuda.empty_cache()
st.set_page_config(page_title="CDC Copilot Beta", page_icon=":hospital:", layout="wide")
config_control_ls = []
##### change the strings below to change the avatar pictures of the user and the bot
user_input_avatar = "shapes"
bot_avatar = "fun-emoji"
# bot_avatar = Image.open("bot-icon.png")
# user_input_avatar = "fun-emoji"
# bot_avatar = "shapes"
user_input_flag = False
bot_response_flag= True
# NOTE: credentials redacted; avoid hard-coding secrets and load them from the environment instead.
hf_auth = 'hf_...'
# HF_HOME is the cache directory, not a token variable, so set the token variable instead.
os.environ["HUGGINGFACEHUB_API_TOKEN"] = hf_auth
claud3_auth = "sk-ant-..."
os.environ["ANTHROPIC_API_KEY"] = claud3_auth
device = f'cuda:{cuda.current_device()}' if cuda.is_available() else 'cpu'
print("Available device:",device)
#### max sequence_length 256
embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
#### max sequence_length 512
# embedding_model_name = "thenlper/gte-small"
chunk_size = SentenceTransformer(embedding_model_name).max_seq_length
chunk_overlap = int(chunk_size/10)
## code to find the max sequence length
print(f"Model's maximum sequence length: {chunk_size}")
# embedding_model_name = "thenlper/gte-large"
embeddings = HuggingFaceEmbeddings(
model_name=embedding_model_name,
model_kwargs={'device': device}
)
def create_folder(folder_path):
if not os.path.exists(folder_path):
# Create the folder
os.makedirs(folder_path)
root_db_path = "Faiss_DB"
create_folder(root_db_path)
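# Load a Hugging Face causal LM in 4-bit (bitsandbytes NF4), add custom stopping criteria,
# and wrap the resulting text-generation pipeline in a LangChain HuggingFacePipeline.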
def get_model(model_id, temperature):
try:
# set quantization configuration to load large model with less GPU memory
# this requires the `bitsandbytes` library
bnb_config = transformers.BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_quant_type='nf4',
bnb_4bit_use_double_quant=True,
bnb_4bit_compute_dtype=bfloat16
)
# begin initializing HF items, you need an access token
model_config = transformers.AutoConfig.from_pretrained(
model_id,
use_auth_token = hf_auth,
device_map = device
)
model = transformers.AutoModelForCausalLM.from_pretrained(
model_id,
trust_remote_code=True,
config=model_config,
quantization_config=bnb_config,
device_map=device,
use_auth_token = hf_auth
)
# enable evaluation mode to allow model inference
model.eval()
torch.cuda.empty_cache()
tokenizer = transformers.AutoTokenizer.from_pretrained(
model_id,
use_auth_token = hf_auth
)
# define custom stopping criteria object
stop_list = ['\nHuman:', '\n```\n']
stop_token_ids = [tokenizer(x)['input_ids'] for x in stop_list]
stop_token_ids = [torch.LongTensor(x).to(device) for x in stop_token_ids]
class StopOnTokens(StoppingCriteria):
def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
for stop_ids in stop_token_ids:
if torch.eq(input_ids[0][-len(stop_ids):], stop_ids).all():
return True
return False
stopping_criteria = StoppingCriteriaList([StopOnTokens()])
terminators = [
tokenizer.eos_token_id,
tokenizer.convert_tokens_to_ids("<|eot_id|>")
]
generate_text = transformers.pipeline(
model=model,
tokenizer=tokenizer,
return_full_text=False, # return only the newly generated text, not the prompt
task='text-generation',
# we pass model parameters here too
stopping_criteria=stopping_criteria, # without this model rambles during chat
temperature=temperature, # 'randomness' of outputs, 0.0 is the min and 1.0 the max
max_new_tokens=512, # max number of tokens to generate in the output
repetition_penalty=1.1, # without this output begins repeating
top_k = 20,
top_p =0.92,
batch_size=1,
pad_token_id=tokenizer.eos_token_id,
eos_token_id=terminators,
torch_dtype=torch.float16,
max_length=1024,
# do_sample=True,
)
llm = HuggingFacePipeline(pipeline=generate_text)
return(llm)
except Exception as e:
# message(str(e), is_user=True, avatar_style=bot_avatar)
e = "LLM Loading Error: " + str(e)
st.session_state["messages"].append([str(e), bot_response_flag, bot_avatar])
def get_pdf_text(pdf_docs):
text = ""
for pdf in pdf_docs:
pdf_reader = PdfReader(pdf)
for page in pdf_reader.pages:
text += page.extract_text() or ""  # extract_text() can return None for empty pages
return text
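# Build or load a FAISS index for the uploaded documents: FAR indexes live at the DB root,
# other indexes in a per-day folder, and stale non-FAR day folders are deleted.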
def get_vectorstore(pdf_docs, docs):
today_date = datetime.today().strftime('%Y-%m-%d')
today_folder = os.path.join(root_db_path, today_date)
create_folder(today_folder)
doc_names = '_'.join([str(i.name) for i in pdf_docs])
if "FAR" in doc_names:
vectorstore_name = os.path.join(root_db_path, doc_names)
else:
vectorstore_name = os.path.join(today_folder, doc_names)
for folder_name in os.listdir(root_db_path):
if folder_name != today_date and "FAR" not in folder_name:
shutil.rmtree(os.path.join(root_db_path,folder_name))
if os.path.exists(vectorstore_name):
vectorstore = FAISS.load_local(vectorstore_name, embeddings, allow_dangerous_deserialization=True)
print(f"Loading {doc_names} database from local")
else:
vectorstore = FAISS.from_documents(documents=docs, embedding=embeddings)
vectorstore.save_local(vectorstore_name)
print(f"Generating local database: {doc_names}")
return vectorstore
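# Pick an LLM backend (Flan-T5 pipeline, local GPT4All, OpenAI, Anthropic, or a quantized
# HF model via get_model) and wire it to an MMR retriever in a ConversationalRetrievalChain
# with conversation buffer memory and a custom QA prompt.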
def get_conversation_chain(vectorstore, model_id, temperature):
template = """
Use the following pieces of information to provide a brief, to-the-point answer to the user's question.
If you don't know the answer, just say that you don't know, don't try to make up an answer.
Context: {context}
Question: {question}
Only return the helpful answer below and nothing else.
Helpful answer:
"""
prompt = PromptTemplate(template=template,
input_variables=['context', 'question'])
model_path_dict = {"GPT4ALL: Mistral": "C:/Users/amrpatra/Downloads/gpt4all/mistral-7b-openorca.gguf2.Q4_0.gguf",
"GPT4ALL: Falcon": "C:/Users/amrpatra/Downloads/gpt4all/gpt4all-falcon-newbpe-q4_0.gguf",
"GPT4ALL: Gemma": "C:/Users/amrpatra/Downloads/gpt4all/gemma-7b-it-q4_0.gguf",
"GPT4ALL: LLAMA2": "C:/Users/amrpatra/Downloads/gpt4all/llama-2-7b-chat.Q4_0.gguf"}
# if model_id == "google/flan-t5-xxl":
if model_id in ["google/flan-t5-base", 'google/flan-t5-large']:
generate_text = transformers.pipeline(model=model_id, max_length = 500)
llm = HuggingFacePipeline(pipeline=generate_text)
elif model_id.lower().startswith("gpt4all"):
model_path = model_path_dict[model_id]
llm = GPT4All( model=model_path, verbose=True, max_tokens=1000)
elif model_id == 'GPT-3.5':
llm = ChatOpenAI(
model_name = "gpt-3.5-turbo",
streaming = True,
verbose = True,
temperature = 0
)
elif model_id == 'GPT-4':
llm = ChatOpenAI(
model_name = "gpt-4",
streaming = True,
verbose = True,
temperature = 0
)
elif model_id == "Claude-3":
llm = ChatAnthropic(temperature=0, model_name="claude-3-opus-20240229")
else:
llm = get_model(model_id, temperature)
if not llm:
# message("Failed to Load the Model.", is_user=True, avatar_style=bot_avatar)
st.session_state["messages"].append(["Failed to Load the Model.", bot_response_flag, bot_avatar])
# llm = HuggingFaceHub(repo_id=model_id, model_kwargs = {"temperature" : temperature})
else:
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={'k': 5})
conversation_chain = ConversationalRetrievalChain.from_llm(
llm = llm,
retriever = retriever,
memory=memory,
combine_docs_chain_kwargs={"prompt": prompt},
max_tokens_limit=4000,
verbose = True
)
return conversation_chain
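# Send the user's question through the conversation chain and append both the question and
# the generated answer to the chat history rendered by display_message.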
def handle_userinput(user_question):
global user_input_avatar, bot_avatar
try:
start_time = time.time()
with torch.no_grad():
user_question = user_question.strip()
response = st.session_state.conversation({'question': user_question})
# print(response)
st.session_state.chat_history = response['chat_history']
torch.cuda.empty_cache()
end_time = time.time()
print(f"Response generated in {(end_time - start_time)/60:.2f} minutes")
st.session_state["messages"].append([user_question, user_input_flag, user_input_avatar])
st.session_state["messages"].append([response["answer"], bot_response_flag, bot_avatar])
except Exception as e:
error_msg = str(e)
# error_msg = f"Error: Document Not Uploaded/Invalid Document Format\n\nOpps! It appears that you haven't uploaded any document or the uploaded document is not in a valid format."
# message(error_msg, is_user=True, avatar_style=bot_avatar)
st.session_state["messages"].append([error_msg, bot_response_flag, bot_avatar])
def display_message(pdf_docs, model_name, database_config):
global user_input_avatar, bot_avatar, config_control_ls
# if st.session_state["messages"]:
# with st.container(border=True, height= 300):
try:
if database_config == "Complete CDC Database":
if os.path.exists(os.path.join(root_db_path, "FAR.pdf")):
config_info_msg = f"Hi,\nCDC Database Available.\nModel selected: {model_name}"
else:
config_info_msg = f"Hi,\nCDC database not found.\nPlease upload the complete CDC document.\nModel selected: {model_name}"
if database_config == "Upload CDC Document":
if pdf_docs:
doc_names = '\n'.join([str(i.name) for i in pdf_docs])
config_info_msg = f"Hi,\nFile successfully uploaded:\n{doc_names}\n\nModel selected: {model_name}"
else:
config_info_msg = None
if config_info_msg:
if len(st.session_state["config_msg_ls"]) == 1:
st.session_state["messages"].append([config_info_msg, bot_response_flag, bot_avatar])
else:
if st.session_state["config_msg_ls"]:
# print(st.session_state["config_msg_ls"][-1])
# print(config_info_msg)
if str(st.session_state["config_msg_ls"][-1]) != str(config_info_msg):
st.session_state["messages"].append([config_info_msg, bot_response_flag, bot_avatar])
st.session_state["config_msg_ls"].append(config_info_msg)
except Exception as e:
config_info_msg = "Congig Msg Error: " + str(e)
st.session_state["messages"].append([config_info_msg, bot_response_flag, bot_avatar])
# message(config_info_msg, is_user=True, avatar_style=bot_avatar)
for i, msg_list in enumerate(st.session_state["messages"]):
msg = msg_list[0]
flag = msg_list[1]
avatar_style = msg_list[2]
message(msg, is_user=flag, key=str(i), avatar_style=avatar_style)
# if i % 2 == 0:
# message(msg, is_user=flag, key = str(i), avatar_style= user_input_avatar)
# else:
# message(msg, is_user=flag, key=str(i), avatar_style=bot_avatar)
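# Streamlit entry point: sidebar for database/model/temperature selection and document
# processing, plus the chat input box and message display.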
def main():
global user_input_avatar, bot_avatar
if len(st.session_state) == 0:
st.session_state["messages"] = []
st.session_state["config_msg_ls"] = []
st.session_state["user_input"] = None
if "conversation" not in st.session_state:
st.session_state.conversation = None
if "chat_history" not in st.session_state:
st.session_state.chat_history = None
st.header("CDC Copilot Beta")
if st.session_state["user_input"]:
handle_userinput(st.session_state["user_input"])
model_ls = [
"google/flan-t5-base",
# 'google/flan-t5-large',
# "meta-llama/Llama-2-7b-chat-hf",
# "meta-llama/Meta-Llama-3-8B",
# "meta-llama/Meta-Llama-3-8B-Instruct",
# 'mistralai/Mistral-7B-Instruct-v0.2',
# 'databricks/dolly-v2-3b',
# "GPT4ALL: Mistral",
# "GPT4ALL: Falcon",
# "GPT4ALL: Gemma",
# "GPT4ALL: LLAMA2",
"GPT-3.5",
"GPT-4",
"Claude-3",
]
with st.sidebar:
st.subheader("Your documents")
database_config_option = ["Complete CDC Database", "Upload CDC Document"]
database_config = st.selectbox("Select Database Configuration:", database_config_option)
if database_config == "Upload CDC Document":
pdf_docs = st.file_uploader("Upload your PDFs here and select", accept_multiple_files=True)
else:
pdf_docs = None
process_button = st.button("Process Document :hourglass:", type="primary", key="process_button")
model_name = st.selectbox("Select HuggingFace Model ID:", model_ls)
model_name = model_name.strip()
if model_name in ["GPT-3.5", "GPT-4"]:
openai_api_key = st.text_input(
label = 'Please enter your OpenAI API key',
value = "",
placeholder = "sk-",
type = "password"
)
os.environ["OPENAI_API_KEY"] = openai_api_key
temperature = st.number_input("Select Randomness Temperature:", min_value=0.1, max_value=1.0, value="min", step=0.1, )
if process_button:
torch.cuda.empty_cache()
# if not os.path.exists(os.path.join(root_db_path,"FAR.pdf")):
# st.session_state["messages"].append(["Hi,\nCDC database not found.\nPlease upload the complete CDC document.", bot_response_flag, bot_avatar])
# elif not pdf_docs:
# # message("Hi,\nPlease upload the document you would like to analyze using the chat interface." , is_user=True, avatar_style=bot_avatar)
# st.session_state["messages"].append(["Hi,\nPlease upload the document you would like to analyze using the chat interface.", bot_response_flag, bot_avatar])
# if os.path.exists(os.path.join(root_db_path, "FAR.pdf")) or pdf_docs:
with st.spinner("Processing...."):
if database_config == "Complete CDC Database":
if os.path.exists(os.path.join(root_db_path,"FAR.pdf")):
vectorstore = FAISS.load_local(os.path.join(root_db_path,"FAR.pdf"), embeddings, allow_dangerous_deserialization=True)
st.session_state.conversation = get_conversation_chain(vectorstore, model_name, temperature)
print("Complete FAR retreiver slected!!")
else:
st.session_state["messages"].append(["Hi,\nCDC database not found.\nPlease upload the complete CDC document.", bot_response_flag, bot_avatar])
if database_config == "Upload CDC Document":
if pdf_docs:
doc_names = ''.join([str(i.name) for i in pdf_docs])
if "FAR.pdf" in doc_names and os.path.exists(os.path.join(root_db_path,"FAR.pdf")):
vectorstore = FAISS.load_local(os.path.join(root_db_path,"FAR.pdf"), embeddings, allow_dangerous_deserialization=True)
st.session_state.conversation = get_conversation_chain(vectorstore, model_name, temperature)
print("Complete CDC Database Already Present!!")
else:
# get pdf text
raw_text = get_pdf_text(pdf_docs)  # currently unused; chunking is handled by naive_definition_chunking below
docs = [
doc
for doc_path in pdf_docs
for doc in naive_definition_chunking.parse_and_chunk(doc_path, method="pdfbox")
]
vectorstore = get_vectorstore(pdf_docs, docs)
st.session_state.conversation = get_conversation_chain(vectorstore, model_name, temperature)
print("Retreiver generated from uploaded pdfs!!")
else:
st.session_state["messages"].append(["Hi,\nPlease upload the document you would like to analyze using the chat interface.", bot_response_flag, bot_avatar])
# create conversation chain
# else:
# st.session_state["messages"].append(["Hi,\nPlease upload the document you would like to analyze using the chat interface.", bot_response_flag, bot_avatar])
display_message(pdf_docs, model_name, database_config)
st.session_state["user_input"] = None
user_input = st.text_input(label = "Ask a question", placeholder = "Ask a question:", value = None, key="user_input",
label_visibility="collapsed",)
torch.cuda.empty_cache()
if __name__ == '__main__':
main()
```
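The `naive_definition_chunking` helper imported by the app above is listed below. It splits a glossary-style PDF into one `Document` per definition, extracting text via PDFBox and pandoc.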
```python
# naive_definition_chunking.py
from functools import singledispatch
import io
import pathlib
import subprocess
import re
import tempfile
from typing import Iterable, Optional, Tuple, Union, Any
from PyPDF2 import PdfReader
from langchain_core.documents import Document
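# Extract (line, page number) tuples from a PDF with PyPDF2.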
def extract_pypdf(doc_path: str) -> Iterable[Tuple[str, int]]:
pdf_reader = PdfReader(doc_path)
return [
(line, pdf_reader.get_page_number(page))
for page in pdf_reader.pages
for line in page.extract_text().splitlines()
]
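# Extract lines by shelling out to PDFBox (PDF -> HTML) and pandoc (HTML -> GitHub-flavored
# markdown); page numbers are not recoverable from this path, so they are returned as None.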
def extract_pdfbox(doc_path: str):
html_tmp = tempfile.mktemp(suffix=".html")
md_tmp = tempfile.mktemp(suffix=".md")
subprocess.run(
[
"java",
"-jar",
"pdfbox-app-3.0.2.jar",
"export:text",
"-html",
"--output",
html_tmp,
"--input",
doc_path,
]
)
subprocess.run(["pandoc", html_tmp, "-t", "gfm", "-o", md_tmp])
with open(md_tmp) as fr:
return [(line, None) for line in fr]
def clean_line(line: str) -> str:
return line.replace('"*', '"').replace('*"', '"').replace("*", " ")
@singledispatch
def get_lines(doc) -> Iterable[Tuple[str, Optional[int]]]:
raise NotImplementedError()
@get_lines.register
def _(doc: list) -> Iterable[Tuple[str, Optional[int]]]:
for line, page in doc:
for line_ in line.splitlines():
yield line_, page
def find_definitions(
lines: Iterable[Tuple[str, Optional[int]]],
) -> Iterable[Tuple[str, Optional[int]]]:
"""
A naive chunker for a definition-glossary document.
Notes:
- This will yield false negatives in some cases.
- The reported page number can be off if a chunk is large.
:param lines: an iterator of (text, starting page) tuples
:return: an iterator of (combined chunk, starting page) tuples
"""
p = re.compile(r"[*][\w\s]+[*]")
buffer, start_page = None, None
for line, page in lines:
if p.match(line) and ("means" in line or line.rstrip().endswith("—")):
if buffer:
yield "\n".join(buffer), start_page
buffer = [clean_line(line)]
start_page = page
elif buffer:
buffer.append(clean_line(line))
if buffer:
yield "\n".join(buffer), start_page
def parse_and_chunk(doc_path: Union[str, Any], method: str = "pdfbox") -> Iterable[Document]:
match method:
case "pdfbox":
extractor = extract_pdfbox
case _:
raise NotImplementedError(f"Unknown extraction method {method}")
if not isinstance(doc_path, str):
# Assume UploadedFile
doc_path_ = tempfile.mktemp()
with open(doc_path_, "wb") as fw:
fw.write(doc_path.getvalue())
doc_path = doc_path_
chunks = find_definitions(get_lines(extractor(doc_path)))
for chunk, start_page in chunks:
yield Document(
page_content=chunk, metadata={"page": start_page, "source": doc_path}
)
```
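Below is a minimal sketch of using `parse_and_chunk` outside the Streamlit app, mirroring what `get_vectorstore` does internally. It assumes a local `FAR.pdf`, the same `all-MiniLM-L6-v2` embedding model used above, and that `pdfbox-app-3.0.2.jar` plus `pandoc` (both shelled out to by `extract_pdfbox`) are available in the working directory; the query string is only illustrative.
```python
# Hypothetical standalone usage of naive_definition_chunking.parse_and_chunk.
# Assumes: a local "FAR.pdf", pdfbox-app-3.0.2.jar in the working directory, pandoc installed.
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

import naive_definition_chunking

# One Document per glossary definition, with starting page and source path in metadata.
docs = list(naive_definition_chunking.parse_and_chunk("FAR.pdf", method="pdfbox"))

# Embed and index the chunks, as get_vectorstore() does in the app.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vectorstore = FAISS.from_documents(documents=docs, embedding=embeddings)

# Simple similarity search over the indexed definitions (query is illustrative).
for doc in vectorstore.similarity_search("What does 'contracting officer' mean?", k=3):
    print(doc.metadata.get("page"), doc.page_content[:120])
```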
https://pdfbox.apache.org/