HomePage

Supervisore per le goroutine, fault tolerance in Golang

Quindi hai bisogno di fault tolerance? Ovviamente per alcuni motivi non puoi costruire il tuo sistema con Erlang (o Elixir) che ha 30+ anni di esperienza in questo campo. Bene.

Allora proviamo a farlo in Go. In questo post non dirò quale libreria dovresti usare o quale sia la strada più corretta. Ti mostrerò solo quello che ho trovato e tu potrai scegliere cosa più appropriato per tuo caso.

Hystrix. Il modo semplice!

Hystrix è un pacchetto Go che “mira a consentire ai programmatori Go di creare facilmente applicazioni con semantica di esecuzione simile alla libreria Hystrix basata su Java.” Io non programmo in Java, quindi non so nulla di Hystrix se sei interessato, questo è docs.

hystrix-go non usa la funzione build-in recover() e il panic semplicemente ucciderà il processo come al solito. Se leggi il codice, scoprirai che usa il proprio sistema per la gestione delle task utilizzando il context e i lock. Dispone di metriche, integrazione con Hystrix Dashboard e plug-in per le statistiche.

è abbastanza facile da usare:

// define your task
hystrix.Go("my_command", func() error {
	// talk to other services
	return nil
}, func(err error) error {
	// do this when services are down
	return nil
})

// wait for output
select {
case out := <-output:
	// success
case err := <-errors:
	// failure
}

Utilizzo del pacchetto Ants

Ants è un pool di goroutine ad alte prestazioni e basso costo in Go. È utile quando devi gestire le tue goroutine e ha un suo handler per il panic:

package main

import (
	"log"
	"sync"
	"time"

	"github.com/panjf2000/ants"
)

// this is where magic happens! 
func handleException(err interface{}) {
	log.Printf("Not today! I'm going on strike, %v", err)
	// restart goroutine or send error to monitoring channel
}

func main() {
	defer ants.Release()
	var wg sync.WaitGroup

	p, err := ants.NewPool(10, ants.WithPanicHandler(handleException))
	if err != nil {
		log.Println(err)
	}

	fn := func() {
		log.Println("Start working")
		panic("My boss is paying me too little!")
		time.Sleep(2 * time.Second) // lunch time
		wg.Done()
	}
	wg.Add(1)
	_ = p.Submit(fn)

	time.Sleep(5 * time.Second)
}

Funziona bene se la tua goroutine, per quella specifica pool, è sempre la stessa! Perché se hai diverse goroutine nella stessa pool, capire quale di essi ha fallito sarà un bel problema.

Modo più personalizzato, utilizzando la funzione build-in recovery()

Usiamo Ants per la gestione del pool di processi e della funzione recover() per poter poi inviare processo in canale per segnalare crash e riavviarlo. Ovviamente possiamo semplicemente usare il comando go per creare goroutine senza Ants, ma penso che sia più conveniente usare Ants.

Il pezzo più importante è dove eseguiamo la nostra funzione, usando defer aspettiamo fino al ritorno o all’arresto anomalo della funzione e con recover() controlliamo se era panic o errore.

...


	defer func() {
		if r := recover(); r != nil {
			// on Error or Panic this code will be executed
			channel <- proc
		}
		// you can write some code here if you need 
		// to signal on function exit whether from crash 
		// or from normal return 
	}()

	// Run function
	proc.Function()

...

Arricchiamo un po’ nostro codice e possiamo creare una struttura per la gestione e un’interfaccia funzionale per essa.

Codice completo:

package main

import (
	"log"
	"sync"
	"time"

	"github.com/panjf2000/ants/v2"
)

type Process struct {
	// Id sarà unixtime
	Id       int64
	Function func()
}

type Supervisor struct {
	Pool      *ants.Pool
	Processes []Process
}

// Start the process
func (proc Process) RunProcess(channel chan<- Process) {
	defer func() {
		if r := recover(); r != nil {
			if err, ok := r.(error); ok {
				log.Printf("Process[%v] Error : %v", proc.Id, err)
				channel <- proc
			} else {
				log.Printf("Process[%v] Panic happened with %v", proc.Id, r)
				channel <- proc
			}
		}
	}()

	// Run function
	proc.Function()
}

// Add a process
func (sup *Supervisor) RegisterProccess(fn func()) {
	sup.Processes = append(sup.Processes, Process{
		Id:       time.Now().Unix(),
		Function: fn,
	})
}

func CreateSupervisor(max_processes int) Supervisor {
	p, err := ants.NewPool(max_processes)
	if err != nil {
		log.Fatal(err)
	}

	return Supervisor{Pool: p, Processes: make([]Process, 0)}
}

func (sup *Supervisor) Run() {
	defer ants.Release()
	var wg sync.WaitGroup

	supervisor_channel := make(chan Process, 0)

	for _, proc := range sup.Processes {
		fn := func() {
			proc.RunProcess(supervisor_channel)
			wg.Done()
		}
		wg.Add(1)

		_ = sup.Pool.Submit(fn)
		time.Sleep(1 * time.Second)
	}

	// you can run this in Go routine if you need
	// non blocking behavior
	// like go func() { ... loop for here ... }
	for proc := range supervisor_channel {
		fn := func() {
			proc.RunProcess(supervisor_channel)
			wg.Done()
		}
		wg.Add(1)

		_ = sup.Pool.Submit(fn)
	}
}

func main() {
	sup := CreateSupervisor(10)

	sup.RegisterProccess(func() {
		time.Sleep(1 * time.Second)
		panic("Error")
	})

	sup.RegisterProccess(func() {
		time.Sleep(1 * time.Second)
		log.Println("Done1")
	})
	sup.RegisterProccess(func() {
		time.Sleep(1 * time.Second)
		log.Println("Done2")
	})
	sup.RegisterProccess(func() {
		time.Sleep(1 * time.Second)
		log.Println("Done3")
	})
	sup.RegisterProccess(func() {
		time.Sleep(10 * time.Second)
		log.Println("Done4")
	})

	sup.Run()
}

Divertiti!

2022-04-01
Come eseguire la tua applicazione Phoenix usando Systemd

In questo post, esplorerò come sfruttare systemd per garantire che le phoenix o elixir app continuino a funzionare anche dopo un'interruzione anomala dell'intero sistemaLearn more