Dynamically limiting the amount of concurrent goroutines with resizableChannel
27 Jan 2025TLDR: I found ResizableChannel
in github.com/eapache/channels
and used it to dynamically change the number of goroutines running at the same time
For a CPU intensive and parallel task I needed a solution to be able to throttle it. During the day it should simmer and use one core, not to disturb other running processes, but during the night it could use a lot more cores. Since it was written in Go, and already used goroutines, I decided to dynamically limit the number of goroutines running at the same time.
I’m aware that number of goroutines is not necessarily the same as the number of cores used, but I’ll use them interchangeably anyway. In my code, they were roughly the same.
Golang’s recommended way is to use a limit on a group. This is challenging, because the documentation says you can (should) not change the limit while goroutines are running. If you want to change it, you should wait until all concurrent goroutines are done, change it, and continue with the new limit. Depending on your use case this is probably not ideal.
A second pattern Go programs often use is a channel with a limit; a buffered channel.
By limiting the number of slots in the channel, and getting them out once the goroutine is done, it automatically limits the number of concurrent goroutines.
On Go’s default chan
it’s not possible to change the size, however.
I’ve slightly modified the example from the Go Wiki. The below example shows how a buffered channel would typically limit the number of running goroutines.
package main
import (
"runtime"
)
var goroutineLimit = make(chan struct{}, runtime.NumCPU())
func Serve(queue chan *Request) {
for {
goroutineLimit <- struct{}{} // Block until there's capacity
req := <-queue
go handle(req)
}
}
func handle(r *Request) {
process(r) // May take a long time & use a lot of memory or CPU
<-goroutineLimit // Done; enable next request to run.
}
This is almost what I need. If only the Go channel was resizable…
What felt like a naive approach: googling for a Go channel alternative, but make it resizable, actually yielded results.
The library github.com/eapache/channels
offers just that, a ResizableChannel
implementation.
While this library says it’s no longer maintained, I didn’t encounter any issues using the current Go runtime. It builds, and the built executables didn’t panic. Go code can sit untouched for years and still work fine.
Below are the changes that you should make to switch to ResizableChannel
:
package main
import (
+ "github.com/eapache/channels"
"runtime"
)
-var goroutineLimit = make(chan struct{}, runtime.NumCPU())
+var goroutineLimit = channels.NewResizableChannel()
func Serve(queue chan *Request) {
+ goroutineLimit.Resize(channels.BufferCap(runtime.NumCPU())) // Initial channel size
for {
- goroutineLimit <- struct{}{} // Block until there's capacity
+ goroutineLimit.In() <- struct{}{} // Block until there's capacity
req := <-queue
go handle(req) // Don't wait for handle to finish.
}
}
func handle(r *Request) {
- process(r) // May take a long time & use a lot of memory or CPU
- <-goroutineLimit // Done; enable next request to run.
+ process(r) // May take a long time & use a lot of memory or CPU
+ <-goroutineLimit.Out() // Done; enable next request to run.
}
Note that the diff above doesn’t have a way of actually changing the channel size yet. How you communicate that the number of concurrent goroutines should change depends on your use case. You could consider doing it with gRPC, but it’s also possible to run a simple webserver that accepts a number as GET parameter. Here’s a server that you could start early in your program that would do the latter:
go func() {
log.Println("Starting web server on port 8080...")
http.HandleFunc("/resize", func(w http.ResponseWriter, r *http.Request) {
var newSize int
if r.Method != http.MethodGet {
http.Error(w, "Invalid request method", http.StatusMethodNotAllowed)
return
}
sizeParam := r.URL.Query().Get("new_size")
if sizeParam == "" {
return
}
newSize, err := strconv.Atoi(sizeParam)
if err != nil || newSize <= 0 {
http.Error(w, "Invalid 'new_size' parameter", http.StatusBadRequest)
return
}
goroutineLimit.Resize(channels.BufferCap(newSize))
// log.Printf("Resized goroutine limit to %d", newSize)
w.WriteHeader(http.StatusOK)
})
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("Failed to start web server: %v", err)
}
}()
After creating this solution to dynamically limit the number of concurrent goroutines, I read an interesting discussion about using weighted Semaphores to achieve more or less the same.
There has also been an effort to make Golang’s own weighted Semaphore
implementation resizable, but this has not been merged.
In the end this feature was not included in the default Semaphore implementation.
The discussion references that the quotapool
package by cockroachdb
might be a solution.
I didn’t look into that too much, since it is a bit more complicated.
It can deal with heterogeneous work units, and I simply don’t need that.
Each of my tasks consumes one core fully (but briefly), and always takes about the same time.
I like this solution because it only requires a few lines of changes in existing code. (Plus some logic to change the capacity of the channel, but how to handle that will vary on your situation).
It could still be better in one way:
if the resizable channel actually has meaningful variables in it, instead of empty struct
s.
By actually communicating useful items it would function as a “proper” channel instead of as a locking hack.