more stable frame rate

This commit is contained in:
Stefan Kögl 2014-04-27 15:44:38 +02:00
parent 40ffb2e43d
commit dac236ca12
1 changed files with 143 additions and 241 deletions

View File

@ -24,62 +24,43 @@
from __future__ import absolute_import from __future__ import absolute_import
from datetime import datetime
import threading
import Queue
import traceback
import logging
import numpy as np
import string
import time
import random
import socket
import os.path
from os import curdir, sep
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SocketServer import ThreadingMixIn, ForkingMixIn
import select
import re
from collections import deque
from PyQt4.QtCore import QBuffer, QByteArray, QIODevice
from PyQt4 import QtGui, QtCore
import pyqtgraph as pg
from pyqtgraph.widgets.PlotWidget import PlotWidget
from chaosc.argparser_groups import * from chaosc.argparser_groups import *
from chaosc.lib import logger, resolve_host from chaosc.lib import logger, resolve_host
from collections import deque, defaultdict
from datetime import datetime
from operator import attrgetter
from os import curdir, sep
from PyQt4 import QtGui, QtCore
from PyQt4.QtCore import QBuffer, QByteArray, QIODevice
from SocketServer import ThreadingMixIn, ForkingMixIn
import cPickle
import logging
import numpy as np
import os.path
import pyqtgraph as pg
import Queue
import random
import re
import select
import socket
import string
import threading
import time
import traceback
fh = logging.FileHandler(os.path.expanduser("~/.chaosc/ekgplotter.log")) fh = logging.FileHandler(os.path.expanduser("~/.chaosc/ekgplotter.log"))
fh.setLevel(logging.DEBUG) fh.setLevel(logging.DEBUG)
logger.addHandler(fh) logger.addHandler(fh)
try: try:
from chaosc.c_osc_lib import OSCMessage, decode_osc from chaosc.c_osc_lib import OSCMessage, decode_osc
except ImportError as e: except ImportError as e:
print(e) logging.exception(e)
from chaosc.osc_lib import OSCMessage, decode_osc from chaosc.osc_lib import OSCMessage, decode_osc
class PlotWindow(PlotWidget):
def __init__(self, title=None, **kargs):
self.win = QtGui.QMainWindow()
PlotWidget.__init__(self, **kargs)
self.win.setCentralWidget(self)
for m in ['resize']:
setattr(self, m, getattr(self.win, m))
if title is not None:
self.win.setWindowTitle(title)
class OSCThread(threading.Thread): class OSCThread(threading.Thread):
def __init__(self, args): def __init__(self, args):
super(OSCThread, self).__init__() super(OSCThread, self).__init__()
@ -94,24 +75,13 @@ class OSCThread(threading.Thread):
self.osc_sock.bind(self.client_address) self.osc_sock.bind(self.client_address)
self.osc_sock.setblocking(0) self.osc_sock.setblocking(0)
print "%s: starting up osc receiver on '%s:%d'" % ( logging.info("%s: starting up osc receiver on '%s:%d'",
datetime.now().strftime("%x %X"), self.client_address[0], self.client_address[1]) datetime.now().strftime("%x %X"), self.client_address[0], self.client_address[1])
#self.subscribe_me() self.subscribe_me()
def subscribe_me(self): def subscribe_me(self):
"""Use this procedure for a quick'n dirty subscription to your chaosc instance. logging.info("%s: subscribing to '%s:%d' with label %r", datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1], self.args.subscriber_label)
:param chaosc_address: (chaosc_host, chaosc_port)
:type chaosc_address: tuple
:param receiver_address: (host, port)
:type receiver_address: tuple
:param token: token to get authorized for subscription
:type token: str
"""
print "%s: subscribing to '%s:%d' with label %r" % (datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1], self.args.subscriber_label)
msg = OSCMessage("/subscribe") msg = OSCMessage("/subscribe")
msg.appendTypedArg(self.client_address[0], "s") msg.appendTypedArg(self.client_address[0], "s")
msg.appendTypedArg(self.client_address[1], "i") msg.appendTypedArg(self.client_address[1], "i")
@ -125,7 +95,7 @@ class OSCThread(threading.Thread):
if self.args.keep_subscribed: if self.args.keep_subscribed:
return return
print "%s: unsubscribing from '%s:%d'" % (datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1]) logging.info("%s: unsubscribing from '%s:%d'", datetime.now().strftime("%x %X"), self.chaosc_address[0], self.chaosc_address[1])
msg = OSCMessage("/unsubscribe") msg = OSCMessage("/unsubscribe")
msg.appendTypedArg(self.client_address[0], "s") msg.appendTypedArg(self.client_address[0], "s")
msg.appendTypedArg(self.client_address[1], "i") msg.appendTypedArg(self.client_address[1], "i")
@ -136,9 +106,9 @@ class OSCThread(threading.Thread):
while self.running: while self.running:
try: try:
reads, writes, errs = select.select([self.osc_sock], [], [], 0.01) reads, writes, errs = select.select([self.osc_sock], [], [], 0.005)
except Exception, e: except Exception, e:
print "select error", e logging.exception(e)
pass pass
else: else:
if reads: if reads:
@ -147,106 +117,66 @@ class OSCThread(threading.Thread):
osc_address, typetags, messages = decode_osc(osc_input, 0, len(osc_input)) osc_address, typetags, messages = decode_osc(osc_input, 0, len(osc_input))
queue.put_nowait((osc_address, messages)) queue.put_nowait((osc_address, messages))
except Exception, e: except Exception, e:
print "recvfrom error", e logging.info(e)
#else:
#queue.put_nowait(("/bjoern/ekg", [0]))
#queue.put_nowait(("/merle/ekg", [0]))
#queue.put_nowait(("/uwe/ekg", [0]))
#self.unsubscribe_me()
print "OSCThread is going down" self.unsubscribe_me()
self.osc_sock.close()
logging.info("OSCThread is going down")
queue = Queue.Queue() queue = Queue.Queue()
class Actor(object): class Actor(object):
shadowPen = pg.mkPen(255, 255, 255) def __init__(self, name, num_data, color, ix, max_actors, actor_height):
brush = pg.mkBrush("w")
def __init__(self, name, num_data, color):
self.data = [0] * num_data
self.data_pointer = 0
self.name = name self.name = name
self.active = True
self.plotItem = pg.PlotCurveItem(pen=pg.mkPen(color, width=3), name=name)
self.num_data = num_data self.num_data = num_data
#self.plotItem.setShadowPen(pen=Actor.shadowPen, width=3, cosmetic=True) self.color = color
self.plotPoint = pg.ScatterPlotItem(pen=Actor.shadowPen, brush=self.brush, size=5) self.ix = ix
self.max_actors = max_actors
self.actor_height = actor_height
self.updated = 0
self.offset = ix * actor_height
self.data = np.array([self.offset] * num_data)
self.head = 0
self.pre_head = 0
self.plotItem = pg.PlotCurveItem(pen=pg.mkPen(color, width=3), name=name)
self.plotPoint = pg.ScatterPlotItem(pen=pg.mkPen("w", width=5), brush=pg.mkBrush(color), size=5)
def __str__(self): def __str__(self):
return "<Actor name:%r, active=%r, position=%r>" % (self.name, self.active, self.data_pointer) return "<Actor name:%r, active=%r, position=%r>" % (self.name, self.active, self.head)
__repr__ = __str__ __repr__ = __str__
def scale_data(self, ix, max_items):
scale = 255 / max_items * ix
return [value / max_items + scale for value in self.data]
def set_point(self, value, ix, max_items): def add_value(self, value):
scale = 255 / max_items * ix dp = self.head
self.plotPoint.setData(x = [self.data_pointer], y = [value / max_items + scale]) self.data[dp] = value / self.max_actors + self.offset
self.pre_head = dp
self.head = (dp + 1) % self.num_data
self.updated += 1
#def find_max_value(self, item_data): def fill_missing(self, count):
#max_index = -1 dp = self.head
#for ix, i in enumerate(item_data): for i in range(count):
#if i > 250: self.data[dp] = self.offset
#return ix, i dp = (dp + 1) % self.num_data
#return None, None self.updated += 1
self.pre_head = (dp - 1) % self.num_data
self.head = dp
#def rearrange(self, item_data, actual_pos, max_items): def render(self):
#max_value_index, max_value = find_max_value(item_data) self.plotItem.setData(y=self.data, clear=True)
#if max_value_index is None: self.plotPoint.setData(x=[self.pre_head], y = [self.data[self.pre_head]])
#return actual_pos
#mean = int(max_items / 2.)
#start = mean - max_value_index
#if start != 0:
#item_data.rotate(start)
#pos = (actual_pos + start) % max_items
#else:
#pos = actual_pos
#print "rearrange", mean, start, actual_pos, pos, item_data
#return pos
def set_value(self, value):
self.data[self.data_pointer] = value
self.data_pointer = (self.data_pointer + 1) % self.num_data
#def resize(item_data, max_length, new_max_length, pos):
#print "resize", max_length, new_max_length
#if new_max_length < 15:
#return max_length, pos
#if new_max_length > max_length:
#pad = (new_max_length - max_length)
#print "pad", pad
#for i in range(pad):
#if i % 2 == 0:
#item_data.append(0)
#else:
#item_data.appendleft(0)
#pos += 1
#return new_max_length, pos
#elif new_max_length < max_length:
#pad = (max_length - new_max_length)
#for i in range(pad):
#if i % 2 == 0:
#item_data.pop()
#if pos >= new_max_length:
#pos = 0
#else:
#item_data.popleft()
#if pos > 0:
#pos -= 1
#return new_max_length, pos
#return max_length, pos
class EkgPlot(object): class EkgPlot(object):
def __init__(self, actor_names, num_data, colors): def __init__(self, actor_names, num_data, colors):
self.plot = pg.PlotWidget() self.plot = pg.PlotWidget()
#self.plot.setConfigOptions(antialias=True)
self.plot.hide() self.plot.hide()
#self.plot.setLabel('left', "<h2>Amplitude</h2>")
#self.plot.setLabel('bottom', "<h2><sup>Time</sup></h2>")
self.plot.showGrid(False, False) self.plot.showGrid(False, False)
self.plot.setYRange(0, 255) self.plot.setYRange(0, 255)
self.plot.setXRange(0, num_data) self.plot.setXRange(0, num_data)
@ -256,14 +186,19 @@ class EkgPlot(object):
bl = self.plot.getAxis("left") bl = self.plot.getAxis("left")
ba.setTicks([]) ba.setTicks([])
bl.setTicks([]) bl.setTicks([])
ba.hide()
bl.hide()
self.active_actors = list() self.active_actors = list()
self.actors = dict() self.actors = dict()
self.lengths1 = [0] self.lengths1 = [0]
self.num_data = num_data self.num_data = num_data
for actor_name, color in zip(actor_names, colors): self.max_value = 255
self.add_actor(actor_name, num_data, color) self.max_actors = len(actor_names)
self.actor_height = self.max_value / self.max_actors
for ix, (actor_name, color) in enumerate(zip(actor_names, colors)):
self.add_actor(actor_name, num_data, color, ix, self.max_actors, self.actor_height)
self.set_positions() self.set_positions()
@ -272,8 +207,8 @@ class EkgPlot(object):
self.updated_actors = set() self.updated_actors = set()
def add_actor(self, actor_name, num_data, color): def add_actor(self, actor_name, num_data, color, ix, max_actors, actor_height):
actor_obj = Actor(actor_name, num_data, color) actor_obj = Actor(actor_name, num_data, color, ix, max_actors, actor_height)
self.actors[actor_name] = actor_obj self.actors[actor_name] = actor_obj
self.plot.addItem(actor_obj.plotItem) self.plot.addItem(actor_obj.plotItem)
self.plot.addItem(actor_obj.plotPoint) self.plot.addItem(actor_obj.plotPoint)
@ -282,61 +217,43 @@ class EkgPlot(object):
def set_positions(self): def set_positions(self):
for ix, actor_obj in enumerate(self.active_actors): for ix, actor_obj in enumerate(self.active_actors):
actor_obj.plotItem.setPos(0, ix * 6) actor_obj.plotItem.setPos(0, ix * 2)
actor_obj.plotPoint.setPos(0, ix * 6) actor_obj.plotPoint.setPos(0, ix * 2)
def active_actor_count(self): def active_actor_count(self):
return len(self.active_actors) return self.max_actors
def new_round(self):
for ix, actor in enumerate(self.active_actors):
actor.updated = 0
def update_missing_actors(self):
liste = sorted(self.active_actors, key=attrgetter("updated"))
max_values = liste[-1].updated
if max_values == 0:
# handling no signal
for actor in self.active_actors:
actor.add_value(0)
return
for ix, actor in enumerate(self.active_actors):
diff = max_values - actor.updated
if diff > 0:
for i in range(diff):
actor.add_value(0)
def update(self, osc_address, value): def update(self, osc_address, value):
#print "update", osc_address
res = self.ekg_regex.match(osc_address) res = self.ekg_regex.match(osc_address)
if res: if res:
#print("matched data")
actor_name = res.group(1) actor_name = res.group(1)
actor_obj = self.actors[actor_name] actor_obj = self.actors[actor_name]
max_actors = len(self.active_actors) actor_obj.add_value(value)
actor_data = actor_obj.data
data_pointer = actor_obj.data_pointer
actor_data[data_pointer] = value
try:
ix = self.active_actors.index(actor_obj)
actor_obj.set_point(value, ix, max_actors)
actor_obj.plotItem.setData(y=np.array(actor_obj.scale_data(ix, max_actors)), clear=True)
except ValueError as e:
#print("data", e)
pass
actor_obj.data_pointer = (data_pointer + 1) % self.num_data
return
res = self.ctl_regex.match(osc_address)
if res:
print "received cmd", osc_address
actor_name = res.group(1)
actor_obj = self.actors[actor_name]
if value == 1 and not actor_obj.active:
print "actor on", actor_name, self.active_actors
self.plot.addItem(actor_obj.plotItem)
self.plot.addItem(actor_obj.plotPoint)
actor_obj.active = True
if not actor_obj in self.active_actors:
self.active_actors.append(actor_obj)
elif value == 0 and actor_obj.active:
print "actor off", actor_name, self.active_actors
actor_obj.active = False
self.plot.removeItem(actor_obj.plotItem)
self.plot.removeItem(actor_obj.plotPoint)
try:
self.active_actors.remove(actor_obj)
except ValueError as e:
print "active actors error", e, self.active_actors
pass
assert actor_obj not in self.active_actors
else:
print "internal data not in sync", self.active_actors, actor_obj
self.set_positions() def render(self):
for ix, actor in enumerate(self.active_actors):
actor.render()
class MyHandler(BaseHTTPRequestHandler): class MyHandler(BaseHTTPRequestHandler):
@ -348,7 +265,6 @@ class MyHandler(BaseHTTPRequestHandler):
if self.path=="" or self.path==None or self.path[:1]==".": if self.path=="" or self.path==None or self.path[:1]==".":
self.send_error(403,'Forbidden') self.send_error(403,'Forbidden')
if self.path.endswith(".html"): if self.path.endswith(".html"):
directory = os.path.dirname(os.path.abspath(__file__)) directory = os.path.dirname(os.path.abspath(__file__))
data = open(os.path.join(directory, self.path), "rb").read() data = open(os.path.join(directory, self.path), "rb").read()
@ -360,83 +276,66 @@ class MyHandler(BaseHTTPRequestHandler):
self.thread = thread = OSCThread(self.server.args) self.thread = thread = OSCThread(self.server.args)
thread.daemon = True thread.daemon = True
thread.start() thread.start()
actor_names = ["merle", "bjoern", "uwe"] actor_names = ["bjoern", "uwe", "merle"]
num_data = 100 num_data = 100
colors = ["r", "g", "b"] colors = ["r", "g", "b"]
qtapp = QtGui.QApplication([]) qtapp = QtGui.QApplication([])
plotter = EkgPlot(actor_names, num_data, colors) plotter = EkgPlot(actor_names, num_data, colors)
self.send_response(200) self.send_response(200)
self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=--aaboundary") self.send_header("Content-Type", "multipart/x-mixed-replace; boundary=--2342")
self.end_headers() self.end_headers()
#lastTime = time.time()
#fps = None
event_loop = QtCore.QEventLoop() event_loop = QtCore.QEventLoop()
last_frame = time.time() - 1.0
frame_rate = 13.0
frame_length = 1. / frame_rate
plotter.new_round()
while 1: while 1:
event_loop.processEvents() event_loop.processEvents()
qtapp.sendPostedEvents(None, 0) qtapp.sendPostedEvents(None, 0)
while 1: while 1:
try: try:
osc_address, args = queue.get_nowait() osc_address, args = queue.get_nowait()
plotter.update(osc_address, args[0])
except Queue.Empty: except Queue.Empty:
break break
else:
plotter.update(osc_address, args[0])
exporter = pg.exporters.ImageExporter.ImageExporter(plotter.plot.plotItem) now = time.time()
exporter.parameters()['width'] = 768 delta = now - last_frame
img = exporter.export("tmpfile", True) if delta > frame_length:
buffer = QBuffer() plotter.update_missing_actors()
buffer.open(QIODevice.WriteOnly) plotter.render()
img.save(buffer, "JPG") exporter = pg.exporters.ImageExporter.ImageExporter(plotter.plot.plotItem)
img.save("/tmp/test2.jpg", "JPG") exporter.parameters()['width'] = 768
img = exporter.export(toBytes=True)
buffer = QBuffer()
buffer.open(QIODevice.WriteOnly)
img.save(buffer, "JPG")
JpegData = buffer.data()
self.wfile.write("--2342\r\nContent-Type: image/jpeg\r\nContent-length: %d\r\n\r\n%s\r\n\r\n\r\n" % (len(JpegData), JpegData))
last_frame = now
plotter.new_round()
#JpegData = None
#buffer = None
#img = None
#exporter = None
time.sleep(0.01)
JpegData = buffer.data() except (KeyboardInterrupt, SystemError), e:
self.wfile.write("--aaboundary\r\nContent-Type: image/jpeg\r\nContent-length: %d\r\n\r\n%s\r\n\r\n\r\n" % (len(JpegData), JpegData)) raise e
JpegData = None
buffer = None
img = None
exporter = None
#now = time.time()
#dt = now - lastTime
#lastTime = now
#if fps is None:
#fps = 1.0/dt
#else:
#s = np.clip(dt*3., 0, 1)
#fps = fps * (1-s) + (1.0/dt) * s
#print '%0.2f fps' % fps
time.sleep(0.05)
elif self.path.endswith(".jpeg"):
directory = os.path.dirname(os.path.abspath(__file__))
data = open(os.path.join(directory, self.path), "rb").read()
self.send_response(200)
self.send_header('Content-type','image/jpeg')
self.end_headers()
self.wfile.write(data)
return
except (KeyboardInterrupt, SystemError):
print "queue size", queue.qsize()
if hasattr(self, "thread") and self.thread is not None:
self.thread.running = False
self.thread.join()
self.thread = None
except IOError, e: except IOError, e:
print "ioerror", e, e[0]
print dir(e)
if e[0] in (32, 104): if e[0] in (32, 104):
if hasattr(self, "thread") and self.thread is not None: if hasattr(self, "thread") and self.thread is not None:
self.thread.running = False self.thread.running = False
self.thread.join() self.thread.join()
self.thread = None self.thread = None
else: else:
print '-'*40 logging.info('-'*40)
print 'Exception happened during processing of request from' logging.info('Exception happened during processing of request from')
traceback.print_exc() # XXX But this goes to stderr! logging.exception(e)
print '-'*40 logging.info('-'*40)
self.send_error(404,'File Not Found: %s' % self.path) self.send_error(404,'File Not Found: %s' % self.path)
raise e
class JustAHTTPServer(HTTPServer): class JustAHTTPServer(HTTPServer):
@ -455,23 +354,26 @@ def main():
arg_parser.add_subscriber_group() arg_parser.add_subscriber_group()
args = arg_parser.finalize() args = arg_parser.finalize()
if not args.background:
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
http_host, http_port = resolve_host(args.http_host, args.http_port, args.address_family) http_host, http_port = resolve_host(args.http_host, args.http_port, args.address_family)
server = JustAHTTPServer((http_host, http_port), MyHandler) server = JustAHTTPServer((http_host, http_port), MyHandler)
server.address_family = args.address_family server.address_family = args.address_family
server.args = args server.args = args
print "%s: starting up http server on '%s:%d'" % ( logging.info("%s: starting up http server on '%s:%d'",
datetime.now().strftime("%x %X"), http_host, http_port) datetime.now().strftime("%x %X"), http_host, http_port)
try: try:
server.serve_forever() server.serve_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
print '^C received, shutting down server' logging.info('^C received, shutting down server')
server.socket.close() server.socket.close()
sys.exit(0) sys.exit(0)
if __name__ == '__main__': if __name__ == '__main__':
main() main()