# What is Amazon SQS(Simple Queue Service)
> 什麼是SQS?它是一個message queue([訊息佇列](https://medium.com/@b98901052/%E4%BB%80%E9%BA%BC%E6%98%AFmessage-queue-%E5%84%AA%E9%BB%9E%E5%8F%8A%E4%BD%BF%E7%94%A8%E5%A0%B4%E6%99%AF-23d6a39cc4f2)),當producer產生資料時傳送資訊給consumer過程中,用來存放message的緩衝區。像是fb通知,像這種不是即時處理的訊息,就可以放在queue,然後讓其他服務處理。像有些情況會有多個worker需要處理,這時候queue就扮演著重要的角色,降低worker的耦合性,將過程變得比較簡單,也比較好維護。有時候服務量比較大時或是系統異常時會造成塞車的情況,所以放在queue可以避免遺失,等到系統回復時再慢慢處理。


Standard Queue VS FIFO Queue
---
| Standard Queue | FIFO Queue |
| ------------------------------------------------------- |:---------- |
| 1. 每秒訊息的傳輸量(TPS)幾乎是沒有限制 | 1. 高傳輸量,每秒訊息的傳輸量(TPS)是3000 |
| 2. 每個訊息至少傳送一次,但有時候同一個訊息不只傳送一次 | 2. 訊息只處理一次,不會重複處理 |
| 3. 盡可能維持訊息的順序,但一則 | 3. 按照 first in first out 傳送與接收 |
SQS架構
---
SQS 的架構如下圖,其主要由三個部分所組成,分別為分散系統、server上的queue及message。SQS會將message複製到很多台queue server,但並不是每台都是儲存所有的message,pull時可能那台server 只有message A、C 或 B、C、D 等,所以自己的server 如果有許多台,就可能會拿到不同或重複的messages 。

SQS訊息週期
---
SQS的訊息週期如下圖所示:

1. 需要傳送訊息的producer將選取Amazon SQS佇列,並使用SendMessage將新訊息A傳送到佇列
2. consumer開始接收處理queue的訊息,並使用ReceiveMessage然後傳回此訊息,其中當訊息被接收後,仍然留在queue不會馬上消失,這是由於Amazon SQS是分散式系統,不保證consumer會實際接收到訊息(例如因為連線問題或consumer應用程式的問題而未接收到)。因此consumer必須在接收和處理訊息後,才可將訊息從佇列上刪除
3. 但是如果訊息持續留在queue,可能會造成其他consuner拿到同一筆message,為了避免此強況發生,Amazon SQS會設定visibility timeout,也就是說在visibility timeout這段時間內,其他consumer不能重複處理同一筆message,除非visibility timeout已過,否則不會由其他ReceiveMessage傳回
4. 如果consumer順利接收訊息,呼叫DeleteMessage從queue刪除message,這樣可以避免在visibility timeout過後,其他consumer處理同一筆message
Long Polling VS Short Polling
---
其中consumer主要是透過兩種方式來接收訊息,分別為long polling及short polling。
* short polling: 如下圖所示,consumer可以很快的從queue server接收到message,但也因為時間太短,所以沒有辦法將每個queue的message都拿到,是基於weghted random的方法,找尋部分queue server裡的message並且傳回給consumer,缺點是太過於頻繁使用SQS,是成本偏高。

* long polling: consumer把接收message的時間拉長,在這段時間內如果queue裡沒有其他message,則接收的request會暫時block,等到queue有message時才解除block,這樣可以避免過於頻繁使用SQS,減少使用成本。
程式碼實作
---
步驟分別為create queue, list queue, get queue url, send message, receive message, delete message, delete queue
```
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
const (
CredPath = ".aws/credentials"
CredProfile = "op"
Queueurl = "https://sqs.us-west-2.amazonaws.com/478205036267/queue1s"
)
```
```
sess := session.New(&aws.Config{
Region: aws.String("us-west-2"),
Credentials: credentials.NewSharedCredentials(CredPath, CredProfile),
})
svc := sqs.New(sess)
```
開始建立一個queue,其中參數DelaySeconds是指,consumer在這段時間內是看不到任何傳到這個queue裡面的message,MessageRetentionPeriod是指訊息保留的時間,成功建立後可以拿到QueueUrl。
```
result, err := svc.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String("queue1"),
Attributes: map[string]*string{
"DelaySeconds": aws.String("60"),
"MessageRetentionPeriod": aws.String("86400"),
// "FifoQuesue": aws.String("true"),
},
})
if err != nil {
fmt.Println("Error", err)
return
}
fmt.Println("Success", *result.QueueUrl)
```

列出所有建立的queue,其中ListQueues所帶的參數如果是nil,代表列出所有的queue,最多能列出1000個。
```
results, err := svc.ListQueues(nil)
if err != nil {
fmt.Println("Error", err)
return
}
fmt.Println("Success")
for i, urls := range results.QueueUrls {
if urls == nil {
continue
}
fmt.Printf("%d: %s\n", i, *urls)
}
```

但是如果要列出特定的queue,可以使用指定queue名稱字首的方式。
```
queueName := "queue1.fi"
results, err := svc.ListQueues(&sqs.ListQueuesInput{QueueNamePrefix: &queueName}) //svc.ListQueues(nil)
if err != nil {
fmt.Println("Error", err)
return
}
fmt.Println("Success")
for i, urls := range results.QueueUrls {
if urls == nil {
continue
}
fmt.Printf("%d: %s\n", i, *urls)
}
```

給定queue的名稱就能拿到QueueUrl
```
url, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{
QueueName: aws.String("queue1s"),
})
if err != nil {
fmt.Println("Error", err)
return
}
fmt.Println("Success", *url.QueueUrl)
```

開始傳送message到指定的queue,並且輸入要傳送的message,傳送成功便可拿到一組MD5及MessageId
```
send_message := &sqs.SendMessageInput{
MessageBody: aws.String("Hi, Andy"),
QueueUrl: aws.String(Queueurl),
DelaySeconds: aws.Int64(3),
}
Sendresp, err := svc.SendMessage(send_message)
if err != nil {
panic(err)
}
fmt.Printf("Send message \n%v \n\n", Sendresp)
```

成功傳送之後,就可以檢查是否可以接收到此message,參數MaxNumberOfMessages是指設定一次最多message數量, WaitTimeSeconds是設定一個long polling的時間。
```
receive_message := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(Queueurl),
MaxNumberOfMessages: aws.Int64(3), //設定一次最多message數量
VisibilityTimeout: aws.Int64(10), //如果此message沒被刪除,必須等10秒後才能拿
WaitTimeSeconds: aws.Int64(20), //long polling 方式取,會建立一條長連線並且等在那邊,直到 SQS 收到新 message 回傳給這條連線才中斷
}
Receivwresp, err := svc.ReceiveMessage(receive_message)
if err != nil {
panic(err)
}
fmt.Printf("Send message \n%v \n\n", Receivwresp)
```

在接收完message之後,建議能夠將此messag刪除,避免其他consumer重複拿到。
```
for _, message := range Receivwresp.Messages {
delete_message := &sqs.DeleteMessageInput{
QueueUrl: aws.String(Queueurl),
ReceiptHandle: message.ReceiptHandle, //用來提供刪除message的憑證
}
_, err := svc.DeleteMessage(delete_message)
if err != nil {
panic(err)
}
fmt.Printf("Delete message \nMessage ID: %s has beed deleted.\n\n", *message.MessageId)
}
```

參考文獻
---
1. https://docs.aws.amazon.com/zh_tw/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html#standard-queues-at-least-once-delivery
2. https://ithelp.ithome.com.tw/articles/10194654
3. https://ithelp.ithome.com.tw/articles/10219553
4. http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/