-
Notifications
You must be signed in to change notification settings - Fork 0
/
occupancy_collector
executable file
·85 lines (74 loc) · 2.87 KB
/
occupancy_collector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#!/usr/bin/ruby
# collect data coming in via HTTP requests and put it on the occupancy queue
require 'sinatra'
require 'inifile'
require 'json'
require 'bunny'
set :port, 8081
# make sure log messages hit the log immediately
$stdout.sync = true
cfg = IniFile.load(File.expand_path("~/.labamqprc"))
occupancy_cfg = IniFile.load(File.expand_path("~/.occupancyrc"))
amqp_host = cfg['broker']['BrokerHost']
amqp_user = cfg['occupancy_collector']['BrokerUsername']
amqp_pass = cfg['occupancy_collector']['BrokerPassword']
sensors = occupancy_cfg['sensors']
aggregates = occupancy_cfg['aggregates']
occupancy_status = Hash.new
last_detected = Hash.new
amqp = Bunny.new :host => amqp_host, :user => amqp_user, :password => amqp_pass, :threaded => false
amqp.start
channel = amqp.create_channel
exchange = channel.fanout("statistics.occupancy")
get '/occupancysense' do
sensor_id = params[:id]
status = params[:status] == '0'
if(!params.has_key?('id') || !params.has_key?('status')) then
puts "Invalid request, missing parameters."
return "Missing required parameters."
end
if(!sensors.has_key?(sensor_id)) then
puts "Unknown sensor ID #{sensor_id}."
return "Unknown sensor ID #{sensor_id}."
end
sensor_name = sensors[sensor_id]
retval = update_sensor_status sensor_name, status, occupancy_status, last_detected, exchange
process_aggregates aggregates, occupancy_status, last_detected, exchange
retval
end
get '/' do
response = "<h1>Lab Occupancy Status as of #{Time.now}</h1><ul>"
occupancy_status.each do |area, occupied|
response += "<li>#{area}: #{occupied}</li>"
end
response += "</ul>"
response
end
def update_sensor_status(sensor_name, status, occupancy_status, last_detected, exchange)
status_word = status ? "occupied" : "unoccupied"
change = false
change = true if occupancy_status[sensor_name] != status_word
occupancy_status[sensor_name] = status_word
last_detected[sensor_name] = Time.at(0) if !last_detected.has_key? sensor_name
last_detected[sensor_name] = Time.now if status
msg = Hash.new
msg[:area] = sensor_name
msg[:occupied] = status
msg[:change] = change
msg[:last_detected] = ((Time.now - last_detected[sensor_name]) / 60).round
exchange.publish msg.to_json
puts msg.to_json
puts "Occupancy state of #{sensor_name} changed to #{status_word}." if change
"Occupancy data '#{status_word}' recorded for sensor #{sensor_name}."
end
# run through all the aggregates we have defined and update any requiring updating
def process_aggregates(aggregates, occupancy_status, last_detected, exchange)
aggregates.each do |name, agg_sensors|
occupied = false
sensors_included = agg_sensors.split(",")
sensors_included.each do |sensor|
occupied = true if occupancy_status[sensor] && occupancy_status[sensor] == 'occupied'
end
update_sensor_status name, occupied, occupancy_status, last_detected, exchange
end
end