# Base Module ### Описание модуля [BSM](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule) модуль представляет собой реализацию механизма отправки/приема и обработки сообщений в формате HTTP/REST/JSON между модулями TengriWallet на основе [Матрицы требований](http://project.dev.tengriwallet.kz/wiki/Requirements/RequirementsMatrix#CMN). Сообщение хранится во входящей очереди, исходящей очереди базовой структуры модуля и передается между БСМ разных модулей нашей системы. В каждом конкретном модуле БСМ извлекает `body` из сообщения. По адресу БСМ находит поведение модуля, которому предназначено сообщение, и передает этому поведению `params` ### Подключение модуля После согласование с командой о добавлении нового модуля, необходимо: **Добавить в Gemfile** проекта нового модуля ```ruby gem 'log_cleaner', path: '../dependencies/log_cleaner' gem 'tengri-basemodule', path: '../mod_base/tengri-basemodule' ``` **Создать адаптер с поведением**. Добавить его имя и класс в переменных .env файла в корне модуля. Так же добавить название базы, ип:порт. ```bash # ID модуля. Используется в поле сообщения address.sender для # идентефикации отправителя сообщения, так же message.sender_id MODULES_ID=NewModule # Настройки Puma для Server HTTP_SERVER_HOST=localhost HTTP_SERVER_PORT=XXXX # обсуждается с архитектором # Настройки Poller. # Название класса адаптера предоставляющего интерфейс для других модулей SUBSCRIBED_CLASS=NewModule # Относительный путь от корня модуля к файлу реализации адаптера SUBSCRIBED_CLASS_LOCATION=./lib/new_module_adapter.rb # Название базы для хранения сообщений MESSAGES_DB_NAME=tengri_new_module_messages # Список модулей в системе TengriWallet. Poller при отправке сообщения определенному модулю берет его адрес из этой переменной MODULES_ZOO = {"API": "http://localhost:4000", "API2": "http://localhost:4012", ...} ``` Обновить .env файлы других модулей в tools-and-utils/distribution_package_routines/environments, добавить в MODULES_ZOO адрес нового модуля. ### Структура BSM BSM состоит из: * [Poller](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/poller.rb) - Обрабатывает сообщения из очереди * [Server](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/http_server.rb) - прием и сохранение сообщений в очередь для обработки. Каждый модуль использующий BSM запускает по одному экземпляру Poller и Server. Если проанализровать, что делает команда [tlc start](http://project.dev.tengriwallet.kz/browser/tengri/launch-control/start.sh) в локальном окружении разработчика,то мы видим, что с помощью [Invoker](http://project.dev.tengriwallet.kz/browser/tengri/launch-control/bin/invoker) поочередно из файла [invoker.ini](http://project.dev.tengriwallet.kz/browser/tengri/launch-control/invoker.ini) запускаются модули TengriWallet. ```ini [api-poller] directory = ../mod_api command = rvm in . do bundle exec poller [api-server] directory = ../mod_api command = rvm in . do bundle exec server [crm-poller] directory = ../mod_crm command = rvm in . do bundle exec poller [crm-server] directory = ../mod_crm command = rvm in . do bundle exec server ``` ### Схема работы BSM Server Основной задачей сервера является получение сообщений от других модулей с дальнейшим сохранением их в базе ```ENV['MESSAGE_DB_NAME']``` в таблице **incoming** через [Publisher]() используя [IncomingStorageManager]() с базовым классом [StorageManager](). Метод Tengri::Basemodule::HTTPServer.start ```ruby def self.start # Настройки Puma server_options = { Host: ENV['HTTP_SERVER_HOST'], Port: ENV['HTTP_SERVER_PORT'] } # Создание обработчика запросов app = Proc.new do |env| # Получение запроса и извлечение сообщения request = Rack::Request.new(env) message = request.body.read # Журналирование сообщения Tengri::Basemodule.logger.info("Pid #{Process.pid} #{module_name}-SERVER RECIEVE MESSAGE FROM #{sender_name(message)}") Tengri::Basemodule.logger.info(message) # Сохранение сообщения response = Publisher.new(IncomingStorageManager.new, OutgoingStorageManager.new).incoming(message) # Возврат HTTP ответа [200, {}, [response.to_s]] end # Запуск Web сервера Rack::Handler::Puma.run(app, server_options) end ``` ### Схема работы BSM Poller Задача Poller в цикле из базы ```ENV['MESSAGES_DB_NAME']``` получать сообщения из таблиц incoming, outgoing со статусом ```QUEUED``` , затем после обработки помечать их ```DEQUEUED```. Poller использует gem [Wisper]() для реализации pub/sub. При запуске Poller выполняются след. шаги: 1) На глобальные broadcast события подписывается адаптер модуля Wisper.subscribe(ENV['SUBSCRIBED_CLASS']) 2) На broadcast события подписывается [UseCaseAdapter]() 3) Создается пул потоков в количестве 2 штук посредством [ThreadsPool]() 4) Создаются Poller'ы для обработки входящих и исходящих сообщений [IncomingMessagesPoller]() / [OutgoingMessagesPoller]() 5) В цикле запускается проверка и обработка сообщений. 6) После обработки сообщение помечается статусом ```DEQUEUED``` ### Poller - обработка incoming сообщений Poller создает экземпляр IncomingMessagesPoller, затем в цикле проверяет очередь сообщений в методе [IncomingMessagesPoller.perform](). Метод получает из таблицы incoming все сообщения со статустом `QUEQUED` и поочередно передает их в [UseCaseAdapter.perform](), который в свою очередь в зависимости от поля сообщения is_async : ```ruby message.is_async ? process_message(message) : process_message_synchronously(message) ``` После обработки сообщение помечается `DEQUEUED` Обработчики ```ruby # асинхронная обработка сообщения путем вызова названия метода из # значения атрибута behaviour def process_message(message) if message.params.nil? # Создание глобального события с названием метода поведения # для ранее подписанныех слушателей в Subscribe.initialize publish(message.behaviour) else # Событие с параметрами publish(message.behaviour, message.params) end end def process_message_synchronously(message) request_id = Tengri::Basemodule.generate_uuid time = Tengri::Basemodule.date_in_rfc3339 if message.params.nil? # need create and pass request_id to behaviour method # if is not presented. publish(message.behaviour, message.request_id) else # Вызов метода Адаптера с параметрами response = subscribed_class.send(message.behaviour.to_sym, message.params) address = { 'module' => message.sender_id, 'behaviour' => ''} header = { "request_id" => request_id, "response_id" => message.request_id, "tracking_id" => message.tracking_id, "requested_at" => time, "sender_id" => ENV['MODULE_ID'], "priority" => "23", "is_async" => false } # Отправка сообщения-ответа send_synchronously( address, response, header, is_response = true) end end ``` ### Poller - обработка outgoing сообщений Poller создает экземпляр OutgoingMessagesPoller, затем в цикле вызывет метод [OutgoingMessagesPoller.perform](), который получает из таблицы outgoing все сообщения со статустом `QUEQUED` и поочередно передает их в [Sender.perform](), который в свою очередь отправяет сообщения другим модулям через [HTTPTransport](). Как отмечалось ранее адреса модулей беруться из `ENV['MODULES_ZOO']` > В код класса Sender мною было добавлено создание дополнительные поля sender/sender_adapter в структуре message.address, в которое помещается MODULE_ID, для идентефикации отправителя сообщения ```ruby def set_sender_info @body['address']['sender'] = ENV['MODULE_ID'] @body['address']['sender_adapter'] = ENV['SUBSCRIBED_CLASS'] end ``` ### Глоссарий *Модуль* - приложение реализующее часть бизнес-логики запущенное в отдельном процессе/контейнере. *Сообщение* - Пакет данных в формате JSON передаваемый между модулями посредством HTTP/REST. см [Message](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/message-processing/message.rb) *Адаптер* - Код в виде Класса/Модуля с методами реализующие определенную бизнес-логику сервиса. К примеру адаптер [PaymentAdapter](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment/core/payment_adapter.rb) позволяет в сервисе [mod_payment](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment) предоставлять другим сервисам доступ к методам [CreateContract](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment/core/payment_adapter.rb#L7),[ExecuteContract](http://project.dev.tengriwallet.kz/browser/tengri/mod_payment/core/payment_adapter.rb#L22), и т.д ### Схема взаимодействия на примере двух модулей ### Структура таблиц Таблица **incoming** Имя колонки | Определение | Описание --------- | ------- | ----------- id | serial NOT NULL | Уникальный идентификатор в очереди модуля body | text | Object сообщения status | text | Статус ```QUQUED``` или ```DEQUEUED``` created_at | timestamp without timezone | Время создания updated_at | timestamp without timezone | Время обновления request_id | text | ID запроса response_id | text | ID ответа tracking_id | text | requested_at | text | Время запроса sender_id | text | ID отправителя priority | text | Приоритет ( по умолчанию 50 из матрицы требований ) is_async | boolean | Флаг определяющих синхронность обработки Таблица **outgoing** Имя колонки | Определение | Описание --------- | ------- | ----------- id | serial NOT NULL | Уникальный идентификатор в очереди модуля body | text | Object сообщения status | text | Статус ```QUQUED``` или ```DEQUEUED``` created_at | timestamp without timezone | Время создания updated_at | timestamp without timezone | Время обновления request_id | text | ID запроса response_id | text | ID ответа tracking_id | text | (?) requested_at | text | Время запроса sender_id | text | ID отправителя priority | text | Приоритет ( по умолчанию 50 из матрицы требований ) is_async | boolean | Флаг определяющих синхронность обработки error_code | text | Код ошибки (?) state_code | text | (?) Объект body Параметр | Значение | Описание --------- | ------- | ----------- address | Object | Объект, содержащий адрес Класса и метода модуля, подписанного на этот род сообщений, и имя отправителя params | Object | Объект, который передается в теле команды или запроса к модулю. Возможные объекты описаны далее, каждый в разделе модуля, который потребляет этот объект. ### Предложения для улучшения * Вынести количество потоков в ENV для гибкой настройки модуля * Вынести [max_tries](http://project.dev.tengriwallet.kz/browser/tengri/mod_base/tengri-basemodule/lib/tengri/basemodule/message-processing/use_case_adapter.rb#L91), base_sleep_seconds, max_sleep_seconds в ENV * Рефакторинг * Создать StoreManagerFactory возвращающий Incoming/Outgoing менеджеры * Привести в порядок структуру файлов BSM.