# What is Amazon SQS(Simple Queue Service) > 什麼是SQS?它是一個message queue([訊息佇列](https://medium.com/@b98901052/%E4%BB%80%E9%BA%BC%E6%98%AFmessage-queue-%E5%84%AA%E9%BB%9E%E5%8F%8A%E4%BD%BF%E7%94%A8%E5%A0%B4%E6%99%AF-23d6a39cc4f2)),當producer產生資料時傳送資訊給consumer過程中,用來存放message的緩衝區。像是fb通知,像這種不是即時處理的訊息,就可以放在queue,然後讓其他服務處理。像有些情況會有多個worker需要處理,這時候queue就扮演著重要的角色,降低worker的耦合性,將過程變得比較簡單,也比較好維護。有時候服務量比較大時或是系統異常時會造成塞車的情況,所以放在queue可以避免遺失,等到系統回復時再慢慢處理。 ![](https://i.imgur.com/b3WqNQi.png) ![](https://i.imgur.com/QMvruxp.png) Standard Queue VS FIFO Queue --- | Standard Queue | FIFO Queue | | ------------------------------------------------------- |:---------- | | 1. 每秒訊息的傳輸量(TPS)幾乎是沒有限制 | 1. 高傳輸量,每秒訊息的傳輸量(TPS)是3000 | | 2. 每個訊息至少傳送一次,但有時候同一個訊息不只傳送一次 | 2. 訊息只處理一次,不會重複處理 | | 3. 盡可能維持訊息的順序,但一則 | 3. 按照 first in first out 傳送與接收 | SQS架構 --- SQS 的架構如下圖,其主要由三個部分所組成,分別為分散系統、server上的queue及message。SQS會將message複製到很多台queue server,但並不是每台都是儲存所有的message,pull時可能那台server 只有message A、C 或 B、C、D 等,所以自己的server 如果有許多台,就可能會拿到不同或重複的messages 。 ![](https://i.imgur.com/hE1M7HN.png) SQS訊息週期 --- SQS的訊息週期如下圖所示: ![](https://i.imgur.com/UDpyvuD.png) 1. 需要傳送訊息的producer將選取Amazon SQS佇列,並使用SendMessage將新訊息A傳送到佇列 2. consumer開始接收處理queue的訊息,並使用ReceiveMessage然後傳回此訊息,其中當訊息被接收後,仍然留在queue不會馬上消失,這是由於Amazon SQS是分散式系統,不保證consumer會實際接收到訊息(例如因為連線問題或consumer應用程式的問題而未接收到)。因此consumer必須在接收和處理訊息後,才可將訊息從佇列上刪除 3. 但是如果訊息持續留在queue,可能會造成其他consuner拿到同一筆message,為了避免此強況發生,Amazon SQS會設定visibility timeout,也就是說在visibility timeout這段時間內,其他consumer不能重複處理同一筆message,除非visibility timeout已過,否則不會由其他ReceiveMessage傳回 4. 如果consumer順利接收訊息,呼叫DeleteMessage從queue刪除message,這樣可以避免在visibility timeout過後,其他consumer處理同一筆message Long Polling VS Short Polling --- 其中consumer主要是透過兩種方式來接收訊息,分別為long polling及short polling。 * short polling: 如下圖所示,consumer可以很快的從queue server接收到message,但也因為時間太短,所以沒有辦法將每個queue的message都拿到,是基於weghted random的方法,找尋部分queue server裡的message並且傳回給consumer,缺點是太過於頻繁使用SQS,是成本偏高。 ![](https://i.imgur.com/tFRkUPV.png) * long polling: consumer把接收message的時間拉長,在這段時間內如果queue裡沒有其他message,則接收的request會暫時block,等到queue有message時才解除block,這樣可以避免過於頻繁使用SQS,減少使用成本。 程式碼實作 --- 步驟分別為create queue, list queue, get queue url, send message, receive message, delete message, delete queue ``` import ( "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" ) const ( CredPath = ".aws/credentials" CredProfile = "op" Queueurl = "https://sqs.us-west-2.amazonaws.com/478205036267/queue1s" ) ``` ``` sess := session.New(&aws.Config{ Region: aws.String("us-west-2"), Credentials: credentials.NewSharedCredentials(CredPath, CredProfile), }) svc := sqs.New(sess) ``` 開始建立一個queue,其中參數DelaySeconds是指,consumer在這段時間內是看不到任何傳到這個queue裡面的message,MessageRetentionPeriod是指訊息保留的時間,成功建立後可以拿到QueueUrl。 ``` result, err := svc.CreateQueue(&sqs.CreateQueueInput{ QueueName: aws.String("queue1"), Attributes: map[string]*string{ "DelaySeconds": aws.String("60"), "MessageRetentionPeriod": aws.String("86400"), // "FifoQuesue": aws.String("true"), }, }) if err != nil { fmt.Println("Error", err) return } fmt.Println("Success", *result.QueueUrl) ``` ![](https://i.imgur.com/EUyk7EJ.png) 列出所有建立的queue,其中ListQueues所帶的參數如果是nil,代表列出所有的queue,最多能列出1000個。 ``` results, err := svc.ListQueues(nil) if err != nil { fmt.Println("Error", err) return } fmt.Println("Success") for i, urls := range results.QueueUrls { if urls == nil { continue } fmt.Printf("%d: %s\n", i, *urls) } ``` ![](https://i.imgur.com/2ISipJ7.png) 但是如果要列出特定的queue,可以使用指定queue名稱字首的方式。 ``` queueName := "queue1.fi" results, err := svc.ListQueues(&sqs.ListQueuesInput{QueueNamePrefix: &queueName}) //svc.ListQueues(nil) if err != nil { fmt.Println("Error", err) return } fmt.Println("Success") for i, urls := range results.QueueUrls { if urls == nil { continue } fmt.Printf("%d: %s\n", i, *urls) } ``` ![](https://i.imgur.com/LdfuHmA.png) 給定queue的名稱就能拿到QueueUrl ``` url, err := svc.GetQueueUrl(&sqs.GetQueueUrlInput{ QueueName: aws.String("queue1s"), }) if err != nil { fmt.Println("Error", err) return } fmt.Println("Success", *url.QueueUrl) ``` ![](https://i.imgur.com/IXBZYcc.png) 開始傳送message到指定的queue,並且輸入要傳送的message,傳送成功便可拿到一組MD5及MessageId ``` send_message := &sqs.SendMessageInput{ MessageBody: aws.String("Hi, Andy"), QueueUrl: aws.String(Queueurl), DelaySeconds: aws.Int64(3), } Sendresp, err := svc.SendMessage(send_message) if err != nil { panic(err) } fmt.Printf("Send message \n%v \n\n", Sendresp) ``` ![](https://i.imgur.com/vuUF3RG.png) 成功傳送之後,就可以檢查是否可以接收到此message,參數MaxNumberOfMessages是指設定一次最多message數量, WaitTimeSeconds是設定一個long polling的時間。 ``` receive_message := &sqs.ReceiveMessageInput{ QueueUrl: aws.String(Queueurl), MaxNumberOfMessages: aws.Int64(3), //設定一次最多message數量 VisibilityTimeout: aws.Int64(10), //如果此message沒被刪除,必須等10秒後才能拿 WaitTimeSeconds: aws.Int64(20), //long polling 方式取,會建立一條長連線並且等在那邊,直到 SQS 收到新 message 回傳給這條連線才中斷 } Receivwresp, err := svc.ReceiveMessage(receive_message) if err != nil { panic(err) } fmt.Printf("Send message \n%v \n\n", Receivwresp) ``` ![](https://i.imgur.com/MiCB8Kp.png) 在接收完message之後,建議能夠將此messag刪除,避免其他consumer重複拿到。 ``` for _, message := range Receivwresp.Messages { delete_message := &sqs.DeleteMessageInput{ QueueUrl: aws.String(Queueurl), ReceiptHandle: message.ReceiptHandle, //用來提供刪除message的憑證 } _, err := svc.DeleteMessage(delete_message) if err != nil { panic(err) } fmt.Printf("Delete message \nMessage ID: %s has beed deleted.\n\n", *message.MessageId) } ``` ![](https://i.imgur.com/U7kBZVJ.png) 參考文獻 --- 1. https://docs.aws.amazon.com/zh_tw/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html#standard-queues-at-least-once-delivery 2. https://ithelp.ithome.com.tw/articles/10194654 3. https://ithelp.ithome.com.tw/articles/10219553 4. http://pragmaticnotes.com/2017/11/20/amazon-sqs-long-polling-versus-short-polling/