2
0
mirror of https://github.com/softScheck/tplink-smartplug synced 2026-01-11 23:38:46 +01:00

Dropped Python2 support; improved code style

This commit is contained in:
willi
2020-11-05 17:58:43 +01:00
parent e975f579b3
commit 630c777049

View File

@@ -19,103 +19,88 @@
# limitations under the License. # limitations under the License.
# #
import sys
import socket
import argparse import argparse
import socket
from struct import pack from struct import pack
version = 0.3 version = 0.4
# Check if hostname is valid # Check if hostname is valid
def validHostname(hostname): def validHostname(hostname):
try: try:
socket.gethostbyname(hostname) socket.gethostbyname(hostname)
except socket.error: except socket.error:
parser.error("Invalid hostname.") parser.error("Invalid hostname.")
return hostname return hostname
# Check if port is valid # Check if port is valid
def validPort(port): def validPort(port):
try: try:
port = int(port) port = int(port)
except ValueError: except ValueError:
parser.error("Invalid port number.") parser.error("Invalid port number.")
if ((port <= 1024) or (port >65535)) : if ((port <= 1024) or (port > 65535)):
parser.error("Invalid port number.") parser.error("Invalid port number.")
return port return port
# Predefined Smart Plug Commands # Predefined Smart Plug Commands
# For a full list of commands, consult tplink_commands.txt # For a full list of commands, consult tplink_commands.txt
commands = { 'info' : '{"system":{"get_sysinfo":{}}}', commands = {'info' : '{"system":{"get_sysinfo":{}}}',
'on' : '{"system":{"set_relay_state":{"state":1}}}', 'on' : '{"system":{"set_relay_state":{"state":1}}}',
'off' : '{"system":{"set_relay_state":{"state":0}}}', 'off' : '{"system":{"set_relay_state":{"state":0}}}',
'ledoff' : '{"system":{"set_led_off":{"off":1}}}', 'ledoff' : '{"system":{"set_led_off":{"off":1}}}',
'ledon' : '{"system":{"set_led_off":{"off":0}}}', 'ledon' : '{"system":{"set_led_off":{"off":0}}}',
'cloudinfo': '{"cnCloud":{"get_info":{}}}', 'cloudinfo': '{"cnCloud":{"get_info":{}}}',
'wlanscan' : '{"netif":{"get_scaninfo":{"refresh":0}}}', 'wlanscan' : '{"netif":{"get_scaninfo":{"refresh":0}}}',
'time' : '{"time":{"get_time":{}}}', 'time' : '{"time":{"get_time":{}}}',
'schedule' : '{"schedule":{"get_rules":{}}}', 'schedule' : '{"schedule":{"get_rules":{}}}',
'countdown': '{"count_down":{"get_rules":{}}}', 'countdown': '{"count_down":{"get_rules":{}}}',
'antitheft': '{"anti_theft":{"get_rules":{}}}', 'antitheft': '{"anti_theft":{"get_rules":{}}}',
'reboot' : '{"system":{"reboot":{"delay":1}}}', 'reboot' : '{"system":{"reboot":{"delay":1}}}',
'reset' : '{"system":{"reset":{"delay":1}}}', 'reset' : '{"system":{"reset":{"delay":1}}}',
'energy' : '{"emeter":{"get_realtime":{}}}' 'energy' : '{"emeter":{"get_realtime":{}}}'
} }
# Encryption and Decryption of TP-Link Smart Home Protocol # Encryption and Decryption of TP-Link Smart Home Protocol
# XOR Autokey Cipher with starting key = 171 # XOR Autokey Cipher with starting key = 171
# Python 3.x Version
if sys.version_info[0] > 2:
def encrypt(string):
key = 171
result = pack('>I', len(string))
for i in string:
a = key ^ ord(i)
key = a
result += bytes([a])
return result
def decrypt(string): def encrypt(string):
key = 171 key = 171
result = "" result = pack(">I", len(string))
for i in string: for i in string:
a = key ^ i a = key ^ ord(i)
key = i key = a
result += chr(a) result += bytes([a])
return result return result
# Python 2.x Version def decrypt(string):
else: key = 171
def encrypt(string): result = ""
key = 171 for i in string:
result = pack('>I', len(string)) a = key ^ i
for i in string: key = i
a = key ^ ord(i) result += chr(a)
key = a return result
result += chr(a)
return result
def decrypt(string):
key = 171
result = ""
for i in string:
a = key ^ ord(i)
key = ord(i)
result += chr(a)
return result
# Parse commandline arguments # Parse commandline arguments
parser = argparse.ArgumentParser(description="TP-Link Wi-Fi Smart Plug Client v" + str(version)) parser = argparse.ArgumentParser(description=f"TP-Link Wi-Fi Smart Plug Client v{version}")
parser.add_argument("-t", "--target", metavar="<hostname>", required=True, help="Target hostname or IP address", type=validHostname) parser.add_argument("-t", "--target", metavar="<hostname>", required=True,
parser.add_argument("-p", "--port", metavar="<port>", default=9999, required=False, help="Target port", type=validPort) help="Target hostname or IP address", type=validHostname)
parser.add_argument("-q", "--quiet", dest='quiet', action='store_true', help="Only show result") parser.add_argument("-p", "--port", metavar="<port>", default=9999,
parser.add_argument("--timeout", default=10, required=False, help="Timeout to establish connection") required=False, help="Target port", type=validPort)
parser.add_argument("-q", "--quiet", dest="quiet", action="store_true",
help="Only show result")
parser.add_argument("--timeout", default=10, required=False,
help="Timeout to establish connection")
group = parser.add_mutually_exclusive_group(required=True) group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("-c", "--command", metavar="<command>", help="Preset command to send. Choices are: "+", ".join(commands), choices=commands) group.add_argument("-c", "--command", metavar="<command>",
group.add_argument("-j", "--json", metavar="<JSON string>", help="Full JSON string of command to send") help="Preset command to send. Choices are: "+", ".join(commands), choices=commands)
group.add_argument("-j", "--json", metavar="<JSON string>",
help="Full JSON string of command to send")
args = parser.parse_args() args = parser.parse_args()
@@ -123,29 +108,28 @@ args = parser.parse_args()
ip = args.target ip = args.target
port = args.port port = args.port
if args.command is None: if args.command is None:
cmd = args.json cmd = args.json
else: else:
cmd = commands[args.command] cmd = commands[args.command]
# Send command and receive reply # Send command and receive reply
try: try:
sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock_tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock_tcp.settimeout(int(args.timeout)) sock_tcp.settimeout(int(args.timeout))
sock_tcp.connect((ip, port)) sock_tcp.connect((ip, port))
sock_tcp.settimeout(None) sock_tcp.settimeout(None)
sock_tcp.send(encrypt(cmd)) sock_tcp.send(encrypt(cmd))
data = sock_tcp.recv(2048) data = sock_tcp.recv(2048)
sock_tcp.close() sock_tcp.close()
decrypted = decrypt(data[4:]) decrypted = decrypt(data[4:])
if args.quiet: if args.quiet:
print(decrypted) print(decrypted)
else: else:
print("Sent: ", cmd) print("Sent: ", cmd)
print("Received: ", decrypted) print("Received: ", decrypted)
except socket.error: except socket.error:
quit("Could not connect to host " + ip + ":" + str(port)) quit(f"Could not connect to host {ip}:{port}")