---
title: '16 AWS Messaging - SQS, SNS & Kinesis'
disqus: hackmd
---
:::info
AWS Certified Developer Associate DVA-C01
:::
16 AWS Messaging - SQS, SNS & Kinesis
===
<style>
img{
/* border: 2px solid red; */
margin-left: auto;
margin-right: auto;
width: 90%;
display: block;
}
</style>
## Table of Contents
[TOC]
Messaging in AWS
---
- apps will have to comm with one another
- 2 patterns of app comm
- sync comms - app to app
- async/event based - app to queue to app
- limitations
    - sync comms between apps can be problematic if there are sudden spikes of traffic
    - hence, better to decouple your apps
- using SQS - queue model
- using SNS - pub/sub model
- using kinesis - realtime streaming model
- these services can scale independently of app
AWS SQS
---
### Queues

- simple queuing service (sqs)
- __producer__ - something that sends msgs into our sqs queue
- can have multiple producers
- __consumers__ - consume the msgs from the queue
    - poll msgs from the queue, process them, and delete them from the queue
- can also have multiple consumers
#### Standard Queues
- oldest offering - over 10 years old
- fully managed service
- used to decouple apps
- attributes
- unlimited throughput (msg sent per sec)
- unlimited num of msgs in queue
- default retention of msgs 4 days
- max 14 days
- low latency
- < 10ms on publish and receive
- limitation of 256kb per msg sent
    - can have duplicate msgs
        - hence called "at least once delivery" - duplicates occur occasionally
- can have out of order msgs
- best effort ordering
### Messages
#### Producing Messages
- produced to sqs using sdk
    - `SendMessage` API (see the sketch after this list)
- msg is persisted in sqs until consumer deletes it
- msg retention default 4 days
- up to 14 days
- Eg. send an order to be processed; the msg will have
- order id
- customer id
- any other attrs
- sqs standard - unlimited throughput
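
A minimal producer sketch using boto3 (the Python SDK); the queue URL and order fields are hypothetical:

```python
import json

import boto3

sqs = boto3.client("sqs")

# Hypothetical queue URL and order payload
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders"

response = sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=json.dumps({"order_id": "o-1234", "customer_id": "c-5678"}),
)
print(response["MessageId"])
```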

#### Consuming Messages
- consumers - apps on ec2 instances, servers, aws lambda etc.
- poll sqs for msgs
- can receive up to 10 msgs at once
- process msgs
- Eg. insert msg into rds db
- delete msgs with the `DeleteMessage` API (see the sketch after this list)
- guarantee that no other consumer can see these msgs
- msg processing completes
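
A matching consumer sketch in boto3, assuming the same hypothetical queue; `handle_order` is a stand-in for your own processing logic:

```python
import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders"  # hypothetical


def handle_order(body: str) -> None:
    print("processing", body)  # stand-in for real work, e.g. an RDS insert


# Poll for up to 10 msgs in a single call
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)

for msg in resp.get("Messages", []):
    handle_order(msg["Body"])
    # Delete only after successful processing so no other consumer re-reads it
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])
```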

### Use Cases
#### SQS - Multiple EC2 Consumers
- consumers receive and process msgs __in parallel__
- consumers delete msg after processing
- at least once delivery
- best-effort msg ordering
- can scale consumers horizontally to improve throughput for processing

#### SQS with ASG

- asg with ec2 polling for msgs
- asg has to scale on a metric
    - can use the queue length cloudwatch metric of sqs - `ApproximateNumberOfMessages`
    - set up an alarm for whenever the queue length goes over x (see the sketch below)
- the more msgs in sqs, the more ec2 instances are provisioned to process them at higher throughput
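
A boto3 sketch of that alarm; the queue name, threshold and scaling-policy ARN are placeholders, and note CloudWatch publishes the queue length under the name `ApproximateNumberOfMessagesVisible`:

```python
import boto3

cloudwatch = boto3.client("cloudwatch")

cloudwatch.put_metric_alarm(
    AlarmName="orders-queue-backlog",  # hypothetical
    Namespace="AWS/SQS",
    # CloudWatch's name for the queue-length metric
    MetricName="ApproximateNumberOfMessagesVisible",
    Dimensions=[{"Name": "QueueName", "Value": "orders"}],
    Statistic="Average",
    Period=60,
    EvaluationPeriods=1,
    Threshold=1000,  # scale out once the backlog exceeds 1000 msgs
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:autoscaling:us-east-1:123456789012:scalingPolicy:..."],  # placeholder ARN
)
```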
#### Decouple between App Tiers

- eg. vid processing needed
    - can't do it in the frontend - it slows down the website
- hence decouple the app - the request and the actual processing can happen in diff apps
- send msg into sqs to process file
- 2nd backend processing tier with own asg to receive msgs and process vids
- finally inserts into s3 bucket
- ec2 instances in backend can also be specially setup for vid processing
- powerful GPUs
### SQS Security
- encryption
- inflight encryption using https API
- at-rest encryption using KMS keys
    - client-side encryption if the client wants to perform encryption/decryption themselves
- access controls
- iam policies to regulate access to sqs api
- sqs access policies
- similar to s3 bucket policies
- useful for cross-acc access to sqs queues
- useful for allowing other services (Eg. sns, s3 etc.) to write to sqs queue
### Message Visibility Timeout
- after msg polled by consumer, it becomes invisible to other consumers
- by default, msg visibility timeout is 30 seconds
- means msg has 30secs to be processed
- after the msg visibility timeout is over, the msg becomes visible again in sqs

- if a msg is not processed within the visibility timeout, it may be processed twice
    - since it's received by 2 diff consumers or twice by the same consumer
- a consumer can call the `ChangeMessageVisibility` API to get more time
- if the visibility timeout is high (hours) and a consumer crashes, reprocessing will take time
- if the visibility timeout is low (seconds), you may get duplicates
- consumers should be programmed to call the `ChangeMessageVisibility` API if they know they need more time (see the sketch below)
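
A boto3 sketch of asking for more time, under the same hypothetical queue URL:

```python
import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders"  # hypothetical

resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)
for msg in resp.get("Messages", []):
    # Processing will take longer than the queue's visibility timeout,
    # so ask for 10 more minutes before the msg becomes visible again
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=msg["ReceiptHandle"],
        VisibilityTimeout=600,
    )
```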
### Dead Letter Queue
- if a consumer fails to process a msg within the visibility timeout, the msg goes back to the queue
- can set threshold of how many times a msg can go back to queue
- after `MaximumReceives` threshold exceeded, msg goes into __dead letter queue (DLQ)__
- very useful for debugging
- msgs that cannot be processed sent to dlq to be analysed by another app
- make sure to process msgs in the dlq before they expire
    - good to set a retention of 14 days in the dlq to allow enough time for processing (see the sketch below)
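
A boto3 sketch of wiring a DLQ to a main queue via a `RedrivePolicy`; the queue names are hypothetical:

```python
import json

import boto3

sqs = boto3.client("sqs")

# Hypothetical DLQ with max retention, so there is time to inspect failed msgs
dlq_url = sqs.create_queue(
    QueueName="orders-dlq",
    Attributes={"MessageRetentionPeriod": "1209600"},  # 14 days, in seconds
)["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Redrive msgs from the main queue to the DLQ after 3 failed receives
sqs.set_queue_attributes(
    QueueUrl="https://sqs.us-east-1.amazonaws.com/123456789012/orders",  # hypothetical
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "3"}
        )
    },
)
```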

### Delay Queue
- delay a msg up to 15mins
- consumers dont see it immediately
- default is 0 seconds (avail instantly)
- can set default at queue level
- can override the default on send using the `DelaySeconds` param (see the sketch below)
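
A boto3 sketch of both the queue-level default and the per-msg override; the queue name is hypothetical:

```python
import boto3

sqs = boto3.client("sqs")

# Queue-level default: every msg is hidden for 5 minutes after being sent
queue_url = sqs.create_queue(
    QueueName="delayed-jobs",  # hypothetical
    Attributes={"DelaySeconds": "300"},
)["QueueUrl"]

# Per-msg override of the queue default (max 900 seconds = 15 min)
sqs.send_message(QueueUrl=queue_url, MessageBody="run later", DelaySeconds=900)
```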

### Long Polling
- when a consumer requests msgs from the queue, it can optionally wait for msgs to arrive if there are none in the queue
    - AKA long polling
- long polling decreases the num of api calls made to sqs while increasing the efficiency and decreasing the latency of your app
    - as soon as a msg arrives in the queue, it is received by the consumer
- wait time can be from 1 sec to 20 sec (20 sec preferable)
- long polling is preferable to short polling
- long polling can be enabled at the queue lvl or at the API lvl using `WaitTimeSeconds` (see the sketch below)
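
A boto3 sketch of long polling at the API lvl, with the same hypothetical queue; the queue-lvl equivalent is the `ReceiveMessageWaitTimeSeconds` attribute:

```python
import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders"  # hypothetical

# Wait up to 20 sec for msgs instead of returning empty immediately
resp = sqs.receive_message(
    QueueUrl=queue_url,
    MaxNumberOfMessages=10,
    WaitTimeSeconds=20,
)
```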
### SQS Extended Client
- msg size limit is 256kb
- how to send large msgs?
- use sqs extended client (java library)
- use s3 bucket as repo for large data
- small metadata with pointer to s3 sent in msg to sqs
- the consumer reads from the sqs queue using the extended client lib, consumes the metadata and retrieves the actual payload from the s3 bucket (see the sketch below)
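
The actual extended client is a java library; below is a minimal Python sketch of the same pointer pattern, with hypothetical bucket and queue names:

```python
import json
import uuid

import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

bucket = "my-large-payloads"  # hypothetical bucket acting as the payload repo
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders"  # hypothetical

# Producer: store the large payload in s3, send only a small pointer through sqs
large_payload = b"x" * (300 * 1024)  # 300 KB, over the 256 KB sqs limit
key = f"payloads/{uuid.uuid4()}"
s3.put_object(Bucket=bucket, Key=key, Body=large_payload)
sqs.send_message(QueueUrl=queue_url, MessageBody=json.dumps({"bucket": bucket, "key": key}))

# Consumer: read the pointer from sqs, then fetch the real payload from s3
msg = sqs.receive_message(QueueUrl=queue_url)["Messages"][0]
pointer = json.loads(msg["Body"])
payload = s3.get_object(Bucket=pointer["bucket"], Key=pointer["key"])["Body"].read()
```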

### Must Know API
- `CreateQueue`
- can set `MessageRetentionPeriod`
- `DeleteQueue`
- `PurgeQueue` - delete all msgs in queue
- `SendMessage`
- can set `DelaySeconds`
- `ReceiveMessage`
- `DeleteMessage`
- `ReceiveMessageWaitTimeSeconds` - long polling
- `ChangeMessageVisibility` - change the msg visibility timeout
- batch APIs for `SendMessage`, `DeleteMessage` and `ChangeMessageVisibility` help decrease costs
    - by minimising the num of api calls made to sqs
### FIFO Queue
- fifo (first in first out, denotes ordering of msgs in queue)
- 1st msg to arrive is 1st to leave
- limited throughput
- 300 msgs per sec w/o batching
    - 3000 msgs per sec with batching
- exactly once send capability by removing duplicates
- msgs processed in order by consumer

#### FIFO - Deduplication
- deduplication interval is 5mins
- 2 deduplication methods
- content based deduplication
        - sqs computes a SHA-256 hash of the msg body
- hash then compared
- explicitly provide msg deduplication id

#### FIFO - Message Grouping
- if you specify the same value of `MessageGroupId` for all msgs in an sqs fifo queue, you can only have 1 consumer
    - all msgs are also in order
- msg grp id is a mandatory param
- to get ordering at the lvl of a subset of msgs, specify diff values for `MessageGroupId`
    - msgs that share a common msg grp id will be in order within the grp
    - each grp id can have a diff consumer
- parallel processing
- ordering across grps is not guaranteed (see the sketch below)
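
A boto3 sketch of sending to a fifo queue with a group id and an explicit deduplication id; names are hypothetical:

```python
import boto3

sqs = boto3.client("sqs")
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders.fifo"  # hypothetical

sqs.send_message(
    QueueUrl=queue_url,
    MessageBody="order o-1234 shipped",
    MessageGroupId="customer-5678",     # ordering is guaranteed within this group
    MessageDeduplicationId="evt-0001",  # optional if content-based dedup is enabled
)
```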

### Console

- 2 types of sqs queue
- standard
- FIFO


- access policy - who can access queue
- by default, only queue owner can send/receive msg
- advanced option allows us to directly edit the json policy obj
- or use policy generator

- can set other accs, roles or iam users to send msgs into the queue


- choose CMK (customer master key)
- can enter default cmk
- or choose alias if create own key
- set data key reuse - how long a data key should be used to encrypt data

- top right side of sqs console to send/receive messages

- can see the sent msg if you click on the poll for msgs button at the bottom

- view msgs details when clicked
- also view msg body and attributes
- key vals can be placed in msg attrs
- the msg might also be received twice
    - as we didn't process it in enough time
    - after 30 seconds, the msg went back into the queue and we received it again
    - if you poll again, the receive count will go up again
- to process a msg, you have to delete it

- purging a queue deletes all msgs within

- monitoring of queue

#### Message Visibility Timeout

- the setting is in the queue settings
#### DLQ

- create a new standard queue
- set retention to max (14 days)

- edit your __main queue__ to use the new queue as the dlq
- also set max receives
#### Delay Queue

- create new standard queue and set delivery delay param
#### Long Polling

- edit the receive msg wait time in the queue settings
#### FIFO Queue

- choose fifo when creating queue
- you have to end your queue name with `.fifo`, else it won't work

- extra config for content-based deduplication
    - to deduplicate msgs if the same msg is sent twice within the 5-min window

- when sending msg, more configs present
- msg grp id
- msg deduplication id

- the receive order is read from bottom to top in the console
    - the bottom-most msg is the 1st msg received
AWS SNS
---
- what if we want to send 1 msg to many receivers?

- create publish/subscribe system
- the client sends a msg to an sns topic, hence publishing the msg into the topic
- topic will have many subscribers
- in sns (simple notif service), event producer only sends msg to 1 sns topic
- can have many event listeners/subscribers to listen on sns topic notifs
- ea subscriber will get all msgs in topic
        - there's a new feature to filter msgs
- up to 10,000,000 subscriptions per topic
- 100,000 topics limit
- subscribers can be
- sqs
- http/https with delivery retries
- lambda
- emails
- sms msgs
- mobile notifs
- integrates with many aws services
- cloudwatch for alarms
- asg notifs
- s3 bucket events
- cloudformation when state changes (Eg. failed to build etc.)
- more
### Publishing
- topic publish using the sdk (see the sketch below)
- create topic
- create subscription
- publish to topic
- direct publish for mobile apps sdk
- create platform app
- create platform endpt
- publish to platform endpt
- works with google gcm, apple apns, amazon adm etc.
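
A boto3 sketch of the create/subscribe/publish flow; the topic name and email address are hypothetical:

```python
import boto3

sns = boto3.client("sns")

# create_topic is idempotent - it returns the existing topic if already created
topic_arn = sns.create_topic(Name="order-events")["TopicArn"]  # hypothetical topic

# Email subscriber - must confirm the subscription before receiving msgs
sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint="ops@example.com")

# Every subscriber of the topic receives this msg
sns.publish(TopicArn=topic_arn, Subject="New order", Message="order o-1234 created")
```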
### SNS Security
- encryption
- inflight encryption using https api
- at-rest encryption using kms keys
- client side encryption if client wants to perform encryption/decryption itself
- access controls
- iam policies to regulate access to sns api
- sns access policies
- similar to s3 bucket policies
- useful for cross acc access to sns topics
- useful for allowing other services (Eg. s3) to write to sns topic
### SNS + SQS: Fan Out
- push once in sns, receive in all sqs queues that are subscribers
- fully decoupled
- no data loss
- sqs allows for
- data persistence
- delayed processing
- retries of work
    - ability to add more sqs subscribers over time
- make sure the sqs queue access policy allows sns to write (see the sketch below)
- sns cannot send msgs to sqs fifo queues
- this is an aws limitation
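
A boto3 sketch of the fan-out wiring - a queue access policy that lets the topic write, then the subscription; the ARNs are placeholders:

```python
import json

import boto3

sns = boto3.client("sns")
sqs = boto3.client("sqs")

topic_arn = "arn:aws:sns:us-east-1:123456789012:order-events"  # hypothetical
queue_url = "https://sqs.us-east-1.amazonaws.com/123456789012/orders"  # hypothetical
queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Queue access policy that lets the sns topic write to the queue
policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "sns.amazonaws.com"},
        "Action": "sqs:SendMessage",
        "Resource": queue_arn,
        "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn}},
    }],
}
sqs.set_queue_attributes(QueueUrl=queue_url, Attributes={"Policy": json.dumps(policy)})

# Subscribe the queue to the topic - repeat for every queue in the fan-out
sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
```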

### S3 Events to Multiple Queues
- for same combination of event type (Eg. obj create) and prefix (Eg. images/), you can only have 1 s3 event rule
- if want to send same s3 event to many sqs queues, use fan-out
- the sns topic will send the msg to multiple sqs queues or even lambda funcs
- very helpful to circumvent the limitations of s3 bucket event rules

### Console





- many protocols to choose from
    - if you choose email, you have to verify it

#### Fan Out

- sqs can sub to sns topic

- for fifo queues, the subscribe option is greyed out
AWS Kinesis
---
- kinesis is a managed alternative to apache kafka
- it is basically a big data streaming tool
- great for app logs, metrics, iot, clickstreams
- great for realtime big data
- great for streaming processing frameworks
- Eg. spark, nifi etc.
    - compatible with frameworks that allow you to perform realtime computations on data arriving through a stream
- data is automatically replicated across 3 AZs
- 3 sub-kinesis products
- kinesis streams
- low latency streaming ingest at scale
- kinesis analytics
- perform realtime analytics on streams using sql
- perform filters in realtime
- kinesis firehose
        - load streams into s3, redshift, elasticsearch etc.

- devices, streams, logs will be producing data directly into our kinesis streams
- process data in kinesis analytics
- store the analysis through firehose into s3 or redshift
### Streams Overview
- streams are divided into ordered __shards/partitions__
- if want to scale up our stream, just add shards
- more shards = higher throughput
- data retention is 1 day by default
- can go up to 7 days
- ability to reprocess/replay data
    - different from sqs - once data is consumed in sqs, it's gone
- multiple apps can consume same stream
- kinda like sns
- realtime processing with scale of throughput
- once data inserted into kinesis, cant be deleted
- immutability

#### Streams Shards
- 1 stream made of many diff shards
- 1mb/s or 1000 msg per sec at write __per shard__
- this is receiving
- 2mb/s at read __per shard__
- this is sending out msg
- billing per shard provisioned
- can have as many shards as you want
- batching avail or per msg calls
- allows to efficiently push msgs into kinesis
- num of shards can evolve over time
    - shard splitting - add shards
    - shard merging - reduce shards
- can add some kind of auto scaling for your kinesis stream
- records are ordered per shard
- in sqs no order
- in sqs fifo only have 1 queue and all records going in that queue
- kinesis is kind of in between
- can have many shards and records ordered per shard
### Kinesis API
#### Put Records
- `PutRecord` API + a partition key that gets hashed (see the sketch below)
    - the same key goes to the same partition
        - helps with ordering for a specific key
        - just provide the same key for related records and kinesis will keep them in order for you
- msgs sent gets a sequence number
- seq num is always increasing
- choose partition key that is highly distributed
- helps prevent hot partition
- if key is not distributed, all data will go to 1 shard and it'll be overwhelmed (AKA hot partition)
- eg. use user_id key if many users
- since very distributed
- Eg. not country_id if 90% of users are in 1 country
- use batching with the `PutRecords` API to reduce costs and increase throughput
- `ProvisionedThroughputExceeded` if we go over the limits
- for this just use retries and exponential backoff
- can use cli, aws sdk or producer libraries from various frameworks
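
A boto3 sketch of `PutRecord` with a well-distributed partition key; the stream name is hypothetical:

```python
import boto3

kinesis = boto3.client("kinesis")

# The partition key is hashed to pick a shard; keying by user id keeps
# all records of one user ordered within the same shard
kinesis.put_record(
    StreamName="clickstream",  # hypothetical
    Data=b'{"page": "/checkout"}',
    PartitionKey="user-42",
)
```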

#### Exceptions
- `ProvisionedThroughputExceeded` exception
    - when sending more data than the mb/s or tps limit of any shard
    - ensure you don't have a hot shard
        - eg. when the partition key is bad and too much data goes to one partition
- solution
    - retries with exponential backoff (see the sketch below)
- inc shards (scaling)
- ensure partition key is good one
- well distributed
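
A sketch of retries with exponential backoff around `PutRecord`, using boto3's modeled exception; the stream name and key are hypothetical:

```python
import time

import boto3

kinesis = boto3.client("kinesis")


def put_with_backoff(stream, data, key, retries=5):
    """Retry throttled puts, doubling the wait on each attempt."""
    for attempt in range(retries):
        try:
            return kinesis.put_record(StreamName=stream, Data=data, PartitionKey=key)
        except kinesis.exceptions.ProvisionedThroughputExceededException:
            time.sleep(0.1 * 2 ** attempt)  # 0.1s, 0.2s, 0.4s, ...
    raise RuntimeError("still throttled after retries")


put_with_backoff("clickstream", b'{"page": "/checkout"}', "user-42")  # hypothetical stream
```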
#### Consumers
- can use normal consumer
- eg. cli, sdk etc.
- can use kinesis client library
- eg. in java, node, python, ruby, .net
    - KCL uses dynamodb to checkpoint offsets
    - KCL uses dynamodb to track other workers and share work amongst shards
    - KCL basically enables you to consume from kinesis efficiently

### Kinesis KCL in Depth
- the kinesis client lib (KCL) is a java lib that helps consume records from a kinesis stream, with distributed apps sharing the read workload
- rule - ea shard is read by only 1 kcl instance
- eg.
- 4 shards = max 4 kcl instances
- 6 shards = max 6 kcl instances
- progress checkpointed into dynamodb
- need iam access
- kcl can run on ec2, elastic beanstalk, on-premise app
- records are read in order at shard lvl
#### Example

- each shard is consumed by 1 kcl app, and each kcl app consumes 2 shards

- can scale to max (1 kcl for ea shard)

- if want to scale more, can do shard splitting
- means add more shards
- can afterwards scale kcl to 6 too
### Kinesis Security
- control access/authorisation using iam policies
- encryption inflight using https endpts
- encryption at rest using kms
- possibility to encrypt/decrypt data client side
- though harder
- vpc endpts avail for kinesis to access within vpc
### Kinesis Data Analytics
- perform realtime analytics on kinesis streams using sql
- data analytics
- auto scaling
- managed - no servers to provision
- continuous (realtime)
- no delay to consuming
- pay for actual consumption rate
- can create streams out of realtime queries
### Kinesis Firehose
- fully managed service, no administration
- near realtime
- 60 seconds latency
- load data into redshift / s3 / elasticsearch / splunk
- automatic scaling
- support many data formats
- pay for conversion
- pay for amt of data going through firehose
### Console



- can go up to 200 shards before hitting acc limit




- capture shard metrics


- type `aws kinesis help` to see list of funcs
- do `aws kinesis <func> help` for more details on a specific func


- putting a record returns the shard id and sequence number

- use `get-shard-iterator` to get a shard iterator to place into `get-records` to get the records for a specific shard

- also shows arrival timestamp
- data is base64 encoded
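
A boto3 equivalent of the `get-shard-iterator` + `get-records` CLI steps above; the stream name is hypothetical:

```python
import boto3

kinesis = boto3.client("kinesis")

# TRIM_HORIZON starts reading from the oldest record in the shard
iterator = kinesis.get_shard_iterator(
    StreamName="clickstream",  # hypothetical
    ShardId="shardId-000000000000",
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

resp = kinesis.get_records(ShardIterator=iterator, Limit=10)
for record in resp["Records"]:
    # boto3 returns Data as raw bytes; the CLI shows it base64-encoded
    print(record["SequenceNumber"], record["Data"])
```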
SQS vs SNS vs Kinesis
---

### Data Ordering in Kinesis

- as long as you always specify the same partition key, data will continue to flow into the shard that key is allocated to
### Data Ordering in SQS
- for sqs standard, no ordering
- for sqs fifo, if you don't use a group id, msgs are consumed in the order they are sent, with only 1 consumer
- if you want to scale the num of consumers, and want msgs grouped when they are related to each other, use a group id
    - similar to the partition key in kinesis


### Kinesis vs SQS Ordering
- assume 100 trucks, 5 kinesis shards and 1 sqs fifo
- kinesis data streams
- on avg you'll have 20 trucks per shard
- trucks will have their data stored within ea shard
    - the max num of consumers in parallel is 5
- can receive up to 5mb/s of data
- sqs fifo
- can only have 1 sqs fifo queue
- will have 100 grp id
- can have up to 100 consumers due to 100 grp id
- can have up to 300 msgs per second
        - or 3000 if using batching
- NOTE
    - sometimes it's better to use sqs fifo if you have a dynamic num of consumers
    - sometimes it's better to use kinesis data streams if you have a lot of data and want ordering per shard
Quiz
---


###### tags: `AWS Developer Associate` `Notes`