diff --git a/tddp-client/tddp_client.py b/tddp-client/tddp_client.py index c2233c0..0ef05fe 100755 --- a/tddp-client/tddp_client.py +++ b/tddp-client/tddp_client.py @@ -1,224 +1,198 @@ #!/usr/bin/env python3 -# -# TP-Link Device Debug Protocol (TDDP) v2 Client -# Based on https://www.google.com/patents/CN102096654A?cl=en -# -# HIGHLY EXPERIMENTAL and untested! -# The protocol is available on all kinds of TP-Link devices such as routers, cameras, smart plugs etc. -# -# by Lubomir Stroetmann -# Copyright 2016 softScheck GmbH -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# + +"""TP-Link Device Debug Protocol (TDDP) v2 Client. + +Based on https://www.google.com/patents/CN102096654A?cl=en + +HIGHLY EXPERIMENTAL and untested! +The protocol is available on all kinds of TP-Link devices such as routers, +cameras, smart plugs etc. + +by Lubomir Stroetmann +Copyright 2016 softScheck GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" import argparse import hashlib +import logging import socket import string - from binascii import hexlify, unhexlify -from pyDes import * -version = 0.3 +from pyDes import ECB, des -# Default username and password -username = "admin" -password = "admin" +version = 0.4 -# Check if IP is valid -def validIP(ip): - try: - socket.inet_pton(socket.AF_INET, ip) - except socket.error: - parser.error("Invalid IP Address.") - return ip -# Check if command is two hex chars -def validHex(cmd): - ishex = all(c in string.hexdigits for c in cmd) - if len(cmd) == 2 and ishex: +def generate_tddp_key(username: str, password: str) -> str: + """Generate TDDP DES Key.""" + return hashlib.md5(username.encode() + password.encode()).hexdigest()[:16] + + +def build_tddp_packet(cmd: str, tddp_key: str) -> str: + """Build TDDP packet.""" + tddp_ver = "02" + tddp_type = "03" + tddp_code = "01" + tddp_reply = "00" + tddp_length = "0000002A" + tddp_id = "0001" + tddp_subtype = cmd + tddp_reserved = "00" + tddp_digest = f"{00:0032X}" + tddp_data = "" + + tddp_length = len(tddp_data) // 2 + tddp_length = f"{tddp_length:008X}" + + key = des(unhexlify(tddp_key), ECB) + data = key.encrypt(unhexlify(tddp_data)) + + tddp_packet = "".join( + [ + tddp_ver, + tddp_type, + tddp_code, + tddp_reply, + tddp_length, + tddp_id, + tddp_subtype, + tddp_reserved, + tddp_digest, + hexlify(data.encode()).decode(), + ] + ) + + tddp_digest = hashlib.md5(unhexlify(tddp_packet)).hexdigest() + tddp_packet = tddp_packet[:24] + tddp_digest + tddp_packet[56:] + + logging.debug(f"Raw Request:\t{tddp_packet}") + + return tddp_packet + + +def send_request(ip: str, port_send: int, tddp_packet: str): + """Send TDDP request.""" + sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock_send.sendto(unhexlify(tddp_packet), (ip, port_send)) + logging.debug( + f"Req Data:\tVersion {tddp_packet[0:2]} " + f"Type {tddp_packet[2:4]} " + f"Status {tddp_packet[6:8]} " + f"Length {tddp_packet[8:16]} " + f"ID {tddp_packet[16:20]} " + f"Subtype {tddp_packet[20:22]}" + ) + sock_send.close() + + +def receive_reply(port_receive: int) -> str: + """Receive TDDP reply.""" + sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock_receive.bind(("", port_receive)) + response, addr = sock_receive.recvfrom(1024) + resp = hexlify(response).decode() + logging.info(f"Raw Reply:\t{resp}") + sock_receive.close() + return resp + + +def decrypt_and_print_response(response: str, tddp_key: str): + """Decrypt and print TDDP response.""" + key = des(unhexlify(tddp_key), ECB) + recv_data = response[56:] + if recv_data: + logging.info(f"Decrypted:\t{key.decrypt(unhexlify(recv_data))}") + + +def parse_args(): + """Parse commandline arguments.""" + + def valid_ip(ip: str) -> str: + """Check if IP is valid.""" + try: + socket.inet_pton(socket.AF_INET, ip) + except OSError: + parser.error("Invalid IP Address.") + return ip + + def valid_hex(cmd: str) -> str: + """Check if command is two hex chars.""" + ishex = all(c in string.hexdigits for c in cmd) + if not (len(cmd) == 2 and ishex): + parser.error("Please issue a two-character hex command, e.g. 0A") return cmd - else: - parser.error("Please issue a two-character hex command, e.g. 0A") -# Parse commandline arguments -parser = argparse.ArgumentParser(description="Experimental TP-Link TDDPv2 Client v" + str(version)) -parser.add_argument("-v", "--verbose", help="Verbose mode", action="store_true") -parser.add_argument("-t", "--target", metavar="", required=True, help="Target IP Address", type=validIP) -parser.add_argument("-u", "--username", metavar="", help="Username (default: admin)") -parser.add_argument("-p", "--password", metavar="", help="Password (default: admin)") -parser.add_argument("-c", "--command", metavar="", required=True, help="Command value to send as hex (e.g. 0A)", type=validHex) -args = parser.parse_args() + parser = argparse.ArgumentParser( + description=f"Experimental TP-Link TDDPv2 Client v.{version}" + ) + parser.add_argument("-v", "--verbose", help="Verbose mode", action="store_true") + parser.add_argument( + "-t", + "--target", + metavar="", + required=True, + help="Target IP Address", + type=valid_ip, + ) + parser.add_argument( + "-u", + "--username", + metavar="", + help="Username (default: admin)", + default="admin", + ) + parser.add_argument( + "-p", + "--password", + metavar="", + help="Password (default: admin)", + default="admin", + ) + parser.add_argument( + "-c", + "--command", + metavar="", + required=True, + help="Command value to send as hex (e.g. 0A)", + type=valid_hex, + ) + return parser.parse_args() -# Set Target IP, username and password to calculate DES decryption key for data and command to execute -ip = args.target -cmd = args.command -if args.username: + +def main(): + """Parse arguments, generate key, build and send tddp packet, + decrypt and print response. + """ + args = parse_args() + ip = args.target + cmd = args.command username = args.username -if args.password: password = args.password + logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO) -# TDDP runs on UDP Port 1040 -# Response is sent to UDP Port 61000 -port_send = 1040 -port_receive = 61000 + port_send = 1040 + port_receive = 61000 + + tddp_key = generate_tddp_key(username, password) + logging.info(f"TDDP Key:\t{tddp_key} ({username}:{password})") + tddp_packet = build_tddp_packet(cmd, tddp_key) + send_request(ip, port_send, tddp_packet) + response = receive_reply(port_receive) + decrypt_and_print_response(response, tddp_key) -# TDDP DES Key = MD5 of username and password concatenated -# Key is first 8 bytes only -tddp_key = hashlib.md5(username.encode() + password.encode()).hexdigest()[:16] -if args.verbose: - print("TDDP Key:\t", tddp_key, "(" + username + password + ")") - -## TDDP Header -# 0 1 2 3 -# 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -# | Ver | Type | Code | ReplyInfo | -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -# | PktLength | -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -# | PktID | SubType | Reserve | -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -# | MD5 Digest[0-3] | -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -# | MD5 Digest[4-7] | -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -# | MD5 Digest[8-11] | -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -# | MD5 Digest[12-15] | -# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - -## TDDP Protocol Version -tddp_ver = "02" - -## Packet Type -# 0x01 SET_USR_CFG - set configuration information -# 0x02 GET_SYS_INF - get configuration information -# 0x03 CMD_SPE_OPR - special configuration commands -# 0x04 HEART_BEAT - the heartbeat package -tddp_type = "03" - -## Code Request Type -# 0x01 TDDP_REQUEST -# 0x02 TDDP_REPLY -tddp_code = "01" - -## Reply Info Status -# 0x00 REPLY_OK -# 0x02 ? -# 0x03 ? -# 0x09 REPLY_ERROR -# 0xFF ? -tddp_reply = "00" - -## Packet Length (not including header) -# 4 bytes -tddp_length = "0000002A" - -## Packet ID -# 2 bytes -# supposed to be incremented +1 for each packet -tddp_id = "0001" - -# Subtype for CMD_SPE_OPR (Special Operations Command) -# Set to 0x00 for SET_USR_CFG and GET_SYS_INF -# -# Subtypes described in patent application, hex value unknown: -# CMD_SYS_OPR Router system operation, including: init, save, reboot, reset, clr dos -# CMD_AUTO_TEST MAC for writing operation, the user replies CMD_SYS_INIT broadcast packet -# CMD_CONFIG_MAC Factory settings MAC operations -# CMD_CANCEL_TEST Cancel automatic test, stop receiving broadcast packets -# CMD_GET_PROD_ID Get product ID -# CMD_SYS_INIT Initialize a router -# CMD_CONFIG_PIN Router PIN code -# -# Subtypes that seem to work for a HS-110 Smart Plug: -# 0x0A returns "ABCD0110" -# 0x12 returns the deviceID -# 0x14 returns the hwID -# 0x06 changes MAC -# 0x13 changes deviceID -# 0x15 changes deviceID -# -# Subtypes that seem to work for an Archer C9 Router: -# 0x0E returns physical status of WAN link: -# wan_ph_link 1 0 = disconnected -# wan_ph_link 1 1 = connected -# 0x0F returns logical status of WAN link: wan_logic_link 1 0 -# 0x0A returns \x00\x09\x00\x01\x00\x00\x00\x00 -# 0x15 returns \x01\x00\x00\x00\x00\x00\x00\x00 -# 0x18 returns 1 -tddp_subtype = cmd - -# Reserved -tddp_reserved = "00" - -# Digest 0-15 (32char/128bit/16byte) -# MD5 digest of entire packet -# Set to 0 initially for building the digest, then overwrite with result -tddp_digest = "%0.32X" % 00 - -# TDDP Data -# Always pad with 0x00 to a length divisible by 8 -# We're not sending any data since we're only sending read commands -tddp_data = "" - -# Recalculate length if sending data - -tddp_length = len(tddp_data)//2 -tddp_length = "%0.8X" % tddp_length - -## Encrypt data with key -key = des(unhexlify(tddp_key), ECB) -data = key.encrypt(unhexlify(tddp_data)) - -## Assemble packet -tddp_packet = "".join([tddp_ver, tddp_type, tddp_code, tddp_reply, - tddp_length, tddp_id, tddp_subtype, - tddp_reserved, tddp_digest, hexlify(data.encode()).decode()]) - -# Calculate MD5 -tddp_digest = hashlib.md5(unhexlify(tddp_packet)).hexdigest() -tddp_packet = tddp_packet[:24] + tddp_digest + tddp_packet[56:] - -# Binding receive socket in advance in case reply comes fast. -sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -sock_receive.bind(('', port_receive)) - -# Send a request -sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) -sock_send.sendto(unhexlify(tddp_packet), (ip, port_send)) -if args.verbose: - print("Raw Request:\t", tddp_packet) -t = tddp_packet -print("Request Data:\tVersion", t[0:2], "Type", t[2:4], "Status", t[6:8], - "Length", t[8:16], "ID", t[16:20], "Subtype", t[20:22]) -sock_send.close() - -# Receive the reply -response, addr = sock_receive.recvfrom(1024) -r = hexlify(response).decode() -if args.verbose: - print("Raw Reply:\t", r) -sock_receive.close() -print("Reply Data:\tVersion", r[0:2], "Type", r[2:4], "Status", r[6:8], - "Length", r[8:16], "ID", r[16:20], "Subtype", r[20:22]) - -# Take payload and decrypt using key -recv_data = r[56:] -if recv_data: - print("Decrypted:\t", end="") - print(key.decrypt(unhexlify(recv_data))) - +if __name__ == "__main__": + main() diff --git a/tplink_smartplug.py b/tplink_smartplug.py index 4bdc8e4..6bbdb88 100755 --- a/tplink_smartplug.py +++ b/tplink_smartplug.py @@ -1,23 +1,22 @@ #!/usr/bin/env python3 -# -# TP-Link Wi-Fi Smart Plug Protocol Client -# For use with TP-Link HS-100 or HS-110 -# -# by Lubomir Stroetmann -# Copyright 2016 softScheck GmbH -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# + +"""TP-Link Wi-Fi Smart Plug Protocol client (TP-Link HS-100, HS-110, ...). + +Orignal author: Lubomir Stroetmann +Copyright 2016 softScheck GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" import argparse import socket @@ -25,51 +24,33 @@ from struct import pack version = 0.4 -# Check if hostname is valid -def validHostname(hostname): - try: - socket.gethostbyname(hostname) - except socket.error: - parser.error("Invalid hostname.") - return hostname - -# Check if port is valid -def validPort(port): - try: - port = int(port) - except ValueError: - parser.error("Invalid port number.") - - if ((port <= 1024) or (port > 65535)): - parser.error("Invalid port number.") - - return port - # Predefined Smart Plug Commands -# For a full list of commands, consult tplink_commands.txt -commands = {'info' : '{"system":{"get_sysinfo":{}}}', - 'on' : '{"system":{"set_relay_state":{"state":1}}}', - 'off' : '{"system":{"set_relay_state":{"state":0}}}', - 'ledoff' : '{"system":{"set_led_off":{"off":1}}}', - 'ledon' : '{"system":{"set_led_off":{"off":0}}}', - 'cloudinfo': '{"cnCloud":{"get_info":{}}}', - 'wlanscan' : '{"netif":{"get_scaninfo":{"refresh":0}}}', - 'time' : '{"time":{"get_time":{}}}', - 'schedule' : '{"schedule":{"get_rules":{}}}', - 'countdown': '{"count_down":{"get_rules":{}}}', - 'antitheft': '{"anti_theft":{"get_rules":{}}}', - 'reboot' : '{"system":{"reboot":{"delay":1}}}', - 'reset' : '{"system":{"reset":{"delay":1}}}', - 'energy' : '{"emeter":{"get_realtime":{}}}', - 'energy_reset' : '{"emeter":{"erase_emeter_stat":{}}}', - 'runtime_reset' : '{"schedule":{"erase_runtime_stat":{}}}' +# For a full list of commands, consult tplink-smarthome-commands.txt +commands = { + "antitheft": '{"anti_theft":{"get_rules":{}}}', + "cloudinfo": '{"cnCloud":{"get_info":{}}}', + "countdown": '{"count_down":{"get_rules":{}}}', + "energy_reset": '{"emeter":{"erase_emeter_stat":{}}}', + "energy": '{"emeter":{"get_realtime":{}}}', + "info": '{"system":{"get_sysinfo":{}}}', + "ledoff": '{"system":{"set_led_off":{"off":1}}}', + "ledon": '{"system":{"set_led_off":{"off":0}}}', + "off": '{"system":{"set_relay_state":{"state":0}}}', + "on": '{"system":{"set_relay_state":{"state":1}}}', + "reboot": '{"system":{"reboot":{"delay":1}}}', + "reset": '{"system":{"reset":{"delay":1}}}', + "runtime_reset": '{"schedule":{"erase_runtime_stat":{}}}', + "schedule": '{"schedule":{"get_rules":{}}}', + "time": '{"time":{"get_time":{}}}', + "wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}', } -# Encryption and Decryption of TP-Link Smart Home Protocol -# XOR Autokey Cipher with starting key = 171 -def encrypt(string): +def encrypt(string: str) -> bytes: + """Encryption of TP-Link Smart Home Protocol. + XOR Autokey Cipher with starting key = 171. + """ key = 171 result = pack(">I", len(string)) for i in string: @@ -78,60 +59,116 @@ def encrypt(string): result += bytes([a]) return result -def decrypt(string): + +def decrypt(ciphertext: bytes) -> bytearray: + """Decryption of TP-Link Smart Home Protocol. + XOR Autokey Cipher with starting key = 171. + """ key = 171 result = [] - for i in string: + for i in ciphertext: a = key ^ i key = i result.append(a) - return bytearray(result).decode('utf-8') + return bytearray(result).decode() -# Parse commandline arguments -parser = argparse.ArgumentParser(description=f"TP-Link Wi-Fi Smart Plug Client v{version}") -parser.add_argument("-t", "--target", metavar="", required=True, - help="Target hostname or IP address", type=validHostname) -parser.add_argument("-p", "--port", metavar="", default=9999, - required=False, help="Target port", type=validPort) -parser.add_argument("-q", "--quiet", dest="quiet", action="store_true", - help="Only show result") -parser.add_argument("--timeout", default=10, required=False, - help="Timeout to establish connection") -group = parser.add_mutually_exclusive_group(required=True) -group.add_argument("-c", "--command", metavar="", - help="Preset command to send. Choices are: "+", ".join(commands), choices=commands) -group.add_argument("-j", "--json", metavar="", - help="Full JSON string of command to send") -args = parser.parse_args() +def parse_args(): + """Parse commandline arguments.""" + + def valid_hostname(hostname: str) -> str: + """Check if hostname is valid.""" + try: + socket.gethostbyname(hostname) + except OSError: + parser.error("Invalid hostname.") + return hostname + + def valid_port(port: str) -> int: + """Check if port is valid.""" + print(type(port)) + try: + port = int(port) + except ValueError: + parser.error("Invalid port number.") + + if (port <= 1024) or (port > 65535): + parser.error("Invalid port number.") + + return port + + parser = argparse.ArgumentParser( + description=f"TP-Link Wi-Fi Smart Plug Client v{version}" + ) + parser.add_argument( + "-t", + "--target", + metavar="", + required=True, + help="Target hostname or IP address", + type=valid_hostname, + ) + parser.add_argument( + "-p", + "--port", + metavar="", + default=9999, + required=False, + help="Target port", + type=valid_port, + ) + parser.add_argument( + "-q", "--quiet", dest="quiet", action="store_true", help="Only show result" + ) + parser.add_argument( + "--timeout", default=10, required=False, help="Timeout to establish connection" + ) + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "-c", + "--command", + metavar="", + help="Preset command to send. Choices are: " + ", ".join(commands), + choices=commands, + ) + group.add_argument( + "-j", + "--json", + metavar="", + help="Full JSON string of command to send", + ) + return parser.parse_args() -# Set target IP, port and command to send -ip = args.target -port = args.port -if args.command is None: - cmd = args.json -else: - cmd = commands[args.command] +def main(): + """Read argument, send encrypted commands, output decrypted answer.""" + args = parse_args() + # Set target IP, port and command to send + ip = args.target + port = args.port + cmd = args.json if args.command is None else commands[args.command] + + # Send command and receive reply + try: + sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock_tcp.settimeout(int(args.timeout)) + sock_tcp.connect((ip, port)) + sock_tcp.settimeout(None) + sock_tcp.send(encrypt(cmd)) + data = sock_tcp.recv(2048) + sock_tcp.close() + + decrypted = decrypt(data[4:]) + + if args.quiet: + print(decrypted) + else: + print("Sent: ", cmd) + print("Received: ", decrypted) + + except OSError: + print(f"Could not connect to host {ip}:{port}") -# Send command and receive reply -try: - sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock_tcp.settimeout(int(args.timeout)) - sock_tcp.connect((ip, port)) - sock_tcp.settimeout(None) - sock_tcp.send(encrypt(cmd)) - data = sock_tcp.recv(2048) - sock_tcp.close() - - decrypted = decrypt(data[4:]) - - if args.quiet: - print(decrypted) - else: - print("Sent: ", cmd) - print("Received: ", decrypted) - -except socket.error: - quit(f"Could not connect to host {ip}:{port}") +if __name__ == "__main__": + main()