diff options
author | neodarz <neodarz@neodarz.net> | 2019-05-10 22:50:26 +0200 |
---|---|---|
committer | neodarz <neodarz@neodarz.net> | 2019-05-10 22:50:26 +0200 |
commit | d71317ce3bf17dea46be0fc027a1bd751a02ea0b (patch) | |
tree | 4daf5e354ef70eac270c0fa7a0c8068d157fabf0 /parrot_zik | |
parent | 7f45025f126f3bee86a78c11d883694ed75656a1 (diff) | |
download | pyParrotZikTCP-d71317ce3bf17dea46be0fc027a1bd751a02ea0b.tar.xz pyParrotZikTCP-d71317ce3bf17dea46be0fc027a1bd751a02ea0b.zip |
Add simple server
Diffstat (limited to '')
-rw-r--r-- | parrot_zik/test_server.py | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/parrot_zik/test_server.py b/parrot_zik/test_server.py new file mode 100644 index 0000000..b50b87d --- /dev/null +++ b/parrot_zik/test_server.py @@ -0,0 +1,189 @@ +from .interface.version1 import ParrotZikVersion1Interface +from .interface.version2 import ParrotZikVersion2Interface +from . import resource_manager +from . import bluetooth_paired_devices + +from . import bluezutils + +import sys +import time +import json + +from twisted.internet import reactor, protocol + +from threading import Thread + + +REFRESH_FREQUENCY = 60 +RECONNECT_FREQUENCY = 5 + +def toBool(data): + return data.lower() == "true" + +def toBytes(data): + return str(data).encode('utf-8') + +def convert(val): + constructors = [int, float, str] + if val.lower() == "true": + return True + elif val.lower() == "false": + return False + for c in constructors: + try: + return c(val) + except ValueError: + pass + + +class TCPParrotZik(protocol.Protocol): + def api_func_not_found_msg(self, data, setter_functions, getter_functions): + if data: + error_msg = "Not implemented yet: " + ' '.join(data) + "\n" + else: + error_msg = "" + error_msg += "Setter implemented:" + for function in setter_functions: + error_msg += "\n - " + function + error_msg += "\nGetter implemented:" + for function in getter_functions: + error_msg += "\n - " + function + + return error_msg + + def dataReceived(self, data): + setter_functions = { + 'auto_connection': + self.factory.active_interface.toggle_auto_connection, + 'sound_effect_room': + self.factory.active_interface.toggle_sound_effect_room, + 'flight_mode': + self.factory.active_interface.toggle_flight_mode, + 'sound_effect_room_status': + self.factory.active_interface.toggle_sound_effect_room_enabled, + 'sound_effect_angle': + self.factory.active_interface.toggle_sound_effect_angle, + 'noise_control': + self.factory.active_interface.toggle_noise_control, + 'head_detection': + self.factory.active_interface.toggle_head_detection, + } + + getter_functions = { + 'auto_connection': + self.factory.active_interface.read_auto_connection, + 'read_battery': + self.factory.active_interface.read_battery, + 'flight_mode': + self.factory.active_interface.read_flight_mode, + 'sound_effect_room': + self.factory.active_interface.read_sound_effect_room, + 'sound_effect_room_status': + self.factory.active_interface.read_sound_effect_room_enabled, + 'sound_effect_angle': + self.factory.active_interface.read_sound_effect_angle, + 'noise_control': + self.factory.active_interface.read_noise_control, + 'head_detection': + self.factory.active_interface.read_head_detection, + + } + data = list(data.decode('utf-8').split()) + if len(data) >= 2: + try: + setter = setter_functions[data[0]] + self.transport.write(toBytes(setter(convert(data[1])))) + except KeyError: + self.transport.write(toBytes(self.api_func_not_found_msg(data, setter_functions, getter_functions))) + else: + if data[0] == "help": + self.transport.write(toBytes(self.api_func_not_found_msg(None, setter_functions, getter_functions))) + else: + try: + getter = getter_functions[data[0]] + self.transport.write(toBytes(getter())) + except KeyError: + self.transport.write(toBytes(self.api_func_not_found_msg(data, setter_functions, getter_functions))) + +class TCPParrotZikFactory(protocol.ServerFactory): + protocol = TCPParrotZik + + def __init__(self, active_interface=None): + self.active_interface = active_interface + +class ParrotZik(protocol.Protocol): + def __init__(self): + self.version_1_interface = ParrotZikVersion1Interface(self) + self.version_2_interface = ParrotZikVersion2Interface(self) + self.active_interface = None + + def reconnect(self, frequency): + while True: + self.info({'info': 'Trying to connect'}) + try: + manager = bluetooth_paired_devices.connect() + except bluetooth_paired_devices.BluetoothIsNotOn: + self.sinfo({'error': 'Bluetooth is turned off'}) + except bluetooth_paired_devices.DeviceNotFound: + self.sinfo({'error': 'Parrot Zik not found'}) + except bluetooth_paired_devices.DeviceNotPaired: + self.sinfo({'error': 'Parrot Zik not paired'}) + except bluetooth_paired_devices.DeviceNotConnected: + self.sinfo({'error': 'Parrot Zik not connected'}) + except bluetooth_paired_devices.ConnectionFailure: + self.sinfo({'error': 'Failed to connect'}) + except bluezutils.BluetoothAdaptaterNotFound: + self.sinfo({'error': 'Adaptateur not found'}) + else: + if manager.api_version.startswith('1'): + interface = self.version_1_interface + else: + interface = self.version_2_interface + try: + interface.activate(manager) + reactor.listenTCP(8000, TCPParrotZikFactory(self.active_interface)) + Thread(target=reactor.run, args=(False,)).start() + except resource_manager.DeviceDisconnected: + interface.deactivate() + pass + else: + self.autorefresh(REFRESH_FREQUENCY) + sys.exit() + time.sleep(frequency) + + def info(self, message): + print(message) + + def sinfo(self, message): + self.info(message) + with open("/tmp/parrotZikBattery", "w") as file: + file.write(json.dumps(message)) + + def autorefresh(self, frequency): + while True: + if self.active_interface: + self.sinfo(self.active_interface.read_battery()) + #self.active_interface.toggle_sound_effect_room('silent') + #self.active_interface._read_sound_effect_room_enabled() + #self.active_interface.toggle_sound_effect_room_enabled(True) + #self.active_interface._read_sound_effect_angle() + #self.active_interface._read_noise_cancelation() + #self.active_interface.toggle_head_detection(True) + #self.active_interface._read_head_detection() + #mode = self.active_interface.noise_control_types['STREET_MODE_MAX'] + #self.active_interface.toggle_noise_cancelation(mode) + #print(self.active_interface._read_sound_effect_room()) + else: + self.reconnect(RECONNECT_FREQUENCY) + time.sleep(frequency) + + @classmethod + def main(cls): + try: + indicator = cls() + cls.reconnect(indicator, RECONNECT_FREQUENCY) + except KeyboardInterrupt: + pass + +parrot = ParrotZik() +parrot.main() |