# Experimenting AsynQ ## Prerequisite * [What is task queue and why ?](https://cloud.google.com/appengine/docs/standard/python/taskqueue#:~:text=Task%20queues%20let%20applications%20perform,is%20designed%20for%20asynchronous%20work.) ## Requirement : * Need to have redis running * I have used docker : ```sudo docker run --name redis-server -p 6379:6379 -d redis``` ## Idea Explained : * Need A server, which processes tasks in queue * Need To create tasks * Need To create handlers (just like we do with http) * Need To have a client to push tasks (Enqueue) ## Steps : 1. go mod init ```github.com/.....``` 2. ```mkdir server```, ```mkdir client```, ```mkdir tasks``` 3. ```touch tasks\tasks.go``` ``` package tasks import ( "time" "github.com/hibiken/asynq" ) const ( // TypeWelcomeEmail is a name of the task type // for sending a welcome email. TypeWelcomeEmail = "email:welcome" // TypeReminderEmail is a name of the task type // for sending a reminder email. TypeReminderEmail = "email:reminder" ) // NewWelcomeEmailTask task payload for a new welcome email. func NewWelcomeEmailTask(id int) *asynq.Task { // Specify task payload. payload := map[string]interface{}{ "user_id": id, // set user ID } // Return a new task with given type and payload. return asynq.NewTask(TypeWelcomeEmail, payload) } // NewReminderEmailTask task payload for a reminder email. func NewReminderEmailTask(id int, ts time.Time) *asynq.Task { // Specify task payload. payload := map[string]interface{}{ "user_id": id, // set user ID "sent_in": ts.String(), // set time to sending } // Return a new task with given type and payload. return asynq.NewTask(TypeReminderEmail, payload) } ``` 4. ```touch tasks/handlers.go``` ``` package tasks import ( "context" "fmt" "github.com/hibiken/asynq" ) // HandleWelcomeEmailTask handler for welcome email task. func HandleWelcomeEmailTask(c context.Context, t *asynq.Task) error { // Get user ID from given task. id, err := t.Payload.GetInt("user_id") if err != nil { return err } // Dummy message to the worker's output. fmt.Printf("Send Welcome Email to User ID %d\n", id) return nil } // HandleReminderEmailTask for reminder email task. func HandleReminderEmailTask(c context.Context, t *asynq.Task) error { // Get int with the user ID from the given task. id, err := t.Payload.GetInt("user_id") if err != nil { return err } // Get string with the sent time from the given task. time, err := t.Payload.GetString("sent_in") if err != nil { return err } // Dummy message to the worker's output. fmt.Printf("Send Reminder Email to User ID %d\n", id) fmt.Printf("Reason: time is up (%v)\n", time) return nil } ``` 5. ```touch server/main.go``` ``` package main import ( "log" "github.com/haquenafeem/asynq/tasks" "github.com/hibiken/asynq" ) func main() { // Create and configuring Redis connection. redisConnection := asynq.RedisClientOpt{ Addr: "localhost:6379", // Redis server address } // Create and configuring Asynq worker server. worker := asynq.NewServer(redisConnection, asynq.Config{ // Specify how many concurrent workers to use. Concurrency: 10, // Specify multiple queues with different priority. Queues: map[string]int{ "critical": 6, // processed 60% of the time "default": 3, // processed 30% of the time "low": 1, // processed 10% of the time }, }) // Create a new task's mux instance. mux := asynq.NewServeMux() // Define a task handler for the welcome email task. mux.HandleFunc( tasks.TypeWelcomeEmail, // task type tasks.HandleWelcomeEmailTask, // handler function ) // Define a task handler for the reminder email task. mux.HandleFunc( tasks.TypeReminderEmail, // task type tasks.HandleReminderEmailTask, // handler function ) // Run worker server. if err := worker.Run(mux); err != nil { log.Fatal(err) } } ``` 6. ```touch client/main.go``` ``` package main import ( "log" "math/rand" "time" "github.com/haquenafeem/asynq/tasks" "github.com/hibiken/asynq" ) func main() { // Create a new Redis connection for the client. redisConnection := asynq.RedisClientOpt{ Addr: "localhost:6379", // Redis server address } // Create a new Asynq client. client := asynq.NewClient(redisConnection) defer client.Close() // Infinite loop to create tasks as Asynq client. for i := 0; i < 5; i++ { // Generate a random user ID. userID := rand.Intn(1000) + 10 // Set a delay duration to 2 minutes. delay := 2 * time.Minute // Define tasks. task1 := tasks.NewWelcomeEmailTask(userID) task2 := tasks.NewReminderEmailTask(userID, time.Now().Add(delay)) // Process the task immediately in critical queue. if _, err := client.Enqueue( task1, // task payload asynq.Queue("critical"), // set queue for task ); err != nil { log.Fatal(err) } // Process the task 2 minutes later in low queue. if _, err := client.Enqueue( task2, // task payload asynq.Queue("low"), // set queue for task asynq.ProcessIn(delay), // set time to process task ); err != nil { log.Fatal(err) } } } ``` ## References : * [Golang-Library (asynq)](https://github.com/hibiken/asynq) * [AsynQ-Example With Explanation](https://dev.to/koddr/asynq-simple-reliable-efficient-distributed-task-queue-for-your-next-go-project-4jhg) * [asynchronous-task-queues](https://99designs.com/blog/engineering/asynchronous-task-queues/)