-
Notifications
You must be signed in to change notification settings - Fork 6
/
mqttTOmysql.js
108 lines (90 loc) · 2.95 KB
/
mqttTOmysql.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
var mqtt = require('mqtt'); //https://www.npmjs.com/package/mqtt
var Topic = '#'; //subscribe to all topics
var Broker_URL = 'mqtt://MQTT_BROKER_URL';
var Database_URL = 'Database_URL';
var options = {
clientId: 'MyMQTT',
port: 1883,
//username: 'mqtt_user',
//password: 'mqtt_password',
keepalive : 60
};
var client = mqtt.connect(Broker_URL, options);
client.on('connect', mqtt_connect);
client.on('reconnect', mqtt_reconnect);
client.on('message', mqtt_messsageReceived);
client.on('close', mqtt_close);
function mqtt_connect() {
console.log("Connecting MQTT");
client.subscribe(Topic, mqtt_subscribe);
};
function mqtt_subscribe(err, granted) {
console.log("Subscribed to " + Topic);
if (err) {console.log(err);}
};
function mqtt_reconnect(err) {
console.log("Reconnect MQTT");
if (err) {console.log(err);}
client = mqtt.connect(Broker_URL, options);
};
function after_publish() {
//do nothing
};
//receive a message from MQTT broker
function mqtt_messsageReceived(topic, message, packet) {
var message_str = message.toString(); //convert byte array to string
console.log("message to string", message_str);
message_str = message_str.replace(/\n$/, ''); //remove new line
//message_str = message_str.toString().split("|");
console.log("message to params array",message_str);
//payload syntax: clientID,topic,message
if (message_str.length == 0) {
console.log("Invalid payload");
} else {
insert_message(topic, message_str, packet);
//console.log(message_arr);
}
};
function mqtt_close() {
//console.log("Close MQTT");
};
////////////////////////////////////////////////////
///////////////////// MYSQL ////////////////////////
////////////////////////////////////////////////////
var mysql = require('mysql'); //https://www.npmjs.com/package/mysql
//Create Connection
var connection = mysql.createConnection({
host: Database_URL,
user: "newuser",
password: "mypassword",
database: "mydb"
});
connection.connect(function(err) {
if (err) throw err;
//console.log("Database Connected!");
});
//insert a row into the tbl_messages table
function insert_message(topic, message_str, packet) {
var message_arr = extract_string(message_str); //split a string into an array
var clientID= message_arr[0];
var message = message_arr[1];
var date= new Date();
var sql = "INSERT INTO ?? (??,??,??,??) VALUES (?,?,?,?)";
var params = ['tbl_messages', 'clientID', 'topic', 'message','date', clientID, topic, message, date];
sql = mysql.format(sql, params);
connection.query(sql, function (error, results) {
if (error) throw error;
console.log("Message added: " + message_str);
});
};
//split a string into an array of substrings
function extract_string(message_str) {
var message_arr = message_str.split(","); //convert to array
return message_arr;
};
//count number of delimiters in a string
var delimiter = ",";
function countInstances(message_str) {
var substrings = message_str.split(delimiter);
return substrings.length - 1;
};