summaryrefslogtreecommitdiffstats
path: root/Lib/unittest/test/support.py
blob: dbe4ddcd0e7fcb2af59c256e773e254a5764f5d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import unittest


class TestEquality(object):
    """Used as a mixin for TestCase"""

    # Check for a valid __eq__ implementation
    def test_eq(self):
        for obj_1, obj_2 in self.eq_pairs:
            self.assertEqual(obj_1, obj_2)
            self.assertEqual(obj_2, obj_1)

    # Check for a valid __ne__ implementation
    def test_ne(self):
        for obj_1, obj_2 in self.ne_pairs:
            self.assertNotEqual(obj_1, obj_2)
            self.assertNotEqual(obj_2, obj_1)

class TestHashing(object):
    """Used as a mixin for TestCase"""

    # Check for a valid __hash__ implementation
    def test_hash(self):
        for obj_1, obj_2 in self.eq_pairs:
            try:
                if not hash(obj_1) == hash(obj_2):
                    self.fail("%r and %r do not hash equal" % (obj_1, obj_2))
            except KeyboardInterrupt:
                raise
            except Exception as e:
                self.fail("Problem hashing %r and %r: %s" % (obj_1, obj_2, e))

        for obj_1, obj_2 in self.ne_pairs:
            try:
                if hash(obj_1) == hash(obj_2):
                    self.fail("%s and %s hash equal, but shouldn't" %
                              (obj_1, obj_2))
            except KeyboardInterrupt:
                raise
            except Exception as e:
                self.fail("Problem hashing %s and %s: %s" % (obj_1, obj_2, e))


class LoggingResult(unittest.TestResult):
    def __init__(self, log):
        self._events = log
        super().__init__()

    def startTest(self, test):
        self._events.append('startTest')
        super().startTest(test)

    def startTestRun(self):
        self._events.append('startTestRun')
        super(LoggingResult, self).startTestRun()

    def stopTest(self, test):
        self._events.append('stopTest')
        super().stopTest(test)

    def stopTestRun(self):
        self._events.append('stopTestRun')
        super(LoggingResult, self).stopTestRun()

    def addFailure(self, *args):
        self._events.append('addFailure')
        super().addFailure(*args)

    def addSuccess(self, *args):
        self._events.append('addSuccess')
        super(LoggingResult, self).addSuccess(*args)

    def addError(self, *args):
        self._events.append('addError')
        super().addError(*args)

    def addSkip(self, *args):
        self._events.append('addSkip')
        super(LoggingResult, self).addSkip(*args)

    def addExpectedFailure(self, *args):
        self._events.append('addExpectedFailure')
        super(LoggingResult, self).addExpectedFailure(*args)

    def addUnexpectedSuccess(self, *args):
        self._events.append('addUnexpectedSuccess')
        super(LoggingResult, self).addUnexpectedSuccess(*args)


class ResultWithNoStartTestRunStopTestRun(object):
    """An object honouring TestResult before startTestRun/stopTestRun."""

    def __init__(self):
        self.failures = []
        self.errors = []
        self.testsRun = 0
        self.skipped = []
        self.expectedFailures = []
        self.unexpectedSuccesses = []
        self.shouldStop = False

    def startTest(self, test):
        pass

    def stopTest(self, test):
        pass

    def addError(self, test):
        pass

    def addFailure(self, test):
        pass

    def addSuccess(self, test):
        pass

    def wasSuccessful(self):
        return True
class="hl slc"># pass frame and traceback object IDs instead of the objects themselves self.conn.remotecall(self.oid, "interaction", (message, wrap_frame(frame), wrap_info(info)), {}) class IdbAdapter: def __init__(self, idb): self.idb = idb #----------called by an IdbProxy---------- def set_step(self): self.idb.set_step() def set_quit(self): self.idb.set_quit() def set_continue(self): self.idb.set_continue() def set_next(self, fid): frame = frametable[fid] self.idb.set_next(frame) def set_return(self, fid): frame = frametable[fid] self.idb.set_return(frame) def get_stack(self, fid, tbid): frame = frametable[fid] if tbid is None: tb = None else: tb = tracebacktable[tbid] stack, i = self.idb.get_stack(frame, tb) stack = [(wrap_frame(frame), k) for frame, k in stack] return stack, i def run(self, cmd): import __main__ self.idb.run(cmd, __main__.__dict__) def set_break(self, filename, lineno): msg = self.idb.set_break(filename, lineno) return msg def clear_break(self, filename, lineno): msg = self.idb.clear_break(filename, lineno) return msg def clear_all_file_breaks(self, filename): msg = self.idb.clear_all_file_breaks(filename) return msg #----------called by a FrameProxy---------- def frame_attr(self, fid, name): frame = frametable[fid] return getattr(frame, name) def frame_globals(self, fid): frame = frametable[fid] dict = frame.f_globals did = id(dict) dicttable[did] = dict return did def frame_locals(self, fid): frame = frametable[fid] dict = frame.f_locals did = id(dict) dicttable[did] = dict return did def frame_code(self, fid): frame = frametable[fid] code = frame.f_code cid = id(code) codetable[cid] = code return cid #----------called by a CodeProxy---------- def code_name(self, cid): code = codetable[cid] return code.co_name def code_filename(self, cid): code = codetable[cid] return code.co_filename #----------called by a DictProxy---------- def dict_keys(self, did): raise NotImplemented("dict_keys not public or pickleable") ## dict = dicttable[did] ## return dict.keys() ### Needed until dict_keys is type is finished and pickealable. ### Will probably need to extend rpc.py:SocketIO._proxify at that time. def dict_keys_list(self, did): dict = dicttable[did] return list(dict.keys()) def dict_item(self, did, key): dict = dicttable[did] value = dict[key] value = repr(value) ### can't pickle module 'builtins' return value #----------end class IdbAdapter---------- def start_debugger(rpchandler, gui_adap_oid): """Start the debugger and its RPC link in the Python subprocess Start the subprocess side of the split debugger and set up that side of the RPC link by instantiating the GUIProxy, Idb debugger, and IdbAdapter objects and linking them together. Register the IdbAdapter with the RPCServer to handle RPC requests from the split debugger GUI via the IdbProxy. """ gui_proxy = GUIProxy(rpchandler, gui_adap_oid) idb = Debugger.Idb(gui_proxy) idb_adap = IdbAdapter(idb) rpchandler.register(idb_adap_oid, idb_adap) return idb_adap_oid #======================================= # # In the IDLE process: class FrameProxy: def __init__(self, conn, fid): self._conn = conn self._fid = fid self._oid = "idb_adapter" self._dictcache = {} def __getattr__(self, name): if name[:1] == "_": raise AttributeError(name) if name == "f_code": return self._get_f_code() if name == "f_globals": return self._get_f_globals() if name == "f_locals": return self._get_f_locals() return self._conn.remotecall(self._oid, "frame_attr", (self._fid, name), {}) def _get_f_code(self): cid = self._conn.remotecall(self._oid, "frame_code", (self._fid,), {}) return CodeProxy(self._conn, self._oid, cid) def _get_f_globals(self): did = self._conn.remotecall(self._oid, "frame_globals", (self._fid,), {}) return self._get_dict_proxy(did) def _get_f_locals(self): did = self._conn.remotecall(self._oid, "frame_locals", (self._fid,), {}) return self._get_dict_proxy(did) def _get_dict_proxy(self, did): if did in self._dictcache: return self._dictcache[did] dp = DictProxy(self._conn, self._oid, did) self._dictcache[did] = dp return dp class CodeProxy: def __init__(self, conn, oid, cid): self._conn = conn self._oid = oid self._cid = cid def __getattr__(self, name): if name == "co_name": return self._conn.remotecall(self._oid, "code_name", (self._cid,), {}) if name == "co_filename": return self._conn.remotecall(self._oid, "code_filename", (self._cid,), {}) class DictProxy: def __init__(self, conn, oid, did): self._conn = conn self._oid = oid self._did = did ## def keys(self): ## return self._conn.remotecall(self._oid, "dict_keys", (self._did,), {}) # 'temporary' until dict_keys is a pickleable built-in type def keys(self): return self._conn.remotecall(self._oid, "dict_keys_list", (self._did,), {}) def __getitem__(self, key): return self._conn.remotecall(self._oid, "dict_item", (self._did, key), {}) def __getattr__(self, name): ##print("*** Failed DictProxy.__getattr__:", name) raise AttributeError(name) class GUIAdapter: def __init__(self, conn, gui): self.conn = conn self.gui = gui def interaction(self, message, fid, modified_info): ##print("*** Interaction: (%s, %s, %s)" % (message, fid, modified_info)) frame = FrameProxy(self.conn, fid) self.gui.interaction(message, frame, modified_info) class IdbProxy: def __init__(self, conn, shell, oid): self.oid = oid self.conn = conn self.shell = shell def call(self, methodname, *args, **kwargs): ##print("*** IdbProxy.call %s %s %s" % (methodname, args, kwargs)) value = self.conn.remotecall(self.oid, methodname, args, kwargs) ##print("*** IdbProxy.call %s returns %r" % (methodname, value)) return value def run(self, cmd, locals): # Ignores locals on purpose! seq = self.conn.asyncqueue(self.oid, "run", (cmd,), {}) self.shell.interp.active_seq = seq def get_stack(self, frame, tbid): # passing frame and traceback IDs, not the objects themselves stack, i = self.call("get_stack", frame._fid, tbid) stack = [(FrameProxy(self.conn, fid), k) for fid, k in stack] return stack, i def set_continue(self): self.call("set_continue") def set_step(self): self.call("set_step") def set_next(self, frame): self.call("set_next", frame._fid) def set_return(self, frame): self.call("set_return", frame._fid) def set_quit(self): self.call("set_quit") def set_break(self, filename, lineno): msg = self.call("set_break", filename, lineno) return msg def clear_break(self, filename, lineno): msg = self.call("clear_break", filename, lineno) return msg def clear_all_file_breaks(self, filename): msg = self.call("clear_all_file_breaks", filename) return msg def start_remote_debugger(rpcclt, pyshell): """Start the subprocess debugger, initialize the debugger GUI and RPC link Request the RPCServer start the Python subprocess debugger and link. Set up the Idle side of the split debugger by instantiating the IdbProxy, debugger GUI, and debugger GUIAdapter objects and linking them together. Register the GUIAdapter with the RPCClient to handle debugger GUI interaction requests coming from the subprocess debugger via the GUIProxy. The IdbAdapter will pass execution and environment requests coming from the Idle debugger GUI to the subprocess debugger via the IdbProxy. """ global idb_adap_oid idb_adap_oid = rpcclt.remotecall("exec", "start_the_debugger",\ (gui_adap_oid,), {}) idb_proxy = IdbProxy(rpcclt, pyshell, idb_adap_oid) gui = Debugger.Debugger(pyshell, idb_proxy) gui_adap = GUIAdapter(rpcclt, gui) rpcclt.register(gui_adap_oid, gui_adap) return gui def close_remote_debugger(rpcclt): """Shut down subprocess debugger and Idle side of debugger RPC link Request that the RPCServer shut down the subprocess debugger and link. Unregister the GUIAdapter, which will cause a GC on the Idle process debugger and RPC link objects. (The second reference to the debugger GUI is deleted in PyShell.close_remote_debugger().) """ close_subprocess_debugger(rpcclt) rpcclt.unregister(gui_adap_oid) def close_subprocess_debugger(rpcclt): rpcclt.remotecall("exec", "stop_the_debugger", (idb_adap_oid,), {}) def restart_subprocess_debugger(rpcclt): idb_adap_oid_ret = rpcclt.remotecall("exec", "start_the_debugger",\ (gui_adap_oid,), {}) assert idb_adap_oid_ret == idb_adap_oid, 'Idb restarted with different oid'