publish pwr messages in JSON/REST style

This commit is contained in:
Bart Van Der Meerssche 2010-04-20 15:38:53 +02:00
parent 5f516e2fed
commit 6667727b13
2 changed files with 91 additions and 13 deletions

View file

@ -1,6 +1,7 @@
-- --
-- data.lua: property and methods for manipulating incoming measurements -- data.lua: property and methods for manipulating incoming measurements
-- Copyright (c) 2009 jokamajo.org -- Copyright (c) 2009 jokamajo.org
-- 2010 flukso.net
-- --
-- This program is free software; you can redistribute it and/or -- This program is free software; you can redistribute it and/or
-- modify it under the terms of the GNU General Public License -- modify it under the terms of the GNU General Public License
@ -45,10 +46,7 @@ end
function filter(M, span, offset) function filter(M, span, offset)
for meter, T in pairs(M) do for meter, T in pairs(M) do
local H = {} -- helper table, an indexed array containing all the measurement's timestamps local H = timestamps(T)
for timestamp in pairs(T) do H[#H+1] = timestamp end
table.sort(H) -- sort in ascending order, oldest timestamps will be treated first
local i = 2 local i = 2
while not (H[i+1] == nil or H[i] > os.time()-offset) do while not (H[i+1] == nil or H[i] > os.time()-offset) do
if math.floor(H[i-1]/span) == math.floor(H[i]/span) and math.floor(H[i]/span) == math.floor(H[i+1]/span) then if math.floor(H[i-1]/span) == math.floor(H[i]/span) and math.floor(H[i]/span) == math.floor(H[i+1]/span) then
@ -60,3 +58,41 @@ function filter(M, span, offset)
end end
end end
end end
function truncate(M, cutoff)
for meter, T in pairs(M) do
local H = timestamps(T)
for i = H[1], H[#H]-60 do
T[i] = nil
end
end
end
function fill(M)
for meter, T in pairs(M) do
local H = timestamps(T)
for i = H[1]+1, H[#H]-1 do
if T[i] == nil then T[i] = T[i-1] end
end
end
end
function json_encode(M)
J = {}
for meter, T in pairs(M) do
J[meter] = '['
local H = timestamps(T)
for i = H[1], H[#H] do
J[meter] = J[meter] .. '[' .. T[i] .. ']'
end
J[meter] = J[meter] .. ']'
end
return J
end
local function timestamps(T)
local H = {} -- helper table, an indexed array containing all the measurement's timestamps
for timestamp in pairs(T) do H[#H+1] = timestamp end
table.sort(H) -- sort in ascending order, oldest timestamps will be treated first
return H
end

View file

@ -35,10 +35,12 @@ local param = {xmlrpcaddress = 'http://logger.flukso.net/xmlrpc',
pwraddress = '255.255.255.255', pwraddress = '255.255.255.255',
pwrport = 26488, pwrport = 26488,
pwrenable = false, pwrenable = false,
pwrinterval = 1,
pwrdir = '/tmp/sensor',
device = '/dev/ttyS0', device = '/dev/ttyS0',
interval = 300} interval = 300}
function receive(child, device, pwraddress, pwrport, pwrenable) function dispatch(e_child, p_child, device, pwraddress, pwrport, pwrenable)
return coroutine.create(function() return coroutine.create(function()
-- open the connection to the syslog deamon, specifying our identity -- open the connection to the syslog deamon, specifying our identity
posix.openlog('flukso') posix.openlog('flukso')
@ -60,11 +62,15 @@ function receive(child, device, pwraddress, pwrport, pwrenable)
os.execute('gpioctl set 4 > /dev/null') os.execute('gpioctl set 4 > /dev/null')
local meter, value = line:sub(5, 36), tonumber(line:sub(38)) local meter, value = line:sub(5, 36), tonumber(line:sub(38))
coroutine.resume(child, meter, os.time(), value) coroutine.resume(e_child, meter, os.time(), value)
elseif line:sub(1, 3) == 'pwr' and line:len() == 47 and line:find(':') == 37 then -- user data + additional data integrity checks elseif line:sub(1, 3) == 'pwr' and line:len() == 47 and line:find(':') == 37 then -- user data + additional data integrity checks
if pwrenable then udp:send(line) end local meter, value = line:sub(5, 36), tonumber(line:sub(38))
if pwrenable then coroutine.resume(p_child, meter, os.time(), value) end
elseif line:sub(1, 3) == 'msg' then -- control data elseif line:sub(1, 3) == 'msg' then -- control data
posix.syslog(31, 'received message from '..device..': '..line:sub(5)) posix.syslog(31, 'received message from '..device..': '..line:sub(5))
else else
posix.syslog(27, 'input error on '..device..': '..line) posix.syslog(27, 'input error on '..device..': '..line)
end end
@ -142,24 +148,51 @@ function gc(child)
end) end)
end end
function debug() function polish(child, cutoff)
return coroutine.create(function(measurements) return coroutine.create(function(measurements)
while true do while true do
dbg.vardump(measurements) measurements:fill()
measurements:truncate(cutoff)
coroutine.resume(child, measurements)
measurements = coroutine.yield() measurements = coroutine.yield()
end end
end) end)
end end
-- receive: listen to the serial port for incoming pulses function publish(child, dir)
return coroutine.create(function(measurements)
os.execute('mkdir -p ' .. dir .. ' > /dev/null')
while true do
local measurements_json = measurements:json_encode()
for meter, json in measurements_json do
io.output(dir .. '/' .. meter)
io.write(json)
io.close()
end
coroutine.resume(child, measurements)
measurements = coroutine.yield()
end
end)
end
function debug(child)
return coroutine.create(function(measurements)
while true do
dbg.vardump(measurements)
if child then coroutine.resume(child, measurements) end
measurements = coroutine.yield()
end
end)
end
-- dispatch: listen to the serial port for incoming pulses
-- buffer: buffer the pulses in a measurement object -- buffer: buffer the pulses in a measurement object
-- filter: sweep recursively to filter all redundant entries -- filter: sweep recursively to filter all redundant entries
-- send: report the measurements to the server via xmlrpc -- send: report the measurements to the server via xmlrpc
-- gc: perform a full garbage collection cycle -- gc: perform a full garbage collection cycle
-- debug: dump measurements table to stdout -- debug: dump measurements table to stdout
local chain = receive( local e_chain = buffer(
buffer(
filter( filter(
filter( filter(
filter( filter(
@ -172,6 +205,15 @@ local chain = receive(
, 900, 7200) , 900, 7200)
, 60, 0) , 60, 0)
, param.interval) , param.interval)
, param.device, param.pwraddress, param.pwrport, param.pwrenable)
local p_chain = buffer(
polish(
publish(
debug()
, param.pwrdir)
, 60)
, param.pwrinterval)
local chain = dispatch(e_chain, p_chain, param.device, param.pwraddress, param.pwrport, param.pwrenable)
coroutine.resume(chain) coroutine.resume(chain)