# Experimenting AsynQ
## Prerequisite
* [What is task queue and why ?](https://cloud.google.com/appengine/docs/standard/python/taskqueue#:~:text=Task%20queues%20let%20applications%20perform,is%20designed%20for%20asynchronous%20work.)
## Requirement :
* Need to have redis running
* I have used docker : ```sudo docker run --name redis-server -p 6379:6379 -d redis```
## Idea Explained :
* Need A server, which processes tasks in queue
* Need To create tasks
* Need To create handlers (just like we do with http)
* Need To have a client to push tasks (Enqueue)
## Steps :
1. go mod init ```github.com/.....```
2. ```mkdir server```, ```mkdir client```, ```mkdir tasks```
3. ```touch tasks\tasks.go```
```
package tasks
import (
"time"
"github.com/hibiken/asynq"
)
const (
// TypeWelcomeEmail is a name of the task type
// for sending a welcome email.
TypeWelcomeEmail = "email:welcome"
// TypeReminderEmail is a name of the task type
// for sending a reminder email.
TypeReminderEmail = "email:reminder"
)
// NewWelcomeEmailTask task payload for a new welcome email.
func NewWelcomeEmailTask(id int) *asynq.Task {
// Specify task payload.
payload := map[string]interface{}{
"user_id": id, // set user ID
}
// Return a new task with given type and payload.
return asynq.NewTask(TypeWelcomeEmail, payload)
}
// NewReminderEmailTask task payload for a reminder email.
func NewReminderEmailTask(id int, ts time.Time) *asynq.Task {
// Specify task payload.
payload := map[string]interface{}{
"user_id": id, // set user ID
"sent_in": ts.String(), // set time to sending
}
// Return a new task with given type and payload.
return asynq.NewTask(TypeReminderEmail, payload)
}
```
4. ```touch tasks/handlers.go```
```
package tasks
import (
"context"
"fmt"
"github.com/hibiken/asynq"
)
// HandleWelcomeEmailTask handler for welcome email task.
func HandleWelcomeEmailTask(c context.Context, t *asynq.Task) error {
// Get user ID from given task.
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
// Dummy message to the worker's output.
fmt.Printf("Send Welcome Email to User ID %d\n", id)
return nil
}
// HandleReminderEmailTask for reminder email task.
func HandleReminderEmailTask(c context.Context, t *asynq.Task) error {
// Get int with the user ID from the given task.
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
// Get string with the sent time from the given task.
time, err := t.Payload.GetString("sent_in")
if err != nil {
return err
}
// Dummy message to the worker's output.
fmt.Printf("Send Reminder Email to User ID %d\n", id)
fmt.Printf("Reason: time is up (%v)\n", time)
return nil
}
```
5. ```touch server/main.go```
```
package main
import (
"log"
"github.com/haquenafeem/asynq/tasks"
"github.com/hibiken/asynq"
)
func main() {
// Create and configuring Redis connection.
redisConnection := asynq.RedisClientOpt{
Addr: "localhost:6379", // Redis server address
}
// Create and configuring Asynq worker server.
worker := asynq.NewServer(redisConnection, asynq.Config{
// Specify how many concurrent workers to use.
Concurrency: 10,
// Specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6, // processed 60% of the time
"default": 3, // processed 30% of the time
"low": 1, // processed 10% of the time
},
})
// Create a new task's mux instance.
mux := asynq.NewServeMux()
// Define a task handler for the welcome email task.
mux.HandleFunc(
tasks.TypeWelcomeEmail, // task type
tasks.HandleWelcomeEmailTask, // handler function
)
// Define a task handler for the reminder email task.
mux.HandleFunc(
tasks.TypeReminderEmail, // task type
tasks.HandleReminderEmailTask, // handler function
)
// Run worker server.
if err := worker.Run(mux); err != nil {
log.Fatal(err)
}
}
```
6. ```touch client/main.go```
```
package main
import (
"log"
"math/rand"
"time"
"github.com/haquenafeem/asynq/tasks"
"github.com/hibiken/asynq"
)
func main() {
// Create a new Redis connection for the client.
redisConnection := asynq.RedisClientOpt{
Addr: "localhost:6379", // Redis server address
}
// Create a new Asynq client.
client := asynq.NewClient(redisConnection)
defer client.Close()
// Infinite loop to create tasks as Asynq client.
for i := 0; i < 5; i++ {
// Generate a random user ID.
userID := rand.Intn(1000) + 10
// Set a delay duration to 2 minutes.
delay := 2 * time.Minute
// Define tasks.
task1 := tasks.NewWelcomeEmailTask(userID)
task2 := tasks.NewReminderEmailTask(userID, time.Now().Add(delay))
// Process the task immediately in critical queue.
if _, err := client.Enqueue(
task1, // task payload
asynq.Queue("critical"), // set queue for task
); err != nil {
log.Fatal(err)
}
// Process the task 2 minutes later in low queue.
if _, err := client.Enqueue(
task2, // task payload
asynq.Queue("low"), // set queue for task
asynq.ProcessIn(delay), // set time to process task
); err != nil {
log.Fatal(err)
}
}
}
```
## References :
* [Golang-Library (asynq)](https://github.com/hibiken/asynq)
* [AsynQ-Example With Explanation](https://dev.to/koddr/asynq-simple-reliable-efficient-distributed-task-queue-for-your-next-go-project-4jhg)
* [asynchronous-task-queues](https://99designs.com/blog/engineering/asynchronous-task-queues/)