-
Notifications
You must be signed in to change notification settings - Fork 40
/
gateway.go
116 lines (95 loc) · 2.87 KB
/
gateway.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package gateway
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"github.com/smallnest/rpcx/client"
)
// ServiceHandler converts http.Request into rpcx.Request and send it to rpcx service,
// and then converts the result and writes it into http.Response.
// You should get the http.Request and servicePath in your web handler.
type ServiceHandler func(*http.Request, string) (map[string]string, []byte, error)
// HTTPServer is a golang web interface。
// You can use echo, gin, iris or other go web frameworks to implement it.
// You must wrap ServiceHandler into your handler of your selected web framework and add it into router.
type HTTPServer interface {
RegisterHandler(base string, handler ServiceHandler)
Serve() error
}
// Gateway is a rpcx gateway which can convert http invoke into rpcx invoke.
type Gateway struct {
base string
httpserver HTTPServer
serviceDiscovery client.ServiceDiscovery
FailMode client.FailMode
SelectMode client.SelectMode
Option client.Option
mu sync.RWMutex
xclients map[string]client.XClient
seq uint64
}
// NewGateway returns a new gateway.
func NewGateway(base string, hs HTTPServer, sd client.ServiceDiscovery, failMode client.FailMode, selectMode client.SelectMode, option client.Option) *Gateway {
// base is empty or like /abc/
if base == "" {
base = "/"
}
if base[0] != '/' {
base = "/" + base
}
g := &Gateway{
base: base,
httpserver: hs,
serviceDiscovery: sd,
FailMode: failMode,
SelectMode: selectMode,
Option: option,
xclients: make(map[string]client.XClient),
}
hs.RegisterHandler(base, g.handler)
return g
}
// Serve listens on the TCP network address addr and then calls
// Serve with handler to handle requests on incoming connections.
// Accepted connections are configured to enable TCP keep-alives.
func (g *Gateway) Serve() error {
return g.httpserver.Serve()
}
func (g *Gateway) handler(r *http.Request, servicePath string) (meta map[string]string, payload []byte, err error) {
req, err := HttpRequest2RpcxRequest(r)
if err != nil {
return nil, nil, err
}
seq := atomic.AddUint64(&g.seq, 1)
req.SetSeq(seq)
var xc client.XClient
g.mu.Lock()
xc, err = getXClient(g, servicePath)
g.mu.Unlock()
if err != nil {
return nil, nil, err
}
return xc.SendRaw(context.Background(), req)
}
func getXClient(g *Gateway, servicePath string) (xc client.XClient, err error) {
defer func() {
if e := recover(); e != nil {
if ee, ok := e.(error); ok {
err = ee
return
}
err = fmt.Errorf("failed to get xclient: %v", e)
}
}()
if g.xclients[servicePath] == nil {
d, err := g.serviceDiscovery.Clone(servicePath)
if err != nil {
return nil, err
}
g.xclients[servicePath] = client.NewXClient(servicePath, g.FailMode, g.SelectMode, d, g.Option)
}
xc = g.xclients[servicePath]
return xc, err
}