<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of Contents** *generated with [DocToc](https://github.com/thlorenz/doctoc)*
- [Hydroponics - Ein automatisiertes System zur hydroponischen Anzucht von Nutzpflanzen](#hydroponics---ein-automatisiertes-system-zur-hydroponischen-anzucht-von-nutzpflanzen)
- [Projektbeschreibung](#projektbeschreibung)
- [Architektur](#architektur)
- [Collector Service](#collector-service)
- [Beschreibung](#beschreibung)
- [Technologien](#technologien)
- [Umsetzung](#umsetzung)
- [Tracking Service](#tracking-service)
- [Beschreibung](#beschreibung-1)
- [Verarbeitung der Sensorwerte](#verarbeitung-der-sensorwerte)
- [Technologien](#technologien-1)
- [Domänenmodell](#domänenmodell)
- [Data Transfer Objects](#data-transfer-objects)
- [Umsetzung](#umsetzung-1)
- [REST Endpunkte](#rest-endpunkte)
- [AMQP Nachrichtenverarbeitung](#amqp-nachrichtenverarbeitung)
- [Notification Service](#notification-service)
- [Beschreibung](#beschreibung-2)
- [Technologien](#technologien-2)
- [Domänenmodell](#domänenmodell-1)
- [NotificationDefinition](#notificationdefinition)
- [NotificationLog](#notificationlog)
- [Data Transfer Objects](#data-transfer-objects-1)
- [Umsetzung](#umsetzung-2)
- [REST Endpunkte](#rest-endpunkte-1)
- [Verwendung von OpenApi](#verwendung-von-openapi)
- [Bean Validation](#bean-validation)
- [AMQP Nachrichtenverarbeitung - Consumer](#amqp-nachrichtenverarbeitung---consumer)
- [Eclipse MicroProfile RestClient](#eclipse-microprofile-restclient)
- [Eclipse MicroProfile Config](#eclipse-microprofile-config)
- [Notification-Publisher-Service](#notification-publisher-service)
- [Eckdaten](#eckdaten)
- [Umsetzung](#umsetzung-3)
- [Erkenntnisse](#erkenntnisse)
- [Notification-Web-Push Service](#notification-web-push-service)
- [Eckdaten](#eckdaten-1)
- [Node.js](#nodejs)
- [Kopplung Client / Server](#kopplung-client--server)
- [Umsetzung](#umsetzung-4)
- [WebApp](#webapp)
- [Umsetzung](#umsetzung-5)
- [Konsumation der REST-Schnittstellen](#konsumation-der-rest-schnittstellen)
- [Konsumation der Notifications](#konsumation-der-notifications)
- [mqtt-message-simulator](#mqtt-message-simulator)
- [Api Gateway](#api-gateway)
- [Beschreibung](#beschreibung-3)
- [Technologien](#technologien-3)
- [Umsetzung](#umsetzung-6)
- [Datenbanken](#datenbanken)
- [Beschreibung](#beschreibung-4)
- [Technologien](#technologien-4)
- [Umsetzung](#umsetzung-7)
- [MOM](#mom)
- [MQTT](#mqtt)
- [RabbitMQ](#rabbitmq)
- [Abhängigkeiten](#abhängigkeiten)
- [Notification Service / Tracking Service](#notification-service--tracking-service)
- [Deployments](#deployments)
- [Docker](#docker)
- [Dateistruktur](#dateistruktur)
- [docker-compose und RabbitMQ](#docker-compose-und-rabbitmq)
- [Kubernetes](#kubernetes)
- [Anforderungen](#anforderungen)
- [Ressourcen](#ressourcen)
- [Installation](#installation)
- [Weitere nützliche Befehle](#weitere-nützliche-befehle)
- [Kubernetes ohne DockerHub](#kubernetes-ohne-dockerhub)
- [Kubernetes mit DockerHub](#kubernetes-mit-dockerhub)
- [Kubernetes Deployments](#kubernetes-deployments)
- [Mosquitto Deployment/Service](#mosquitto-deploymentservice)
- [PostgreSQL Deployment](#postgresql-deployment)
- [RabbitMQ Deployment](#rabbitmq-deployment)
- [Collector Deployment](#collector-deployment)
- [Tracking Deployment](#tracking-deployment)
- [Notification Deployment](#notification-deployment)
- [Kubernetes HPAs (Horizontal Pod Autoscaler)](#kubernetes-hpas-horizontal-pod-autoscaler)
- [Quellen](#quellen)
- [Anforderungen](#anforderungen-1)
- [Erstellen eines HPA (Horizontal Pod Autoscaler)](#erstellen-eines-hpa-horizontal-pod-autoscaler)
- [Ergebnisse](#ergebnisse)
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
# Hydroponics - Ein automatisiertes System zur hydroponischen Anzucht von Nutzpflanzen
Dieses Projekt wurde im Zuge der Lehrveranstaltungen CLC (Cloud Computing), MUS (Mobile und ubiquitäre Systeme) und SVE (Service Engineering) von:
* David Lang (S1910454020)
* Alexander Doppelbauer (S1910454005)
umgesetzt.
# Projektbeschreibung
Unter dem Begriff Hydroponik versteht man die Kultivierung von Pflanzen ohne Erde. Die Wurzeln der Pflanzen werden dabei in eine Nährlösung bestehend aus Wasser und Nährstoffen getaucht. Diese Herangehensweise bietet eine Vielzahl von Vorteilen: Verringerter Platzverbrauch, verringerter Wasserverbrauch, frühere Ernte, erhöhte Erträge und weitestgehender Verzicht auf Pestizide.
Diese Vorteile machen Hydroponik zu einer attraktiven Alternative zur konventionellen Landwirtschaft, welche im kleineren Maßstab auch zu Hause praktiziert werden kann. Um Pflanzen erfolgreich hydroponisch zu kultivieren, ist es jedoch erforderlich, sämtliche die Pflanzen beeinflussenden Faktoren, fortlaufend zu kontrollieren. Dazu gehören: Die Umgebungstemperatur, die Lichtverhältnisse, der PH-Wert der Nährlösung, der EC-Wert der Nährlösung, der Sauerstoffgehalt der Nährlösung und der Füllstand der Nährlösung. In einem manuell betriebenen hydroponischen System kann dies nur schwer umgesetzt werden. Ziel dieses Projekts ist es daher, ein System zu entwickeln, welches möglichst große Teile des Prozesses zur Anzucht von Pflanzen (mittels Hydroponik) überwacht und automatisiert. Da hierfür sowohl IoT, in Form von hydroponischen Stationen, als auch eine Infrastruktur benötigt wird, welche die Sensordaten verarbeitet und den Benutzern zugänglich macht, haben wir uns dazu entschieden dieses Projekt Lehrveranstaltungsübergreifend mit SVE und MUS durchzuführen. Der MUS Teil beschränkt sich hierbei hauptsächlich auf die hydroponische Station, das Senden von Sensordaten an einen MQTT Broker und auf das Frontend zur Anzeige der Stationsdaten. Im Rahmen der Lehrveranstaltung CLC setzen wir den Fokus des Projekt auf den Betrieb der komplexen Architektur in der Cloud. Dabei nutzen dabei AWS als Cloud-Anbieter. Besonderes Augenmerk legen wird dabei auf Punkte wie Infrastructure-as-Code, Skalierbarkeit und Ausfallsicherheit. Diese werden in den folgenden Abschnitten näher beschrieben.
Hydroponische Stationen in unserem System generieren laufend Daten. Diese Sensordaten werden an AWS Iot-Core gesendet. Iot-Core leitet die Daten an den Collector Service weiter (AWS Lambda). Dieser vorverarbeitet die Daten und speist sie in eine RabbitMQ-Instanz (Amazon MQ) ein.
Diese Daten werden anschließend durch verschiedene Konsumenten verarbeitet (Business-Logik, Persistierung). Diese Architektur wird in Kubernetes (AWS EKS) betrieben. Dazu müssen alle Services containerisiert werden.
Die Datenbanken werden als AWS RDS Postgres Instanzen betrieben. Das Angular-basierte Frontend wird in AWS S3 gehostet.
# Architektur AWS
Dieser Abschnitt beschäftigt sich mit dem Hosting der Architektur in AWS. Außerdem werden hier die im Kurs gestellten Fragestellungen beantwortet.
Folgende Abbildung gibt einen Überblick über die Architektur.

## AWS Cloud Formation
Um das Deployment unserer Service weitgehend zu automatisieren verwenden wir AWS Cloud Formation. Cloud Formation erlaubt es die benötigten Services deklarativ in Form einer .yaml Datei zu beschreiben. Mithilfe eines einzigen Befehls kann die deklarative Beschreibung an AWS gesendet werden, welche die verschiedenen Services daraufhin automatisch provisioniert.
## IOT Core
AWS IoT-Core ist darauf ausgelegt IoT-Geräte mit der AWS-Cloud zu verbinden, ohne dass dafür explizite Server bereitgestellt werden müssen. Laut Dokumentation kann Iot-Core Milliarden von Geräte und Billionen von Nachrichten unterstützen ohne dass dabei händisch Skalierungsschritte gesetzt werden müssen.
Um IoT-Core verwenden zu können, adaptierten wir unseren StationSimulator und integrierten das zur Verfügung gestellte SDK um Nachrichten zu versenden. Dieses SDK verwendet intern MQTT zum Versenden der Nachrichten.
Bei IoT-Core ist es möglich Aktionen auf Basis von sogenannten Regeln auszulösen. In unserem Fall wird eine AWS-Lambda Funktion (collector lambda) aufgerufen, wenn Nachrichten an das Topic 'sensors/#' gesendet werden. Regeln werden dabei in Form von SQL-Statements definiert: ```SELECT *, topic() AS topic FROM 'sensors/#'```
### Deployement
Um AWS IoT-Core zu verwenden muss lediglich einmalig ein Endpunkt registriert werden. Auf diesen können dann IoT-Geräte mit den entsprechenden Zugangsdaten zugreifen.
## AWS Lambda
AWS Lambda ist die Funktion as a Service Lösung von AWS. Diese ermöglicht es Code in der Cloud auszuführen ohne explizite Server bereitstellen zu müssen. Dabei können verschiedenste Programmiersprachen zur Umsetzung eingesetzt werden. In unserem Fall haben wir die Implementierung des Collector-Service (Java) zu einer Lambda-Funktion migriert.
Die Lambda-Funktion wird ausgelöst sobald in IoT-Core Nachrichten auf dem Topic 'sensors/#' eintreffen. Die Lambda-Funktion
verarbeitet diese und leitet die Nachrichten an eine Rabbit-MQ (gehostet in Amazon MQ) weiter.
AWS Lambda skaliert automatisch und kann die Funktion in bis zu 1000 Instanzen parallel ausführen.
### Deployment
Das nachfolgende Listing zeigt das Deployement file des collector-lambda-service.
Besonders interessant ist dabei das Event, bei welchem die Funktion aufgerufen wird.
Hier wird die oben erwähnte IoT-Rule definiert. Außerdem wird eine Variable aus AWS Systems Manager Parameter Store importiert. Diese enthält den Endpunkt der Amazon MQ, welcher vom Service verwendet wird um Nachrichten an die Queue zu senden.
```yaml=
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Description: collector-lambda
Resources:
Collector:
Type: AWS::Serverless::Function
Properties:
CodeUri: CollectorFunction
Handler: collector.App::handleRequest
Runtime: java8
Timeout: 10
MemorySize: 512
Environment:
Variables:
QUEUE_URL: '{{resolve:ssm:/QueueStack/URL:1}}'
Events:
IotRule:
Type: IoTRule
Properties:
Sql: SELECT *, topic() AS topic FROM 'sensors/#'
```
## Amazon MQ
Mit Amazon MQ lässt sich eine Message-Oriented-Middleware in der AWS Cloud betreiben. Dabei wird sowohl Apache ActiveMQ, als auch
RabbitMQ unterstützt. Wir verwenden RabbitMQ was eine Migration der bestehenden Services (welche den Broker nutzen) ermöglicht
ohne Code anpassen zu müssen. Der Broker empfängt die Nachrichten aus der Lambda-Funktion, dupliziert sie und stellt sie in zwei Queues zur Verfügung, welche vom Tracking- und Notification-Service abgearbeitet werden.
Zusätzlich schreibt der Notification-Service die generierten Notification in eine weitere Queue, welche wiederum der Notification-Publisher-Service verarbeitet.
Die Verwendung von MOM-Technologien erlaubt es die Services in unserer Architekture stark entkoppelt zu betreiben. Die Queue fungieren dabei als Puffer um einzelnen Services nicht mit einer Flut von Nachrichten zu überlasten.
### Deployment
Das nachfolgende Listing zeigt das Deployement file des AmazonMQ Service.
Im Abschnitt RabbitMQEndpoint wird der Endpunkt der Queue als Variable exportiert, welche wiederum von der Lambda-Funktion importiert wird.
```yaml=
AWSTemplateFormatVersion: 2010-09-09
Description: Template for hydroponics rabbitMQ deployement
Resources:
HydroponicsRabbitMQ:
Type: AWS::AmazonMQ::Broker
Properties:
AutoMinorVersionUpgrade: "false"
BrokerName: HydroponicsRabbitBroker
DeploymentMode: SINGLE_INSTANCE
EngineType: RabbitMQ
EngineVersion: "3.8.6"
HostInstanceType: mq.t3.micro
PubliclyAccessible: "true"
Users:
-
Password: 982xP2W!RQIi*OEJC #only for testing, never declare password in template in prod!, use aws secrets manager instead
Username: hydroman
RabbitMQEndpoint:
Type: AWS::SSM::Parameter
Properties:
Type: String
Name: /QueueStack/URL
Value:
Ref: "HydroponicsRabbitMQ"
```
## AWS RDS (Amazon Relational Database Service) PostgreSql
Gemäß dem Prinzip von Microservices verwenden wir für jeden Microservice eine eigene Datenbank. Der Datenaustausch zwischen den einzelnen Services erfolgt über deren REST-Schnittstellen. Zur Bereitstellung der Datenbanken haben wir uns für den AWS RDS (Amazon Relational Database Service) entschieden, da hierbei die Verwaltung der Datenbanken über AWS erfolgt und leicht konfiguriert werden kann. So können Datenbanken skaliert, repliziert sowie Backups erstellt werden. Dies begünstigt wiederum die Ausfallsicherheit.
Amazon RDS mit der PostgreSQL Engine ermöglichrt das einfache konfigurieren von Replikas um die lesenden Zugriffe zu verteilen und um eine Wiederherstellung der Ressourcen zu ermöglichen. Die Replikas können in der selben Region, wie auch in unterschiedlichen Regionen konfiguriert werden (https://aws.amazon.com/de/blogs/database/best-practices-for-amazon-rds-postgresql-replication/).
Zur Bereitstellung der Datenbanken haben wir uns dazu entschieden `CloudFormation` zu verwenden, um die Ressourcen automatisch auf AWS zu allokieren. Das nachfolgende Deployment beschreibt die Konfiguration der Tracking-Datenbank (Tracking Service) und dder Notification-Datenbank (Notification Service). Hierbei werden `db.t3.micro` Instanzen mit der Datenbank Engine `postgres` und `50 GB` Speicher allokiert.
```yml
AWSTemplateFormatVersion: 2010-09-09
Description: >-
Hydroponics DBs
Resources:
sensorsDB:
Type: 'AWS::RDS::DBInstance'
Properties:
DBName: sensor_db
DBInstanceClass: db.t3.micro
Engine: postgres
MasterUsername: postgres
MasterUserPassword: ...
AllocatedStorage: 50
PubliclyAccessible: true
notificationDB:
Type: 'AWS::RDS::DBInstance'
Properties:
DBName: notification_db
DBInstanceClass: db.t3.micro
Engine: postgres
MasterUsername: postgres
MasterUserPassword: ...
AllocatedStorage: 50
PubliclyAccessible: true
```
## EKS (Amazon Elastic Kubernetes Service)
Zur Bereitstellung unserer Microservices haben wir uns für die Container-Orchestrierungsplattform Kubernetes entschieden. Amazon bietet hierfür den Service Amazon EKS an (Amazon Elastic Kubernetes Service). Dieser bietet die Möglichkeit Kubernetes-Anwendungen in der Cloud zu starten, auszuführen und zu skalieren. Weiters ist EKS mit Kubernetes vollständig kompatibel und ermöglicht das einfache Migrieren von Kubernetes Anwendungen. Dies war unter Anderem ein Grund dafür, dass wir uns für Amazon EKS entschieden haben. Hinzu kommt, dass EKS-Cluster vollständig in AWS verwaltet werden können.
Ein EKS-Cluster kann auf Basis von EC2 Instanzen oder auf Basis von Fargate betrieben werden. EC2 Instanzen entsprechen virtuellen Worker-Nodes. Fargate ermöglicht das deployen von serverless Containern. Unser ursprünglicher Plan war es Fargate zu verwenden. Auf Basis der vorangegangenen Recherche und Experimenten mussten wir leider feststellen, dass es zu Problemen mit dem öffentlichen Zugriff von außen gekommen ist. Aus diesem Grund haben wir uns für einen EKS-Cluster bestehend aus EC2 Instanzen entschieden.
### Automated Infrastructure Provisioning (Infrastructure-as-Code)
Für das automatisierte Provisioning des EKS-Clusters verwenden wir das Befehlszeilenprogramm `eksctl` (https://eksctl.io/). Hierbei handelt es sich um ein von der Community entwickeltes Tool, welches auf GitHub veröffentlicht wurde (https://github.com/weaveworks/eksctl). Dieses Tool ermöglicht das einfache erstellen eines EKS-Clusters. Dabei wird auf Amazons `CloudFormation`, welches in AWS für Infrastructure-As-Code verwendet wird, aufgesetzt und bildet im wesentlichen nur eine weitere Abstraktionsebene. `CloudFormation` bietet die Möglichkeit eine Sammlung von Ressourcen von AWS und Drittanbietern schnell und einfach bereitzustellen und diese über deren Lebenszyklus zu verwalten. Wie im nachfolgenden Quellcode Auszug dargestellt kann ein EKS-Cluster bestehend aus drei EC2 Linux Instanzen mit dem Befehl `eksctl create cluster` erzeugt und gestartet werden.
```
eksctl create cluster
--name hydroponics-cluster
--version 1.17
--region us-east-2
--nodegroup-name linux-nodes
--node-type t3.medium
--nodes 3
```

Für das weitere Arbeiten mit dem Kubernetes-Cluster wird das Befehlszeilenprogramm `kubectl` verwendet. Mit `kubectl` können Clusterressourcen überprüft, erstellt, gelöscht und aktualisiert werden. Nachdem der Cluster erfolgreich mittels `eksctl` erstellt wurde werden unsere Anwendungen mittels eines Windows-Powershell Skript in den Cluster deployed. Die Deployments sind in Form von Kubernetes-YAML Deploymentdateien beschrieben. Auf diese wird nachfolgend näher eingegangen.

### Kubernetes
K8s ist ein quelloffenes System um das Deployment, das Skalieren und das Verwalten von containerisierten Anwendungen zu automatisieren. Unsere Kubernetes Architektur lässt sich in die folgenden drei Bestandteile einteilen.
* `Services`: In Kubernetes bilden die Services eine logische Abstraktion einer Menge von Pods. Die Pods, welche einem bestimmten Service nachgestellt sind werden über sogenannte Selektoren festgelegt. Jeder Service ist demnach für eine Menge von Pods mit einem bestimmten Selektor/Label zuständig. In unserer Anwendung verwenden wir Services vom Typ `Type=ClusterIP` für unsere Pods. Dadurch werden die Services mittels einer internen Cluster-IP verfügbar gemacht und sind demnach nur innerhalb des Clusters erreichbar. Das Routing von außerhalb zu den Services erfolgt durch einen Ingress.
* `Ingress`: Der Ingress ist dafür zuständig die HTTP/HTTPS Anfragen von außerhalb des Clusters in den Cluster zu den Services im Cluster zu routen. Hierfür werden Regeln definiert, mittels welcher die korrekte Zuordnung zu den Services passiert. Um einen Ingress verwenden zu können bedarf es eines Ingress Controllers. Das alleinige Erstellen eines Ingress ohne Controller hat keine Auswirkung. In AWS kann zum Beispiel der `ingress-nginx` verwendet werden. Dieser erzeugt einen Netzwerk-Loadbalanver auf der AWS Seite und verbirgt sich hinter einem Service vom Typ `Type=LoadBalancer`. Der Vorteil, welcher sich durch die Verwendung eines Ingress ergibt ist, dass dieser einerseits als API-Gateway fungiert und andererseits ein direkter Zugriff von Außen auf die Kubernetes-Services nicht möglich ist (Security).
* `Deployment`: Die Pods unserer Kubernetes-Anwendung werden über Deployments erstellt und aktualisiert. Hierbei haben wir die Möglichkeit mehrere Instanzen pro Pod zu betreiben, wobei innerhalb eines Pods ein Docker-Container betrieben wird.

### Deployment
In diesem Abschnitt werden die wesentlichen Kubernetes Deployments näher erläutert und dargestellt, mit welchen unsere Anwendungen bereitgestellt werden. In Hinblick auf die Automatisierung des Deployments haben wir uns dazu entschieden das Deployment der Kubernetes Ressourcen/Deployments mittels eines Skrips zu automatisieren, welches nach dem erfolgreichen Starten des Clusters ausgeführt werden kann.
```
kubectl apply -f ./../kubernetes/deployments/tracking-deployment.yml
kubectl apply -f ./../kubernetes/deployments/notification-deployment.yml
kubectl apply -f ./../kubernetes/deployments/notification-web-push-deployment.yml
kubectl apply -f ./../kubernetes/deployments/ingress-deployment.yml
kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/static/provider/aws/deploy.yaml
```
Der Ingress enthält Regeln für das Routen und fungiert als API-Gateway. Eintreffende Anfragen werden mittels den in `path` definierten Regeln an den entsprechenden Service weitergeleitet. Um die Anfragen an den entsprechenden Service weiterleiten zu können wird das DNS von Kubernetes verwendet. Dies ermöglicht es die Services über deren Namen anzusprechen.
```yml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
name: ingress
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /$2
spec:
rules:
- http:
paths:
- path: /tracking(/|$)(.*)
backend:
serviceName: tracking-service
servicePort: 8082
- path: /notification(/|$)(.*)
backend:
serviceName: notification-service
servicePort: 8083
- path: /webpush(/|$)(.*)
backend:
serviceName: notification-web-push-service
servicePort: 9001
```
Als Ingress Controller verwenden wir den zuvor beschriebenen `ingress-nginx` Controller. Durch das deployen dieses Controllers wird automatisch ein AWS Loadbalancer erzeugt und eine öffentliche Addresse vergeben, durch welche über den Ingress auf unsere Anwendungen zugegriffen werden kann.
```
https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/static/provider/aws/deploy.yaml
```
Nachfolgend dargestellt ist das Kubernetes Deployment für den `tracking-service`. Dieses setzt sich aus einem Service com Typ `ClusterIP` und einem `Deployment` zusammen.
```yml
apiVersion: v1
kind: Service
metadata:
name: tracking-service
labels:
app: tracking-service
spec:
type: ClusterIP
ports:
- port: 8082
targetPort: 8080
selector:
app: tracking-service
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tracking-service
spec:
selector:
matchLabels:
app: tracking-service
replicas: 1
template:
metadata:
labels:
app: tracking-service
spec:
containers:
- name: tracking-service
image: doppl/aws-tracking:1.2
ports:
- containerPort: 8080
env:
- name: TRACKINGDB_DB_SERVER
value: "hsb0x9jjpo3fis.cgpullhhhuof.us-east-2.rds.amazonaws.com"
# ...
```
Beim Deployment sind Problemen mit den verwendeten EC2-Instanz-Typen aufgetreten. Aus Kostengründen haben wir uns zu Beginn dazu entschieden zwei EC2-Instanzen vom Typ `t2.micro` zu verwenden. Es stellte sich jedoch heraus, dass die Kapazitäten nicht ausgereicht haben um mehr als zwei Pods zu betreiben. Unsere Lösung bestand darin, einen Cluster bestehend aus drei EC2-Instanzen vom Typ `t3.medium` zu verwenden.
### Skalierbarkeit
Bei AWS EKS müssen für Node-Gruppen keine Datenverarbeitungskapazitäten explizit bereitgestellt werden um die Anwendung skalieren zu können. Weiters können Pods in Kubernetes über manuelles Anpassen der Replikas oder automatisch über HPA (Horicontal Pod Autoscaling) in abhängigkeit der Auslastung der einzelnen Pods skaliert werden. Die Verwendung von HPA wurde aus Kostengründen nur in einem loaklen `minikube` Setup getestet und nicht in AWS. Da Amazon EKS jedoch mit Kubernetes vollständig kompatibel ist sollte dies jedoch keine Probleme bereiten, da es sich lediglich um eine Erweiterung des Deployments handelt.
### Ausfallsicherheit
EKS führt die Kubernetes-Steuerebene über mehrere AWS Availability Zones hinweg aus, erkennt automatisch nicht betriebsbereite Knoten. Fehlerhafte Knoten werden ersetzt. Weiters werden On-Demand-Upgrades und -Patches ohne Ausfallzeit angteboten. EKS bietet ein SLA mit 99,95 % Betriebszeit. Ausfallsicherheit ist weiters dadurch gegeben, dass der EKS Cluster aus mehreren Knoten besteht. Fällt ein Knoten aus, kann dieser einfach ersetzt werden. Ausfallsicherheit auf Ebene von Kubernetes ist dadurch gegeben, dass terminierte Pods vom System automatisch neu gestartet werden. Dies kann um Health-Checks erweitert werden, welche im Deployment definiert werden. Diese dienen dazu, nicht erreichbare/fehlerhafte Anwendungen zu identifizieren.
## S3 Bucket
Wir verwenden Amazon Simple Storage Service um unser angular-basierte Webanwendung zu hosten. Über den Ingress-Endpunkt des EKS-Clusters greift das Frontend auf die dahinterliegenden Services zu.
## CLC Fragen
### Automated Infrastructue Provisioning/(Infrastructure-as-Code). Wie wurde im vorliegenden Projekt Automated Infrastructure Provisioning berücksichtigt?
Siehe Abschnitt "Cloud Formation" oben. Zusätzlich haben wir Powershell-Scripts implementiert, mit denen es möglich ist die gesamte Architektur zu deployen und auch wieder zu löschen. Diese Skripte verwenden die Cloud-Formation Deployment-Files in Verbindung mit der AWS Cloudformation CLI.
Da die Beschreibung der Ressourcen als Code vorliegt (.yaml-Dateien), kann diese in ein Version Control System eingecheckt werden. Somit wurde "Infrastructure-as-Code" umgesetzt.
### Skalierbarkeit. Wie wurde im vorliegenden Projekt Skalierbarkeit berücksichtigt?
Unsere Architektur wurde mit Fokus auf Skalierbarkeit bei allen Services umgesetzt.
* IoT-Core: Skaliert automatisch und kann mit Billionen von Nachrichten von Milliarden von Endgeräten umgehen. IoT-Core ist vollverwaltet, was bedeutet das keine bestimmte Infrastruktur händisch verwaltet werden muss.
* AWS-Lambda: Skaliert automatisch, Code wird ohne expliziten Server ausgeführt.
* Amazon-MQ: Die Verwendung von MOM-Technologien erlaubt es die Services in unserer Architektur stark entkoppelt zu betreiben. Die Queues fungieren dabei als Puffer um einzelnen Services nicht mit einer Flut von Nachrichten zu überlasten.
* AWS Elastic Kubernetes Service: Bei AWS EKS müssen für Node-Gruppen keine Datenverarbeitungskapazitäten explizit bereitgestellt werden um die Anwendung skalieren zu können. Weiters können Pods in Kubernetes über manuelles Anpassen der Replikas oder automatisch über HPA (Horicontal Pod Autoscaling) in abhängigkeit der Auslastung der einzelnen Pods skaliert werden. Die Verwendung von HPA wurde aus Kostengründen nur in einem loaklen minikube Setup getestet und nicht in AWS. Da Amazon EKS jedoch mit Kubernetes vollständig kompatibel ist sollte dies jedoch keine Probleme bereiten, da es sich lediglich um eine Erweiterung des Deployments handelt.
* PostgresDBs: Wir verwenden von AWS verwaltete Instanzen der DBs. Dies führt dazu, dass einfach Read-Replicas hinzugeschalten werden können um die Leselast auf den Datenbanken besser regulieren zu können. Außerdem kann jederzeit auf eine größere Datenbank-Instanz gewechselt werden (Scale-Up).
### Ausfallssicherheit. Wie wurde im vorliegenden Projekt Ausfallssicherheit berücksichtigt?
* IoT-Core: Es werden keine expliziten Server-Instanzen betrieben. Ausfälle treten nur auf, wenn die ganze AWS-Region in der der IoT-Core Endpunkt betrieben wird, ausfällt.
* AWS-Lambda: Es werden keine expliziten Server-Instanzen betrieben. Ausfälle treten nur auf, wenn die ganze AWS-Region in der der die Lambda-Funktion gehostet wird, ausfällt.
* AWS Elastic Kubernetes Service: Dank der Orchestrierung mittels Kubernetes werden bei Ausfall einzelner Services, diese automatisch neu gestartet.
*
### NoSql. Welchen Beitrag leistet NoSql in der vorliegenden Problemstellung?
Keinen.
### Replikation. Wo nutzen Sie im gegenständlichen Projekt Daten-Replikation?
Siehe Frage Skalierbarkeit: Unterpunkt PostgresDBs. AWS RDS unterstützt zudem das automatisierte Erstellen von Backups und Snapshots.
### Kosten. Welche Kosten verursacht Ihre Lösung? Welchen monetären Vorteil hat diese Lösung gegenüber einer Nicht-Cloud-Lösung?
Zur Berechnung der Kosten verwenden wir den AWS Pricing Calculator. Mithilfe dieses Werkzeugs lassen sich die Kosten für den Betrieb von Architekturen in AWS berechnen.
Die nachfolgenden Screenshots zeigen die Kosten unserer Architektur bei 1000 aktiven Hydroponics-Stationen.



Der monetäre Vorteil unserer Cloud-Lösung liegt in der automatischen Skalierung der Kosten mit der Anzahl der Nutzer. Bei niedrigen Nutzerzahlen sind auch die Kosten entsprechend niedrig. Steigen die Nutzerzahlen steigen auch die Kosten in einem angemessenen Verhältnis.
Nimmt man an das jeder User (insgesamt 1000) ein Abo in Höhe von 10€/Monat abschließt so belaufen sich die Kosten für das Hosting auf rund 3,7% des Umsatzes, was in unseren Augen vertretbar ist.
Würde man unsere Architektur auf herkömmlichen Weg hosten, müssten im vorhinein die Anzahl der Nutzer abgeschätzt werden, um die entsprechende Hardware zukaufen zu können. Bei einem Anstieg der Zahlen müssten zudem laufend neue Hardware angekauft werden.
----------------------------------------------
Die folgenden Abschnitte beschreiben die einzelnen Services und deren Funktionalitäten. Diese wurden bereits im Rahmen der LVAs MUS und SVE entwickelt.
----------------------------
# Architektur
Die nachfolgenden Erläuterungen sollen einen Gesamtüberblick über das von uns entworfene System geben. Die einzelnen Komponenten des Systems werden in weiteren Abschnitten noch näher ausgeführt.

Ein zentraler Bestandteil unserer Architektur besteht in der Verwendung von MOM Technologien für den asynchronen Nachrichtenaustausch von den hydroponischen Stationen hin zu unseren Services welche die Geschäftslogik realisieren.
In der linken Seite der Abbildung befindet sich der IoT-Bereich. Hydroponische Stationen senden Sensordaten an einen MQTT-Broker. Da wir zum Testen nur eine physische Station haben, gibt es ebenfalls einen Simulator, welcher das Simulieren und Senden von Sensordaten an den MQTT-Broker übernimmt.
Der Collector Service bildet die Brücke zwischen dem IoT-Bereich und der eigentlichen Geschäftslogik. Er verarbeitet die eingehenden MQTT NAchrichten und sendet diese Weiter an ein Exchange des RabbitMQ Brokers.
Über die Queues des RabbitMQ Brokers werden Nachrichten anschließend auf die Services verteilt, welche die Geschäftslogik implementieren. Die Logik unseres Systems wird von drei Microservices realisiert:
* **Tracking Service**: Der Tracking Service bekommt Sensordaten von einer AMQP-Worker-Queue zugewiesen. Die Sensordaten werden anschließend überprüft und in einer PostgreSQL Datenbank persistiert.
* **Notification Service**: Der Notification Service ist für die Überprüfung der Sensorwerte und die Erstellung von Benachrichtigungen verantwortlich.
* **Notification-Web-Push Service**: Dieser Service empfängt Benachrichtigungen und sendet diese mittels WebPush an die Clients.
Das Api Gateway bildet anschließend die Schnittstelle nach außen für die Clients zur Kommunikation mit den Microservices.
# Collector Service
Der Collector Service ist ein MQTT-Subscriber und ein AMQP-Publisher. Er empfängt MQTT-Nachrichten vom MQTT-Broker `eclipse-mosquitto` und sendet Nachrichten an einen Exchange des Brokers RabbitMQ. Er bildet demnach die Brücke zwischen der IoT-Infrastruktur und der eigentlichen Business-Logik.
## Beschreibung
Sensor-Stationen generieren laufend Daten. Diese Sensordaten werden an eine Mosquitto MQTT Queue gesendet. Der Collector Service bildet den Subscriber auf die Sensordaten von MQTT, liest diese und sendet diese an einen Exchange des RabbitMQ Brokers. RabbitMQ stellt zwei Worker-Queues zur Verfügung, welche mittels Round-Robin-Dispatching die Nachrichten an ihre Consumer verteilen.
Die Nachrichten, welche an die MQTT Queue gesendet werden werden im CSV Format: `T1:11,82;U2:12,58;` an das Topic `station/<STATIONID>` gesendet. Die Nachricht setzt sich aus folgenden Bestandteilen zusammen: (`<SENSORACRONYM><SENSORID>:<SENSORVALUE>;`)+. Der Collector Service ist ein Subscriber auf das Topic `station/+`. Da Sensordaten von mehreren Stationen gesendet werden können, verwenden wir ein hierarchisch aufgebautes Topic, wobei wir als Wildcard `+` verwenden. Der Collector Service vereinzelt die eingehenden Nachrichten, fügt einen Zeitstempel hinzu und sendet diese im `JSON` Format an ein Exchange des RabbitMQ Brokers, welcher die Nachrichten an die Queues `measures-bl` und `measures-notification-bl` routet. Da eine MQTT Nachricht beliebig viele Messwerte verschiedener Sensoren enthalten kann, bedeutet vereinzeln, dass der Collector Service für jeden Sensorwert eine eigene AMQP Nachricht erzeugt. Die Nachricht im `JSON` Format sieht wie folgt aus:
```json
{
"stationId":1,
"sensorId":1,
"acronym":"T",
"value":2.06,
"createdAt":"2020-05-30T14:13:03.331677"
}
```
## Technologien
Der Collector Service wird in Form eines Maven Projekts realisiert. Dieses enthält Abhängigkeiten auf die Frameworks `org.eclipse.paho` und `com.rabbitmq` und `com.fasterxml.jackson.core`.
* Das Framework `org.eclipse.paho` wird für die Kommunikation mit dem MQTT Broker verwendet.
* Das Framework `com.rabbitmq` wird für die Kommunikation mit dem RabbitMQ Broker verwendet.
* Das Framework `com.fasterxml.jackson.core` wird zum erzeugen von `JSON` Dokumenten aus Java Objekten (unmarshalling) verwendet.
```xml
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.5.3</version>
</dependency>
```
## Umsetzung
Die beiden wesentlichen Klassen dieser Implementierung sind der `MeasureConsumer` und der `MeasurePublisher`.

Am `MqttClient` wird eine Consumer registiert (`MeasureConsumer`) welcher das Interface `IMqttMessageListener` implementiert.
```Java
try{
MqttClient mqttClient = new MqttClient(mqttConnectionString, mqttClientId);
mqttClient.connect(getMqttOptions());
mqttClient.subscribe(mqttTopic, new MeasureConsumer());
} catch(MqttException ex) {
ex.printStackTrace();
}
```
Die Callback Methode des `MeasureConsumer` wird bei jedem Eintreffen einer Nachricht aufgerufen. In dieser Methode wird anschließend die Nachricht deserialisiert, die Sensorwerte vereinzelt und an den `MeasurePublisher` übergeben.
```Java
public class MeasureConsumer implements IMqttMessageListener {
private Deserializer<List<MeasureModel>> deserializer;
private MeasurePublisher measurePublisher;
public MeasureConsumer(){
this.deserializer = new MeasureMessageDeserializer();
this.measurePublisher = new MeasurePublisher();
}
@Override
public void messageArrived(String topic, MqttMessage msg) throws Exception {
List<MeasureModel> measures = deserializer.deserialize(topic, msg.toString());
System.out.println(measures.size());
for(MeasureModel measureModel : measures){
measurePublisher.publish(measureModel);
}
}
}
```
Der nachfolgend dargestellte Auszug aus dem Quellcode zeigt, wie der `MeasurePublisher` zuerst das `MeasureModel` Objekt in ein `JSON` Dokument transformiert und dieses anschließend an das Exchange `measures-exchange` des RabbitMQ Brokers sendet.
```Java
public class MeasurePublisher {
private ConnectionFactory connectionFactory;
private ObjectMapper objMapper;
// ...
public void publish(MeasureModel measureModel){
String measureExchangeName = AppSettings.getProperty("hydroponic.amqp.exchange.measures.name").getValue();
try (Connection connection = this.connectionFactory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(measureExchangeName, BuiltinExchangeType.FANOUT);
String message = objMapper.writeValueAsString(measureModel);
channel.basicPublish(measureExchangeName, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch(Exception ex){
ex.printStackTrace();
}
}
}
```
# Tracking Service
Der Tracking Service bekommt Sensordaten von einer AMQP Worker-Queue zugewiesen. Die Sensordaten werden anschließend überprüft und in einer PostgreSQL Datenbank persistiert.
## Beschreibung
Der Tracking Service ist für die Verwaltung der hydroponischen Stationen und ihren Sensoren verantwortlich und stellt eine REST-Schnittstelle zur Verwaltung derselben zur Verfügung. Stationen müssen explizit registriert werden und bekommen eine eindeutige Id zugewiesen. Die Sensoren hingegen werden automatisch bei der Station registriert, sobald Sensorwerte beim Tracking Service für die entsprechende Station einlangen. Die Sensorwerte bekommt der Tracking Service von der AMQP Worker-Queue `measures-bl` des RabbitMQ Brokers zugewiesen. Die Payload der Nachricht sind die Sensorwerte im `JSON` Format. Die Verwendung einer Worker-Queue ermöglicht es uns den Tracking Service zu skalieren, wobei die Sensorwerte nicht mehrfach verarbeitet werden, da diesem immer nur einem Consumer zugestellt werden.
Die Anwendung ist in drei Schichten realisiert.
* Die REST-Schnittstelle bildet die Präsentationsschicht.
* Geschäfts Logik Schicht
* Persistenz Schicht
### Verarbeitung der Sensorwerte
Für jeden eingelangten Sensorwert wird überprüft ob die angegebene Station existiert. Existiert keine Station wird der Sensorwert verworfen. Anderenfalls wird überprüft ob ein entsprechender Sensor für die Station registriert ist. Ist dies nicht der Fall wird auf Basis des in der Nachricht enthaltenen Acronym überprüft ob der Sensortyp von unserem System unterstützt wird. Wird der Sensortyp unterstützt wird ein neuer Sensor für die Station angelegt. Der Messwert wird anschließend dem entsprechenden Sensor zugewiesen und persistiert. Die Daten des Tracking Service werden dabei in einer eigenen PostgreSQL Datenbank verwaltet, welche in einem Docker Container betrieben wird.
## Technologien
Die nachfolgend aufgelisteten Technologien wurden unter anderen zur Implementierung des Tracking Service verwendet.
* Als Applikationsserver verwenden wir `Quarkus`. Hierbei handelt es sich um einen leichtgewichtigen Applikationssserver welcher die Entwicklung von Microservices unterstützt.
* Zur Konfiguration unserer Anwendung mittels Abhängigkeitsinjektion verwenden wir eine `Eclipse MicroProfile Config` Implementierung welche von Quarkus mitgeliefert wird.
* Zur Dokumentation der REST-Schnittstelle unserer Anwendung verwenden wir `OpenApi`. Ergebnis davon ist, eine Schnittstellenbeschreibung im `yml` Format welche mittels `Swagger-Ui` komfortabel konsumiert werden kann.
* Zur Kommunikation mit dem RabbitMQ Broker verwenden wir `amqp-client` von `com.rabbitmq`.
* Zur Validierung der Werte der Data-Transfer-Objekte welche an die REST-Schnittstelle übergeben werden verwenden wir Bean Validation. Im speziellen verwenden wir die Implementierung `quarkus-hibernate-validator`.
## Domänenmodell

## Data Transfer Objects
Um die Ressourcen, welche über die REST-Schnittstelle ausgetauscht werden unabhängig vom Domänenmodell zu machen, haben wir uns dazu entschieden Data-Transfer-Objekte für den Austausch von Ressourcen einzuführen.

## Umsetzung
Viele Konzepte welche der Tracking Service verwendet werden im Abschnitt zum Notification Service näher beschrieben. Aus diesem Grund wird an dieser Stelle nur auf die wesentlichsten Bestandteile eingegangen.
### REST Endpunkte
* Über den Endpunkt `<IP>:<PORT>/aopenapi` kann das OpenApi Dokument abgefragt werden.
* Über den Endpunkt `<IP>:<PORT>/swagger-ui` gelangt man zur Swagger-Ui. Die nachfolgenden Abbildungen wurden der Swagger-Ui entnommen.




### AMQP Nachrichtenverarbeitung
Zum Konsumieren der Sensorwerte von RabbitMQ haben wir einen `MeasureConsumer` implementiert. Dieser hat den Scope `@ApplicationScoped` und implementiert das Interface `Runnable`. Weiters registriert sich diese Implementierung auf das `@Observes StartupEvent` und das `@Observes ShutdownEvent` des Applikationsservers. Ist der Applikationsserver gestartet wird durch den `ExecutorService` ein eigener Thread gestartet in welchem ein Consumer bei RabbitMQ registriert wird und Sensorwerte entgegengenommen werden. Diese werden in der registrierten Callback Methode `deliverCallback` verarbeitet.
Der RabbitMQ Broker wird in einem Docker Container bzw. in einem Kubernetes Pod betrieben. Da der Broker eine gewisse Zeit zur Initialisierung benötigt, steht der Dienst nicht sofort nach dem Start des Containers/Pods zur Verfügung (auf dies wurde auch in der Dokumentation auf DockerHub hingewiesen). Aus diesem Grund wird zu Beginn der `run()` Methode versucht eine Verbindung aufzubauen. Schlägt dies Fehl wird der Vorgang nach 1000 Millisekunden wiederholt, solange bis eine Verbindung hergestellt werden kann.
Da die Nachrichten standardmäßig mittels Round Robin Dispatch an die Consumer verteilt werden und nicht darauf geachtet wird wieviele unbestätigte Nachrichten ein Consumer bereits zugewisen bekommen hat, haben wir uns dazu entschieden `Consumer Prefatching` zu verwenden. Durch folgendes Kommando: `channel.basicQos(1);` wird ein Consumer Prefatch von Eins festgelegt.
```Java
@ApplicationScoped
public class MeasureConsumer implements Runnable {
@ConfigProperty(name = "hydroponic.amqp.queue.measures.name")
private String queueName;
@ConfigProperty(name = "hydroponic.amqp.exchange.measures.name")
private String measureExchangeName;
private final ExecutorService scheduler;
private AmqpConnectionFactory connectionFactory;
private MeasureManager measureManager;
private Connection connection;
@Inject
public MeasureConsumer(AmqpConnectionFactory connectionFactory,
MeasureManager measureManager){
this.connectionFactory = connectionFactory;
this.measureManager = measureManager;
this.scheduler = Executors.newSingleThreadExecutor();
}
void onStart(@Observes StartupEvent ev) {
scheduler.submit(this);
}
void onStop(@Observes ShutdownEvent ev) {
scheduler.shutdown();
}
@Override
public void run() {
try{
while(connection == null) {
try {
System.out.println("attempt amqp reconnect ...");
connection = connectionFactory.getConnection();
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
Channel channel = CreateChannel();
System.out.println(" [*] Waiting for messages.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [*] Received '" + message + "'");
process(message);
};
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { });
} catch(IOException ex) {
ex.printStackTrace();
} catch(Exception ex){
ex.printStackTrace();
}
}
private Channel CreateChannel() throws IOException{
Channel channel = connection.createChannel();
channel.exchangeDeclare(measureExchangeName, BuiltinExchangeType.FANOUT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, measureExchangeName, "");
channel.basicQos(1); // consumer prefetch
return channel;
}
/**
* parses the received JSON message and passes it to
* the measureManager for further processing.
* */
private void process(String message) {
Jsonb jsonBuilder = JsonbBuilder.create();
MeasureModel measure = jsonBuilder.fromJson(message, MeasureModel.class);
System.out.println(measure.toString());
measureManager.processMeasure(measure);
}
}
```
# Notification Service
Der Notification Service ist für die Überprüfung der Sensorwerte und die Erstellung und Persistierung von Benachrichtigungen verantwortlich.
## Beschreibung
Der Notification Service ist für die Überprüfung der Sensorwerte und die Erstellung von Benachrichtigungen verantwortlich. Diese werden in einer eigenen PostgreSQL Datenbank persistiert und können über eine REST-Endpunkte konsumiert werden. Weiters werden diese Benachrichtigungen an das FANOUT Exchange `notificationExchange` von RabbitMQ gesendet. Diese werden dann durch eine Instanz des `notification-web-push` Service konsumiert.
Die Anwendung ist in drei Schichten realisiert.
* Die REST-Schnittstelle bildet die Präsentationsschicht.
* Geschäfts Logik Schicht
* Persistenz Schicht
## Technologien
Die nachfolgend aufgelisteten Technologien wurden zur Implementierung des Notification Service verwendet.
* Als Applikationsserver wird `Quarkus` verwendet. Hierbei handelt es sich um einen leichtgewichtigen Applikationssserver welcher die Entwicklung von Microservices unterstützt.
* Zur Konfiguration unserer Anwendung mittels Abhängigkeitsinjektion verwenden wir eine `Eclipse MicroProfile Config` Implementierung welche von Quarkus zur Verfügung gestellt wird.
* Zur Dokumentation der REST-Schnittstelle unserer Anwendung verwenden wir `OpenApi`. Ergebnis davon ist eine Schnittstellenbeschreibung im `yml` Format welche mittels `Swagger-Ui` komfortabel konsumiert werden kann.
* Zur Kommunikation mit dem RabbitMQ Broker verwenden wir `amqp-client` von `com.rabbitmq`.
* Zur Validierung der Werte der Data-Transfer-Objekte welche an die REST-Schnittstelle übergeben werden verwenden wir Bean Validation. Im speziellen verwenden wir die Implementierung `quarkus-hibernate-validator`.
## Domänenmodell

### NotificationDefinition
Der Notification Service bietet Schnittstellen zum Verwalten von Notifikation-Definitionen. Auf Basis dieser Definitionen werden die Sensorwerte überprüft und ggf. Benachrichtigungen erstellt. Eine `NotificationDefinition` setzt sich wie nachfolgend dargestellt zusammen.

* Eine `NotificationDefinition` ist immer einem Sensor zugewisen. Um hier Inkonsistenzen zu vermeiden (es wird eine `NotificationDefinition` für einen Sensor angelegt welcher nicht existiert) kommuniziert der Notification Service mit dem Tracking Service, welcher die Stationen und deren Sensoren verwaltet um zu überprüfen ob der Sensor existiert und um zusätzliche Informationen zu bekommen.
* Bei der Überprüfung wird so vorgegangen, dass der aktuelle Sensorwert mit dem in der Definition festgelegten `threshold` verglichen wird. Das Property `direction` bestimmt die Richtung der Überprüfung.
* Hat `direction` den Wert `false` wird geprüft ob der aktuelle Wert unter dem Wert von `threshold` liegt.
* Hat `direction` den Wert `true` wird geprüft ob der aktuelle Wert über dem Wert von `threshold` liegt.
* Das Property `interval` legt fest in welchen Zeitabständen eine Benachrichtigung generiert werden soll. Sensorwerte werden in unterschiedlichen Zeitabständen von den Stationen gesendet und ein bestimmter Zustand (z.B.: aktueller Wert liegt unter dem Threshold) kann länger andauern. Da wir jedoch nicht immer für jeden Sensorwert sofort eine Benachrichtigung erzeugen wollen haben wir das Intervall eingeführt. Dabei wird so vorgegangen, dass überprüft wird ob in den vorangegangenen `interval` Sekunden bereits eine Nachricht für den Sensor erzeugt wurde. Ist dies der Fall wird keine neue Nachricht erzeugt. Anderenfalls wird eine neue Benachrichtigung erzeugt.
* Da Grenzwerte abhängig von der Pflanze sind, welche angebaut werden kann einer `NotificationDefinition` ein `context` zugewiesen werden. Um eine `NotificationDefinition` nicht löschen zu müssen, wenn eine andere Pflanze angesetzt wird kann diese über das Property `active` deaktiviert werden.
* Da es mehrere `NotificationDefinition`s für einen Sensor geben kann, werden immer nur jene bei der Überprüfung der Sensorwerte berücksichtigt, welche den geringsten Abstand zwischen dem aktuellen Sensorwert und dem definierten `threshold` aufweisen.
### NotificationLog
Ein `NotificationLog` ist eine konkrete Ausprägung einer `NotificationDefinition`. Wird auf Basis einer `NotificationDefinition` eine Benachrichtigung ausgelöst wird diese in Form eines `NotificationLog` persistiert und an das `notificationExchange` gesendet. Da Einstellungen einer `NotificationDefinition` geändert werden können, haben wir uns dazu entschieden gewisse Werte zu duplizieren, sodass die Ursache für jeden Log genau ermittelt werden kann.

## Data Transfer Objects
Um die Ressourcen, welche über die REST-Schnittstelle ausgetauscht werden unabhängig vom Domänenmodell zu machen, haben wir uns dazu entschieden Data-Transfer-Objekte für den Austausch von Ressourcen einzuführen. Das Mapping zwischen den Dtos und den Entity Klassen wird durch eigens implementierte `Mapper` Klassen realisiert.

## Umsetzung
### REST Endpunkte
Zur Implementierung der REST-Enpunkte wird die JAX-RS Laufzeitumgebung verwendet. Services, Daos usw. werden als CDI-Beans verwaltet und mittels Abhängigkeitsinjektion zur Verfügung gestellt.
* Über den Endpunkt `<IP>:<PORT>/aopenapi` kann das OpenApi Dokument abgefragt werden.
* Über den Endpunkt `<IP>:<PORT>/swagger-ui` gelangt man zur Swagger-Ui. Die nachfolgenden Abbildungen wurden der Swagger-Ui entnommen.




### Verwendung von OpenApi
```xml
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
```
Durch die Verwendung von OpenApi können die REST-Endpunkte sehr gut dokumentiert werden. Hierbei können die HTTP-Antwort `@APIResponses`, die HTTP-Methode `@Operation`, und die Parameter näher beschrieben werden.
```Java
@Path("/")
@GET
@Produces(MediaType.APPLICATION_JSON)
@APIResponses({
@APIResponse(
responseCode = "200",
name = "notification severities",
description = "notification severities"
)
})
@Operation(
summary = "Get hydroponic notification definitions available",
description = "Retrieves and returns all the hydroponic notification definitions")
@Tag(ref = App.OPEN_API_TAG_NAME_NOTIFICATION_DEFINITION)
public Response getAllNotificationDefinitions(){
try{
List<NotificationDefinitionDto> dtos = notificationDefinitionManager.findAllDefinitions();
return Response.status(Response.Status.OK)
.entity(dtos)
.build();
} catch(Exception ex){
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.build();
}
}
```
### Bean Validation
Um Bean Validation verwenden zu können wird eine entsprechende Implementierung derselben benötigt. Hierbei haben wir uns für die Verwendung des Hibernate Validators entschieden. Diese wird dem Projekt als Maven Dependency hinzugefügt.
```xml
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-hibernate-validator</artifactId>
</dependency>
```
Die Dto Klassen des Notification Service, wie auch des zuvor beschriebenen Tracking Service sind mit Annotationen für das JSON Binding `@JsonbProperty(...)` versehen. Es werden explizit die Namen der JSON Properties festgelegt. Weiters werden Annotationen zur Validierung z.B.: `@PositiveOrZero` der Dtos definiert.
```Java
public final class NotificationDefinitionCreateDto {
@JsonbProperty(value = "sensorId")
@PositiveOrZero
private Long sensorId;
@JsonbProperty(value = "threshold")
private double threshold;
@JsonbProperty(value = "text")
@NotBlank
@NotNull
private String text;
@JsonbProperty(value = "context")
private String context;
@JsonbProperty(value = "direction")
@NotNull
private boolean direction;
@JsonbProperty(value = "interval")
@Min(1)
private int interval;
@JsonbProperty(value = "active")
private boolean active;
@JsonbProperty(value = "severityId")
@NotNull
@PositiveOrZero
private Long severityId;
// getter/setter Methoden
}
```
Die Validierung erfolgt anschließend durch den Container beim Aufruf der REST Endpunkte. Die Validierung durch den Container wird mittels der Annotation `@Valid` eingeleitet.
```Java
public Response createNotificationDefinition(@Valid @RequestBody NotificationDefinitionCreateDto createDto){
/...
}
```
### AMQP Nachrichtenverarbeitung - Consumer
Zum Konsumieren der Sensorwerte von RabbitMQ haben wir einen `MeasureConsumer` implementiert. Dieser hat den Scope `@ApplicationScoped` und implementiert das Interface `Runnable`. Weiters registriert sich diese Implementierung auf das `@Observes StartupEvent` und das `@Observes ShutdownEvent` des Applikationsservers. Ist der Applikationsserver gestartet wird durch den `ExecutorService` ein eigener Thread gestartet in welchem ein Consumer bei RabbitMQ registriert wird und Sensorwerte entgegengenommen werden. Diese werden in der registrierten Callback Methode `deliverCallback` verarbeitet.
Der RabbitMQ Broker wird in einem Docker Container bzw. in einem Kubernetes Pod betrieben. Da der Broker eine gewisse Zeit zur Initialisierung benötigt, steht der Dienst nicht sofort nach dem Start des Containers/Pods zur Verfügung (auf dies wurde auch in der Dokumentation auf DockerHub hingewiesen). Aus diesem Grund wird zu Beginn der `run()` Methode versucht eine Verbindung aufzubauen. Schlägt dies Fehl wird der Vorgang wiederholt, solange bis eine Verbindung hergestellt werden kann.
Da die Nachrichten standardmäßig mittels Round Robin Dispatch in die Consumer verteilt wird und nicht darauf geachtet wird wieviele unbestätigte Nachrichten ein Consumer bereits zugewisen bekommen hat, haben wir uns dazu entschieden `Consumer Prefatching` zu verwenden. Durch folgendes Kommando: `channel.basicQos(1);` wird ein Consumer Prefatch von Eins festgelegt.
```Java
@ApplicationScoped
public class MeasureConsumer implements Runnable {
@ConfigProperty(name = "hydroponic.amqp.queue.measures.name")
private String queueName;
@ConfigProperty(name = "hydroponic.amqp.exchange.measures.name")
private String measureExchangeName;
private final ExecutorService scheduler;
private AmqpConnectionFactory connectionFactory;
private Handler measureHandler;
private Connection connection;
@Inject
public MeasureConsumer(AmqpConnectionFactory connectionFactory,
Handler measureHandler){
this.connectionFactory = connectionFactory;
this.measureHandler = measureHandler;
this.scheduler = Executors.newSingleThreadExecutor();
}
void onStart(@Observes StartupEvent ev) {
scheduler.submit(this);
}
void onStop(@Observes ShutdownEvent ev) {
scheduler.shutdown();
}
@Override
public void run() {
try{
while(connection == null) {
connection = connectionFactory.Reconnect();
}
Channel channel = CreateChannel();
System.out.println(" [*] Waiting for messages.");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
process(message);
};
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> { });
} catch(IOException ex) {
ex.printStackTrace();
} catch(Exception ex) {
ex.printStackTrace();
}
}
private Channel CreateChannel() throws IOException{
Channel channel = connection.createChannel();
channel.exchangeDeclare(measureExchangeName, BuiltinExchangeType.FANOUT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, measureExchangeName, "");
channel.basicQos(1); // consumer prefetch
return channel;
}
/**
* parses the received JSON message and passes it to
* the measureManager for further processing.
* */
private void process(String message) {
Jsonb jsonBuilder = JsonbBuilder.create();
MeasureModel measure = jsonBuilder.fromJson(message, MeasureModel.class);
System.out.println(measure.toString());
measureHandler.process(measure);
}
}
```
### Eclipse MicroProfile RestClient
Um sicherzustellen dass eine `NotificationDefinition` nur für Sensoren angelegt wird, welche existieren und einer Station zugewiesen sind, kommuniziert der Notification Service mit dem Tracking Service über dessen REST Schnittstelle. Dazu verwenden wir eine Implementierung von Eclipse MicroProfile RestClient, welche von Quarkus zur Verfügung gestellt wird. Eingebunden wird diese Implementierung über die nachfolgend dargestellte Maven Dependency.
```xml
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest-client</artifactId>
</dependency>
```
Der Rest Client wird in Form eines Interface definiert. Zur Laufzeit wird ein Proxy erzeugt, welcher die Kommunikation mit dem Tracking Service realisiert.
```Java
@RegisterRestClient(configKey = "trackingService")
@Path("/sensors")
public interface SensorsClient extends AutoCloseable {
@GET
@Path("/{id}")
SensorDto getSensor(@PathParam("id") long sensorId);
@GET
public List<SensorDto> getAllSensors();
}
```
Der RestClient wird als CDI-Bean zur Verfügung gestellt und kann mittels Abhängigkeitsinjektion wie nachfolgend dargestellt einem CDI-Bean injeziert werden.
```Java
@Inject
@RestClient
private SensorsClient sensorsClient;
```
### Eclipse MicroProfile Config
Zur Konfiguration unserer Anwendung mittels Konfigurationsparameter aus der Datei `application.properties` haben wir uns für die Verwendung von Eclipse MicroProfile Config entschieden. Die Konfigurationsparameter können durch die Angabe von `@ConfigProperty(name = "key")` mittels Abhängigkeitsinjektion in ein Bean injeziert werden, wie nachfolgend am Beispiel der Klasse `AmqpConnectionFactory` gezeigt wird.
```Java
@ApplicationScoped
public class AmqpConnectionFactory {
@ConfigProperty(name = "hydroponic.amqp.host")
private String host;
@ConfigProperty(name = "hydroponic.amqp.port")
private int port;
@ConfigProperty(name = "hydroponic.amqp.username")
private String userName;
@ConfigProperty(name = "hydroponic.amqp.password")
private String password;
// ...
}
```
Weiters haben wir uns zur Konfiguration unserer Services für die Verwendung von Profilen entschieden. Neben den vordefinierten Profilen: dev, test, prod, können weitere benutzerdefinierte Profile definiert werden. Da in verschiedenen Umgebungen `docker-compose`, `lokal`, oder in `Kubernetes` unterschiedliche Einstellungen benötigt werden ermöglicht uns dies das Bauen der jeweiligen Anwendung, in Abhängigkeit von der Umgebung, in welche die Anwendung ausgeliefert wird. Mittels dem Befehl `mvn package -D"quarkus.profile"="compose"` kann die Anwendung für verschiedene Profile gebaut werden, wobei immer nur die mit `%profil.` gekennzeichneten Einstellungen herangezogen werden.
```
# settings for profile dev
%dev.server.port=8081
%dev.tracking.server=localhost
%dev.tracking.server.port=8080
%dev.notification.db.server=localhost
%dev.notification.db.port=5433
%dev.amqp.broker=localhost
%dev.amqp.broker.port=5672
# settings for profile test
%test.server.port=8081
%test.tracking.server=localhost
%test.tracking.server.port=8080
%test.notification.db.server=localhost
%test.notification.db.port=5433
%test.amqp.broker=localhost
%test.amqp.broker.port=5672
# settings for profile compose
%compose.server.port=8080
%compose.tracking.server=tracking
%compose.tracking.server.port=8080
%compose.notification.db.server=notificationdb
%compose.notification.db.port=5432
%compose.amqp.broker=rabbitmq
%compose.amqp.broker.port=5672
# settings for profile k8s
%k8s.server.port=8080
%k8s.tracking.server=tracking-service
%k8s.tracking.server.port=8082
%k8s.notification.db.server=notificationdb-service
%k8s.notification.db.port=5433
%k8s.amqp.broker=rabbitmq-service-queue
%k8s.amqp.broker.port=5672
```
# Notification-Publisher-Service
Benachrichtigungen sind ein zentraler Bestandteil unseres Systems. Damit Nutzer auf Fehler im
System reagieren können, müssen sie zuverlässig die entsprechenden Nachrichten erhalten.
Da wir auf der Clientseite auf eine SPA-Anwendung setzen, entschieden wir uns zunächst dafür die
eintreffenden Benachrichtigungen mittels Server-Sent-Events an die Clients zu senden.
## Eckdaten
* Quarkus
* Client registriert sich bei einer Instanz des Service (via API-Gateway)
* Konsumiert Über Fanout-Exchange Benachrichtigungen
* Mittels reactive messaging wird ein in-memory stream erzeugt, welcher die Daten über den Endpunkt an den Client sendet
## Umsetzung

Obige Abbildung zeigt überblicksartig die Klassen des Service.
Besonders interessant ist die Verwendung der `smallrye-reactive-messaging` Implementierung zum Senden der
konsumierten Nachrichten an den REST-Endpunkt mittels Stream.
```Java
@ApplicationScoped
public class NotificationConsumer implements Runnable {
@ConfigProperty(name = "hydroponic.amqp.exchange.notifications.name")
private String EXCHANGE_NAME;
@Inject
private AmqpConnectionFactory connectionFactory;
private Connection connection;
private final ExecutorService scheduler;
private ObjectMapper objectMapper;
/**
* Use Emitter to send messages to in memory stream from imperative context
*/
@Inject
@org.eclipse.microprofile.reactive.messaging.Channel("notificationStream")
Emitter<String> notificationEmitter;
...
/**
* Called everytime a new notification arrives
*/
private void process(String message) throws JsonProcessingException {
NotificationLogDto notification = objectMapper.readValue(message, NotificationLogDto.class);
System.out.println(notification.toString());
notificationEmitter.send(message); //send messages to stream
}
}
```
```Java
@Path("/stream")
public class NotificationStream {
/**
* consumes stream of notifications
* NotificationConsumer reads messages from rabbitmq and pushes them
* to "notificationsStream" in memory stream
*/
@Inject
@Channel("notificationStream") Publisher<String> notifications;
/**
* publish notifications via server sent events
* @return stream of notifications
*/
@GET
@Path("/notifications")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Publisher<String> stream() {
return notifications;
}
}
```
## Erkenntnisse
Server-Sent-Events erlauben es dem Server Nachrichten an den Client zu schicken, sobald sich dieser einmalig registriert hat.
Clients empfangen Nachrichten und können sie nutzen um Inhalte der Webapp zu aktualisieren (zum Beispiel den Zustand der Hydroponischen Station, oder
das Senden von nativen Benachrichtigungen). Der Nutzer muss dazu die Seite nicht neu laden.
Limitation bei diesem Ansatz sind, dass Nutzer den Browser mit der entsprechenden Webapp geöffnet haben müssen, um neue
Nachrichten empfangen zu können. Dies ist, vor allem in der mobilen Nutzung, nicht möglich, da Nutzer das Endgerät auch für andere Aufgaben
verwenden.
Deshalb haben wir uns entschlossen die Implementierung erneut auf Basis von Web-Push umzusetzen.
Der entsprechende Service wird im folgenden Abschnitt ``notification-web-push-service`` beschrieben.
# Notification-Web-Push Service
Dieser Service konsumiert AMQP-Nachrichten (Benachrichtigungen) und sendet sie mittels Web-Push an Clients.
## Eckdaten
### Node.js
Die Bibliothek zum Senden von Nachrichten via WebPush, scheint in der Node.js-Portierung am ausgereiftesten zu sein.
Deshalb haben wir uns für die Implementierung des Services mit dieser Plattform entschieden. Zusätzlich zeigt dies die
Interoperabilität von Microservices ,die durch die loose Kopplung der Komponenten und der standardprotokollbasierten Kommunikation
erreicht wird.
### Kopplung Client / Server
Die Verwendung der Web-Push API erlaubt die zeitliche Entkopplung des Servers vom Client.
Dies wird möglich durch sogenannte Push-Service-Server, welche von den einzelnen Browserherstellern zu Verfügung gestellt werden.
Die Nachrichten werden nicht direkt an den Client gesendet, sondern an den Push-Service-Server.
Dieser leitet die Nachrichten weiter sobald der Client online ist.
Diese Architektur bietet den Vorteil, dass der Server zu jedem Zeitpunkt Nachrichten senden kann, unabhängig vom Zustand des Clients.
Das folgende Sequenzdiagramm veranschaulicht noch einmal die Funktionsweise der Web-Push API

### Umsetzung
Clients abonnieren Benachrichtigungen indem sie ein ``PushSubscription-Objekt`` an den entsprechenden Endpunkt senden.
Dieses JSON-Objekt dient zur eindeutigen Identifizierung von Clients und wird in Assoziation zum jeweiligen Nutzer in einer
DB abgelegt (da wir keinen Fokus auf Nutzerverwaltung gelegt haben, speichern wir die Objekte nur zur Laufzeit des Servers).
Mithilfe der Bibliothek ``amqplib/callback-api`` konsumiert der Service Nachrichten. Das folgende Snippet zeigt
die Implementierung:
```Typescript
import {sendNotification} from "./sendNotification";
var amqp = require('amqplib/callback_api');
const AMQP_LOCAL_URL = "amqp://localhost";
const AMQP_DOCKER_URL = "amqp://rabbitmq";
const AMQP_k8s_URL = "amqp://rabbitmq-service-queue";
// if the connection is closed or fails to be established at all, we reconnect
var amqpConn = null;
export function startAMQP() {
//establishes connection and calls startWorker()
}
// defines the exchange and registers callback method
function startWorker() {
amqpConn.createChannel(function(err, ch) {
if (closeOnErr(err)) return;
ch.on("error", function(err) {
console.error("[AMQP] channel error", err.message);
});
ch.on("close", function() {
console.log("[AMQP] channel closed");
});
ch.prefetch(10);
ch.assertExchange("notificationExchange", 'fanout', { durable: false }, function(err, _ok) {
if (closeOnErr(err)) return;
ch.assertQueue('', {exclusive: true}, function(queue, options) {
console.log(options);
ch.bindQueue(options.queue, "notificationExchange", '');
ch.consume(options.queue, message => {
sendNotification(message.content.toString());
}, {noAck : true})
})
});
});
}
```
Die Methode sendNotification() nutzt die ``web-push-api`` um die empfangene Nachricht an den Web-Push-Service-Server zu
senden. Das folgende Snippet zeigt die Implementierung:
```Typescript
import {USER_SUBSCRIPTIONS} from "./in-memory-db";
const webpush = require('web-push');
export function sendNotification(notification) {
var obj = JSON.parse(notification);
console.log(obj.actualValue);
// the notification object, defined by web-push api
const notificationPayload = {
"notification": {
"title": obj.text,
"body": "Sensor Value: " + obj.actualValue + " Threshold: " + obj.threshold,
"icon": "/assets/img/" + obj.severity + ".png",
"vibrate": [100, 50, 100],
"data": {
"dateOfArrival": Date.now(),
"primaryKey": 1
},
"actions": [{
"action": "explore",
"title": "Go to the site"
}]
}
};
//get subscription from db and send to web-push-server
Promise.all(USER_SUBSCRIPTIONS.map(sub => webpush.sendNotification(
sub, JSON.stringify(notificationPayload) )));
}
```
# WebApp
Die Webanwendung bildet die Schnittstelle zwischen dem Nutzer und dem System.
Nutzer können sich hier über den aktuellen Status des Systems informieren und neue
NotificationDefinitions anlegen. Zusätzlich können Nutzer Benachrichtigungen abonnieren und
werden so immer über Statusveränderungen des Systems informiert.
Die folgende Abbildung zeigt das Dashboard, welches über den Status der Station informiert.
Es zeigt ob Sensoren derzeit Fehler melden und die aktuellsten Daten jedes Sensors.

Die folgende Abildung zeigt den Bereich Notifications. Hier können neue Definitionen angelegt und
gelöscht werden.

## Umsetzung
Zur Umsetzung verwendeten wir das SPA-Framework Angular. Dies eigent sich sehr gut zur Umsetzung von
dynamischen Webanwendungen.
Um die Entwicklungszeit zu beschleunigen und den Fokus auf die kritischen Aspekte unserer Anwendung legen
zu können, setzten wir auf einem Dashboard-Template auf. Dieses erweiterten wir
um eine Reihe von Komponenten.
### Konsumation der REST-Schnittstellen
Da alle unsere Services OpenAPI verwenden um Ihre Schnittstellen zu dokumentieren, konnten wir
mit dem Werkzeug [OpenAPI Generator](https://openapi-generator.tech/docs/generators/typescript-angular)
Angular-Services generieren.
Je Ressource wird dabei ein Service generiert. Diese Services lassen sich dann in beliebige Komponenten
der Anwendung (z.B.: StationHealth) injizieren und dort konsumieren.
Der Zugriff auf die Services erfolgt über NGINX.
### Konsumation der Notifications
Web-Push-Notifications verwenden ServiceWorker, um Benachrichtigungen anzuzeigen.
Die Anwendung registriert dazu einen ServiceWorker.
Bei einem Service Worker handelt es sich um eine spezielle Form von Web-
Worker. Diese erlauben es, JavaScript-Code getrennt vom Hauptthread im Hintergrund
auszuführen. Ein Service Worker ist ein Skript, welches registriert werden kann, um eine oder
mehrere Seiten einer Website zu kontrollieren. Einmal installiert, ist es selbst dann noch
aktiv, wenn das entsprechende Browserfenster geschlossen wird. Es kann auf Events hören und reagieren, welche sich auf die von ihm kontrollierten Seiten beziehen.
Eines dieser Events sind Push-Notifications.
Angular abstrahiert die Verwendung von ServiceWorkern. Anstelle der direkten Implementierung des ServiceWorker
wird statdessen eine Konfigurationsdatei angepasst, welche bestimmt wie der ServiceWorker generiert wird.
Zusätzlich bietet Angular Services an, um mit dem ServiceWorker zu interagieren und beispielsweise die
Erlaubnis zum Senden von Benachrichtigungen einzuholen.
Das folgende Snippet zeigt wie, der Nutzer aufgefordert werden kann Push-Benachrichtigungen zu erlauben.
```Typescript
private subscribeToNotifications() {
this.swPush.requestSubscription({
serverPublicKey: this.VAPID_PUBLIC_KEY
})
.then(sub => {
return this.subscriptionService.addPushSubscriber(sub).subscribe();
})
.catch(err => console.error("Could not subscribe to notifications", err));
}
```
Die Methode ``addPushSubscriber()`` sendet das NotificationSubscription-Objekt an den ``Notification-Web-Push Service``.
Die folgende Abbildung zeigt die Anfrage um Erlaubnis der Push-Benachrichtigung

Die folgende Abbildung zeigt den Erhalt einer Push-Benachrichtigung. Bemerkenswert ist hierbei, dass es sich um
native Benachrichtigungen des jeweiligen Geräts handelt. Dies funktioniert auch wenn der Browser nur im Hintergrund
geöffnet ist (wie meist bei Mobilgeräten der Fall). Ermöglicht wird dies durch den ServiceWorker der auf eingehende
Nachrichten hört und die Benachrichtigung auslöst.

Die folgende Abbildung zeigt den registrierten ServiceWorker der Anwendung

# mqtt-message-simulator
Um die Skalierbarkeit unserer Services zu testen, implementierten wir ein Maven-Projekt
zur Simulation von Sensorwerten.
Dies erlaubt die Definition von Stationen und zugehöriger Sensoren.
Die Sensoren generieren zufällige Daten und werden über MQTT an den Mosquitto-Broker gesendet.
Um größere Lasten zu erzeugen kann auch ein StationCluster angelegt werden, um den Betrieb einer
Vielzahl von Stationen zu simulieren.
Die folgende Abbildung gibt einen Überblick über die Anwendung:

Zur Kommunikation mit dem MQTT-Broker kam eclipse.paho zum Einsatz.
# Api Gateway
Das API Gateway bildet die zentrale Anlaufstelle für Anfragen von Clients an unser System.
## Beschreibung
Da unsere Anwendung mehrere Services umfasst und ein Client demnach mit mehreren veschiedenen Enpunkten kommunizieren muss, haben wir uns für den Einsatz eines API Gateways entschieden. Dieses bietet eine zentrale Anlaufstelle für die Anfragen von Clients. Der Client muss demnach nur eine IP Adresse und Port kennen.
Da unsere Services in Kubernetes betrieben werden, wobei wir auf Konzepte von Kubernetes zum Skalieren unserer Services zurückgreifen verwenden wir das API Gateway nur als Reverse Proxy mit statischem Routing.
## Technologien
Zur Umsetzung des API Gateways verwenden wir den **Nginx Webserver**. Dieser wird in Form eines Docker Containers betrieben.
## Umsetzung
Die Konfiguration des Routing erfolgt in der Datei `nginx.conf` welche nachfolgend dargestellt ist. Für jeden Service wird ein `upstream` und ein `location` Objekt definiert. Wird ein Request an `nginx-server:80/tracking/...` gesendet wird dieser an `server tracking-service:8082` weitergeleitet.
```json
events {}
http {
upstream tracking {
server tracking-service:8082;
}
upstream notification {
server notification-service:8083;
}
upstream webpush {
server notification-web-push-service:9001;
}
server {
listen 80;
server_name nginx_proxy;
location /tracking/ {
proxy_pass http://tracking/;
}
location /notification/ {
proxy_pass http://notification/;
}
location /webpush/ {
proxy_pass http://webpush/;
}
}
}
```
# Datenbanken
## Beschreibung
In unserem System setzen wir auf dezentrale Datenhaltung. Jedem Service, welcher Daten persistiert, ist eine eigene Datenbank zugewiesen. Der Tracking Service und der Notification Service haben jeweils eine eigene Datenbank.
## Technologien
Bei den Datenbanken handelt es sich um PostgreSQL Datenbanken, welche jeweils in einem eigenen Docker Container betrieben werden.
## Umsetzung
Jeder Service (Tracking, Notification) hat seine eigene Datenbank mit einem eigenen Schema. Die Datenbank Schemata sowie auch die Servicegrenze sind in der nachfolgenden Abbildung dargestellt. Aufgrund der Servicegrenze muss die Fremdschlüsselbeziehung zwischen den Tabellen `sensor` und `notification_definition` über die API abgebildet werden. Um hierbei Inkonsistenzen zu vermeiden kommunizieren der Tracking Service und der Notification Service über die REST-Schnittstelle miteinander.

# MOM
In unserem System setzen wir sehr stark auf die Kommunikation mittels asynchronem Nachrichtenaustausch (MOM). In der nachfolgenden Abbildung werden die Elemente des asynchronen Nachrichtenaustauschs dargestellt. Da MOM ein sehr wesentlicher Bestandteil unserer Architektur ist, möchten wir hier nochmals genauer auf die Struktur und die Verwendeten Konzepte des asynchronen Nachrichtenaustauschs eingehen, aus der Sicht des Gesamtsystems.

## MQTT
Der MQTT Broker bildet den Kommunikationsenpunkt für unsere hydroponischen Stationen und den Simulator derselben. Hierbei wird das Publisher/Subscriber Muster realisiert. Nachrichten werden an ein Topic gesendet und in weiterer Folge an alle Subscriber dieses Topics gesendet. Die Stationen sind Publisher und senden Messwerte im Csv Format an den MQTT Broker `eclipse-mosquitto`. Der Collector Service bildet den Subscriber für Nachrichten welche an das Topic `sensors/+` geschickt werden. Diese Nachrichten werden anschließend verarbeitet und an den Exchange `measures-exchange` des RabbitMQ Brokers gesendet.
### RabbitMQ
Der Grund für die Verwendung von RabbitMQ besteht darin, dass RabbitMQ das Konzept der Worker Queue unterstützt. Dieses Konzept ist die Basis für die Skalierbarkeit der Services unseres Systems. Da die Worker Queue die Nachrichten an die Consumer mittels Round Robin Dispatching verteilt, wobei eine Nachricht immer nur an einen Consumer zugestellt wird, erlaubt uns dies, viele Consumer Instanzen (Tracking Service, Notification Service) zu betreiben, ohne dass Nachrichten mehrfach verarbeitet werden.
Der Tracking Service und der Notification Service erledigen unterschiedliche Aufgaben auf Basis der Sensorwerte. Diese Services sollen unabhängig voneinander skaliert werden können und sich nicht gegenseitig beeinflussen. Um die Unabhängigkeit der beiden Services zu gewährleisten wurde für jeden Service eine eigene Worker Queue definiert. Da sowohl die Instanzen des Tracking Service, wie auch die Instanzen des Notification Service alle Sensordaten benötigen verwenden wir den Exchange `measures-exchange` vom Typ `FANOUT`. Der Collector Service sendet die Sensordaten an dieses Exchange (`measures-exchange`). Anschließend werden die Nachrichten an beide, an den Exchange gebundenen, Worker-Queues weitergeleitet. Durch die Verwendung des `FANOUT` Exchange muss der Collector Service die Nachrichten nicht an jede Queue einzeln schicken. Das duplizieren der Nachrichten erfolgt durch den Exchange.
Weiters werden vom Notification Service erstellte Benachrichtigungen an den `FANOUT` Exchange `notificationExchange` gesendet. Diese werden nach dem Publisher/Subscriber Muster an alle Consumer zugestellt. Dadurch können der Notification-Publisher Service und der Notification-Web-Push Service skaliert werden, da diese Services immer alle Benachrichtigungen benötigen. Umgsetzt wird dieses Publisher/Subscriber Muster durch das Exchange vom Typ `FANOUT`. Dieses Exchange routet die Nachrichten an alle, pro Consumer, automatisch erzeugten Queues.
#### Abhängigkeiten
Zur interaktion mit dem RabbitMQ Broker wird die nachfolgend angeführte Dependency benötigt.
```xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
```
#### Notification Service / Tracking Service
Wie im nachfolgenden Quellcode Ausschnitt dargestellt wird durch das `Connection` Objekt ein `Channel` erstellt. Da die Methoden zum Erzeugen eines Exchange und einer Queue idempotent sind können diese in jedem Service ausgeführt werden. Demnach wird in jedem Service das Exchange mittels `channel.exchangeDeclare(measureExchangeName, BuiltinExchangeType.FANOUT)` vom Typ `BuiltinExchangeType.FANOUT` angelegt. Weiters wird die entsprechende Worker-Queue durch folgenden Methodenaufruf: `channel.queueDeclare(queueName, false, false, false, null);` angelegt. Anschließend wird die Queue mittels `channel.queueBind(queueName, measureExchangeName, "")` an das Exchange gebunden. Mittels dem Consumer Prefatch `channel.basicQos(1)` wird festgelegt, dass ein Consumer immer nur eine unbestätigte Nachricht zugewisen bekommt.
```Java
Channel channel = connection.createChannel();
channel.exchangeDeclare(measureExchangeName, BuiltinExchangeType.FANOUT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, measureExchangeName, "");
channel.basicQos(1); // consumer prefetch
```
# Deployments
## Docker
Um die Architektur mit Kubernetes betreiben zu können bestand der erste Schritt darin, die einzelnen Komponenten zu dockerisieren, also jede Komponente in einem Docker Container laufen zu lassen. Dies wurde so umgesetzt, dass für jede von uns erstellte Komponente ein Dockerfile definiert wurde. Weiters wird die Container Infrastruktur mittels `docker-compose` zusammengefasst, wobei dieses wiederum auf die Dockerfiles der einzelnen Komponenten verweist.
### Dateistruktur
Dieser Abschnitt soll einen Überblick darüber geben, wo in unserem Projektverzeichnis die einzelnen Dockerfiles und das `docker-compose.yml` zu finden ist.
```
src/
docker-compose.yaml
api-gateway/
Dockerfile
collector/
Dockerfile
database/tracking-service-db/
Dockerfile
database/notification-service-db/
Dockerfile
tracking/src/main/docker
Dockerfile
notification/src/main/docker
Dockerfile
notification-web-push/
Dockerfile
```
Um das Deployment mittels `docker-compose` zu vereinfachen haben wir uns dazu entschieden ein kleines Powershell Skript zu schreiben. Dieses stoppt zu Beginn alle laufenden Docker Container. Anschließend werden die Anwendungen mitels `mvn -f ... package` gebaut. Hierfür wird das Profil `compose` verwendet, welche die benötigten Konfigurationsparameter für die `docker-compose` Umgebung aktiviert. Nur die Einstellungen des Collector Service müssen abhängig von der Umgebung manuell angepasst werden, bevor das Skript ausgeführt werden kann.
```
docker-compose down
mvn -f ./collector/pom.xml package
mvn -f ./notification/pom.xml package -D"quarkus.profile"="compose"
mvn -f ./tracking/pom.xml package -D"quarkus.profile"="compose"
docker-compose up --build
```
Nachfolgend ist die gesamte Datei `docker-compose.yml` dargestellt. Hierbei haben wir darauf geachtet, dass die einzelnen Services in der richtigen Reihenfolge gestartet werden. Dabei ist es jedoch zu Problemen in Verbindung mit RabbitMQ und dem Tracking/Notification Service gekommen, auf welche wir im nachfolgenden Abschnitt (docker-compose und RabbitMQ) näher eingehen werden.
```yaml
version: '3'
networks:
hydroponic:
services:
trackingdb:
container_name: trackingdb
build:
context: ./database/tracking-service-db
ports:
- 5432:5432
networks:
- hydroponic
notificationdb:
container_name: notificationdb
build:
context: ./database/notification-service-db
ports:
- 5433:5432
networks:
- hydroponic
mosquitto:
image: eclipse-mosquitto
container_name: mosquitto
ports:
- 1883:1883
- 9001:9001
networks:
- hydroponic
depends_on:
- trackingdb
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
networks:
- hydroponic
depends_on:
- mosquitto
collector:
container_name: collector
build:
context: ./collector
networks:
- hydroponic
depends_on:
- rabbitmq
apigateway:
container_name: apigateway
build:
context: ./api-gateway
ports:
- 30080:80
- 30443:443
networks:
- hydroponic
depends_on:
- collector
notification-web-push:
container_name: notification-web-push
build:
context: ./notification-web-push
ports:
- 9000:9000
networks:
- hydroponic
depends_on:
- rabbitmq
notification:
container_name: notification
build:
context: ./notification
ports:
- 8090:8080
networks:
- hydroponic
depends_on:
- rabbitmq
- notificationdb
tracking:
container_name: tracking
build:
context: ./tracking
ports:
- 8091:8080
networks:
- hydroponic
depends_on:
- rabbitmq
- trackingdb
```
### docker-compose und RabbitMQ
Wie im nachfolgenden Abschnitt von `docker-compose` dargestellt, ist das Starten des Tracking Service abhängig von RabbitMQ. Der Tracking Service versucht sofort nach dem Start sich mit RabbitMQ zu verbinden. Das Problem hierbei bestand darin, dass der RabbitMQ Container zu früh signalisiert, dass er bereit ist. Der Container ist gestartet, aber die darin laufende Anwendung ist noch nicht soweit. Demnach ist der Tracking Service immer abgestürzt.
Wir sind dann, wie in der Dokumentation auf Dockerhub empfohlen, so vorgegangen, dass wir einen Mechanismus zum Reconnecten des Tracking Service auf RabbitMQ implementiert haben. Dies hat unser Problem dann behoben.
## Kubernetes
Dieser Abschnitt gibt einen Überblick über die Anforderungen, die verwendeten Ressourcen sowie über den Prozess der lokalen Installation von Kubernetes, im speziellen **minikube**, auf Windows.
### Anforderungen
- [X] Kubernetes Instanz (minikube) muss installiert sein
- [X] Alle Services des vorliegenden Projekts müssen dockerisiert sein.
### Ressourcen
* [Installation von Docker und Kubernetes auf Windows](https://learnk8s.io/blog/installing-docker-and-kubernetes-on-windows)
* [Kubernetes Tutorial (Youtube)](https://www.youtube.com/watch?v=1xo-0gCVhTU)
* ( [Kubernetes Dashboard](https://www.replex.io/blog/how-to-install-access-and-add-heapster-metrics-to-the-kubernetes-dashboard) )
### Installation
Für die Installation von **minikube** unter Windows haben wir uns für den Package Manager [Chocolatey](https://chocolatey.org/) für Windows entschienden. Bei der Installation hat es bei uns beiden keine Probleme gegeben. Nachfolgend werden noch die einzelnen Befehle zur Installation mittels Chocolatey angeführt.
```
# run powershell as administrator
> Set-ExecutionPolicy Bypass -Scope Process -Force
> iex ((New-Object System.Net.WebClient).DownloadString('https://chocolatey.org/install.ps1'))
```
```
# open new powershell as administrator
> choco install minikube -y
> minikube start
```
```
# test if setup was successful
> kubectl get nodes
```
### Weitere nützliche Befehle
```
# other helpful commands:
> minikube ip
> kubectl get nodes
> kubectl get pods
> kubectl get deployments
> minikube dashboard
```
### Kubernetes ohne DockerHub
Die übliche Vorgehensweise besteht darin, Docker Images auf DockerHub zu pushen und diese von dort in Kubernetes durch das Deployment herunterzuladen. Zu Beginn haben wir uns jedoch dazu entschieden das Docker Environment von minikube zu verwenden um die Images direkt darin zu erstellen. Dazu haben wir uns mit dem Docker Deamon von minikube verbunden. Das Vorgehen ist nachfolgend dargestellt.
```
# get the docker environment definition from minikube
> minikube docker-env
# connect to minikube docker daemon
> & minikube -p minikube docker-env | Invoke-Expression
```
### Kubernetes mit DockerHub
Der Einfachheit halber haben wir uns jedoch für unser finales Deployment auf Kubernetes dazu entschieden, die Images unserer Services über die DockerHub Registry zur Verfügung zu stellen und zu laden.
## Kubernetes Deployments
Um den Deployment Prozess für Kubernetes zu vereinfachen haben wir uns ebenfalls dazu entschieden Powershell Skripte dafür zu verwenden.
Das nachfolgend dargestellte Skript ist für das Packagen der Anwendungen, das Erstellen eines Docker Images und das Pushen desselben auf ein DockerHub Repository verantwortlich.
```
mvn -f ./collector/pom.xml package
mvn -f ./notification/pom.xml package -D"quarkus.profile"="k8s"
mvn -f ./tracking/pom.xml package -D"quarkus.profile"="k8s"
docker build --tag langdavid/api-gateway_k8s:1.0 ./api-gateway/
docker push langdavid/api-gateway_k8s:1.0
docker build --tag langdavid/collector_k8s:1.0.1 ./collector/
docker push langdavid/collector_k8s:1.0.1
docker build --tag langdavid/notification-db_k8s:1.0 ./database/notification-service-db/
docker push langdavid/notification-db_k8s:1.0
docker build --tag langdavid/tracking-db_k8s:1.0 ./database/tracking-service-db/
docker push langdavid/tracking-db_k8s:1.0
docker build --tag langdavid/notification_k8s:1.4 ./notification/
docker push langdavid/notification_k8s:1.4
docker build --tag langdavid/tracking_k8s:1.0 ./tracking/
docker push langdavid/tracking_k8s:1.0
docker build --tag langdavid/notification-web-push_k8s:1.1 ./notification-web-push/
docker push langdavid/notification-web-push_k8s:1.1
```
Weiters haben wir ein Skript erstellt, welches das Aktualisieren unserer Deployment Deskriptoren vereinfacht. Weiters werden durch dieses Skript die HPAs für den Tracking und den Notification Service erstellt. Diese übernehemen anschließend die Skalierung der Services. Die Skalierung durch HPAs wird an späterer Stelle noch genauer beschrieben.
```
kubectl apply -f ./kubernetes/deployments/trackingdb-deployment.yml
kubectl apply -f ./kubernetes/deployments/notificationdb-deployment.yml
kubectl apply -f ./kubernetes/deployments/mosquitto-deployment.yml
kubectl apply -f ./kubernetes/deployments/rabbitmq-deployment.yml
kubectl apply -f ./kubernetes/deployments/api-gateway-deployment.yml
kubectl apply -f ./kubernetes/deployments/collector-deployment.yml
kubectl apply -f ./kubernetes/deployments/tracking-deployment.yml
kubectl apply -f ./kubernetes/deployments/notification-deployment.yml
kubectl apply -f ./kubernetes/deployments/notification-web-push-deployment.yml
kubectl autoscale deployment tracking --cpu-percent=20 --min=1 --max=5
kubectl autoscale deployment notification --cpu-percent=20 --min=1 --max=5
```
Nachfolgend werden einige unserer Deployments näher beschrieben. Da die einzelnen Deployments viele Gemeinsamkeiten haben wir uns zu Gunsten der Länge der Dokumentation dazu entschieden, nicht jedes Deployment einzeln zu beschreiben.
### Tracking Deployment
Beim Tracking Service handelt es sich um den Service, welchen wir automatisch skalieren wollen. Dieser verbindet sich mit der `measure-bl` Queue, konsumiert die Sensordaten und verarbeitet diese. In der Datei `tracking-deployment.yml` definieren wir einen `Service` und ein `Deployment`. Da der Tracking Service eine Schnittstelle nach außen anbietet und mehrere Instanzen laufen können, wurde ein ``Service`` vom Typ `LoadBalancer` erstellt. Dieser teilt die Anfragen an den Tracking Service auf die Replikas des Tracking Service auf. Skaliert wird der Tracking Service mittels einem HPA welcher die Skalierung auf Basis der CPU Auslastung vornimmt. Damit dies funktioniert, müssen die Ressourcen des Containers näher definiert werden. Dies erfolgt durch die Angabe des Elements `resources`.
```yml
apiVersion: v1
kind: Service
metadata:
name: tracking-service
spec:
ports:
- protocol: "TCP"
# Port accessible inside cluster
port: 8082
# Port to forward to inside the pod
targetPort: 8080
# Port accessible outside cluster
nodePort: 30436
selector:
app: tracking
type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: tracking
spec:
replicas: 1
selector:
matchLabels:
app: tracking
template:
metadata:
labels:
app: tracking
spec:
containers:
- image: langdavid/tracking_k8s:1.0
name: tracking
ports:
- containerPort: 8080
resources:
limits:
cpu: 500m
requests:
cpu: 200m
```
### Notification Deployment
Beim Notification Service handelt es sich ebenfalls um einen Service, welchen wir automatisch skalieren wollen. In der Datei `notification-deployment.yml` definieren wir einen `Service` und ein `Deployment`. Aufgrund der Skalierung wird ebenfalls ein `Service` vom Typ `LoadBalancer` erstellt. Dieser teilt die Anfragen an den Notification Service auf die Replikas des Notification Service auf. Die Skalierung wird ebenfalls durch einen HPA, auf Basis der CPU Auslastung, automatisch vorgenommen.
```yml
apiVersion: v1
kind: Service
metadata:
name: notification-service
spec:
ports:
- protocol: "TCP"
# Port accessible inside cluster
port: 8083
# Port to forward to inside the pod
targetPort: 8080
# Port accessible outside cluster
nodePort: 30437
selector:
app: notification
type: LoadBalancer
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: notification
spec:
replicas: 1
selector:
matchLabels:
app: notification
template:
metadata:
labels:
app: notification
spec:
containers:
- image: langdavid/notification_k8s:1.4
name: notification
ports:
- containerPort: 8080
resources:
limits:
cpu: 500m
requests:
cpu: 200m
```
## Kubernetes HPAs (Horizontal Pod Autoscaler)
### Quellen
* https://kubernetes.io/de/docs/tasks/run-application/horizontal-pod-autoscale/
* https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale-walkthrough/#create-horizontal-pod-autoscaler
### Anforderungen
Zur Umsetzung von HPA mit Kubernetes wird eine Kubernetes Instanz benötigt. Wir verwenden dazu minikube in der Version `minikube version: v1.10.1`. Weiters muss das AddOn **metrics-server** aktiviert werden. Dadurch können Metriken zu den einzelnen Pods ermittelt werden. Durch das Aktivieren dieses AddOns wird weiters das minikube Dashboard, welches mit dem Befehl `minikube dashboard` gestartet werden kann, um Monitoring Elemente erweitert. Diese Monitoring Elemente beinhalten neben der grafischen Darstellung von Charts zu CPU und Speicherverbrauch auch die Möglichkeit die Logs aus den Docker Containern einzusehen. Dies hat uns dabei geholfen Fehler schneller zu finden und das korrekte Funktionieren unserer Architektur zu belegen.
```
# Die AddOns von minikube können mit folgendem Befehl angezeigt werden
> minikube addons list
```
```
# Ein AddOn kann mit dem folgenden Befehl aktiviert werden
> minikube addons enable metrics-server
```
Nachfolgend wird die Ausführung der Befehle sowie die Erweiterung des minikube dashboards dargestellt.


Weiters müssen die für die Skalierung verwendeten Deployments um Limitierungen, die Ressourcen betreffend, erweitert werden. Werden die Container Ressourcen nicht näher spezifiziert kann die aktuelle CPU Auslastung des Pods nicht bestimmt werden.
### Erstellen eines HPA (Horizontal Pod Autoscaler)
Der nachfolgenden Befehle erstellen zwei HPAs welche jeweils zwischen einer und fünf Instanzen des Tracking Service und des Notification Service verwalten. Durch die Angabe von `--cpu-percent=20` erzeugt der HPA neue Instanzen und entfernt diese wieder (bei Bedarf) um eine durchschnittliche CPU Auslastung von 20% zu erreichen. Das Down-Scaling, wenn erforderlich, erfolgt gemäß der Defaulteinstellung nach fünf Minuten. Vorraussetzung dafür ist, dass bereits ein `tracking` und ein `notification` Deployment existiert.
```
kubectl autoscale deployment tracking --cpu-percent=20 --min=1 --max=5
kubectl autoscale deployment notification --cpu-percent=20 --min=1 --max=5
```
# Ergebnisse
[Präsentation](https://docs.google.com/presentation/d/1j-dPVDYUnQ1AN5yuDn0up049ebgDaVq48vcIGaoRSmI/edit?usp=sharing)






