[fluksod] POST measurements to the new RESTful API
This commit is contained in:
parent
d14866cb97
commit
54ab3a3740
2 changed files with 74 additions and 7 deletions
|
@ -53,6 +53,16 @@ function add(M, sensor, timestamp, value)
|
||||||
M[sensor][timestamp] = value
|
M[sensor][timestamp] = value
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function get_sensors(M)
|
||||||
|
local S = {}
|
||||||
|
|
||||||
|
for sensor in pairs(M) do
|
||||||
|
S[#S+1] = sensor
|
||||||
|
end
|
||||||
|
|
||||||
|
return S
|
||||||
|
end
|
||||||
|
|
||||||
function clear(M, sensor)
|
function clear(M, sensor)
|
||||||
if sensor then
|
if sensor then
|
||||||
M[sensor] = nil
|
M[sensor] = nil
|
||||||
|
|
|
@ -26,6 +26,8 @@ local dbg = require 'dbg'
|
||||||
local nixio = require 'nixio'
|
local nixio = require 'nixio'
|
||||||
nixio.fs = require 'nixio.fs'
|
nixio.fs = require 'nixio.fs'
|
||||||
local uci = require 'luci.model.uci'.cursor()
|
local uci = require 'luci.model.uci'.cursor()
|
||||||
|
local httpclient = require 'luci.httpclient'
|
||||||
|
local opkg = require 'luci.model.ipkg'
|
||||||
local data = require 'flukso.data'
|
local data = require 'flukso.data'
|
||||||
|
|
||||||
local arg = arg or {} -- needed when this code is not loaded via the interpreter
|
local arg = arg or {} -- needed when this code is not loaded via the interpreter
|
||||||
|
@ -52,6 +54,7 @@ local WAN_INTERVAL = 300
|
||||||
local LAN_ENABLED = true
|
local LAN_ENABLED = true
|
||||||
local TIMESTAMP_MIN = 1234567890
|
local TIMESTAMP_MIN = 1234567890
|
||||||
|
|
||||||
|
-- set WAN parameters
|
||||||
local WAN_FILTER = { [1] = {}, [2] = {}, [3] = {} }
|
local WAN_FILTER = { [1] = {}, [2] = {}, [3] = {} }
|
||||||
WAN_FILTER[1].span = 60
|
WAN_FILTER[1].span = 60
|
||||||
WAN_FILTER[1].offset = 0
|
WAN_FILTER[1].offset = 0
|
||||||
|
@ -60,6 +63,18 @@ WAN_FILTER[2].offset = 7200
|
||||||
WAN_FILTER[3].span = 86400
|
WAN_FILTER[3].span = 86400
|
||||||
WAN_FILTER[3].offset = 172800
|
WAN_FILTER[3].offset = 172800
|
||||||
|
|
||||||
|
local WAN_BASE_URL = 'https://api.flukso.net/sensor/'
|
||||||
|
local WAN_KEY = '0123456789abcdef0123456789abcdef'
|
||||||
|
uci:foreach('system', 'system', function(x) WAN_KEY = x.key end) -- quirky but it works
|
||||||
|
|
||||||
|
-- https headers
|
||||||
|
local FLUKSO_OPKG_INFO = opkg.info('flukso')
|
||||||
|
local FLUKSO_VERSION = FLUKSO_OPKG_INFO.flukso.Version
|
||||||
|
|
||||||
|
local USER_AGENT = 'Fluksometer v' .. FLUKSO_VERSION
|
||||||
|
local CACERT = '/etc/ssl/certs/flukso.ca.crt'
|
||||||
|
|
||||||
|
-- set LAN parameters
|
||||||
local LAN_POLISH_CUTOFF = 60
|
local LAN_POLISH_CUTOFF = 60
|
||||||
local LAN_PUBLISH_PATH = DAEMON_PATH .. '/sensor'
|
local LAN_PUBLISH_PATH = DAEMON_PATH .. '/sensor'
|
||||||
|
|
||||||
|
@ -100,6 +115,7 @@ function dispatch(wan_child, lan_child)
|
||||||
|
|
||||||
-- resume both branches
|
-- resume both branches
|
||||||
if WAN_ENABLED then
|
if WAN_ENABLED then
|
||||||
|
-- TODO send incoming pulses to syslog
|
||||||
coroutine.resume(wan_child, sensor_id, timestamp, counter)
|
coroutine.resume(wan_child, sensor_id, timestamp, counter)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -128,6 +144,9 @@ function wan_buffer(child)
|
||||||
while true do
|
while true do
|
||||||
if not previous[sensor_id] then
|
if not previous[sensor_id] then
|
||||||
previous[sensor_id] = {}
|
previous[sensor_id] = {}
|
||||||
|
-- use the first received counter value as guard
|
||||||
|
previous[sensor_id].timestamp = timestamp
|
||||||
|
previous[sensor_id].counter = counter
|
||||||
end
|
end
|
||||||
|
|
||||||
if timestamp > TIMESTAMP_MIN
|
if timestamp > TIMESTAMP_MIN
|
||||||
|
@ -161,10 +180,48 @@ function filter(child, span, offset)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
function send(child) -- TODO fill in dummy send
|
function send(child)
|
||||||
return coroutine.create(function(measurements)
|
return coroutine.create(function(measurements)
|
||||||
|
local headers = {}
|
||||||
|
headers['Content-Type'] = 'application/json'
|
||||||
|
headers['X-Version'] = '1.0'
|
||||||
|
headers['User-Agent'] = USER_AGENT
|
||||||
|
|
||||||
|
local options = {}
|
||||||
|
options.sndtimeo = 5
|
||||||
|
options.rcvtimeo = 5
|
||||||
|
options.method = 'POST'
|
||||||
|
options.tls_context_set_verify = 'peer'
|
||||||
|
options.cacert = CACERT
|
||||||
|
options.headers = headers
|
||||||
|
|
||||||
while true do
|
while true do
|
||||||
-- measurements:clear()
|
local sensors = measurements:get_sensors()
|
||||||
|
local measurements_json = measurements:json_encode()
|
||||||
|
local http_persist = httpclient.create_persistent()
|
||||||
|
|
||||||
|
for i, sensor_id in ipairs(sensors) do
|
||||||
|
if i ~= #sensors then
|
||||||
|
options.headers['Connection'] = 'keep-alive'
|
||||||
|
else
|
||||||
|
options.headers['Connection'] = 'close'
|
||||||
|
end
|
||||||
|
|
||||||
|
options.body = '{"measurements":' .. measurements_json[sensor_id] .. '}'
|
||||||
|
options.headers["Content-Length"] = tostring(#options.body)
|
||||||
|
|
||||||
|
local hash = nixio.crypto.hmac('sha1', WAN_KEY)
|
||||||
|
hash:update(options.body)
|
||||||
|
options.headers['X-Digest'] = hash:final()
|
||||||
|
|
||||||
|
local response, code, msg = http_persist(WAN_BASE_URL .. sensor_id, options)
|
||||||
|
|
||||||
|
if response then
|
||||||
|
-- TODO send response string to syslog
|
||||||
|
measurements:clear(sensor_id)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
coroutine.resume(child, measurements)
|
coroutine.resume(child, measurements)
|
||||||
measurements = coroutine.yield()
|
measurements = coroutine.yield()
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue