Snippets: Golang: Run Multiple Services

 5th May 2022 at 3:32pm

在一个 Go 程序中,有时候我们需要起多个调用逻辑上没有关联的「服务」。比如起一个 HTTP server 的同时、起一个 ticker 用来做定时任务,同时期望在进程收到 SIGTERM 时,可以优雅地关闭掉全部这些服务,再退出进程。

下面的这套代码即用来处理这种场景。

框架

package infras

import (
	"context"
	"fmt"
	"reflect"
)

// Service 抽象了一个需要长期运行的服务。
// Service 不一定需要对外提供服务,它仅需要是一个需要有长时间生命周期的对象即可,比如:
// - 监听某个端口提供服务的
// - 根据调度执行任务的
// - 定期做任务的
type Service interface {
	// Start 异步地启动 service。
	// 在 service 停止时应该向 done channel 发送消息。
	// 如果是出错而停止,发送相应的 error;如果是正常停止,或被外部调用 Shutdown 而停止,应该发送 nil。
	//
	// Start 的正确实现是重要的:
	// 在 service 停止时(无论正常异常),都必须向 done 写入仅 1 次数据,并关闭该 channel
	//
	// 设计细节:使用 chan error 而不是 context,是因为 context 无法方便地表达自定义的停止原因。
	Start(done chan error)

	// Shutdown 同步(blocking)地停止 service。
	// 多次调用时,只有第一个起作用,后续调用不应报错。
	Shutdown() error

	// Name 主要用于在错误信息和日志中来表示该 service。
	Name() string
}

// RunServices 将一批服务(services)启动(通过 Start 函数),并阻塞地等待:
// - 任一 service 退出(在 done chan 收到消息):会停止所有 service 并返回
// - 收到 ctx.Done():会停止所有 service 并返回
//
// RunServices 的使用场景是:有一批互相无关联的 service 需要长期同时运行,并且
// 这些 service 在正常情况下,仅在外部(ctx)要求时才应该被停止。
func RunServices(ctx context.Context, services []Service) error {
	var cases []reflect.SelectCase
	var chs []chan error
	for i := 0; i < len(services); i++ {
		chs = append(chs, make(chan error, 1))
		cases = append(cases, reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(chs[i]),
		})
	}

	for i, srv := range services {
		srv.Start(chs[i])
	}

	ctxDoneChanIndex := len(cases)
	cases = append(cases, reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(ctx.Done()),
	})

	i, v, recvOK := reflect.Select(cases)

	// 无论是收到 ctx.Done(),还是各 service 的 done channel,都应该停止所有服务。
	for _, service := range services {
		_ = service.Shutdown()
	}

	if i == ctxDoneChanIndex {
		return nil
	}

	if !recvOK {
		return fmt.Errorf("done channel of service %s is closed unexpectedly", services[i].Name())
	}

	err, ok := v.Interface().(error)
	if !ok {
		return fmt.Errorf("service %s exited unexpectedly", services[i].Name())
	}
	return fmt.Errorf("service %s exited unexpectedly: %w", services[i].Name(), err)
}

HTTP Server 的实现

package api

import (
	"context"
	"errors"
	"git.garena.com/shopee/chatbot/data-sync/common/infras"
	"git.garena.com/shopee/chatbot/data-sync/common/repo"
	"git.garena.com/shopee/chatbot/data-sync/helper/ginny"
	"github.com/gin-gonic/gin"
	"github.com/gin-gonic/gin/binding"
	"go.uber.org/dig"
	"net/http"
	"os"
	"time"
)

type Service struct {
	server *http.Server
}

func NewService(modules Modules) *Service {
	if os.Getenv("env") == "live" {
		gin.SetMode(gin.ReleaseMode)
	}
	binding.EnableDecoderUseNumber = true

	r := gin.New()
	r.Use(gin.Recovery())
	api := r.Group("/v1")

	// 在这里注册各模块
	modules.Ping.RegisterAPIs(api)

	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}
	return &Service{
		server: &http.Server{Addr: ":" + port, Handler: r},
	}
}

func (s *Service) Name() string {
	return "API"
}

func (s *Service) Start(done chan error) {
	go func() {
		defer close(done)

		if err := s.server.ListenAndServe(); err != nil {
			if !errors.Is(err, http.ErrServerClosed) {
				// http.ErrServerClosed 表示外部显式地关闭了 server,不该被认为是错误。
				done <- nil
			} else {
				done <- err
			}
			return
		}
		done <- nil
	}()
}

func (s *Service) Shutdown() error {
	// 给 gin 5 秒钟来优雅结束
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	return s.server.Shutdown(ctx)
}

定时任务的实现

type Service struct {
	db     *gorm.DB
	config *common.ProjectConfig

	ctx    context.Context
	cancel context.CancelFunc
}

func NewService(db *gorm.DB, config *common.ProjectConfig) *Service {
	ctx, cancel := context.WithCancel(context.Background())
	return &Service{db: db, config: config, ctx: ctx, cancel: cancel}
}

// Inject 会将本 service 依赖的模块注入进来。调用方需要保证对于同一个 container,只调用一次 Inject。
func Inject(c *dig.Container) {
	// 本 service 不需要 infras.InitContainer() 以外的依赖,刻意留空
}

func (s *Service) Name() string {
	return "schedule"
}

func (s *Service) Start(done chan error) {
	go func() {
		defer close(done)

		// TODO: 按 master 个数和预期的扫描间隔来设置 ticker
		ticker := time.NewTicker(3 * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				// Schedule() 的实现省略了,与主题无关
				_ = s.Schedule()
			case <-s.ctx.Done():
				done <- nil
				return
			}
		}
	}()
}

func (s *Service) Shutdown() error {
	s.cancel()
	return nil
}

搭配优雅结束

ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
err = infras.RunServices(ctx, services)