"""Miscellaneous utility functions useful for dealing with ESIS streams.""" __version__ = '$Revision$' import re import string import xml.dom.pulldom import xml.sax import xml.sax.handler import xml.sax.xmlreader _data_match = re.compile(r"[^\\][^\\]*").match def decode(s): r = '' while s: m = _data_match(s) if m: r = r + m.group() s = s[m.end():] elif s[1] == "\\": r = r + "\\" s = s[2:] elif s[1] == "n": r = r + "\n" s = s[2:] elif s[1] == "%": s = s[2:] n, s = s.split(";", 1) r = r + unichr(int(n)) else: raise ValueError, "can't handle " + `s` return r _charmap = {} for c in map(chr, range(256)): _charmap[c] = c _charmap["\n"] = r"\n" _charmap["\\"] = r"\\" del c _null_join = ''.join def encode(s): return _null_join(map(_charmap.get, s)) class ESISReader(xml.sax.xmlreader.XMLReader): """SAX Reader which reads from an ESIS stream. No verification of the document structure is performed by the reader; a general verifier could be used as the target ContentHandler instance. """ _decl_handler = None _lexical_handler = None _public_id = None _system_id = None _buffer = "" _is_empty = 0 _lineno = 0 _started = 0 def __init__(self, contentHandler=None, errorHandler=None): xml.sax.xmlreader.XMLReader.__init__(self) self._attrs = {} self._attributes = Attributes(self._attrs) self._locator = Locator() self._empties = {} if contentHandler: self.setContentHandler(contentHandler) if errorHandler: self.setErrorHandler(errorHandler) def get_empties(self): return self._empties.keys() # # XMLReader interface # def parse(self, source): raise RuntimeError self._locator._public_id = source.getPublicId() self._locator._system_id = source.getSystemId() fp = source.getByteStream() handler = self.getContentHandler() if handler: handler.startDocument() lineno = 0 while 1: token, data = self._get_token(fp) if token is None: break lineno = lineno + 1 self._locator._lineno = lineno self._handle_token(token, data) handler = self.getContentHandler() if handler: handler.startDocument() def feed(self, data): if not self._started: handler = self.getContentHandler() if handler: handler.startDocument() self._started = 1 data = self._buffer + data self._buffer = None lines = data.split("\n") if lines: for line in lines[:-1]: self._lineno = self._lineno + 1 self._locator._lineno = self._lineno if not line: e = xml.sax.SAXParseException( "ESIS input line contains no token type mark", None, self._locator) self.getErrorHandler().error(e) else: self._handle_token(line[0], line[1:]) self._buffer = lines[-1] else: self._buffer = "" def close(self): handler = self.getContentHandler() if handler: handler.endDocument() self._buffer = "" def _get_token(self, fp): try: line = fp.readline() except IOError, e: e = SAXException("I/O error reading input stream", e) self.getErrorHandler().fatalError(e) return if not line: return None, None if line[-1] == "\n": line = line[:-1] if not line: e = xml.sax.SAXParseException( "ESIS input line contains no token type mark", None, self._locator) self.getErrorHandler().error(e) return return line[0], line[1:] def _handle_token(self, token, data): handler = self.getContentHandler() if token == '-': if data and handler: handler.characters(decode(data)) elif token == ')': if handler: handler.endElement(decode(data)) elif token == '(': if self._is_empty: self._empties[data] = 1 if handler: handler.startElement(data, self._attributes) self._attrs.clear() self._is_empty = 0 elif token == 'A': name, value = data.split(' ', 1) if value != "IMPLIED": type, value = value.split(' ', 1) self._attrs[name] = (decode(value), type) elif token == '&': # entity reference in SAX? pass elif token == '?': if handler: if ' ' in data: target, data = string.split(data, None, 1) else: target, data = data, "" handler.processingInstruction(target, decode(data)) elif token == 'N': handler = self.getDTDHandler() if handler: handler.notationDecl(data, self._public_id, self._system_id) self._public_id = None self._system_id = None elif token == 'p': self._public_id = decode(data) elif token == 's': self._system_id = decode(data) elif token == 'e': self._is_empty = 1 elif token == 'C': pass else: e = SAXParseException("unknown ESIS token in event stream", None, self._locator) self.getErrorHandler().error(e) def setContentHandler(self, handler): old = self.getContentHandler() if old: old.setDocumentLocator(None) if handler: handler.setDocumentLocator(self._locator) xml.sax.xmlreader.XMLReader.setContentHandler(self, handler) def getProperty(self, property): if property == xml.sax.handler.property_lexical_handler: return self._lexical_handler elif property == xml.sax.handler.property_declaration_handler: return self._decl_handler else: raise xml.sax.SAXNotRecognizedException("unknown property %s" % `property`) def setProperty(self, property, value): if property == xml.sax.handler.property_lexical_handler: if self._lexical_handler: self._lexical_handler.setDocumentLocator(None) if value: value.setDocumentLocator(self._locator) self._lexical_handler = value elif property == xml.sax.handler.property_declaration_handler: if self._decl_handler: self._decl_handler.setDocumentLocator(None) if value: value.setDocumentLocator(self._locator) self._decl_handler = value else: raise xml.sax.SAXNotRecognizedException() def getFeature(self, feature): if feature == xml.sax.handler.feature_namespaces: return 1 else: return xml.sax.xmlreader.XMLReader.getFeature(self, feature) def setFeature(self, feature, enabled): if feature == xml.sax.handler.feature_namespaces: pass else: xml.sax.xmlreader.XMLReader.setFeature(self, feature, enabled) class Attributes(xml.sax.xmlreader.AttributesImpl): # self._attrs has the form {name: (value, type)} def getType(self, name): return self._attrs[name][1] def getValue(self, name): return self._attrs[name][0] def getValueByQName(self, name): return self._attrs[name][0] def __getitem__(self, name): return self._attrs[name][0] def get(self, name, default=None): if self._attrs.has_key(name): return self._attrs[name][0] return default def items(self): L = [] for name, (value, type) in self._attrs.items(): L.append((name, value)) return L def values(self): L = [] for value, type in self._attrs.values(): L.append(value) return L class Locator(xml.sax.xmlreader.Locator): _lineno = -1 _public_id = None _system_id = None def getLineNumber(self): return self._lineno def getPublicId(self): return self._public_id def getSystemId(self): return self._system_id def parse(stream_or_string, parser=None): if type(stream_or_string) in [type(""), type(u"")]: stream = open(stream_or_string) else: stream = stream_or_string if not parser: parser = ESISReader() return xml.dom.pulldom.DOMEventStream(stream, parser, (2 ** 14) - 20)