# Base Module
### Описание модуля
[BSM](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule) модуль представляет собой реализацию механизма отправки/приема и обработки сообщений в формате HTTP/REST/JSON между модулями TengriWallet на основе [Матрицы требований](http://project.dev.tengriwallet.kz/wiki/Requirements/RequirementsMatrix#CMN). Сообщение хранится во входящей очереди, исходящей очереди
базовой структуры модуля и передается между БСМ разных модулей нашей системы.
В каждом конкретном модуле БСМ извлекает `body` из сообщения. По адресу БСМ находит
поведение модуля, которому предназначено сообщение, и передает этому поведению `params`
### Подключение модуля
После согласование с командой о добавлении нового модуля, необходимо:
**Добавить в Gemfile** проекта нового модуля
```ruby
gem 'log_cleaner', path: '../dependencies/log_cleaner'
gem 'tengri-basemodule', path: '../mod_base/tengri-basemodule'
```
**Создать адаптер с поведением**.
Добавить его имя и класс в переменных .env файла в корне модуля. Так же добавить название базы, ип:порт.
```bash
# ID модуля. Используется в поле сообщения address.sender для
# идентефикации отправителя сообщения, так же message.sender_id
MODULES_ID=NewModule
# Настройки Puma для Server
HTTP_SERVER_HOST=localhost
HTTP_SERVER_PORT=XXXX # обсуждается с архитектором
# Настройки Poller.
# Название класса адаптера предоставляющего интерфейс для других модулей
SUBSCRIBED_CLASS=NewModule
# Относительный путь от корня модуля к файлу реализации адаптера
SUBSCRIBED_CLASS_LOCATION=./lib/new_module_adapter.rb
# Название базы для хранения сообщений
MESSAGES_DB_NAME=tengri_new_module_messages
# Список модулей в системе TengriWallet. Poller при отправке сообщения определенному модулю берет его адрес из этой переменной
MODULES_ZOO = {"API": "http://localhost:4000", "API2": "http://localhost:4012", ...}
```
Обновить .env файлы других модулей в tools-and-utils/distribution_package_routines/environments, добавить в MODULES_ZOO адрес нового модуля.
### Структура BSM
BSM состоит из:
* [Poller](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/poller.rb) - Обрабатывает сообщения из очереди
* [Server](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/http_server.rb) - прием и сохранение сообщений в очередь для обработки.
Каждый модуль использующий BSM запускает по одному экземпляру Poller и Server.
Если проанализровать, что делает команда [tlc start](http://project.dev.tengriwallet.kz/browser/tengri/launch-control/start.sh) в локальном окружении разработчика,то мы видим, что с помощью [Invoker](http://project.dev.tengriwallet.kz/browser/tengri/launch-control/bin/invoker) поочередно из файла [invoker.ini](http://project.dev.tengriwallet.kz/browser/tengri/launch-control/invoker.ini) запускаются модули TengriWallet.
```ini
[api-poller]
directory = ../mod_api
command = rvm in . do bundle exec poller
[api-server]
directory = ../mod_api
command = rvm in . do bundle exec server
[crm-poller]
directory = ../mod_crm
command = rvm in . do bundle exec poller
[crm-server]
directory = ../mod_crm
command = rvm in . do bundle exec server
```
### Схема работы BSM Server
Основной задачей сервера является получение сообщений от других модулей с дальнейшим сохранением их в базе ```ENV['MESSAGE_DB_NAME']``` в таблице **incoming** через [Publisher]() используя [IncomingStorageManager]() с базовым классом [StorageManager]().
Метод Tengri::Basemodule::HTTPServer.start
```ruby
def self.start
# Настройки Puma
server_options = {
Host: ENV['HTTP_SERVER_HOST'],
Port: ENV['HTTP_SERVER_PORT']
}
# Создание обработчика запросов
app = Proc.new do |env|
# Получение запроса и извлечение сообщения
request = Rack::Request.new(env)
message = request.body.read
# Журналирование сообщения
Tengri::Basemodule.logger.info("Pid #{Process.pid} #{module_name}-SERVER RECIEVE MESSAGE FROM #{sender_name(message)}")
Tengri::Basemodule.logger.info(message)
# Сохранение сообщения
response = Publisher.new(IncomingStorageManager.new,
OutgoingStorageManager.new).incoming(message)
# Возврат HTTP ответа
[200, {}, [response.to_s]]
end
# Запуск Web сервера
Rack::Handler::Puma.run(app, server_options)
end
```
### Схема работы BSM Poller
Задача Poller в цикле из базы ```ENV['MESSAGES_DB_NAME']``` получать сообщения из таблиц incoming, outgoing со статусом ```QUEUED``` , затем после обработки помечать их ```DEQUEUED```.
Poller использует gem [Wisper]() для реализации pub/sub.
При запуске Poller выполняются след. шаги:
1) На глобальные broadcast события подписывается адаптер модуля Wisper.subscribe(ENV['SUBSCRIBED_CLASS'])
2) На broadcast события подписывается [UseCaseAdapter]()
3) Создается пул потоков в количестве 2 штук посредством [ThreadsPool]()
4) Создаются Poller'ы для обработки входящих и исходящих сообщений [IncomingMessagesPoller]() / [OutgoingMessagesPoller]()
5) В цикле запускается проверка и обработка сообщений.
6) После обработки сообщение помечается статусом ```DEQUEUED```
### Poller - обработка incoming сообщений
Poller создает экземпляр IncomingMessagesPoller, затем в цикле проверяет очередь сообщений в методе [IncomingMessagesPoller.perform]().
Метод получает из таблицы incoming все сообщения со статустом `QUEQUED` и поочередно передает их в [UseCaseAdapter.perform](), который в свою очередь в зависимости от поля сообщения is_async :
```ruby
message.is_async ? process_message(message) : process_message_synchronously(message)
```
После обработки сообщение помечается `DEQUEUED`
Обработчики
```ruby
# асинхронная обработка сообщения путем вызова названия метода из
# значения атрибута behaviour
def process_message(message)
if message.params.nil?
# Создание глобального события с названием метода поведения
# для ранее подписанныех слушателей в Subscribe.initialize
publish(message.behaviour)
else
# Событие с параметрами
publish(message.behaviour, message.params)
end
end
def process_message_synchronously(message)
request_id = Tengri::Basemodule.generate_uuid
time = Tengri::Basemodule.date_in_rfc3339
if message.params.nil?
# need create and pass request_id to behaviour method
# if is not presented.
publish(message.behaviour, message.request_id)
else
# Вызов метода Адаптера с параметрами
response = subscribed_class.send(message.behaviour.to_sym, message.params)
address = { 'module' => message.sender_id, 'behaviour' => ''}
header = { "request_id" => request_id,
"response_id" => message.request_id,
"tracking_id" => message.tracking_id,
"requested_at" => time,
"sender_id" => ENV['MODULE_ID'],
"priority" => "23",
"is_async" => false
}
# Отправка сообщения-ответа
send_synchronously( address, response, header, is_response = true)
end
end
```
### Poller - обработка outgoing сообщений
Poller создает экземпляр OutgoingMessagesPoller, затем в цикле вызывет метод [OutgoingMessagesPoller.perform](), который получает из таблицы outgoing все сообщения со статустом `QUEQUED` и поочередно передает их в [Sender.perform](), который в свою очередь отправяет сообщения другим модулям через [HTTPTransport](). Как отмечалось ранее адреса модулей беруться из `ENV['MODULES_ZOO']`
> В код класса Sender мною было добавлено создание дополнительные поля sender/sender_adapter в структуре message.address, в которое помещается MODULE_ID, для идентефикации отправителя сообщения
```ruby
def set_sender_info
@body['address']['sender'] = ENV['MODULE_ID']
@body['address']['sender_adapter'] = ENV['SUBSCRIBED_CLASS']
end
```
### Глоссарий
*Модуль* - приложение реализующее часть бизнес-логики запущенное в отдельном процессе/контейнере.
*Сообщение* - Пакет данных в формате JSON передаваемый между модулями посредством HTTP/REST. см [Message](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/message-processing/message.rb)
*Адаптер* - Код в виде Класса/Модуля с методами реализующие определенную бизнес-логику сервиса. К примеру адаптер [PaymentAdapter](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment/core/payment_adapter.rb) позволяет в сервисе [mod_payment](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment) предоставлять другим сервисам доступ к методам [CreateContract](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment/core/payment_adapter.rb#L7),[ExecuteContract](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment/core/payment_adapter.rb#L22), и т.д
### Схема взаимодействия на примере двух модулей
### Структура таблиц
Таблица **incoming**
Имя колонки | Определение | Описание
--------- | ------- | -----------
id | serial NOT NULL | Уникальный идентификатор в очереди модуля
body | text | Object сообщения
status | text | Статус ```QUQUED``` или ```DEQUEUED```
created_at | timestamp without timezone | Время создания
updated_at | timestamp without timezone | Время обновления
request_id | text | ID запроса
response_id | text | ID ответа
tracking_id | text |
requested_at | text | Время запроса
sender_id | text | ID отправителя
priority | text | Приоритет ( по умолчанию 50 из матрицы требований )
is_async | boolean | Флаг определяющих синхронность обработки
Таблица **outgoing**
Имя колонки | Определение | Описание
--------- | ------- | -----------
id | serial NOT NULL | Уникальный идентификатор в очереди модуля
body | text | Object сообщения
status | text | Статус ```QUQUED``` или ```DEQUEUED```
created_at | timestamp without timezone | Время создания
updated_at | timestamp without timezone | Время обновления
request_id | text | ID запроса
response_id | text | ID ответа
tracking_id | text | (?)
requested_at | text | Время запроса
sender_id | text | ID отправителя
priority | text | Приоритет ( по умолчанию 50 из матрицы требований )
is_async | boolean | Флаг определяющих синхронность обработки
error_code | text | Код ошибки (?)
state_code | text | (?)
Объект body
Параметр | Значение | Описание
--------- | ------- | -----------
address | Object | Объект, содержащий адрес Класса и метода модуля, подписанного на этот род сообщений, и имя отправителя
params | Object | Объект, который передается в теле команды или запроса к модулю. Возможные объекты описаны далее, каждый в разделе модуля, который потребляет этот объект.
### Предложения для улучшения
* Вынести количество потоков в ENV для гибкой настройки модуля
* Вынести [max_tries](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/message-processing/use_case_adapter.rb#L91), base_sleep_seconds, max_sleep_seconds в ENV
* Рефакторинг
* Создать StoreManagerFactory возвращающий Incoming/Outgoing менеджеры
* Привести в порядок структуру файлов BSM.