diff options
Diffstat (limited to 'Demo/dns/dnslib.py')
-rwxr-xr-x | Demo/dns/dnslib.py | 588 |
1 files changed, 0 insertions, 588 deletions
diff --git a/Demo/dns/dnslib.py b/Demo/dns/dnslib.py deleted file mode 100755 index 4e1fb87..0000000 --- a/Demo/dns/dnslib.py +++ /dev/null @@ -1,588 +0,0 @@ -# Domain Name Server (DNS) interface -# -# See RFC 1035: -# ------------------------------------------------------------------------ -# Network Working Group P. Mockapetris -# Request for Comments: 1035 ISI -# November 1987 -# Obsoletes: RFCs 882, 883, 973 -# -# DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION -# ------------------------------------------------------------------------ - - -import string - -import dnstype -import dnsclass -import dnsopcode - - -# Low-level 16 and 32 bit integer packing and unpacking - -def pack16bit(n): - return chr((n>>8)&0xFF) + chr(n&0xFF) - -def pack32bit(n): - return chr((n>>24)&0xFF) + chr((n>>16)&0xFF) \ - + chr((n>>8)&0xFF) + chr(n&0xFF) - -def unpack16bit(s): - return (ord(s[0])<<8) | ord(s[1]) - -def unpack32bit(s): - return (ord(s[0])<<24) | (ord(s[1])<<16) \ - | (ord(s[2])<<8) | ord(s[3]) - -def addr2bin(addr): - if type(addr) == type(0): - return addr - bytes = string.splitfields(addr, '.') - if len(bytes) != 4: raise ValueError, 'bad IP address' - n = 0 - for byte in bytes: n = n<<8 | string.atoi(byte) - return n - -def bin2addr(n): - return '%d.%d.%d.%d' % ((n>>24)&0xFF, (n>>16)&0xFF, - (n>>8)&0xFF, n&0xFF) - - -# Packing class - -class Packer: - def __init__(self): - self.buf = '' - self.index = {} - def getbuf(self): - return self.buf - def addbyte(self, c): - if len(c) != 1: raise TypeError, 'one character expected' - self.buf = self.buf + c - def addbytes(self, bytes): - self.buf = self.buf + bytes - def add16bit(self, n): - self.buf = self.buf + pack16bit(n) - def add32bit(self, n): - self.buf = self.buf + pack32bit(n) - def addaddr(self, addr): - n = addr2bin(addr) - self.buf = self.buf + pack32bit(n) - def addstring(self, s): - self.addbyte(chr(len(s))) - self.addbytes(s) - def addname(self, name): - # Domain name packing (section 4.1.4) - # Add a domain name to the buffer, possibly using pointers. - # The case of the first occurrence of a name is preserved. - # Redundant dots are ignored. - list = [] - for label in string.splitfields(name, '.'): - if label: - if len(label) > 63: - raise PackError, 'label too long' - list.append(label) - keys = [] - for i in range(len(list)): - key = string.upper(string.joinfields(list[i:], '.')) - keys.append(key) - if self.index.has_key(key): - pointer = self.index[key] - break - else: - i = len(list) - pointer = None - # Do it into temporaries first so exceptions don't - # mess up self.index and self.buf - buf = '' - offset = len(self.buf) - index = [] - for j in range(i): - label = list[j] - n = len(label) - if offset + len(buf) < 0x3FFF: - index.append( (keys[j], offset + len(buf)) ) - else: - print 'dnslib.Packer.addname:', - print 'warning: pointer too big' - buf = buf + (chr(n) + label) - if pointer: - buf = buf + pack16bit(pointer | 0xC000) - else: - buf = buf + '\0' - self.buf = self.buf + buf - for key, value in index: - self.index[key] = value - def dump(self): - keys = self.index.keys() - keys.sort() - print '-'*40 - for key in keys: - print '%20s %3d' % (key, self.index[key]) - print '-'*40 - space = 1 - for i in range(0, len(self.buf)+1, 2): - if self.buf[i:i+2] == '**': - if not space: print - space = 1 - continue - space = 0 - print '%4d' % i, - for c in self.buf[i:i+2]: - if ' ' < c < '\177': - print ' %c' % c, - else: - print '%2d' % ord(c), - print - print '-'*40 - - -# Unpacking class - -UnpackError = 'dnslib.UnpackError' # Exception - -class Unpacker: - def __init__(self, buf): - self.buf = buf - self.offset = 0 - def getbyte(self): - c = self.buf[self.offset] - self.offset = self.offset + 1 - return c - def getbytes(self, n): - s = self.buf[self.offset : self.offset + n] - if len(s) != n: raise UnpackError, 'not enough data left' - self.offset = self.offset + n - return s - def get16bit(self): - return unpack16bit(self.getbytes(2)) - def get32bit(self): - return unpack32bit(self.getbytes(4)) - def getaddr(self): - return bin2addr(self.get32bit()) - def getstring(self): - return self.getbytes(ord(self.getbyte())) - def getname(self): - # Domain name unpacking (section 4.1.4) - c = self.getbyte() - i = ord(c) - if i & 0xC0 == 0xC0: - d = self.getbyte() - j = ord(d) - pointer = ((i<<8) | j) & ~0xC000 - save_offset = self.offset - try: - self.offset = pointer - domain = self.getname() - finally: - self.offset = save_offset - return domain - if i == 0: - return '' - domain = self.getbytes(i) - remains = self.getname() - if not remains: - return domain - else: - return domain + '.' + remains - - -# Test program for packin/unpacking (section 4.1.4) - -def testpacker(): - N = 25 - R = range(N) - import timing - # See section 4.1.4 of RFC 1035 - timing.start() - for i in R: - p = Packer() - p.addbytes('*' * 20) - p.addname('f.ISI.ARPA') - p.addbytes('*' * 8) - p.addname('Foo.F.isi.arpa') - p.addbytes('*' * 18) - p.addname('arpa') - p.addbytes('*' * 26) - p.addname('') - timing.finish() - print round(timing.milli() * 0.001 / N, 3), 'seconds per packing' - p.dump() - u = Unpacker(p.buf) - u.getbytes(20) - u.getname() - u.getbytes(8) - u.getname() - u.getbytes(18) - u.getname() - u.getbytes(26) - u.getname() - timing.start() - for i in R: - u = Unpacker(p.buf) - res = (u.getbytes(20), - u.getname(), - u.getbytes(8), - u.getname(), - u.getbytes(18), - u.getname(), - u.getbytes(26), - u.getname()) - timing.finish() - print round(timing.milli() * 0.001 / N, 3), 'seconds per unpacking' - for item in res: print item - - -# Pack/unpack RR toplevel format (section 3.2.1) - -class RRpacker(Packer): - def __init__(self): - Packer.__init__(self) - self.rdstart = None - def addRRheader(self, name, type, klass, ttl, *rest): - self.addname(name) - self.add16bit(type) - self.add16bit(klass) - self.add32bit(ttl) - if rest: - if res[1:]: raise TypeError, 'too many args' - rdlength = rest[0] - else: - rdlength = 0 - self.add16bit(rdlength) - self.rdstart = len(self.buf) - def patchrdlength(self): - rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart]) - if rdlength == len(self.buf) - self.rdstart: - return - rdata = self.buf[self.rdstart:] - save_buf = self.buf - ok = 0 - try: - self.buf = self.buf[:self.rdstart-2] - self.add16bit(len(rdata)) - self.buf = self.buf + rdata - ok = 1 - finally: - if not ok: self.buf = save_buf - def endRR(self): - if self.rdstart is not None: - self.patchrdlength() - self.rdstart = None - def getbuf(self): - if self.rdstart is not None: self.patchrdlenth() - return Packer.getbuf(self) - # Standard RRs (section 3.3) - def addCNAME(self, name, klass, ttl, cname): - self.addRRheader(name, dnstype.CNAME, klass, ttl) - self.addname(cname) - self.endRR() - def addHINFO(self, name, klass, ttl, cpu, os): - self.addRRheader(name, dnstype.HINFO, klass, ttl) - self.addstring(cpu) - self.addstring(os) - self.endRR() - def addMX(self, name, klass, ttl, preference, exchange): - self.addRRheader(name, dnstype.MX, klass, ttl) - self.add16bit(preference) - self.addname(exchange) - self.endRR() - def addNS(self, name, klass, ttl, nsdname): - self.addRRheader(name, dnstype.NS, klass, ttl) - self.addname(nsdname) - self.endRR() - def addPTR(self, name, klass, ttl, ptrdname): - self.addRRheader(name, dnstype.PTR, klass, ttl) - self.addname(ptrdname) - self.endRR() - def addSOA(self, name, klass, ttl, - mname, rname, serial, refresh, retry, expire, minimum): - self.addRRheader(name, dnstype.SOA, klass, ttl) - self.addname(mname) - self.addname(rname) - self.add32bit(serial) - self.add32bit(refresh) - self.add32bit(retry) - self.add32bit(expire) - self.add32bit(minimum) - self.endRR() - def addTXT(self, name, klass, ttl, list): - self.addRRheader(name, dnstype.TXT, klass, ttl) - for txtdata in list: - self.addstring(txtdata) - self.endRR() - # Internet specific RRs (section 3.4) -- class = IN - def addA(self, name, ttl, address): - self.addRRheader(name, dnstype.A, dnsclass.IN, ttl) - self.addaddr(address) - self.endRR() - def addWKS(self, name, ttl, address, protocol, bitmap): - self.addRRheader(name, dnstype.WKS, dnsclass.IN, ttl) - self.addaddr(address) - self.addbyte(chr(protocol)) - self.addbytes(bitmap) - self.endRR() - - -class RRunpacker(Unpacker): - def __init__(self, buf): - Unpacker.__init__(self, buf) - self.rdend = None - def getRRheader(self): - name = self.getname() - type = self.get16bit() - klass = self.get16bit() - ttl = self.get32bit() - rdlength = self.get16bit() - self.rdend = self.offset + rdlength - return (name, type, klass, ttl, rdlength) - def endRR(self): - if self.offset != self.rdend: - raise UnpackError, 'end of RR not reached' - def getCNAMEdata(self): - return self.getname() - def getHINFOdata(self): - return self.getstring(), self.getstring() - def getMXdata(self): - return self.get16bit(), self.getname() - def getNSdata(self): - return self.getname() - def getPTRdata(self): - return self.getname() - def getSOAdata(self): - return self.getname(), \ - self.getname(), \ - self.get32bit(), \ - self.get32bit(), \ - self.get32bit(), \ - self.get32bit(), \ - self.get32bit() - def getTXTdata(self): - list = [] - while self.offset != self.rdend: - list.append(self.getstring()) - return list - def getAdata(self): - return self.getaddr() - def getWKSdata(self): - address = self.getaddr() - protocol = ord(self.getbyte()) - bitmap = self.getbytes(self.rdend - self.offset) - return address, protocol, bitmap - - -# Pack/unpack Message Header (section 4.1) - -class Hpacker(Packer): - def addHeader(self, id, qr, opcode, aa, tc, rd, ra, z, rcode, - qdcount, ancount, nscount, arcount): - self.add16bit(id) - self.add16bit((qr&1)<<15 | (opcode*0xF)<<11 | (aa&1)<<10 - | (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7 - | (z&7)<<4 | (rcode&0xF)) - self.add16bit(qdcount) - self.add16bit(ancount) - self.add16bit(nscount) - self.add16bit(arcount) - -class Hunpacker(Unpacker): - def getHeader(self): - id = self.get16bit() - flags = self.get16bit() - qr, opcode, aa, tc, rd, ra, z, rcode = ( - (flags>>15)&1, - (flags>>11)&0xF, - (flags>>10)&1, - (flags>>9)&1, - (flags>>8)&1, - (flags>>7)&1, - (flags>>4)&7, - (flags>>0)&0xF) - qdcount = self.get16bit() - ancount = self.get16bit() - nscount = self.get16bit() - arcount = self.get16bit() - return (id, qr, opcode, aa, tc, rd, ra, z, rcode, - qdcount, ancount, nscount, arcount) - - -# Pack/unpack Question (section 4.1.2) - -class Qpacker(Packer): - def addQuestion(self, qname, qtype, qclass): - self.addname(qname) - self.add16bit(qtype) - self.add16bit(qclass) - -class Qunpacker(Unpacker): - def getQuestion(self): - return self.getname(), self.get16bit(), self.get16bit() - - -# Pack/unpack Message(section 4) -# NB the order of the base classes is important for __init__()! - -class Mpacker(RRpacker, Qpacker, Hpacker): - pass - -class Munpacker(RRunpacker, Qunpacker, Hunpacker): - pass - - -# Routines to print an unpacker to stdout, for debugging. -# These affect the unpacker's current position! - -def dumpM(u): - print 'HEADER:', - (id, qr, opcode, aa, tc, rd, ra, z, rcode, - qdcount, ancount, nscount, arcount) = u.getHeader() - print 'id=%d,' % id, - print 'qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \ - % (qr, opcode, aa, tc, rd, ra, z, rcode) - if tc: print '*** response truncated! ***' - if rcode: print '*** nonzero error code! (%d) ***' % rcode - print ' qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \ - % (qdcount, ancount, nscount, arcount) - for i in range(qdcount): - print 'QUESTION %d:' % i, - dumpQ(u) - for i in range(ancount): - print 'ANSWER %d:' % i, - dumpRR(u) - for i in range(nscount): - print 'AUTHORITY RECORD %d:' % i, - dumpRR(u) - for i in range(arcount): - print 'ADDITIONAL RECORD %d:' % i, - dumpRR(u) - -def dumpQ(u): - qname, qtype, qclass = u.getQuestion() - print 'qname=%s, qtype=%d(%s), qclass=%d(%s)' \ - % (qname, - qtype, dnstype.typestr(qtype), - qclass, dnsclass.classstr(qclass)) - -def dumpRR(u): - name, type, klass, ttl, rdlength = u.getRRheader() - typename = dnstype.typestr(type) - print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \ - % (name, - type, typename, - klass, dnsclass.classstr(klass), - ttl) - mname = 'get%sdata' % typename - if hasattr(u, mname): - print ' formatted rdata:', getattr(u, mname)() - else: - print ' binary rdata:', u.getbytes(rdlength) - - -# Test program - -def test(): - import sys - import getopt - import socket - protocol = 'udp' - server = 'cnri.reston.va.us' # XXX adapt this to your local - port = 53 - opcode = dnsopcode.QUERY - rd = 0 - qtype = dnstype.MX - qname = 'cwi.nl' - try: - opts, args = getopt.getopt(sys.argv[1:], 'Trs:tu') - if len(args) > 2: raise getopt.error, 'too many arguments' - except getopt.error, msg: - print msg - print 'Usage: python dnslib.py', - print '[-T] [-r] [-s server] [-t] [-u]', - print '[qtype [qname]]' - print '-T: run testpacker() and exit' - print '-r: recursion desired (default not)' - print '-s server: use server (default %s)' % server - print '-t: use TCP protocol' - print '-u: use UDP protocol (default)' - print 'qtype: query type (default %s)' % \ - dnstype.typestr(qtype) - print 'qname: query name (default %s)' % qname - print 'Recognized qtype values:' - qtypes = dnstype.typemap.keys() - qtypes.sort() - n = 0 - for qtype in qtypes: - n = n+1 - if n >= 8: n = 1; print - print '%s = %d' % (dnstype.typemap[qtype], qtype), - print - sys.exit(2) - for o, a in opts: - if o == '-T': testpacker(); return - if o == '-t': protocol = 'tcp' - if o == '-u': protocol = 'udp' - if o == '-s': server = a - if o == '-r': rd = 1 - if args[0:]: - try: - qtype = eval(string.upper(args[0]), dnstype.__dict__) - except (NameError, SyntaxError): - print 'bad query type:', `args[0]` - sys.exit(2) - if args[1:]: - qname = args[1] - if qtype == dnstype.AXFR: - print 'Query type AXFR, protocol forced to TCP' - protocol = 'tcp' - print 'QTYPE %d(%s)' % (qtype, dnstype.typestr(qtype)) - m = Mpacker() - m.addHeader(0, - 0, opcode, 0, 0, rd, 0, 0, 0, - 1, 0, 0, 0) - m.addQuestion(qname, qtype, dnsclass.IN) - request = m.getbuf() - if protocol == 'udp': - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect((server, port)) - s.send(request) - reply = s.recv(1024) - else: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.connect((server, port)) - s.send(pack16bit(len(request)) + request) - s.shutdown(1) - f = s.makefile('r') - header = f.read(2) - if len(header) < 2: - print '*** EOF ***' - return - count = unpack16bit(header) - reply = f.read(count) - if len(reply) != count: - print '*** Incomplete reply ***' - return - u = Munpacker(reply) - dumpM(u) - if protocol == 'tcp' and qtype == dnstype.AXFR: - while 1: - header = f.read(2) - if len(header) < 2: - print '========== EOF ==========' - break - count = unpack16bit(header) - if not count: - print '========== ZERO COUNT ==========' - break - print '========== NEXT ==========' - reply = f.read(count) - if len(reply) != count: - print '*** Incomplete reply ***' - break - u = Munpacker(reply) - dumpM(u) - - -# Run test program when called as a script - -if __name__ == '__main__': - test() |