-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
223 lines (188 loc) · 6.33 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package main
import (
"SDCC-A3-Project/sqsManagement"
"SDCC-A3-Project/utilities"
"encoding/json"
"flag"
"fmt"
"github.com/aws/aws-sdk-go/aws/session"
"io/ioutil"
"log"
"net/rpc"
"os"
"time"
)
// Arguments is the content of the JSON file given to the client: who the
// user is and what the client has to do on their behalf.
type Arguments struct {
// ID is the user id; when it is empty clientRoutine registers a new user first.
ID string `json:"user_id"`
SubscribeTopics []string `json:"subscribe_topics"` // topics for which a subscription has to be activated
UnsubscribeTopics []string `json:"unsubscribe_topics"` // topics whose subscription has to be deleted
Actions []Item `json:"actions"` // actions to perform; doActions handles "GET" and "SEND"
}
// Item describes a single action listed in the JSON file.
type Item struct {
// Action selects what to do; doActions recognises "SEND" and "GET".
Action string `json:"action"`
// Topic is the topic whose queue is used for the action.
Topic string `json:"topic"`
// Message holds the message bodies sent by a "SEND" action.
Message []string `json:"messages"`
// Number is the count of messages a "GET" action tries to receive.
// It has no json tag, so encoding/json fills it from the key "Number"
// (matched case-insensitively).
Number int
}
// QueueURL caches queue URLs keyed by topic. doSubscriptions and doActions
// fill it so the server is not asked again for a topic already seen.
var QueueURL = make(map[string]string)
// main is the client entry point: it reads the command-line flags, connects
// to the RPC server, loads the JSON file and hands both to clientRoutine.
//
// Flags (run the built binary with -h to print the usage):
//
//	-json        JSON file describing the work to do (default "jsons/actions.json")
//	-addr        server ip address (default "localhost")
//	-serverPort  server port number (default utilities.ServerPort)
func main() {
	jsonPath := flag.String("json", "jsons/actions.json", "a json file")
	addr := flag.String("addr", "localhost", "server ip address")
	port := flag.Int("serverPort", utilities.ServerPort, "server port number")
	flag.Parse()

	// The connection is established before the file is parsed, so a dial
	// failure aborts the program without touching the JSON file.
	client := connectWithServer(*addr, *port)
	clientRoutine(client, parseJsonFile(*jsonPath))
}
// clientRoutine runs a whole client session in a fixed order: registration
// (only when args carries no user id), subscriptions, actions and finally
// the removal of the subscriptions listed for unsubscription.
func clientRoutine(client *rpc.Client, args Arguments) {
	if args.ID == "" {
		// No id was supplied: ask the server for a new one.
		args.ID = doRegistration(client)
	}

	doSubscriptions(client, args)
	doActions(client, args)
	deleteSubscriptions(client, args)
}
// sendAMessage sends message to the queue identified by URL, passing userId
// along to sqsManagement.SendMsg. An AWS session with shared config enabled
// is created for the call. A failure is only printed; nothing is returned.
func sendAMessage(URL *string, message *string, userId *string) {
	opts := session.Options{SharedConfigState: session.SharedConfigEnable}
	awsSession := session.Must(session.NewSessionWithOptions(opts))

	if sendErr := sqsManagement.SendMsg(awsSession, URL, message, userId); sendErr != nil {
		fmt.Println("Got an error sending the message:")
		fmt.Println(sendErr)
		return
	}

	fmt.Println("Sent message to queue ")
}
// getAMessage receives from the queue identified by URL, using visibilityTO
// as visibility timeout, then deletes the first received message and prints
// its id, receipt handle, attributes and body.
//
// It returns true only when a message was received and deleted. A receive
// error, an empty result or a delete error is printed and yields false.
func getAMessage(URL *string, visibilityTO *int64) bool {
	awsSession := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))

	received, recvErr := sqsManagement.GetMessages(awsSession, URL, visibilityTO)
	switch {
	case recvErr != nil:
		fmt.Println("Got an error receiving messages:")
		fmt.Println(recvErr)
		return false
	case len(received.Messages) == 0:
		fmt.Println("no messages available")
		return false
	}

	first := received.Messages[0]

	// Delete right away: a message left in the queue becomes visible again
	// once the visibility timeout expires.
	if delErr := sqsManagement.DeleteMessage(awsSession, URL, first.ReceiptHandle); delErr != nil {
		fmt.Println("Got an error deleting the message:")
		fmt.Println(delErr)
		return false
	}

	fmt.Println("Message ID: " + *first.MessageId)
	fmt.Println("Message Handle: " + *first.ReceiptHandle)
	fmt.Println("Message Attributes: ")
	for name, value := range first.MessageAttributes {
		fmt.Println("Key:", name, "=>", "Element:", value)
	}
	fmt.Println("Message Body: " + *first.Body)

	return true
}
// deleteSubscriptions asks the server, one topic at a time, to delete the
// subscription of user args.ID for every topic in args.UnsubscribeTopics,
// printing the status returned by each call. The first RPC error terminates
// the program.
func deleteSubscriptions(client *rpc.Client, args Arguments) {
	for _, topic := range args.UnsubscribeTopics {
		request := utilities.RequestArg{ID: args.ID, Tag: topic}

		var status int
		if err := client.Call("MessageService.DeleteSubscription", &request, &status); err != nil {
			log.Fatal("error in DeleteSubscription: ", err)
		}

		fmt.Printf("exit status: %d\n", status)
	}
}
func doActions(client *rpc.Client, args Arguments) {
for i := 0; i < len(args.Actions); i++ {
current := args.Actions[i]
arg := utilities.RequestArg{ID: args.ID, Tag: current.Topic}
var URL string
// url of the queue retrieval
if _, isPresent := QueueURL[current.Topic]; !isPresent {
err := client.Call("MessageService.GetQueueURL", &arg, &URL)
if err != nil {
log.Fatal("error in GetQueueURL: ", err)
}
QueueURL[current.Topic] = URL
}
fmt.Println(QueueURL[current.Topic])
url := QueueURL[current.Topic]
if current.Action == "SEND" {
for j := 0; j < len(current.Message); j++ {
sendAMessage(&url, ¤t.Message[j], &args.ID)
}
} else if current.Action == "GET" {
attempts := utilities.Attempts
for current.Number != 0 && attempts != 0 {
var to int64
to = utilities.VisibilityTimeOut
if getAMessage(&url, &to) {
current.Number--
attempts = utilities.Attempts
continue
}
attempts--
time.Sleep(time.Second * 10)
}
}
}
}
// doSubscriptions subscribes user args.ID to every topic in
// args.SubscribeTopics through MessageService.MakeSubscriptionToTopic.
// The queue URL returned for each topic is printed and remembered in
// QueueURL, so it does not have to be requested again later. The first RPC
// error terminates the program.
func doSubscriptions(client *rpc.Client, args Arguments) {
	for _, topic := range args.SubscribeTopics {
		request := utilities.RequestArg{ID: args.ID, Tag: topic}

		var out utilities.SubscriptionOutput
		if err := client.Call("MessageService.MakeSubscriptionToTopic", &request, &out); err != nil {
			log.Fatal("error in MakeSubscriptionToTopic: ", err)
		}

		QueueURL[topic] = out.QueueURL
		fmt.Println(out.QueueURL)
	}
}
// doRegistration asks the server for a new user id through
// MessageService.GenerateUserId, prints it and returns it. The call is
// synchronous because nothing else can be done without an id; an RPC error
// terminates the program.
func doRegistration(client *rpc.Client) string {
	request := &utilities.RequestArg{}

	var userID string
	if err := client.Call("MessageService.GenerateUserId", request, &userID); err != nil {
		log.Fatal("error in GenerateUserId: ", err)
	}

	fmt.Println("user ID: " + userID)
	return userID
}
// connectWithServer dials the RPC server listening on serverAddr:serverPort
// over TCP and returns the resulting client. A dial failure terminates the
// program.
func connectWithServer(serverAddr string, serverPort int) *rpc.Client {
	endpoint := fmt.Sprintf("%s:%d", serverAddr, serverPort)

	conn, dialErr := rpc.Dial("tcp", endpoint)
	if dialErr != nil {
		log.Fatal("Error in dialing: ", dialErr)
	}
	return conn
}
// parseJsonFile reads the JSON file at filename and decodes it into an
// Arguments value.
//
// The function never aborts the program: when the file cannot be opened or
// read, or its content is not valid JSON for Arguments, the error is printed
// and a zero-value Arguments is returned, so a partially decoded value is
// never handed to the caller.
func parseJsonFile(filename string) Arguments {
	jsonFile, err := os.Open(filename)
	if err != nil {
		fmt.Println(err)
		return Arguments{}
	}
	// Close the file once its content has been read.
	defer jsonFile.Close()
	fmt.Println("Successfully Opened the file")

	// Read the whole file as a byte slice.
	byteValue, err := ioutil.ReadAll(jsonFile)
	if err != nil {
		fmt.Println(err)
		return Arguments{}
	}

	// Decode the file content into the Arguments structure.
	var arguments Arguments
	if err := json.Unmarshal(byteValue, &arguments); err != nil {
		fmt.Println(err)
		return Arguments{}
	}
	return arguments
}