From 630c7770497fe4df4f71e8dc4d1fe1e0f575e684 Mon Sep 17 00:00:00 2001 From: willi Date: Thu, 5 Nov 2020 17:58:43 +0100 Subject: [PATCH] Dropped Python2 support; improved code style --- tplink_smartplug.py | 162 ++++++++++++++++++++------------------------ 1 file changed, 73 insertions(+), 89 deletions(-) diff --git a/tplink_smartplug.py b/tplink_smartplug.py index 789cdcf..e40a0ca 100755 --- a/tplink_smartplug.py +++ b/tplink_smartplug.py @@ -19,103 +19,88 @@ # limitations under the License. # -import sys -import socket import argparse +import socket from struct import pack -version = 0.3 +version = 0.4 # Check if hostname is valid def validHostname(hostname): - try: - socket.gethostbyname(hostname) - except socket.error: - parser.error("Invalid hostname.") - return hostname + try: + socket.gethostbyname(hostname) + except socket.error: + parser.error("Invalid hostname.") + return hostname # Check if port is valid def validPort(port): - try: - port = int(port) - except ValueError: - parser.error("Invalid port number.") + try: + port = int(port) + except ValueError: + parser.error("Invalid port number.") - if ((port <= 1024) or (port >65535)) : - parser.error("Invalid port number.") + if ((port <= 1024) or (port > 65535)): + parser.error("Invalid port number.") - return port + return port # Predefined Smart Plug Commands # For a full list of commands, consult tplink_commands.txt -commands = { 'info' : '{"system":{"get_sysinfo":{}}}', - 'on' : '{"system":{"set_relay_state":{"state":1}}}', - 'off' : '{"system":{"set_relay_state":{"state":0}}}', - 'ledoff' : '{"system":{"set_led_off":{"off":1}}}', - 'ledon' : '{"system":{"set_led_off":{"off":0}}}', - 'cloudinfo': '{"cnCloud":{"get_info":{}}}', - 'wlanscan' : '{"netif":{"get_scaninfo":{"refresh":0}}}', - 'time' : '{"time":{"get_time":{}}}', - 'schedule' : '{"schedule":{"get_rules":{}}}', - 'countdown': '{"count_down":{"get_rules":{}}}', - 'antitheft': '{"anti_theft":{"get_rules":{}}}', - 'reboot' : '{"system":{"reboot":{"delay":1}}}', - 'reset' : '{"system":{"reset":{"delay":1}}}', - 'energy' : '{"emeter":{"get_realtime":{}}}' +commands = {'info' : '{"system":{"get_sysinfo":{}}}', + 'on' : '{"system":{"set_relay_state":{"state":1}}}', + 'off' : '{"system":{"set_relay_state":{"state":0}}}', + 'ledoff' : '{"system":{"set_led_off":{"off":1}}}', + 'ledon' : '{"system":{"set_led_off":{"off":0}}}', + 'cloudinfo': '{"cnCloud":{"get_info":{}}}', + 'wlanscan' : '{"netif":{"get_scaninfo":{"refresh":0}}}', + 'time' : '{"time":{"get_time":{}}}', + 'schedule' : '{"schedule":{"get_rules":{}}}', + 'countdown': '{"count_down":{"get_rules":{}}}', + 'antitheft': '{"anti_theft":{"get_rules":{}}}', + 'reboot' : '{"system":{"reboot":{"delay":1}}}', + 'reset' : '{"system":{"reset":{"delay":1}}}', + 'energy' : '{"emeter":{"get_realtime":{}}}' } # Encryption and Decryption of TP-Link Smart Home Protocol # XOR Autokey Cipher with starting key = 171 -# Python 3.x Version -if sys.version_info[0] > 2: - def encrypt(string): - key = 171 - result = pack('>I', len(string)) - for i in string: - a = key ^ ord(i) - key = a - result += bytes([a]) - return result - def decrypt(string): - key = 171 - result = "" - for i in string: - a = key ^ i - key = i - result += chr(a) - return result +def encrypt(string): + key = 171 + result = pack(">I", len(string)) + for i in string: + a = key ^ ord(i) + key = a + result += bytes([a]) + return result -# Python 2.x Version -else: - def encrypt(string): - key = 171 - result = pack('>I', len(string)) - for i in string: - a = key ^ ord(i) - key = a - result += chr(a) - return result +def decrypt(string): + key = 171 + result = "" + for i in string: + a = key ^ i + key = i + result += chr(a) + return result - def decrypt(string): - key = 171 - result = "" - for i in string: - a = key ^ ord(i) - key = ord(i) - result += chr(a) - return result # Parse commandline arguments -parser = argparse.ArgumentParser(description="TP-Link Wi-Fi Smart Plug Client v" + str(version)) -parser.add_argument("-t", "--target", metavar="", required=True, help="Target hostname or IP address", type=validHostname) -parser.add_argument("-p", "--port", metavar="", default=9999, required=False, help="Target port", type=validPort) -parser.add_argument("-q", "--quiet", dest='quiet', action='store_true', help="Only show result") -parser.add_argument("--timeout", default=10, required=False, help="Timeout to establish connection") +parser = argparse.ArgumentParser(description=f"TP-Link Wi-Fi Smart Plug Client v{version}") +parser.add_argument("-t", "--target", metavar="", required=True, + help="Target hostname or IP address", type=validHostname) +parser.add_argument("-p", "--port", metavar="", default=9999, + required=False, help="Target port", type=validPort) +parser.add_argument("-q", "--quiet", dest="quiet", action="store_true", + help="Only show result") +parser.add_argument("--timeout", default=10, required=False, + help="Timeout to establish connection") group = parser.add_mutually_exclusive_group(required=True) -group.add_argument("-c", "--command", metavar="", help="Preset command to send. Choices are: "+", ".join(commands), choices=commands) -group.add_argument("-j", "--json", metavar="", help="Full JSON string of command to send") +group.add_argument("-c", "--command", metavar="", + help="Preset command to send. Choices are: "+", ".join(commands), choices=commands) +group.add_argument("-j", "--json", metavar="", + help="Full JSON string of command to send") args = parser.parse_args() @@ -123,29 +108,28 @@ args = parser.parse_args() ip = args.target port = args.port if args.command is None: - cmd = args.json + cmd = args.json else: - cmd = commands[args.command] - + cmd = commands[args.command] # Send command and receive reply try: - sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock_tcp.settimeout(int(args.timeout)) - sock_tcp.connect((ip, port)) - sock_tcp.settimeout(None) - sock_tcp.send(encrypt(cmd)) - data = sock_tcp.recv(2048) - sock_tcp.close() + sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock_tcp.settimeout(int(args.timeout)) + sock_tcp.connect((ip, port)) + sock_tcp.settimeout(None) + sock_tcp.send(encrypt(cmd)) + data = sock_tcp.recv(2048) + sock_tcp.close() - decrypted = decrypt(data[4:]) + decrypted = decrypt(data[4:]) - if args.quiet: - print(decrypted) - else: - print("Sent: ", cmd) - print("Received: ", decrypted) + if args.quiet: + print(decrypted) + else: + print("Sent: ", cmd) + print("Received: ", decrypted) except socket.error: - quit("Could not connect to host " + ip + ":" + str(port)) + quit(f"Could not connect to host {ip}:{port}")