# csv-0.41.py   08/23/05   Copyright (C) 2005  David MacQuigg
# Licensed under the GPL or a commercial license.  See the file LICENSE.TXT
'''
Email authentication using CSV/CSA v.1 (draft-ietf-marid-csv-csa-02)

Not Implemented:
    Checking parent domains for subdomain authorization.
    IPv6.
'''
# 0.41  Test for no nameservers found.
# 0.4   Return results, not actions.
#       Change module name to csv, function name to csa.
# 0.3   Handle weight=3 records.
# 0.2   Ignore SRV records with version numbers other than 1.

import DNS     # from http://pydns.sourceforge.net
from DNS import DNSError

DNS.DiscoverNameServers()   # from /etc/resolv.conf or Windows Registry
                            #   to DNS.defaults
if not DNS.defaults['server']:
    raise DNSError("No nameservers found.")
# Set manually if DiscoverNameServers doesn't work on your platform:
# DNS.defaults['server'] = ['216.183.68.110', '216.183.68.111']   ###

DNS.timeout = 30   # default if not over-ridden by main module
verbose = 0        # set > 0 to print progress of each lookup


def csa(IP, HELO):
    '''
    Runs a CSV/CSA test on a HELO name.

    IP is the SMTP sender's IP address.
    HELO is a string from the HELO command, which must be verified as a fully-
    qualified domain name prior to this call.

    Returns ( result, SMTP_reply )
        result = (
            'pass',      # IP passes the test
            'fail',      # IP fails the test / error on sender side
            'neutral',   # test completed with no definite result
            'none',      # no CSV records found
            'tempfail',  # try again in a few minutes
            'error'      # on receiver side, e.g. "No route to nameservers",
                         #   "No working nameservers", etc.
        )
        SMTP_reply = ( SMTP_code, Xcode, explanation )
                       (550, '5.7.1', 'Sender not authorized')
        SMTP_code:  SMTP Reply Code per RFC-2821
        Xcode:  Enhanced Mail System Status Code per RFC-3463

    Examples:
    >>> csa('168.61.5.27', 'harry.mail-abuse.org')
    ('pass', (250, '?.?.?', 'Sender CSV OK'))

    >>> csa('192.168.0.1', 'harry.mail-abuse.org')
    ('fail', (550, '5.7.1', \
"'192.168.0.1' not authorized by 'harry.mail-abuse.org'"))

    >>> csa('192.168.0.1', 'yahoo.com')
    ('none', (550, '?.?.?', "No CSV1 record for '_client._smtp.yahoo.com'."))
    '''

    ## Get an SRV record for the HELO name:
    name = '_client._smtp.' + HELO
    if verbose > 0:
        print("CSV1 testing %s for '%s'" % (IP, name))
    try:
        reqobj = DNS.Request(name, qtype='SRV', timeout=DNS.timeout)
        resp = reqobj.req()
    except DNSError as explain:
        exp = str(explain)
        if exp == 'Timeout':
            msg = ("Timeout getting SRV record for '%s'.\n" % name +
                   "Try again later.")
            return ('tempfail', (450, '?.?.?', msg))
        # Any other DNS failure.  (This was 'msg == ...', a comparison
        # against an unbound name, which raised NameError here.)
        msg = exp + "\nDNS error getting SRV record for '%s'" % name
        ALERT(IP, HELO, msg)   # error needs human attention
        # NOTE(review): the equivalent branch of the A lookup below reports
        # 'error' rather than 'tempfail' -- confirm which one is intended.
        return ('tempfail', (450, '?.?.?', msg))

    ## Extract the needed info from a version 1 record:
    if verbose > 0:
        print("Found %s SRV records" % len(resp.answers))
    count = 0
    for ans in resp.answers:
        data = ans['data']
        if verbose > 0:
            print(data)
        version = data[0]   # CSV version (SRV priority field)
        if version != 1:
            continue        # ignore any records that are not version 1
        count += 1
        weight = data[1]    # authorization ( 1: NO, 2: YES, 3: MAYBE )
        # data[2] (SRV port field) is subdomain authorization (ignore for now)
        target = data[3]    # authorized hostname (ID)

    ## Check for too few or too many SRV records:
    if verbose > 0:
        print("Found %s version 1 records" % count)
    if count == 0:
        exp = "No CSV1 record for '%s'." % name   # or non-existent domain
        return ('none', (550, '?.?.?', exp))
    if count > 1:
        exp = "Found %s CSV1 records for '%s'.  Should be 1." % (count, name)
        return ('fail', (550, '?.?.?', exp))

    # Exactly one version 1 record from here on, so weight and target are set.
    if weight in (0, 1):
        exp = "'%s' not authorized to send mail" % target
        return ('fail', (550, '5.7.1', exp))

    ## Special IPs for testing:
    # NOTE(review): these overrides run for every caller, not only under the
    # doctests -- confirm that is acceptable for production use.
    if IP == '192.168.0.97':
        target = '_spf.open-mail.org'       # No A records
    if IP == '192.168.0.98':
        target = 'nxdomain.open-mail.org'   # No domain
    if IP == '192.168.0.99':
        weight = 3                          # Authorization uncertain

    ## Check the 'A' records for the authorized name:
    #  Will need 'AAAA' records for IPv6.
    try:
        reqobj = DNS.Request(target, qtype='A', timeout=DNS.timeout)
        resp = reqobj.req()
    except DNSError as explain:
        exp = str(explain)
        if exp == 'Timeout':
            msg = ("Timeout getting A records for '%s'.\n" % target +
                   "Try again later.")
            return ('tempfail', (450, '?.?.?', msg))
        # Same 'msg == ...' defect as in the SRV lookup, fixed the same way.
        msg = exp + "\nDNSError getting A records for '%s'" % target
        ALERT(IP, HELO, msg)   # error needs human attention
        return ('error', (450, '?.?.?', msg))

    ## Make a list of the authorized IP addresses:
    aa = [ans['data'] for ans in resp.answers]
    if verbose > 0:
        print("authorized IPs: %s" % (aa,))
    if not aa:   # No target domain, or no A records found.
        SMTP_reply = (550, '5.7.1', "CSV: No A records for '%s'" % target)
        return ('fail', SMTP_reply)

    ## Check incoming IP against the list:
    if IP in aa:
        return ('pass', (250, '?.?.?', 'Sender CSV OK'))

    assert weight not in (0, 1)   # already tested this
    if weight == 2:
        SMTP_reply = (550, '5.7.1',
                      "'%s' not authorized by '%s'" % (IP, HELO))
        return ('fail', SMTP_reply)
    if weight == 3:
        SMTP_reply = (250, '?.?.?',   ### Xcode for "tentative accept"?
                      "'%s' not listed by '%s' Do not depend on delivery."
                      % (IP, HELO))
        return ('neutral', SMTP_reply)
    SMTP_reply = (550, '5.7.1',
                  "SRV record with weight=%s not valid for CSV version 1"
                  % weight)
    return ('fail', SMTP_reply)


def ALERT(IP, HELO, msg):   # something is broken   ###
    '''
    Hook called by csa() when a DNS lookup fails for a reason other than a
    timeout.  IP and HELO are the arguments csa() was called with; msg is
    the error text that csa() also returns in its SMTP reply.

    Does nothing by default.
    '''
    pass   # Receiver adds their desired handling here.  e.g. send an email
           #   to the postmaster@receivers.domain


if __name__ == '__main__':

    # Extra doctests, picked up by doctest.testmod() through __test__.
    __test__ = {'unit_tests': '''
    >>> csa('208.31.42.39', 'iecc.com')
    ('fail', (550, '5.7.1', "'iecc.com' not authorized to send mail"))

    >>> csa('192.168.0.97', 'harry.mail-abuse.org')
    ('fail', (550, '5.7.1', "CSV: No A records for '_spf.open-mail.org'"))

    >>> csa('192.168.0.98', 'harry.mail-abuse.org')
    ('fail', (550, '5.7.1', "CSV: No A records for 'nxdomain.open-mail.org'"))

    >>> csa('192.168.0.99', 'harry.mail-abuse.org')
    ('neutral', (250, '?.?.?', "'192.168.0.99' not listed by 'harry.mail-abuse.org' \
Do not depend on delivery."))
    '''}

    import sys
    import doctest
    doctest.testmod(sys.modules['__main__'], verbose=True)