Google分布式框架Weaver(四):多进程部署原理

到上一小节,我们已经学会了如何去使用weaver进行项目开发,相信很多人对weaver的原理很感兴趣,想了解weaver内部到底是如何实现的。 这一节,我将介绍weaver在多进程部署中,组件之间的通信过程。

codegen

在看源码之前,我们可以先阅读weaver生成的代码。

func init() {
	codegen.Register(codegen.Registration{
		Name:        "github.com\\lemon-1997\\weaver\\service\\product\\T",
		Iface:       reflect.TypeOf((*T)(nil)).Elem(),
		New:         func() any { return &impl{} },
		ConfigFn:    func(i any) any { return i.(*impl).WithConfig.Config() },
		LocalStubFn: func(impl any, tracer trace.Tracer) any { return t_local_stub{impl: impl.(T), tracer: tracer} },
		ClientStubFn: func(stub codegen.Stub, caller string) any {
			return t_client_stub{stub: stub, listMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com\\lemon-1997\\weaver\\service\\product\\T", Method: "List"}), createMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com\\lemon-1997\\weaver\\service\\product\\T", Method: "Create"}), updateMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com\\lemon-1997\\weaver\\service\\product\\T", Method: "Update"}), deleteMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com\\lemon-1997\\weaver\\service\\product\\T", Method: "Delete"})}
		},
		ServerStubFn: func(impl any, addLoad func(uint64, float64)) codegen.Server {
			return t_server_stub{impl: impl.(T), addLoad: addLoad}
		},
	})
}

这里可以看到codegen会注册我们的组件,比较重要的是这三个函数LocalStubFnClientStubFnServerStubFn

  1. LocalStubFn返回本地调用对象t_local_stubt_local_stub实现了product/T的接口。
  2. ClientStubFn返回RPC客户端t_client_stubt_client_stub实现了product/T的接口。
  3. ServerStubFn返回RPC服务端t_server_stubt_server_stub处理来自t_client_stub的调用。

这里估计大多数应该可以猜到,LocalStubFn是用于单进程部署,而ClientStubFnServerStubFn则会在多进程部署用到。

weavelet

weavelet在weaver中是用来管理组件,每个进程中都会有一个weavelet(通过weaver.Init创建)。

func Init(ctx context.Context) Instance {
	root, err := initInternal(ctx)
	if err != nil {
		fmt.Fprintln(os.Stderr, fmt.Errorf("error initializing Service Weaver: %w", err))
		os.Exit(1)
	}
	return root
}

func initInternal(ctx context.Context) (Instance, error) {
	wlet, err := newWeavelet(ctx, codegen.Registered())
	if err != nil {
		return nil, fmt.Errorf("internal error creating weavelet: %w", err)
	}

	return wlet.start()
}

weavelet初始化时会拿到组件注册的信息(codegen.Registered),注册服务(前面生成代码提到的PRC服务端)。

func (w *weavelet) start() (Instance, error) {
	...
	handlers := &call.HandlerMap{}
	for _, c := range w.componentsByName {
		w.addHandlers(handlers, c)
	}
	...
}

envelope

envelope运行在部署进程中,能够和weavelet进行通讯,双方是利用管道发送消息,有两条管道,一个是weavelet=>envelope,另一个是envelope=>weaveletenvelope的主要作用是检查weavelet的运行状态,通知订阅weavelet组件的路由信息,处理来自weavelet的消息,如创建新的组件,http代理等等。

type EnvelopeHandler interface {
	// StartComponent starts the given component.
	StartComponent(entry *protos.ComponentToStart) error

	// GetAddress gets the address a weavelet should listen on for a listener.
	GetAddress(req *protos.GetAddressRequest) (*protos.GetAddressReply, error)

	// ExportListener exports the given listener.
	ExportListener(req *protos.ExportListenerRequest) (*protos.ExportListenerReply, error)

	// RecvLogEntry enables the envelope to receive a log entry.
	RecvLogEntry(entry *protos.LogEntry)

	// RecvTraceSpans enables the envelope to receive a sequence of trace spans.
	RecvTraceSpans(spans []trace.ReadOnlySpan) error
}

babysitter

babysitter运行在部署进程上,管理了所有envelope,是weaver中的大脑。 当我们运行命令weave mulit deploy时,会创建babysitterbabysitter会固定创建出两个main组件。 这两个main组件运行在不同进程,执行配置文件指定的binary文件。一般来说,我们会指定http服务的端口号。

func (s *Server) Run(addr string) error {
	lis, err := s.root.Listener("lemon", weaver.ListenerOptions{LocalAddress: addr})
	if err != nil {
		return err
	}
	s.root.Logger().Debug("listener available", "addr", lis)
	return http.Serve(lis, otelhttp.NewHandler(http.DefaultServeMux, "http"))
}

weaver为了防止端口被占用,实际上两个main进程绑定的都是随机的端口,通过weavelet调用ExportListener发送到envelope,由babysitter处理代理的逻辑

func (b *Babysitter) ExportListener(req *protos.ExportListenerRequest) (*protos.ExportListenerReply, error) {
	if p, ok := b.proxies[req.Listener.Name]; ok {
		p.proxy.AddBackend(req.Listener.Addr)
		return &protos.ExportListenerReply{ProxyAddress: p.addr}, nil
	}

	lis, err := net.Listen("tcp", req.LocalAddress)
	if errors.Is(err, syscall.EADDRINUSE) {
		// Don't retry if this address is already in use.
		return &protos.ExportListenerReply{Error: err.Error()}, nil
	}
	if err != nil {
		return nil, fmt.Errorf("proxy listen: %w", err)
	}
	addr := lis.Addr().String()
	b.logger.Info("Proxy listening", "address", addr)
	proxy := proxy.NewProxy(b.logger)
	proxy.AddBackend(req.Listener.Addr)
	b.proxies[req.Listener.Name] = &proxyInfo{
		listener: req.Listener.Name,
		proxy:    proxy,
		addr:     addr,
	}
	go func() {
		if err := serveHTTP(b.ctx, lis, proxy); err != nil {
			b.logger.Error("proxy", err)
		}
	}()
	return &protos.ExportListenerReply{ProxyAddress: addr}, nil
}

main程序执行过程中,程序会调用到weaver.Get来获取组件。

func Get[T any](requester Instance) (T, error) {
	var zero T
	iface := reflect.TypeOf(&zero).Elem()
	rep := requester.rep()
	component, err := rep.wlet.getComponentByType(iface)
	if err != nil {
		return zero, err
	}
	result, err := rep.wlet.getInstance(component, rep.info.Name)
	if err != nil {
		return zero, err
	}
	return result.(T), nil
}

在第一次获取组件的过程中需要初始化,main进程调用RegisterComponentToStart中向babysitter发送需要初始化的组件, babysitter收到请求后,会创建两个新的子进程,子进程创建后weavelet会把自己组件的tcp地址发送回babysitterbabysitter会把路由信息发送给订阅的weavelet组件。

func (h *handler) StartComponent(req *protos.ComponentToStart) error {
	if err := h.subscribeTo(req); err != nil {
		return err
	}
	return h.startComponent(req)
}

这样一来,当进程调用组件的方法时,就能拿到组件提供RPC服务的地址,完成组件方法的调用,最终的程序多进程部署后会像这样。

$ weaver multi status
╭────────────────────────────────────────────────────╮
│ DEPLOYMENTS                                        │
├───────┬──────────────────────────────────────┬─────┤
│ APP   │ DEPLOYMENT                           │ AGE │
├───────┼──────────────────────────────────────┼─────┤
│ lemon │ f74f7512-a8ff-48e2-bdce-8a1a3dd4c640 │ 22s │
╰───────┴──────────────────────────────────────┴─────╯
╭──────────────────────────────────────────────────────────────────────────╮
│ COMPONENTS                                                               │
├───────┬────────────┬──────────────────────────────────────┬──────────────┤
│ APP   │ DEPLOYMENT │ COMPONENT                            │ REPLICA PIDS │
├───────┼────────────┼──────────────────────────────────────┼──────────────┤
│ lemon │ f74f7512   │ lemon\service\category\T             │ 10272, 15264 │
│ lemon │ f74f7512   │ lemon\service\category\categoryCache │ 4692, 15116  │
│ lemon │ f74f7512   │ lemon\service\product\T              │ 11788, 13260 │
│ lemon │ f74f7512   │ main                                 │ 4508, 13236  │
╰───────┴────────────┴──────────────────────────────────────┴──────────────╯
╭─────────────────────────────────────────────────╮
│ LISTENERS                                       │
├───────┬────────────┬──────────┬─────────────────┤
│ APP   │ DEPLOYMENT │ LISTENER │ ADDRESS         │
├───────┼────────────┼──────────┼─────────────────┤
│ lemon │ f74f7512   │ lemon    │ 127.0.0.1:12345 │
╰───────┴────────────┴──────────┴─────────────────╯

每个组件都运行在两个不同的进程,main处理http服务,其他组件各自处理自己的服务,这8个进程都是部署进程的子进程,通过管道进行通信,同步组件服务的路由, main最早被初始化,后续通过weaver.Get不断创建新的组件进程。

小结

讲的比较简单,只是个大概,有兴趣的可以去看github上的源码,比较有意思,我在看源码的过程中也修了两个Windows上的兼容bug,算是有所收获。