aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorneodarz <neodarz@neodarz.net>2019-05-10 22:50:26 +0200
committerneodarz <neodarz@neodarz.net>2019-05-10 22:50:26 +0200
commitd71317ce3bf17dea46be0fc027a1bd751a02ea0b (patch)
tree4daf5e354ef70eac270c0fa7a0c8068d157fabf0
parent7f45025f126f3bee86a78c11d883694ed75656a1 (diff)
downloadpyParrotZikTCP-d71317ce3bf17dea46be0fc027a1bd751a02ea0b.tar.xz
pyParrotZikTCP-d71317ce3bf17dea46be0fc027a1bd751a02ea0b.zip
Add simple server
-rw-r--r--parrot_zik/test_server.py189
1 files changed, 189 insertions, 0 deletions
diff --git a/parrot_zik/test_server.py b/parrot_zik/test_server.py
new file mode 100644
index 0000000..b50b87d
--- /dev/null
+++ b/parrot_zik/test_server.py
@@ -0,0 +1,189 @@
+from .interface.version1 import ParrotZikVersion1Interface
+from .interface.version2 import ParrotZikVersion2Interface
+from . import resource_manager
+from . import bluetooth_paired_devices
+
+from . import bluezutils
+
+import sys
+import time
+import json
+
+from twisted.internet import reactor, protocol
+
+from threading import Thread
+
+
+REFRESH_FREQUENCY = 60
+RECONNECT_FREQUENCY = 5
+
+def toBool(data):
+ return data.lower() == "true"
+
+def toBytes(data):
+ return str(data).encode('utf-8')
+
+def convert(val):
+ constructors = [int, float, str]
+ if val.lower() == "true":
+ return True
+ elif val.lower() == "false":
+ return False
+ for c in constructors:
+ try:
+ return c(val)
+ except ValueError:
+ pass
+
+
+class TCPParrotZik(protocol.Protocol):
+ def api_func_not_found_msg(self, data, setter_functions, getter_functions):
+ if data:
+ error_msg = "Not implemented yet: " + ' '.join(data) + "\n"
+ else:
+ error_msg = ""
+ error_msg += "Setter implemented:"
+ for function in setter_functions:
+ error_msg += "\n - " + function
+ error_msg += "\nGetter implemented:"
+ for function in getter_functions:
+ error_msg += "\n - " + function
+
+ return error_msg
+
+ def dataReceived(self, data):
+ setter_functions = {
+ 'auto_connection':
+ self.factory.active_interface.toggle_auto_connection,
+ 'sound_effect_room':
+ self.factory.active_interface.toggle_sound_effect_room,
+ 'flight_mode':
+ self.factory.active_interface.toggle_flight_mode,
+ 'sound_effect_room_status':
+ self.factory.active_interface.toggle_sound_effect_room_enabled,
+ 'sound_effect_angle':
+ self.factory.active_interface.toggle_sound_effect_angle,
+ 'noise_control':
+ self.factory.active_interface.toggle_noise_control,
+ 'head_detection':
+ self.factory.active_interface.toggle_head_detection,
+ }
+
+ getter_functions = {
+ 'auto_connection':
+ self.factory.active_interface.read_auto_connection,
+ 'read_battery':
+ self.factory.active_interface.read_battery,
+ 'flight_mode':
+ self.factory.active_interface.read_flight_mode,
+ 'sound_effect_room':
+ self.factory.active_interface.read_sound_effect_room,
+ 'sound_effect_room_status':
+ self.factory.active_interface.read_sound_effect_room_enabled,
+ 'sound_effect_angle':
+ self.factory.active_interface.read_sound_effect_angle,
+ 'noise_control':
+ self.factory.active_interface.read_noise_control,
+ 'head_detection':
+ self.factory.active_interface.read_head_detection,
+
+ }
+ data = list(data.decode('utf-8').split())
+ if len(data) >= 2:
+ try:
+ setter = setter_functions[data[0]]
+ self.transport.write(toBytes(setter(convert(data[1]))))
+ except KeyError:
+ self.transport.write(toBytes(self.api_func_not_found_msg(data, setter_functions, getter_functions)))
+ else:
+ if data[0] == "help":
+ self.transport.write(toBytes(self.api_func_not_found_msg(None, setter_functions, getter_functions)))
+ else:
+ try:
+ getter = getter_functions[data[0]]
+ self.transport.write(toBytes(getter()))
+ except KeyError:
+ self.transport.write(toBytes(self.api_func_not_found_msg(data, setter_functions, getter_functions)))
+
+class TCPParrotZikFactory(protocol.ServerFactory):
+ protocol = TCPParrotZik
+
+ def __init__(self, active_interface=None):
+ self.active_interface = active_interface
+
+class ParrotZik(protocol.Protocol):
+ def __init__(self):
+ self.version_1_interface = ParrotZikVersion1Interface(self)
+ self.version_2_interface = ParrotZikVersion2Interface(self)
+ self.active_interface = None
+
+ def reconnect(self, frequency):
+ while True:
+ self.info({'info': 'Trying to connect'})
+ try:
+ manager = bluetooth_paired_devices.connect()
+ except bluetooth_paired_devices.BluetoothIsNotOn:
+ self.sinfo({'error': 'Bluetooth is turned off'})
+ except bluetooth_paired_devices.DeviceNotFound:
+ self.sinfo({'error': 'Parrot Zik not found'})
+ except bluetooth_paired_devices.DeviceNotPaired:
+ self.sinfo({'error': 'Parrot Zik not paired'})
+ except bluetooth_paired_devices.DeviceNotConnected:
+ self.sinfo({'error': 'Parrot Zik not connected'})
+ except bluetooth_paired_devices.ConnectionFailure:
+ self.sinfo({'error': 'Failed to connect'})
+ except bluezutils.BluetoothAdaptaterNotFound:
+ self.sinfo({'error': 'Adaptateur not found'})
+ else:
+ if manager.api_version.startswith('1'):
+ interface = self.version_1_interface
+ else:
+ interface = self.version_2_interface
+ try:
+ interface.activate(manager)
+ reactor.listenTCP(8000, TCPParrotZikFactory(self.active_interface))
+ Thread(target=reactor.run, args=(False,)).start()
+ except resource_manager.DeviceDisconnected:
+ interface.deactivate()
+ pass
+ else:
+ self.autorefresh(REFRESH_FREQUENCY)
+ sys.exit()
+ time.sleep(frequency)
+
+ def info(self, message):
+ print(message)
+
+ def sinfo(self, message):
+ self.info(message)
+ with open("/tmp/parrotZikBattery", "w") as file:
+ file.write(json.dumps(message))
+
+ def autorefresh(self, frequency):
+ while True:
+ if self.active_interface:
+ self.sinfo(self.active_interface.read_battery())
+ #self.active_interface.toggle_sound_effect_room('silent')
+ #self.active_interface._read_sound_effect_room_enabled()
+ #self.active_interface.toggle_sound_effect_room_enabled(True)
+ #self.active_interface._read_sound_effect_angle()
+ #self.active_interface._read_noise_cancelation()
+ #self.active_interface.toggle_head_detection(True)
+ #self.active_interface._read_head_detection()
+ #mode = self.active_interface.noise_control_types['STREET_MODE_MAX']
+ #self.active_interface.toggle_noise_cancelation(mode)
+ #print(self.active_interface._read_sound_effect_room())
+ else:
+ self.reconnect(RECONNECT_FREQUENCY)
+ time.sleep(frequency)
+
+ @classmethod
+ def main(cls):
+ try:
+ indicator = cls()
+ cls.reconnect(indicator, RECONNECT_FREQUENCY)
+ except KeyboardInterrupt:
+ pass
+
+parrot = ParrotZik()
+parrot.main()