# Getting Started with AI

:::spoiler Table of contents
[toc]
:::

## AI Terms

LLM
~ ...

Prompt
~ ...

LoRA
~ Low-Rank Adaptation

ControlNet (Stable Diffusion, image generation only)
- used to steer the generation more precisely? (TODO)
- Mask: mask over the generated image > pixels inside the mask get a higher ...
- Canny: ...

Subsurface Scattering (skin)
~ ...

## AI Models

### Flux
- Standard NF4
- Q5
- Q8

### Stable Diffusion

## AI Links
- ComfyUI (graphical interface for parameters)
- [civitai.com](https://civitai.com)

---

## Using AI with Python

The source code below implements a small RAG chatbot: local wiki text files are embedded into a Chroma vector database, and an optionally quantized LLM answers questions based on the retrieved context.

## Source Code

### GPU Loading

```python=
import torch

# Selects the compute device: CUDA GPU, Apple MPS, or CPU fallback
def gpu_load():
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(f"Nutze CUDA GPU: {torch.cuda.get_device_name(0)}")  # print the GPU name
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
        print("Nutze MPS GPU (Apple Silicon)")
    else:
        device = torch.device("cpu")
        print("Nutze CPU (no GPU detected)")
    return device
```

### LLM Loading

```python=
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import torch

def model_load_unquantized(llm_model_name):
    try:
        model = AutoModelForCausalLM.from_pretrained(
            llm_model_name,
            device_map="auto",
            torch_dtype=torch.float16
        )
        print("Modell erfolgreich geladen!")
    except torch.cuda.OutOfMemoryError as e:
        print(f"OutOfMemoryError beim Laden des Modells: {e}")
        exit()
    return model

def model_load_quantized(llm_model_name):
    quantization_config_4bit = BitsAndBytesConfig(  # 4-bit quantization settings
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
    )
    try:
        model = AutoModelForCausalLM.from_pretrained(
            llm_model_name,
            quantization_config=quantization_config_4bit,
            device_map="auto",
            torch_dtype=torch.float16
        )
        print("Modell erfolgreich geladen (potenziell quantisiert)!")
    except torch.cuda.OutOfMemoryError as e:
        print(f"OutOfMemoryError beim Laden des Modells: {e}")
        exit()
    return model

def model_load(llm_model_name, quantized):
    tokenizer = AutoTokenizer.from_pretrained(llm_model_name)  # load the tokenizer
    if not quantized:
        model = model_load_unquantized(llm_model_name)
    else:
        model = model_load_quantized(llm_model_name)
    return model, tokenizer
```

### Text Preprocessing

```python=
def text_formatting(filepaths):
    documents = []
    metadatas = []
    ids = []
    doc_id = 0
    for filepath in filepaths:  # iterate through the list of filepaths
        current_doc_text = ""
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line == "---":  # "---" lines separate individual documents
                    if current_doc_text:
                        documents.append(current_doc_text.strip())
                        metadata = {"source": filepath,
                                    "title": current_doc_text.split('\n')[0][:50] + "..."
                                    if current_doc_text.split('\n')[0] else "Document"}
                        metadatas.append(metadata)
                        ids.append(f"doc_{doc_id}")
                        doc_id += 1
                        current_doc_text = ""
                else:
                    current_doc_text += line + "\n"
    # Note: a trailing block is only stored if the file ends with a "---" line
    return documents, metadatas, ids
```
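As a reference for how the parser above is meant to be called, here is a minimal usage sketch. The file name `wiki_Data/example.txt` is purely illustrative; the only assumption is that documents inside the file are separated by lines containing only `---`, as the code above expects.

```python=
# Minimal usage sketch for text_formatting (hypothetical file name).
# Documents inside the file are expected to be separated by "---" lines.
from text_preprocessing import text_formatting

documents, metadatas, ids = text_formatting(["wiki_Data/example.txt"])
for doc_id, meta in zip(ids, metadatas):
    print(doc_id, "->", meta["source"], "-", meta["title"])
```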
### Database Generation

```python=
from chromadb.errors import InvalidCollectionException
from text_preprocessing import text_formatting
import chromadb

VECTORDB_COLLECTION_NAME = "university_wiki_test"
VECTORDB_PERSIST_PATH = "vectorDB_Data"

def _get_chroma_client(use_persistent_DB):
    if use_persistent_DB:
        return chromadb.PersistentClient(path=VECTORDB_PERSIST_PATH)
    else:
        return chromadb.Client()

def _get_collection(chroma_client, create_if_missing=True):
    try:
        collection = chroma_client.get_collection(name=VECTORDB_COLLECTION_NAME)
        print(f"Nutze existierende Kollektion: '{VECTORDB_COLLECTION_NAME}'")
        return collection
    except InvalidCollectionException:
        if create_if_missing:
            print(f"Kollektion '{VECTORDB_COLLECTION_NAME}' nicht gefunden. Erstelle neue Kollektion.")
            return chroma_client.create_collection(VECTORDB_COLLECTION_NAME)
        else:
            raise

def reindex_single_file(embedding_model, use_persistent_DB, filepath):
    documents, metadatas, ids = text_formatting([filepath])
    if not documents:
        print(f"Datei: {filepath} ist leer.")
        return
    chroma_client = _get_chroma_client(use_persistent_DB)
    collection = _get_collection(chroma_client)
    document_embeddings = embedding_model.encode(documents)
    collection.update(ids=ids,
                      embeddings=document_embeddings.tolist(),
                      metadatas=metadatas,
                      documents=documents)  # update instead of add
    print(f"Datei: '{filepath}' updated in Kollektion: '{VECTORDB_COLLECTION_NAME}'")

def reindex_entire_database(embedding_model, use_persistent_DB, filepaths):
    documents, metadatas, ids = text_formatting(filepaths)
    chroma_client = _get_chroma_client(use_persistent_DB)
    try:
        chroma_client.delete_collection(VECTORDB_COLLECTION_NAME)
        print(f"Lösche existierende Kollektion: '{VECTORDB_COLLECTION_NAME}'")
    except ValueError:
        print(f"Keine existierende Kollektion '{VECTORDB_COLLECTION_NAME}' zum Löschen vorhanden.")
    collection = chroma_client.create_collection(VECTORDB_COLLECTION_NAME)
    document_embeddings = embedding_model.encode(documents)
    collection.add(embeddings=document_embeddings.tolist(),
                   metadatas=metadatas,
                   ids=ids,
                   documents=documents)
    print(f"Datenbank neu indexiert: '{VECTORDB_COLLECTION_NAME}'")
    return collection

def load_existing_collection(embedding_model, use_persistent_DB, filepaths):
    chroma_client = _get_chroma_client(use_persistent_DB)
    try:
        collection = _get_collection(chroma_client, create_if_missing=False)
        print(f"Kollektion geladen: '{VECTORDB_COLLECTION_NAME}'.")
        return collection
    except InvalidCollectionException:
        print(f"Neue Kollektion erstellen: '{VECTORDB_COLLECTION_NAME}'")
        return reindex_entire_database(embedding_model, use_persistent_DB, filepaths)

def text_embedding(embedding_model, use_persistent_DB, filepaths):
    return load_existing_collection(embedding_model, use_persistent_DB, filepaths)
```

### Answer Generation

```python=
import torch

def rag_chatbot_answer(user_question, collection, model, tokenizer, device, embedding_model):
    query_embedding = embedding_model.encode(user_question).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=2)  # retrieve the 2 most relevant documents
    contexts = results['documents'][0]

    if not contexts:  # no relevant context found
        return "Es tut mir leid, aber ich konnte keine relevante Antwort auf deine Frage finden."

    # Combine the contexts (subject to change, e.g. use only the top context, concatenate, etc.)
    context_text = "\n\n".join(contexts)

    # System prompt
    prompt = f"""
Answer the question in German, based on the context below.
Try to provide general information or an overview if available.
Keep the answer concise and informative.
If the context doesn't contain the answer, say you don't know.

Context: {context_text}

Question: {user_question}

Answer:
"""

    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)  # encode the prompt
    attention_mask = torch.ones_like(input_ids).to(device)  # build the attention mask

    try:
        with torch.amp.autocast('cuda'):  # mixed precision; assumes a CUDA device, adjust device_type for MPS/CPU
            llm_output = model.generate(
                input_ids=input_ids,            # pass input_ids as a keyword argument
                attention_mask=attention_mask,  # pass the attention mask
                max_new_tokens=150,
                pad_token_id=tokenizer.eos_token_id,
                num_return_sequences=1
            )
        generated_answer = tokenizer.decode(llm_output[0], skip_special_tokens=True)
        generated_answer = generated_answer.split("Answer:")[1].strip()  # keep only the answer part
    except torch.cuda.OutOfMemoryError as e:
        print(f"OutOfMemoryError bei der Antwortgenerierung!: {e}")
        return "Please try again later. / Bitte versuchen Sie es später erneut."
    except Exception as e:
        print(f"Fehler bei der Antwortgenerierung! {e}")
        return "Please try again later. / Bitte versuchen Sie es später erneut."

    return generated_answer
```
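Before the GUI below, a minimal command-line sketch of how the pieces fit together. It reuses the module names from the GUI imports (`gpu_Load`, `llm_Load`, `vectorDB_Gen`, `rag_answer_gen`); the folder `wiki_Data` and the example question are assumptions for illustration only.

```python=
# Sketch: wire up the RAG pipeline without the GUI.
# Assumes the text files live in "wiki_Data"; the question is just an example.
import os
from sentence_transformers import SentenceTransformer
from gpu_Load import gpu_load
from llm_Load import model_load
from vectorDB_Gen import text_embedding
from rag_answer_gen import rag_chatbot_answer

filepaths = [os.path.join("wiki_Data", f) for f in os.listdir("wiki_Data")]
embedding_model = SentenceTransformer("all-mpnet-base-v2")

device = gpu_load()                                                        # pick CUDA / MPS / CPU
model, tokenizer = model_load("mistralai/Mistral-7B-Instruct-v0.3", True)  # quantized load
collection = text_embedding(embedding_model, False, filepaths)             # in-memory Chroma collection

question = "Wie melde ich mich zur Prüfung an?"  # example question
print(rag_chatbot_answer(question, collection, model, tokenizer, device, embedding_model))
```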
### GUI and GUI Functions

```python=
from vectorDB_Gen import text_embedding, reindex_entire_database, reindex_single_file
from rag_answer_gen import rag_chatbot_answer
from llm_Load import model_load
from gpu_Load import gpu_load
from sentence_transformers import SentenceTransformer
from tkinter import ttk
import tkinter as tk
import os

# --- Functions ---
def load_filepaths():
    print("Dateipfade laden")
    global filepath_list
    filepath_list = os.listdir(filepath)
    filepath_list = [str(filepath + f"/{file}") for file in filepath_list]
    return filepath_list

def generate_ui():
    user_input = input_window.get("1.0", tk.END).strip()
    print(f"Generation gestartet mit Input: '{user_input}'")
    generated_answer = rag_chatbot_answer(user_input, collection, model, tokenizer, device, embedding_model)
    output_window.config(state=tk.NORMAL)
    output_window.delete("1.0", tk.END)
    output_window.insert(tk.END, generated_answer)
    output_window.config(state=tk.DISABLED)

def open_filepath_popup():
    popup = tk.Toplevel(root)
    popup.title("Dateien-Verwaltung")

    # --- Filepath list display section ---
    filepaths_frame = ttk.Frame(popup)
    filepaths_frame.pack(pady=5, padx=10, fill=tk.X)
    filepath_label = ttk.Label(filepaths_frame, text="Dateipfade:")
    filepath_label.pack(pady=5, padx=10, anchor=tk.W)
    filepath_labels_frame = ttk.Frame(filepaths_frame)
    filepath_labels_frame.pack(fill=tk.X)

    def reload_file(filepath):
        print(f"Datei neu indexieren: {filepath}")
        reindex_single_file(embedding_model, database_var.get(), filepath)

    def update_filepath_display():
        for widget in filepath_labels_frame.winfo_children():
            widget.destroy()
        for filepath in filepath_list:
            file_frame = ttk.Frame(filepath_labels_frame)
            file_frame.pack(fill=tk.X, pady=2)
            file_label = ttk.Label(file_frame, text=filepath, width=40, anchor=tk.W)
            file_label.pack(side=tk.LEFT, padx=(0, 5))
            reload_button = ttk.Button(file_frame, text="Neu laden", width=10,
                                       command=lambda fp=filepath: reload_file(fp))
            reload_button.pack(side=tk.LEFT, padx=2)

    update_filepath_display()

    # --- Reload database button ---
    def reload_database_popup():
        global collection
        print("Datenbank komplett neu laden (Popup)")
        collection = reindex_entire_database(embedding_model, database_var.get(), filepath_list)

    reload_db_button = ttk.Button(popup, text="Datenbank komplett neu laden", command=reload_database_popup)
    reload_db_button.pack(pady=10, padx=10)

    # --- Reload filepaths button ---
    def reload_filepaths_popup():
        global filepath_list
        print("Dateipfade neu laden (Popup)")
        filepath_list = load_filepaths()
        popup.destroy()
        open_filepath_popup()

    reload_filepaths_button = ttk.Button(popup, text="Dateipfade neu laden", command=reload_filepaths_popup)
    reload_filepaths_button.pack(pady=5, padx=10)

def reload_models_ui():
    print("Modelle neu laden")

# --- Main window ---
root = tk.Tk()
root.title("Chatbot-MainWindow")

# --- Settings frame ---
settings_frame = ttk.Frame(root, padding="10")
settings_frame.grid(row=0, column=0, sticky=(tk.W, tk.N))

database_var = tk.BooleanVar(value=False)
database_switch = ttk.Checkbutton(settings_frame, text="Persistente DB verwenden", variable=database_var)
database_switch.pack(pady=5, padx=10, anchor=tk.W)

quantized_var = tk.BooleanVar(value=True)
quantized_switch = ttk.Checkbutton(settings_frame, text="Quantisiert", variable=quantized_var)
quantized_switch.pack(pady=5, padx=10, anchor=tk.W)

# --- Buttons frame ---
buttons_frame = ttk.Frame(root, padding="10")
buttons_frame.grid(row=1, column=0, sticky=(tk.W, tk.N))

filepath_button = ttk.Button(buttons_frame, text="Dateipfade verwalten", command=open_filepath_popup, width=20)
filepath_button.pack(pady=150, padx=10, anchor=tk.W)

generate_button = ttk.Button(buttons_frame, text="Generieren", command=generate_ui, width=20)
generate_button.pack(pady=5, padx=10, anchor=tk.W)

# --- Chat window frame ---
chat_frame = ttk.Frame(root, padding="10")
chat_frame.grid(row=0, column=1, rowspan=3, sticky=(tk.N, tk.E, tk.S, tk.W))

input_label = ttk.Label(chat_frame, text="Eingabe:")
input_label.pack(pady=5, padx=10, anchor=tk.W)
input_window = tk.Text(chat_frame, height=10, width=100)
input_window.pack(pady=5, padx=10)

output_label = ttk.Label(chat_frame, text="Ausgabe:")
output_label.pack(pady=5, padx=10, anchor=tk.W)
output_window = tk.Text(chat_frame, height=20, width=100, state=tk.DISABLED)
output_window.pack(pady=5, padx=10)

# --- Model configuration ---
# -------------------------------------------------------------------
filepath = "wiki_Data"  # folder containing the text files
llm_model_name = "mistralai/Mistral-7B-Instruct-v0.3"
embedding_model = "all-mpnet-base-v2"
# -------------------------------------------------------------------

# --- Define variables ---
filepath_list = load_filepaths()
embedding_model = SentenceTransformer(embedding_model)
try:
    embedding_model.to("cpu")
except Exception as e:
    print(f"Ein Fehler ist aufgetreten {e}")

# --- Initialization ---
collection = text_embedding(embedding_model, database_var.get(), filepath_list)
device = gpu_load()
model, tokenizer = model_load(llm_model_name, quantized_var.get())

root.mainloop()
```
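To check what actually ended up in the vector database, the persisted collection can be opened directly with the chromadb client. This sketch assumes the persistent DB option was enabled, so the data lives under `vectorDB_Data` with the collection name `university_wiki_test` defined above.

```python=
# Sketch: inspect the persisted Chroma collection (only if "Persistente DB verwenden" was enabled).
import chromadb

client = chromadb.PersistentClient(path="vectorDB_Data")     # VECTORDB_PERSIST_PATH
collection = client.get_collection("university_wiki_test")   # VECTORDB_COLLECTION_NAME
print("Documents in collection:", collection.count())
for meta in collection.peek(limit=3)["metadatas"]:           # show a few stored metadata entries
    print(meta)
```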