# 用 AWS SQS + Shoryuken 寄信
## SQS
官網說明:
Amazon Simple Queue Service (SQS) 是全受管**訊息佇列服務**,可讓您分離和擴展微型服務、分散式系統及無伺服器應用程式。SQS 可免除與管理和操作訊息導向中介軟體相關的複雜性及開銷,也可讓開發人員專注在與眾不同的工作上。您可以使用 SQS 在軟體元件之間傳送、存放和接收不限數量的訊息,不會遺失訊息或需要其他服務可用。使用 AWS 主控台、命令列界面或自選的 SDK 以及三個簡單的命令,即可在幾分鐘內開始使用 SQS。
用途

運作方式

優點:
- Cheap ($0.50 per 1 million Amazon SQS Requests)
- SQS is built to scale. You are taking advantage of Amazon’s amazing infrastructure so it’s easy to scale your workers.
- Amazon provides a simple console to watch your queues as well as configure what happens to dead messages.
- Amazon’s Ruby SDK is very flexible when it comes to creating queues. You can create as many as you want.
## Shoryuken
Purpose: 用來更方便操作和設定 aws sqs 的 gem
### gem
```
gem 'shoryuken'
gem 'aws-sdk-sqs'
gem 'oj'
```
#### 設定環境變數
AWS環境變數(access_key, region...)
production:
`ENABLE_SHORYUKEN_INLINE_EXECUTER=''`
developing or testing:
`ENABLE_SHORYUKEN_INLINE_EXECUTER='Y'`
#### 設定檔 Configuration:
```ruby
#config/shoryuken.yml
logfile: ./log/shoryuken.log
pidfile: ./tmp/pids/shoryuken.pid
concurrency: 6 #The number of threads available for processing messages at a time. Default: 25
delay: 10 #queue 沒東西時延遲秒數
queues:
- [ queue1, 3 ]
- [ queue2, 1 ] # Shoryuken to fetch messages in cycles of queue1 3 times, then queue2 1 times
#- [ be_async_low_300, 1]
```
```ruby
#config/initializers/shoryuken.rb
Shoryuken.sqs_client =
Aws::SQS::Client.new({
access_key_id: ENV['AWS_ACCESS_KEY_ID'],
secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
region: ENV['AWS_REGION']
})
Shoryuken.worker_executor =
ENV['ENABLE_SHORYUKEN_INLINE_EXECUTER'].present? ?
Shoryuken::Worker::InlineExecutor : #測試用,不會跑到 queue 裡面
Shoryuken::Worker::DefaultExecutor #正式用
```
### Shoryuken Job example
- queue -> 設定檔的 queue
- sqs_msg -> shoryuken 給 sqs 的參數
- params -> 要傳遞的訊息,json格式
```ruby
class HelloWorker
include Shoryuken::Worker
shoryuken_options queue: 'queue1', auto_delete: true
def perform(sqs_msg, name)
puts "Hello, #{name}"
end
end
```
Run:
```
HelloWorker.perform_async('Ahoy')
```
## source code:
Shoryuken::Worker::InlineExecutor
```ruby
# File 'lib/shoryuken/worker/inline_executor.rb', line 5
def perform_async(worker_class, body, options = {})
body = JSON.dump(body) if body.is_a?(Hash)
queue_name = options.delete(:queue) || worker_class.get_shoryuken_options['queue']
sqs_msg = OpenStruct.new(
body: body,
attributes: nil,
md5_of_body: nil,
md5_of_message_attributes: nil,
message_attributes: nil,
message_id: nil,
receipt_handle: nil,
delete: nil,
queue_name: queue_name
)
call(worker_class, sqs_msg) #直接執行
end
```
```ruby
#worker_class.get_shoryuken_options
{
"queue" => "queue1",
"delete" => false,
"auto_delete" => false,
"auto_visibility_timeout" => false,
"retry_intervals" => nil,
"batch" => false
}
```
Shoryuken::Worker::DefaultExecutor
```ruby
# File 'lib/shoryuken/worker/default_executor.rb', line 5
def perform_async(worker_class, body, options = {})
options[:message_attributes] ||= {}
options[:message_attributes]['shoryuken_class'] = {
string_value: worker_class.to_s,
data_type: 'String'
}
options[:message_body] = body
queue = options.delete(:queue) || worker_class.get_shoryuken_options['queue']
Shoryuken::Client.queues(queue).send_message(options)
end
```
```ruby
# File 'lib/shoryuken/client.rb', line 6
def queues(name)
@@queues[name.to_s] ||= Shoryuken::Queue.new(sqs, name)
end
```
## Shoryuken Job for mailer
```ruby
module Mails::Shoryuken
class BaseJob
include Shoryuken::Worker
shoryuken_options queue: Configs.aws.sqs.be_async_low_300, auto_delete: true
def perform(sqs_msg, params_str, mailer = ::UserMailer)
puts "run Mails::Shoryuken::#{mailer}Job"
arguments = Oj.load(params_str).transform_keys(&:to_sym) #json to hash
mail_method = arguments&.dig(:mail_method)
mailer.send(mail_method, **arguments.except!(:mail_method)).deliver_now #寄信
rescue => e
puts e.message
puts e.backtrace
data = { mailer: mailer, mail_method: mail_method, params_str: params_str }
data[:params_str] = Oj.load(data[:params_str])
#錯誤信通知
ErrorEmail::NormalService.new(
subject: "[Shoryuken Error] [#{mailer}##{mail_method}] Failed to send mail.",
mail_body: data,
error: e,
delay: false
).perform
end
def self.send_mail(mail_args)
params_str = Oj.dump(mail_args) # hash to json
return false unless params_str.present?
perform_async(params_str)
end
end
end
```
```ruby
module Mails::Shoryuken
class UserMailerJob < BaseJob
def perform(sqs_msg, params_str, mailer = ::UserMailer)
super
end
end
end
```
```ruby
module Mails::Shoryuken
class NotifyUserMailerJob < BaseJob
def perform(sqs_msg, params_str, mailer = ::NotifyUserMailer)
super
end
end
end
```
User mailer
```ruby
class UserMailer < Devise::Mailer
def welcome_splashtop(user_id:, cc: nil)
user = User.find_by(id: user_id)
subject = I18n.t('mailer.welcome.title')
mail(to: user.email, cc: cc, subject: subject)
end
end
```
Run:
```ruby
UserMailer.welcome_splashtop(user_id: 123, cc: 'cc@mail.test').deliver_later
->
Mails::Shoryuken::UserMailerJob.send_mail({
mail_method: :welcome_splashtop,
user_id: 123,
cc: 'cc@mail.test'
})
```
### Spec
Shoryuken Job spec:
```ruby
require 'spec_helper'
describe Mails::Shoryuken::BaseJob do
let(:user) { create(:user) }
let(:params_str) { Oj.dump(mail_method: :welcome_splashtop, user_id: user.id, cc: 'cc@mail.test' ) }
let(:sqs_msg) { double message_id: 'fc754df7-9cc2-4c41-96ca-5996a44b771e', body: params_str, delete: nil }
let(:error_subject) { 'Failed to send mail.' }
describe '#perform' do
subject { described_class.new }
it 'deletes the message' do
expect(sqs_msg).to receive(:delete)
subject.perform(sqs_msg, params_str)
end
context 'success' do
it 'prints the job start message' do
expect { subject.perform(sqs_msg, params_str) }.to output("run Mails::Shoryuken::UserMailerJob\n").to_stdout
end
it 'send normal email' do
subject.perform(sqs_msg, params_str)
expect(ActionMailer::Base.deliveries.last.subject).not_to include(error_subject)
end
end
context 'faild' do
let(:params_str) { Oj.dump({mail_method: :not_exist_method, other_params: '12345'}) }
let(:mail) { ActionMailer::Base.deliveries.last }
it 'send error email' do
subject.perform(sqs_msg, params_str)
expect(mail.subject).to include(error_subject, 'UserMailer', 'not_exist_method')
expect(mail.body.encoded).to include('UserMailer', 'not_exist_method', '12345')
end
end
end
end
```
mailer spec:
```ruby
require 'spec_helper'
RSpec.describe UserMailer, type: :mailer do
describe '#welcome_splashtop' do
let(:subject) { I18n.t('mailer.welcome.title') }
let(:cc) { 'cc@mail.test' }
let(:user) { create(:user) }
let(:mail) { described_class.welcome_splashtop(user_id: user.id, cc: cc) }
it 'send text and html' do
expect(mail.body.parts.length).to eq(2)
expect(mail.body.parts.to_a.map(&:content_type)).to match_array(["text/html; charset=UTF-8", "text/plain; charset=UTF-8"])
end
it 'renders subject and body' do
expect(mail.subject).to eq(subject)
expect(mail.to).to eq([user.email])
expect(mail.cc).to eq([cc])
end
end
end
```
### Ref:
https://aws.amazon.com/tw/sqs/
https://docs.aws.amazon.com/zh_tw/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-how-it-works.html
https://github.com/phstc/shoryuken/wiki/Shoryuken-Inline-adapter
https://youtu.be/vLNDaZuA3Dc
https://www.rubydoc.info/gems/shoryuken/5.2.0/index