# 用 AWS SQS + Shoryuken 寄信 ## SQS 官網說明: Amazon Simple Queue Service (SQS) 是全受管**訊息佇列服務**,可讓您分離和擴展微型服務、分散式系統及無伺服器應用程式。SQS 可免除與管理和操作訊息導向中介軟體相關的複雜性及開銷,也可讓開發人員專注在與眾不同的工作上。您可以使用 SQS 在軟體元件之間傳送、存放和接收不限數量的訊息,不會遺失訊息或需要其他服務可用。使用 AWS 主控台、命令列界面或自選的 SDK 以及三個簡單的命令,即可在幾分鐘內開始使用 SQS。 用途 ![](https://i.imgur.com/mdXsbbe.png) 運作方式 ![](https://i.imgur.com/zwPU6wE.png) 優點: - Cheap ($0.50 per 1 million Amazon SQS Requests) - SQS is built to scale. You are taking advantage of Amazon’s amazing infrastructure so it’s easy to scale your workers. - Amazon provides a simple console to watch your queues as well as configure what happens to dead messages. - Amazon’s Ruby SDK is very flexible when it comes to creating queues. You can create as many as you want. ## Shoryuken Purpose: 用來更方便操作和設定 aws sqs 的 gem ### gem ``` gem 'shoryuken' gem 'aws-sdk-sqs' gem 'oj' ``` #### 設定環境變數 AWS環境變數(access_key, region...) production: `ENABLE_SHORYUKEN_INLINE_EXECUTER=''` developing or testing: `ENABLE_SHORYUKEN_INLINE_EXECUTER='Y'` #### 設定檔 Configuration: ```ruby #config/shoryuken.yml logfile: ./log/shoryuken.log pidfile: ./tmp/pids/shoryuken.pid concurrency: 6 #The number of threads available for processing messages at a time. Default: 25 delay: 10 #queue 沒東西時延遲秒數 queues: - [ queue1, 3 ] - [ queue2, 1 ] # Shoryuken to fetch messages in cycles of queue1 3 times, then queue2 1 times #- [ be_async_low_300, 1] ``` ```ruby #config/initializers/shoryuken.rb Shoryuken.sqs_client = Aws::SQS::Client.new({ access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'], region: ENV['AWS_REGION'] }) Shoryuken.worker_executor = ENV['ENABLE_SHORYUKEN_INLINE_EXECUTER'].present? ? Shoryuken::Worker::InlineExecutor : #測試用,不會跑到 queue 裡面 Shoryuken::Worker::DefaultExecutor #正式用 ``` ### Shoryuken Job example - queue -> 設定檔的 queue - sqs_msg -> shoryuken 給 sqs 的參數 - params -> 要傳遞的訊息,json格式 ```ruby class HelloWorker include Shoryuken::Worker shoryuken_options queue: 'queue1', auto_delete: true def perform(sqs_msg, name) puts "Hello, #{name}" end end ``` Run: ``` HelloWorker.perform_async('Ahoy') ``` ## source code: Shoryuken::Worker::InlineExecutor ```ruby # File 'lib/shoryuken/worker/inline_executor.rb', line 5 def perform_async(worker_class, body, options = {}) body = JSON.dump(body) if body.is_a?(Hash) queue_name = options.delete(:queue) || worker_class.get_shoryuken_options['queue'] sqs_msg = OpenStruct.new( body: body, attributes: nil, md5_of_body: nil, md5_of_message_attributes: nil, message_attributes: nil, message_id: nil, receipt_handle: nil, delete: nil, queue_name: queue_name ) call(worker_class, sqs_msg) #直接執行 end ``` ```ruby #worker_class.get_shoryuken_options { "queue" => "queue1", "delete" => false, "auto_delete" => false, "auto_visibility_timeout" => false, "retry_intervals" => nil, "batch" => false } ``` Shoryuken::Worker::DefaultExecutor ```ruby # File 'lib/shoryuken/worker/default_executor.rb', line 5 def perform_async(worker_class, body, options = {}) options[:message_attributes] ||= {} options[:message_attributes]['shoryuken_class'] = { string_value: worker_class.to_s, data_type: 'String' } options[:message_body] = body queue = options.delete(:queue) || worker_class.get_shoryuken_options['queue'] Shoryuken::Client.queues(queue).send_message(options) end ``` ```ruby # File 'lib/shoryuken/client.rb', line 6 def queues(name) @@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name) end ``` ## Shoryuken Job for mailer ```ruby module Mails::Shoryuken class BaseJob include Shoryuken::Worker shoryuken_options queue: Configs.aws.sqs.be_async_low_300, auto_delete: true def perform(sqs_msg, params_str, mailer = ::UserMailer) puts "run Mails::Shoryuken::#{mailer}Job" arguments = Oj.load(params_str).transform_keys(&:to_sym) #json to hash mail_method = arguments&.dig(:mail_method) mailer.send(mail_method, **arguments.except!(:mail_method)).deliver_now #寄信 rescue => e puts e.message puts e.backtrace data = { mailer: mailer, mail_method: mail_method, params_str: params_str } data[:params_str] = Oj.load(data[:params_str]) #錯誤信通知 ErrorEmail::NormalService.new( subject: "[Shoryuken Error] [#{mailer}##{mail_method}] Failed to send mail.", mail_body: data, error: e, delay: false ).perform end def self.send_mail(mail_args) params_str = Oj.dump(mail_args) # hash to json return false unless params_str.present? perform_async(params_str) end end end ``` ```ruby module Mails::Shoryuken class UserMailerJob < BaseJob def perform(sqs_msg, params_str, mailer = ::UserMailer) super end end end ``` ```ruby module Mails::Shoryuken class NotifyUserMailerJob < BaseJob def perform(sqs_msg, params_str, mailer = ::NotifyUserMailer) super end end end ``` User mailer ```ruby class UserMailer < Devise::Mailer def welcome_splashtop(user_id:, cc: nil) user = User.find_by(id: user_id) subject = I18n.t('mailer.welcome.title') mail(to: user.email, cc: cc, subject: subject) end end ``` Run: ```ruby UserMailer.welcome_splashtop(user_id: 123, cc: 'cc@mail.test').deliver_later -> Mails::Shoryuken::UserMailerJob.send_mail({ mail_method: :welcome_splashtop, user_id: 123, cc: 'cc@mail.test' }) ``` ### Spec Shoryuken Job spec: ```ruby require 'spec_helper' describe Mails::Shoryuken::BaseJob do let(:user) { create(:user) } let(:params_str) { Oj.dump(mail_method: :welcome_splashtop, user_id: user.id, cc: 'cc@mail.test' ) } let(:sqs_msg) { double message_id: 'fc754df7-9cc2-4c41-96ca-5996a44b771e', body: params_str, delete: nil } let(:error_subject) { 'Failed to send mail.' } describe '#perform' do subject { described_class.new } it 'deletes the message' do expect(sqs_msg).to receive(:delete) subject.perform(sqs_msg, params_str) end context 'success' do it 'prints the job start message' do expect { subject.perform(sqs_msg, params_str) }.to output("run Mails::Shoryuken::UserMailerJob\n").to_stdout end it 'send normal email' do subject.perform(sqs_msg, params_str) expect(ActionMailer::Base.deliveries.last.subject).not_to include(error_subject) end end context 'faild' do let(:params_str) { Oj.dump({mail_method: :not_exist_method, other_params: '12345'}) } let(:mail) { ActionMailer::Base.deliveries.last } it 'send error email' do subject.perform(sqs_msg, params_str) expect(mail.subject).to include(error_subject, 'UserMailer', 'not_exist_method') expect(mail.body.encoded).to include('UserMailer', 'not_exist_method', '12345') end end end end ``` mailer spec: ```ruby require 'spec_helper' RSpec.describe UserMailer, type: :mailer do describe '#welcome_splashtop' do let(:subject) { I18n.t('mailer.welcome.title') } let(:cc) { 'cc@mail.test' } let(:user) { create(:user) } let(:mail) { described_class.welcome_splashtop(user_id: user.id, cc: cc) } it 'send text and html' do expect(mail.body.parts.length).to eq(2) expect(mail.body.parts.to_a.map(&:content_type)).to match_array(["text/html; charset=UTF-8", "text/plain; charset=UTF-8"]) end it 'renders subject and body' do expect(mail.subject).to eq(subject) expect(mail.to).to eq([user.email]) expect(mail.cc).to eq([cc]) end end end ``` ### Ref: https://aws.amazon.com/tw/sqs/ https://docs.aws.amazon.com/zh_tw/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-how-it-works.html https://github.com/phstc/shoryuken/wiki/Shoryuken-Inline-adapter https://youtu.be/vLNDaZuA3Dc https://www.rubydoc.info/gems/shoryuken/5.2.0/index