-
Notifications
You must be signed in to change notification settings - Fork 4
/
http_device.go
142 lines (122 loc) · 3.46 KB
/
http_device.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package iot
import (
	"crypto/tls"
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/go-resty/resty/v2"
)
// HttpDevice is a device that talks to the platform over HTTP. Devices using
// the HTTP protocol currently support only reporting messages and reporting
// properties.
type HttpDevice interface {
// SendMessage reports a message to the platform. The implementation in this
// file returns true only when the request completes without error and the
// response status is HTTP 200.
SendMessage(message Message) bool
// ReportProperties reports device properties to the platform. The
// implementation in this file returns true only when the request completes
// without error and the response status is HTTP 200.
ReportProperties(properties DeviceProperties) bool
}
// restyHttpDevice is the HttpDevice implementation built on a resty client.
// It must be used through a pointer because it embeds a sync.RWMutex.
type restyHttpDevice struct {
// Id is the device identifier; it is interpolated into request paths and
// sent as device_id when requesting an access token.
Id string
// Password is the device secret; init passes it to hmacSha256 to build the
// signed password of the access-token request.
Password string
// Servers is the base URL that every request path is appended to.
Servers string
// client is the resty client used for all requests made by this device.
client *resty.Client
// lock guards accessToken.
lock sync.RWMutex
// accessToken is the token obtained by init; it is sent in the
// "access_token" header of outgoing requests.
accessToken string
}
// SendMessage posts message to the device's "sys/messages/up" endpoint.
//
// It returns true only when the request completed without an error and the
// server answered with HTTP 200. A request error is printed to stdout.
func (device *restyHttpDevice) SendMessage(message Message) bool {
	endpoint := fmt.Sprintf("%s/v5/devices/%s/sys/messages/up", device.Servers, device.Id)

	request := device.client.R()
	request.SetBody(message)

	resp, err := request.Post(endpoint)
	if err != nil {
		fmt.Printf("send message failed %s\n", err)
		return false
	}
	return resp.StatusCode() == http.StatusOK
}
// ReportProperties posts properties to the device's "sys/properties/report"
// endpoint.
//
// It returns true only when the request completed without an error and the
// server answered with HTTP 200. A request error is printed to stdout.
func (device *restyHttpDevice) ReportProperties(properties DeviceProperties) bool {
	endpoint := fmt.Sprintf("%s/v5/devices/%s/sys/properties/report", device.Servers, device.Id)

	request := device.client.R()
	request.SetBody(properties)

	response, err := request.Post(endpoint)
	if err != nil {
		fmt.Printf("report properties failed %s\n", err)
		return false
	}
	return response.StatusCode() == http.StatusOK
}
// init requests an access token from the "/v5/device-auth" endpoint and, on
// success, stores it in device.accessToken under the write lock.
//
// The request carries SignType 0 and a fixed timestamp; the password field is
// hmacSha256(device.Password, timestamp).
//
// Every failure is printed to stdout and leaves the previously stored token
// untouched: a request error, a status other than HTTP 200, a body that does
// not decode as JSON, or a response without an access token. Checking the
// status matters because an error response can still be valid JSON, which
// would otherwise decode to an empty token and overwrite a usable one.
func (device *restyHttpDevice) init() {
	// The same timestamp must be used for the request field and for the
	// password signature, so it is defined once.
	const timestamp = "2019120219"

	accessTokenBody := accessTokenRequest{
		DeviceId:  device.Id,
		SignType:  0,
		Timestamp: timestamp,
		Password:  hmacSha256(device.Password, timestamp),
	}

	response, err := device.client.R().
		SetBody(accessTokenBody).
		Post(fmt.Sprintf("%s%s", device.Servers, "/v5/device-auth"))
	if err != nil {
		fmt.Printf("get device access token failed %s\n", err)
		return
	}
	if response.StatusCode() != http.StatusOK {
		fmt.Printf("get device access token failed, status code %d\n", response.StatusCode())
		return
	}

	tokenResponse := &accessTokenResponse{}
	if err = json.Unmarshal(response.Body(), tokenResponse); err != nil {
		fmt.Printf("json unmarshal failed %v", err)
		return
	}
	if tokenResponse.AccessToken == "" {
		fmt.Println("get device access token failed, response has no access_token")
		return
	}

	device.lock.Lock()
	device.accessToken = tokenResponse.AccessToken
	device.lock.Unlock()
}
// accessTokenResponse is the part of the "/v5/device-auth" response body that
// init decodes.
type accessTokenResponse struct {
// AccessToken is read from the "access_token" JSON field.
AccessToken string `json:"access_token"`
}
// accessTokenRequest is the JSON body that init posts to "/v5/device-auth".
type accessTokenRequest struct {
// DeviceId is the identifier of the device requesting a token.
DeviceId string `json:"device_id"`
// SignType is sent as "sign_type"; init always sends 0.
// NOTE(review): the meaning of each sign_type value is defined by the
// platform API, not by this file — confirm against its documentation.
SignType int `json:"sign_type"`
// Timestamp is the timestamp string that init also feeds into hmacSha256
// when computing Password.
Timestamp string `json:"timestamp"`
// Password is the signed password: init sets it to the result of
// hmacSha256(device password, Timestamp).
Password string `json:"password"`
}
// CreateHttpDevice builds an HttpDevice for the device described by config.
//
// The underlying resty client uses a 30s timeout and retries a request up to
// 3 times, waiting 10s between attempts, when the server answers HTTP 403.
// The transport allows config.MaxConnsPerHost connections per host (10 when
// the field is 0) and keeps at most config.MaxIdleConns idle connections
// (0 means no limit, as defined by net/http).
//
// An access token is requested once before the function returns. Afterwards
// every request carries the current token in the "access_token" header, and a
// 403 response to any request other than the token request itself triggers a
// token refresh.
func CreateHttpDevice(config HttpDeviceConfig) HttpDevice {
	// Path that restyHttpDevice.init posts to when requesting a token.
	const authPath = "/v5/device-auth"

	c := resty.New()
	// NOTE(review): the SetTransport call below replaces the client's
	// transport, so this TLS configuration does not reach the transport that
	// is actually used and certificate verification stays enabled. The call
	// order is deliberately left as is: making InsecureSkipVerify effective
	// would disable certificate checks and needs an explicit decision.
	c.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
	c.SetTimeout(30 * time.Second)
	c.SetRetryCount(3)
	c.SetRetryWaitTime(10 * time.Second)
	c.AddRetryCondition(func(response *resty.Response, err error) bool {
		// The response may be nil when the request failed before any
		// response was produced; guard it to avoid a nil dereference.
		return response != nil && response.StatusCode() == http.StatusForbidden
	})

	connsPerHost := 10
	if config.MaxConnsPerHost != 0 {
		connsPerHost = config.MaxConnsPerHost
	}
	c.SetTransport(&http.Transport{
		MaxConnsPerHost: connsPerHost,
		MaxIdleConns:    config.MaxIdleConns,
	})

	device := &restyHttpDevice{
		Id:       config.Id,
		Password: config.Password,
		Servers:  config.Server,
		client:   c,
		lock:     sync.RWMutex{},
	}

	// Fetch the first token before the hooks below are registered, so this
	// initial request is sent without the access_token header.
	device.init()

	device.client.OnBeforeRequest(func(client *resty.Client, request *resty.Request) error {
		device.lock.RLock()
		request.SetHeader("access_token", device.accessToken)
		device.lock.RUnlock()
		request.SetHeader("Content-Type", "application/json")
		return nil
	})
	device.client.OnAfterResponse(func(client *resty.Client, response *resty.Response) error {
		if response.StatusCode() != http.StatusForbidden {
			return nil
		}
		// init sends its request through this same client. Refreshing on a
		// 403 from the token endpoint would call init from within init and
		// recurse without bound when the credentials are rejected.
		if strings.HasSuffix(response.Request.URL, authPath) {
			return nil
		}
		device.init()
		return nil
	})
	return device
}
// HttpDeviceConfig holds the settings CreateHttpDevice uses to build a device.
type HttpDeviceConfig struct {
// Id is the device identifier.
Id string
// Password is the device secret used when requesting an access token.
Password string
// Server is the base URL of the platform, including scheme and port; see
// the example at the end of the line.
Server string // https://iot-mqtts.cn-north-4.myhuaweicloud.com:443
// MaxConnsPerHost limits connections per host; CreateHttpDevice uses 10
// when this is 0.
MaxConnsPerHost int
// MaxIdleConns is intended as the idle-connection limit of the HTTP
// transport. NOTE(review): verify that CreateHttpDevice applies it.
MaxIdleConns int
}