# AsynQ ![](https://i.imgur.com/MVS7SkX.png) - prviously we experimented with v0.17.1, now we are using v0.18.5 ### Prerequisite - Redis ## Basic ### Defining Task ```go= payload, _ := json.Marshal(SomeStruct{values...}) task := asynq.NewTask("name/type of the task", payload) ``` ### Defining Handler for the task ```go= func HandleSomeTask(ctx context.Context, t *asynq.Task) error { var p SomeStruct if err := json.Unmarshal(t.Payload(), &p); err != nil { return error } // do the task return nil } ``` ### Server/Worker Defining ```go= // first we need a redis connection redisConnection := asynq.RedisClientOpt{ Addr: "localhost:6379", // Redis server address } // we need to create the server server := asynq.NewServer(redisConnection, asynq.Config{ // Specify how many concurrent workers to use. Concurrency: 10, Queues: map[string]int{ "workerqueue1": 6, // processed 60% of the time "workerqueue2": 3, // processed 30% of the time "workerqueue3": 1, // processed 10% of the time }, }) // creating a server mux mux := asynq.NewServeMux() // registering handler mux.HandleFunc( "tasktype/name", handler, // handler function ) // runnning if err := server.Run(mux); err != nil { log.Fatal(err) } ``` ### Client/ Enqueing Tasks ```go= redisConnection := asynq.RedisClientOpt{ Addr: "localhost:6379", // Redis server address } // Create a new Asynq client. client := asynq.NewClient(redisConnection) defer client.Close() // task creation task := asynq.NewTask(....) ..... // enqueing task taskInfo, err := client.Enqueue( task, // task payload asynq.Queue("workerqueue1"), // choose a particular queue asynq.MaxRetry(5), // max tries // other configs..... ) ``` ## Advanced ### Middleware ```go= func Log(h asynq.Handler) asynq.Handler { return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error { taskName := t.Type() taskId, _ := asynq.GetTaskID(ctx) queueName, _ := asynq.GetQueueName(ctx) fmt.Printf( "Task With Name:%v And Id:%v , From Queue :%v Is Now Being Executed", taskName, taskId, queueName, ) fmt.Println() err := h.ProcessTask(ctx, t) if err != nil { return err } fmt.Println("Processing End") return nil }) } ``` ### Task Configuration ```go= taskInfo, err := client.Enqueue( task1, // task payload asynq.Queue("randomname"), asynq.MaxRetry(5), .... // set deadline, process in, process at... ) ``` ### Monitoring ```go= package main import ( "fmt" "github.com/hibiken/asynq" ) func main() { redisConnection := asynq.RedisClientOpt{ Addr: "localhost:6379", // Redis server address } // inspector can be used to monitor the system, as // there are so many methods to get information. inspector := asynq.NewInspector(redisConnection) defer inspector.Close() // server returns list of all the servers running srvs, err := inspector.Servers() if err != nil { panic(err) } fmt.Println(srvs) } ``` ## Study Aspects : Questions And Aswers. ### Q1. How things work in asynq not the priority but the tasks to worker concept ### Ans1. If we do not create queues with priorities, there will be one created for us by default, which will also have the name `default`. We can define how many concurrent workers there will be, but we cannot directly assign a worker/multiple workers to a queue. However we can control how often each queue is executed. Something like this: ```go= server := asynq.NewServer(redisConnection, asynq.Config{ // Specify how many concurrent workers to use. Concurrency: 10, Queues: map[string]int{ "workerqueue1": 6, // processed 60% of the time "workerqueue2": 3, // processed 30% of the time "workerqueue3": 1, // processed 10% of the time }, }) ``` Notice here, there are 10 concurrent workers but we did not assign them to a task or any queue, but we have given priorities to queues, the tasks of `workerqueue1` will be executed 60% of the time and so on. Whenever we create a task and want to put it in the system, we generally push them to a queue, depending on its priority or how much importance it has. ```go= taskInfo, err := client.Enqueue( task, // task payload asynq.Queue("workerqueue1"), // choose a particular queue ) ``` The flow of the system is something like this : ```bash= task creation -> pushed to a queue (if not defined goes to `default` queue-> queue assigns a worker (but really the server pulls task from queue and creates a goroutine) -> worker executes the task. ``` ![](https://i.imgur.com/tjFnH9C.png) ### Q2. Message passing (Worker 1: Running Task 1) ### Ans2. As mentioned in the previous question, we do not directly assign to a worker , rather push task to a queue and from the queue worker is created and assigned to complete task. We can extract information from the context: ```go= func someTaskHandler(c context.Context, t *asynq.Task) error { taskName := t.Type() // returns task name taskId, _ := asynq.GetTaskID(ctx) // returns task id queueName, _ := asynq.GetQueueName(ctx) // from which queue the task belongs to fmt.Printf( "task with name:%v and id:%v , from queue :%v is now being executed", taskName, taskId, queueName, ) fmt.Println() return nil } ``` Note : rather than printing out in each handler we could use global middleware. ### Q3. 3. Streamming data to one task to another (data feeding) ### Ans3. Seems like theres no direct way to stream data from one task to another, as `asynq.Task` has only 2 methods 1 returns its name other returns its payload. but we can pass data in other ways. One way could be : #### Creating 2nd task from 1st task: lets say we want to define 2 tasks, one that will read data from a file , and the other will take the data and do something with it. ```go= type Task1 struct{ ... fields *client } type Task2 struct { datafromtask1 ..... } func EnqueTask(....) { client := ..... task := Task1{...., client} enqueue } func Task1Handler(...) { do task1 somedata := .... // // create task2 // task := Task2{datafromtask1:somedata} enqueue } ``` Another could be : #### Taking a task into it's own structure ```go= type Task1 struct{ fields... *client Next *asynq.Task } type (t *Task1) Next() { next := t.Next t.client.Enqueue(next) } func EnqueTask(....) { client := ..... next := anotherTask.... task := Task1{...., client, Next:next} client.enqueue(task) } client.Enqueue ``` ## References - [asynQ-wiki](https://github.com/hibiken/asynq/wiki/Getting-Started) - [asynQ-docs](https://pkg.go.dev/github.com/hibiken/asynq) - [code-experiments](https://github.com/nafeem-evatix/asynqv2)