# AsynQ

- Previously we experimented with v0.17.1; now we are using v0.18.5.
### Prerequisite
- Redis
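
Besides a running Redis, the library itself has to be added to the project (as in the getting-started wiki):
```bash=
go get -u github.com/hibiken/asynq
```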
## Basic
### Defining a Task
```go=
payload, _ := json.Marshal(SomeStruct{values...})
task := asynq.NewTask("name/type of the task", payload)
```
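A fuller sketch of the same idea, with a made-up payload struct and a task-type constant so that the client and the worker agree on the same name (`EmailDeliveryPayload` and `TypeEmailDelivery` are illustrative, not part of the library):
```go=
package tasks

import (
    "encoding/json"

    "github.com/hibiken/asynq"
)

// the task type is kept as a constant so client and worker use the same name
const TypeEmailDelivery = "email:deliver"

// EmailDeliveryPayload is a made-up example payload
type EmailDeliveryPayload struct {
    UserID     int
    TemplateID string
}

// NewEmailDeliveryTask marshals the payload and wraps it in a task
func NewEmailDeliveryTask(userID int, templateID string) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: templateID})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeEmailDelivery, payload), nil
}
```
The worker-side handler would then unmarshal `EmailDeliveryPayload` exactly as in the next section.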
### Defining a Handler for the Task
```go=
func HandleSomeTask(ctx context.Context, t *asynq.Task) error {
    var p SomeStruct
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return err
    }
    // do the task
    return nil
}
```
### Defining the Server/Worker
```go=
// first we need a Redis connection
redisConnection := asynq.RedisClientOpt{
    Addr: "localhost:6379", // Redis server address
}
// then we create the server
server := asynq.NewServer(redisConnection, asynq.Config{
    // Specify how many concurrent workers to use.
    Concurrency: 10,
    Queues: map[string]int{
        "workerqueue1": 6, // processed 60% of the time
        "workerqueue2": 3, // processed 30% of the time
        "workerqueue3": 1, // processed 10% of the time
    },
})
// creating a ServeMux
mux := asynq.NewServeMux()
// registering the handler
mux.HandleFunc(
    "tasktype/name",
    handler, // handler function
)
// running
if err := server.Run(mux); err != nil {
    log.Fatal(err)
}
```
### Client / Enqueueing Tasks
```go=
redisConnection := asynq.RedisClientOpt{
    Addr: "localhost:6379", // Redis server address
}
// Create a new Asynq client.
client := asynq.NewClient(redisConnection)
defer client.Close()
// task creation
task := asynq.NewTask(....)
.....
// enqueueing the task
taskInfo, err := client.Enqueue(
    task, // the task to enqueue
    asynq.Queue("workerqueue1"), // choose a particular queue
    asynq.MaxRetry(5), // max retries
    // other options.....
)
```
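If enqueueing succeeds, the returned `taskInfo` (a `*asynq.TaskInfo`) describes the enqueued task; a minimal sketch of using it (assuming the `ID` and `Queue` fields of `TaskInfo` in this version):
```go=
// continuing from the snippet above (needs "fmt" and "log" imported)
if err != nil {
    log.Fatalf("could not enqueue task: %v", err)
}
fmt.Printf("enqueued task: id=%s queue=%s\n", taskInfo.ID, taskInfo.Queue)
```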
## Advanced
### Middleware
```go=
func Log(h asynq.Handler) asynq.Handler {
    return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
        taskName := t.Type()
        taskId, _ := asynq.GetTaskID(ctx)
        queueName, _ := asynq.GetQueueName(ctx)
        fmt.Printf(
            "Task With Name:%v And Id:%v, From Queue:%v Is Now Being Executed\n",
            taskName,
            taskId,
            queueName,
        )
        err := h.ProcessTask(ctx, t)
        if err != nil {
            return err
        }
        fmt.Println("Processing End")
        return nil
    })
}
```
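The middleware is registered on the ServeMux with `Use`, so it wraps every handler:
```go=
mux := asynq.NewServeMux()
mux.Use(Log) // global middleware: wraps every handler registered on the mux
mux.HandleFunc("tasktype/name", handler)
```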
### Task Configuration
```go=
taskInfo, err := client.Enqueue(
task1, // task payload
asynq.Queue("randomname"),
asynq.MaxRetry(5),
....
// set deadline, process in, process at...
)
```
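A sketch of what those options can look like (the durations/times below are made up):
```go=
// needs "time" imported
taskInfo, err := client.Enqueue(
    task1,
    asynq.Queue("randomname"),
    asynq.MaxRetry(5),
    asynq.Timeout(2*time.Minute),              // give up on a single attempt after 2 minutes
    asynq.Deadline(time.Now().Add(time.Hour)), // do not process past this point in time
    asynq.ProcessIn(10*time.Minute),           // delay processing by a duration
    // asynq.ProcessAt(t) would schedule it at an absolute time instead
)
```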
### Monitoring
```go=
package main

import (
    "fmt"

    "github.com/hibiken/asynq"
)

func main() {
    redisConnection := asynq.RedisClientOpt{
        Addr: "localhost:6379", // Redis server address
    }
    // the inspector can be used to monitor the system;
    // it exposes many methods for retrieving information.
    inspector := asynq.NewInspector(redisConnection)
    defer inspector.Close()
    // Servers returns a list of all the running servers
    srvs, err := inspector.Servers()
    if err != nil {
        panic(err)
    }
    fmt.Println(srvs)
}
```
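Beyond `Servers`, the inspector can also list queues and report per-queue stats. A rough sketch, assuming `Queues`, `GetQueueInfo`, and the `QueueInfo` fields used below are available in this version:
```go=
// continuing inside main from the snippet above
queues, err := inspector.Queues()
if err != nil {
    panic(err)
}
for _, q := range queues {
    info, err := inspector.GetQueueInfo(q)
    if err != nil {
        panic(err)
    }
    fmt.Printf("queue=%s size=%d pending=%d active=%d\n",
        info.Queue, info.Size, info.Pending, info.Active)
}
```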
## Study Aspects: Questions and Answers
### Q1. How does asynq assign tasks to workers (not queue priority, but the task-to-worker concept)?
### Ans1.
If we do not configure any queues with priorities, one is created for us automatically, named `default`.
We can define how many concurrent workers there will be, but we cannot directly assign one or more workers to a specific queue. However, we can control how often each queue is processed.
Something like this:
```go=
server := asynq.NewServer(redisConnection, asynq.Config{
    // Specify how many concurrent workers to use.
    Concurrency: 10,
    Queues: map[string]int{
        "workerqueue1": 6, // processed 60% of the time
        "workerqueue2": 3, // processed 30% of the time
        "workerqueue3": 1, // processed 10% of the time
    },
})
```
Notice that there are 10 concurrent workers, but we did not assign them to any task or queue. Instead we gave priorities to the queues: tasks from `workerqueue1` will be processed 60% of the time, and so on.
Whenever we create a task and want to put it into the system, we push it to a queue chosen according to its priority or importance.
```go=
taskInfo, err := client.Enqueue(
    task, // the task to enqueue
    asynq.Queue("workerqueue1"), // choose a particular queue
)
```
The flow of the system looks something like this:
```bash=
task creation -> pushed to a queue (the `default` queue if none is specified) -> queue assigns a worker (in reality the server pulls the task from the queue and runs it in a goroutine) -> worker executes the task
```

### Q2. Message passing (Worker 1: Running Task 1)
### Ans2.
As mentioned in the previous answer, we do not assign a task directly to a worker; rather, we push the task to a queue, and a worker picks it up from the queue to complete it.
We can extract information from the context:
```go=
func someTaskHandler(ctx context.Context, t *asynq.Task) error {
    taskName := t.Type()                    // returns the task type/name
    taskId, _ := asynq.GetTaskID(ctx)       // returns the task id
    queueName, _ := asynq.GetQueueName(ctx) // which queue the task belongs to
    fmt.Printf(
        "task with name:%v and id:%v, from queue:%v is now being executed\n",
        taskName,
        taskId,
        queueName,
    )
    return nil
}
```
Note: rather than printing in each handler, we could use a global middleware (as shown in the Middleware section above).
### Q3. Streaming data from one task to another (data feeding)
### Ans3.
There seems to be no direct way to stream data from one task to another, as `asynq.Task` has only two methods: one returns its type/name and the other returns its payload. But we can pass data in other ways. One way could be:
#### Creating the 2nd task from the 1st task
Let's say we want to define two tasks: one reads data from a file, and the other takes that data and does something with it.
```go=
// Task1's payload; the elided fields are whatever task1 needs
type Task1 struct {
    // ... fields
}

// Task2's payload carries the data produced by task1
type Task2 struct {
    DataFromTask1 string
}

func EnqueueTask1(client *asynq.Client, p Task1) error {
    payload, err := json.Marshal(p)
    if err != nil {
        return err
    }
    _, err = client.Enqueue(asynq.NewTask("task1", payload))
    return err
}

// the handler keeps a reference to the client (rather than putting it in the
// payload) so it can enqueue task2 once task1 is done
func Task1Handler(client *asynq.Client) asynq.HandlerFunc {
    return func(ctx context.Context, t *asynq.Task) error {
        // do task1
        someData := "..." // data produced by task1
        // create task2 from task1's result and enqueue it
        payload, err := json.Marshal(Task2{DataFromTask1: someData})
        if err != nil {
            return err
        }
        _, err = client.Enqueue(asynq.NewTask("task2", payload))
        return err
    }
}
```
Another way could be:
#### Taking a task into its own structure
```go=
// Task1 carries the next task to run when it is done
// (note: *asynq.Client and *asynq.Task are not meant to be marshaled,
// so this struct lives on the worker/client side, not in the payload)
type Task1 struct {
    // ... fields
    client *asynq.Client
    Next   *asynq.Task
}

// EnqueueNext pushes the next task (the method cannot also be named Next,
// since that is already the field name)
func (t *Task1) EnqueueNext() error {
    _, err := t.client.Enqueue(t.Next)
    return err
}

func EnqueueTask(client *asynq.Client, next *asynq.Task) {
    task := Task1{ /* ... fields, */ client: client, Next: next}
    // ... enqueue task1 as before, then call task.EnqueueNext()
    // once its work is finished
    _ = task
}
```
## References
- [asynQ-wiki](https://github.com/hibiken/asynq/wiki/Getting-Started)
- [asynQ-docs](https://pkg.go.dev/github.com/hibiken/asynq)
- [code-experiments](https://github.com/nafeem-evatix/asynqv2)