Supervisore per le goroutine, fault tolerance in Golang
Quindi hai bisogno di fault tolerance? Ovviamente per alcuni motivi non puoi costruire il tuo sistema con Erlang (o Elixir) che ha 30+ anni di esperienza in questo campo. Bene.
Allora proviamo a farlo in Go. In questo post non dirò quale libreria dovresti usare o quale sia la strada più corretta. Ti mostrerò solo quello che ho trovato e tu potrai scegliere cosa più appropriato per tuo caso.
Hystrix. Il modo semplice!
Hystrix è un pacchetto Go che “mira a consentire ai programmatori Go di creare facilmente applicazioni con semantica di esecuzione simile alla libreria Hystrix basata su Java.” Io non programmo in Java, quindi non so nulla di Hystrix se sei interessato, questo è docs.
hystrix-go non usa la funzione build-in recover() e il panic semplicemente ucciderà il processo come al solito. Se leggi il codice, scoprirai che usa il proprio sistema per la gestione delle task utilizzando il context e i lock. Dispone di metriche, integrazione con Hystrix Dashboard e plug-in per le statistiche.
è abbastanza facile da usare:
// define your task
hystrix.Go("my_command", func() error {
// talk to other services
return nil
}, func(err error) error {
// do this when services are down
return nil
})
// wait for output
select {
case out := <-output:
// success
case err := <-errors:
// failure
}
Utilizzo del pacchetto Ants
Ants è un pool di goroutine ad alte prestazioni e basso costo in Go. È utile quando devi gestire le tue goroutine e ha un suo handler per il panic:
package main
import (
"log"
"sync"
"time"
"github.com/panjf2000/ants"
)
// this is where magic happens!
func handleException(err interface{}) {
log.Printf("Not today! I'm going on strike, %v", err)
// restart goroutine or send error to monitoring channel
}
func main() {
defer ants.Release()
var wg sync.WaitGroup
p, err := ants.NewPool(10, ants.WithPanicHandler(handleException))
if err != nil {
log.Println(err)
}
fn := func() {
log.Println("Start working")
panic("My boss is paying me too little!")
time.Sleep(2 * time.Second) // lunch time
wg.Done()
}
wg.Add(1)
_ = p.Submit(fn)
time.Sleep(5 * time.Second)
}
Funziona bene se la tua goroutine, per quella specifica pool, è sempre la stessa! Perché se hai diverse goroutine nella stessa pool, capire quale di essi ha fallito sarà un bel problema.
Modo più personalizzato, utilizzando la funzione build-in recovery()
Usiamo Ants per la gestione del pool di processi e della funzione recover() per poter poi inviare processo in canale per segnalare crash e riavviarlo. Ovviamente possiamo semplicemente usare il comando go per creare goroutine senza Ants, ma penso che sia più conveniente usare Ants.
Il pezzo più importante è dove eseguiamo la nostra funzione, usando defer aspettiamo fino al ritorno o all’arresto anomalo della funzione e con recover() controlliamo se era panic o errore.
...
defer func() {
if r := recover(); r != nil {
// on Error or Panic this code will be executed
channel <- proc
}
// you can write some code here if you need
// to signal on function exit whether from crash
// or from normal return
}()
// Run function
proc.Function()
...
Arricchiamo un po’ nostro codice e possiamo creare una struttura per la gestione e un’interfaccia funzionale per essa.
Codice completo:
package main
import (
"log"
"sync"
"time"
"github.com/panjf2000/ants/v2"
)
type Process struct {
// Id sarà unixtime
Id int64
Function func()
}
type Supervisor struct {
Pool *ants.Pool
Processes []Process
}
// Start the process
func (proc Process) RunProcess(channel chan<- Process) {
defer func() {
if r := recover(); r != nil {
if err, ok := r.(error); ok {
log.Printf("Process[%v] Error : %v", proc.Id, err)
channel <- proc
} else {
log.Printf("Process[%v] Panic happened with %v", proc.Id, r)
channel <- proc
}
}
}()
// Run function
proc.Function()
}
// Add a process
func (sup *Supervisor) RegisterProccess(fn func()) {
sup.Processes = append(sup.Processes, Process{
Id: time.Now().Unix(),
Function: fn,
})
}
func CreateSupervisor(max_processes int) Supervisor {
p, err := ants.NewPool(max_processes)
if err != nil {
log.Fatal(err)
}
return Supervisor{Pool: p, Processes: make([]Process, 0)}
}
func (sup *Supervisor) Run() {
defer ants.Release()
var wg sync.WaitGroup
supervisor_channel := make(chan Process, 0)
for _, proc := range sup.Processes {
fn := func() {
proc.RunProcess(supervisor_channel)
wg.Done()
}
wg.Add(1)
_ = sup.Pool.Submit(fn)
time.Sleep(1 * time.Second)
}
// you can run this in Go routine if you need
// non blocking behavior
// like go func() { ... loop for here ... }
for proc := range supervisor_channel {
fn := func() {
proc.RunProcess(supervisor_channel)
wg.Done()
}
wg.Add(1)
_ = sup.Pool.Submit(fn)
}
}
func main() {
sup := CreateSupervisor(10)
sup.RegisterProccess(func() {
time.Sleep(1 * time.Second)
panic("Error")
})
sup.RegisterProccess(func() {
time.Sleep(1 * time.Second)
log.Println("Done1")
})
sup.RegisterProccess(func() {
time.Sleep(1 * time.Second)
log.Println("Done2")
})
sup.RegisterProccess(func() {
time.Sleep(1 * time.Second)
log.Println("Done3")
})
sup.RegisterProccess(func() {
time.Sleep(10 * time.Second)
log.Println("Done4")
})
sup.Run()
}
Divertiti!
2022-04-01