summaryrefslogtreecommitdiffstats
path: root/Tools/cases_generator/instructions.py
blob: 78b3c290a49273eabd939fd75ef984c42c4c54c4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
import dataclasses
import re
import typing

from flags import InstructionFlags, variable_used, variable_used_unspecialized
from formatting import (
    Formatter,
    UNUSED,
    list_effect_size,
)
import lexer as lx
import parsing
from parsing import StackEffect
import stacking

BITS_PER_CODE_UNIT = 16


@dataclasses.dataclass
class ActiveCacheEffect:
    """Wraps a CacheEffect that is actually used, in context."""

    effect: parsing.CacheEffect
    offset: int


FORBIDDEN_NAMES_IN_UOPS = (
    "resume_with_error",
    "kwnames",
    "next_instr",
    "oparg1",  # Proxy for super-instructions like LOAD_FAST_LOAD_FAST
    "JUMPBY",
    "DISPATCH",
    "INSTRUMENTED_JUMP",
    "throwflag",
    "exception_unwind",
    "import_from",
    "import_name",
    "_PyObject_CallNoArgs",  # Proxy for BEFORE_WITH
    "TIER_ONE_ONLY",
)


# Interpreter tiers
TIER_ONE: typing.Final = 1  # Specializing adaptive interpreter (PEP 659)
TIER_TWO: typing.Final = 2  # Experimental tracing interpreter
Tiers: typing.TypeAlias = typing.Literal[1, 2]


@dataclasses.dataclass
class Instruction:
    """An instruction with additional data and code."""

    # Parts of the underlying instruction definition
    inst: parsing.InstDef
    kind: typing.Literal["inst", "op"]
    name: str
    block: parsing.Block
    block_text: list[str]  # Block.text, less curlies, less PREDICT() calls
    block_line: int  # First line of block in original code

    # Computed by constructor
    always_exits: str  # If the block always exits, its last line; else ""
    has_deopt: bool
    cache_offset: int
    cache_effects: list[parsing.CacheEffect]
    input_effects: list[StackEffect]
    output_effects: list[StackEffect]
    unmoved_names: frozenset[str]
    instr_fmt: str
    instr_flags: InstructionFlags
    active_caches: list[ActiveCacheEffect]

    # Set later
    family: parsing.Family | None = None
    predicted: bool = False

    def __init__(self, inst: parsing.InstDef):
        self.inst = inst
        self.kind = inst.kind
        self.name = inst.name
        self.block = inst.block
        self.block_text, self.check_eval_breaker, self.block_line = extract_block_text(
            self.block
        )
        self.always_exits = always_exits(self.block_text)
        self.has_deopt = variable_used(self.inst, "DEOPT_IF")
        self.cache_effects = [
            effect for effect in inst.inputs if isinstance(effect, parsing.CacheEffect)
        ]
        self.cache_offset = sum(c.size for c in self.cache_effects)
        self.input_effects = [
            effect for effect in inst.inputs if isinstance(effect, StackEffect)
        ]
        self.output_effects = inst.outputs  # For consistency/completeness
        unmoved_names: set[str] = set()
        for ieffect, oeffect in zip(self.input_effects, self.output_effects):
            if ieffect == oeffect and ieffect.name == oeffect.name:
                unmoved_names.add(ieffect.name)
            else:
                break
        self.unmoved_names = frozenset(unmoved_names)

        self.instr_flags = InstructionFlags.fromInstruction(inst)

        self.active_caches = []
        offset = 0
        for effect in self.cache_effects:
            if effect.name != UNUSED:
                self.active_caches.append(ActiveCacheEffect(effect, offset))
            offset += effect.size

        if self.instr_flags.HAS_ARG_FLAG:
            fmt = "IB"
        else:
            fmt = "IX"
        if offset:
            fmt += "C" + "0" * (offset - 1)
        self.instr_fmt = fmt

    def is_viable_uop(self) -> bool:
        """Whether this instruction is viable as a uop."""
        dprint: typing.Callable[..., None] = lambda *args, **kwargs: None
        if "FRAME" in self.name:
            dprint = print

        if self.name == "_EXIT_TRACE":
            return True  # This has 'return frame' but it's okay
        if self.always_exits:
            dprint(f"Skipping {self.name} because it always exits: {self.always_exits}")
            return False
        if len(self.active_caches) > 1:
            # print(f"Skipping {self.name} because it has >1 cache entries")
            return False
        res = True
        for forbidden in FORBIDDEN_NAMES_IN_UOPS:
            # NOTE: To disallow unspecialized uops, use
            # if variable_used(self.inst, forbidden):
            if variable_used_unspecialized(self.inst, forbidden):
                dprint(f"Skipping {self.name} because it uses {forbidden}")
                res = False
        return res

    def write_body(
        self,
        out: Formatter,
        dedent: int,
        active_caches: list[ActiveCacheEffect],
        tier: Tiers = TIER_ONE,
    ) -> None:
        """Write the instruction body."""
        # Write cache effect variable declarations and initializations
        for active in active_caches:
            ceffect = active.effect
            bits = ceffect.size * BITS_PER_CODE_UNIT
            if bits == 64:
                # NOTE: We assume that 64-bit data in the cache
                # is always an object pointer.
                # If this becomes false, we need a way to specify
                # syntactically what type the cache data is.
                typ = "PyObject *"
                func = "read_obj"
            else:
                typ = f"uint{bits}_t "
                func = f"read_u{bits}"
            if tier == TIER_ONE:
                out.emit(
                    f"{typ}{ceffect.name} = {func}(&next_instr[{active.offset}].cache);"
                )
            else:
                out.emit(f"{typ}{ceffect.name} = ({typ.strip()})operand;")

        # Write the body, substituting a goto for ERROR_IF() and other stuff
        assert dedent <= 0
        extra = " " * -dedent
        names_to_skip = self.unmoved_names | frozenset({UNUSED, "null"})
        offset = 0
        context = self.block.context
        assert context is not None and context.owner is not None
        filename = context.owner.filename
        for line in self.block_text:
            out.set_lineno(self.block_line + offset, filename)
            offset += 1
            if m := re.match(r"(\s*)ERROR_IF\((.+), (\w+)\);\s*(?://.*)?$", line):
                space, cond, label = m.groups()
                space = extra + space
                # ERROR_IF() must pop the inputs from the stack.
                # The code block is responsible for DECREF()ing them.
                # NOTE: If the label doesn't exist, just add it to ceval.c.

                # Don't pop common input/output effects at the bottom!
                # These aren't DECREF'ed so they can stay.
                ieffs = list(self.input_effects)
                oeffs = list(self.output_effects)
                while (
                    ieffs
                    and oeffs
                    and ieffs[0] == oeffs[0]
                    and ieffs[0].name == oeffs[0].name
                ):
                    ieffs.pop(0)
                    oeffs.pop(0)
                ninputs, symbolic = list_effect_size(ieffs)
                if ninputs:
                    label = f"pop_{ninputs}_{label}"
                if symbolic:
                    out.write_raw(
                        f"{space}if ({cond}) {{ STACK_SHRINK({symbolic}); goto {label}; }}\n"
                    )
                else:
                    out.write_raw(f"{space}if ({cond}) goto {label};\n")
            elif m := re.match(r"(\s*)DECREF_INPUTS\(\);\s*(?://.*)?$", line):
                out.reset_lineno()
                space = extra + m.group(1)
                for ieff in self.input_effects:
                    if ieff.name in names_to_skip:
                        continue
                    if ieff.size:
                        out.write_raw(
                            f"{space}for (int _i = {ieff.size}; --_i >= 0;) {{\n"
                        )
                        out.write_raw(f"{space}    Py_DECREF({ieff.name}[_i]);\n")
                        out.write_raw(f"{space}}}\n")
                    else:
                        decref = "XDECREF" if ieff.cond else "DECREF"
                        out.write_raw(f"{space}Py_{decref}({ieff.name});\n")
            else:
                out.write_raw(extra + line)
        out.reset_lineno()


InstructionOrCacheEffect = Instruction | parsing.CacheEffect


# Instruction used for abstract interpretation.
class AbstractInstruction(Instruction):
    def __init__(self, inst: parsing.InstDef):
        super().__init__(inst)

    def write(self, out: Formatter, tier: Tiers = TIER_ONE) -> None:
        """Write one abstract instruction, sans prologue and epilogue."""
        stacking.write_single_instr_for_abstract_interp(self, out)

    def write_body(
        self,
        out: Formatter,
        dedent: int,
        active_caches: list[ActiveCacheEffect],
        tier: Tiers = TIER_ONE,
    ) -> None:
        pass


@dataclasses.dataclass
class Component:
    instr: Instruction
    active_caches: list[ActiveCacheEffect]


MacroParts = list[Component | parsing.CacheEffect]


@dataclasses.dataclass
class MacroInstruction:
    """A macro instruction."""

    name: str
    instr_fmt: str
    instr_flags: InstructionFlags
    macro: parsing.Macro
    parts: MacroParts
    cache_offset: int
    predicted: bool = False


@dataclasses.dataclass
class PseudoInstruction:
    """A pseudo instruction."""

    name: str
    targets: list[Instruction]
    instr_fmt: str
    instr_flags: InstructionFlags


@dataclasses.dataclass
class OverriddenInstructionPlaceHolder:
    name: str


AnyInstruction = Instruction | MacroInstruction | PseudoInstruction


def extract_block_text(block: parsing.Block) -> tuple[list[str], bool, int]:
    # Get lines of text with proper dedent
    blocklines = block.text.splitlines(True)
    first_token: lx.Token = block.tokens[0]  # IndexError means the context is broken
    block_line = first_token.begin[0]

    # Remove blank lines from both ends
    while blocklines and not blocklines[0].strip():
        blocklines.pop(0)
        block_line += 1
    while blocklines and not blocklines[-1].strip():
        blocklines.pop()

    # Remove leading and trailing braces
    assert blocklines and blocklines[0].strip() == "{"
    assert blocklines and blocklines[-1].strip() == "}"
    blocklines.pop()
    blocklines.pop(0)
    block_line += 1

    # Remove trailing blank lines
    while blocklines and not blocklines[-1].strip():
        blocklines.pop()

    # Separate CHECK_EVAL_BREAKER() macro from end
    check_eval_breaker = (
        blocklines != [] and blocklines[-1].strip() == "CHECK_EVAL_BREAKER();"
    )
    if check_eval_breaker:
        del blocklines[-1]

    return blocklines, check_eval_breaker, block_line


def always_exits(lines: list[str]) -> str:
    """Determine whether a block always ends in a return/goto/etc."""
    if not lines:
        return ""
    line = lines[-1].rstrip()
    # Indent must match exactly (TODO: Do something better)
    if line[:12] != " " * 12:
        return ""
    line = line[12:]
    if line.startswith(
        (
            "goto ",
            "return ",
            "DISPATCH",
            "GO_TO_",
            "Py_UNREACHABLE()",
            "ERROR_IF(true, ",
        )
    ):
        return line
    return ""