2
0
mirror of https://github.com/softScheck/tplink-smartplug synced 2026-01-11 23:38:46 +01:00

Code refactoring

This commit is contained in:
willi
2024-10-15 12:43:07 +02:00
parent ce7739564d
commit 1512e110a3
2 changed files with 321 additions and 310 deletions

View File

@@ -1,224 +1,198 @@
#!/usr/bin/env python3
#
# TP-Link Device Debug Protocol (TDDP) v2 Client
# Based on https://www.google.com/patents/CN102096654A?cl=en
#
# HIGHLY EXPERIMENTAL and untested!
# The protocol is available on all kinds of TP-Link devices such as routers, cameras, smart plugs etc.
#
# by Lubomir Stroetmann
# Copyright 2016 softScheck GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""TP-Link Device Debug Protocol (TDDP) v2 Client.
Based on https://www.google.com/patents/CN102096654A?cl=en
HIGHLY EXPERIMENTAL and untested!
The protocol is available on all kinds of TP-Link devices such as routers,
cameras, smart plugs etc.
by Lubomir Stroetmann
Copyright 2016 softScheck GmbH
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import hashlib
import logging
import socket
import string
from binascii import hexlify, unhexlify
from pyDes import *
version = 0.3
from pyDes import ECB, des
# Default username and password
username = "admin"
password = "admin"
version = 0.4
# Check if IP is valid
def validIP(ip):
try:
socket.inet_pton(socket.AF_INET, ip)
except socket.error:
parser.error("Invalid IP Address.")
return ip
# Check if command is two hex chars
def validHex(cmd):
ishex = all(c in string.hexdigits for c in cmd)
if len(cmd) == 2 and ishex:
def generate_tddp_key(username: str, password: str) -> str:
"""Generate TDDP DES Key."""
return hashlib.md5(username.encode() + password.encode()).hexdigest()[:16]
def build_tddp_packet(cmd: str, tddp_key: str) -> str:
"""Build TDDP packet."""
tddp_ver = "02"
tddp_type = "03"
tddp_code = "01"
tddp_reply = "00"
tddp_length = "0000002A"
tddp_id = "0001"
tddp_subtype = cmd
tddp_reserved = "00"
tddp_digest = f"{00:0032X}"
tddp_data = ""
tddp_length = len(tddp_data) // 2
tddp_length = f"{tddp_length:008X}"
key = des(unhexlify(tddp_key), ECB)
data = key.encrypt(unhexlify(tddp_data))
tddp_packet = "".join(
[
tddp_ver,
tddp_type,
tddp_code,
tddp_reply,
tddp_length,
tddp_id,
tddp_subtype,
tddp_reserved,
tddp_digest,
hexlify(data.encode()).decode(),
]
)
tddp_digest = hashlib.md5(unhexlify(tddp_packet)).hexdigest()
tddp_packet = tddp_packet[:24] + tddp_digest + tddp_packet[56:]
logging.debug(f"Raw Request:\t{tddp_packet}")
return tddp_packet
def send_request(ip: str, port_send: int, tddp_packet: str):
"""Send TDDP request."""
sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_send.sendto(unhexlify(tddp_packet), (ip, port_send))
logging.debug(
f"Req Data:\tVersion {tddp_packet[0:2]} "
f"Type {tddp_packet[2:4]} "
f"Status {tddp_packet[6:8]} "
f"Length {tddp_packet[8:16]} "
f"ID {tddp_packet[16:20]} "
f"Subtype {tddp_packet[20:22]}"
)
sock_send.close()
def receive_reply(port_receive: int) -> str:
"""Receive TDDP reply."""
sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_receive.bind(("", port_receive))
response, addr = sock_receive.recvfrom(1024)
resp = hexlify(response).decode()
logging.info(f"Raw Reply:\t{resp}")
sock_receive.close()
return resp
def decrypt_and_print_response(response: str, tddp_key: str):
"""Decrypt and print TDDP response."""
key = des(unhexlify(tddp_key), ECB)
recv_data = response[56:]
if recv_data:
logging.info(f"Decrypted:\t{key.decrypt(unhexlify(recv_data))}")
def parse_args():
"""Parse commandline arguments."""
def valid_ip(ip: str) -> str:
"""Check if IP is valid."""
try:
socket.inet_pton(socket.AF_INET, ip)
except OSError:
parser.error("Invalid IP Address.")
return ip
def valid_hex(cmd: str) -> str:
"""Check if command is two hex chars."""
ishex = all(c in string.hexdigits for c in cmd)
if not (len(cmd) == 2 and ishex):
parser.error("Please issue a two-character hex command, e.g. 0A")
return cmd
else:
parser.error("Please issue a two-character hex command, e.g. 0A")
# Parse commandline arguments
parser = argparse.ArgumentParser(description="Experimental TP-Link TDDPv2 Client v" + str(version))
parser.add_argument("-v", "--verbose", help="Verbose mode", action="store_true")
parser.add_argument("-t", "--target", metavar="<ip>", required=True, help="Target IP Address", type=validIP)
parser.add_argument("-u", "--username", metavar="<username>", help="Username (default: admin)")
parser.add_argument("-p", "--password", metavar="<password>", help="Password (default: admin)")
parser.add_argument("-c", "--command", metavar="<hex>", required=True, help="Command value to send as hex (e.g. 0A)", type=validHex)
args = parser.parse_args()
parser = argparse.ArgumentParser(
description=f"Experimental TP-Link TDDPv2 Client v.{version}"
)
parser.add_argument("-v", "--verbose", help="Verbose mode", action="store_true")
parser.add_argument(
"-t",
"--target",
metavar="<ip>",
required=True,
help="Target IP Address",
type=valid_ip,
)
parser.add_argument(
"-u",
"--username",
metavar="<username>",
help="Username (default: admin)",
default="admin",
)
parser.add_argument(
"-p",
"--password",
metavar="<password>",
help="Password (default: admin)",
default="admin",
)
parser.add_argument(
"-c",
"--command",
metavar="<hex>",
required=True,
help="Command value to send as hex (e.g. 0A)",
type=valid_hex,
)
return parser.parse_args()
# Set Target IP, username and password to calculate DES decryption key for data and command to execute
ip = args.target
cmd = args.command
if args.username:
def main():
"""Parse arguments, generate key, build and send tddp packet,
decrypt and print response.
"""
args = parse_args()
ip = args.target
cmd = args.command
username = args.username
if args.password:
password = args.password
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
# TDDP runs on UDP Port 1040
# Response is sent to UDP Port 61000
port_send = 1040
port_receive = 61000
port_send = 1040
port_receive = 61000
tddp_key = generate_tddp_key(username, password)
logging.info(f"TDDP Key:\t{tddp_key} ({username}:{password})")
tddp_packet = build_tddp_packet(cmd, tddp_key)
send_request(ip, port_send, tddp_packet)
response = receive_reply(port_receive)
decrypt_and_print_response(response, tddp_key)
# TDDP DES Key = MD5 of username and password concatenated
# Key is first 8 bytes only
tddp_key = hashlib.md5(username.encode() + password.encode()).hexdigest()[:16]
if args.verbose:
print("TDDP Key:\t", tddp_key, "(" + username + password + ")")
## TDDP Header
# 0 1 2 3
# 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# | Ver | Type | Code | ReplyInfo |
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# | PktLength |
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# | PktID | SubType | Reserve |
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# | MD5 Digest[0-3] |
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# | MD5 Digest[4-7] |
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# | MD5 Digest[8-11] |
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
# | MD5 Digest[12-15] |
# +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
## TDDP Protocol Version
tddp_ver = "02"
## Packet Type
# 0x01 SET_USR_CFG - set configuration information
# 0x02 GET_SYS_INF - get configuration information
# 0x03 CMD_SPE_OPR - special configuration commands
# 0x04 HEART_BEAT - the heartbeat package
tddp_type = "03"
## Code Request Type
# 0x01 TDDP_REQUEST
# 0x02 TDDP_REPLY
tddp_code = "01"
## Reply Info Status
# 0x00 REPLY_OK
# 0x02 ?
# 0x03 ?
# 0x09 REPLY_ERROR
# 0xFF ?
tddp_reply = "00"
## Packet Length (not including header)
# 4 bytes
tddp_length = "0000002A"
## Packet ID
# 2 bytes
# supposed to be incremented +1 for each packet
tddp_id = "0001"
# Subtype for CMD_SPE_OPR (Special Operations Command)
# Set to 0x00 for SET_USR_CFG and GET_SYS_INF
#
# Subtypes described in patent application, hex value unknown:
# CMD_SYS_OPR Router system operation, including: init, save, reboot, reset, clr dos
# CMD_AUTO_TEST MAC for writing operation, the user replies CMD_SYS_INIT broadcast packet
# CMD_CONFIG_MAC Factory settings MAC operations
# CMD_CANCEL_TEST Cancel automatic test, stop receiving broadcast packets
# CMD_GET_PROD_ID Get product ID
# CMD_SYS_INIT Initialize a router
# CMD_CONFIG_PIN Router PIN code
#
# Subtypes that seem to work for a HS-110 Smart Plug:
# 0x0A returns "ABCD0110"
# 0x12 returns the deviceID
# 0x14 returns the hwID
# 0x06 changes MAC
# 0x13 changes deviceID
# 0x15 changes deviceID
#
# Subtypes that seem to work for an Archer C9 Router:
# 0x0E returns physical status of WAN link:
# wan_ph_link 1 0 = disconnected
# wan_ph_link 1 1 = connected
# 0x0F returns logical status of WAN link: wan_logic_link 1 0
# 0x0A returns \x00\x09\x00\x01\x00\x00\x00\x00
# 0x15 returns \x01\x00\x00\x00\x00\x00\x00\x00
# 0x18 returns 1
tddp_subtype = cmd
# Reserved
tddp_reserved = "00"
# Digest 0-15 (32char/128bit/16byte)
# MD5 digest of entire packet
# Set to 0 initially for building the digest, then overwrite with result
tddp_digest = "%0.32X" % 00
# TDDP Data
# Always pad with 0x00 to a length divisible by 8
# We're not sending any data since we're only sending read commands
tddp_data = ""
# Recalculate length if sending data
tddp_length = len(tddp_data)//2
tddp_length = "%0.8X" % tddp_length
## Encrypt data with key
key = des(unhexlify(tddp_key), ECB)
data = key.encrypt(unhexlify(tddp_data))
## Assemble packet
tddp_packet = "".join([tddp_ver, tddp_type, tddp_code, tddp_reply,
tddp_length, tddp_id, tddp_subtype,
tddp_reserved, tddp_digest, hexlify(data.encode()).decode()])
# Calculate MD5
tddp_digest = hashlib.md5(unhexlify(tddp_packet)).hexdigest()
tddp_packet = tddp_packet[:24] + tddp_digest + tddp_packet[56:]
# Binding receive socket in advance in case reply comes fast.
sock_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_receive.bind(('', port_receive))
# Send a request
sock_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock_send.sendto(unhexlify(tddp_packet), (ip, port_send))
if args.verbose:
print("Raw Request:\t", tddp_packet)
t = tddp_packet
print("Request Data:\tVersion", t[0:2], "Type", t[2:4], "Status", t[6:8],
"Length", t[8:16], "ID", t[16:20], "Subtype", t[20:22])
sock_send.close()
# Receive the reply
response, addr = sock_receive.recvfrom(1024)
r = hexlify(response).decode()
if args.verbose:
print("Raw Reply:\t", r)
sock_receive.close()
print("Reply Data:\tVersion", r[0:2], "Type", r[2:4], "Status", r[6:8],
"Length", r[8:16], "ID", r[16:20], "Subtype", r[20:22])
# Take payload and decrypt using key
recv_data = r[56:]
if recv_data:
print("Decrypted:\t", end="")
print(key.decrypt(unhexlify(recv_data)))
if __name__ == "__main__":
main()

View File

@@ -1,23 +1,22 @@
#!/usr/bin/env python3
#
# TP-Link Wi-Fi Smart Plug Protocol Client
# For use with TP-Link HS-100 or HS-110
#
# by Lubomir Stroetmann
# Copyright 2016 softScheck GmbH
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""TP-Link Wi-Fi Smart Plug Protocol client (TP-Link HS-100, HS-110, ...).
Orignal author: Lubomir Stroetmann
Copyright 2016 softScheck GmbH
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import argparse
import socket
@@ -25,51 +24,33 @@ from struct import pack
version = 0.4
# Check if hostname is valid
def validHostname(hostname):
try:
socket.gethostbyname(hostname)
except socket.error:
parser.error("Invalid hostname.")
return hostname
# Check if port is valid
def validPort(port):
try:
port = int(port)
except ValueError:
parser.error("Invalid port number.")
if ((port <= 1024) or (port > 65535)):
parser.error("Invalid port number.")
return port
# Predefined Smart Plug Commands
# For a full list of commands, consult tplink_commands.txt
commands = {'info' : '{"system":{"get_sysinfo":{}}}',
'on' : '{"system":{"set_relay_state":{"state":1}}}',
'off' : '{"system":{"set_relay_state":{"state":0}}}',
'ledoff' : '{"system":{"set_led_off":{"off":1}}}',
'ledon' : '{"system":{"set_led_off":{"off":0}}}',
'cloudinfo': '{"cnCloud":{"get_info":{}}}',
'wlanscan' : '{"netif":{"get_scaninfo":{"refresh":0}}}',
'time' : '{"time":{"get_time":{}}}',
'schedule' : '{"schedule":{"get_rules":{}}}',
'countdown': '{"count_down":{"get_rules":{}}}',
'antitheft': '{"anti_theft":{"get_rules":{}}}',
'reboot' : '{"system":{"reboot":{"delay":1}}}',
'reset' : '{"system":{"reset":{"delay":1}}}',
'energy' : '{"emeter":{"get_realtime":{}}}',
'energy_reset' : '{"emeter":{"erase_emeter_stat":{}}}',
'runtime_reset' : '{"schedule":{"erase_runtime_stat":{}}}'
# For a full list of commands, consult tplink-smarthome-commands.txt
commands = {
"antitheft": '{"anti_theft":{"get_rules":{}}}',
"cloudinfo": '{"cnCloud":{"get_info":{}}}',
"countdown": '{"count_down":{"get_rules":{}}}',
"energy_reset": '{"emeter":{"erase_emeter_stat":{}}}',
"energy": '{"emeter":{"get_realtime":{}}}',
"info": '{"system":{"get_sysinfo":{}}}',
"ledoff": '{"system":{"set_led_off":{"off":1}}}',
"ledon": '{"system":{"set_led_off":{"off":0}}}',
"off": '{"system":{"set_relay_state":{"state":0}}}',
"on": '{"system":{"set_relay_state":{"state":1}}}',
"reboot": '{"system":{"reboot":{"delay":1}}}',
"reset": '{"system":{"reset":{"delay":1}}}',
"runtime_reset": '{"schedule":{"erase_runtime_stat":{}}}',
"schedule": '{"schedule":{"get_rules":{}}}',
"time": '{"time":{"get_time":{}}}',
"wlanscan": '{"netif":{"get_scaninfo":{"refresh":0}}}',
}
# Encryption and Decryption of TP-Link Smart Home Protocol
# XOR Autokey Cipher with starting key = 171
def encrypt(string):
def encrypt(string: str) -> bytes:
"""Encryption of TP-Link Smart Home Protocol.
XOR Autokey Cipher with starting key = 171.
"""
key = 171
result = pack(">I", len(string))
for i in string:
@@ -78,60 +59,116 @@ def encrypt(string):
result += bytes([a])
return result
def decrypt(string):
def decrypt(ciphertext: bytes) -> bytearray:
"""Decryption of TP-Link Smart Home Protocol.
XOR Autokey Cipher with starting key = 171.
"""
key = 171
result = []
for i in string:
for i in ciphertext:
a = key ^ i
key = i
result.append(a)
return bytearray(result).decode('utf-8')
return bytearray(result).decode()
# Parse commandline arguments
parser = argparse.ArgumentParser(description=f"TP-Link Wi-Fi Smart Plug Client v{version}")
parser.add_argument("-t", "--target", metavar="<hostname>", required=True,
help="Target hostname or IP address", type=validHostname)
parser.add_argument("-p", "--port", metavar="<port>", default=9999,
required=False, help="Target port", type=validPort)
parser.add_argument("-q", "--quiet", dest="quiet", action="store_true",
help="Only show result")
parser.add_argument("--timeout", default=10, required=False,
help="Timeout to establish connection")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-c", "--command", metavar="<command>",
help="Preset command to send. Choices are: "+", ".join(commands), choices=commands)
group.add_argument("-j", "--json", metavar="<JSON string>",
help="Full JSON string of command to send")
args = parser.parse_args()
def parse_args():
"""Parse commandline arguments."""
def valid_hostname(hostname: str) -> str:
"""Check if hostname is valid."""
try:
socket.gethostbyname(hostname)
except OSError:
parser.error("Invalid hostname.")
return hostname
def valid_port(port: str) -> int:
"""Check if port is valid."""
print(type(port))
try:
port = int(port)
except ValueError:
parser.error("Invalid port number.")
if (port <= 1024) or (port > 65535):
parser.error("Invalid port number.")
return port
parser = argparse.ArgumentParser(
description=f"TP-Link Wi-Fi Smart Plug Client v{version}"
)
parser.add_argument(
"-t",
"--target",
metavar="<hostname>",
required=True,
help="Target hostname or IP address",
type=valid_hostname,
)
parser.add_argument(
"-p",
"--port",
metavar="<port>",
default=9999,
required=False,
help="Target port",
type=valid_port,
)
parser.add_argument(
"-q", "--quiet", dest="quiet", action="store_true", help="Only show result"
)
parser.add_argument(
"--timeout", default=10, required=False, help="Timeout to establish connection"
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-c",
"--command",
metavar="<command>",
help="Preset command to send. Choices are: " + ", ".join(commands),
choices=commands,
)
group.add_argument(
"-j",
"--json",
metavar="<JSON string>",
help="Full JSON string of command to send",
)
return parser.parse_args()
# Set target IP, port and command to send
ip = args.target
port = args.port
if args.command is None:
cmd = args.json
else:
cmd = commands[args.command]
def main():
"""Read argument, send encrypted commands, output decrypted answer."""
args = parse_args()
# Set target IP, port and command to send
ip = args.target
port = args.port
cmd = args.json if args.command is None else commands[args.command]
# Send command and receive reply
try:
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_tcp.settimeout(int(args.timeout))
sock_tcp.connect((ip, port))
sock_tcp.settimeout(None)
sock_tcp.send(encrypt(cmd))
data = sock_tcp.recv(2048)
sock_tcp.close()
decrypted = decrypt(data[4:])
if args.quiet:
print(decrypted)
else:
print("Sent: ", cmd)
print("Received: ", decrypted)
except OSError:
print(f"Could not connect to host {ip}:{port}")
# Send command and receive reply
try:
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_tcp.settimeout(int(args.timeout))
sock_tcp.connect((ip, port))
sock_tcp.settimeout(None)
sock_tcp.send(encrypt(cmd))
data = sock_tcp.recv(2048)
sock_tcp.close()
decrypted = decrypt(data[4:])
if args.quiet:
print(decrypted)
else:
print("Sent: ", cmd)
print("Received: ", decrypted)
except socket.error:
quit(f"Could not connect to host {ip}:{port}")
if __name__ == "__main__":
main()