From 1bda5388d6ea19771cc082e42d92fedad1c8faa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stefan=20K=C3=B6gl?= Date: Sat, 8 Mar 2014 19:26:17 +0100 Subject: [PATCH] added more test cases and implemented more commands, had to change some, since newer firmware versions lost some cmds --- osc2cam/osc2cam/main.py | 87 ++++++++++++++++++++++++++------------ osc2cam/osc2cam/testing.py | 82 ++++++++++++++++++++++++++--------- 2 files changed, 121 insertions(+), 48 deletions(-) diff --git a/osc2cam/osc2cam/main.py b/osc2cam/osc2cam/main.py index 3cc77a2..d2f9e60 100644 --- a/osc2cam/osc2cam/main.py +++ b/osc2cam/osc2cam/main.py @@ -56,14 +56,16 @@ class OSC2CamServer(SimpleOSCServer): return cams - def handleConnResult(self, connection, client_address=None, cmd=None): + def handleConnResult(self, connection, *args, **kwargs): conn_result = connection.getresponse() - if client_address is not None: + if "client_address" in kwargs: response = OSCMessage("/Response") - response.appendTypedArg(cmd, "s") + response.appendTypedArg(kwargs["cmd"], "s") + for arg in args: + response.append(arg) response.appendTypedArg(conn_result.status, "i") response.appendTypedArg(conn_result.reason, "s") - self.socket.sendto(response.encode_osc(), client_address) + self.socket.sendto(response.encode_osc(), kwargs["client_address"]) if conn_result.status != 200: print "%s. Error: %d, %s" % (datetime.now().strftime("%x %X"), conn_result.status, conn_result.reason) @@ -89,15 +91,21 @@ class OSC2CamServer(SimpleOSCServer): direction = args[0] now = datetime.now().strftime("%x %X") - if direction in ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright", "repeat", "stop"): + if direction in ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright"): print "%s: move camera %d to: dir %r" % (now, cam_id, direction) connection = self.connections[cam_id] - connection.request("GET", "%s?move=%s" % (self.ptz_ctl_url, direction)) - self.handleConnResult(connection, client_address, "moveCam") + url = "%s?move=%s" % (self.ptz_ctl_url, direction) + try: + repeat = args[1] == "repeat" + url += "?move=repeat" + except IndexError: + pass + connection.request("GET", url) + self.handleConnResult(connection, *args, client_address=client_address, cmd="moveCam") - def use_cam_preset(self, cam_id, args): + def use_cam_preset(self, cam_id, args, client_address): """ says given ip cam to use a predefined position preset""" presetno = args[0] @@ -106,33 +114,43 @@ class OSC2CamServer(SimpleOSCServer): print "%s: use camera %d preset %d" % (now, cam_id, presetno) connection.request("GET", "%s?gotoserverpresetno=%d" % (self.ptz_ctl_url, presetno)) - self.handleConnResult(connection, client_address, "useCamPreset") + self.handleConnResult(connection, *args, client_address=client_address, cmd="useCamPreset") - def set_cam_preset(self, cam_id, args): + def set_cam_preset(self, cam_id, args, client_address): """ saves the actual position of given ip cam to a preset""" presetno = args[0] connection = self.connections[cam_id] - connection.request("GET", "%s?setserverpresetno=%d&home=yes" % (self.ptz_config_url, presetno)) - self.handleConnResult(connection, client_address, "setCamPreset") + connection.request("GET", "%s?setserverpresetno=%d" % (self.ptz_config_url, presetno)) + self.handleConnResult(connection, *args, client_address=client_address, cmd="setCamPreset") - def zoom_cam(self, cam_id, args): + def zoom_cam(self, cam_id, args, client_address): """ tells given ip cam to zoom in or out""" - direction = None - arg = args[0] - if arg == "out": - direction = 0 - elif arg == "in": - direction = 1 + step = args[0] + now = datetime.now().strftime("%x %X") + print "%s: zoom camera %d to position %d" % (now, cam_id, step) + connection = self.connections[cam_id] - connection.request("GET", "%s?zoom=%s" % (self.ptz_ctl_url, direction)) - self.handleConnResult(connection, client_address, "zoomCam") + connection.request("GET", "%s?zoom=%d" % (self.ptz_ctl_url, step)) + self.handleConnResult(connection, *args, client_address=client_address, cmd="zoomCam") - def toggle_night_view(self, cam_id, args): + def focus_cam(self, cam_id, args, client_address): + """ tells given ip cam to zoom in or out""" + + step = args[0] + now = datetime.now().strftime("%x %X") + print "%s: focus camera %d to position %d" % (now, cam_id, step) + + connection = self.connections[cam_id] + connection.request("GET", "%s?focus=%d" % (self.ptz_ctl_url, step)) + self.handleConnResult(connection, *args, client_address=client_address, cmd="focusCam") + + + def toggle_night_view(self, cam_id, args, client_address): """ toggles the night view function of given ip cam""" arg = args[0] @@ -144,27 +162,40 @@ class OSC2CamServer(SimpleOSCServer): connection = self.connections[cam_id] connection.request("GET", "%sparam?action=update&Image.I0.Appearance.NightMode=%s" % (self.set_url, state)) - self.handleConnResult(connection, client_address, "toggleNightView") + self.handleConnResult(connection, *args, client_address=client_address, cmd="toggleNightView") + + def stop_cam(self, cam_id, args, client_address): + """stops cam movement""" + + now = datetime.now().strftime("%x %X") + print "%s: stop camera %d" % (now, cam_id) + connection = self.connections[cam_id] + connection.request("GET", "%s?move=stop" % (self.ptz_ctl_url)) + self.handleConnResult(connection, *args, client_address=client_address, cmd="stopCam") def dispatchMessage(self, osc_address, typetags, args, packet, client_address): """ dispatches parsed osc messages to the ip cam command methods""" - print "client_address", client_address cam_id = args.pop(0) if osc_address == "/moveCam": self.move_cam(cam_id, args, client_address) elif osc_address == "/setCamPreset": - self.set_cam_preset(cam_id, args) + self.set_cam_preset(cam_id, args, client_address) elif osc_address == "/useCamPreset": - self.use_cam_preset(cam_id, args) + self.use_cam_preset(cam_id, args, client_address) elif osc_address == "/zoomCam": - self.zoom_cam(cam_id, args) + self.zoom_cam(cam_id, args, client_address) + elif osc_address == "/focusCam": + self.zoom_cam(cam_id, args, client_address) + elif osc_address == "/stopCam": + self.zoom_cam(cam_id, args, client_address) elif osc_address == "/toggleNightView": - self.toggle_night_view(cam_id, args) + self.toggle_night_view(cam_id, args, client_address) elif osc_address == "/subscribe": self.__subscription_handler(osc_address, typetags, args, packet, client_address) + def __subscription_handler(self, addr, typetags, args, client_address): """handles a target subscription. diff --git a/osc2cam/osc2cam/testing.py b/osc2cam/osc2cam/testing.py index 96990a5..2a67919 100644 --- a/osc2cam/osc2cam/testing.py +++ b/osc2cam/osc2cam/testing.py @@ -20,8 +20,6 @@ import socket, argparse, sys -from chaosc.simpleOSCServer import SimpleOSCServer - try: from chaosc.c_osc_lib import OSCMessage, decode_osc @@ -30,27 +28,66 @@ except ImportError, e: from chaosc.osc_lib import OSCMessage, decode_osc +def send(sock, binary, args): + sent = sock.sendto(binary, (args.osc2cam_host, args.osc2cam_port)) + response = sock.recv(4096) + if response: + osc_address, typetags, arguments = decode_osc(response, 0, len(response)) + print osc_address, arguments -def test_moveCam(args, cam_id, sock): - directions = ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright", "repeat", "stop") +def test_move_cam(args, cam_id, sock): + directions = ("home", "up", "down", "left", "right", "upleft", "upright", "downleft", "downright") for direction in directions: - moveCam = OSCMessage("/moveCam") - moveCam.appendTypedArg(cam_id, "i") - moveCam.appendTypedArg(direction, "s") - binary = moveCam.encode_osc() + message = OSCMessage("/moveCam") + message.appendTypedArg(cam_id, "i") + message.appendTypedArg(direction, "s") + binary = message.encode_osc() + send(sock, binary, args) - try: - sent = sock.sendto(binary, (args.osc2cam_host, args.osc2cam_port)) - except socket.error, e: - if e[0] in (7, 65): # 7 = 'no address associated with nodename', 65 = 'no route to host' - raise e - else: - raise Exception("while sending OSCMessage 'moveCam' for cam_id %r with direction %r to %r:%r: %s" % (cam_id, direction, args.osc2cam_host, args.osc2cam_port, str(e))) - response = sock.recv(4096) - if response: - osc_address, typetags, arguments = decode_osc(response, 0, len(response)) - print osc_address, arguments + +def test_use_cam_preset(args, cam_id, sock): + for i in range(10): + message = OSCMessage("/useCamPreset") + message.appendTypedArg(cam_id, "i") + message.appendTypedArg(i, "i") + binary = message.encode_osc() + send(sock, binary, args) + + +def test_set_cam_preset(args, cam_id, sock): + message = OSCMessage("/setCamPreset") + message.appendTypedArg(cam_id, "i") + message.appendTypedArg(0, "i") + binary = message.encode_osc() + send(sock, binary, args) + + +def test_zoom_cam(args, cam_id, sock): + for i in (1, 5000, 9999): + message = OSCMessage("/zoomCam") + message.appendTypedArg(cam_id, "i") + message.appendTypedArg(i, "i") + binary = message.encode_osc() + send(sock, binary, args) + + +def test_focus_cam(args, cam_id, sock): + for i in (1, 5000, 9999): + message = OSCMessage("/focusCam") + message.appendTypedArg(cam_id, "i") + message.appendTypedArg(i, "i") + binary = message.encode_osc() + send(sock, binary, args) + + +def test_toggle_night_view(args, cam_id, sock): + for i in ("on", "off"): + message = OSCMessage("/toggleNightView") + message.appendTypedArg(cam_id, "i") + message.appendTypedArg(i, "s") + binary = message.encode_osc() + send(sock, binary, args) @@ -87,6 +124,11 @@ def main(): #sock.connect((args.osc2cam_host, args.osc2cam_port)) for ix, (host, port) in enumerate(cams): - test_moveCam(args, ix, sock) + test_move_cam(args, ix, sock) + test_use_cam_preset(args, ix, sock) + test_set_cam_preset(args, ix, sock) + test_zoom_cam(args, ix, sock) + test_focus_cam(args, ix, sock) + test_toggle_night_view(args, ix, sock) main()