This post is translated by ChatGPT and originally written in Mandarin, so there may be some inaccuracies or mistakes.
Recently, I have been gradually migrating a project from Ruby on Rails to Golang. One reason for this shift is to practice and experience the unique syntax of Golang, while the other is to explore the power of Golang and learn how to organize my code without relying on an existing framework.
Today, I want to introduce goworker. I chose it because it was the first result that appeared when I searched for "go worker," and because it is compatible with the job format of Resque (a Ruby worker library). This allows me to enqueue tasks in my Ruby on Rails application and then execute them using Golang.
How to Use
When initializing, you need to register all the workers that will be executed, so that the worker knows which corresponding task to execute when it retrieves a job from the queue:
func init() {
	// concurrency and connections are integers defined elsewhere in the package.
	settings := goworker.WorkerSettings{
		URI:            os.Getenv("REDIS_URL"),
		Queues:         []string{"worker", "queues"},
		UseNumber:      true,
		ExitOnComplete: false, // keep running even when the queue is empty
		Concurrency:    concurrency,
		Connections:    connections,
		Namespace:      "myapp" + ":",
		Interval:       10.0,
	}
	goworker.SetSettings(settings)
}
You can set Concurrency and Connections as needed, while Namespace sets a prefix that scopes the queue keys. Interval specifies how often to check for new tasks when there are none in the queue.
How do you implement a worker? You need to create a function and register it with goworker.
func workerFn(queue string, args ...interface{}) error {
	// your job.
	fmt.Println(queue, args)
	return nil
}
// Register the function during setup, for example inside init().
goworker.Register("MyWorker", workerFn)
Once configured, you can call goworker.Work() to listen for jobs in Redis. This call blocks, so it's common to run it in a separate server or process (you can also start a goroutine to run it when your server is up).
func main() {
	err := goworker.Work()
	if err != nil {
		// log your error
	}
}
Next, let's enqueue a task in Redis and see what happens:
RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}'
You will notice that worker and [1 2 3] appear in stdout.
You can also directly use goworker.Enqueue(&goworker.Job{}) to push your job into Redis.
How It Works
When a new job is added, goworker uses RPUSH to insert the task into namespace:queue:job.Queue. To ensure that parameters can be serialized, it uses a JSON string as the content of the queue.
// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
if err != nil {
	logger.Criticalf("Cant marshal payload on enqueue")
	return err
}
err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
if err != nil {
	logger.Criticalf("Cant push to queue")
	return err
}
Additionally, it maintains a set to keep track of which queues are currently in use.
// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
	logger.Criticalf("Cant register queue to list of use queues")
	return err
}
When goworker.Work() is called, an internal poller is first invoked. This poller performs several tasks:
- It retrieves a job using poller.getJob(), which calls LPOP.
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
- The poller.poll(duration, quit) method regularly fetches jobs and delivers them through the jobs channel it returns, using quit as an exit signal.
jobs := make(chan *Job)
//......
return jobs
- Finally, it executes worker.work(jobs, &monitor), which runs w.run(job, workerFunc) for the jobs received. If no corresponding class is found, it prints an error: No Worker for ... queue with args ...
Issues
- Using goworker does not guarantee that pending jobs will execute smoothly if an unexpected shutdown occurs.
- As I understand it, Redis does not have a mechanism to ensure that the receiving end will always receive messages. If a more reliable mechanism is needed, a protocol like AMQP may be necessary. However, for a small project, Redis is already quite sufficient.
Conclusion
For general applications, Redis is more than adequate. Unless you’re dealing with high traffic and concurrency scenarios, Redis queues are quite usable. This article introduced the goworker library for development alongside Redis, which should effectively handle time-consuming (or delayed) tasks. For larger applications, more reliable protocols like AMQP can be used. Additionally, I noticed that my fan runs continuously while goworker is running; I’m not sure if this is normal, but my CPU usage hasn’t spiked noticeably...