# Getting Started with AI
:::spoiler Table of Contents
[toc]
:::
## AI Terms
LLM
~ Large Language Model; a neural network trained on large amounts of text to understand and generate language
Prompt
~ The text input (instruction, question, context) given to the model to steer its output
LoRA
~ Low-Rank Adaptation; fine-tunes a model by training small low-rank adapter matrices instead of all weights (see the sketch after this list)
ControlNet (Stable Diffusion, image generation only)
- conditions the generation on an extra input to steer it more precisely (TODO: details)
- Mask: mask over the generated image > pixels inside the mask have a higher ...
- Canny: edge detection ...
Subsurface Scattering (skin)
...
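As a concrete illustration of LoRA, here is a minimal sketch using the Hugging Face `peft` library; the base model (`gpt2`), rank `r`, and `target_modules` are example assumptions, not values from these notes.
```python=
# Minimal LoRA sketch with PEFT (model name and hyperparameters are illustrative)
from transformers import AutoModelForCausalLM
from peft import LoraConfig, get_peft_model, TaskType

base_model = AutoModelForCausalLM.from_pretrained("gpt2")  # small example model

lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=8,                         # rank of the adapter matrices
    lora_alpha=16,               # scaling factor
    lora_dropout=0.05,
    target_modules=["c_attn"],   # attention projection in GPT-2
)

model = get_peft_model(base_model, lora_config)
model.print_trainable_parameters()  # only the small adapter weights are trainable
```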
## AI Models
### Flux
Common quantized variants of the weights (a loading sketch follows below):
- NF4 (default)
- Q5
- Q8
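A minimal sketch of running Flux with the `diffusers` library, assuming the `FLUX.1-schnell` checkpoint in bfloat16; the quantized NF4/Q5/Q8 variants listed above are typically used via ComfyUI instead. Model id, prompt, and parameters are assumptions for illustration.
```python=
# Sketch: load and run Flux with diffusers (illustrative settings)
import torch
from diffusers import FluxPipeline

pipe = FluxPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell", torch_dtype=torch.bfloat16)
pipe.enable_model_cpu_offload()  # offload weights to save VRAM

image = pipe(
    "a photo of a red fox in the snow",
    num_inference_steps=4,   # schnell is distilled for few steps
    guidance_scale=0.0,
).images[0]
image.save("flux_test.png")
```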
### Stable Diffusion
## AI Links
- ComfyUI (graphical, node-based parameter control)
- [civitai.com](https://civitai.com)
---
## Using AI with Python
## Source Code
### GPU Loading
```python=
import torch

# Determines "device": CUDA GPU, Apple MPS, or CPU fallback
def gpu_load():
    if torch.cuda.is_available():
        device = torch.device("cuda")
        print(f"Nutze CUDA GPU: {torch.cuda.get_device_name(0)}")  # print GPU name
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
        print("Nutze MPS GPU (Apple Silicon)")
    else:
        device = torch.device("cpu")
        print("Nutze CPU (keine GPU gefunden)")
    return device
```
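Usage is straightforward; a quick sanity check might look like this (the tensor is just a hypothetical example):
```python=
from gpu_Load import gpu_load
import torch

device = gpu_load()               # picks cuda, mps, or cpu
x = torch.randn(2, 3).to(device)  # move an example tensor to that device
print(x.device)
```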
### LLM Loading
```python=
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import torch

def model_load_unquantized(llm_model_name):
    try:
        model = AutoModelForCausalLM.from_pretrained(
            llm_model_name,
            device_map="auto",
            torch_dtype=torch.float16
        )
        print("Modell erfolgreich geladen!")
    except torch.cuda.OutOfMemoryError as e:
        print(f"OutOfMemoryError beim Laden des Modells: {e}")
        exit()  # abort if the model does not fit into memory
    return model

def model_load_quantized(llm_model_name):
    quantization_config_4bit = BitsAndBytesConfig(  # 4-bit NF4 quantization via bitsandbytes
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
    )
    try:
        model = AutoModelForCausalLM.from_pretrained(
            llm_model_name,
            quantization_config=quantization_config_4bit,
            device_map="auto",
            torch_dtype=torch.float16
        )
        print("Modell erfolgreich geladen (quantisiert)!")
    except torch.cuda.OutOfMemoryError as e:
        print(f"OutOfMemoryError beim Laden des Modells: {e}")
        exit()
    return model

def model_load(llm_model_name, quantized):
    tokenizer = AutoTokenizer.from_pretrained(llm_model_name)  # load the tokenizer
    if not quantized:
        model = model_load_unquantized(llm_model_name)
    else:
        model = model_load_quantized(llm_model_name)
    return model, tokenizer
```
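A short usage sketch for `model_load`, assuming the Mistral model name used later in the GUI code; the prompt and generation settings are illustrative.
```python=
# Sketch: load the (quantized) LLM and run a quick test generation
from llm_Load import model_load

model, tokenizer = model_load("mistralai/Mistral-7B-Instruct-v0.3", quantized=True)

inputs = tokenizer("Was ist ein LLM?", return_tensors="pt").to(model.device)
output = model.generate(**inputs, max_new_tokens=50, pad_token_id=tokenizer.eos_token_id)
print(tokenizer.decode(output[0], skip_special_tokens=True))
```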
### Text Preprocessing
```python=
def text_formatting(filepaths):
    documents = []
    metadatas = []
    ids = []
    doc_id = 0
    for filepath in filepaths:  # iterate over the list of file paths
        current_doc_text = ""
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line == "---":  # a line containing only "---" ends the current document
                    if current_doc_text:
                        documents.append(current_doc_text.strip())
                        first_line = current_doc_text.split('\n')[0]
                        metadata = {"source": filepath, "title": first_line[:50] + "..." if first_line else "Document"}
                        metadatas.append(metadata)
                        ids.append(f"doc_{doc_id}")
                        doc_id += 1
                        current_doc_text = ""
                else:
                    current_doc_text += line + "\n"
        if current_doc_text.strip():  # keep trailing text if the file does not end with "---"
            documents.append(current_doc_text.strip())
            first_line = current_doc_text.split('\n')[0]
            metadatas.append({"source": filepath, "title": first_line[:50] + "..." if first_line else "Document"})
            ids.append(f"doc_{doc_id}")
            doc_id += 1
    return documents, metadatas, ids
```
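The function expects plain-text files in which individual documents are separated by a line containing only `---`. A small self-contained sketch of that format and a call; the file name and contents are made up for the example.
```python=
# Sketch: write an example wiki file in the expected format and parse it
from text_preprocessing import text_formatting

sample = """Mensa
Die Mensa hat von 11 bis 14 Uhr geöffnet.
---
Bibliothek
Die Bibliothek ist im Gebäude B.
---
"""

with open("example_wiki.txt", "w", encoding="utf-8") as f:
    f.write(sample)

documents, metadatas, ids = text_formatting(["example_wiki.txt"])
print(len(documents))   # 2 documents
print(metadatas[0])     # {'source': 'example_wiki.txt', 'title': 'Mensa...'}
```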
### Database Generation
```python=
from chromadb.errors import InvalidCollectionException
from text_preprocessing import text_formatting
import chromadb

VECTORDB_COLLECTION_NAME = "university_wiki_test"
VECTORDB_PERSIST_PATH = "vectorDB_Data"

def _get_chroma_client(use_persistent_DB):
    # persistent client stores the DB on disk, otherwise it lives in memory only
    if use_persistent_DB:
        return chromadb.PersistentClient(path=VECTORDB_PERSIST_PATH)
    else:
        return chromadb.Client()

def _get_collection(chroma_client, create_if_missing=True):
    try:
        collection = chroma_client.get_collection(name=VECTORDB_COLLECTION_NAME)
        print(f"Nutze existierende Kollektion: '{VECTORDB_COLLECTION_NAME}'")
        return collection
    except InvalidCollectionException:
        if create_if_missing:
            print(f"Kollektion '{VECTORDB_COLLECTION_NAME}' nicht gefunden. Erstelle neue Kollektion.")
            return chroma_client.create_collection(VECTORDB_COLLECTION_NAME)
        else:
            raise

def reindex_single_file(embedding_model, use_persistent_DB, filepath):
    documents, metadatas, ids = text_formatting([filepath])
    if not documents:
        print(f"Datei: {filepath} ist leer.")
        return
    chroma_client = _get_chroma_client(use_persistent_DB)
    collection = _get_collection(chroma_client)
    document_embeddings = embedding_model.encode(documents)
    # use update instead of add: overwrites entries with the same ids
    collection.update(ids=ids, embeddings=document_embeddings.tolist(), metadatas=metadatas, documents=documents)
    print(f"Datei: '{filepath}' updated in Kollektion: '{VECTORDB_COLLECTION_NAME}'")

def reindex_entire_database(embedding_model, use_persistent_DB, filepaths):
    documents, metadatas, ids = text_formatting(filepaths)
    chroma_client = _get_chroma_client(use_persistent_DB)
    try:
        chroma_client.delete_collection(VECTORDB_COLLECTION_NAME)
        print(f"Lösche existierende Kollektion: '{VECTORDB_COLLECTION_NAME}'")
    except ValueError:
        print(f"Keine existierende Kollektion '{VECTORDB_COLLECTION_NAME}' zum Löschen vorhanden.")
    collection = chroma_client.create_collection(VECTORDB_COLLECTION_NAME)
    document_embeddings = embedding_model.encode(documents)
    collection.add(embeddings=document_embeddings.tolist(), metadatas=metadatas, ids=ids, documents=documents)
    print(f"Datenbank neu indexiert: '{VECTORDB_COLLECTION_NAME}'")
    return collection

def load_existing_collection(embedding_model, use_persistent_DB, filepaths):
    chroma_client = _get_chroma_client(use_persistent_DB)
    try:
        collection = _get_collection(chroma_client, create_if_missing=False)
        print(f"Kollektion geladen: '{VECTORDB_COLLECTION_NAME}'.")
        return collection
    except InvalidCollectionException:
        print(f"Neue Kollektion erstellen: '{VECTORDB_COLLECTION_NAME}'")
        return reindex_entire_database(embedding_model, use_persistent_DB, filepaths)

def text_embedding(embedding_model, use_persistent_DB, filepaths):
    return load_existing_collection(embedding_model, use_persistent_DB, filepaths)
```
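A usage sketch: build (or load) the collection with a SentenceTransformer embedder and run a similarity query against it. The file paths and the question are example assumptions.
```python=
# Sketch: index example files into the Chroma collection and query it
from sentence_transformers import SentenceTransformer
from vectorDB_Gen import text_embedding

embedding_model = SentenceTransformer("all-mpnet-base-v2")
filepaths = ["wiki_Data/mensa.txt", "wiki_Data/bibliothek.txt"]  # hypothetical files

collection = text_embedding(embedding_model, use_persistent_DB=False, filepaths=filepaths)

query_embedding = embedding_model.encode("Wann hat die Mensa geöffnet?").tolist()
results = collection.query(query_embeddings=[query_embedding], n_results=2)
print(results["documents"][0])  # the two most similar documents
```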
### Answer Generation
```python=
import torch

def rag_chatbot_answer(user_question, collection, model, tokenizer, device, embedding_model):
    query_embedding = embedding_model.encode(user_question).tolist()
    results = collection.query(query_embeddings=[query_embedding], n_results=2)  # retrieve the 2 most relevant documents
    contexts = results['documents'][0]
    if not contexts:  # no relevant context found
        return "Es tut mir leid, aber ich konnte keine relevante Antwort auf deine Frage finden."

    # combine the contexts (open to changes, e.g. take only the top context, chain them, etc.)
    context_text = "\n\n".join(contexts)

    # system prompt
    prompt = f"""
Answer the question in German, based on the context below.
Try to provide general information or an overview if available.
Keep the answer concise and informative.
If the context doesn't contain the answer, say you don't know.
Context:
{context_text}
Question: {user_question}
Answer: """

    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)  # encode the prompt
    attention_mask = torch.ones_like(input_ids).to(device)  # build the attention mask
    try:
        with torch.amp.autocast('cuda'):  # mixed precision; assumes a CUDA device
            llm_output = model.generate(
                input_ids=input_ids,  # pass input_ids as a keyword argument
                attention_mask=attention_mask,  # pass the attention mask
                max_new_tokens=150,
                pad_token_id=tokenizer.eos_token_id,
                num_return_sequences=1
            )
        generated_answer = tokenizer.decode(llm_output[0], skip_special_tokens=True)
        generated_answer = generated_answer.split("Answer:")[-1].strip()  # keep only the part after the last "Answer:"
    except torch.cuda.OutOfMemoryError as e:
        print(f"OutOfMemoryError bei der Antwortgenerierung!: {e}")
        return "Please try again later. / Bitte versuchen Sie es später erneut."
    except Exception as e:
        print(f"Fehler bei der Antwortgenerierung! {e}")
        return "Please try again later. / Bitte versuchen Sie es später erneut."
    return generated_answer
```
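Wired together without the GUI, a minimal console run might look like this; the folder path and question are illustrative, and the initialization mirrors what the GUI code in the next section does.
```python=
# Sketch: end-to-end RAG answer without the GUI (illustrative paths and question)
from sentence_transformers import SentenceTransformer
from vectorDB_Gen import text_embedding
from rag_answer_gen import rag_chatbot_answer
from llm_Load import model_load
from gpu_Load import gpu_load
import os

filepath = "wiki_Data"
filepaths = [f"{filepath}/{f}" for f in os.listdir(filepath)]

embedding_model = SentenceTransformer("all-mpnet-base-v2")
collection = text_embedding(embedding_model, False, filepaths)

device = gpu_load()
model, tokenizer = model_load("mistralai/Mistral-7B-Instruct-v0.3", quantized=True)

answer = rag_chatbot_answer("Wann hat die Mensa geöffnet?", collection, model, tokenizer, device, embedding_model)
print(answer)
```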
### GUI and GUI Functions
```python=
from vectorDB_Gen import text_embedding, reindex_entire_database, reindex_single_file
from rag_answer_gen import rag_chatbot_answer
from llm_Load import model_load
from gpu_Load import gpu_load
from sentence_transformers import SentenceTransformer
from tkinter import ttk
import tkinter as tk
import os

# --- Functions ---
def load_filepaths():
    print("Dateipfade laden")
    global filepath_list
    filepath_list = os.listdir(filepath)
    filepath_list = [f"{filepath}/{file}" for file in filepath_list]
    return filepath_list

def generate_ui():
    user_input = input_window.get("1.0", tk.END).strip()
    print(f"Generation gestartet mit Input: '{user_input}'")
    generated_answer = rag_chatbot_answer(user_input, collection, model, tokenizer, device, embedding_model)
    output_window.config(state=tk.NORMAL)
    output_window.delete("1.0", tk.END)
    output_window.insert(tk.END, generated_answer)
    output_window.config(state=tk.DISABLED)

def open_filepath_popup():
    popup = tk.Toplevel(root)
    popup.title("Dateien-Verwaltung")

    # --- Filepath List Display Section ---
    filepaths_frame = ttk.Frame(popup)
    filepaths_frame.pack(pady=5, padx=10, fill=tk.X)
    filepath_label = ttk.Label(filepaths_frame, text="Dateipfade:")
    filepath_label.pack(pady=5, padx=10, anchor=tk.W)
    filepath_labels_frame = ttk.Frame(filepaths_frame)
    filepath_labels_frame.pack(fill=tk.X)

    def reload_file(filepath):
        print(f"Datei neu indexieren: {filepath}")
        reindex_single_file(embedding_model, database_var.get(), filepath)

    def update_filepath_display():
        for widget in filepath_labels_frame.winfo_children():
            widget.destroy()
        for filepath in filepath_list:
            file_frame = ttk.Frame(filepath_labels_frame)
            file_frame.pack(fill=tk.X, pady=2)
            file_label = ttk.Label(file_frame, text=filepath, width=40, anchor=tk.W)
            file_label.pack(side=tk.LEFT, padx=(0, 5))
            reload_button = ttk.Button(file_frame, text="Neu laden", width=10, command=lambda fp=filepath: reload_file(fp))
            reload_button.pack(side=tk.LEFT, padx=2)

    update_filepath_display()

    # --- Reload Database Button ---
    def reload_database_popup():
        global collection
        print("Datenbank komplett neu laden (Popup)")
        collection = reindex_entire_database(embedding_model, database_var.get(), filepath_list)

    reload_db_button = ttk.Button(popup, text="Datenbank komplett neu laden", command=reload_database_popup)
    reload_db_button.pack(pady=10, padx=10)

    # --- Reload Filepaths Button ---
    def reload_filepaths_popup():
        global filepath_list
        print("Dateipfade neu laden (Popup)")
        filepath_list = load_filepaths()
        popup.destroy()
        open_filepath_popup()

    reload_filepaths_button = ttk.Button(popup, text="Dateipfade neu laden", command=reload_filepaths_popup)
    reload_filepaths_button.pack(pady=5, padx=10)

def reload_models_ui():
    print("Modelle neu laden")

# --- Main Window ---
root = tk.Tk()
root.title("Chatbot-MainWindow")

# --- Settings Frame ---
settings_frame = ttk.Frame(root, padding="10")
settings_frame.grid(row=0, column=0, sticky=(tk.W, tk.N))
database_var = tk.BooleanVar(value=False)
database_switch = ttk.Checkbutton(settings_frame, text="Persistente DB verwenden", variable=database_var)
database_switch.pack(pady=5, padx=10, anchor=tk.W)
quantized_var = tk.BooleanVar(value=True)
quantized_switch = ttk.Checkbutton(settings_frame, text="Quantisiert", variable=quantized_var)
quantized_switch.pack(pady=5, padx=10, anchor=tk.W)

# --- Buttons Frame ---
buttons_frame = ttk.Frame(root, padding="10")
buttons_frame.grid(row=1, column=0, sticky=(tk.W, tk.N))
filepath_button = ttk.Button(buttons_frame, text="Dateipfade verwalten", command=open_filepath_popup, width=20)
filepath_button.pack(pady=150, padx=10, anchor=tk.W)
generate_button = ttk.Button(buttons_frame, text="Generieren", command=generate_ui, width=20)
generate_button.pack(pady=5, padx=10, anchor=tk.W)

# --- Chat Window Frame ---
chat_frame = ttk.Frame(root, padding="10")
chat_frame.grid(row=0, column=1, rowspan=3, sticky=(tk.N, tk.E, tk.S, tk.W))
input_label = ttk.Label(chat_frame, text="Eingabe:")
input_label.pack(pady=5, padx=10, anchor=tk.W)
input_window = tk.Text(chat_frame, height=10, width=100)
input_window.pack(pady=5, padx=10)
output_label = ttk.Label(chat_frame, text="Ausgabe:")
output_label.pack(pady=5, padx=10, anchor=tk.W)
output_window = tk.Text(chat_frame, height=20, width=100, state=tk.DISABLED)
output_window.pack(pady=5, padx=10)

# --- Model selection ---
# -------------------------------------------------------------------
filepath = "wiki_Data"  # folder containing the text files
llm_model_name = "mistralai/Mistral-7B-Instruct-v0.3"
embedding_model = "all-mpnet-base-v2"
# -------------------------------------------------------------------

# --- Define variables ---
filepath_list = load_filepaths()
embedding_model = SentenceTransformer(embedding_model)
try:
    embedding_model.to("cpu")
except Exception as e:
    print(f"Ein Fehler ist aufgetreten: {e}")

# --- Initialization ---
collection = text_embedding(embedding_model, database_var.get(), filepath_list)
device = gpu_load()
model, tokenizer = model_load(llm_model_name, quantized_var.get())
root.mainloop()
```