1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
|
# Copyright 2007 Google, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Abstract Base Classes (ABCs) according to PEP 3119."""
def abstractmethod(funcobj):
    """Mark *funcobj* as an abstract method and hand it back.

    The marker only has an effect when the class's metaclass is ABCMeta
    (or derives from it).  Such a class cannot be instantiated until
    every abstract method has been overridden.  An abstract method may
    still be invoked through any of the normal 'super' call mechanisms.

    Usage:

        class C(metaclass=ABCMeta):
            @abstractmethod
            def my_abstract_method(self, ...):
                ...
    """
    # The flag is what ABCMeta looks for when it collects abstract names.
    setattr(funcobj, "__isabstractmethod__", True)
    return funcobj
class _Abstract(object):
"""Helper class inserted into the bases by ABCMeta (using _fix_bases()).
You should never need to explicitly subclass this class.
There should never be a base class between _Abstract and object.
"""
def __new__(cls, *args, **kwds):
am = cls.__dict__.get("__abstractmethods__")
if am:
raise TypeError("Can't instantiate abstract class %s "
"with abstract methods %s" %
(cls.__name__, ", ".join(sorted(am))))
if (args or kwds) and cls.__init__ is object.__init__:
raise TypeError("Can't pass arguments to __new__ "
"without overriding __init__")
return object.__new__(cls)
@classmethod
def __subclasshook__(cls, subclass):
"""Abstract classes can override this to customize issubclass().
This is invoked early on by __subclasscheck__() below. It
should return True, False or NotImplemented. If it returns
NotImplemented, the normal algorithm is used. Otherwise, it
overrides the normal algorithm (and the outcome is cached).
"""
return NotImplemented
def _fix_bases(bases):
    """Return *bases* adjusted so that _Abstract is among the ancestors.

    The tuple is returned unchanged when some base already derives from
    _Abstract.  Otherwise ``object`` is swapped for _Abstract if present,
    or _Abstract is appended at the end.
    """
    # _Abstract is already a base (maybe indirectly): nothing to do.
    if any(issubclass(candidate, _Abstract) for candidate in bases):
        return bases
    if object not in bases:
        # No explicit ``object`` to replace, so append _Abstract.
        return bases + (_Abstract,)
    # Substitute _Abstract for object, keeping every other base in place.
    return tuple(_Abstract if candidate is object else candidate
                 for candidate in bases)
class ABCMeta(type):
    """Metaclass for defining Abstract Base Classes (ABCs).

    Use this metaclass to create an ABC.  An ABC can be subclassed
    directly, and then acts as a mix-in class.  You can also register
    unrelated concrete classes (even built-in classes) and unrelated
    ABCs as 'virtual subclasses' -- these and their descendants will
    be considered subclasses of the registering ABC by the built-in
    issubclass() function, but the registering ABC won't show up in
    their MRO (Method Resolution Order) nor will method
    implementations defined by the registering ABC be callable (not
    even via super()).
    """

    # A global counter that is incremented each time a class is
    # registered as a virtual subclass of anything.  It forces the
    # negative cache to be cleared before its next use.
    __invalidation_counter = 0

    def __new__(mcls, name, bases, namespace):
        """Create the class, collect its abstract methods, set up caches.

        The bases are first passed through _fix_bases().  A name is
        abstract if its value in *namespace* carries a true
        ``__isabstractmethod__``, or if a base lists it in
        ``__abstractmethods__`` and the attribute found on the new class
        is still flagged abstract.
        """
        bases = _fix_bases(bases)
        cls = super(ABCMeta, mcls).__new__(mcls, name, bases, namespace)
        # Compute set of abstract method names.  A separate loop variable
        # is used so the ``name`` parameter is not shadowed.
        abstracts = {attr
                     for attr, value in namespace.items()
                     if getattr(value, "__isabstractmethod__", False)}
        for base in bases:
            for attr in getattr(base, "__abstractmethods__", set()):
                value = getattr(cls, attr, None)
                if getattr(value, "__isabstractmethod__", False):
                    abstracts.add(attr)
        cls.__abstractmethods__ = abstracts
        # Set up inheritance registry
        cls.__registry = set()
        cls.__cache = set()
        cls.__negative_cache = set()
        cls.__negative_cache_version = ABCMeta.__invalidation_counter
        return cls

    def register(cls, subclass):
        """Register a virtual subclass of an ABC.

        Raises TypeError if *subclass* is not a class, and RuntimeError
        if the registration would create an inheritance cycle.
        Registering a class that is already a subclass is a no-op.
        """
        # Validate the argument being registered (not ``cls``, which is
        # always a class here).
        if not isinstance(subclass, type):
            raise TypeError("Can only register classes")
        if issubclass(subclass, cls):
            return  # Already a subclass
        # Subtle: test for cycles *after* testing for "already a subclass";
        # this means we allow X.register(X) and interpret it as a no-op.
        if issubclass(cls, subclass):
            # This would create a cycle, which is bad for the algorithm below
            raise RuntimeError("Refusing to create an inheritance cycle")
        cls.__registry.add(subclass)
        ABCMeta.__invalidation_counter += 1  # Invalidate negative cache

    def _dump_registry(cls, file=None):
        """Debug helper to print the ABC registry.

        Prints the class name, the global invalidation counter and every
        registry/cache attribute stored in the class's own namespace.
        """
        print("Class: %s.%s" % (cls.__module__, cls.__name__), file=file)
        print("Inv.counter: %s" % ABCMeta.__invalidation_counter, file=file)
        for name in sorted(cls.__dict__.keys()):
            # The registry attributes are private to ABCMeta and therefore
            # stored under their name-mangled form ``_ABCMeta__*``.
            if name.startswith("_ABCMeta__"):
                value = getattr(cls, name)
                print("%s: %r" % (name, value), file=file)

    def __instancecheck__(cls, instance):
        """Override for isinstance(instance, cls)."""
        # Both spellings are checked because instance.__class__ and
        # type(instance) may differ; the set removes the duplicate.
        return any(cls.__subclasscheck__(c)
                   for c in {instance.__class__, type(instance)})

    def __subclasscheck__(cls, subclass):
        """Override for issubclass(subclass, cls).

        Raises TypeError if *subclass* is not a class.  Results are
        memoized in a positive cache and in a negative cache; the latter
        is discarded whenever any class has been registered since it was
        last used.
        """
        if not isinstance(subclass, type):
            raise TypeError("issubclass() arg 1 must be a class")
        # Check cache
        if subclass in cls.__cache:
            return True
        # Check negative cache; may have to invalidate
        if cls.__negative_cache_version < ABCMeta.__invalidation_counter:
            # Invalidate the negative cache
            cls.__negative_cache_version = ABCMeta.__invalidation_counter
            cls.__negative_cache = set()
        elif subclass in cls.__negative_cache:
            return False
        # Check the subclass hook
        ok = cls.__subclasshook__(subclass)
        if ok is not NotImplemented:
            assert isinstance(ok, bool)
            if ok:
                cls.__cache.add(subclass)
            else:
                cls.__negative_cache.add(subclass)
            return ok
        # Check if it's a direct subclass
        if cls in subclass.__mro__:
            cls.__cache.add(subclass)
            return True
        # Check if it's a subclass of a registered class (recursive).
        # Positive results go into the cache, not the registry, so the
        # registry only ever holds explicitly registered classes.
        for rcls in cls.__registry:
            if issubclass(subclass, rcls):
                cls.__cache.add(subclass)
                return True
        # Check if it's a subclass of a subclass (recursive)
        for scls in cls.__subclasses__():
            if issubclass(subclass, scls):
                cls.__cache.add(subclass)
                return True
        # No dice; update negative cache
        cls.__negative_cache.add(subclass)
        return False
|