#! /usr/bin/env python """Generate C code from an ASDL description.""" import sys import textwrap import types from argparse import ArgumentParser from contextlib import contextmanager from pathlib import Path import asdl TABSIZE = 4 MAX_COL = 80 AUTOGEN_MESSAGE = "// File automatically generated by {}.\n\n" def get_c_type(name): """Return a string for the C name of the type. This function special cases the default types provided by asdl. """ if name in asdl.builtin_types: return name else: return "%s_ty" % name def reflow_lines(s, depth): """Reflow the line s indented depth tabs. Return a sequence of lines where no line extends beyond MAX_COL when properly indented. The first line is properly indented based exclusively on depth * TABSIZE. All following lines -- these are the reflowed lines generated by this function -- start at the same column as the first character beyond the opening { in the first line. """ size = MAX_COL - depth * TABSIZE if len(s) < size: return [s] lines = [] cur = s padding = "" while len(cur) > size: i = cur.rfind(' ', 0, size) # XXX this should be fixed for real if i == -1 and 'GeneratorExp' in cur: i = size + 3 assert i != -1, "Impossible line %d to reflow: %r" % (size, s) lines.append(padding + cur[:i]) if len(lines) == 1: # find new size based on brace j = cur.find('{', 0, i) if j >= 0: j += 2 # account for the brace and the space after it size -= j padding = " " * j else: j = cur.find('(', 0, i) if j >= 0: j += 1 # account for the paren (no space after it) size -= j padding = " " * j cur = cur[i+1:] else: lines.append(padding + cur) return lines def reflow_c_string(s, depth): return '"%s"' % s.replace('\n', '\\n"\n%s"' % (' ' * depth * TABSIZE)) def is_simple(sum_type): """Return True if a sum is a simple. A sum is simple if its types have no fields and itself doesn't have any attributes. Instances of these types are cached at C level, and they act like singletons when propagating parser generated nodes into Python level, e.g. unaryop = Invert | Not | UAdd | USub """ return not ( sum_type.attributes or any(constructor.fields for constructor in sum_type.types) ) def asdl_of(name, obj): if isinstance(obj, asdl.Product) or isinstance(obj, asdl.Constructor): fields = ", ".join(map(str, obj.fields)) if fields: fields = "({})".format(fields) return "{}{}".format(name, fields) else: if is_simple(obj): types = " | ".join(type.name for type in obj.types) else: sep = "\n{}| ".format(" " * (len(name) + 1)) types = sep.join( asdl_of(type.name, type) for type in obj.types ) return "{} = {}".format(name, types) class EmitVisitor(asdl.VisitorBase): """Visit that emits lines""" def __init__(self, file, metadata = None): self.file = file self._metadata = metadata super(EmitVisitor, self).__init__() def emit(self, s, depth, reflow=True): # XXX reflow long lines? if reflow: lines = reflow_lines(s, depth) else: lines = [s] for line in lines: if line: line = (" " * TABSIZE * depth) + line self.file.write(line + "\n") @property def metadata(self): if self._metadata is None: raise ValueError( "%s was expecting to be annnotated with metadata" % type(self).__name__ ) return self._metadata @metadata.setter def metadata(self, value): self._metadata = value class MetadataVisitor(asdl.VisitorBase): ROOT_TYPE = "AST" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Metadata: # - simple_sums: Tracks the list of compound type # names where all the constructors # belonging to that type lack of any # fields. # - identifiers: All identifiers used in the AST declarations # - singletons: List of all constructors that originates from # simple sums. # - types: List of all top level type names # self.metadata = types.SimpleNamespace( simple_sums=set(), identifiers=set(), singletons=set(), types={self.ROOT_TYPE}, ) def visitModule(self, mod): for dfn in mod.dfns: self.visit(dfn) def visitType(self, type): self.visit(type.value, type.name) def visitSum(self, sum, name): self.metadata.types.add(name) simple_sum = is_simple(sum) if simple_sum: self.metadata.simple_sums.add(name) for constructor in sum.types: if simple_sum: self.metadata.singletons.add(constructor.name) self.visitConstructor(constructor) self.visitFields(sum.attributes) def visitConstructor(self, constructor): self.metadata.types.add(constructor.name) self.visitFields(constructor.fields) def visitProduct(self, product, name): self.metadata.types.add(name) self.visitFields(product.attributes) self.visitFields(product.fields) def visitFields(self, fields): for field in fields: self.visitField(field) def visitField(self, field): self.metadata.identifiers.add(field.name) class TypeDefVisitor(EmitVisitor): def visitModule(self, mod): for dfn in mod.dfns: self.visit(dfn) def visitType(self, type, depth=0): self.visit(type.value, type.name, depth) def visitSum(self, sum, name, depth): if is_simple(sum): self.simple_sum(sum, name, depth) else: self.sum_with_constructors(sum, name, depth) def simple_sum(self, sum, name, depth): enum = [] for i in range(len(sum.types)): type = sum.types[i] enum.append("%s=%d" % (type.name, i + 1)) enums = ", ".join(enum) ctype = get_c_type(name) s = "typedef enum _%s { %s } %s;" % (name, enums, ctype) self.emit(s, depth) self.emit("", depth) def sum_with_constructors(self, sum, name, depth): ctype = get_c_type(name) s = "typedef struct _%(name)s *%(ctype)s;" % locals() self.emit(s, depth) self.emit("", depth) def visitProduct(self, product, name, depth): ctype = get_c_type(name) s = "typedef struct _%(name)s *%(ctype)s;" % locals() self.emit(s, depth) self.emit("", depth) class SequenceDefVisitor(EmitVisitor): def visitModule(self, mod): for dfn in mod.dfns: self.visit(dfn) def visitType(self, type, depth=0): self.visit(type.value, type.name, depth) def visitSum(self, sum, name, depth): if is_simple(sum): return self.emit_sequence_constructor(name, depth) def emit_sequence_constructor(self, name,depth): ctype = get_c_type(name) self.emit("""\ typedef struct { _ASDL_SEQ_HEAD %(ctype)s typed_elements[1]; } asdl_%(name)s_seq;""" % locals(), reflow=False, depth=depth) self.emit("", depth) self.emit("asdl_%(name)s_seq *_Py_asdl_%(name)s_seq_new(Py_ssize_t size, PyArena *arena);" % locals(), depth) self.emit("", depth) def visitProduct(self, product, name, depth): self.emit_sequence_constructor(name, depth) class StructVisitor(EmitVisitor): """Visitor to generate typedefs for AST.""" def visitModule(self, mod): for dfn in mod.dfns: self.visit(dfn) def visitType(self, type, depth=0): self.visit(type.value, type.name, depth) def visitSum(self, sum, name, depth): if not is_simple(sum): self.sum_with_constructors(sum, name, depth) def sum_with_constructors(self, sum, name, depth): def emit(s, depth=depth): self.emit(s % sys._getframe(1).f_locals, depth) enum = [] for i in range(len(sum.types)): type = sum.types[i] enum.append("%s_kind=%d" % (type.name, i + 1)) emit("enum _%(name)s_kind {" + ", ".join(enum) + "};") emit("struct _%(name)s {") emit("enum _%(name)s_kind kind;", depth + 1) emit("union {", depth + 1) for t in sum.types: self.visit(t, depth + 2) emit("} v;", depth + 1) for field in sum.attributes: # rudimentary attribute handling type = str(field.type) assert type in asdl.builtin_types, type emit("%s %s;" % (type, field.name), depth + 1); emit("};") emit("") def visitConstructor(self, cons, depth): if cons.fields: self.emit("struct {", depth) for f in cons.fields: self.visit(f, depth + 1) self.emit("} %s;" % cons.name, depth) self.emit("", depth) def visitField(self, field, depth): # XXX need to lookup field.type, because it might be something # like a builtin... ctype = get_c_type(field.type) name = field.name if field.seq: if field.type in self.metadata.simple_sums: self.emit("asdl_int_seq *%(name)s;" % locals(), depth) else: _type = field.type self.emit("asdl_%(_type)s_seq *%(name)s;" % locals(), depth) else: self.emit("%(ctype)s %(name)s;" % locals(), depth) def visitProduct(self, product, name, depth): self.emit("struct _%(name)s {" % locals(), depth) for f in product.fields: self.visit(f, depth + 1) for field in product.attributes: # rudimentary attribute handling type = str(field.type) assert type in asdl.builtin_types, type self.emit("%s %s;" % (type, field.name), depth + 1); self.emit("};", depth) self.emit("", depth) def ast_func_name(name): return f"_PyAST_{name}" class PrototypeVisitor(EmitVisitor): """Generate function prototypes for the .h file""" def visitModule(self, mod): for dfn in mod.dfns: self.visit(dfn) def visitType(self, type): self.visit(type.value, type.name) def visitSum(self, sum, name): if is_simple(sum): pass # XXX else: for t in sum.types: self.visit(t, name, sum.attributes) def get_args(self, fields): """Return list of C argument info, one for each field. Argument info is 3-tuple of a C type, variable name, and flag that is true if type can be NULL. """ args = [] unnamed = {} for f in fields: if f.name is None: name = f.type c = unnamed[name] = unnamed.get(name, 0) + 1 if c > 1: name = "name%d" % (c - 1) else: name = f.name # XXX should extend get_c_type() to handle this if f.seq: if f.type in self.metadata.simple_sums: ctype = "asdl_int_seq *" else: ctype = f"asdl_{f.type}_seq *" else: ctype = get_c_type(f.type) args.append((ctype, name, f.opt or f.seq)) return args def visitConstructor(self, cons, type, attrs): args = self.get_args(cons.fields) attrs = self.get_args(attrs) ctype = get_c_type(type) self.emit_function(cons.name, ctype, args, attrs) def emit_function(self, name, ctype, args, attrs, union=True): args = args + attrs if args: argstr = ", ".join(["%s %s" % (atype, aname) for atype, aname, opt in args]) argstr += ", PyArena *arena" else: argstr = "PyArena *arena" self.emit("%s %s(%s);" % (ctype, ast_func_name(name), argstr), False) def visitProduct(self, prod, name): self.emit_function(name, get_c_type(name), self.get_args(prod.fields), self.get_args(prod.attributes), union=False) class FunctionVisitor(PrototypeVisitor): """Visitor to generate constructor functions for AST.""" def emit_function(self, name, ctype, args, attrs, union=True): def emit(s, depth=0, reflow=True): self.emit(s, depth, reflow) argstr = ", ".join(["%s %s" % (atype, aname) for atype, aname, opt in args + attrs]) if argstr: argstr += ", PyArena *arena" else: argstr = "PyArena *arena" self.emit("%s" % ctype, 0) emit("%s(%s)" % (ast_func_name(name), argstr)) emit("{") emit("%s p;" % ctype, 1) for argtype, argname, opt in args: if not opt and argtype != "int": emit("if (!%s) {" % argname, 1) emit("PyErr_SetString(PyExc_ValueError,", 2) msg = "field '%s' is required for %s" % (argname, name) emit(' "%s");' % msg, 2, reflow=False) emit('return NULL;', 2) emit('}', 1) emit("p = (%s)_PyArena_Malloc(arena, sizeof(*p));" % ctype, 1); emit("if (!p)", 1) emit("return NULL;", 2) if union: self.emit_body_union(name, args, attrs) else: self.emit_body_struct(name, args, attrs) emit("return p;", 1) emit("}") emit("") def emit_body_union(self, name, args, attrs): def emit(s, depth=0, reflow=True): self.emit(s, depth, reflow) emit("p->kind = %s_kind;" % name, 1) for argtype, argname, opt in args: emit("p->v.%s.%s = %s;" % (name, argname, argname), 1) for argtype, argname, opt in attrs: emit("p->%s = %s;" % (argname, argname), 1) def emit_body_struct(self, name, args, attrs): def emit(s, depth=0, reflow=True): self.emit(s, depth, reflow) for argtype, argname, opt in args: emit("p->%s = %s;" % (argname, argname), 1) for argtype, argname, opt in attrs: emit("p->%s = %s;" % (argname, argname), 1) class PickleVisitor(EmitVisitor): def visitModule(self, mod): for dfn in mod.dfns: self.visit(dfn) def visitType(self, type): self.visit(type.value, type.name) def visitSum(self, sum, name): pass def visitProduct(self, sum, name): pass def visitConstructor(self, cons, name): pass def visitField(self, sum): pass class Obj2ModPrototypeVisitor(PickleVisitor): def visitProduct(self, prod, name): code = "static int obj2ast_%s(struct ast_state *state, PyObject* obj, %s* out, PyArena* arena);" self.emit(code % (name, get_c_type(name)), 0) visitSum = visitProduct class Obj2ModVisitor(PickleVisitor): attribute_special_defaults = { "end_lineno": "lineno", "end_col_offset": "col_offset", } @contextmanager def recursive_call(self, node, level): self.emit('if (_Py_EnterRecursiveCall(" while traversing \'%s\' node")) {' % node, level, reflow=False) self.emit('goto failed;', level + 1) self.emit('}', level) yield self.emit('_Py_LeaveRecursiveCall();', level) def funcHeader(self, name): ctype = get_c_type(name) self.emit("int", 0) self.emit("obj2ast_%s(struct ast_state *state, PyObject* obj, %s* out, PyArena* arena)" % (name, ctype), 0) self.emit("{", 0) self.emit("int isinstance;", 1) self.emit("", 0) def sumTrailer(self, name, add_label=False): self.emit("", 0) # there's really nothing more we can do if this fails ... error = "expected some sort of %s, but got %%R" % name format = "PyErr_Format(PyExc_TypeError, \"%s\", obj);" self.emit(format % error, 1, reflow=False) if add_label: self.emit("failed:", 1) self.emit("Py_XDECREF(tmp);", 1) self.emit("return 1;", 1) self.emit("}", 0) self.emit("", 0) def simpleSum(self, sum, name): self.funcHeader(name) for t in sum.types: line = ("isinstance = PyObject_IsInstance(obj, " "state->%s_type);") self.emit(line % (t.name,), 1) self.emit("if (isinstance == -1) {", 1) self.emit("return 1;", 2) self.emit("}", 1) self.emit("if (isinstance) {", 1) self.emit("*out = %s;" % t.name, 2) self.emit("return 0;", 2) self.emit("}", 1) self.sumTrailer(name) def buildArgs(self, fields): return ", ".join(fields + ["arena"]) def complexSum(self, sum, name): self.funcHeader(name) self.emit("PyObject *tmp = NULL;", 1) self.emit("PyObject *tp;", 1) for a in sum.attributes: self.visitAttributeDeclaration(a, name, sum=sum) self.emit("", 0) # XXX: should we only do this for 'expr'? self.emit("if (obj == Py_None) {", 1) self.emit("*out = NULL;", 2) self.emit("return 0;", 2) self.emit("}", 1) for a in sum.attributes: self.visitField(a, name, sum=sum, depth=1) for t in sum.types: self.emit("tp = state->%s_type;" % (t.name,), 1) self.emit("isinstance = PyObject_IsInstance(obj, tp);", 1) self.emit("if (isinstance == -1) {", 1) self.emit("return 1;", 2) self.emit("}", 1) self.emit("if (isinstance) {", 1) for f in t.fields: self.visitFieldDeclaration(f, t.name, sum=sum, depth=2) self.emit("", 0) for f in t.fields: self.visitField(f, t.name, sum=sum, depth=2) args = [f.name for f in t.fields] + [a.name for a in sum.attributes] self.emit("*out = %s(%s);" % (ast_func_name(t.name), self.buildArgs(args)), 2) self.emit("if (*out == NULL) goto failed;", 2) self.emit("return 0;", 2) self.emit("}", 1) self.sumTrailer(name, True) def visitAttributeDeclaration(self, a, name, sum=sum): ctype = get_c_type(a.type) self.emit("%s %s;" % (ctype, a.name), 1) def visitSum(self, sum, name): if is_simple(sum): self.simpleSum(sum, name) else: self.complexSum(sum, name) def visitProduct(self, prod, name): ctype = get_c_type(name) self.emit("int", 0) self.emit("obj2ast_%s(struct ast_state *state, PyObject* obj, %s* out, PyArena* arena)" % (name, ctype), 0) self.emit("{", 0) self.emit("PyObject* tmp = NULL;", 1) for f in prod.fields: self.visitFieldDeclaration(f, name, prod=prod, depth=1) for a in prod.attributes: self.visitFieldDeclaration(a, name, prod=prod, depth=1) self.emit("", 0) for f in prod.fields: self.visitField(f, name, prod=prod, depth=1) for a in prod.attributes: self.visitField(a, name, prod=prod, depth=1) args = [f.name for f in prod.fields] args.extend([a.name for a in prod.attributes]) self.emit("*out = %s(%s);" % (ast_func_name(name), self.buildArgs(args)), 1) self.emit("return 0;", 1) self.emit("failed:", 0) self.emit("Py_XDECREF(tmp);", 1) self.emit("return 1;", 1) self.emit("}", 0) self.emit("", 0) def visitFieldDeclaration(self, field, name, sum=None, prod=None, depth=0): ctype = get_c_type(field.type) if field.seq: if self.isSimpleType(field): self.emit("asdl_int_seq* %s;" % field.name, depth) else: _type = field.type self.emit(f"asdl_{field.type}_seq* {field.name};", depth) else: ctype = get_c_type(field.type) self.emit("%s %s;" % (ctype, field.name), depth) def isNumeric(self, field): return get_c_type(field.type) in ("int", "bool") def isSimpleType(self, field): return field.type in self.metadata.simple_sums or self.isNumeric(field) def visitField(self, field, name, sum=None, prod=None, depth=0): ctype = get_c_type(field.type) line = "if (_PyObject_LookupAttr(obj, state->%s, &tmp) < 0) {" self.emit(line % field.name, depth) self.emit("return 1;", depth+1) self.emit("}", depth) if not field.opt: self.emit("if (tmp == NULL) {", depth) message = "required field \\\"%s\\\" missing from %s" % (field.name, name) format = "PyErr_SetString(PyExc_TypeError, \"%s\");" self.emit(format % message, depth+1, reflow=False) self.emit("return 1;", depth+1) else: self.emit("if (tmp == NULL || tmp == Py_None) {", depth) self.emit("Py_CLEAR(tmp);", depth+1) if self.isNumeric(field): if field.name in self.attribute_special_defaults: self.emit( "%s = %s;" % (field.name, self.attribute_special_defaults[field.name]), depth+1, ) else: self.emit("%s = 0;" % field.name, depth+1) elif not self.isSimpleType(field): self.emit("%s = NULL;" % field.name, depth+1) else: raise TypeError("could not determine the default value for %s" % field.name) self.emit("}", depth) self.emit("else {", depth) self.emit("int res;", depth+1) if field.seq: self.emit("Py_ssize_t len;", depth+1) self.emit("Py_ssize_t i;", depth+1) self.emit("if (!PyList_Check(tmp)) {", depth+1) self.emit("PyErr_Format(PyExc_TypeError, \"%s field \\\"%s\\\" must " "be a list, not a %%.200s\", _PyType_Name(Py_TYPE(tmp)));" % (name, field.name), depth+2, reflow=False) self.emit("goto failed;", depth+2) self.emit("}", depth+1) self.emit("len = PyList_GET_SIZE(tmp);", depth+1) if self.isSimpleType(field): self.emit("%s = _Py_asdl_int_seq_new(len, arena);" % field.name, depth+1) else: self.emit("%s = _Py_asdl_%s_seq_new(len, arena);" % (field.name, field.type), depth+1) self.emit("if (%s == NULL) goto failed;" % field.name, depth+1) self.emit("for (i = 0; i < len; i++) {", depth+1) self.emit("%s val;" % ctype, depth+2) self.emit("PyObject *tmp2 = Py_NewRef(PyList_GET_ITEM(tmp, i));", depth+2) with self.recursive_call(name, depth+2): self.emit("res = obj2ast_%s(state, tmp2, &val, arena);" % field.type, depth+2, reflow=False) self.emit("Py_DECREF(tmp2);", depth+2) self.emit("if (res != 0) goto failed;", depth+2) self.emit("if (len != PyList_GET_SIZE(tmp)) {", depth+2) self.emit("PyErr_SetString(PyExc_RuntimeError, \"%s field \\\"%s\\\" " "changed size during iteration\");" % (name, field.name), depth+3, reflow=False) self.emit("goto failed;", depth+3) self.emit("}", depth+2) self.emit("asdl_seq_SET(%s, i, val);" % field.name, depth+2) self.emit("}", depth+1) else: with self.recursive_call(name, depth+1): self.emit("res = obj2ast_%s(state, tmp, &%s, arena);" % (field.type, field.name), depth+1) self.emit("if (res != 0) goto failed;", depth+1) self.emit("Py_CLEAR(tmp);", depth+1) self.emit("}", depth) class SequenceConstructorVisitor(EmitVisitor): def visitModule(self, mod): for dfn in mod.dfns: self.visit(dfn) def visitType(self, type): self.visit(type.value, type.name) def visitProduct(self, prod, name): self.emit_sequence_constructor(name, get_c_type(name)) def visitSum(self, sum, name): if not is_simple(sum): self.emit_sequence_constructor(name, get_c_type(name)) def emit_sequence_constructor(self, name, type): self.emit(f"GENERATE_ASDL_SEQ_CONSTRUCTOR({name}, {type})", depth=0) class PyTypesDeclareVisitor(PickleVisitor): def visitProduct(self, prod, name): self.emit("static PyObject* ast2obj_%s(struct ast_state *state, void*);" % name, 0) if prod.attributes: self.emit("static const char * const %s_attributes[] = {" % name, 0) for a in prod.attributes: self.emit('"%s",' % a.name, 1) self.emit("};", 0) if prod.fields: self.emit("static const char * const %s_fields[]={" % name,0) for f in prod.fields: self.emit('"%s",' % f.name, 1) self.emit("};", 0) def visitSum(self, sum, name): if sum.attributes: self.emit("static const char * const %s_attributes[] = {" % name, 0) for a in sum.attributes: self.emit('"%s",' % a.name, 1) self.emit("};", 0) ptype = "void*" if is_simple(sum): ptype = get_c_type(name) self.emit("static PyObject* ast2obj_%s(struct ast_state *state, %s);" % (name, ptype), 0) for t in sum.types: self.visitConstructor(t, name) def visitConstructor(self, cons, name): if cons.fields: self.emit("static const char * const %s_fields[]={" % cons.name, 0) for t in cons.fields: self.emit('"%s",' % t.name, 1) self.emit("};",0) class PyTypesVisitor(PickleVisitor): def visitModule(self, mod): self.emit(""" typedef struct { PyObject_HEAD PyObject *dict; } AST_object; static void ast_dealloc(AST_object *self) { /* bpo-31095: UnTrack is needed before calling any callbacks */ PyTypeObject *tp = Py_TYPE(self); PyObject_GC_UnTrack(self); Py_CLEAR(self->dict); freefunc free_func = PyType_GetSlot(tp, Py_tp_free); assert(free_func != NULL); free_func(self); Py_DECREF(tp); } static int ast_traverse(AST_object *self, visitproc visit, void *arg) { Py_VISIT(Py_TYPE(self)); Py_VISIT(self->dict); return 0; } static int ast_clear(AST_object *self) { Py_CLEAR(self->dict); return 0; } static int ast_type_init(PyObject *self, PyObject *args, PyObject *kw) { struct ast_state *state = get_ast_state(); if (state == NULL) { return -1; } Py_ssize_t i, numfields = 0; int res = -1; PyObject *key, *value, *fields; if (_PyObject_LookupAttr((PyObject*)Py_TYPE(self), state->_fields, &fields) < 0) { goto cleanup; } if (fields) { numfields = PySequence_Size(fields); if (numfields == -1) { goto cleanup; } } res = 0; /* if no error occurs, this stays 0 to the end */ if (numfields < PyTuple_GET_SIZE(args)) { PyErr_Format(PyExc_TypeError, "%.400s constructor takes at most " "%zd positional argument%s", _PyType_Name(Py_TYPE(self)), numfields, numfields == 1 ? "" : "s"); res = -1; goto cleanup; } for (i = 0; i < PyTuple_GET_SIZE(args); i++) { /* cannot be reached when fields is NULL */ PyObject *name = PySequence_GetItem(fields, i); if (!name) { res = -1; goto cleanup; } res = PyObject_SetAttr(self, name, PyTuple_GET_ITEM(args, i)); Py_DECREF(name); if (res < 0) { goto cleanup; } } if (kw) { i = 0; /* needed by PyDict_Next */ while (PyDict_Next(kw, &i, &key, &value)) { int contains = PySequence_Contains(fields, key); if (contains == -1) { res = -1; goto cleanup; } else if (contains == 1) { Py_ssize_t p = PySequence_Index(fields, key); if (p == -1) { res = -1; goto cleanup; } if (p < PyTuple_GET_SIZE(args)) { PyErr_Format(PyExc_TypeError, "%.400s got multiple values for argument '%U'", Py_TYPE(self)->tp_name, key); res = -1; goto cleanup; } } res = PyObject_SetAttr(self, key, value); if (res < 0) { goto cleanup; } } } cleanup: Py_XDECREF(fields); return res; } /* Pickling support */ static PyObject * ast_type_reduce(PyObject *self, PyObject *unused) { struct ast_state *state = get_ast_state(); if (state == NULL) { return NULL; } PyObject *dict; if (_PyObject_LookupAttr(self, state->__dict__, &dict) < 0) { return NULL; } if (dict) { return Py_BuildValue("O()N", Py_TYPE(self), dict); } return Py_BuildValue("O()", Py_TYPE(self)); } static PyMemberDef ast_type_members[] = { {"__dictoffset__", T_PYSSIZET, offsetof(AST_object, dict), READONLY}, {NULL} /* Sentinel */ }; static PyMethodDef ast_type_methods[] = { {"__reduce__", ast_type_reduce, METH_NOARGS, NULL}, {NULL} }; static PyGetSetDef ast_type_getsets[] = { {"__dict__", PyObject_GenericGetDict, PyObject_GenericSetDict}, {NULL} }; static PyType_Slot AST_type_slots[] = { {Py_tp_dealloc, ast_dealloc}, {Py_tp_getattro, PyObject_GenericGetAttr}, {Py_tp_setattro, PyObject_GenericSetAttr}, {Py_tp_traverse, ast_traverse}, {Py_tp_clear, ast_clear}, {Py_tp_members, ast_type_members}, {Py_tp_methods, ast_type_methods}, {Py_tp_getset, ast_type_getsets}, {Py_tp_init, ast_type_init}, {Py_tp_alloc, PyType_GenericAlloc}, {Py_tp_new, PyType_GenericNew}, {Py_tp_free, PyObject_GC_Del}, {0, 0}, }; static PyType_Spec AST_type_spec = { "ast.AST", sizeof(AST_object), 0, Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_GC, AST_type_slots }; static PyObject * make_type(struct ast_state *state, const char *type, PyObject* base, const char* const* fields, int num_fields, const char *doc) { PyObject *fnames, *result; int i; fnames = PyTuple_New(num_fields); if (!fnames) return NULL; for (i = 0; i < num_fields; i++) { PyObject *field = PyUnicode_InternFromString(fields[i]); if (!field) { Py_DECREF(fnames); return NULL; } PyTuple_SET_ITEM(fnames, i, field); } result = PyObject_CallFunction((PyObject*)&PyType_Type, "s(O){OOOOOOOs}", type, base, state->_fields, fnames, state->__match_args__, fnames, state->__module__, state->ast, state->__doc__, doc); Py_DECREF(fnames); return result; } static int add_attributes(struct ast_state *state, PyObject *type, const char * const *attrs, int num_fields) { int i, result; PyObject *s, *l = PyTuple_New(num_fields); if (!l) return 0; for (i = 0; i < num_fields; i++) { s = PyUnicode_InternFromString(attrs[i]); if (!s) { Py_DECREF(l); return 0; } PyTuple_SET_ITEM(l, i, s); } result = PyObject_SetAttr(type, state->_attributes, l) >= 0; Py_DECREF(l); return result; } /* Conversion AST -> Python */ static PyObject* ast2obj_list(struct ast_state *state, asdl_seq *seq, PyObject* (*func)(struct ast_state *state, void*)) { Py_ssize_t i, n = asdl_seq_LEN(seq); PyObject *result = PyList_New(n); PyObject *value; if (!result) return NULL; for (i = 0; i < n; i++) { value = func(state, asdl_seq_GET_UNTYPED(seq, i)); if (!value) { Py_DECREF(result); return NULL; } PyList_SET_ITEM(result, i, value); } return result; } static PyObject* ast2obj_object(struct ast_state *Py_UNUSED(state), void *o) { PyObject *op = (PyObject*)o; if (!op) { op = Py_None; } return Py_NewRef(op); } #define ast2obj_constant ast2obj_object #define ast2obj_identifier ast2obj_object #define ast2obj_string ast2obj_object static PyObject* ast2obj_int(struct ast_state *Py_UNUSED(state), long b) { return PyLong_FromLong(b); } /* Conversion Python -> AST */ static int obj2ast_object(struct ast_state *Py_UNUSED(state), PyObject* obj, PyObject** out, PyArena* arena) { if (obj == Py_None) obj = NULL; if (obj) { if (_PyArena_AddPyObject(arena, obj) < 0) { *out = NULL; return -1; } *out = Py_NewRef(obj); } else { *out = NULL; } return 0; } static int obj2ast_constant(struct ast_state *Py_UNUSED(state), PyObject* obj, PyObject** out, PyArena* arena) { if (_PyArena_AddPyObject(arena, obj) < 0) { *out = NULL; return -1; } *out = Py_NewRef(obj); return 0; } static int obj2ast_identifier(struct ast_state *state, PyObject* obj, PyObject** out, PyArena* arena) { if (!PyUnicode_CheckExact(obj) && obj != Py_None) { PyErr_SetString(PyExc_TypeError, "AST identifier must be of type str"); return 1; } return obj2ast_object(state, obj, out, arena); } static int obj2ast_string(struct ast_state *state, PyObject* obj, PyObject** out, PyArena* arena) { if (!PyUnicode_CheckExact(obj) && !PyBytes_CheckExact(obj)) { PyErr_SetString(PyExc_TypeError, "AST string must be of type str"); return 1; } return obj2ast_object(state, obj, out, arena); } static int obj2ast_int(struct ast_state* Py_UNUSED(state), PyObject* obj, int* out, PyArena* arena) { int i; if (!PyLong_Check(obj)) { PyErr_Format(PyExc_ValueError, "invalid integer value: %R", obj); return 1; } i = _PyLong_AsInt(obj); if (i == -1 && PyErr_Occurred()) return 1; *out = i; return 0; } static int add_ast_fields(struct ast_state *state) { PyObject *empty_tuple; empty_tuple = PyTuple_New(0); if (!empty_tuple || PyObject_SetAttrString(state->AST_type, "_fields", empty_tuple) < 0 || PyObject_SetAttrString(state->AST_type, "__match_args__", empty_tuple) < 0 || PyObject_SetAttrString(state->AST_type, "_attributes", empty_tuple) < 0) { Py_XDECREF(empty_tuple); return -1; } Py_DECREF(empty_tuple); return 0; } """, 0, reflow=False) self.file.write(textwrap.dedent(''' static int init_types(struct ast_state *state) { // init_types() must not be called after _PyAST_Fini() // has been called assert(state->initialized >= 0); if (state->initialized) { return 1; } if (init_identifiers(state) < 0) { return 0; } state->AST_type = PyType_FromSpec(&AST_type_spec); if (!state->AST_type) { return 0; } if (add_ast_fields(state) < 0) { return 0; } ''')) for dfn in mod.dfns: self.visit(dfn) self.file.write(textwrap.dedent(''' state->recursion_depth = 0; state->recursion_limit = 0; state->initialized = 1; return 1; } ''')) def visitProduct(self, prod, name): if prod.fields: fields = name+"_fields" else: fields = "NULL" self.emit('state->%s_type = make_type(state, "%s", state->AST_type, %s, %d,' % (name, name, fields, len(prod.fields)), 1) self.emit('%s);' % reflow_c_string(asdl_of(name, prod), 2), 2, reflow=False) self.emit("if (!state->%s_type) return 0;" % name, 1) if prod.attributes: self.emit("if (!add_attributes(state, state->%s_type, %s_attributes, %d)) return 0;" % (name, name, len(prod.attributes)), 1) else: self.emit("if (!add_attributes(state, state->%s_type, NULL, 0)) return 0;" % name, 1) self.emit_defaults(name, prod.fields, 1) self.emit_defaults(name, prod.attributes, 1) def visitSum(self, sum, name): self.emit('state->%s_type = make_type(state, "%s", state->AST_type, NULL, 0,' % (name, name), 1) self.emit('%s);' % reflow_c_string(asdl_of(name, sum), 2), 2, reflow=False) self.emit("if (!state->%s_type) return 0;" % name, 1) if sum.attributes: self.emit("if (!add_attributes(state, state->%s_type, %s_attributes, %d)) return 0;" % (name, name, len(sum.attributes)), 1) else: self.emit("if (!add_attributes(state, state->%s_type, NULL, 0)) return 0;" % name, 1) self.emit_defaults(name, sum.attributes, 1) simple = is_simple(sum) for t in sum.types: self.visitConstructor(t, name, simple) def visitConstructor(self, cons, name, simple): if cons.fields: fields = cons.name+"_fields" else: fields = "NULL" self.emit('state->%s_type = make_type(state, "%s", state->%s_type, %s, %d,' % (cons.name, cons.name, name, fields, len(cons.fields)), 1) self.emit('%s);' % reflow_c_string(asdl_of(cons.name, cons), 2), 2, reflow=False) self.emit("if (!state->%s_type) return 0;" % cons.name, 1) self.emit_defaults(cons.name, cons.fields, 1) if simple: self.emit("state->%s_singleton = PyType_GenericNew((PyTypeObject *)" "state->%s_type, NULL, NULL);" % (cons.name, cons.name), 1) self.emit("if (!state->%s_singleton) return 0;" % cons.name, 1) def emit_defaults(self, name, fields, depth): for field in fields: if field.opt: self.emit('if (PyObject_SetAttr(state->%s_type, state->%s, Py_None) == -1)' % (name, field.name), depth) self.emit("return 0;", depth+1) class ASTModuleVisitor(PickleVisitor): def visitModule(self, mod): self.emit("static int", 0) self.emit("astmodule_exec(PyObject *m)", 0) self.emit("{", 0) self.emit('struct ast_state *state = get_ast_state();', 1) self.emit('if (state == NULL) {', 1) self.emit('return -1;', 2) self.emit('}', 1) self.emit('if (PyModule_AddObjectRef(m, "AST", state->AST_type) < 0) {', 1) self.emit('return -1;', 2) self.emit('}', 1) self.emit('if (PyModule_AddIntMacro(m, PyCF_ALLOW_TOP_LEVEL_AWAIT) < 0) {', 1) self.emit("return -1;", 2) self.emit('}', 1) self.emit('if (PyModule_AddIntMacro(m, PyCF_ONLY_AST) < 0) {', 1) self.emit("return -1;", 2) self.emit('}', 1) self.emit('if (PyModule_AddIntMacro(m, PyCF_TYPE_COMMENTS) < 0) {', 1) self.emit("return -1;", 2) self.emit('}', 1) for dfn in mod.dfns: self.visit(dfn) self.emit("return 0;", 1) self.emit("}", 0) self.emit("", 0) self.emit(""" static PyModuleDef_Slot astmodule_slots[] = { {Py_mod_exec, astmodule_exec}, {0, NULL} }; static struct PyModuleDef _astmodule = { PyModuleDef_HEAD_INIT, .m_name = "_ast", // The _ast module uses a per-interpreter state (PyInterpreterState.ast) .m_size = 0, .m_slots = astmodule_slots, }; PyMODINIT_FUNC PyInit__ast(void) { return PyModuleDef_Init(&_astmodule); } """.strip(), 0, reflow=False) def visitProduct(self, prod, name): self.addObj(name) def visitSum(self, sum, name): self.addObj(name) for t in sum.types: self.visitConstructor(t, name) def visitConstructor(self, cons, name): self.addObj(cons.name) def addObj(self, name): self.emit("if (PyModule_AddObjectRef(m, \"%s\", " "state->%s_type) < 0) {" % (name, name), 1) self.emit("return -1;", 2) self.emit('}', 1) class StaticVisitor(PickleVisitor): CODE = '''Very simple, always emit this static code. Override CODE''' def visit(self, object): self.emit(self.CODE, 0, reflow=False) class ObjVisitor(PickleVisitor): def func_begin(self, name): ctype = get_c_type(name) self.emit("PyObject*", 0) self.emit("ast2obj_%s(struct ast_state *state, void* _o)" % (name), 0) self.emit("{", 0) self.emit("%s o = (%s)_o;" % (ctype, ctype), 1) self.emit("PyObject *result = NULL, *value = NULL;", 1) self.emit("PyTypeObject *tp;", 1) self.emit('if (!o) {', 1) self.emit("Py_RETURN_NONE;", 2) self.emit("}", 1) self.emit("if (++state->recursion_depth > state->recursion_limit) {", 1) self.emit("PyErr_SetString(PyExc_RecursionError,", 2) self.emit('"maximum recursion depth exceeded during ast construction");', 3) self.emit("return 0;", 2) self.emit("}", 1) def func_end(self): self.emit("state->recursion_depth--;", 1) self.emit("return result;", 1) self.emit("failed:", 0) self.emit("Py_XDECREF(value);", 1) self.emit("Py_XDECREF(result);", 1) self.emit("return NULL;", 1) self.emit("}", 0) self.emit("", 0) def visitSum(self, sum, name): if is_simple(sum): self.simpleSum(sum, name) return self.func_begin(name) self.emit("switch (o->kind) {", 1) for i in range(len(sum.types)): t = sum.types[i] self.visitConstructor(t, i + 1, name) self.emit("}", 1) for a in sum.attributes: self.emit("value = ast2obj_%s(state, o->%s);" % (a.type, a.name), 1) self.emit("if (!value) goto failed;", 1) self.emit('if (PyObject_SetAttr(result, state->%s, value) < 0)' % a.name, 1) self.emit('goto failed;', 2) self.emit('Py_DECREF(value);', 1) self.func_end() def simpleSum(self, sum, name): self.emit("PyObject* ast2obj_%s(struct ast_state *state, %s_ty o)" % (name, name), 0) self.emit("{", 0) self.emit("switch(o) {", 1) for t in sum.types: self.emit("case %s:" % t.name, 2) self.emit("return Py_NewRef(state->%s_singleton);" % t.name, 3) self.emit("}", 1) self.emit("Py_UNREACHABLE();", 1); self.emit("}", 0) def visitProduct(self, prod, name): self.func_begin(name) self.emit("tp = (PyTypeObject *)state->%s_type;" % name, 1) self.emit("result = PyType_GenericNew(tp, NULL, NULL);", 1); self.emit("if (!result) return NULL;", 1) for field in prod.fields: self.visitField(field, name, 1, True) for a in prod.attributes: self.emit("value = ast2obj_%s(state, o->%s);" % (a.type, a.name), 1) self.emit("if (!value) goto failed;", 1) self.emit("if (PyObject_SetAttr(result, state->%s, value) < 0)" % a.name, 1) self.emit('goto failed;', 2) self.emit('Py_DECREF(value);', 1) self.func_end() def visitConstructor(self, cons, enum, name): self.emit("case %s_kind:" % cons.name, 1) self.emit("tp = (PyTypeObject *)state->%s_type;" % cons.name, 2) self.emit("result = PyType_GenericNew(tp, NULL, NULL);", 2); self.emit("if (!result) goto failed;", 2) for f in cons.fields: self.visitField(f, cons.name, 2, False) self.emit("break;", 2) def visitField(self, field, name, depth, product): def emit(s, d): self.emit(s, depth + d) if product: value = "o->%s" % field.name else: value = "o->v.%s.%s" % (name, field.name) self.set(field, value, depth) emit("if (!value) goto failed;", 0) emit("if (PyObject_SetAttr(result, state->%s, value) == -1)" % field.name, 0) emit("goto failed;", 1) emit("Py_DECREF(value);", 0) def set(self, field, value, depth): if field.seq: if field.type in self.metadata.simple_sums: # While the sequence elements are stored as void*, # simple sums expects an enum self.emit("{", depth) self.emit("Py_ssize_t i, n = asdl_seq_LEN(%s);" % value, depth+1) self.emit("value = PyList_New(n);", depth+1) self.emit("if (!value) goto failed;", depth+1) self.emit("for(i = 0; i < n; i++)", depth+1) # This cannot fail, so no need for error handling self.emit( "PyList_SET_ITEM(value, i, ast2obj_{0}(state, ({0}_ty)asdl_seq_GET({1}, i)));".format( field.type, value ), depth + 2, reflow=False, ) self.emit("}", depth) else: self.emit("value = ast2obj_list(state, (asdl_seq*)%s, ast2obj_%s);" % (value, field.type), depth) else: self.emit("value = ast2obj_%s(state, %s);" % (field.type, value), depth, reflow=False) class PartingShots(StaticVisitor): CODE = """ PyObject* PyAST_mod2obj(mod_ty t) { struct ast_state *state = get_ast_state(); if (state == NULL) { return NULL; } int starting_recursion_depth; /* Be careful here to prevent overflow. */ int COMPILER_STACK_FRAME_SCALE = 3; PyThreadState *tstate = _PyThreadState_GET(); if (!tstate) { return 0; } state->recursion_limit = C_RECURSION_LIMIT * COMPILER_STACK_FRAME_SCALE; int recursion_depth = C_RECURSION_LIMIT - tstate->c_recursion_remaining; starting_recursion_depth = recursion_depth * COMPILER_STACK_FRAME_SCALE; state->recursion_depth = starting_recursion_depth; PyObject *result = ast2obj_mod(state, t); /* Check that the recursion depth counting balanced correctly */ if (result && state->recursion_depth != starting_recursion_depth) { PyErr_Format(PyExc_SystemError, "AST constructor recursion depth mismatch (before=%d, after=%d)", starting_recursion_depth, state->recursion_depth); return 0; } return result; } /* mode is 0 for "exec", 1 for "eval" and 2 for "single" input */ mod_ty PyAST_obj2mod(PyObject* ast, PyArena* arena, int mode) { const char * const req_name[] = {"Module", "Expression", "Interactive"}; int isinstance; if (PySys_Audit("compile", "OO", ast, Py_None) < 0) { return NULL; } struct ast_state *state = get_ast_state(); if (state == NULL) { return NULL; } PyObject *req_type[3]; req_type[0] = state->Module_type; req_type[1] = state->Expression_type; req_type[2] = state->Interactive_type; assert(0 <= mode && mode <= 2); isinstance = PyObject_IsInstance(ast, req_type[mode]); if (isinstance == -1) return NULL; if (!isinstance) { PyErr_Format(PyExc_TypeError, "expected %s node, got %.400s", req_name[mode], _PyType_Name(Py_TYPE(ast))); return NULL; } mod_ty res = NULL; if (obj2ast_mod(state, ast, &res, arena) != 0) return NULL; else return res; } int PyAST_Check(PyObject* obj) { struct ast_state *state = get_ast_state(); if (state == NULL) { return -1; } return PyObject_IsInstance(obj, state->AST_type); } """ class ChainOfVisitors: def __init__(self, *visitors, metadata = None): self.visitors = visitors self.metadata = metadata def visit(self, object): for v in self.visitors: v.metadata = self.metadata v.visit(object) v.emit("", 0) def generate_ast_state(module_state, f): f.write('struct ast_state {\n') f.write(' int initialized;\n') f.write(' int recursion_depth;\n') f.write(' int recursion_limit;\n') for s in module_state: f.write(' PyObject *' + s + ';\n') f.write('};') def generate_ast_fini(module_state, f): f.write(textwrap.dedent(""" void _PyAST_Fini(PyInterpreterState *interp) { struct ast_state *state = &interp->ast; """)) for s in module_state: f.write(" Py_CLEAR(state->" + s + ');\n') f.write(textwrap.dedent(""" if (_PyInterpreterState_Get() == _PyInterpreterState_Main()) { Py_CLEAR(_Py_CACHED_OBJECT(str_replace_inf)); } #if !defined(NDEBUG) state->initialized = -1; #else state->initialized = 0; #endif } """)) def generate_module_def(mod, metadata, f, internal_h): # Gather all the data needed for ModuleSpec state_strings = { "ast", "_fields", "__match_args__", "__doc__", "__dict__", "__module__", "_attributes", *metadata.identifiers } module_state = state_strings.copy() module_state.update( "%s_singleton" % singleton for singleton in metadata.singletons ) module_state.update( "%s_type" % type for type in metadata.types ) state_strings = sorted(state_strings) module_state = sorted(module_state) generate_ast_state(module_state, internal_h) print(textwrap.dedent(""" #include "Python.h" #include "pycore_ast.h" #include "pycore_ast_state.h" // struct ast_state #include "pycore_ceval.h" // _Py_EnterRecursiveCall #include "pycore_interp.h" // _PyInterpreterState.ast #include "pycore_pystate.h" // _PyInterpreterState_GET() #include "structmember.h" #include // Forward declaration static int init_types(struct ast_state *state); static struct ast_state* get_ast_state(void) { PyInterpreterState *interp = _PyInterpreterState_GET(); struct ast_state *state = &interp->ast; if (!init_types(state)) { return NULL; } return state; } """).strip(), file=f) generate_ast_fini(module_state, f) f.write('static int init_identifiers(struct ast_state *state)\n') f.write('{\n') for identifier in state_strings: f.write(' if ((state->' + identifier) f.write(' = PyUnicode_InternFromString("') f.write(identifier + '")) == NULL) return 0;\n') f.write(' return 1;\n') f.write('};\n\n') def write_header(mod, metadata, f): f.write(textwrap.dedent(""" #ifndef Py_INTERNAL_AST_H #define Py_INTERNAL_AST_H #ifdef __cplusplus extern "C" { #endif #ifndef Py_BUILD_CORE # error "this header requires Py_BUILD_CORE define" #endif #include "pycore_asdl.h" """).lstrip()) c = ChainOfVisitors( TypeDefVisitor(f), SequenceDefVisitor(f), StructVisitor(f), metadata=metadata ) c.visit(mod) f.write("// Note: these macros affect function definitions, not only call sites.\n") prototype_visitor = PrototypeVisitor(f, metadata=metadata) prototype_visitor.visit(mod) f.write(textwrap.dedent(""" PyObject* PyAST_mod2obj(mod_ty t); mod_ty PyAST_obj2mod(PyObject* ast, PyArena* arena, int mode); int PyAST_Check(PyObject* obj); extern int _PyAST_Validate(mod_ty); /* _PyAST_ExprAsUnicode is defined in ast_unparse.c */ extern PyObject* _PyAST_ExprAsUnicode(expr_ty); /* Return the borrowed reference to the first literal string in the sequence of statements or NULL if it doesn't start from a literal string. Doesn't set exception. */ extern PyObject* _PyAST_GetDocString(asdl_stmt_seq *); #ifdef __cplusplus } #endif #endif /* !Py_INTERNAL_AST_H */ """)) def write_internal_h_header(mod, f): print(textwrap.dedent(""" #ifndef Py_INTERNAL_AST_STATE_H #define Py_INTERNAL_AST_STATE_H #ifdef __cplusplus extern "C" { #endif #ifndef Py_BUILD_CORE # error "this header requires Py_BUILD_CORE define" #endif """).lstrip(), file=f) def write_internal_h_footer(mod, f): print(textwrap.dedent(""" #ifdef __cplusplus } #endif #endif /* !Py_INTERNAL_AST_STATE_H */ """), file=f) def write_source(mod, metadata, f, internal_h_file): generate_module_def(mod, metadata, f, internal_h_file) v = ChainOfVisitors( SequenceConstructorVisitor(f), PyTypesDeclareVisitor(f), PyTypesVisitor(f), Obj2ModPrototypeVisitor(f), FunctionVisitor(f), ObjVisitor(f), Obj2ModVisitor(f), ASTModuleVisitor(f), PartingShots(f), metadata=metadata ) v.visit(mod) def main(input_filename, c_filename, h_filename, internal_h_filename, dump_module=False): auto_gen_msg = AUTOGEN_MESSAGE.format("/".join(Path(__file__).parts[-2:])) mod = asdl.parse(input_filename) if dump_module: print('Parsed Module:') print(mod) if not asdl.check(mod): sys.exit(1) metadata_visitor = MetadataVisitor() metadata_visitor.visit(mod) metadata = metadata_visitor.metadata with c_filename.open("w") as c_file, \ h_filename.open("w") as h_file, \ internal_h_filename.open("w") as internal_h_file: c_file.write(auto_gen_msg) h_file.write(auto_gen_msg) internal_h_file.write(auto_gen_msg) write_internal_h_header(mod, internal_h_file) write_source(mod, metadata, c_file, internal_h_file) write_header(mod, metadata, h_file) write_internal_h_footer(mod, internal_h_file) print(f"{c_filename}, {h_filename}, {internal_h_filename} regenerated.") if __name__ == "__main__": parser = ArgumentParser() parser.add_argument("input_file", type=Path) parser.add_argument("-C", "--c-file", type=Path, required=True) parser.add_argument("-H", "--h-file", type=Path, required=True) parser.add_argument("-I", "--internal-h-file", type=Path, required=True) parser.add_argument("-d", "--dump-module", action="store_true") args = parser.parse_args() main(args.input_file, args.c_file, args.h_file, args.internal_h_file, args.dump_module)