# Programación Concurrente - 2025C1 TP2 PedidosRust
## Introducción
*PedidosRust* es un sistema distribuido de gestión de pedidos de comida en tiempo real, implementado en Rust. Permite a los usuarios explorar restaurantes, realizar pedidos y recibir actualizaciones de estado aprovechando herramientas de concurrencia y una implementación distribuida para garantizar escalabilidad y tolerancia a fallos.
## Integrantes
|Padrón|Nombre y apellido |
|:----:|:----------------------:|
|111120|Santino Zucconi |
|110944|Aizen Sanchez Acero |
|110873|Facundo Calderan |
|110464|Francisco Lopez Tancredi|
## Entidades principales
### `Diner` (Comensal)
#### Finalidad general
El Comensal es quien inicia el flujo. Éste se conecta a la aplicación y realiza un pedido después de haber recibido la lista de opciones de restaurantes se encarga de generar su pedido correspondiente, pasarlo al broker y que este chequee mediante un commit de 2 fases que tanto el pago, la orden y el pedido fueron aceptados y realizados.
#### Estado interno
```rust
struct Diner {
id: String,
order_state: Option<OrderStatus>,
restaurant_addr: Vec<(SocketAddr, Location)>,
broker_addr: Vec<SocketAddr>,
}
```
El estado interno del comensal almacena:
- Un **identificador** de **comensal** que nos permitirá tener diferenciados a los comensales a la hora de mandar y recibir una orden.
- Un **estado de la orden** que nos servirá para mantener informado al comensal acerca del estado en el que se encuentra nuestro pedido.
- Un **registro de los restaurantes y sus ubicaciones** para luego poder determinar un restaurante que va a realizar nuestro pedido.
- La **direccion de los Brokers** para establecer y re-establecer una conexion en el caso de una caida.
#### Mensajes que recibe
```rust
enum OrderStatus {
Created, PaymentAuthorized,
Confirmed, Cooking,
Ready, RiderAssigned,
PickedUp, OnTheWay,
Delivered, Error
}
```
```rust
enum DinerMessages {
UpdateStatus {status},
RestaurantsAvailable {restaurants}
}
```
- **`UpdateStatus`:**
- **Significado:** cambiar el estado del pedido en curso.
- **Reacción:** actualiza la variable interna *order_state* del comensal al estado pasado en el mensaje.
- **`RestaurantsAvailable`:**
- **Significado:** información de los restaurantes disponibles.
- **Reacción:** retorna los restaurantes disponibles consultados en los brokers y actualiza los restaurantes disponibles en su estado.
#### Mensajes que envía
- **`RequestRestaurantsInfo`**`(diner_id)`: (Diner -> Broker)
- El comensal de id `diner_id` le pide al Broker la lista de restaurantes.
- **`PlaceOrder`**`(diner_id, rest_id)`: (Diner -> Broker)
- El comensal de id `diner_id` realiza un pedido en el restaurante `rest_id`.
#### Protocolos de transporte
La comunicación con el Broker es persistente y con protocolo TCP para una entrega garantizada y ordenada de mensajes. Para enviar los mensajes por el stream se serializan en formato JSON con Serde, y se empaquetan en Frames de Tokio.
#### Casos de interés
- **Ruta feliz:**
- Al iniciarse el actor, automáticamente se conecta con un broker y manda `RequestRestaurantsInfo`.
- Al recibir `RestaurantsAvailable` por parte del Broker, elige uno y le devuelve `PlaceOrder`.
- El Broker lo mantendrá actualizado en todo momento del estado de su orden.
- **Fallos:** si la conexión con un broker se cae, se reconectará automáticamente con otro. Si el pedido es cancelado, volverá a hacer otro después de un tiempo.
---
### `Restaurant` (Restaurante)
#### Finalidad general
El `Restaurante` es quien recibe solicitudes de pedido de parte del broker, las procesa (pudiendo rechazarlas o aceptarlas) y reporta el estado de cada orden al broker en cada paso (rechazada, aceptada, en preparación, lista).
#### Estado interno
```rust
struct Restaurant {
id: String,
location: Location,
broker_addr: Vec<SocketAddr>,
writer: Option<Addr<TcpWriter>>,
kitchen: Addr<Kitchen>,
}
```
En su estado interno, el restaurante almacena:
- Un **identificador** que se usará en el pasaje de mensajes para identificarse (y que el *Broker* pueda identificarlo).
- Una **ubicación** que representará su posición *(x, y)* en el mapa.
- La **dirección de los Brokers** para establecer la conexión, y por re-conexión automática si se detecta una caída en la conexión.
- La **dirección del TCPWriter** para el envío asincrónico de mensajes, abstrayéndose de la lógica de la serialización del contenido del mensaje.
- La **dirección del Kitchen**, actor que representa la Cocina del restaurante y es quien se encarga de la preparación de pedidos.
#### Mensajes que recibe
```rust
enum RestaurantMessages {
OrderRequest { order_id },
OrderStarted { order_id },
OrderPrepared { order_id },
OrderRejected { order_id },
OrderAccepted { order_id },
DeliveryConfirmed { rider_id },
}
```
- **`OrderRequest`:**
- **Significado:** el broker le envió una nueva orden.
- **Reacción:** le envía el mismo mensaje a la Cocina (para que decida si lo puede preparar o no).
- **`OrderRejected`:**
- **Significado:** la cocina rechazó el pedido (por ejemplo, por falta de stock).
- **Reacción:** le envía el mismo mensaje al Broker para informar el estado del pedido.
- **`OrderAccepted`:**
- **Significado:** la cocina aceptó el pedido (y lo preparará apenas pueda).
- **Reacción:** le envía el mismo mensaje al Broker para informar el estado del pedido.
- **`OrderStarted`:**
- **Significado:** la cocina comenzó a preparar el pedido.
- **Reacción:** le envía el mismo mensaje al Broker para informar el estado del pedido.
- **`OrderPrepared`:**
- **Significado:** la cocina terminó de preparar el pedido y pasa a estar listo para ser retirado.
- **Reacción:** le envía el mismo mensaje al Broker para informar el estado del pedido.
#### Mensajes que envia
- **`RegisterRestaurant`**`(rest_id, location)`: (Restaurant -> Broker)
- Registra su id `rest_id` y su posición geográfica `location` en el Broker.
- **`OrderAccepted`**`(order_id)`: (Restaurant -> Broker)
- La orden de id `order_id` fue aceptada por el restaurante.
- **`OrderRejected`**`(order_id)`: (Restaurant -> Broker)
- La orden de id `order_id` fue rechazada por el restaurante.
- **`OrderStarted`**`(order_id)`: (Restaurant -> Broker)
- La orden de id `order_id` está en preparación.
- **`OrderPrepared`**`(order_id)`: (Restaurant -> Broker)
- La orden de id `order_id` está lista para retirar.
- **`OrderArrived`**`(order_id)`: (Restaurant -> Kitchen)
- Avisa a la cocina que llegó una orden de id `order_id`.
#### Protocolos de transporte
La comunicación con el Broker es persistente y con protocolo TCP para una entrega garantizada y ordenada de mensajes. Para enviar los mensajes por el stream se serializan en formato JSON con Serde, y se empaquetan en Frames de Tokio.
#### Casos de interés
- **Ruta feliz:**
- Al iniciarse el actor, automáticamente se conecta con un broker y manda `RegisterRestaurant`.
- Al recibir un `OrderRequest` por parte del Broker, envía `OrderArrived` a la Cocina. Si la acepta, ésta le devolverá un mensaje de tipo `OrderAccepted` y el Restaurante le enviará `OrderAccepted` al Broker.
- La Cocina le enviará `OrderStarted` cuando haya empezado a prepararla, y `OrderCompleted` cuando esté lista (en ambos casos el Restaurante le enviará el mensaje del mismo nombre al Broker para mantenerlo actualizado).
- **Fallos:** si la conexión con un broker se cae, se reconectará automáticamente con otro.
---
### `Rider` (Repartidor)
#### Finalidad general
El `Rider` es quien está a la espera de pedidos listos para ser enviados a los comensales.
#### Estado interno
```rust
struct Rider {
location: Location,
broker_addr: Vec<SocketAddr>,
writer: Option<Addr<UdpWriter>>
}
```
En su estado interno, el rider almacena:
- Su posición actual.
- Todas las direcciones de los brokes disponibles.
- Broker con el que **actualmente esta comunicándose**, necesario para hacerle saber que está disponible para recibir pedidos.
- La **dirección del UdpWriter**, para el envío de mensajes de manera asincrónica.
#### Que mensajes recibe
```rust
enum RiderMessages {
DeliveryRequest { _broker_id: usize },
DeliveryConfirmed { order_id: u64 }
}
```
- **`DeliveryRequest`:**
- **Significado:** el broker le envío una solicitud de pedido.
- **Reacción:** intenta tomar ese pedido.
- **`DeliveryConfirmed`:**
- **Significado:** el broker confirma que es el quien debe tomar el pedido.
- **Reacción:** toma el pedido y emula la acción de llevarle el pedido al comensal.
---
### `Broker` (Intermediario)
#### Finalidad general
La finalidad general del broker es de administrar y coordinar los mensajes entre comensales-restaurantes-repartidores,
#### Estado interno
El estado interno del Broker puede variar según quien sea, su principal distinción es sí es Coordinador de Riders o no, en este caso tiene la lista de todos los riders en el sistema.
```rust
struct Broker {
riders_locales: HashMap<IdRider, InfoRider>,
riders_totales: Option<HashMap<IdRider, InfoRider>>,
restaurantes_locales: HashMap<IdRestaurante, InfoRestaurante>,
orders: HashMap<String, Order>,
restaurantes_totales: HashMap<Restaurante, SocketAddr>
...
}
```
#### Mensajes que recibe
El broker recibe diferentes mensajes según la entidad que se lo envíe
```rust
enum BrokerMessages {
// Restaurant
RegisterRestaurant { rest_id: u128, x: i32, y: i32 },
OrderAccepted { order_id: u64, rest_id: u128 },
OrderRejected { order_id: u64 },
OrderStarted { order_id: u64 },
OrderPrepared { order_id: u64 },
...
```
- **`RegisterRestaurant`:**
- **Significado:** El restaurante quiere registrar su información.
- **Reacción:** Se agrega en el mapa de `restaurante_locales` y le avisa a los demás brokers para que lo registren .
- **`OrderAccepted`:**
- **Significado:** El restaurante acepta la orden pedida por el comensal.
- **Reacción:** Se le notifica al comensal que su orden fue aceptada y se baja al log.
- **`OrderRejected`:**
- **Significado:** El restaurante rechaza la orden pedida por el comensal.
- **Reacción:** Se le notifica al comensal que su orden fue rechazada y se baja al log.
- **`OrderStarted`:**
- **Significado:** El restaurante empieza a cocinar la orden pedida por el comensal.
- **Reacción:** Se le notifica al comensal que su orden está preparandose.
- **`OrderPrepared`:**
- **Significado:** El restaurante notifica que la orden pedida por el comensal ha sido completada y está a la espera del rider.
- **Reacción:** Se le notifica al comensal que su orden fue completada, está a la espera y se baja al log. Se le notifica al broker coodinador para que empiece la busqueda de riders.
```rust
...
// Rider
RegisterRider { rider_id: u64, x: i32, y: i32 },
DeliveryAccepted { order_id: u64 },
DeliveryCompleted { order_id: u64 },
RiderStatus { rider_id: u64, x: i32, y: i32 },
...
```
- **`RegisterRider`:**
- **Significado:** El rider notifica que está online y a la espera de pedidos.
- **Reacción:** Se registra el rider en `riders_locales` y se le notifica al broker coordinador para que lo agregue en `riders_totales`.
- **`DeliveryAccepted`:**
- **Significado:** El rider acepta el pedido.
- **Reacción:** Se fija si otro rider ya había aceptado el pedido, de ser así, se le notifica que no se le asigna, en caso contrario se le asigna, y se le notifica al comensal el estatus actual del pedido.
- **`DeliveryCompleted`:**
- **Significado:** El rider notifica que el pedido fue entregado con exito.
- **Reacción:** Se le notifica al usuario que fue entregado el pedido y se descuenta el dinero del mismo, se baja al log y se termina la transaccion.
- **`RiderStatus`:**
- **Significado:** El rider notifica su posicion actual.
- **Reacción:** Se actualiza la información del rider en los registros de los mismos.
```rust
...
// Diner
RequestRestaurantsInfo,
PlaceOrder { diner_id: u64, rest_id: u128 },
...
```
- **`RequestRestaurantsInfo`:**
- **Significado:** El comensal pide la información de todos los restaurantes.
- **Reacción:** Se pide la información de los restaurantes locales a cada broker para luego devolver la información acumulada.
- **`PlaceOrder`:**
- **Significado:** El comensal hace la orden de un pedido.
- **Reacción:** Se le envía al broker que tenga a ese restaurante la solicitud de hacer el pedido al restaurante.
```rust
...
// Gateway
PaymentPrepared(order_id),
PaymentAborted(order_id),
PaymentCommited(order_id),
...
```
- **`PaymentPrepared`:**
- **Significado:** El Gateway aprobó el pago del comensal para la orden especificada.
- **Reacción:** El Broker actualiza el estado de la orden a Aprobado y le envía al Restaurante el pedido (para que sea aprobado o rechazado).
- **`PaymentAborted`:**
- **Significado:** El Gateway rechazó el pago del comensal para la orden especificada.
- **Reacción:** El Broker actualiza el estado de la orden a Rechazada y le avisa al comensal que hubo un Error en su Orden.
- **`PaymentCommited`:**
- **Significado:** El Gateway realizó efectivamente el cobro del dinero de la cuenta del comensal para la orden especificada.
- **Reacción:** El Broker actualiza el estado de la orden a Terminada y le avisa al comensal que su Orden se completó con éxito.
```rust
...
//Broker
Election,
Coordinator{coordinator_addr: SocketAddr},
RequestRestaurantsLocalInfo{request_broker: BrokerId},
RegisterRestaurantBroker{restaurant: RestaurantInfo, broker_place: SocketAddr}
Prepared,
Ready,
Commit,
Abort
...
```
- **`Election`:**
- **Significado:** Se cayó el broker coordinador fue avisado que hay proceso de elección.
- **Reacción:** Se ejecuta el algoritmo de elección de tipo ring.
- **`Coordinator`:**
- **Significado:** Se ha completado el proceso de elección y ya hay coordinador.
- **Reacción:** El broker actualiza la dirección del broker coordinador y pasa sus `riders_locales` al nuevo coordinador para que arme la lista de `riders_globales`.
- **`RequestRestaurantsLocalInfo`:**
- **Significado:** El request_broker pide la lista de los restaurantes locales para mostrarle al comensal.
- **Reacción:** El broker responde con los restaurantes que están conectados a él.
```rust
...
//Broker Coordinator
RegisterRidersLocalCoordinator{riders_locales: Hashmap<IdRider, InfoRider>},
UpdateRiderGlobal{ rider_id: u64, x: i32, y: i32 },
}
```
- **`RegisterRidersLocalCoordinator`:**
- **Significado:** Los brokers le envían la información de los `riders_locales` al coordinador luego del proceso de elección.
- **Reacción:** El broker coordinador agrega los riders a la lista global.
- **`UpdateRiderGlobal`:**
- **Significado:** Los brokers envían update de un rider local ya sea por registrarse o actualizar su posición o estado.
- **Reacción:** El broker coordinador actualiza al rider en su lista global.
#### Mensajes que envia
```rust
enum BrokerMessages {
// To Restaurant
OrderRequest { order_id },
// To Diner
UpdateStatus {status: OrderStatus}
// To Rider
DeliveryRequest { _broker_id: SocketAddr },
DeliveryConfirmed { order: InfoOrder }
// To Broker
Election,
Coordinator{coordinator_addr: SocketAddr},
RequestRestaurantsLocalInfo{request_broker: BrokerId},
Prepared,
Commit,
Abort,
// To Broker Coordinator
RegisterRidersLocalCoordinator{riders_locales: Hashmap<IdRider, InfoRider>},
UpdateRiderGlobal{ rider_id: u64, x: i32, y: i32 },
}
```
#### Protocolos de comunicación
La comunicación estará serializada en JSON con serde y enviarse por los siguientes tipos de transporte:
- UDP: Para la comunicación con los riders ya que pueden ser muchos riders y no nos interesa una información actualizada de los mismos
- TCP: Para la comunicación con los Comensales, los Restaurantes y entre Brokers, ya que queremos que los mensajes entre estos lleguen, además de que será recibir para responder al mismo
- De aplicación: El broker usa Actores con su pasaje de mensajes para hablar con los actores Writers del mismo, y para autoenviarse mensajes
#### Casos de interés
- **Ruta feliz:**
- Al iniciarse el actor, automáticamente se conecta con los brokers con id mayores a él y espera las conexiones de los menores, a su vez espera conexiones de comensales y restaurantes.
- Puede recibir mensajes de registrar restaurantes y riders para ir empezando a agregarlos al sistema, haciendo sus respectivos reenvíos de mensajes a los demás brokers para registrar.
- El comensal quiere empezar a hacer una orden y pide información sobre los restaurantes, este devuelve la lista de restaurantes que tiene almacenada
- Al recibir un pedido de orden por parte del comensal, envía solicitud de pedido al restaurante si es parte de él y si no, al broker correspondiente para que reenvíe este mensaje al restaurante
- Antes de enviar el mensaje al restaurante simula el sistema de aceptación de la tarjeta del comensal, en caso de ser aceptado, commitea a los demas brokers para guardar en el log
- A partir de aqui el broker notifica al comensal sobre el estado de su pedido según le informe el restaurante
- Una vez el pedido esté listo en el restaurante, el broker coordinador que es quien tiene la lista de todos los riders online, busca los `n` riders más cercanos y envía la notificación.
- El rider cuyo mensaje llegue primero, será a quien se le asigne ese pedido. Una vez asignado, se le envia un codigo de verifacion al restaurante y al rider.
- Una vez que el rider llega al restaurante, asumimos que se hace el intercambio de los codigos y se inicia el delivery del pedido.
- Por último el broker será avisado cuando se entregue el pedido para cerrar este caso y terminar de loguear.
- **Fallos:**
- En caso de que el pago sea rechazado, este notificará a los demás brokers para que se aborte y deshagan los logs escritos hasta el momento, y se le notifica al usuario
- En caso de que el restaurante sea calificado como "caido" o rechace el pedido, este notifica a los demas brokers para que se aborte, devolviendo el dinero retenido del comensal.
- En caso de que el rider asignado sea calificado como "caido" mientras se esta realizando el delivery, se hara rollback de toda la transaccion. En caso de que el rider asignado sea calificado como "caido" previamente a retirar el pedido, se deberia solicitar otro rider.
- Si el broker coordinador de riders se cae, los demás empezarán un proceso de elección sin poder seguir su procesamiento hasta que haya sido elegido un nuevo coordinador
- Si el broker con quien habla el comensal se cae, el comensal se conectará a otro broker y podrá seguir su pedido nuevamente por la recuperación del log.
## Diagramas
### Diagrama de Secuencias
### Caso feliz
```mermaid
sequenceDiagram
participant C as Diner
participant B as Broker
participant R as Restaurant
participant X as Rider
C->>B: RequestRestaurantsInfo()
B-->>C: RestaurantsAvailable(Vec<(SocketAddr, Location)>)
C->>B: PlaceOrder(diner_id, rest_id, amount)
B->>B: PreparePayment(order_id, diner_id, amount)
B-->>B: PaymentApproved(order_id, diner_id, amount)
B->>R: OrderRequest(order_id)
R-->>B: OrderAccepted(order_id)
B-->>C: UpdateStatus(Confirmed)
R->>B: OrderStarted(order_id)
B-->>C: UpdateStatus(Cooking)
R->>B: OrderPrepared(order_id)
B-->>C: UpdateStatus(Ready)
B->>X: DeliveryRequest(order_id)
X-->>B: DeliveryAccepted(order_id)
B-->>X: DeliveryConfirmed(order_id)
B-->>R: DeliveryConfirmed(order_id, rider_id)
B-->>C: UpdateStatus(RiderAssigned)
X->>B: DeliveryStarted(order_id)
B-->>C: UpdateStatus(OnTheWay)
X->>B: DeliveryCompleted(order_id)
B->>B: CommitPayment(order_id)
B-->>B: PaymentCommited(order_id)
B-->>C: UpdateStatus(Delivered)
```
#### Caso fallido: Pago rechazado
```mermaid
sequenceDiagram
participant C as Diner
participant B as Broker
C->>B: RequestRestaurantsInfo()
B-->>C: RestaurantsAvailable(Vec<(SocketAddr, Location)>)
C->>B: PlaceOrder(diner_id, rest_id, amount)
B->>B: PreparePayment(order_id, diner_id, amount)
B-->>B: PaymentAborted(order_id)
B-->>C: UpdateStatus(PaymentRejected)
```
#### Caso fallido: Restaurante cancela pedido
```mermaid
sequenceDiagram
participant C as Diner
participant B as Broker
participant R as Restaurant
C->>B: RequestRestaurantsInfo()
B-->>C: RestaurantsAvailable(Vec<(SocketAddr, Location)>)
C->>B: PlaceOrder(diner_id, rest_id, amount)
B->>B: PreparePayment(order_id, diner_id, amount)
B-->>B: PaymentApproved(order_id, diner_id, amount)
B->>R: OrderRequest(order_id)
R-->>B: OrderRejected(order_id)
B-->>C: UpdateStatus(Cancelled)
```
### Diagrama: Commit en dos Fases
```mermaid
sequenceDiagram
participant D as Diner
participant B1 as Broker 1
participant B2 as Broker 2
participant B3 as Broker 3
participant . as ...
participant BN as Broker N
D->>B1: RequestRestaurantsInfo()
B1-->>D: RestaurantsAvailable(Vec<(SocketAddr, Location)>)
D->>B1: PlaceOrder(diner_id, rest_id, amount)
B1->>B2: Prepare(OrderData)
B1->>B3: Prepare(OrderData)
B1->>.: Prepare(OrderData)
B1->>BN: Prepare(OrderData)
B2-->>B1: Ready
B3-->>B1: Ready
.-->>B1: Ready
BN-->>B1: Ready
B1->>B2: Commit
B1->>B3: Commit
B1->>.: Commit
B1->>BN: Commit
B2-->>B1: Finished
B3-->>B1: Finished
.-->>B1: Finished
BN-->>B1: Finished
B1-->>D: UpdateOrder(Confirmed)
```
### Diagrama: Elección de líder entre Brokers (Algoritmo Ring)
```mermaid
sequenceDiagram
participant B1 as Broker 1 (Detecta falla)
participant B2 as Broker 2
participant . as ...
participant BN as Broker N
B1->>B2: Election([1])
B2->>.: Election([1,2])
.->>BN: Election([1,2,…,N-1])
BN->>B1: Election([1,2,…,N-1,N])
B1->>B2: Coordinator(N)
B2->>.: Coordinator(N)
.->>BN: Coordinator(N)
```
# Cambios realizados
## Broker
### Estado interno
Ahora al broker se le agregan una entidad llamada BrokerRing, que contiene toda la informacion del anillo de brokers.
```
pub struct Broker {
connections: Addr<ConnectionManager>,
orders: HashMap<String, Order>,
gateway: Addr<Gateway>,
ring: BrokerRing,
local_restaurants: HashMap<String, Location>,
local_riders: HashMap<SocketAddr, RiderInfo>,
local_diners: HashMap<String, Location>,
message_count: usize,
}
pub struct BrokerRing {
id: BrokerId,
brokers: Vec<SocketAddr>,
election: ElectionManager,
neighbors: NeighborManager,
}
```
- ElectionManager tiene la informacion necesaria para poder realizar una eleccion, sabe quien es el leader y si esta participando actualmente en una eleccion.
- NeighborManager conoce al sucesor y al predecesor del broker en el ring.
```
pub struct ElectionManager {
leader: Option<BrokerId>,
participating: bool,
}
pub struct NeighborManager {
successor: Option<Neighbor>,
predecessor: Option<Neighbor>,
}
```
Tambien se le agrega una entidad llamada ConnectionManager, una capa de abstraccion que se encarga de interceptar los mensajes que se envian a traves de la red entre brokers y comunicarselo al broker que le pertenece y a su vez comunicar la respuesta de dicho mensaje al receptor correspondiente.
```
pub struct ConnectionManager {
broker: Addr<Broker>,
broker_bind: SocketAddr,
incoming_connections: HashMap<String, ConnectionEntry>,
outgoing_connections: HashMap<String, ConnectionEntry>,
udp_writer: Option<Addr<UdpWriter>>,
udp_timer: Addr<UdpTimer<Self, TimerTimeout>>,
leader: Option<Addr<LeaderActor>>,
}
pub struct ConnectionEntry {
pub writer: Addr<TcpWriter>,
pub reader: Addr<TcpReader<ConnectionManager>>,
}
```
Otra incorporacion es la de UdpTimer, el cual se encarga de generar un timer por mensaje que se envie mediante UDP y avisar en caso de que dicho mensaje efectivamente de timeout. Esto es necesario para poder manejar el estado de la conexion entre distintas entidades.
```
pub struct UdpTimer<A: actix::Actor, R> {
messages: HashMap<String, (Addr<A>, R)>,
}
pub enum TimerTimeout {
Broker(BrokerTimeouts),
Leader(LeaderTimeouts),
}
pub enum BrokerTimeouts {
NoAckFromLeader(StartElection),
RestaurantTimeout(RestaurantTimeout),
RiderTimeout(RiderTimeout),
}
pub enum LeaderTimeouts {
NoAckDeliveryRequest(RemoveRiderFromOrder),
}
```
Finalmente se incorpora LeaderActor, un actor encargado de todas las funciones que tiene el coordinador dentro del ring, como por ejemplo:
- Buscar los _N_ riders mas cercanos para todos los pedidos y asignarselo al primero que responda.
- Compartir la lista completa de restaurantes para los clientes.
```
pub struct LeaderActor {
count_messages: usize,
global_restaurants: HashMap<RestaurantId, RestaurantInfo>,
global_riders: HashMap<RiderId, RiderInfo>,
orders: HashMap<String, Order>,
order_finding_riders: HashMap<String, Vec<SocketAddr>>,
}
```
### Mensajes que recibe
Se define ahora un enum de enums para poder manejar los mensajes que recibe el broker.
```
pub enum BrokerMessages {
FromBrokerFollower(FromBrokerFollower),
FromBrokerLeader(FromBrokerLeader),
FromBrokerPeer(FromBrokerPeer),
FromRestaurant(FromRestaurant),
FromDiner(FromDiner),
FromRider(FromRider),
FromGateway(FromGateway),
}
```
#### From Broker Follower
Se definen los mensajes que puede recibir el broker cuando es el coordinador:
```
pub enum FromBrokerFollower {
RegisterLocal(AddLocal),
RegisterRestaurant(AddRestaurant),
RegisterRider(AddRider),
RemoveRider(RemoveRider),
RemoveRestaurant(RemoveRestaurant),
RequestRestaurants(GetRestaurants),
FindRider(FindRider),
}
```
- RegisterLocal(AddLocal): Registra la informacion que contiene localmente el broker que envio el mensaje.
```
pub struct AddLocal {
pub message_id: String,
pub restaurants: HashMap<String, Location>,
pub riders: HashMap<SocketAddr, (RiderStatus, Location)>,
pub broker_addr: SocketAddr,
}
```
- RegisterRestaurant(AddRestaurant): Registra un restaurante que se conecto al broker que envio el mensaje.
```
pub struct AddRestaurant {
pub message_id: String,
pub restaurant_id: String,
pub location: Location,
pub broker_addr: SocketAddr,
}
```
- RegisterRider(AddRider): Registra un rider que se conecto al broker que envio el mensaje.
```
pub struct AddRider {
pub message_id: String,
pub rider_id: SocketAddr,
pub location: Location,
pub broker_addr: SocketAddr,
}
```
- RemoveRider(RemoveRider): Elimina un rider de la lista global. Para que se ejecute el envio de este mensaje, el broker que tenia registrado localmente ese rider le tuvo que haber dado timeout al mismo, luego el broker le envia este mensaje al coordinador.
```
pub struct RemoveRider {
pub message_id: String,
pub rider_id: SocketAddr,
pub broker_addr: SocketAddr,
}
```
- RemoveRestaurant(RemoveRestaurant): Elimina un restaurante de la lista global, aplica la misma logica que en el caso del rider.
```
pub struct RemoveRestaurant {
pub message_id: String,
pub restaurant_id: String,
pub broker_addr: SocketAddr,
}
```
- RequestRestaurants(GetRestaurants): Devuelve la lista entera de los restaurantes a un broker para que se lo reenvie a un cliente.
```
pub struct GetRestaurants {
pub message_id: String,
pub client_id: String,
}
```
- FindRider(FindRider): Busca un rider para un pedido en especifico.
```
pub struct FindRider {
pub message_id: String,
pub order: Order,
}
```
#### From Broker Leader
Se definen los mensajes que puede recibir un broker del coordinador:
```
pub enum FromBrokerLeader {
AckLeader {
message_id: String,
},
GlobalRestaurantsResponse {
client_id: String,
restaurants: HashMap<String, Location>,
},
NoRidersAvailable {
order_id: String,
},
DeliveryAssigned {
order_id: String,
rider_id: SocketAddr,
},
}
```
- AckLeader: ACK de cualquier mensaje que le envie al coordinador
- GlobalRestaurantsResponse: Devuelve la lista de los restaurantes totales
- NoRidersAvailable: Indica que no se encontraron riders disponibles para el pedido con id _order_id_.
- DeliveryAssigned: Indica que se asigno la orden con el id _order_id_ al rider con id _rider_id_.
#### From Broker Peer
Se definen los mensajes que puede recibir un broker de otro broker del ring:
```
pub enum FromBrokerPeer {
LeaderQuery(usize),
LeaderReply(usize, HashMap<String, Order>),
Election(BrokerId),
Elected(BrokerId),
Relay {
message: Box<FromBrokerPeer>,
from: BrokerId,
to: String,
},
SendDiner {
client_id: String,
message: ClientMessages,
},
SendRestaurant {
restaurant_id: String,
message: RestaurantMessages,
},
Commit {
order: Order,
initiator: BrokerId,
},
}
```
- LeaderQuery: Indica que un broker del ring quiere saber quien es el lider. En caso de conocerlo se lo paso al siguiente en el ring, y si vuelve al emisor, significa que no hay coordinaron entonces se inicia una eleccion.
- LeaderReply: Es la respuesta al LeaderQuery, donde tambien se encuentra un hash con las ordenes activas.
- Election: Indica que se inicio un proceso de eleccion, contiene el id del ultimo broker. Cada broker se postula solo si su id es mayor al que recibio, una vez que vuelve al que inicio el proceso de eleccion se define al coordinador.
- Elected: Indica que se eligio un nuevo coordinador, contiene al id del mismo.
- Relay:
- SendDiner: Contiene un mensaje que le debo reenviar al cliente que tengo conectado locamente.
- SendRestaurant: Contiene un mensaje que le debo reenviar al restaurante que tengo conectado locamente.
- Commit:
#### From Restaurant
Se definen los mensajes que puede recibir un broker de un restaurante, donde solo se encuentran el pedido de registro y las actualizaciones del estado de un pedido:
```
pub enum FromRestaurant {
RegisterRestaurant {
rest_id: String,
addr: String,
location: Location,
},
OrderAccepted {
order_id: String,
},
OrderRejected {
order_id: String,
},
OrderStarted {
order_id: String,
},
OrderCompleted {
order_id: String,
},
}
```
#### From Diner
Se definen los mensajes que puede recibir un broker de un cliente, donde solo se encuentran el pedido de registro del cliente, el pedido de la lista total de restaurantes y efectuar una orden:
```
pub enum FromDiner {
RegisterDiner {
client_id: String,
addr: String,
location: Location,
},
PlaceOrder {
client_info: ClientInfo,
rest_info: RestaurantInfo,
amount: f32,
},
RequestRestaurantsInfo {
client_id: String,
},
}
```
#### From Rider
Se definen los mensajes que puede recibir un broker de un rider, donde solo se encuentran el pedido de registro del rider, la actualizacion de su posicion, aceptar un pedido y el aviso de que se completo un pedido:
```
pub enum FromRider {
RegisterRider {
message_id: u64,
rider_id: SocketAddr,
location: Location,
},
RiderStatus {
rider_id: SocketAddr,
location: Location,
timestamp: std::time::SystemTime,
},
DeliveryAccepted {
message_id: String,
order_id: String,
},
DeliveryCompleted {
order_id: String,
},
}
```
#### From Gateway
Se definen los mensajes que puede recibir un broker del gateway, donde solo se encuentran los mensajes que indican el estado de un pago:
```
pub enum FromGateway {
PaymentAuthorized { order_id: String },
PaymentAborted { order_id: String },
PaymentCommited { order_id: String },
}
```
### Mensajes que envia
#### Broker Handling
- DropConnection:
- LeaderQuery: Envia quien es el lider al broker que se lo solicito.
#### Connection Handling
- StartConnectionLoop: Envia este mensaje al ConnectionManager cuando se dropea una conexion, para que busque reconectarse al siguiente en caso de que sea el predecesor.
- RemoveRestaurantFromGlobalList: Le manda un mensaje al lider en caso de que se droppee la conexion con el restaurante para que lo remueva de la lista global.
#### Diner Handling
- UpdateConnection:
- RequestRestaurantsFromLeader: Envia este mensaje al lider para pedirle la lista de todos los restaurantes para enviarselo al cliente.
- AuthorizePayment: Envia este mensaje al gateway para que autorice el pago de la orden que creo el cliente.
- RestuarantAvailable: Envia la lista completa de todos los restaurantes que estan disponibles.
#### Gateway Handling
- OrderRequest: Una vez que se autoriza el pago, manda la solicitud de la orden al restaurante en caso de que tenga el restaurante, sino se lo pasa al sucesor.
#### Leader Handling
- RestaurantsAvailable: Envia la lista completa de restaurantes al broker que se lo solicito.
#### Leader Sending
- AddLocal: Envia este mensaje al lider para que agregue la informacion que tiene localmente el broker.
- GetRestaurants: Envia este mensaje al lider para que le devuelva la lista global de restaurantes.
- AddRider: Envia este mensaje al lider para que agregue a un rider a la lista global.
- AddRestaurant: Envia este mensaje al lider para que agregue un restaurante a la lista global.
- RemoveRider: Envia este mensaje al lider para que elimine a un rider de la lista global.
- RemoveRestaurant: Envia este mensaje al lider para que elimine a un restaurante de la lista global.
#### Peer Handling
- StartLeaderActor: Se automanda este mensaje en caso de que haya quedado como lider.
- SendInfoToLeader:
- UpdateStatus: En la ronda de commit en caso de tener al cliente, le paso la actualizacion del estado de su orden.
#### Restaurant Handling
- RegisterRestaurantInGlobalList: Manda al coordinador a registrar al restaurante en la lista global.
- OrderRequest: Le avisa al restaurante que tiene una orden.
- UpdateConnection:
- FindRider: Una vez terminado el pedido, envia este mensaje al coordinador para que encuentre un rider.
#### Rider Handling
- Ack: Este mensaje se envia como confirmacion de que llego el estado del rider.
- RegisterRiderInGlobalList: Manda al coordinador a registrar al rider en la lista global.
- RemoveRiderInGlobalList: Manda al coordinador a eliminar al rider en la lista global, al haber este dado timeout.
- CommitPayment: Una vez que el rider haya avisado que completo el pedido, se le pide al gateway que commitee el pago.
- RiderHeartbeat: Se automanda un mensaje de forma periodica para eliminar un rider en caso de que haya dado timeout.
## Rider
### Estado interno
Similar al broker, se abstrajo la logica de el deserializado y el envio de mensajes a traves de la red con el actor _ConnectionManager_.
```
pub struct Rider {
addr: SocketAddr,
location: Location,
current_order: Option<Order>,
connection_manager: Option<Addr<ConnectionManager>>,
}
```
_ConnectionManager_ contiene el estado de la conexion, los timestamps del ultimo heartbeat que se envio y la ultima respuesta del broker actual.
Con esta informacion y, ademas, con una lista con todos los brokers, el rider puede reconectarse al ring en caso de que se tome como caido (mediante timeouts) al broker con el que estaaba conectado.
Finalmente contiene el id del broker actual y un _UdpWriter_, un actor encargado de escribir mensajes mediante UDP.
```
pub struct ConnectionManager {
status: ConnectionStatus,
last_heartbeat: SystemTime,
last_broker_message: SystemTime,
rider_addr: Addr<Rider>,
broker_ring_addr: Vec<SocketAddr>,
id_broker_actual: usize,
udp_writer: Addr<UdpWriter>,
}
enum ConnectionStatus {
READY,
RECONNECTING,
}
```
### Mensajes que recibe
```
pub enum RiderMessages {
Ack {
_id_message: u64,
},
DeliveryRequest {
id_message: String,
order_id: String,
},
DeliveryConfirmed {
order: Order,
},
DeliveryAlreadyTaken {
id_message: String,
order_id: String,
},
}
```
- Ack: Mensaje de respuesta a los heartbeat, sirven para registrar el estado del broker.
- DeliveryRequest: Indica que el rider fue uno de los elegidos para llevar la orden con id _order_id_.
- DeliveryConfirmed: Indica que la orden le fue asignada al rider.
- DeliveryAlreadyTaken:
### Mensajes que envia
- RegisterRider: Le pide al broker con el que esta conectado que lo registre en el ring.
- DeliveryAccepted: Acepta la solicitud de delivery de una orden.
- GoToRestaurant: Se automanda este mensaje de forma continua para que se mueva un poco hacia el restaurante hasta llegar a el.
- GoToClient: Una vez que llega al restaurante y toma el pedido, se automanda este mensaje de forma continua para que se mueva un poco hacia el cliente hasta llegar a el.
- DeliveryCompleted: Una vez que llega al destino del cliente se completa el pedido y se le avisa al broker.
- RiderStatus: Se manda este mensaje al broker de forma periodica, para mantener un estado de la conexion y la posicion del rider.
## Restaurant
### Estado interno
Como podemos apreciar, el estado interno no cambio desde las solucion propuesta.
```
pub struct Restaurant {
id: String,
location: Location,
kitchen: Addr<Kitchen>,
brokers: Vec<SocketAddr>,
writer: Option<Addr<TcpWriter>>,
}
```
### Mensajes que recibe
Se agrega _BrokerDisconnected_, un mensaje interno que indica que debe reconectarse al ring.
```
pub enum RestaurantMessages {
OrderRequest { order_id: String },
OrderStarted { order_id: String },
OrderPrepared { order_id: String },
OrderRejected { order_id: String },
OrderAccepted { order_id: String },
BrokerDisconnected,
}
```
### Mensajes que envia
- RegisterRestaurant: Se envia ese mensaje al broker para que lo registre en el ring.
- OrderRequest: Avisa a la cocina que llego una orden.
- OrderStarted, OrderAccepted, OrderRejected, OrderCompleted: Avisa sobre la actualizacion del estado de la orden al broker.
## Client
### Estado interno
Ahora el cliente pasas a tener un _Location_ y un _TcpWriter_ por donde se va a comunicar con algun broker del ring.
```
pub struct Client {
id: String,
location: Location,
brokers: Vec<SocketAddr>,
writer: Option<Addr<TcpWriter>>,
}
```
### Mensajes que recibe
Como podemos apreciar, los mensajes que recibe no cambiaron desde las solucion propuesta.
```
pub enum ClientMessages {
RestaurantsAvailable {
restaurants: HashMap<String, Location>,
},
UpdateStatus {
order_id: String,
status: OrderStatus,
},
}
```
### Mensajes que envia
- InitConnection: Se autoenvia un mensaje para iniciar la conexion con el broker.
- SetupConnection: Se autoenvia un mensaje para terminar de definir la conexion con el broker.
- RegisterDiner: Envia este mensaje al broker para que lo registre en el ring.
- PlaceOrder: Se manda este mensaje cuando el cliente quiere crear una orden.
## Udp Writer
```
pub struct UdpWriter {
framed: Option<UdpHalfWriter>,
}
pub type UdpHalfWriter = SplitSink<UdpFramed<LengthDelimitedCodec, UdpSocket>, (Bytes, SocketAddr)>;
```
# Graficos
## Broker Innit
```mermaid
sequenceDiagram
participant B as Broker
participant CM as ConnectionManager
participant T as Broker TcpListener
participant TC as Client TcpListener
participant D as UdpListener
B->>CM: Started
CM->>T: Started
CM->>TC: Started
CM->>D: Started
T->>CM: StartConnectionLoop
```
## Broker joining Ring
```mermaid
sequenceDiagram
participant NM as NeighborManager (from Broker)
participant BR as BrokerRing (from Broker)
participant B as Broker
participant CM as ConnectionManager
participant BFR as Broker From Ring
CM->>B: SetSuccesor
CM->>BFR: broker id as u8
CM->>CM: RegisterConnection
CM->>B: AskForLeader
B->>BR: send_succesor()
BR->>NM: send_to_succesor()
NM->>CM: LeaderQuery
CM->>BFR: LeaderQuery
```
### Case 1: There is a leader
```mermaid
sequenceDiagram
participant B as Broker
participant BFR as Broker From Ring
B->>BFR: LeaderQuery
BFR->>B: LeaderReply(BrokerId)
```
### Case 2: There is a no leader
```mermaid
sequenceDiagram
participant B as Broker
participant BFR as Broker 1 From Ring
participant BFR1 as Broker 2 From Ring
B->>BFR: LeaderQuery
BFR->>BFR1: LeaderQuery
BFR1->>B: LeaderQuery
B->>B: begin_election()
B->>BFR: Election(0)
BFR->>BFR1: Election(1)
BFR1->>B: Election(2)
B->>BFR: Elected(2)
BFR->>BFR1: Elected(2)
BFR1->>B: Elected(2)
```
## Register Restaurant
```mermaid
sequenceDiagram
participant Re as Restaurant
participant BF as Broker Follower
participant BL as Broker Leader
Re->>BF: RegisterRestaurant
BF->>BL: RegisterRestaurantInGlobalList
```
## Register Rider
```mermaid
sequenceDiagram
participant Ri as Rider
participant BF as Broker Follower
participant BL as Broker Leader
Ri->>BF: RegisterRider
BF->>BL: RegisterRiderInGlobalList
```
## Full example
```mermaid
sequenceDiagram
participant C as Client
participant BF as Broker Follower
participant G as Gateway
participant BL as Broker Leader
participant Re as Restaurant
participant Ri as Rider
C->>BF: RegisterDiner
BF->>BL: GetRestaurants
BL->>BF: GlobalRestaurantsResponse
BF->>C: RestaurantsAvailable
C->>BF: PlaceOrder(Order)
BF->>G: AuthorizePayment
G->>BF: PaymentAuthorized(OrderId)
BF->>Re: OrderRequest(Order)
Re->>BF: OrderAccepted(Order)
BF->>C: UpdateStatus
Re->>BF: OrderStarted(Order)
BF->>C: UpdateStatus
Re->>BF: OrderCompleted(Order)
BF->>C: UpdateStatus
BF->>BL: FindRider(Order)
BL->>Ri: DeliveryRequest(Order)
Ri->>BL: DeliveryAccepted(Order)
BL->>Ri: DeliveryConfirmed(Order)
BL->>BF: DeliveryAssigned(Order)
BF->>C: UpdateStatus
Ri->>BL: RiderStatus
Ri->>BL: ...
Ri->>BL: RiderStatus
Ri->>BL: DeliveryCompleted(Order)
BL->>BF: Commit(Order)
BF->>C: UpdateStatus
BF->>BL: Commit(Order) (no hace nada ya que el lider es el initiator)
```