Recently, I have been gradually moving my project from Ruby on Rails to golang. One reason is to practice and get a feel for golang's syntax; the other is to experience the power of golang and learn how to organize my code without an existing framework.
Today, I want to introduce goworker. I chose it because it was the first result when searching for "go worker," and because it is fully compatible with the job format of Resque (a Ruby worker package). This means I can enqueue tasks in the Ruby on Rails application and execute them using golang.
Usage
During initialization, you configure goworker and register every worker it should be able to run. This lets goworker know which function to execute when it fetches a job from a queue:
func init() {
	settings := goworker.WorkerSettings{
		URI:            os.Getenv("REDIS_URL"),
		Queues:         []string{"worker", "queues"},
		UseNumber:      true,
		ExitOnComplete: false, // keep running even when the queue is empty.
		Concurrency:    concurrency,
		Connections:    connections,
		Namespace:      "myapp" + ":",
		Interval:       10.0,
	}
	goworker.SetSettings(settings)
}
You can define concurrency and connections as needed. The Namespace above provides a scope (a key prefix) for the queue names. The Interval indicates how often to check for new tasks when there are no tasks in the queue.
To implement a worker, you need to define a function and register it with goworker:
func workerFn(queue string, args ...interface{}) error {
	// your job.
	fmt.Println(queue, args)
	return nil
}

// Call this during initialization, e.g. inside init().
goworker.Register("MyWorker", workerFn)
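Since UseNumber is enabled in the settings, numeric arguments should arrive as json.Number rather than float64. Here is a minimal sketch of a worker that handles that, written without the goworker import so it runs on its own. The names sumArgs, sumWorker, and decodeArgs are made up for this example, and decodeArgs only imitates the decoding step with the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// sumArgs (hypothetical helper) adds up the numeric job arguments.
// It assumes the arguments were decoded with UseNumber, so each one is a json.Number.
func sumArgs(args ...interface{}) (int64, error) {
	var total int64
	for i, arg := range args {
		n, ok := arg.(json.Number)
		if !ok {
			return 0, fmt.Errorf("argument %d: expected json.Number, got %T", i, arg)
		}
		v, err := n.Int64()
		if err != nil {
			return 0, fmt.Errorf("argument %d: %w", i, err)
		}
		total += v
	}
	return total, nil
}

// sumWorker has the same signature as workerFn, so it could be registered the same way.
func sumWorker(queue string, args ...interface{}) error {
	total, err := sumArgs(args...)
	if err != nil {
		return err
	}
	fmt.Println(queue, total)
	return nil
}

// decodeArgs (hypothetical helper) decodes a JSON array with UseNumber turned on.
func decodeArgs(raw string) ([]interface{}, error) {
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.UseNumber()
	var args []interface{}
	err := dec.Decode(&args)
	return args, err
}

func main() {
	args, err := decodeArgs(`[1, 2, 3]`)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	if err := sumWorker("worker", args...); err != nil {
		fmt.Println("job failed:", err)
	}
}
```

Running this prints "worker 6". Checking the type of each argument up front keeps a malformed payload from panicking inside the worker.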
After setting everything up, you can use goworker.Work() to start listening to Redis. This call blocks, so it's usually run in a separate server or process (you can also start a goroutine for it when your server starts).
func main() {
	err := goworker.Work()
	if err != nil {
		// log your error
	}
}
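If you go the goroutine route, one possible shape is sketched below. It is only an illustration: runInBackground and errStopped are names I made up, and fakeWork stands in for goworker.Work so the snippet runs without Redis.

```go
package main

import (
	"errors"
	"fmt"
)

// errStopped is a made-up error used only to simulate a worker that exits.
var errStopped = errors.New("worker stopped")

// runInBackground (hypothetical helper) starts a blocking function in its own
// goroutine and returns a channel that receives its result when it finishes.
func runInBackground(work func() error) <-chan error {
	done := make(chan error, 1) // buffered so the goroutine never blocks on send
	go func() {
		done <- work()
	}()
	return done
}

func main() {
	// fakeWork stands in for goworker.Work in this sketch.
	fakeWork := func() error { return errStopped }

	done := runInBackground(fakeWork)

	// ... the rest of the server would start here ...

	if err := <-done; err != nil {
		fmt.Println("worker exited:", err)
	}
}
```

The channel gives the main goroutine a way to notice that the worker loop has ended instead of silently losing the error.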
Next, let's enqueue a task in Redis:
RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}'
You will see worker and [1 2 3] printed to stdout.
You can also enqueue a task into Redis directly from Go using goworker.Enqueue(&goworker.Job{}).
How It Works
When a new task is added, goworker uses RPUSH to push the task into namespace:queue:job.Queue. To allow parameter serialization, the content is stored as a JSON string in the queue.
// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
if err != nil {
	logger.Criticalf("Cant marshal payload on enqueue")
	return err
}
err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
if err != nil {
	logger.Criticalf("Cant push to queue")
	return err
}
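To see what actually ends up in Redis, here is a small sketch that builds the same key and JSON body using only the standard library. The payload struct, queueKey, and encodeJob are my own names for this example; they mirror the shape of the RPUSH command shown earlier and are not goworker's types.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload (hypothetical type) mirrors the JSON shape used in the RPUSH example:
// a class name plus a list of arguments.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// queueKey builds the Redis list key with the same format string as the excerpt above.
// Note that the namespace is expected to already end with ":".
func queueKey(namespace, queue string) string {
	return fmt.Sprintf("%squeue:%s", namespace, queue)
}

// encodeJob serializes a class name and its arguments to a JSON string.
func encodeJob(class string, args ...interface{}) (string, error) {
	buf, err := json.Marshal(payload{Class: class, Args: args})
	if err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	body, err := encodeJob("MyWorker", 1, 2, 3)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println("RPUSH", queueKey("myapp:", "worker"), body)
}
```

This prints `RPUSH myapp:queue:worker {"class":"MyWorker","args":[1,2,3]}`, which matches the command used in the Usage section.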
In addition, a set is used to keep track of the currently used queues.
// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
	logger.Criticalf("Cant register queue to the list of used queues")
	return err
}
When calling goworker.Work(), it internally calls a poller which does the following:
- Uses poller.getJob() to fetch the task by calling LPOP.
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
- Uses poller.poll(duration, quit) to periodically fetch jobs and deliver them through the jobs channel it returns, and uses quit as the signal to exit.
jobs := make(chan *Job)
//......
return jobs
- Finally, it executes worker.work(jobs, &monitor). When a job is received, it executes w.run(job, workerFunc), where workerFunc is the function we defined earlier. If no corresponding class is found, an error "No Worker for ... queue with args ..." is printed.
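The three steps above can be sketched as a small poller/worker pipeline. This is a simplified illustration and not goworker's real code: memQueue replaces the Redis list with an in-memory slice, and job, poll, and work are stripped-down stand-ins I wrote for this example.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// job is a simplified stand-in for a queued task.
type job struct {
	Class string
	Args  []interface{}
}

// memQueue is an in-memory substitute for the Redis list (an assumption of this sketch).
type memQueue struct {
	mu    sync.Mutex
	items []job
}

// pop removes and returns the oldest job, similar to LPOP; ok is false when empty.
func (q *memQueue) pop() (j job, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return job{}, false
	}
	j, q.items = q.items[0], q.items[1:]
	return j, true
}

// poll fetches jobs in a goroutine and sends them on the returned channel.
// When the queue is empty it waits for interval; closing quit stops the loop.
func poll(q *memQueue, interval time.Duration, quit <-chan struct{}) <-chan job {
	jobs := make(chan job)
	go func() {
		defer close(jobs)
		for {
			j, ok := q.pop()
			if !ok {
				select {
				case <-quit:
					return
				case <-time.After(interval):
					continue
				}
			}
			select {
			case jobs <- j:
			case <-quit:
				return // in this sketch the popped job is simply dropped
			}
		}
	}()
	return jobs
}

// work drains the jobs channel, dispatches each job by class name,
// and returns one result line per job.
func work(jobs <-chan job, workers map[string]func(args ...interface{}) error) []string {
	var results []string
	for j := range jobs {
		fn, ok := workers[j.Class]
		switch {
		case !ok:
			results = append(results, "no worker for "+j.Class)
		case fn(j.Args...) != nil:
			results = append(results, j.Class+" failed")
		default:
			results = append(results, j.Class+" done")
		}
	}
	return results
}

func main() {
	q := &memQueue{items: []job{
		{Class: "MyWorker", Args: []interface{}{1, 2, 3}},
		{Class: "Unknown"},
	}}
	quit := make(chan struct{})
	jobs := poll(q, 10*time.Millisecond, quit)

	// Stop polling after a short while so the example terminates.
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(quit)
	}()

	workers := map[string]func(args ...interface{}) error{
		"MyWorker": func(args ...interface{}) error { return nil },
	}
	for _, line := range work(jobs, workers) {
		fmt.Println(line)
	}
}
```

The unbuffered jobs channel means the poller only pops the next job once a worker is ready to take the previous one, and closing quit is what lets both loops wind down.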
Issues
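- Using goworker does not guarantee that pending jobs will be executed if an unexpected shutdown occurs. Since LPOP removes a job from the list as soon as it is fetched, a job that was popped but not yet finished is gone if the process dies.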
- As far as I understand, Redis does not have a mechanism to ensure that the recipient will definitely receive a message. If a more reliable mechanism is needed, protocols like AMQP may be required. However, for a small project, Redis is sufficient.
Conclusion
For general applications, Redis is already quite sufficient; only high-traffic, high-concurrency scenarios may call for a more advanced queueing system. In this article, I introduced the goworker library for background jobs on top of Redis, which should be able to handle time-consuming or delayed tasks. For larger applications, more reliable protocols like AMQP can be used. Additionally, when running goworker, my fan keeps spinning constantly. I'm not sure if this is normal, but the CPU doesn't show any significant increase...