# checkHelo-0.2.py 09/28/05 Copyright (C) 2005 David MacQuigg # Licensed under GPL or a commercial license. See the file LICENSE.TXT ''' Check that an ID in the Registry of Public Email Senders authorizes sending mail from an address IP. ''' import sys, DNS # standard Python modules from time import localtime, strftime # for data logging import dnsRegRec # to get a TXT record from the Registry using DNS import parseRegRec # to parse a Registry record using re ##from SpammerFile import SpammerDict # local blacklist DNS.timeout = 6 # over-rides default in dnsRegRec # Authentication methods available at this site: from email_auth import spf # SPF1 from sourceforge.net/projects/pymilter - milter-0.8.2 from email_auth import csv # based on spec from mipassoc.org/csv ##import sid # SenderID from Microsoft # Methods that require transfer of DATA: ##import dkim # from Yahoo/Cisco RCVR = 'mxR.receiver.domain' # Name of this MTA. ADMIN = 'joe@receiver.domain' # Address to report setup problems. ##DNS.DiscoverNameServers() # from /etc/resolv.conf or Windows Registry ## # to DNS.defaults # Alternative manual setting: DNS.defaults['server'] = ['216.183.68.110', '216.183.68.111'] ### LOGLEVEL = 1 # 0: None, 1: One line per message, 2: Full diagnostics LOGFILE = open('logfile', 'a') # append normal logging & statistics ##LOGFILE = sys.stdout # another alternative # Authentication methods currently supported, and the maximum number of DNS # queries allowed. Methods listed first will be run first, if offered by the # sender. METHOD_LIST = [['QR1', 0], ['CSV', 4], ['SPF', 30], ['SID', 30]] # Domain Rating Services - Assign whatever scores you think are appropriate for # each of the ratings from these services. Acceptance or rejection will be # based on the min, max and weighted average of these scores. 'Wt' is the # weight. See open-mail.org/services for a complete list of service codes. TRUSTED_SERVICES = {'S1': {'A': 9, 'B': 7, 'C': 5, 'D': 0, 'Wt': 5}, 'M2': {'A': 9, 'B': 7, 'C': 5, 'D': 0, 'Wt': 5}, 'H1': {'A': 9, 'B': 7, 'C': 5, 'D': 0, 'Wt': 5}} ACTIONS = ('ACCEPT', # accept for delivery, no more testing 'REJECT', # reject 'FILTER', # send to spam filter, delivery uncertain 'CONTINUE', # run more authentication tests, if available 'TEMPFAIL', # temporary reject, try again later 'DISCARD' # reject with no reply ) # see http://www.milter.org/milter_api/api.html USAGE = ''' To check that a Registered ID authorizes the use of an IP address: >>> checkID('208.20.133.17', H='mailout17.heaven.net') ('ACCEPT', (250, '', 'Sender OK'), \ [{'text': '208.20.133.17 heaven.net QR1 PASS ratings=S1:A,M2:A,H1:B', \ 'label': 'Authent:'}]) heaven.net is the Registered ID extracted from the HELO name. This domain offers SPF, SID, and a list of authorized netblocks. The IP matched one of the netblocks, and the ratings were good, so it got an immediate ACCEPT. >>> checkID('208.20.133.32', H='mailout32.heaven.net') ('REJECT', (550, '5.7.1', "Sending MTA '208.20.133.32' not authorized by \ 'heaven.net'"), \ [{'text': '208.20.133.32 heaven.net QR1 FAIL ratings=S1:A,M2:A,H1:B', \ 'label': 'Authent:'}]) The IP address didn't match, so it got a REJECT. >>> checkID('192.168.0.1', 'mx1.spammer.tld') ('FILTER', (250, '?.?.?', \ "Message will be sent to spam filter - don't depend on delivery.\\n\ Your domain 'spammer.tld' has ratings S1:C,M2:C.\\n\ See open-mail.org/rejects for help."), \ [{'text': '192.168.0.1 spammer.tld QR1 PASS ratings=S1:C,M2:C', \ 'label': 'Authent:'}]) The authentication check was OK, but the ratings were low, so it went to the filter. This is a typical spammer name. >>> checkID('69.55.226.139', H='mx1.wayforward.net', S='tway@optsw.com') ('ACCEPT', (250, '', 'sender SPF verified'), \ [{'text': '69.55.226.139 wayforward.net SPF1 pass ratings=S1:A,M2:A,H1:B', \ 'label': 'Authent:'}, \ {'text': 'pass (mxR.receiver.domain: domain of optsw.com designates \ 69.55.226.139 as permitted sender) client-ip=69.55.226.139; \ envelope-from=tway@optsw.com; helo=mx1.wayforward.net;', \ 'label': 'Received-SPF:'}]) This was an SPF1 authentication. Notice that the Registered ID, 'wayforward.net' is not the same as the identity verified by SPF, S='tway@optsw.com'. In general, when calling for an ID checkon a domain whose authentication methods are unknown, it is best to provide all identities in the call, and let the Registry sort out which one to use. Notice also there is an extra header 'Received-SPF:' in addition to the 'Authent:' header which is returned for all methods. >>> checkID('192.0.4.1', H='mx1.abilene.texas.rr.tld') ('FILTER', (250, '', 'domain in transition'), \ [{'text': '192.0.4.1 rr.tld SPF1 softfail ratings=S1:A,M2:A,H1:B', \ 'label': 'Authent:'}, \ {'text': 'softfail (mxR.receiver.domain: transitioning domain of \ mx1.abilene.texas.rr.tld does not designate 192.0.4.1 as permitted sender) \ client-ip=192.0.4.1; envelope-from=postmaster@mx1.abilene.texas.rr.tld; \ helo=mx1.abilene.texas.rr.tld;', \ 'label': 'Received-SPF:'}]) rr.tld offers only SPF1, and this method produced a "softfail". Message was sent to the filter. If you get a timeout error, just try again after a few minutes: ->>> checkID('192.0.4.1', H='mx1.abilene.texas.rr.tld') ('TEMPFAIL', (450, '', \ "Timeout getting Registry record 'mx1.abilene.texas.rr.tld.df.open-mail.org'.\\n\ Try again later."), []) >>> checkID('208.20.133.17', H='mx1.example.co.uk') ('REJECT', (550, '5.7.1', "No mail allowed from 'example.co.uk'\\n\ See open-mail.org/rejects."), []) This is a domain with no authentication records, just a simple 'stop=all' entry in the Registry, preventing use of its name in sending mail. To repeat this message, call checkID() with no arguments. ''' class NoID(Exception): pass class BadFQDN(Exception): pass from DNS import DNSError from dnsRegRec import Timeout, NoRecord # subclasses of DNSError class NoRatings(Exception): pass from parseRegRec import ParseError def checkID( IP=None, H=None, S=None, R=[], Hdrs=[], Body=None ): ''' H, S, and R are the envelope names, whatever is known at the time of this call. H and S are the names from the HELO/EHLO and MAIL FROM commands. R is a list of recipient addresses from the RCPT TO command. Returns ( action, SMTP_reply, headers ) action: 'ACCEPT', 'REJECT', 'FILTER', 'CONTINUE', 'TEMPFAIL', 'DISCARD' SMTP_reply = ( SMTP_code, Xcode, explanation ) SMTP_code: SMTP Reply Code per RFC-2821 Xcode: Enhanced Mail System Status Code per RFC-3463 headers = [header0, header1, ...] header0 = {'label': 'Authent:', 'text': } headerN = {'label': , 'text': } Headers to be pre-pended in the order listed. 'header0' is the very first header from this Administrative Domain, 'header1' is above that, etc. Other headers may be inserted in this list if required by a method, e.g. SPF requires that the 'Received:' header for the border MTA be below its header. Text strings have no line breaks. Folding white space should be added before prepending a long header. >>> checkID('192.0.2.1', 'example.net') ('REJECT', (550, '5.7.1', "Sending MTA '192.0.2.1' not authorized by \ 'example.net'"), []) >>> checkID('192.0.2.1', 'example.com') ('ACCEPT', (250, '', 'Sender OK'), \ [{'text': '192.0.2.1 example.com SPF1 PASS ratings=S1:A,M2:A,H1:B', \ 'label': 'Authent:'}]) ''' if IP == None: print USAGE; return if LOGLEVEL > 1: print >> LOGFILE, "Testing %s %s %s" % (IP, H, S) ## Stub for local testing with no DNS queries: if IP == '192.0.2.1': return local_test(IP, H) ## Quick pre-screen for heavily-spamming IPs # Comment out this section if not needed. ### if checkSBL(IP): explain = "IP '%s' is on Spamhaus BlackList" % IP if LOGLEVEL > 0: log('REJECT', 'SBL', IP) return ('REJECT', (550, '5.7.1', explain), []) ## Process a Registry record assuming H has a valid Registered ID: try: ID = validate_FQDN( H ) record = dnsRegRec.getTXT( ID + '.id.open-mail.org') parsedrec = parseRegRec.parse1( record ) # Look for ID at a lower level: if parsedrec.has_key('IDlevel'): # TLD is a country code lev = int(parsedrec['IDlevel']) IDparts = H.split('.')[-lev:] ID = '.'.join(IDparts) # This is the real ID record = dnsRegRec.getTXT( ID + '.id.open-mail.org') parsedrec = parseRegRec.parse1( record ) # Stop forgery with variations on domain names: if parsedrec.has_key('stop'): exp = ("No mail allowed from '%s'\n" % ID + "See open-mail.org/rejects." ) return ('REJECT', (550, '5.7.1', exp), []) svclist = parseRegRec.parse2(parsedrec['svc']) ratings, score, mins, maxs = get_ratings( svclist ) except BadFQDN, explain: exp = str(explain) return ('REJECT', (550, '5.7.1', exp), []) except NoRecord, explain: exp = str(explain) + ("\nDeclared ID with no Registry record!" + "\nSee open-mail.org/rejects." ) return ('REJECT', (550, '5.7.1', exp), []) except NoRatings: explain = ("No trusted ratings for '%s'" % ID + "\nDelivery uncertain." ) SMTP_reply = (250, '?.?.?', explain) ### codes ? headers = [{'label': 'Authent:', \ 'text': '%s %s UNVERIFIED' % (IP, ID)}] return ('FILTER', SMTP_reply, headers) except Timeout, explain: exp = str(explain) + "\nTry again later." return ('TEMPFAIL', (450, '', exp), []) except (DNSError, ParseError), explain: # Nameservers not found. Bad record syntax, etc. exp = str(explain) ALERT('TEMPFAIL', 'DNS', IP, ID, explain=exp) # Alert the mail admin. return ('TEMPFAIL', (450, '', exp), []) ## ## DISCARD with no reply: ### tarpits are controversial !!! ## if ID in SpammerDict: ## return ('DISCARD', None, []) ## Quick REJECT if ratings are too low. 80% of incoming is trash. if score < 3.0 or mins < 1: explain = ( "Sorry, we accept mail only from reputable domains.\n" + "Your domain '%s' has ratings %s.\n" % (ID, ratings) + "See open-mail.org/rejects for help." ) return ('REJECT', (550, '5.7.1', explain), []) ## Quick ACCEPT pending authentication. 15% of incoming is from A-rated # domains. The remaining 5% will be filtered and sorted by spam score. # if mins > 4 and maxs > 8: RATING = 'HIGH' else: RATING = 'LOW' ## Find the Authentication Methods to be used on this ID: # methlist = get_methods(parsedrec) # [['CSV', 2, ''], ['SPF', 10, 'mx ip4:24.30.18.0/24 ~all']] if methlist == []: ### Accepting these for now. explain = "No supported methods in Registry for '%s'" % ID SMTP_reply = (250, '', explain + "\nDelivery uncertain.") headers = [{'label': 'Authent:', \ 'text': '%s %s UNVERIFIED (%s)' % (IP, ID, explain)}] return ('CONTINUE', SMTP_reply, headers) ## Run the methods in sequence, breaking after the first one with a # final action (anything but CONTINUE). for meth in methlist: methname = meth[0] maxq = meth[1] if parsedrec.has_key(methname): params = parsedrec[methname] else: params = meth[2] action, SMTP_reply, headers = run_method( methname, maxq, params, ID, IP, H, S, R ) if action != 'CONTINUE': headers[0]['text'] += ' ratings=%s' % ratings break else: explain = "No final result from any authentication method." SMTP_reply = (250, '', explain) ### Accept for now headers = ['Authent: %s %s UNVERIFIED' % (IP, ID)] headers = [{'label': 'Authent:', \ 'text': '%s %s UNVERIFIED' % (IP, ID)}] return ('CONTINUE', SMTP_reply, headers) ## Final disposition by this MTA: if action == 'ACCEPT' and RATING == 'LOW': action = 'FILTER' explain = ( "Message will be sent to spam filter - don't depend on delivery.\n" + "Your domain '%s' has ratings %s.\n" % (ID, ratings) + "See open-mail.org/rejects for help." ) SMTP_reply = (250, '?.?.?', explain) ### codes ? return (action, SMTP_reply, headers) ## === Support Functions === def ALERT(action, reason, IP, ID=None, H=None, S=None, explain=None): if LOGLEVEL > 0: log(action, reason, IP, ID, H, S, explain) ## send_alert_to(ADMIN) def checkSBL(IP): '''Check the Spamhaus Blacklist for currently spamming IPs. See http://www.spamhaus.org/sbl/latest.lasso for latest IPs on SBL. >>> checkID('192.0.2.99') ('REJECT', (550, '5.7.1', "IP '192.0.2.99' is on Spamhaus BlackList"), []) ''' # stub for local testing if IP == '192.0.2.99': return True IPR = reverse_IP(IP) name = IPR + '.sbl-xbl.spamhaus.org' reqobj = DNS.Request(name, qtype='A') resp = reqobj.req() if resp.answers == []: return False else: return True def cidr_base(a, m): '''Return an integer, the lowest address in the CIDR block with length m that contains address a. >>> hex( cidr_base('192.168.255.255', 22) ) '0xC0A8FC00L' ''' inta = 0 for p in a.split('.'): intp = int(p) if not 0 <= intp < 256: raise SyntaxError, "Number out of range '%s'" % a inta = inta*256 + intp s = 32 - m base = (inta >> s) << s # Mask out the low bits. return base def cidr_match(a, ip4): '''Test if an address 'a' falls withing a CIDR block 'ip4' >>> cidr_match('192.168.255.59', '192.168.255.255/22') True ''' b, m = cidr_parse(ip4) base1 = cidr_base(a, m) base2 = cidr_base(b, m) if base1 == base2: return True else: return False def cidr_parse(ip4): '''Parse a string in CIDR notation. >>> cidr_parse('192.168.255.255/22') ('192.168.255.255', 22) ''' sp = ip4.split('/') if len(sp) == 2: a = sp[0]; m = int(sp[1]) elif len(sp) == 1: a = sp[0]; m = 32 else: raise SyntaxError, ip4 return a, m def get_methods(parsedrec): methlist = [] if parsedrec.has_key('ip4'): # simple check against ip4 addresses methlist.append(['QR1', 0, 'ip4:' + parsedrec['ip4']]) elif parsedrec.has_key('ip6'): # simple check against ip6 addresses methlist.append(['QR1', 0, 'ip6:' + parsedrec['ip6']]) if parsedrec.has_key('meth'): senders_methods = parseRegRec.parse2(parsedrec['meth']) # [['CSV', 2, ''], ['SPF', 10, 'mx ip4:24.30.18.0/24 ~all']] # METHOD_LIST = [['QR1', 0], ['CSV', 4], ['SPF', 30], ['SID', 30]] for rmeth in METHOD_LIST: # receivers methods for smeth in senders_methods: if smeth[0] == rmeth[0]: # we have a match :>) queries = smeth[1] # what the sender wants if queries <= rmeth[1]: # what we will allow methlist.append(smeth) ## else: ## # Alternative to use during a DoS attack. ## explain = "Method %s, needing %s queries, too much \ ## for now. Try again later." % (methname, queries) ## return ('TEMPFAIL', (450, '?.?.?', explain), []) return methlist def get_ratings( svclist ): ratings = ''; maxs = 0; mins = 9 # Initial values total_s = 0.0; total_w = 0.0; # for weighted average for item in svclist: svc_code = item[0]; rating = item[2] if svc_code not in TRUSTED_SERVICES: continue # ignore unknown services ratings += svc_code + ':' + rating + ',' score = TRUSTED_SERVICES[svc_code][rating] weight = TRUSTED_SERVICES[svc_code]['Wt'] total_s += score * weight total_w += weight maxs = max(maxs, score); mins = min(mins, score) if total_w > 14.0: break # enough to make a decision if total_w == 0.0: raise NoRatings ratings = ratings.rstrip(',') score = total_s / total_w if LOGLEVEL > 1: print >> LOGFILE, ratings, score, mins, maxs return (ratings, score, mins, maxs) def local_test(IP, H): if H == 'example.com': headers = [ {'label': 'Authent:', 'text': '192.0.2.1 example.com SPF1 PASS ratings=S1:A,M2:A,H1:B'} ] return ('ACCEPT', ( 250, '', 'Sender OK'), headers) else: return ('REJECT', (550, '5.7.1', "Sending MTA '%s' not authorized by '%s'" % (IP, H)), []) def log(action, reason, IP, ID=None, H=None, S=None, explain=None): DT = strftime("%x %X", localtime()) print >> LOGFILE, DT, action, reason, IP, ID, H, S, explain def reverse_IP(IP): ''' >>> reverse_IP('1.2.3.4') '4.3.2.1' ''' labels = IP.split('.') labels.reverse() return '.'.join(labels) def validate_FQDN( name ): '''Check for a valid fully-qualified domain name. Look for common problems with forged names - easy rejects. >>> validate_FQDN( 'JUPITER' ) Traceback (most recent call last): - - - BadFQDN: 'JUPITER' is not a valid sender's name. ''' try: splits = name.split('.') assert len(splits) > 1 ### more tests here - RFC conformity, etc. # check for common spammer tricks # - numbers instead of names # - use of receiver's name except: raise BadFQDN, "'%s' is not a valid sender's name." % name return '.'.join(splits[-2:]) # ID is last two parts of name ## === Method Wrappers === def run_method(methname, maxq, params, ID, IP, H, S, R ): '''Call a wrapper function for each method, and translate its return values into the (action, SMTP_reply, headers) format needed by checkID(). ''' if methname == 'SPF': return _spf(maxq, params, ID, IP, H, S ) if methname == 'SID': return _sid(maxq, params, ID, IP, H, S ) if methname == 'CSV': return _csv(ID, IP, H) if methname == 'QR1': return _qr1(params, ID, IP) raise ValueError, "Method '%s' not supported." % methname def _spf(maxq, params, ID, IP, H, S ): '''Runs tests from the spf.py module, using an SPF record in the string params, or getting the SPF record using DNS queries as specifed in the SPF method. Not Implemented: Enforcement of maxq number of queries >>> _spf(10, '', 'mx1.wayforward.net', '69.55.226.139', \ 'mx1.wayforward.net','tway@optsw.com' ) ('ACCEPT', (250, '', 'sender SPF verified'), \ [{'text': '69.55.226.139 mx1.wayforward.net SPF1 pass', 'label': 'Authent:'}, \ {'text': 'pass (mxR.receiver.domain: domain of optsw.com designates 69.55.226.139 \ as permitted sender) client-ip=69.55.226.139; envelope-from=tway@optsw.com; \ helo=mx1.wayforward.net;', 'label': 'Received-SPF:'}]) >>> _spf(10, 'v=spf1 +mx +ip4:10.0.0.1 -all', 'mx1.wayforward.net', \ '10.0.0.1', 'mx1.wayforward.net', 'tway@optsw.com') ('ACCEPT', (250, '', 'sender SPF verified'), \ [{'text': '10.0.0.1 mx1.wayforward.net SPF1 pass', 'label': 'Authent:'}, \ {'text': 'pass (mxR.receiver.domain: domain of optsw.com designates 10.0.0.1 \ as permitted sender) client-ip=10.0.0.1; envelope-from=tway@optsw.com; \ helo=mx1.wayforward.net;', 'label': 'Received-SPF:'}]) ''' q_obj = spf.query(IP, S, H ) # an SPF query object if params == '': # run the standard SPF test on S and H result, SMTP_code, explain = spf.check(IP, S, H) else: # use the SPF record in params to run the test result, SMTP_code, explain = q_obj.check(params) SMTP_reply = (SMTP_code, '', explain) header0 = {'label': 'Authent:', 'text': '%s %s SPF1 %s' % (IP, ID, result) } header1 = {'label': 'Received-SPF:', \ 'text': q_obj.get_header(result, RCVR)} if result == 'pass': return ('ACCEPT', SMTP_reply, [header0, header1]) elif result in ('neutral', 'none', 'softfail'): return ('FILTER', SMTP_reply, [header0, header1]) elif result in ('fail', 'unknown'): return ('REJECT', SMTP_reply, [header0, header1]) elif result == 'error': ALERT(IP, ID, explain) # Send an email to the admin. return ('TEMPFAIL', SMTP_reply, [header0, header1]) else: raise ValueError, 'Unrecognized result from SPF check' def _csv(ID, IP, H): '''Runs tests from the csv.py module, using the CSV records found by DNS queries as specifed in the CSV method. >>> _csv('mail-abuse.org', '168.61.5.27', 'harry.mail-abuse.org') ('ACCEPT', (250, '?.?.?', 'Sender CSV OK'), \ [{'text': '168.61.5.27 harry.mail-abuse.org CSV1 pass', \ 'label': 'Authent:'}]) ''' result, SMTP_reply = csv.csa(IP, H) SMTP_code, Xcode, explain = SMTP_reply header = {'label': 'Authent:', 'text': '%s %s CSV1 %s' % (IP, H, result) } if result == 'pass': return ('ACCEPT', SMTP_reply, [header]) elif result in ('none', 'neutral'): return ('FILTER', SMTP_reply, [header]) elif result == 'fail': return ('REJECT', SMTP_reply, [header]) elif result == 'tempfail': return ('TEMPFAIL', SMTP_reply, [header]) elif result == 'error': ALERT(IP, ID, explain) # Send an email to the admin. return ('TEMPFAIL', SMTP_reply, [header]) else: raise ValueError, 'Unrecognized result from CSA check' def _qr1(params, ID, IP): '''Check an IP against an IP block contained in the string 'params'. >>> _qr1('ip4:192.168.128.0/22', 'example.com', '192.168.128.77') ('ACCEPT', (250, '', 'Sender OK'), \ [{'text': '192.168.128.77 example.com QR1 PASS', 'label': 'Authent:'}]) >>> _qr1('ip4:216.183.68.0/24,192.0.0.0/22', 'example.com', '192.168.128.77') ('REJECT', (550, '5.7.1', "Sending MTA '192.168.128.77' \ not authorized by 'example.com'"), \ [{'text': '192.168.128.77 example.com QR1 FAIL', 'label': 'Authent:'}]) ''' ip4s = params.split(':')[1] # 216.183.68.0/24,192.0.0.0/22 ip4list = ip4s.split(',') # ['216.183.68.0/24', '192.0.0.0/22'] for ip4 in ip4list: if cidr_match(IP, ip4): result = 'PASS' action = 'ACCEPT' SMTP_reply = (250, '', 'Sender OK') break else: result = 'FAIL' action = 'REJECT' explain = "Sending MTA '%s' not authorized by '%s'" % (IP, ID) SMTP_reply = (550, '5.7.1', explain) header0 = {'label': 'Authent:', 'text': '%s %s QR1 %s' % (IP, ID, result) } return (action, SMTP_reply, [header0]) if __name__ == '__main__': __test__ = { 'Usage_String': USAGE, # Verify all the examples in the USAGE string. 'unit_tests': # This is the place for edge cases and other tedious # tests with no tutorial value. ''' >>> unit_tests() === UNIT TESTS === TEST #1 PASSED ''' } def unit_tests(): # tests needing setup and post-processing print ' === UNIT TESTS ===' IP = '192.0.2.1' action, r, h = checkID(IP, 'example.com') if action == 'ACCEPT': print "TEST #1 PASSED" import sys, doctest doctest.testmod(sys.modules['__main__'], verbose=True)