动态gRPC-HTTP代理(五):代理

代理模块作为最外层的关键组件,统一处理外部HTTP请求、调用底层模块进行gRPC转换,并返回HTTP响应。本文将详细介绍代理模块的设计理念、核心功能和实现细节,以构建一个高效、稳定、可扩展的代理服务。

实现

我们的代理模块最终会返回go标准包http.HandlerFunc,并且用option设计提供一系列自定义配置和操作。 接下里,我将一步步实现我们的代理模块。

提取grpc服务

当有请求过来时,我们需要请求对应的grpc服务是哪个,因此我们在最前面得先提取出来。

func (p *Proxy) Handler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), p.opts.timeout)
		defer cancel()

		target, path := p.opts.pathExtract(r.URL.Path)
		if target == "" || path == "" {
			p.opts.log.Warn("path not found", "path", r.URL.Path)
			w.WriteHeader(http.StatusNotFound)
			return
		}
		...

默认我们可以以路径的第一个参数作为我们服务的taget,并自动转发到target:50051的服务上,后续为其proto定义的路径。

// DefaultPathExtract 格式:/target/route*
func DefaultPathExtract(path string) (string, string) {
	parts := strings.Split(path, "/")
	if len(parts) < 3 {
		return "", ""
	}
	target := fmt.Sprintf("%s:50051", parts[1])
	route := strings.TrimPrefix(path, fmt.Sprintf("/%s", parts[1]))
	return target, route
}

获取grpc客户端

知道了grpc服务后,我们就可以初始化我们的grpc客户端。

func (p *Proxy) Handler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		...

		client, err := p.Client(ctx, target)
		if err != nil {
			p.opts.log.Warn("target not found", "target", target)
			w.WriteHeader(http.StatusNotFound)
			return
		}
		
		...

这里我们可以用sync.Map存储之前已经初始化好的,不需要每次http调用我们都去创建一次grpc连接。

func (p *Proxy) Client(ctx context.Context, target string) (*ReflectClient, error) {
	client, ok := p.srv.Load(target)
	if ok {
		return client.(*ReflectClient), nil
	}
	c, err := NewReflectClient(ctx, target, p.opts.log, p.opts.grpcOpts)
	if err != nil {
		return nil, err
	}
	p.srv.Store(target, c)
	return c, nil
}

获取grpc方法

初始化好后,就通过路径匹配proto声明的方法

func (p *Proxy) Handler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		...

		md, params := client.MethodParams(r.Method, path)
		if md == nil {
			p.opts.log.Warn("path not found", "path", r.URL.Path)
			w.WriteHeader(http.StatusNotFound)
			return
		}
		
		...

请求转换

确定了服务的方法后,我们就需要将HTTP请求参数转换成proto二进制数据

func (p *Proxy) Handler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		...
		
		msg := dynamic.NewMessage(md.GetInputType())
		if err = RequestEncode(r, msg, params); err != nil {
			p.opts.log.Error("request encode", "err", err)
			p.opts.errDecoder(w, err)
			return
		}
		
		...

这里用到我们之前实现的codec实现

func CodecForRequest(r *http.Request, name string) encoding.Codec {
	for _, accept := range r.Header[name] {
		codec := encoding.CodecBySubtype(contentSubtype(accept))
		if codec != nil {
			return codec
		}
	}
	return encoding.CodecBySubtype(encoding.JsonSubType)
}

func RequestEncode(req *http.Request, msg *dynamic.Message, pathParams map[string]string) error {
	switch req.Method {
	case http.MethodGet, http.MethodDelete:
		return QueryEncode(req, msg, pathParams)
	case http.MethodPost, http.MethodPut, http.MethodPatch:
		return BodyEncode(req, msg, pathParams)
	}
	return nil
}

func QueryEncode(req *http.Request, msg *dynamic.Message, pathParams map[string]string) error {
	codec := encoding.CodecBySubtype(encoding.FormSubType)
	if err := codec.Unmarshal([]byte(req.URL.RawQuery), pathParams, msg); err != nil {
		return fmt.Errorf("codec unmarshal error: %v", err)
	}
	return nil
}

func BodyEncode(req *http.Request, msg *dynamic.Message, pathParams map[string]string) error {
	codec := CodecForRequest(req, "Content-Type")
	data, err := io.ReadAll(req.Body)
	if err != nil {
		return fmt.Errorf("read body error: %v", err)
	}
	defer req.Body.Close()
	if err = codec.Unmarshal(data, pathParams, msg); err != nil {
		return fmt.Errorf("codec unmarshal error: %v", err)
	}
	return nil
}

grpc调用

转换完成后,就可以用反射直接动态调用grpc服务了

func (p *Proxy) Handler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		...

		ctx = metadata.NewOutgoingContext(ctx, p.metadataFromHeaders(r.Header))
		resp, header, err := client.Invoke(ctx, md, msg)
		if err != nil {
			p.opts.log.Error("client invoke", "err", err)
			p.opts.errDecoder(w, err)
			return
		}
		
		...

要注意将HTTP header转成grpc metadata

func (p *Proxy) metadataFromHeaders(raw map[string][]string) metadata.MD {
	md := make(map[string][]string)
	for k, v := range raw {
		key, ok := p.opts.incomingHeaderMatcher(k)
		if !ok {
			continue
		}
		newKey := strings.ToLower(key)
		// https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L417
		if newKey == "connection" {
			continue
		}
		md[newKey] = v
	}
	return md
}

如果调用有错误,需要处理grpc和HTTP错误码的转换

func DefaultHTTPError(w http.ResponseWriter, err error) {
	grpcStatus := status.Convert(err)
	w.WriteHeader(runtime.HTTPStatusFromCode(grpcStatus.Code()))
	w.Write([]byte(grpcStatus.Message()))
}

响应转换

最后就是将grpc返回转换成json格式返回

func (p *Proxy) Handler() http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		...

		h := p.HeadersFromMetadata(header)
		for k, vs := range h {
			for _, v := range vs {
				w.Header().Add(k, v)
			}
		}

		if err = ResponseDecode(r, w, resp); err != nil {
			p.opts.log.Error("response decode", "err", err)
			p.opts.errDecoder(w, err)
			return
		}

		return
	}
}

还是使用之前实现的codec实现

func ResponseDecode(r *http.Request, w http.ResponseWriter, msg *dynamic.Message) error {
	codec := CodecForRequest(r, "Accept")
	buf, err := codec.Marshal(msg)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		return fmt.Errorf("failed to marshal output JSON: %v", err)
	}
	b, err := json.Marshal(Response{
		Status: 0,
		Data:   buf,
		Msg:    "ok",
	})
	if err != nil {
		return fmt.Errorf("failed to write response: %v", err)
	}
	w.WriteHeader(http.StatusOK)
	w.Header().Set("Content-Type", "application/"+codec.Subtype())
	if _, err = w.Write(b); err != nil {
		return fmt.Errorf("failed to write response: %v", err)
	}
	return nil
}

配置

为了应对不同使用者的需求,需要提供一些option给使用者,让proxy更加灵活

type proxyOptions struct {
	log                   *slog.Logger
	timeout               time.Duration
	marshaler             *jsonpb.Marshaler
	unmarshaler           *jsonpb.Unmarshaler
	incomingHeaderMatcher runtime.HeaderMatcherFunc
	outgoingHeaderMatcher runtime.HeaderMatcherFunc
	pathExtract           PathExtractFunc
	errDecoder            ErrorDecodeFunc
	grpcOpts              []grpc.DialOption
}

func WithLogger(logger *slog.Logger) ProxyOption {
	return func(o *proxyOptions) {
		o.log = logger
	}
}

func WithMarshaler(m *jsonpb.Marshaler) ProxyOption {
	return func(o *proxyOptions) {
		o.marshaler = m
	}
}

func WithUnmarshaler(m *jsonpb.Unmarshaler) ProxyOption {
	return func(o *proxyOptions) {
		o.unmarshaler = m
	}
}

func WithIncomingHeaderMatcher(f runtime.HeaderMatcherFunc) ProxyOption {
	return func(o *proxyOptions) {
		o.incomingHeaderMatcher = f
	}
}

func WithOutgoingHeaderMatcher(f runtime.HeaderMatcherFunc) ProxyOption {
	return func(o *proxyOptions) {
		o.outgoingHeaderMatcher = f
	}
}

func WithPathExtract(f PathExtractFunc) ProxyOption {
	return func(o *proxyOptions) {
		o.pathExtract = f
	}
}

func WithTimeout(d time.Duration) ProxyOption {
	return func(o *proxyOptions) {
		o.timeout = d
	}
}

func WithErrDecode(f ErrorDecodeFunc) ProxyOption {
	return func(o *proxyOptions) {
		o.errDecoder = f
	}
}

func WithGrpcOpts(opts ...grpc.DialOption) ProxyOption {
	return func(o *proxyOptions) {
		o.grpcOpts = opts
	}
}

使用

只需要几行代码就能轻松将后台的grpc服务以HTTP方式暴露出去

func main() {
	proxy := dynamic_proxy.NewProxy()
	http.HandleFunc("/", proxy.Handler())
	if err := http.ListenAndServe(":8080", nil); err != nil {
		log.Fatalf("failed to http serve: %v", err)
	}
}

假设我后台目前有这样一个proto服务

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply)  {
    option (google.api.http) = {
      get: "/helloworld/{name}"
    };
  }
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

这时候,比如我发送一个请求

http://proxyAddr/grpcAddr/helloworld/dynamic-proxy

proxy服务收到这个请求后,就会转发到grpcAddr这个地址的服务上,并调用SayHello方法后返回。

总结

通过本文的介绍,您将深入了解代理模块在整个系统中的关键作用,以及如何设计和实现一个高效、可靠的代理模块来统一处理外部HTTP请求与gRPC转换。无论您是正在构建自己的代理服务还是对代理模块的设计感兴趣,本文都将为您提供有价值的见解和实用建议。

项目源码 https://github.com/lemon-1997/dynamic-grpc