在一个 Go 程序中,有时候我们需要起多个调用逻辑上没有关联的「服务」。比如起一个 HTTP server 的同时、起一个 ticker 用来做定时任务,同时期望在进程收到 SIGTERM 时,可以优雅地关闭掉全部这些服务,再退出进程。
下面的这套代码即用来处理这种场景。
框架
package infras
import (
"context"
"fmt"
"reflect"
)
// Service 抽象了一个需要长期运行的服务。
// Service 不一定需要对外提供服务,它仅需要是一个需要有长时间生命周期的对象即可,比如:
// - 监听某个端口提供服务的
// - 根据调度执行任务的
// - 定期做任务的
type Service interface {
// Start 异步地启动 service。
// 在 service 停止时应该向 done channel 发送消息。
// 如果是出错而停止,发送相应的 error;如果是正常停止,或被外部调用 Shutdown 而停止,应该发送 nil。
//
// Start 的正确实现是重要的:
// 在 service 停止时(无论正常异常),都必须向 done 写入仅 1 次数据,并关闭该 channel
//
// 设计细节:使用 chan error 而不是 context,是因为 context 无法方便地表达自定义的停止原因。
Start(done chan error)
// Shutdown 同步(blocking)地停止 service。
// 多次调用时,只有第一个起作用,后续调用不应报错。
Shutdown() error
// Name 主要用于在错误信息和日志中来表示该 service。
Name() string
}
// RunServices 将一批服务(services)启动(通过 Start 函数),并阻塞地等待:
// - 任一 service 退出(在 done chan 收到消息):会停止所有 service 并返回
// - 收到 ctx.Done():会停止所有 service 并返回
//
// RunServices 的使用场景是:有一批互相无关联的 service 需要长期同时运行,并且
// 这些 service 在正常情况下,仅在外部(ctx)要求时才应该被停止。
func RunServices(ctx context.Context, services []Service) error {
var cases []reflect.SelectCase
var chs []chan error
for i := 0; i < len(services); i++ {
chs = append(chs, make(chan error, 1))
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(chs[i]),
})
}
for i, srv := range services {
srv.Start(chs[i])
}
ctxDoneChanIndex := len(cases)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
})
i, v, recvOK := reflect.Select(cases)
// 无论是收到 ctx.Done(),还是各 service 的 done channel,都应该停止所有服务。
for _, service := range services {
_ = service.Shutdown()
}
if i == ctxDoneChanIndex {
return nil
}
if !recvOK {
return fmt.Errorf("done channel of service %s is closed unexpectedly", services[i].Name())
}
err, ok := v.Interface().(error)
if !ok {
return fmt.Errorf("service %s exited unexpectedly", services[i].Name())
}
return fmt.Errorf("service %s exited unexpectedly: %w", services[i].Name(), err)
}
HTTP Server 的实现
package api
import (
"context"
"errors"
"git.garena.com/shopee/chatbot/data-sync/common/infras"
"git.garena.com/shopee/chatbot/data-sync/common/repo"
"git.garena.com/shopee/chatbot/data-sync/helper/ginny"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
"go.uber.org/dig"
"net/http"
"os"
"time"
)
type Service struct {
server *http.Server
}
func NewService(modules Modules) *Service {
if os.Getenv("env") == "live" {
gin.SetMode(gin.ReleaseMode)
}
binding.EnableDecoderUseNumber = true
r := gin.New()
r.Use(gin.Recovery())
api := r.Group("/v1")
// 在这里注册各模块
modules.Ping.RegisterAPIs(api)
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
return &Service{
server: &http.Server{Addr: ":" + port, Handler: r},
}
}
func (s *Service) Name() string {
return "API"
}
func (s *Service) Start(done chan error) {
go func() {
defer close(done)
if err := s.server.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
// http.ErrServerClosed 表示外部显式地关闭了 server,不该被认为是错误。
done <- nil
} else {
done <- err
}
return
}
done <- nil
}()
}
func (s *Service) Shutdown() error {
// 给 gin 5 秒钟来优雅结束
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.server.Shutdown(ctx)
}
定时任务的实现
type Service struct {
db *gorm.DB
config *common.ProjectConfig
ctx context.Context
cancel context.CancelFunc
}
func NewService(db *gorm.DB, config *common.ProjectConfig) *Service {
ctx, cancel := context.WithCancel(context.Background())
return &Service{db: db, config: config, ctx: ctx, cancel: cancel}
}
// Inject 会将本 service 依赖的模块注入进来。调用方需要保证对于同一个 container,只调用一次 Inject。
func Inject(c *dig.Container) {
// 本 service 不需要 infras.InitContainer() 以外的依赖,刻意留空
}
func (s *Service) Name() string {
return "schedule"
}
func (s *Service) Start(done chan error) {
go func() {
defer close(done)
// TODO: 按 master 个数和预期的扫描间隔来设置 ticker
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Schedule() 的实现省略了,与主题无关
_ = s.Schedule()
case <-s.ctx.Done():
done <- nil
return
}
}
}()
}
func (s *Service) Shutdown() error {
s.cancel()
return nil
}
搭配优雅结束
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
err = infras.RunServices(ctx, services)