summaryrefslogtreecommitdiffstats
path: root/Lib
diff options
context:
space:
mode:
authorMartin v. Löwis <martin@v.loewis.de>2004-05-31 18:22:40 (GMT)
committerMartin v. Löwis <martin@v.loewis.de>2004-05-31 18:22:40 (GMT)
commit2a6ba9097ee3942ae328befaf074ce9722b93ca0 (patch)
tree475a4e8bfd2d8aedd0c38f5e813ba8ad4d23e991 /Lib
parent0a6d0ff8d9ad27d66eb8195b9366e5b49dbb69b6 (diff)
downloadcpython-2a6ba9097ee3942ae328befaf074ce9722b93ca0.zip
cpython-2a6ba9097ee3942ae328befaf074ce9722b93ca0.tar.gz
cpython-2a6ba9097ee3942ae328befaf074ce9722b93ca0.tar.bz2
Patch #963318: Add support for client-side cookie management.
Diffstat (limited to 'Lib')
-rw-r--r--Lib/_LWPCookieJar.py171
-rw-r--r--Lib/_MozillaCookieJar.py145
-rw-r--r--Lib/cookielib.py1756
-rw-r--r--Lib/test/test_cookielib.py1620
-rw-r--r--Lib/test/test_urllib2.py99
-rw-r--r--Lib/urllib2.py75
6 files changed, 3827 insertions, 39 deletions
diff --git a/Lib/_LWPCookieJar.py b/Lib/_LWPCookieJar.py
new file mode 100644
index 0000000..2eb83a6
--- /dev/null
+++ b/Lib/_LWPCookieJar.py
@@ -0,0 +1,171 @@
+"""Load / save to libwww-perl (LWP) format files.
+
+Actually, the format is slightly extended from that used by LWP's
+(libwww-perl's) HTTP::Cookies, to avoid losing some RFC 2965 information
+not recorded by LWP.
+
+It uses the version string "2.0", though really there isn't an LWP Cookies
+2.0 format. This indicates that there is extra information in here
+(domain_dot and port_spec) while still being compatible with
+libwww-perl, I hope.
+
+"""
+
+import time, re, logging
+from cookielib import reraise_unmasked_exceptions, FileCookieJar, Cookie, \
+ MISSING_FILENAME_TEXT, join_header_words, split_header_words, \
+ iso2time, time2isoz
+
def lwp_cookie_str(cookie):
    """Return the Cookie serialised in the LWP cookie file format.

    The result is the value part of a "Set-Cookie3" line, built with
    join_header_words.  The format is extended a bit relative to LWP's --
    see the module docstring.

    """
    pairs = [(cookie.name, cookie.value),
             ("path", cookie.path),
             ("domain", cookie.domain)]
    if cookie.port is not None:
        pairs.append(("port", cookie.port))

    # Valueless flags: written only when set, in this fixed order.
    for flag, is_set in (("path_spec", cookie.path_specified),
                         ("port_spec", cookie.port_specified),
                         ("domain_dot", cookie.domain_initial_dot),
                         ("secure", cookie.secure)):
        if is_set:
            pairs.append((flag, None))

    if cookie.expires:
        pairs.append(("expires", time2isoz(float(cookie.expires))))
    if cookie.discard:
        pairs.append(("discard", None))
    if cookie.comment:
        pairs.append(("comment", cookie.comment))
    if cookie.comment_url:
        pairs.append(("commenturl", cookie.comment_url))

    # Non-standard attributes go out in sorted key order.
    for key in sorted(cookie._rest.keys()):
        pairs.append((key, str(cookie._rest[key])))

    pairs.append(("version", str(cookie.version)))

    return join_header_words([pairs])
+
class LWPCookieJar(FileCookieJar):
    """
    The LWPCookieJar saves a sequence of "Set-Cookie3" lines.
    "Set-Cookie3" is the format used by the libwww-perl library, not known
    to be compatible with any browser, but which is easy to read and
    doesn't lose information about RFC 2965 cookies.

    Additional methods

    as_lwp_str(ignore_discard=True, ignore_expires=True)

    """

    def as_lwp_str(self, ignore_discard=True, ignore_expires=True):
        """Return cookies as newline-separated "Set-Cookie3" headers.

        Every header line, including the last, is terminated by a newline.

        ignore_discard and ignore_expires: see docstring for FileCookieJar.save

        """
        now = time.time()
        lines = []
        for cookie in self:
            if not ignore_discard and cookie.discard:
                continue
            if not ignore_expires and cookie.is_expired(now):
                continue
            lines.append("Set-Cookie3: %s" % lwp_cookie_str(cookie))
        # The trailing empty element makes join() end the text with "\n".
        lines.append("")
        return "\n".join(lines)

    def save(self, filename=None, ignore_discard=False, ignore_expires=False):
        """Write the jar to filename (default: self.filename) in LWP format.

        Raises ValueError if no filename is available.

        """
        if filename is None:
            filename = self.filename
            if filename is None:
                raise ValueError(MISSING_FILENAME_TEXT)

        out = open(filename, "w")
        try:
            # There really isn't an LWP Cookies 2.0 format, but this indicates
            # that there is extra information in here (domain_dot and
            # port_spec) while still being compatible with libwww-perl, I hope.
            out.write("#LWP-Cookies-2.0\n")
            out.write(self.as_lwp_str(ignore_discard, ignore_expires))
        finally:
            out.close()

    def _really_load(self, f, filename, ignore_discard, ignore_expires):
        """Read "Set-Cookie3" lines from the open file f into this jar.

        Raises IOError if the first line does not match self.magic_re, or if
        anything goes wrong while parsing the remaining lines.

        """
        magic = f.readline()
        if not re.search(self.magic_re, magic):
            msg = "%s does not seem to contain cookies" % filename
            raise IOError(msg)

        now = time.time()

        header = "Set-Cookie3:"
        boolean_attrs = ("port_spec", "path_spec", "domain_dot",
                         "secure", "discard")
        value_attrs = ("version",
                       "port", "path", "domain",
                       "expires",
                       "comment", "commenturl")

        try:
            # readline() returns "" only at end of file
            for line in iter(f.readline, ""):
                if not line.startswith(header):
                    continue
                line = line[len(header):].strip()

                for data in split_header_words([line]):
                    name, value = data[0]
                    # name and value are an exception here, since a plain "foo"
                    # (with no "=", unlike "bar=foo") means a cookie with no
                    # name and value "foo".  With all other cookie-attributes,
                    # the situation is reversed: "foo" means an attribute named
                    # "foo" with no value!
                    if value is None:
                        name, value = None, name

                    standard = dict.fromkeys(boolean_attrs, False)
                    rest = {}
                    for k, v in data[1:]:
                        lc = None
                        if k is not None:
                            lc = k.lower()
                        # don't lose case distinction for unknown fields
                        if lc in value_attrs or lc in boolean_attrs:
                            k = lc
                        if k in boolean_attrs:
                            if v is None:
                                v = True
                            standard[k] = v
                        elif k in value_attrs:
                            standard[k] = v
                        else:
                            rest[k] = v

                    get = standard.get
                    expires = get("expires")
                    discard = get("discard")
                    if expires is not None:
                        expires = iso2time(expires)
                    # no (parseable) expiry date: treat as a session cookie
                    if expires is None:
                        discard = True
                    domain = get("domain")
                    domain_specified = domain.startswith(".")
                    cookie = Cookie(get("version"), name, value,
                                    get("port"), get("port_spec"),
                                    domain, domain_specified,
                                    get("domain_dot"),
                                    get("path"), get("path_spec"),
                                    get("secure"),
                                    expires,
                                    discard,
                                    get("comment"),
                                    get("commenturl"),
                                    rest)
                    if not ignore_discard and cookie.discard:
                        continue
                    if not ignore_expires and cookie.is_expired(now):
                        continue
                    self.set_cookie(cookie)
        except:
            # Let IOError (and interrupts etc.) through untouched; anything
            # else is reported and converted to IOError below.
            reraise_unmasked_exceptions((IOError,))
            raise IOError("invalid Set-Cookie3 format file %s" % filename)
diff --git a/Lib/_MozillaCookieJar.py b/Lib/_MozillaCookieJar.py
new file mode 100644
index 0000000..761a879
--- /dev/null
+++ b/Lib/_MozillaCookieJar.py
@@ -0,0 +1,145 @@
+"""Mozilla / Netscape cookie loading / saving."""
+
+import re, time, logging
+
+from cookielib import reraise_unmasked_exceptions, FileCookieJar, Cookie, \
+ MISSING_FILENAME_TEXT
+
class MozillaCookieJar(FileCookieJar):
    """

    WARNING: you may want to backup your browser's cookies file if you use
    this class to save cookies.  I *think* it works, but there have been
    bugs in the past!

    This class differs from CookieJar only in the format it uses to save and
    load cookies to and from a file.  This class uses the Mozilla/Netscape
    `cookies.txt' format.  lynx uses this file format, too.

    Don't expect cookies saved while the browser is running to be noticed by
    the browser (in fact, Mozilla on unix will overwrite your saved cookies if
    you change them on disk while it's running; on Windows, you probably can't
    save at all while the browser is running).

    Note that the Mozilla/Netscape format will downgrade RFC2965 cookies to
    Netscape cookies on saving.

    In particular, the cookie version and port number information is lost,
    together with information about whether or not Path, Port and Discard were
    specified by the Set-Cookie2 (or Set-Cookie) header, and whether or not the
    domain as set in the HTTP header started with a dot (yes, I'm aware some
    domains in Netscape files start with a dot and some don't -- trust me, you
    really don't want to know any more about this).

    Note that though Mozilla and Netscape use the same format, they use
    slightly different headers.  The class saves cookies using the Netscape
    header by default (Mozilla can cope with that).

    """
    magic_re = "#( Netscape)? HTTP Cookie File"
    # The header lines must start in column 0: indenting them inside the
    # string literal would write leading whitespace into every saved file.
    header = """\
# Netscape HTTP Cookie File
# http://www.netscape.com/newsref/std/cookie_spec.html
# This is a generated file! Do not edit.

"""

    def _really_load(self, f, filename, ignore_discard, ignore_expires):
        """Read Netscape-format cookie lines from the open file f.

        Raises IOError if the first line does not match magic_re or if any
        cookie line cannot be parsed.

        """
        now = time.time()

        magic = f.readline()
        if not re.search(self.magic_re, magic):
            f.close()
            raise IOError(
                "%s does not look like a Netscape format cookies file" %
                filename)

        try:
            # readline() returns "" only at end of file
            for line in iter(f.readline, ""):
                # last field may be absent, so keep any trailing tab
                if line.endswith("\n"):
                    line = line[:-1]

                # skip comments and blank lines XXX what is $ for?
                stripped = line.strip()
                if (stripped == "" or
                    stripped.startswith("#") or
                    stripped.startswith("$")):
                    continue

                # exactly seven tab-separated fields; anything else raises
                # ValueError, reported as IOError by the handler below
                domain, domain_specified, path, secure, expires, name, value = \
                    line.split("\t")
                secure = (secure == "TRUE")
                domain_specified = (domain_specified == "TRUE")
                if name == "":
                    # cookies.txt stores a valueless cookie as an empty name
                    # plus a value; cookielib wants a name and no value
                    name = value
                    value = None

                initial_dot = domain.startswith(".")
                assert domain_specified == initial_dot

                discard = False
                if expires == "":
                    # no expiry date: session cookie
                    expires = None
                    discard = True

                # assume path_specified is false
                cookie = Cookie(0, name, value,
                                None, False,
                                domain, domain_specified, initial_dot,
                                path, False,
                                secure,
                                expires,
                                discard,
                                None,
                                None,
                                {})
                if not ignore_discard and cookie.discard:
                    continue
                if not ignore_expires and cookie.is_expired(now):
                    continue
                self.set_cookie(cookie)

        except:
            reraise_unmasked_exceptions((IOError,))
            raise IOError("invalid Netscape format file %s: %s" %
                          (filename, line))

    def save(self, filename=None, ignore_discard=False, ignore_expires=False):
        """Write the jar to filename (default: self.filename) as cookies.txt.

        Raises ValueError if no filename is available.

        """
        if filename is None:
            filename = self.filename
            if filename is None:
                raise ValueError(MISSING_FILENAME_TEXT)

        out = open(filename, "w")
        try:
            out.write(self.header)
            now = time.time()
            for cookie in self:
                if not ignore_discard and cookie.discard:
                    continue
                if not ignore_expires and cookie.is_expired(now):
                    continue

                secure = "FALSE"
                if cookie.secure:
                    secure = "TRUE"
                initial_dot = "FALSE"
                if cookie.domain.startswith("."):
                    initial_dot = "TRUE"
                expires = ""
                if cookie.expires is not None:
                    expires = str(cookie.expires)

                if cookie.value is None:
                    # cookies.txt regards 'Set-Cookie: foo' as a cookie
                    # with no name, whereas cookielib regards it as a
                    # cookie with no value.
                    name = ""
                    value = cookie.name
                else:
                    name = cookie.name
                    value = cookie.value

                fields = [cookie.domain, initial_dot, cookie.path,
                          secure, expires, name, value]
                out.write("\t".join(fields) + "\n")
        finally:
            out.close()
diff --git a/Lib/cookielib.py b/Lib/cookielib.py
new file mode 100644
index 0000000..b82886a
--- /dev/null
+++ b/Lib/cookielib.py
@@ -0,0 +1,1756 @@
+"""HTTP cookie handling for web clients.
+
+This module has (now fairly distant) origins in Gisle Aas' Perl module
+HTTP::Cookies, from the libwww-perl library.
+
+Docstrings, comments and debug strings in this code refer to the
+attributes of the HTTP cookie system as cookie-attributes, to distinguish
+them clearly from Python attributes.
+
+Class diagram (note that the classes which do not derive from
+FileCookieJar are not distributed with the Python standard library, but
+are available from http://wwwsearch.sf.net/):
+
+ CookieJar____
+ / \ \
+ FileCookieJar \ \
+ / | \ \ \
+ MozillaCookieJar | LWPCookieJar \ \
+ | | \
+ | ---MSIEBase | \
+ | / | | \
+ | / MSIEDBCookieJar BSDDBCookieJar
+ |/
+ MSIECookieJar
+
+"""
+
+import sys, re, urlparse, copy, time, struct, urllib, types, logging
+from types import StringTypes
+try:
+ import threading as _threading
+except ImportError:
+ import dummy_threading as _threading
+import httplib # only for the default HTTP port
+from calendar import timegm
+
# Module logger: a stream handler is attached at import time and its debug()
# method is exposed as the module-level debug() helper.
_cookielib_logger = logging.getLogger("cookielib")
_cookielib_logger.addHandler(logging.StreamHandler())
debug = _cookielib_logger.debug

# Default HTTP port, kept as a string like the other port values here.
DEFAULT_HTTP_PORT = str(httplib.HTTP_PORT)
MISSING_FILENAME_TEXT = ("a filename was not supplied (nor was the CookieJar "
                         "instance initialised with one)")
+
def reraise_unmasked_exceptions(unmasked=()):
    """Re-raise the exception being handled unless it should be swallowed.

    Must be called from inside an except block.  There are a few catch-all
    except: statements in this module, for catching input that's bad in
    unexpected ways; this function re-raises the exceptions we don't want
    to trap: anything in the tuple unmasked, plus KeyboardInterrupt,
    SystemExit and MemoryError.  For any other exception a warning is
    issued, the traceback is printed and the function returns normally.

    """
    never_masked = unmasked + (KeyboardInterrupt, SystemExit, MemoryError)
    current_type = sys.exc_info()[0]
    if issubclass(current_type, never_masked):
        raise
    # swallowed an exception: make some noise about it
    import warnings
    import traceback
    warnings.warn("cookielib bug!", stacklevel=2)
    traceback.print_exc()
+
+
+# Date/time conversion
+# -----------------------------------------------------------------------------
+
+EPOCH_YEAR = 1970
+def _timegm(tt):
+ year, month, mday, hour, min, sec = tt[:6]
+ if ((year >= EPOCH_YEAR) and (1 <= month <= 12) and (1 <= mday <= 31) and
+ (0 <= hour <= 24) and (0 <= min <= 59) and (0 <= sec <= 61)):
+ return timegm(tt)
+ else:
+ return None
+
# Name tables used by the date formatters and parsers.  DAYS is indexed by
# time.gmtime()'s weekday field (Monday is 0); MONTHS by month number - 1.
DAYS = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
MONTHS_LOWER = [month.lower() for month in MONTHS]

def time2isoz(t=None):
    """Return a string representing time in seconds since epoch, t.

    If the function is called without an argument, it will use the current
    time.

    The format of the returned string is like "YYYY-MM-DD hh:mm:ssZ",
    representing Universal Time (UTC, aka GMT).  An example of this format is:

    1994-11-24 08:49:37Z

    """
    if t is None:
        t = time.time()
    return "%04d-%02d-%02d %02d:%02d:%02dZ" % time.gmtime(t)[:6]

def time2netscape(t=None):
    """Return a string representing time in seconds since epoch, t.

    If the function is called without an argument, it will use the current
    time.

    The format of the returned string is like this:

    Wed, DD-Mon-YYYY HH:MM:SS GMT

    """
    if t is None:
        t = time.time()
    year, mon, mday, hour, minute, sec, wday = time.gmtime(t)[:7]
    # Note the comma after the weekday name: it is part of the format.
    return "%s, %02d-%s-%04d %02d:%02d:%02d GMT" % (
        DAYS[wday], mday, MONTHS[mon-1], year, hour, minute, sec)
+
+
UTC_ZONES = {"GMT": None, "UTC": None, "UT": None, "Z": None}

TIMEZONE_RE = re.compile(r"^([-+])?(\d\d?):?(\d\d)?$")
def offset_from_tz_string(tz):
    """Return the offset from UTC, in seconds, described by the string tz.

    Names in UTC_ZONES give 0; numeric zones such as "+0100", "-08:00" or
    "5" are converted; anything else gives None.

    """
    if tz in UTC_ZONES:
        return 0
    m = TIMEZONE_RE.search(tz)
    if not m:
        return None
    sign, hours, minutes = m.groups()
    seconds = 3600 * int(hours)
    if minutes:
        seconds = seconds + 60 * int(minutes)
    if sign == '-':
        seconds = -seconds
    return seconds
+
def _str2time(day, mon, yr, hr, min, sec, tz):
    """Convert date fields captured by a regexp into seconds since epoch.

    All arguments are strings, or None where the field was absent; mon may
    be a month name or a number.  Returns None if the month or timezone is
    not recognised or if _timegm rejects the resulting fields.

    """
    # translate month name to number
    # month numbers start with 1 (January)
    try:
        mon = MONTHS_LOWER.index(mon.lower())+1
    except ValueError:
        # maybe it's already a number
        try:
            imon = int(mon)
        except ValueError:
            return None
        if not (1 <= imon <= 12):
            return None
        mon = imon

    # make sure clock elements are defined
    if hr is None: hr = 0
    if min is None: min = 0
    if sec is None: sec = 0

    yr = int(yr)
    day = int(day)
    hr = int(hr)
    min = int(min)
    # ISO_DATE_RE lets seconds carry a fractional part ("29.5"), which a
    # plain int() would reject with ValueError; truncate to whole seconds.
    sec = int(float(sec))

    if yr < 1000:
        # find "obvious" year: pick the century that puts the year closest
        # to the current one
        cur_yr = time.localtime(time.time())[0]
        m = cur_yr % 100
        tmp = yr
        yr = yr + cur_yr - m
        m = m - tmp
        if abs(m) > 50:
            if m > 0: yr = yr + 100
            else: yr = yr - 100

    # convert UTC time tuple to seconds since epoch (not timezone-adjusted)
    t = _timegm((yr, mon, day, hr, min, sec))
    if t is None:
        return None

    # adjust time using timezone string, to get absolute time since epoch
    if tz is None:
        tz = "UTC"
    offset = offset_from_tz_string(tz.upper())
    if offset is None:
        return None
    return t - offset
+
# Both halves of the pattern are raw strings so that "\d" always reaches the
# regexp engine as written rather than relying on Python passing unknown
# string escapes through.
STRICT_DATE_RE = re.compile(
    r"^[SMTWF][a-z][a-z], (\d\d) ([JFMASOND][a-z][a-z]) "
    r"(\d\d\d\d) (\d\d):(\d\d):(\d\d) GMT$")
WEEKDAY_RE = re.compile(
    r"^(?:Sun|Mon|Tue|Wed|Thu|Fri|Sat)[a-z]*,?\s*", re.I)
LOOSE_HTTP_DATE_RE = re.compile(
    r"""^
    (\d\d?)            # day
       (?:\s+|[-\/])
    (\w+)              # month
        (?:\s+|[-\/])
    (\d+)              # year
    (?:
          (?:\s+|:)    # separator before clock
       (\d\d?):(\d\d)  # hour:min
       (?::(\d\d))?    # optional seconds
    )?                 # optional clock
       \s*
    ([-+]?\d{2,4}|(?![APap][Mm]\b)[A-Za-z]+)? # timezone
       \s*
    (?:\(\w+\))?       # ASCII representation of timezone in parens.
       \s*$""", re.X)
def http2time(text):
    """Returns time in seconds since epoch of time represented by a string.

    Return value is a number of seconds.

    None is returned if the format of str is unrecognized, the time is outside
    the representable range, or the timezone string is not recognized.  If the
    string contains no timezone, UTC is assumed.

    The timezone in the string may be numerical (like "-0800" or "+0100") or a
    string timezone (like "UTC", "GMT", "BST" or "EST").  Currently, only the
    timezone strings equivalent to UTC (zero offset) are known to the function.

    The function loosely parses the following formats:

    Wed, 09 Feb 1994 22:23:32 GMT       -- HTTP format
    Tuesday, 08-Feb-94 14:15:29 GMT     -- old rfc850 HTTP format
    Tuesday, 08-Feb-1994 14:15:29 GMT   -- broken rfc850 HTTP format
    09 Feb 1994 22:23:32 GMT            -- HTTP format (no weekday)
    08-Feb-94 14:15:29 GMT              -- rfc850 format (no weekday)
    08-Feb-1994 14:15:29 GMT            -- broken rfc850 format (no weekday)

    The parser ignores leading and trailing whitespace.  The time may be
    absent.

    If the year is given with only 2 digits, the function will select the
    century that makes the year closest to the current date.

    """
    # fast exit for strictly conforming string
    m = STRICT_DATE_RE.search(text)
    if m:
        g = m.groups()
        mon = MONTHS_LOWER.index(g[1].lower()) + 1
        tt = (int(g[2]), mon, int(g[0]),
              int(g[3]), int(g[4]), float(g[5]))
        return _timegm(tt)

    # No, we need some messy parsing...

    # clean up
    text = text.lstrip()
    text = WEEKDAY_RE.sub("", text, 1)  # Useless weekday

    # loose regexp parse
    m = LOOSE_HTTP_DATE_RE.search(text)
    if m is None:
        return None  # bad format

    # tz is time zone specifier string
    day, mon, yr, hr, min, sec, tz = m.groups()
    return _str2time(day, mon, yr, hr, min, sec, tz)
+
# Raw string: the pattern is full of regexp escapes ("\d", "\s", "\/", "\.")
# that must reach the regexp engine exactly as written.
ISO_DATE_RE = re.compile(
    r"""^
    (\d{4})              # year
       [-\/]?
    (\d\d?)              # numerical month
       [-\/]?
    (\d\d?)              # day
   (?:
         (?:\s+|[-:Tt])  # separator before clock
      (\d\d?):?(\d\d)    # hour:min
      (?::?(\d\d(?:\.\d*)?))?  # optional seconds (and fractional)
   )?                    # optional clock
      \s*
   ([-+]?\d\d?:?(:?\d\d)?
    |Z|z)?               # timezone  (Z is "zero meridian", i.e. GMT)
      \s*$""", re.X)
def iso2time(text):
    """
    As for http2time, but parses the ISO 8601 formats:

    1994-02-03 14:15:29 -0100    -- ISO 8601 format
    1994-02-03 14:15:29          -- zone is optional
    1994-02-03                   -- only date
    1994-02-03T14:15:29          -- Use T as separator
    19940203T141529Z             -- ISO 8601 compact format
    19940203                     -- only date

    Returns None if the text does not match any of these.

    """
    # clean up
    text = text.lstrip()

    # loose regexp parse
    m = ISO_DATE_RE.search(text)
    if m is None:
        return None  # bad format

    # tz is time zone specifier string
    # XXX there's an extra bit of the timezone I'm ignoring here: is
    #   this the right thing to do?
    yr, mon, day, hr, min, sec, tz, _ = m.groups()
    return _str2time(day, mon, yr, hr, min, sec, tz)
+
+
+# Header parsing
+# -----------------------------------------------------------------------------
+
def unmatched(match):
    """Return the part of the searched string outside the whole match."""
    text = match.string
    return text[:match.start(0)] + text[match.end(0):]
+
HEADER_TOKEN_RE =        re.compile(r"^\s*([^=\s;,]+)")
HEADER_QUOTED_VALUE_RE = re.compile(r"^\s*=\s*\"([^\"\\]*(?:\\.[^\"\\]*)*)\"")
HEADER_VALUE_RE =        re.compile(r"^\s*=\s*([^\s;,]*)")
HEADER_ESCAPE_RE = re.compile(r"\\(.)")
# Separator junk ("=", ";" and whitespace) that may precede the next token.
HEADER_JUNK_RE = re.compile(r"^[=\s;]*")
def split_header_words(header_values):
    r"""Parse header values into a list of lists containing key,value pairs.

    The function knows how to deal with ",", ";" and "=" as well as quoted
    values after "=".  A list of space separated tokens are parsed as if they
    were separated by ";".

    If the header_values passed as argument contains multiple values, then they
    are treated as if they were a single value separated by comma ",".

    This means that this function is useful for parsing header fields that
    follow this syntax (BNF as from the HTTP/1.1 specification, but we relax
    the requirement for tokens).

      headers           = #header
      header            = (token | parameter) *( [";"] (token | parameter))

      token             = 1*<any CHAR except CTLs or separators>
      separators        = "(" | ")" | "<" | ">" | "@"
                        | "," | ";" | ":" | "\" | <">
                        | "/" | "[" | "]" | "?" | "="
                        | "{" | "}" | SP | HT

      quoted-string     = ( <"> *(qdtext | quoted-pair ) <"> )
      qdtext            = <any TEXT except <">>
      quoted-pair       = "\" CHAR

      parameter         = attribute "=" value
      attribute         = token
      value             = token | quoted-string

    Each header is represented by a list of key/value pairs.  The value for a
    simple token (not part of a parameter) is None.  Syntactically incorrect
    headers will not necessarily be parsed as you would want.

    header_values must be a sequence of strings, not a single string (that
    is checked with an assertion).

    This is easier to describe with some examples:

    >>> split_header_words(['foo="bar"; port="80,81"; discard, bar=baz'])
    [[('foo', 'bar'), ('port', '80,81'), ('discard', None)], [('bar', 'baz')]]
    >>> split_header_words(['text/html; charset="iso-8859-1"'])
    [[('text/html', None), ('charset', 'iso-8859-1')]]
    >>> split_header_words([r'Basic realm="\"foo\bar\""'])
    [[('Basic', None), ('realm', '"foobar"')]]

    """
    # isinstance (rather than an exact type test) also rejects instances of
    # string subclasses, which would otherwise be iterated char by char.
    assert not isinstance(header_values, StringTypes)
    result = []
    for text in header_values:
        orig_text = text
        pairs = []
        while text:
            # Every pattern used here is anchored at the start of the text,
            # so dropping a match means keeping what follows match.end().
            m = HEADER_TOKEN_RE.search(text)
            if m:
                text = text[m.end():]
                name = m.group(1)
                m = HEADER_QUOTED_VALUE_RE.search(text)
                if m:  # quoted value
                    text = text[m.end():]
                    value = HEADER_ESCAPE_RE.sub(r"\1", m.group(1))
                else:
                    m = HEADER_VALUE_RE.search(text)
                    if m:  # unquoted value
                        text = text[m.end():]
                        value = m.group(1).rstrip()
                    else:
                        # no value, a lone token
                        value = None
                pairs.append((name, value))
            elif text.lstrip().startswith(","):
                # concatenated headers, as per RFC 2616 section 4.2
                text = text.lstrip()[1:]
                if pairs: result.append(pairs)
                pairs = []
            else:
                # skip junk
                non_junk, nr_junk_chars = HEADER_JUNK_RE.subn("", text)
                assert nr_junk_chars > 0, (
                    "split_header_words bug: '%s', '%s', %s" %
                    (orig_text, text, pairs))
                text = non_junk
        if pairs: result.append(pairs)
    return result
+
HEADER_JOIN_ESCAPE_RE = re.compile(r"([\"\\])")
def join_header_words(lists):
    """Do the inverse (almost) of the conversion done by split_header_words.

    Takes a list of lists of (key, value) pairs and produces a single header
    value.  Pairs within one list are joined with "; ", the lists themselves
    with ", ".  Attribute values are quoted if needed.

    >>> join_header_words([[("text/plain", None), ("charset", "iso-8859/1")]])
    'text/plain; charset="iso-8859/1"'
    >>> join_header_words([[("text/plain", None)], [("charset", "iso-8859/1")]])
    'text/plain, charset="iso-8859/1"'

    """
    headers = []
    for pairs in lists:
        words = []
        for key, value in pairs:
            if value is None:
                # lone token
                words.append(key)
                continue
            if not re.search(r"^\w+$", value):
                # not a plain word: escape " and \, then quote
                value = '"%s"' % HEADER_JOIN_ESCAPE_RE.sub(r"\\\1", value)
            words.append("%s=%s" % (key, value))
        if words:
            headers.append("; ".join(words))
    return ", ".join(headers)
+
def parse_ns_headers(ns_headers):
    """Ad-hoc parser for Netscape protocol cookie-attributes.

    The old Netscape cookie format for Set-Cookie can for instance contain
    an unquoted "," in the expires field, so we have to use this ad-hoc
    parser instead of split_header_words.

    Takes a sequence of header value strings and returns one list of
    (key, value) pairs per non-empty header.  Known attribute names are
    lowercased, an attribute without "=" gets the value None, a parseable
    expires date is replaced by seconds since the epoch (None if it cannot
    be parsed or has no value), and ("version", "0") is appended when the
    header carries no version attribute.

    XXX This may not make the best possible effort to parse all the crap
    that Netscape Cookie headers contain.  Ronald Tschalar's HTTPClient
    parser is probably better, so could do worse than following that if
    this ever gives any trouble.

    Currently, this is also used for parsing RFC 2109 cookies.

    """
    known_attrs = ("expires", "domain", "path", "secure",
                   # RFC 2109 attrs (may turn up in Netscape cookies, too)
                   "version", "port", "max-age")

    result = []
    for ns_header in ns_headers:
        pairs = []
        version_set = False
        for param in re.split(r";\s*", ns_header):
            param = param.rstrip()
            if param == "":
                continue
            if "=" in param:
                k, v = re.split(r"\s*=\s*", param, maxsplit=1)
                k = k.lstrip()
            else:
                # valueless attribute, or cookie with missing value
                k, v = param, None
            # attribute names are case-insensitive; normalise the known ones
            lc = k.lower()
            if lc in known_attrs:
                k = lc
            if k == "version":
                # This is an RFC 2109 cookie.  Will be treated as RFC 2965
                # cookie in rest of code.
                # Probably it should be parsed with split_header_words, but
                # that's too much hassle.
                version_set = True
            if k == "expires" and v is not None:
                # convert expires date to seconds since epoch
                if v.startswith('"'): v = v[1:]
                if v.endswith('"'): v = v[:-1]
                v = http2time(v)  # None if invalid
            pairs.append((k, v))

        if pairs:
            if not version_set:
                pairs.append(("version", "0"))
            result.append(pairs)

    return result
+
+
IPV4_RE = re.compile(r"\.\d+$")
def is_HDN(text):
    """Return True if text is a host domain name.

    Text counts as a host domain name unless it is empty, ends in a dot
    followed by digits (taken to be an IP address), or starts or ends with
    a dot.

    """
    # XXX
    # This may well be wrong.  Which RFC is HDN defined in, if any (for
    #  the purposes of RFC 2965)?
    # For the current implementation, what about IPv6?  Remember to look
    #  at other uses of IPV4_RE also, if change this.
    looks_like_ip = IPV4_RE.search(text) is not None
    if looks_like_ip or text == "":
        return False
    return not (text.startswith(".") or text.endswith("."))
+
def domain_match(A, B):
    """Return True if domain A domain-matches domain B, according to RFC 2965.

    A and B may be host domain names or IP addresses.

    RFC 2965, section 1:

    Host names can be specified either as an IP address or a HDN string.
    Sometimes we compare one host name with another.  (Such comparisons SHALL
    be case-insensitive.)  Host A's name domain-matches host B's if

         *  their host name strings string-compare equal; or

         * A is a HDN string and has the form NB, where N is a non-empty
            name string, B has the form .B', and B' is a HDN string.  (So,
            x.y.com domain-matches .Y.com but not Y.com.)

    Note that domain-match is not a commutative operation: a.b.c.com
    domain-matches .c.com, but not the reverse.

    """
    # Note that, if A or B are IP addresses, the only relevant part of the
    # definition of the domain-match algorithm is the direct string-compare.
    A = A.lower()
    B = B.lower()
    if A == B:
        return True
    # B must have the form .B'
    if not B.startswith("."):
        return False
    # A must have the form NB with N non-empty, i.e. B is a proper suffix of
    # A.  It is not enough for B to occur somewhere inside A: otherwise
    # "x.y.com.evil" would domain-match ".y.com".
    if len(A) <= len(B) or not A.endswith(B):
        return False
    if not is_HDN(A):
        return False
    return is_HDN(B[1:])
+
def liberal_is_HDN(text):
    """Return True if text is a sort-of-like a host domain name.

    For accepting/blocking domains.  Anything that does not end in a dot
    followed by digits (i.e. does not look like an IP address) qualifies.

    """
    return not IPV4_RE.search(text)
+
def user_domain_match(A, B):
    """For blocking/accepting domains.

    A and B may be host domain names or IP addresses.  The comparison is
    case-insensitive.  If either looks like an IP address, only an exact
    match counts.  Otherwise a B with a leading dot matches any A ending in
    B, and a B without one must equal A.

    """
    A = A.lower()
    B = B.lower()
    if not (liberal_is_HDN(A) and liberal_is_HDN(B)):
        # IP addresses only ever match themselves
        return A == B
    if B.startswith("."):
        return A.endswith(B)
    return A == B
+
cut_port_re = re.compile(r":\d+$")
def request_host(request):
    """Return request-host, as defined by RFC 2965.

    The host is taken from the request's full URL, falling back to its Host
    header when the URL has no network location.

    Variation from RFC: returned value is lowercased, for convenient
    comparison.

    """
    netloc = urlparse.urlparse(request.get_full_url())[1]
    if netloc == "":
        netloc = request.get_header("Host", "")

    # remove port, if present
    return cut_port_re.sub("", netloc, 1).lower()
+
def eff_request_host(request):
    """Return a tuple (request-host, effective request-host name).

    As defined by RFC 2965, except both are lowercased.  The effective name
    is the request-host with ".local" appended when the host contains no dot
    and does not look like an IP address.

    """
    req_host = request_host(request)
    if "." not in req_host and not IPV4_RE.search(req_host):
        return req_host, req_host + ".local"
    return req_host, req_host
+
def request_path(request):
    """request-URI, as defined by RFC 2965.

    Built from the path, parameters, query and fragment of the request's
    full URL; the path (with parameters) is passed through escape_path, and
    the result always starts with "/".

    """
    parts = urlparse.urlparse(request.get_full_url())
    path, parameters, query, frag = parts[2:]
    if parameters:
        path = "%s;%s" % (path, parameters)
    req_path = urlparse.urlunparse(
        ("", "", escape_path(path), "", query, frag))
    if req_path.startswith("/"):
        return req_path
    # fix bad RFC 2396 absoluteURI
    return "/" + req_path
+
def request_port(request):
    """Return the port of the request's host, as a string.

    DEFAULT_HTTP_PORT is returned when the host names no port; None is
    returned (after a debug message) when the port is not a number.

    """
    host = request.get_host()
    colon = host.find(':')
    if colon < 0:
        return DEFAULT_HTTP_PORT
    port = host[colon+1:]
    try:
        int(port)
    except ValueError:
        debug("nonnumeric port: '%s'", port)
        return None
    return port
+
# Characters in addition to A-Z, a-z, 0-9, '_', '.', and '-' that don't
# need to be escaped to form a valid HTTP URL (RFCs 2396 and 1738).
HTTP_PATH_SAFE = "%/;:@&=+$,!~*'()"
ESCAPED_CHAR_RE = re.compile(r"%([0-9a-fA-F][0-9a-fA-F])")
def uppercase_escaped_char(match):
    """Return the matched %xx escape with its hex digits uppercased."""
    return "%" + match.group(1).upper()
def escape_path(path):
    """Escape any invalid characters in HTTP URL, and uppercase all escapes."""
    # There's no knowing what character encoding was used to create URLs
    # containing %-escapes, but since we have to pick one to escape invalid
    # path characters, we pick UTF-8, as recommended in the HTML 4.0
    # specification:
    # http://www.w3.org/TR/REC-html40/appendix/notes.html#h-B.2.1
    # And here, kind of: draft-fielding-uri-rfc2396bis-03
    # (And in draft IRI specification: draft-duerst-iri-05)
    # (And here, for new URI schemes: RFC 2718)
    if isinstance(path, types.UnicodeType):
        path = path.encode("utf-8")
    quoted = urllib.quote(path, HTTP_PATH_SAFE)
    return ESCAPED_CHAR_RE.sub(uppercase_escaped_char, quoted)
+
def reach(h):
    """Return reach of host h, as defined by RFC 2965, section 1.

    The reach R of a host name H is defined as follows:

       *  If

          -  H is the host domain name of a host; and,

          -  H has the form A.B; and

          -  A has no embedded (that is, interior) dots; and

          -  B has at least one embedded dot, or B is the string "local".
             then the reach of H is .B.

       *  Otherwise, the reach of H is H.

    >>> reach("www.acme.com")
    '.acme.com'
    >>> reach("acme.com")
    'acme.com'
    >>> reach("acme.local")
    '.local'

    """
    first_dot = h.find(".")
    if first_dot < 0:
        return h
    # h is A.B, where A is everything before the first dot
    b = h[first_dot+1:]
    b_has_dot = b.find(".") >= 0
    if is_HDN(h) and (b_has_dot or b == "local"):
        return "." + b
    return h
+
def is_third_party(request):
    """Return True if request is to a third-party host.

    RFC 2965, section 3.3.6:

        An unverifiable transaction is to a third-party host if its request-
        host U does not domain-match the reach R of the request-host O in the
        origin transaction.

    """
    origin_reach = reach(request.get_origin_req_host())
    return not domain_match(request_host(request), origin_reach)
+
+
+class Cookie:
+ """HTTP Cookie.
+
+ This class represents both Netscape and RFC 2965 cookies.
+
+ This is deliberately a very simple class. It just holds attributes. It's
+ possible to construct Cookie instances that don't comply with the cookie
+ standards. CookieJar.make_cookies is the factory function for Cookie
+ objects -- it deals with cookie parsing, supplying defaults, and
+ normalising to the representation used in this class. CookiePolicy is
+ responsible for checking them to see whether they should be accepted from
+ and returned to the server.
+
+ Note that the port may be present in the headers, but unspecified ("Port"
+ rather than "Port=80", for example); if this is the case, port is None.
+
+ """
+
+ def __init__(self, version, name, value,
+ port, port_specified,
+ domain, domain_specified, domain_initial_dot,
+ path, path_specified,
+ secure,
+ expires,
+ discard,
+ comment,
+ comment_url,
+ rest):
+
+ if version is not None: version = int(version)
+ if expires is not None: expires = int(expires)
+ if port is None and port_specified is True:
+ raise ValueError("if port is None, port_specified must be false")
+
+ self.version = version
+ self.name = name
+ self.value = value
+ self.port = port
+ self.port_specified = port_specified
+ # normalise case, as per RFC 2965 section 3.3.3
+ self.domain = domain.lower()
+ self.domain_specified = domain_specified
+ # Sigh. We need to know whether the domain given in the
+ # cookie-attribute had an initial dot, in order to follow RFC 2965
+ # (as clarified in draft errata). Needed for the returned $Domain
+ # value.
+ self.domain_initial_dot = domain_initial_dot
+ self.path = path
+ self.path_specified = path_specified
+ self.secure = secure
+ self.expires = expires
+ self.discard = discard
+ self.comment = comment
+ self.comment_url = comment_url
+
+ self._rest = copy.copy(rest)
+
+ def has_nonstandard_attr(self, name):
+ return name in self._rest
+ def get_nonstandard_attr(self, name, default=None):
+ return self._rest.get(name, default)
+ def set_nonstandard_attr(self, name, value):
+ self._rest[name] = value
+
+ def is_expired(self, now=None):
+ if now is None: now = time.time()
+ if (self.expires is not None) and (self.expires <= now):
+ return True
+ return False
+
+ def __str__(self):
+ if self.port is None: p = ""
+ else: p = ":"+self.port
+ limit = self.domain + p + self.path
+ if self.value is not None:
+ namevalue = "%s=%s" % (self.name, self.value)
+ else:
+ namevalue = self.name
+ return "<Cookie %s for %s>" % (namevalue, limit)
+
+ def __repr__(self):
+ args = []
+ for name in ["version", "name", "value",
+ "port", "port_specified",
+ "domain", "domain_specified", "domain_initial_dot",
+ "path", "path_specified",
+ "secure", "expires", "discard", "comment", "comment_url",
+ ]:
+ attr = getattr(self, name)
+ args.append("%s=%s" % (name, repr(attr)))
+ args.append("rest=%s" % repr(self._rest))
+ return "Cookie(%s)" % ", ".join(args)
+
+
+class CookiePolicy:
+ """Defines which cookies get accepted from and returned to server.
+
+ May also modify cookies, though this is probably a bad idea.
+
+ The subclass DefaultCookiePolicy defines the standard rules for Netscape
+ and RFC 2965 cookies -- override that if you want a customised policy.
+
+ """
+ def set_ok(self, cookie, request):
+ """Return true if (and only if) cookie should be accepted from server.
+
+ Currently, pre-expired cookies never get this far -- the CookieJar
+ class deletes such cookies itself.
+
+ """
+ raise NotImplementedError()
+
+ def return_ok(self, cookie, request):
+ """Return true if (and only if) cookie should be returned to server."""
+ raise NotImplementedError()
+
+ def domain_return_ok(self, domain, request):
+ """Return false if cookies should not be returned, given cookie domain.
+ """
+ return True
+
+ def path_return_ok(self, path, request):
+ """Return false if cookies should not be returned, given cookie path.
+ """
+ return True
+
+
+class DefaultCookiePolicy(CookiePolicy):
+ """Implements the standard rules for accepting and returning cookies."""
+
+ DomainStrictNoDots = 1
+ DomainStrictNonDomain = 2
+ DomainRFC2965Match = 4
+
+ DomainLiberal = 0
+ DomainStrict = DomainStrictNoDots|DomainStrictNonDomain
+
+ def __init__(self,
+ blocked_domains=None, allowed_domains=None,
+ netscape=True, rfc2965=False,
+ hide_cookie2=False,
+ strict_domain=False,
+ strict_rfc2965_unverifiable=True,
+ strict_ns_unverifiable=False,
+ strict_ns_domain=DomainLiberal,
+ strict_ns_set_initial_dollar=False,
+ strict_ns_set_path=False,
+ ):
+ """Constructor arguments should be passed as keyword arguments only."""
+ self.netscape = netscape
+ self.rfc2965 = rfc2965
+ self.hide_cookie2 = hide_cookie2
+ self.strict_domain = strict_domain
+ self.strict_rfc2965_unverifiable = strict_rfc2965_unverifiable
+ self.strict_ns_unverifiable = strict_ns_unverifiable
+ self.strict_ns_domain = strict_ns_domain
+ self.strict_ns_set_initial_dollar = strict_ns_set_initial_dollar
+ self.strict_ns_set_path = strict_ns_set_path
+
+ if blocked_domains is not None:
+ self._blocked_domains = tuple(blocked_domains)
+ else:
+ self._blocked_domains = ()
+
+ if allowed_domains is not None:
+ allowed_domains = tuple(allowed_domains)
+ self._allowed_domains = allowed_domains
+
+ def blocked_domains(self):
+ """Return the sequence of blocked domains (as a tuple)."""
+ return self._blocked_domains
+ def set_blocked_domains(self, blocked_domains):
+ """Set the sequence of blocked domains."""
+ self._blocked_domains = tuple(blocked_domains)
+
+ def is_blocked(self, domain):
+ for blocked_domain in self._blocked_domains:
+ if user_domain_match(domain, blocked_domain):
+ return True
+ return False
+
+ def allowed_domains(self):
+ """Return None, or the sequence of allowed domains (as a tuple)."""
+ return self._allowed_domains
+ def set_allowed_domains(self, allowed_domains):
+ """Set the sequence of allowed domains, or None."""
+ if allowed_domains is not None:
+ allowed_domains = tuple(allowed_domains)
+ self._allowed_domains = allowed_domains
+
+ def is_not_allowed(self, domain):
+ if self._allowed_domains is None:
+ return False
+ for allowed_domain in self._allowed_domains:
+ if user_domain_match(domain, allowed_domain):
+ return False
+ return True
+
+ def set_ok(self, cookie, request):
+ """
+ If you override .set_ok(), be sure to call this method. If it returns
+ false, so should your subclass (assuming your subclass wants to be more
+ strict about which cookies to accept).
+
+ """
+ debug(" - checking cookie %s=%s", cookie.name, cookie.value)
+
+ assert cookie.name is not None
+
+ for n in "version", "verifiability", "name", "path", "domain", "port":
+ fn_name = "set_ok_"+n
+ fn = getattr(self, fn_name)
+ if not fn(cookie, request):
+ return False
+
+ return True
+
+ def set_ok_version(self, cookie, request):
+ if cookie.version is None:
+ # Version is always set to 0 by parse_ns_headers if it's a Netscape
+ # cookie, so this must be an invalid RFC 2965 cookie.
+ debug(" Set-Cookie2 without version attribute (%s=%s)",
+ cookie.name, cookie.value)
+ return False
+ if cookie.version > 0 and not self.rfc2965:
+ debug(" RFC 2965 cookies are switched off")
+ return False
+ elif cookie.version == 0 and not self.netscape:
+ debug(" Netscape cookies are switched off")
+ return False
+ return True
+
+ def set_ok_verifiability(self, cookie, request):
+ if request.is_unverifiable() and is_third_party(request):
+ if cookie.version > 0 and self.strict_rfc2965_unverifiable:
+ debug(" third-party RFC 2965 cookie during "
+ "unverifiable transaction")
+ return False
+ elif cookie.version == 0 and self.strict_ns_unverifiable:
+ debug(" third-party Netscape cookie during "
+ "unverifiable transaction")
+ return False
+ return True
+
+ def set_ok_name(self, cookie, request):
+ # Try and stop servers setting V0 cookies designed to hack other
+ # servers that know both V0 and V1 protocols.
+ if (cookie.version == 0 and self.strict_ns_set_initial_dollar and
+ cookie.name.startswith("$")):
+ debug(" illegal name (starts with '$'): '%s'", cookie.name)
+ return False
+ return True
+
+ def set_ok_path(self, cookie, request):
+ if cookie.path_specified:
+ req_path = request_path(request)
+ if ((cookie.version > 0 or
+ (cookie.version == 0 and self.strict_ns_set_path)) and
+ not req_path.startswith(cookie.path)):
+ debug(" path attribute %s is not a prefix of request "
+ "path %s", cookie.path, req_path)
+ return False
+ return True
+
+ def set_ok_domain(self, cookie, request):
+ if self.is_blocked(cookie.domain):
+ debug(" domain %s is in user block-list", cookie.domain)
+ return False
+ if self.is_not_allowed(cookie.domain):
+ debug(" domain %s is not in user allow-list", cookie.domain)
+ return False
+ if cookie.domain_specified:
+ req_host, erhn = eff_request_host(request)
+ domain = cookie.domain
+ if self.strict_domain and (domain.count(".") >= 2):
+ i = domain.rfind(".")
+ j = domain.rfind(".", 0, i)
+ if j == 0: # domain like .foo.bar
+ tld = domain[i+1:]
+ sld = domain[j+1:i]
+ if (sld.lower() in [
+ "co", "ac",
+ "com", "edu", "org", "net", "gov", "mil", "int"] and
+ len(tld) == 2):
+ # domain like .co.uk
+ debug(" country-code second level domain %s", domain)
+ return False
+ if domain.startswith("."):
+ undotted_domain = domain[1:]
+ else:
+ undotted_domain = domain
+ embedded_dots = (undotted_domain.find(".") >= 0)
+ if not embedded_dots and domain != ".local":
+ debug(" non-local domain %s contains no embedded dot",
+ domain)
+ return False
+ if cookie.version == 0:
+ if (not erhn.endswith(domain) and
+ (not erhn.startswith(".") and
+ not ("."+erhn).endswith(domain))):
+ debug(" effective request-host %s (even with added "
+ "initial dot) does not end end with %s",
+ erhn, domain)
+ return False
+ if (cookie.version > 0 or
+ (self.strict_ns_domain & self.DomainRFC2965Match)):
+ if not domain_match(erhn, domain):
+ debug(" effective request-host %s does not domain-match "
+ "%s", erhn, domain)
+ return False
+ if (cookie.version > 0 or
+ (self.strict_ns_domain & self.DomainStrictNoDots)):
+ host_prefix = req_host[:-len(domain)]
+ if (host_prefix.find(".") >= 0 and
+ not IPV4_RE.search(req_host)):
+ debug(" host prefix %s for domain %s contains a dot",
+ host_prefix, domain)
+ return False
+ return True
+
+ def set_ok_port(self, cookie, request):
+ if cookie.port_specified:
+ req_port = request_port(request)
+ if req_port is None:
+ req_port = "80"
+ else:
+ req_port = str(req_port)
+ for p in cookie.port.split(","):
+ try:
+ int(p)
+ except ValueError:
+ debug(" bad port %s (not numeric)", p)
+ return False
+ if p == req_port:
+ break
+ else:
+ debug(" request port (%s) not found in %s",
+ req_port, cookie.port)
+ return False
+ return True
+
+ def return_ok(self, cookie, request):
+ """
+ If you override .return_ok(), be sure to call this method. If it
+ returns false, so should your subclass (assuming your subclass wants to
+ be more strict about which cookies to return).
+
+ """
+ # Path has already been checked by .path_return_ok(), and domain
+ # blocking done by .domain_return_ok().
+ debug(" - checking cookie %s=%s", cookie.name, cookie.value)
+
+ for n in "version", "verifiability", "secure", "expires", "port", "domain":
+ fn_name = "return_ok_"+n
+ fn = getattr(self, fn_name)
+ if not fn(cookie, request):
+ return False
+ return True
+
+ def return_ok_version(self, cookie, request):
+ if cookie.version > 0 and not self.rfc2965:
+ debug(" RFC 2965 cookies are switched off")
+ return False
+ elif cookie.version == 0 and not self.netscape:
+ debug(" Netscape cookies are switched off")
+ return False
+ return True
+
+ def return_ok_verifiability(self, cookie, request):
+ if request.is_unverifiable() and is_third_party(request):
+ if cookie.version > 0 and self.strict_rfc2965_unverifiable:
+ debug(" third-party RFC 2965 cookie during unverifiable "
+ "transaction")
+ return False
+ elif cookie.version == 0 and self.strict_ns_unverifiable:
+ debug(" third-party Netscape cookie during unverifiable "
+ "transaction")
+ return False
+ return True
+
+ def return_ok_secure(self, cookie, request):
+ if cookie.secure and request.get_type() != "https":
+ debug(" secure cookie with non-secure request")
+ return False
+ return True
+
+ def return_ok_expires(self, cookie, request):
+ if cookie.is_expired(self._now):
+ debug(" cookie expired")
+ return False
+ return True
+
+ def return_ok_port(self, cookie, request):
+ if cookie.port:
+ req_port = request_port(request)
+ if req_port is None:
+ req_port = "80"
+ for p in cookie.port.split(","):
+ if p == req_port:
+ break
+ else:
+ debug(" request port %s does not match cookie port %s",
+ req_port, cookie.port)
+ return False
+ return True
+
+ def return_ok_domain(self, cookie, request):
+ req_host, erhn = eff_request_host(request)
+ domain = cookie.domain
+
+ # strict check of non-domain cookies: Mozilla does this, MSIE5 doesn't
+ if (cookie.version == 0 and
+ (self.strict_ns_domain & self.DomainStrictNonDomain) and
+ not cookie.domain_specified and domain != erhn):
+ debug(" cookie with unspecified domain does not string-compare "
+ "equal to request domain")
+ return False
+
+ if cookie.version > 0 and not domain_match(erhn, domain):
+ debug(" effective request-host name %s does not domain-match "
+ "RFC 2965 cookie domain %s", erhn, domain)
+ return False
+ if cookie.version == 0 and not ("."+erhn).endswith(domain):
+ debug(" request-host %s does not match Netscape cookie domain "
+ "%s", req_host, domain)
+ return False
+ return True
+
+ def domain_return_ok(self, domain, request):
+     """Return false if cookies should not be returned, given cookie domain.
+
+     domain is the domain under which cookies are stored; request is the
+     request the cookies would be sent with.  Returns False when neither
+     the request host nor the effective request host (both with a leading
+     dot) ends with domain, or when the domain is rejected by the user's
+     block-list / allow-list.  Returns True otherwise.
+
+     """
+     # Liberal check of the domain.  This is here as an optimization to
+     # avoid having to load lots of MSIE cookie files unless necessary.
+     req_host, erhn = eff_request_host(request)
+     # Always bind the dotted names: a host that already starts with a dot
+     # is used unchanged instead of leaving the variable undefined.
+     if req_host.startswith("."):
+         dotted_req_host = req_host
+     else:
+         dotted_req_host = "."+req_host
+     if erhn.startswith("."):
+         dotted_erhn = erhn
+     else:
+         dotted_erhn = "."+erhn
+     if not (dotted_req_host.endswith(domain) or
+             dotted_erhn.endswith(domain)):
+         #debug(" request domain %s does not match cookie domain %s",
+         # req_host, domain)
+         return False
+
+     if self.is_blocked(domain):
+         debug(" domain %s is in user block-list", domain)
+         return False
+     if self.is_not_allowed(domain):
+         debug(" domain %s is not in user allow-list", domain)
+         return False
+
+     return True
+
+ def path_return_ok(self, path, request):
+ debug("- checking cookie path=%s", path)
+ req_path = request_path(request)
+ if not req_path.startswith(path):
+ debug(" %s does not path-match %s", req_path, path)
+ return False
+ return True
+
+
+def vals_sorted_by_key(adict):
+ keys = adict.keys()
+ keys.sort()
+ return map(adict.get, keys)
+
+def deepvalues(mapping):
+ """Iterates over nested mapping, depth-first, in sorted order by key."""
+ values = vals_sorted_by_key(mapping)
+ for obj in values:
+ mapping = False
+ try:
+ obj.items
+ except AttributeError:
+ pass
+ else:
+ mapping = True
+ for subobj in deepvalues(obj):
+ yield subobj
+ if not mapping:
+ yield obj
+
+
+# Used as second parameter to dict.get() method, to distinguish absent
+# dict key from one with a None value.
+class Absent: pass
+
+class CookieJar:
+ """Collection of HTTP cookies.
+
+ You may not need to know about this class: try
+ urllib2.build_opener(HTTPCookieProcessor).open(url).
+
+ """
+
+ non_word_re = re.compile(r"\W")
+ quote_re = re.compile(r"([\"\\])")
+ strict_domain_re = re.compile(r"\.?[^.]*")
+ domain_re = re.compile(r"[^.]*")
+ dots_re = re.compile(r"^\.+")
+
+ magic_re = r"^\#LWP-Cookies-(\d+\.\d+)"
+
+ def __init__(self, policy=None):
+ if policy is None:
+ policy = DefaultCookiePolicy()
+ self._policy = policy
+
+ self._cookies_lock = _threading.RLock()
+ self._cookies = {}
+
+ def set_policy(self, policy):
+ self._policy = policy
+
+ def _cookies_for_domain(self, domain, request):
+ cookies = []
+ if not self._policy.domain_return_ok(domain, request):
+ return []
+ debug("Checking %s for cookies to return", domain)
+ cookies_by_path = self._cookies[domain]
+ for path in cookies_by_path.keys():
+ if not self._policy.path_return_ok(path, request):
+ continue
+ cookies_by_name = cookies_by_path[path]
+ for cookie in cookies_by_name.values():
+ if not self._policy.return_ok(cookie, request):
+ debug(" not returning cookie")
+ continue
+ debug(" it's a match")
+ cookies.append(cookie)
+ return cookies
+
+ def _cookies_for_request(self, request):
+ """Return a list of cookies to be returned to server."""
+ cookies = []
+ for domain in self._cookies.keys():
+ cookies.extend(self._cookies_for_domain(domain, request))
+ return cookies
+
+ def _cookie_attrs(self, cookies):
+ """Return a list of cookie-attributes to be returned to server.
+
+ like ['foo="bar"; $Path="/"', ...]
+
+ The $Version attribute is also added when appropriate (currently only
+ once per request).
+
+ """
+ # add cookies in order of most specific (ie. longest) path first
+ def decreasing_size(a, b): return cmp(len(b.path), len(a.path))
+ cookies.sort(decreasing_size)
+
+ version_set = False
+
+ attrs = []
+ for cookie in cookies:
+ # set version of Cookie header
+ # XXX
+ # What should it be if multiple matching Set-Cookie headers have
+ # different versions themselves?
+ # Answer: there is no answer; was supposed to be settled by
+ # RFC 2965 errata, but that may never appear...
+ version = cookie.version
+ if not version_set:
+ version_set = True
+ if version > 0:
+ attrs.append("$Version=%s" % version)
+
+ # quote cookie value if necessary
+ # (not for Netscape protocol, which already has any quotes
+ # intact, due to the poorly-specified Netscape Cookie: syntax)
+ if ((cookie.value is not None) and
+ self.non_word_re.search(cookie.value) and version > 0):
+ value = self.quote_re.sub(r"\\\1", cookie.value)
+ else:
+ value = cookie.value
+
+ # add cookie-attributes to be returned in Cookie header
+ if cookie.value is None:
+ attrs.append(cookie.name)
+ else:
+ attrs.append("%s=%s" % (cookie.name, value))
+ if version > 0:
+ if cookie.path_specified:
+ attrs.append('$Path="%s"' % cookie.path)
+ if cookie.domain.startswith("."):
+ domain = cookie.domain
+ if (not cookie.domain_initial_dot and
+ domain.startswith(".")):
+ domain = domain[1:]
+ attrs.append('$Domain="%s"' % domain)
+ if cookie.port is not None:
+ p = "$Port"
+ if cookie.port_specified:
+ p = p + ('="%s"' % cookie.port)
+ attrs.append(p)
+
+ return attrs
+
+ def add_cookie_header(self, request):
+     """Add correct Cookie: header to request (urllib2.Request object).
+
+     The Cookie2 header is also added unless policy.hide_cookie2 is true.
+
+     A Cookie header is only added when there are cookies to return and
+     the request does not already carry one.  The cookie lock is held
+     while the headers are computed and is released even if an exception
+     is raised; expired cookies are cleared afterwards.
+
+     """
+     debug("add_cookie_header")
+     self._cookies_lock.acquire()
+     try:
+         # the policy's expiry checks read this shared timestamp
+         self._policy._now = self._now = int(time.time())
+
+         cookies = self._cookies_for_request(request)
+
+         attrs = self._cookie_attrs(cookies)
+         if attrs:
+             if not request.has_header("Cookie"):
+                 request.add_unredirected_header(
+                     "Cookie", "; ".join(attrs))
+
+         # if necessary, advertise that we know RFC 2965
+         if (self._policy.rfc2965 and not self._policy.hide_cookie2 and
+             not request.has_header("Cookie2")):
+             for cookie in cookies:
+                 if cookie.version != 1:
+                     request.add_unredirected_header("Cookie2", '$Version="1"')
+                     break
+     finally:
+         self._cookies_lock.release()
+
+     self.clear_expired_cookies()
+
+ def _normalized_cookie_tuples(self, attrs_set):
+ """Return list of tuples containing normalised cookie information.
+
+ attrs_set is the list of lists of key,value pairs extracted from
+ the Set-Cookie or Set-Cookie2 headers.
+
+ Tuples are name, value, standard, rest, where name and value are the
+ cookie name and value, standard is a dictionary containing the standard
+ cookie-attributes (discard, secure, version, expires or max-age,
+ domain, path and port) and rest is a dictionary containing the rest of
+ the cookie-attributes.
+
+ """
+ cookie_tuples = []
+
+ boolean_attrs = "discard", "secure"
+ value_attrs = ("version",
+ "expires", "max-age",
+ "domain", "path", "port",
+ "comment", "commenturl")
+
+ for cookie_attrs in attrs_set:
+ name, value = cookie_attrs[0]
+
+ # Build dictionary of standard cookie-attributes (standard) and
+ # dictionary of other cookie-attributes (rest).
+
+ # Note: expiry time is normalised to seconds since epoch. V0
+ # cookies should have the Expires cookie-attribute, and V1 cookies
+ # should have Max-Age, but since V1 includes RFC 2109 cookies (and
+ # since V0 cookies may be a mish-mash of Netscape and RFC 2109), we
+ # accept either (but prefer Max-Age).
+ max_age_set = False
+
+ bad_cookie = False
+
+ standard = {}
+ rest = {}
+ for k, v in cookie_attrs[1:]:
+ lc = k.lower()
+ # don't lose case distinction for unknown fields
+ if lc in value_attrs or lc in boolean_attrs:
+ k = lc
+ if k in boolean_attrs and v is None:
+ # boolean cookie-attribute is present, but has no value
+ # (like "discard", rather than "port=80")
+ v = True
+ if k in standard:
+ # only first value is significant
+ continue
+ if k == "domain":
+ if v is None:
+ debug(" missing value for domain attribute")
+ bad_cookie = True
+ break
+ # RFC 2965 section 3.3.3
+ v = v.lower()
+ if k == "expires":
+ if max_age_set:
+ # Prefer max-age to expires (like Mozilla)
+ continue
+ if v is None:
+ debug(" missing or invalid value for expires "
+ "attribute: treating as session cookie")
+ continue
+ if k == "max-age":
+ max_age_set = True
+ try:
+ v = int(v)
+ except ValueError:
+ debug(" missing or invalid (non-numeric) value for "
+ "max-age attribute")
+ bad_cookie = True
+ break
+ # convert RFC 2965 Max-Age to seconds since epoch
+ # XXX Strictly you're supposed to follow RFC 2616
+ # age-calculation rules. Remember that zero Max-Age is a
+ # request to discard (old and new) cookie, though.
+ k = "expires"
+ v = self._now + v
+ if (k in value_attrs) or (k in boolean_attrs):
+ if (v is None and
+ k not in ["port", "comment", "commenturl"]):
+ debug(" missing value for %s attribute" % k)
+ bad_cookie = True
+ break
+ standard[k] = v
+ else:
+ rest[k] = v
+
+ if bad_cookie:
+ continue
+
+ cookie_tuples.append((name, value, standard, rest))
+
+ return cookie_tuples
+
+ def _cookie_from_cookie_tuple(self, tup, request):
+ # standard is dict of standard cookie-attributes, rest is dict of the
+ # rest of them
+ name, value, standard, rest = tup
+
+ domain = standard.get("domain", Absent)
+ path = standard.get("path", Absent)
+ port = standard.get("port", Absent)
+ expires = standard.get("expires", Absent)
+
+ # set the easy defaults
+ version = standard.get("version", None)
+ if version is not None: version = int(version)
+ secure = standard.get("secure", False)
+ # (discard is also set if expires is Absent)
+ discard = standard.get("discard", False)
+ comment = standard.get("comment", None)
+ comment_url = standard.get("commenturl", None)
+
+ # set default path
+ if path is not Absent and path != "":
+ path_specified = True
+ path = escape_path(path)
+ else:
+ path_specified = False
+ path = request_path(request)
+ i = path.rfind("/")
+ if i != -1:
+ if version == 0:
+ # Netscape spec parts company from reality here
+ path = path[:i]
+ else:
+ path = path[:i+1]
+ if len(path) == 0: path = "/"
+
+ # set default domain
+ domain_specified = domain is not Absent
+ # but first we have to remember whether it starts with a dot
+ domain_initial_dot = False
+ if domain_specified:
+ domain_initial_dot = bool(domain.startswith("."))
+ if domain is Absent:
+ req_host, erhn = eff_request_host(request)
+ domain = erhn
+ elif not domain.startswith("."):
+ domain = "."+domain
+
+ # set default port
+ port_specified = False
+ if port is not Absent:
+ if port is None:
+ # Port attr present, but has no value: default to request port.
+ # Cookie should then only be sent back on that port.
+ port = request_port(request)
+ else:
+ port_specified = True
+ port = re.sub(r"\s+", "", port)
+ else:
+ # No port attr present. Cookie can be sent back on any port.
+ port = None
+
+ # set default expires and discard
+ if expires is Absent:
+ expires = None
+ discard = True
+ elif expires <= self._now:
+ # Expiry date in past is request to delete cookie. This can't be
+ # in DefaultCookiePolicy, because can't delete cookies there.
+ try:
+ self.clear(domain, path, name)
+ except KeyError:
+ pass
+ debug("Expiring cookie, domain='%s', path='%s', name='%s'",
+ domain, path, name)
+ return None
+
+ return Cookie(version,
+ name, value,
+ port, port_specified,
+ domain, domain_specified, domain_initial_dot,
+ path, path_specified,
+ secure,
+ expires,
+ discard,
+ comment,
+ comment_url,
+ rest)
+
+ def _cookies_from_attrs_set(self, attrs_set, request):
+ cookie_tuples = self._normalized_cookie_tuples(attrs_set)
+
+ cookies = []
+ for tup in cookie_tuples:
+ cookie = self._cookie_from_cookie_tuple(tup, request)
+ if cookie: cookies.append(cookie)
+ return cookies
+
+ def make_cookies(self, response, request):
+ """Return sequence of Cookie objects extracted from response object."""
+ # get cookie-attributes for RFC 2965 and Netscape protocols
+ headers = response.info()
+ rfc2965_hdrs = headers.getheaders("Set-Cookie2")
+ ns_hdrs = headers.getheaders("Set-Cookie")
+
+ rfc2965 = self._policy.rfc2965
+ netscape = self._policy.netscape
+
+ if ((not rfc2965_hdrs and not ns_hdrs) or
+ (not ns_hdrs and not rfc2965) or
+ (not rfc2965_hdrs and not netscape) or
+ (not netscape and not rfc2965)):
+ return [] # no relevant cookie headers: quick exit
+
+ try:
+ cookies = self._cookies_from_attrs_set(
+ split_header_words(rfc2965_hdrs), request)
+ except:
+ reraise_unmasked_exceptions()
+ cookies = []
+
+ if ns_hdrs and netscape:
+ try:
+ ns_cookies = self._cookies_from_attrs_set(
+ parse_ns_headers(ns_hdrs), request)
+ except:
+ reraise_unmasked_exceptions()
+ ns_cookies = []
+
+ # Look for Netscape cookies (from Set-Cookie headers) that match
+ # corresponding RFC 2965 cookies (from Set-Cookie2 headers).
+ # For each match, keep the RFC 2965 cookie and ignore the Netscape
+ # cookie (RFC 2965 section 9.1). Actually, RFC 2109 cookies are
+ # bundled in with the Netscape cookies for this purpose, which is
+ # reasonable behaviour.
+ if rfc2965:
+ lookup = {}
+ for cookie in cookies:
+ lookup[(cookie.domain, cookie.path, cookie.name)] = None
+
+ def no_matching_rfc2965(ns_cookie, lookup=lookup):
+ key = ns_cookie.domain, ns_cookie.path, ns_cookie.name
+ return key not in lookup
+ ns_cookies = filter(no_matching_rfc2965, ns_cookies)
+
+ if ns_cookies:
+ cookies.extend(ns_cookies)
+
+ return cookies
+
+ def set_cookie_if_ok(self, cookie, request):
+     """Set a cookie if policy says it's OK to do so.
+
+     cookie is stored with set_cookie() only when the policy's set_ok()
+     returns true for (cookie, request).  The cookie lock is released even
+     if the policy check or the store raises.
+
+     """
+     self._cookies_lock.acquire()
+     try:
+         # the policy's expiry checks read this shared timestamp
+         self._policy._now = self._now = int(time.time())
+
+         if self._policy.set_ok(cookie, request):
+             self.set_cookie(cookie)
+     finally:
+         self._cookies_lock.release()
+
+ def set_cookie(self, cookie):
+ """Set a cookie, without checking whether or not it should be set."""
+ c = self._cookies
+ self._cookies_lock.acquire()
+ try:
+ if cookie.domain not in c: c[cookie.domain] = {}
+ c2 = c[cookie.domain]
+ if cookie.path not in c2: c2[cookie.path] = {}
+ c3 = c2[cookie.path]
+ c3[cookie.name] = cookie
+ finally:
+ self._cookies_lock.release()
+
+ def extract_cookies(self, response, request):
+     """Extract cookies from response, where allowable given the request.
+
+     Every cookie produced by make_cookies() that the policy's set_ok()
+     accepts is stored with set_cookie().  The cookie lock is released
+     even if parsing, the policy check or the store raises.
+
+     """
+     debug("extract_cookies: %s", response.info())
+     self._cookies_lock.acquire()
+     try:
+         # the policy's expiry checks read this shared timestamp
+         self._policy._now = self._now = int(time.time())
+
+         for cookie in self.make_cookies(response, request):
+             if self._policy.set_ok(cookie, request):
+                 debug(" setting cookie: %s", cookie)
+                 self.set_cookie(cookie)
+     finally:
+         self._cookies_lock.release()
+
+ def clear(self, domain=None, path=None, name=None):
+ """Clear some cookies.
+
+ Invoking this method without arguments will clear all cookies. If
+ given a single argument, only cookies belonging to that domain will be
+ removed. If given two arguments, cookies belonging to the specified
+ path within that domain are removed. If given three arguments, then
+ the cookie with the specified name, path and domain is removed.
+
+ Raises KeyError if no matching cookie exists.
+
+ """
+ if name is not None:
+ if (domain is None) or (path is None):
+ raise ValueError(
+ "domain and path must be given to remove a cookie by name")
+ del self._cookies[domain][path][name]
+ elif path is not None:
+ if domain is None:
+ raise ValueError(
+ "domain must be given to remove cookies by path")
+ del self._cookies[domain][path]
+ elif domain is not None:
+ del self._cookies[domain]
+ else:
+ self._cookies = {}
+
+ def clear_session_cookies(self):
+     """Discard all session cookies.
+
+     Note that the .save() method won't save session cookies anyway, unless
+     you ask otherwise by passing a true ignore_discard argument.
+
+     The cookie lock is released even if removing a cookie raises.
+
+     """
+     self._cookies_lock.acquire()
+     try:
+         for cookie in self:
+             if cookie.discard:
+                 self.clear(cookie.domain, cookie.path, cookie.name)
+     finally:
+         self._cookies_lock.release()
+
+ def clear_expired_cookies(self):
+     """Discard all expired cookies.
+
+     You probably don't need to call this method: expired cookies are never
+     sent back to the server (provided you're using DefaultCookiePolicy),
+     this method is called by CookieJar itself every so often, and the
+     .save() method won't save expired cookies anyway (unless you ask
+     otherwise by passing a true ignore_expires argument).
+
+     The cookie lock is released even if removing a cookie raises.
+
+     """
+     self._cookies_lock.acquire()
+     try:
+         # one timestamp for the whole sweep
+         now = time.time()
+         for cookie in self:
+             if cookie.is_expired(now):
+                 self.clear(cookie.domain, cookie.path, cookie.name)
+     finally:
+         self._cookies_lock.release()
+
+ def __iter__(self):
+ return deepvalues(self._cookies)
+
+ def __len__(self):
+ """Return number of contained cookies."""
+ i = 0
+ for cookie in self: i = i + 1
+ return i
+
+ def __repr__(self):
+ r = []
+ for cookie in self: r.append(repr(cookie))
+ return "<%s[%s]>" % (self.__class__, ", ".join(r))
+
+ def __str__(self):
+ r = []
+ for cookie in self: r.append(str(cookie))
+ return "<%s[%s]>" % (self.__class__, ", ".join(r))
+
+
+class LoadError(Exception): pass
+
+class FileCookieJar(CookieJar):
+ """CookieJar that can be loaded from and saved to a file."""
+
+ def __init__(self, filename=None, delayload=False, policy=None):
+ """
+ Cookies are NOT loaded from the named file until either the .load() or
+ .revert() method is called.
+
+ """
+ CookieJar.__init__(self, policy)
+ if filename is not None:
+ try:
+ filename+""
+ except:
+ raise ValueError("filename must be string-like")
+ self.filename = filename
+ self.delayload = bool(delayload)
+
+ def save(self, filename=None, ignore_discard=False, ignore_expires=False):
+ """Save cookies to a file."""
+ raise NotImplementedError()
+
+ def load(self, filename=None, ignore_discard=False, ignore_expires=False):
+ """Load cookies from a file."""
+ if filename is None:
+ if self.filename is not None: filename = self.filename
+ else: raise ValueError(MISSING_FILENAME_TEXT)
+
+ f = open(filename)
+ try:
+ self._really_load(f, filename, ignore_discard, ignore_expires)
+ finally:
+ f.close()
+
+ def revert(self, filename=None,
+            ignore_discard=False, ignore_expires=False):
+     """Clear all cookies and reload cookies from a saved file.
+
+     filename defaults to self.filename; ValueError is raised if neither
+     is set.  ignore_discard and ignore_expires are passed on to .load().
+
+     Raises LoadError (or IOError) if reversion is not successful; the
+     object's state will not be altered if this happens.
+
+     The cookie lock is released on every exit path, including the
+     re-raised load failure.
+
+     """
+     if filename is None:
+         if self.filename is not None: filename = self.filename
+         else: raise ValueError(MISSING_FILENAME_TEXT)
+
+     self._cookies_lock.acquire()
+     try:
+         # keep a copy so a failed load can be rolled back
+         old_state = copy.deepcopy(self._cookies)
+         self._cookies = {}
+         try:
+             self.load(filename, ignore_discard, ignore_expires)
+         except (LoadError, IOError):
+             self._cookies = old_state
+             raise
+     finally:
+         self._cookies_lock.release()
+
+from _LWPCookieJar import LWPCookieJar, lwp_cookie_str
+from _MozillaCookieJar import MozillaCookieJar
diff --git a/Lib/test/test_cookielib.py b/Lib/test/test_cookielib.py
new file mode 100644
index 0000000..72c9fc8
--- /dev/null
+++ b/Lib/test/test_cookielib.py
@@ -0,0 +1,1620 @@
+# -*- coding: utf-8 -*-
+"""Tests for cookielib.py."""
+
+import re, os, time
+from unittest import TestCase
+
+from test import test_support
+
+class DateTimeTests(TestCase):
+
+ def test_time2isoz(self):
+ from cookielib import time2isoz
+
+ base = 1019227000
+ day = 24*3600
+ self.assertEquals(time2isoz(base), "2002-04-19 14:36:40Z")
+ self.assertEquals(time2isoz(base+day), "2002-04-20 14:36:40Z")
+ self.assertEquals(time2isoz(base+2*day), "2002-04-21 14:36:40Z")
+ self.assertEquals(time2isoz(base+3*day), "2002-04-22 14:36:40Z")
+
+ az = time2isoz()
+ bz = time2isoz(500000)
+ for text in (az, bz):
+ self.assert_(re.search(r"^\d{4}-\d\d-\d\d \d\d:\d\d:\d\dZ$", text),
+ "bad time2isoz format: %s %s" % (az, bz))
+
+ def test_http2time(self):
+ from cookielib import http2time
+
+ def parse_date(text):
+ return time.gmtime(http2time(text))[:6]
+
+ self.assertEquals(parse_date("01 Jan 2001"), (2001, 1, 1, 0, 0, 0.0))
+
+ # this test will break around year 2070
+ self.assertEquals(parse_date("03-Feb-20"), (2020, 2, 3, 0, 0, 0.0))
+
+ # this test will break around year 2048
+ self.assertEquals(parse_date("03-Feb-98"), (1998, 2, 3, 0, 0, 0.0))
+
+ def test_http2time_formats(self):
+ from cookielib import http2time, time2isoz
+
+ # test http2time for supported dates. Test cases with 2 digit year
+ # will probably break in year 2044.
+ tests = [
+ 'Thu, 03 Feb 1994 00:00:00 GMT', # proposed new HTTP format
+ 'Thursday, 03-Feb-94 00:00:00 GMT', # old rfc850 HTTP format
+ 'Thursday, 03-Feb-1994 00:00:00 GMT', # broken rfc850 HTTP format
+
+ '03 Feb 1994 00:00:00 GMT', # HTTP format (no weekday)
+ '03-Feb-94 00:00:00 GMT', # old rfc850 (no weekday)
+ '03-Feb-1994 00:00:00 GMT', # broken rfc850 (no weekday)
+ '03-Feb-1994 00:00 GMT', # broken rfc850 (no weekday, no seconds)
+ '03-Feb-1994 00:00', # broken rfc850 (no weekday, no seconds, no tz)
+
+ '03-Feb-94', # old rfc850 HTTP format (no weekday, no time)
+ '03-Feb-1994', # broken rfc850 HTTP format (no weekday, no time)
+ '03 Feb 1994', # proposed new HTTP format (no weekday, no time)
+
+ # A few tests with extra space at various places
+ ' 03 Feb 1994 0:00 ',
+ ' 03-Feb-1994 ',
+ ]
+
+ test_t = 760233600 # assume broken POSIX counting of seconds
+ result = time2isoz(test_t)
+ expected = "1994-02-03 00:00:00Z"
+ self.assertEquals(result, expected,
+ "%s => '%s' (%s)" % (test_t, result, expected))
+
+ for s in tests:
+ t = http2time(s)
+ t2 = http2time(s.lower())
+ t3 = http2time(s.upper())
+
+ self.assert_(t == t2 == t3 == test_t,
+ "'%s' => %s, %s, %s (%s)" % (s, t, t2, t3, test_t))
+
+ def test_http2time_garbage(self):
+ from cookielib import http2time
+
+ for test in [
+ '',
+ 'Garbage',
+ 'Mandag 16. September 1996',
+ '01-00-1980',
+ '01-13-1980',
+ '00-01-1980',
+ '32-01-1980',
+ '01-01-1980 25:00:00',
+ '01-01-1980 00:61:00',
+ '01-01-1980 00:00:62',
+ ]:
+ self.assert_(http2time(test) is None,
+ "http2time(%s) is not None\n"
+ "http2time(test) %s" % (test, http2time(test))
+ )
+
+
+class HeaderTests(TestCase):
+ def test_parse_ns_headers(self):
+ from cookielib import parse_ns_headers
+
+ # quotes should be stripped
+ expected = [[('expires', 2209069412L), ('version', '0')]]
+ for hdr in [
+ 'expires=01 Jan 2040 22:23:32 GMT',
+ 'expires="01 Jan 2040 22:23:32 GMT"',
+ ]:
+ self.assertEquals(parse_ns_headers([hdr]), expected)
+
+ def test_join_header_words(self):
+ from cookielib import join_header_words
+
+ joined = join_header_words([[("foo", None), ("bar", "baz")]])
+ self.assertEquals(joined, "foo; bar=baz")
+
+ self.assertEquals(join_header_words([[]]), "")
+
+ def test_split_header_words(self):
+ from cookielib import split_header_words
+
+ tests = [
+ ("foo", [[("foo", None)]]),
+ ("foo=bar", [[("foo", "bar")]]),
+ (" foo ", [[("foo", None)]]),
+ (" foo= ", [[("foo", "")]]),
+ (" foo=", [[("foo", "")]]),
+ (" foo= ; ", [[("foo", "")]]),
+ (" foo= ; bar= baz ", [[("foo", ""), ("bar", "baz")]]),
+ ("foo=bar bar=baz", [[("foo", "bar"), ("bar", "baz")]]),
+ # doesn't really matter if this next fails, but it works ATM
+ ("foo= bar=baz", [[("foo", "bar=baz")]]),
+ ("foo=bar;bar=baz", [[("foo", "bar"), ("bar", "baz")]]),
+ ('foo bar baz', [[("foo", None), ("bar", None), ("baz", None)]]),
+ ("a, b, c", [[("a", None)], [("b", None)], [("c", None)]]),
+ (r'foo; bar=baz, spam=, foo="\,\;\"", bar= ',
+ [[("foo", None), ("bar", "baz")],
+ [("spam", "")], [("foo", ',;"')], [("bar", "")]]),
+ ]
+
+ for arg, expect in tests:
+ try:
+ result = split_header_words([arg])
+ except:
+ import traceback, StringIO
+ f = StringIO.StringIO()
+ traceback.print_exc(None, f)
+ result = "(error -- traceback follows)\n\n%s" % f.getvalue()
+ self.assertEquals(result, expect, """
+When parsing: '%s'
+Expected: '%s'
+Got: '%s'
+""" % (arg, expect, result))
+
+ def test_roundtrip(self):
+ from cookielib import split_header_words, join_header_words
+
+ tests = [
+ ("foo", "foo"),
+ ("foo=bar", "foo=bar"),
+ (" foo ", "foo"),
+ ("foo=", 'foo=""'),
+ ("foo=bar bar=baz", "foo=bar; bar=baz"),
+ ("foo=bar;bar=baz", "foo=bar; bar=baz"),
+ ('foo bar baz', "foo; bar; baz"),
+ (r'foo="\"" bar="\\"', r'foo="\""; bar="\\"'),
+ ('foo,,,bar', 'foo, bar'),
+ ('foo=bar,bar=baz', 'foo=bar, bar=baz'),
+
+ ('text/html; charset=iso-8859-1',
+ 'text/html; charset="iso-8859-1"'),
+
+ ('foo="bar"; port="80,81"; discard, bar=baz',
+ 'foo=bar; port="80,81"; discard, bar=baz'),
+
+ (r'Basic realm="\"foo\\\\bar\""',
+ r'Basic; realm="\"foo\\\\bar\""')
+ ]
+
+ for arg, expect in tests:
+ input = split_header_words([arg])
+ res = join_header_words(input)
+ self.assertEquals(res, expect, """
+When parsing: '%s'
+Expected: '%s'
+Got: '%s'
+Input was: '%s'
+""" % (arg, expect, res, input))
+
+
+class FakeResponse:
+ def __init__(self, headers=[], url=None):
+ """
+ headers: list of RFC822-style 'Key: value' strings
+ """
+ import mimetools, StringIO
+ f = StringIO.StringIO("\n".join(headers))
+ self._headers = mimetools.Message(f)
+ self._url = url
+ def info(self): return self._headers
+
+def interact_2965(cookiejar, url, *set_cookie_hdrs):
+ return _interact(cookiejar, url, set_cookie_hdrs, "Set-Cookie2")
+
+def interact_netscape(cookiejar, url, *set_cookie_hdrs):
+ return _interact(cookiejar, url, set_cookie_hdrs, "Set-Cookie")
+
+def _interact(cookiejar, url, set_cookie_hdrs, hdr_name):
+ """Perform a single request / response cycle, returning Cookie: header."""
+ from urllib2 import Request
+ req = Request(url)
+ cookiejar.add_cookie_header(req)
+ cookie_hdr = req.get_header("Cookie", "")
+ headers = []
+ for hdr in set_cookie_hdrs:
+ headers.append("%s: %s" % (hdr_name, hdr))
+ res = FakeResponse(headers, url)
+ cookiejar.extract_cookies(res, req)
+ return cookie_hdr
+
+
+class CookieTests(TestCase):
+ # XXX
+ # Get rid of string comparisons where not actually testing str / repr.
+ # .clear() etc.
+ # IP addresses like 50 (single number, no dot) and domain-matching
+ # functions (and is_HDN)? See draft RFC 2965 errata.
+ # Strictness switches
+ # is_third_party()
+ # unverifiability / third-party blocking
+ # Netscape cookies work the same as RFC 2965 with regard to port.
+ # Set-Cookie with negative max age.
+ # If turn RFC 2965 handling off, Set-Cookie2 cookies should not clobber
+ # Set-Cookie cookies.
+ # Cookie2 should be sent if *any* cookies are not V1 (ie. V0 OR V2 etc.).
+ # Cookies (V1 and V0) with no expiry date should be set to be discarded.
+ # RFC 2965 Quoting:
+ # Should accept unquoted cookie-attribute values? check errata draft.
+ # Which are required on the way in and out?
+ # Should always return quoted cookie-attribute values?
+ # Proper testing of when RFC 2965 clobbers Netscape (waiting for errata).
+ # Path-match on return (same for V0 and V1).
+ # RFC 2965 acceptance and returning rules
+ # Set-Cookie2 without version attribute is rejected.
+
+ # Netscape peculiarities list from Ronald Tschalar.
+ # The first two still need tests, the rest are covered.
+## - Quoting: only quotes around the expires value are recognized as such
+## (and yes, some folks quote the expires value); quotes around any other
+## value are treated as part of the value.
+## - White space: white space around names and values is ignored
+## - Default path: if no path parameter is given, the path defaults to the
+## path in the request-uri up to, but not including, the last '/'. Note
+## that this is entirely different from what the spec says.
+## - Commas and other delimiters: Netscape just parses until the next ';'.
+## This means it will allow commas etc inside values (and yes, both
+## commas and equals signs commonly appear in the cookie value). This also
+## means that if you fold multiple Set-Cookie header fields into one,
+## comma-separated list, it'll be a headache to parse (at least my head
+## starts hurting every time I think of that code).
+## - Expires: You'll get all sorts of date formats in the expires,
+## including empty expires attributes ("expires="). Be as flexible as you
+## can, and certainly don't expect the weekday to be there; if you can't
+## parse it, just ignore it and pretend it's a session cookie.
+## - Domain-matching: Netscape uses the 2-dot rule for _all_ domains, not
+## just the 7 special TLD's listed in their spec. And folks rely on
+## that...
+
+ def test_domain_return_ok(self):
+ # test optimization: .domain_return_ok() should filter out most
+ # domains in the CookieJar before we try to access them (because that
+ # may require disk access -- in particular, with MSIECookieJar)
+ # This is only a rough check for performance reasons, so it's not too
+ # critical as long as it's sufficiently liberal.
+ import cookielib, urllib2
+ pol = cookielib.DefaultCookiePolicy()
+ for url, domain, ok in [
+ ("http://foo.bar.com/", "blah.com", False),
+ ("http://foo.bar.com/", "rhubarb.blah.com", False),
+ ("http://foo.bar.com/", "rhubarb.foo.bar.com", False),
+ ("http://foo.bar.com/", ".foo.bar.com", True),
+ ("http://foo.bar.com/", "foo.bar.com", True),
+ ("http://foo.bar.com/", ".bar.com", True),
+ ("http://foo.bar.com/", "com", True),
+ ("http://foo.com/", "rhubarb.foo.com", False),
+ ("http://foo.com/", ".foo.com", True),
+ ("http://foo.com/", "foo.com", True),
+ ("http://foo.com/", "com", True),
+ ("http://foo/", "rhubarb.foo", False),
+ ("http://foo/", ".foo", True),
+ ("http://foo/", "foo", True),
+ ("http://foo/", "foo.local", True),
+ ("http://foo/", ".local", True),
+ ]:
+ request = urllib2.Request(url)
+ r = pol.domain_return_ok(domain, request)
+ if ok: self.assert_(r)
+ else: self.assert_(not r)
+
+ def test_missing_value(self):
+ from cookielib import MozillaCookieJar, lwp_cookie_str
+
+ # missing = sign in Cookie: header is regarded by Mozilla as a missing
+ # name, and by cookielib as a missing value
+ filename = test_support.TESTFN
+ c = MozillaCookieJar(filename)
+ interact_netscape(c, "http://www.acme.com/", 'eggs')
+ interact_netscape(c, "http://www.acme.com/", '"spam"; path=/foo/')
+ cookie = c._cookies["www.acme.com"]["/"]["eggs"]
+ self.assert_(cookie.value is None)
+ self.assertEquals(cookie.name, "eggs")
+ cookie = c._cookies["www.acme.com"]['/foo/']['"spam"']
+ self.assert_(cookie.value is None)
+ self.assertEquals(cookie.name, '"spam"')
+ self.assertEquals(lwp_cookie_str(cookie), (
+ r'"spam"; path="/foo/"; domain="www.acme.com"; '
+ 'path_spec; discard; version=0'))
+ old_str = repr(c)
+ c.save(ignore_expires=True, ignore_discard=True)
+ try:
+ c = MozillaCookieJar(filename)
+ c.revert(ignore_expires=True, ignore_discard=True)
+ finally:
+ os.unlink(c.filename)
+ # cookies unchanged apart from lost info re. whether path was specified
+ self.assertEquals(
+ repr(c),
+ re.sub("path_specified=%s" % True, "path_specified=%s" % False,
+ old_str)
+ )
+ self.assertEquals(interact_netscape(c, "http://www.acme.com/foo/"),
+ '"spam"; eggs')
+
+ def test_ns_parser(self):
+ from cookielib import CookieJar, DEFAULT_HTTP_PORT
+
+ c = CookieJar()
+ interact_netscape(c, "http://www.acme.com/",
+ 'spam=eggs; DoMain=.acme.com; port; blArgh="feep"')
+ interact_netscape(c, "http://www.acme.com/", 'ni=ni; port=80,8080')
+ interact_netscape(c, "http://www.acme.com:80/", 'nini=ni')
+ interact_netscape(c, "http://www.acme.com:80/", 'foo=bar; expires=')
+ interact_netscape(c, "http://www.acme.com:80/", 'spam=eggs; '
+ 'expires="Foo Bar 25 33:22:11 3022"')
+
+ cookie = c._cookies[".acme.com"]["/"]["spam"]
+ self.assertEquals(cookie.domain, ".acme.com")
+ self.assert_(cookie.domain_specified)
+ self.assertEquals(cookie.port, DEFAULT_HTTP_PORT)
+ self.assert_(not cookie.port_specified)
+ # case is preserved
+ self.assert_(cookie.has_nonstandard_attr("blArgh") and
+ not cookie.has_nonstandard_attr("blargh"))
+
+ cookie = c._cookies["www.acme.com"]["/"]["ni"]
+ self.assertEquals(cookie.domain, "www.acme.com")
+ self.assert_(not cookie.domain_specified)
+ self.assertEquals(cookie.port, "80,8080")
+ self.assert_(cookie.port_specified)
+
+ cookie = c._cookies["www.acme.com"]["/"]["nini"]
+ self.assert_(cookie.port is None)
+ self.assert_(not cookie.port_specified)
+
+ # invalid expires should not cause cookie to be dropped
+ foo = c._cookies["www.acme.com"]["/"]["foo"]
+ spam = c._cookies["www.acme.com"]["/"]["foo"]
+ self.assert_(foo.expires is None)
+ self.assert_(spam.expires is None)
+
+ def test_expires(self):
+ from cookielib import time2netscape, CookieJar
+
+ # if expires is in future, keep cookie...
+ c = CookieJar()
+ future = time2netscape(time.time()+3600)
+ interact_netscape(c, "http://www.acme.com/", 'spam="bar"; expires=%s' %
+ future)
+ self.assertEquals(len(c), 1)
+ now = time2netscape(time.time()-1)
+ # ... and if in past or present, discard it
+ interact_netscape(c, "http://www.acme.com/", 'foo="eggs"; expires=%s' %
+ now)
+ h = interact_netscape(c, "http://www.acme.com/")
+ self.assertEquals(len(c), 1)
+ self.assert_('spam="bar"' in h and "foo" not in h)
+
+ # max-age takes precedence over expires, and zero max-age is request to
+ # delete both new cookie and any old matching cookie
+ interact_netscape(c, "http://www.acme.com/", 'eggs="bar"; expires=%s' %
+ future)
+ interact_netscape(c, "http://www.acme.com/", 'bar="bar"; expires=%s' %
+ future)
+ self.assertEquals(len(c), 3)
+ interact_netscape(c, "http://www.acme.com/", 'eggs="bar"; '
+ 'expires=%s; max-age=0' % future)
+ interact_netscape(c, "http://www.acme.com/", 'bar="bar"; '
+ 'max-age=0; expires=%s' % future)
+ h = interact_netscape(c, "http://www.acme.com/")
+ self.assertEquals(len(c), 1)
+
+ # test expiry at end of session for cookies with no expires attribute
+ interact_netscape(c, "http://www.rhubarb.net/", 'whum="fizz"')
+ self.assertEquals(len(c), 2)
+ c.clear_session_cookies()
+ self.assertEquals(len(c), 1)
+ self.assert_('spam="bar"' in h)
+
+ # XXX RFC 2965 expiry rules (some apply to V0 too)
+
+ def test_default_path(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ # RFC 2965
+ pol = DefaultCookiePolicy(rfc2965=True)
+
+ c = CookieJar(pol)
+ interact_2965(c, "http://www.acme.com/", 'spam="bar"; Version="1"')
+ self.assert_("/" in c._cookies["www.acme.com"])
+
+ c = CookieJar(pol)
+ interact_2965(c, "http://www.acme.com/blah", 'eggs="bar"; Version="1"')
+ self.assert_("/" in c._cookies["www.acme.com"])
+
+ c = CookieJar(pol)
+ interact_2965(c, "http://www.acme.com/blah/rhubarb",
+ 'eggs="bar"; Version="1"')
+ self.assert_("/blah/" in c._cookies["www.acme.com"])
+
+ c = CookieJar(pol)
+ interact_2965(c, "http://www.acme.com/blah/rhubarb/",
+ 'eggs="bar"; Version="1"')
+ self.assert_("/blah/rhubarb/" in c._cookies["www.acme.com"])
+
+ # Netscape
+
+ c = CookieJar()
+ interact_netscape(c, "http://www.acme.com/", 'spam="bar"')
+ self.assert_("/" in c._cookies["www.acme.com"])
+
+ c = CookieJar()
+ interact_netscape(c, "http://www.acme.com/blah", 'eggs="bar"')
+ self.assert_("/" in c._cookies["www.acme.com"])
+
+ c = CookieJar()
+ interact_netscape(c, "http://www.acme.com/blah/rhubarb", 'eggs="bar"')
+ self.assert_("/blah" in c._cookies["www.acme.com"])
+
+ c = CookieJar()
+ interact_netscape(c, "http://www.acme.com/blah/rhubarb/", 'eggs="bar"')
+ self.assert_("/blah/rhubarb" in c._cookies["www.acme.com"])
+
+ def test_escape_path(self):
+ from cookielib import escape_path
+ cases = [
+ # quoted safe
+ ("/foo%2f/bar", "/foo%2F/bar"),
+ ("/foo%2F/bar", "/foo%2F/bar"),
+ # quoted %
+ ("/foo%%/bar", "/foo%%/bar"),
+ # quoted unsafe
+ ("/fo%19o/bar", "/fo%19o/bar"),
+ ("/fo%7do/bar", "/fo%7Do/bar"),
+ # unquoted safe
+ ("/foo/bar&", "/foo/bar&"),
+ ("/foo//bar", "/foo//bar"),
+ ("\176/foo/bar", "\176/foo/bar"),
+ # unquoted unsafe
+ ("/foo\031/bar", "/foo%19/bar"),
+ ("/\175foo/bar", "/%7Dfoo/bar"),
+ # unicode
+ (u"/foo/bar\uabcd", "/foo/bar%EA%AF%8D"), # UTF-8 encoded
+ ]
+ for arg, result in cases:
+ self.assertEquals(escape_path(arg), result)
+
+ def test_request_path(self):
+ from urllib2 import Request
+ from cookielib import request_path
+ # with parameters
+ req = Request("http://www.example.com/rheum/rhaponicum;"
+ "foo=bar;sing=song?apples=pears&spam=eggs#ni")
+ self.assertEquals(request_path(req), "/rheum/rhaponicum;"
+ "foo=bar;sing=song?apples=pears&spam=eggs#ni")
+ # without parameters
+ req = Request("http://www.example.com/rheum/rhaponicum?"
+ "apples=pears&spam=eggs#ni")
+ self.assertEquals(request_path(req), "/rheum/rhaponicum?"
+ "apples=pears&spam=eggs#ni")
+ # missing final slash
+ req = Request("http://www.example.com")
+ self.assertEquals(request_path(req), "/")
+
+ def test_request_port(self):
+ from urllib2 import Request
+ from cookielib import request_port, DEFAULT_HTTP_PORT
+ req = Request("http://www.acme.com:1234/",
+ headers={"Host": "www.acme.com:4321"})
+ self.assertEquals(request_port(req), "1234")
+ req = Request("http://www.acme.com/",
+ headers={"Host": "www.acme.com:4321"})
+ self.assertEquals(request_port(req), DEFAULT_HTTP_PORT)
+
+ def test_request_host(self):
+ from urllib2 import Request
+ from cookielib import request_host
+ # this request is illegal (RFC2616, 14.2.3)
+ req = Request("http://1.1.1.1/",
+ headers={"Host": "www.acme.com:80"})
+ # libwww-perl wants this response, but that seems wrong (RFC 2616,
+ # section 5.2, point 1., and RFC 2965 section 1, paragraph 3)
+ #self.assertEquals(request_host(req), "www.acme.com")
+ self.assertEquals(request_host(req), "1.1.1.1")
+ req = Request("http://www.acme.com/",
+ headers={"Host": "irrelevant.com"})
+ self.assertEquals(request_host(req), "www.acme.com")
+ # not actually sure this one is valid Request object, so maybe should
+ # remove test for no host in url in request_host function?
+ req = Request("/resource.html",
+ headers={"Host": "www.acme.com"})
+ self.assertEquals(request_host(req), "www.acme.com")
+ # port shouldn't be in request-host
+ req = Request("http://www.acme.com:2345/resource.html",
+ headers={"Host": "www.acme.com:5432"})
+ self.assertEquals(request_host(req), "www.acme.com")
+
+ def test_is_HDN(self):
+ from cookielib import is_HDN
+ self.assert_(is_HDN("foo.bar.com"))
+ self.assert_(is_HDN("1foo2.3bar4.5com"))
+ self.assert_(not is_HDN("192.168.1.1"))
+ self.assert_(not is_HDN(""))
+ self.assert_(not is_HDN("."))
+ self.assert_(not is_HDN(".foo.bar.com"))
+ self.assert_(not is_HDN("..foo"))
+ self.assert_(not is_HDN("foo."))
+
+ def test_reach(self):
+ from cookielib import reach
+ self.assertEquals(reach("www.acme.com"), ".acme.com")
+ self.assertEquals(reach("acme.com"), "acme.com")
+ self.assertEquals(reach("acme.local"), ".local")
+ self.assertEquals(reach(".local"), ".local")
+ self.assertEquals(reach(".com"), ".com")
+ self.assertEquals(reach("."), ".")
+ self.assertEquals(reach(""), "")
+ self.assertEquals(reach("192.168.0.1"), "192.168.0.1")
+
+ def test_domain_match(self):
+ from cookielib import domain_match, user_domain_match
+ self.assert_(domain_match("192.168.1.1", "192.168.1.1"))
+ self.assert_(not domain_match("192.168.1.1", ".168.1.1"))
+ self.assert_(domain_match("x.y.com", "x.Y.com"))
+ self.assert_(domain_match("x.y.com", ".Y.com"))
+ self.assert_(not domain_match("x.y.com", "Y.com"))
+ self.assert_(domain_match("a.b.c.com", ".c.com"))
+ self.assert_(not domain_match(".c.com", "a.b.c.com"))
+ self.assert_(domain_match("example.local", ".local"))
+ self.assert_(not domain_match("blah.blah", ""))
+ self.assert_(not domain_match("", ".rhubarb.rhubarb"))
+ self.assert_(domain_match("", ""))
+
+ self.assert_(user_domain_match("acme.com", "acme.com"))
+ self.assert_(not user_domain_match("acme.com", ".acme.com"))
+ self.assert_(user_domain_match("rhubarb.acme.com", ".acme.com"))
+ self.assert_(user_domain_match("www.rhubarb.acme.com", ".acme.com"))
+ self.assert_(user_domain_match("x.y.com", "x.Y.com"))
+ self.assert_(user_domain_match("x.y.com", ".Y.com"))
+ self.assert_(not user_domain_match("x.y.com", "Y.com"))
+ self.assert_(user_domain_match("y.com", "Y.com"))
+ self.assert_(not user_domain_match(".y.com", "Y.com"))
+ self.assert_(user_domain_match(".y.com", ".Y.com"))
+ self.assert_(user_domain_match("x.y.com", ".com"))
+ self.assert_(not user_domain_match("x.y.com", "com"))
+ self.assert_(not user_domain_match("x.y.com", "m"))
+ self.assert_(not user_domain_match("x.y.com", ".m"))
+ self.assert_(not user_domain_match("x.y.com", ""))
+ self.assert_(not user_domain_match("x.y.com", "."))
+ self.assert_(user_domain_match("192.168.1.1", "192.168.1.1"))
+ # not both HDNs, so must string-compare equal to match
+ self.assert_(not user_domain_match("192.168.1.1", ".168.1.1"))
+ self.assert_(not user_domain_match("192.168.1.1", "."))
+ # empty string is a special case
+ self.assert_(not user_domain_match("192.168.1.1", ""))
+
+ def test_wrong_domain(self):
+ # Cookies whose effective request-host name does not domain-match the
+ # domain are rejected.
+
+ # XXX far from complete
+ from cookielib import CookieJar
+ c = CookieJar()
+ interact_2965(c, "http://www.nasty.com/",
+ 'foo=bar; domain=friendly.org; Version="1"')
+ self.assertEquals(len(c), 0)
+
+ def test_two_component_domain_ns(self):
+ # Netscape: .www.bar.com, www.bar.com, .bar.com, bar.com, no domain
+ # should all get accepted, as should .acme.com, acme.com and no domain
+ # for 2-component domains like acme.com.
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ c = CookieJar()
+
+ # two-component V0 domain is OK
+ interact_netscape(c, "http://foo.net/", 'ns=bar')
+ self.assertEquals(len(c), 1)
+ self.assertEquals(c._cookies["foo.net"]["/"]["ns"].value, "bar")
+ self.assertEquals(interact_netscape(c, "http://foo.net/"), "ns=bar")
+ # *will* be returned to any other domain (unlike RFC 2965)...
+ self.assertEquals(interact_netscape(c, "http://www.foo.net/"),
+ "ns=bar")
+ # ...unless requested otherwise
+ pol = DefaultCookiePolicy(
+ strict_ns_domain=DefaultCookiePolicy.DomainStrictNonDomain)
+ c.set_policy(pol)
+ self.assertEquals(interact_netscape(c, "http://www.foo.net/"), "")
+
+ # unlike RFC 2965, even explicit two-component domain is OK,
+ # because .foo.net matches foo.net
+ interact_netscape(c, "http://foo.net/foo/",
+ 'spam1=eggs; domain=foo.net')
+ # even if starts with a dot -- in NS rules, .foo.net matches foo.net!
+ interact_netscape(c, "http://foo.net/foo/bar/",
+ 'spam2=eggs; domain=.foo.net')
+ self.assertEquals(len(c), 3)
+ self.assertEquals(c._cookies[".foo.net"]["/foo"]["spam1"].value,
+ "eggs")
+ self.assertEquals(c._cookies[".foo.net"]["/foo/bar"]["spam2"].value,
+ "eggs")
+ self.assertEquals(interact_netscape(c, "http://foo.net/foo/bar/"),
+ "spam2=eggs; spam1=eggs; ns=bar")
+
+ # top-level domain is too general
+ interact_netscape(c, "http://foo.net/", 'nini="ni"; domain=.net')
+ self.assertEquals(len(c), 3)
+
+## # Netscape protocol doesn't allow non-special top level domains (such
+## # as co.uk) in the domain attribute unless there are at least three
+## # dots in it.
+ # Oh yes it does! Real implementations don't check this, and real
+ # cookies (of course) rely on that behaviour.
+ interact_netscape(c, "http://foo.co.uk", 'nasty=trick; domain=.co.uk')
+## self.assertEquals(len(c), 2)
+ self.assertEquals(len(c), 4)
+
+ def test_two_component_domain_rfc2965(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ pol = DefaultCookiePolicy(rfc2965=True)
+ c = CookieJar(pol)
+
+ # two-component V1 domain is OK
+ interact_2965(c, "http://foo.net/", 'foo=bar; Version="1"')
+ self.assertEquals(len(c), 1)
+ self.assertEquals(c._cookies["foo.net"]["/"]["foo"].value, "bar")
+ self.assertEquals(interact_2965(c, "http://foo.net/"),
+ "$Version=1; foo=bar")
+ # won't be returned to any other domain (because domain was implied)
+ self.assertEquals(interact_2965(c, "http://www.foo.net/"), "")
+
+ # unless domain is given explicitly, because then it must be
+ # rewritten to start with a dot: foo.net --> .foo.net, which does
+ # not domain-match foo.net
+ interact_2965(c, "http://foo.net/foo",
+ 'spam=eggs; domain=foo.net; path=/foo; Version="1"')
+ self.assertEquals(len(c), 1)
+ self.assertEquals(interact_2965(c, "http://foo.net/foo"),
+ "$Version=1; foo=bar")
+
+ # explicit foo.net from three-component domain www.foo.net *does* get
+ # set, because .foo.net domain-matches .foo.net
+ interact_2965(c, "http://www.foo.net/foo/",
+ 'spam=eggs; domain=foo.net; Version="1"')
+ self.assertEquals(c._cookies[".foo.net"]["/foo/"]["spam"].value,
+ "eggs")
+ self.assertEquals(len(c), 2)
+ self.assertEquals(interact_2965(c, "http://foo.net/foo/"),
+ "$Version=1; foo=bar")
+ self.assertEquals(interact_2965(c, "http://www.foo.net/foo/"),
+ '$Version=1; spam=eggs; $Domain="foo.net"')
+
+ # top-level domain is too general
+ interact_2965(c, "http://foo.net/",
+ 'ni="ni"; domain=".net"; Version="1"')
+ self.assertEquals(len(c), 2)
+
+ # RFC 2965 doesn't require blocking this
+ interact_2965(c, "http://foo.co.uk/",
+ 'nasty=trick; domain=.co.uk; Version="1"')
+ self.assertEquals(len(c), 3)
+
+ def test_domain_allow(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+ from urllib2 import Request
+
+ c = CookieJar(policy=DefaultCookiePolicy(
+ blocked_domains=["acme.com"],
+ allowed_domains=["www.acme.com"]))
+
+ req = Request("http://acme.com/")
+ headers = ["Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/"]
+ res = FakeResponse(headers, "http://acme.com/")
+ c.extract_cookies(res, req)
+ self.assertEquals(len(c), 0)
+
+ req = Request("http://www.acme.com/")
+ res = FakeResponse(headers, "http://www.acme.com/")
+ c.extract_cookies(res, req)
+ self.assertEquals(len(c), 1)
+
+ req = Request("http://www.coyote.com/")
+ res = FakeResponse(headers, "http://www.coyote.com/")
+ c.extract_cookies(res, req)
+ self.assertEquals(len(c), 1)
+
+ # set a cookie with non-allowed domain...
+ req = Request("http://www.coyote.com/")
+ res = FakeResponse(headers, "http://www.coyote.com/")
+ cookies = c.make_cookies(res, req)
+ c.set_cookie(cookies[0])
+ self.assertEquals(len(c), 2)
+ # ... and check is doesn't get returned
+ c.add_cookie_header(req)
+ self.assert_(not req.has_header("Cookie"))
+
+ def test_domain_block(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+ from urllib2 import Request
+
+ pol = DefaultCookiePolicy(
+ rfc2965=True, blocked_domains=[".acme.com"])
+ c = CookieJar(policy=pol)
+ headers = ["Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/"]
+
+ req = Request("http://www.acme.com/")
+ res = FakeResponse(headers, "http://www.acme.com/")
+ c.extract_cookies(res, req)
+ self.assertEquals(len(c), 0)
+
+ p = pol.set_blocked_domains(["acme.com"])
+ c.extract_cookies(res, req)
+ self.assertEquals(len(c), 1)
+
+ c.clear()
+ req = Request("http://www.roadrunner.net/")
+ res = FakeResponse(headers, "http://www.roadrunner.net/")
+ c.extract_cookies(res, req)
+ self.assertEquals(len(c), 1)
+ req = Request("http://www.roadrunner.net/")
+ c.add_cookie_header(req)
+ self.assert_((req.has_header("Cookie") and
+ req.has_header("Cookie2")))
+
+ c.clear()
+ pol.set_blocked_domains([".acme.com"])
+ c.extract_cookies(res, req)
+ self.assertEquals(len(c), 1)
+
+ # set a cookie with blocked domain...
+ req = Request("http://www.acme.com/")
+ res = FakeResponse(headers, "http://www.acme.com/")
+ cookies = c.make_cookies(res, req)
+ c.set_cookie(cookies[0])
+ self.assertEquals(len(c), 2)
+ # ... and check is doesn't get returned
+ c.add_cookie_header(req)
+ self.assert_(not req.has_header("Cookie"))
+
+ def test_secure(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ for ns in True, False:
+ for whitespace in " ", "":
+ c = CookieJar()
+ if ns:
+ pol = DefaultCookiePolicy(rfc2965=False)
+ int = interact_netscape
+ vs = ""
+ else:
+ pol = DefaultCookiePolicy(rfc2965=True)
+ int = interact_2965
+ vs = "; Version=1"
+ c.set_policy(pol)
+ url = "http://www.acme.com/"
+ int(c, url, "foo1=bar%s%s" % (vs, whitespace))
+ int(c, url, "foo2=bar%s; secure%s" % (vs, whitespace))
+ self.assert_(
+ not c._cookies["www.acme.com"]["/"]["foo1"].secure,
+ "non-secure cookie registered secure")
+ self.assert_(
+ c._cookies["www.acme.com"]["/"]["foo2"].secure,
+ "secure cookie registered non-secure")
+
+ def test_quote_cookie_value(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+ c = CookieJar(policy=DefaultCookiePolicy(rfc2965=True))
+ interact_2965(c, "http://www.acme.com/", r'foo=\b"a"r; Version=1')
+ h = interact_2965(c, "http://www.acme.com/")
+ self.assertEquals(h, r'$Version=1; foo=\\b\"a\"r')
+
+ def test_missing_final_slash(self):
+ # Missing slash from request URL's abs_path should be assumed present.
+ from cookielib import CookieJar, DefaultCookiePolicy
+ from urllib2 import Request
+ url = "http://www.acme.com"
+ c = CookieJar(DefaultCookiePolicy(rfc2965=True))
+ interact_2965(c, url, "foo=bar; Version=1")
+ req = Request(url)
+ self.assertEquals(len(c), 1)
+ c.add_cookie_header(req)
+ self.assert_(req.has_header("Cookie"))
+
+ def test_domain_mirror(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ pol = DefaultCookiePolicy(rfc2965=True)
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, "spam=eggs; Version=1")
+ h = interact_2965(c, url)
+ self.assert_("Domain" not in h,
+ "absent domain returned with domain present")
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, 'spam=eggs; Version=1; Domain=.bar.com')
+ h = interact_2965(c, url)
+ self.assert_('$Domain=".bar.com"' in h, "domain not returned")
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ # note missing initial dot in Domain
+ interact_2965(c, url, 'spam=eggs; Version=1; Domain=bar.com')
+ h = interact_2965(c, url)
+ self.assert_('$Domain="bar.com"' in h, "domain not returned")
+
+ def test_path_mirror(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ pol = DefaultCookiePolicy(rfc2965=True)
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, "spam=eggs; Version=1")
+ h = interact_2965(c, url)
+ self.assert_("Path" not in h,
+ "absent path returned with path present")
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, 'spam=eggs; Version=1; Path=/')
+ h = interact_2965(c, url)
+ self.assert_('$Path="/"' in h, "path not returned")
+
+ def test_port_mirror(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ pol = DefaultCookiePolicy(rfc2965=True)
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, "spam=eggs; Version=1")
+ h = interact_2965(c, url)
+ self.assert_("Port" not in h,
+ "absent port returned with port present")
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, "spam=eggs; Version=1; Port")
+ h = interact_2965(c, url)
+ self.assert_(re.search("\$Port([^=]|$)", h),
+ "port with no value not returned with no value")
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, 'spam=eggs; Version=1; Port="80"')
+ h = interact_2965(c, url)
+ self.assert_('$Port="80"' in h,
+ "port with single value not returned with single value")
+
+ c = CookieJar(pol)
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, 'spam=eggs; Version=1; Port="80,8080"')
+ h = interact_2965(c, url)
+ self.assert_('$Port="80,8080"' in h,
+ "port with multiple values not returned with multiple "
+ "values")
+
+ def test_no_return_comment(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ c = CookieJar(DefaultCookiePolicy(rfc2965=True))
+ url = "http://foo.bar.com/"
+ interact_2965(c, url, 'spam=eggs; Version=1; '
+ 'Comment="does anybody read these?"; '
+ 'CommentURL="http://foo.bar.net/comment.html"')
+ h = interact_2965(c, url)
+ self.assert_(
+ "Comment" not in h,
+ "Comment or CommentURL cookie-attributes returned to server")
+
+ def test_Cookie_iterator(self):
+ from cookielib import CookieJar, Cookie, DefaultCookiePolicy
+
+ cs = CookieJar(DefaultCookiePolicy(rfc2965=True))
+ # add some random cookies
+ interact_2965(cs, "http://blah.spam.org/", 'foo=eggs; Version=1; '
+ 'Comment="does anybody read these?"; '
+ 'CommentURL="http://foo.bar.net/comment.html"')
+ interact_netscape(cs, "http://www.acme.com/blah/", "spam=bar; secure")
+ interact_2965(cs, "http://www.acme.com/blah/",
+ "foo=bar; secure; Version=1")
+ interact_2965(cs, "http://www.acme.com/blah/",
+ "foo=bar; path=/; Version=1")
+ interact_2965(cs, "http://www.sol.no",
+ r'bang=wallop; version=1; domain=".sol.no"; '
+ r'port="90,100, 80,8080"; '
+ r'max-age=100; Comment = "Just kidding! (\"|\\\\) "')
+
+ versions = [1, 1, 1, 0, 1]
+ names = ["bang", "foo", "foo", "spam", "foo"]
+ domains = [".sol.no", "blah.spam.org", "www.acme.com",
+ "www.acme.com", "www.acme.com"]
+ paths = ["/", "/", "/", "/blah", "/blah/"]
+
+ for i in range(4):
+ i = 0
+ for c in cs:
+ self.assert_(isinstance(c, Cookie))
+ self.assertEquals(c.version, versions[i])
+ self.assertEquals(c.name, names[i])
+ self.assertEquals(c.domain, domains[i])
+ self.assertEquals(c.path, paths[i])
+ i = i + 1
+
+ def test_parse_ns_headers(self):
+ from cookielib import parse_ns_headers
+
+ # missing domain value (invalid cookie)
+ self.assertEquals(
+ parse_ns_headers(["foo=bar; path=/; domain"]),
+ [[("foo", "bar"),
+ ("path", "/"), ("domain", None), ("version", "0")]]
+ )
+ # invalid expires value
+ self.assertEquals(
+ parse_ns_headers(["foo=bar; expires=Foo Bar 12 33:22:11 2000"]),
+ [[("foo", "bar"), ("expires", None), ("version", "0")]]
+ )
+ # missing cookie value (valid cookie)
+ self.assertEquals(
+ parse_ns_headers(["foo"]),
+ [[("foo", None), ("version", "0")]]
+ )
+ # shouldn't add version if header is empty
+ self.assertEquals(parse_ns_headers([""]), [])
+
+ def test_bad_cookie_header(self):
+
+ def cookiejar_from_cookie_headers(headers):
+ from cookielib import CookieJar
+ from urllib2 import Request
+ c = CookieJar()
+ req = Request("http://www.example.com/")
+ r = FakeResponse(headers, "http://www.example.com/")
+ c.extract_cookies(r, req)
+ return c
+
+ # none of these bad headers should cause an exception to be raised
+ for headers in [
+ ["Set-Cookie: "], # actually, nothing wrong with this
+ ["Set-Cookie2: "], # ditto
+ # missing domain value
+ ["Set-Cookie2: a=foo; path=/; Version=1; domain"],
+ # bad max-age
+ ["Set-Cookie: b=foo; max-age=oops"],
+ ]:
+ c = cookiejar_from_cookie_headers(headers)
+ # these bad cookies shouldn't be set
+ self.assertEquals(len(c), 0)
+
+ # cookie with invalid expires is treated as session cookie
+ headers = ["Set-Cookie: c=foo; expires=Foo Bar 12 33:22:11 2000"]
+ c = cookiejar_from_cookie_headers(headers)
+ cookie = c._cookies["www.example.com"]["/"]["c"]
+ self.assert_(cookie.expires is None)
+
+
+class LWPCookieTests(TestCase):
+ # Tests taken from libwww-perl, with a few modifications and additions.
+
+ def test_netscape_example_1(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+ from urllib2 import Request
+
+ #-------------------------------------------------------------------
+ # First we check that it works for the original example at
+ # http://www.netscape.com/newsref/std/cookie_spec.html
+
+ # Client requests a document, and receives in the response:
+ #
+ # Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/; expires=Wednesday, 09-Nov-99 23:12:40 GMT
+ #
+ # When client requests a URL in path "/" on this server, it sends:
+ #
+ # Cookie: CUSTOMER=WILE_E_COYOTE
+ #
+ # Client requests a document, and receives in the response:
+ #
+ # Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/
+ #
+ # When client requests a URL in path "/" on this server, it sends:
+ #
+ # Cookie: CUSTOMER=WILE_E_COYOTE; PART_NUMBER=ROCKET_LAUNCHER_0001
+ #
+ # Client receives:
+ #
+ # Set-Cookie: SHIPPING=FEDEX; path=/fo
+ #
+ # When client requests a URL in path "/" on this server, it sends:
+ #
+ # Cookie: CUSTOMER=WILE_E_COYOTE; PART_NUMBER=ROCKET_LAUNCHER_0001
+ #
+ # When client requests a URL in path "/foo" on this server, it sends:
+ #
+ # Cookie: CUSTOMER=WILE_E_COYOTE; PART_NUMBER=ROCKET_LAUNCHER_0001; SHIPPING=FEDEX
+ #
+ # The last Cookie is buggy, because both specifications say that the
+ # most specific cookie must be sent first. SHIPPING=FEDEX is the
+ # most specific and should thus be first.
+
+ year_plus_one = time.localtime()[0] + 1
+
+ headers = []
+
+ c = CookieJar(DefaultCookiePolicy(rfc2965 = True))
+
+ #req = Request("http://1.1.1.1/",
+ # headers={"Host": "www.acme.com:80"})
+ req = Request("http://www.acme.com:80/",
+ headers={"Host": "www.acme.com:80"})
+
+ headers.append(
+ "Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/ ; "
+ "expires=Wednesday, 09-Nov-%d 23:12:40 GMT" % year_plus_one)
+ res = FakeResponse(headers, "http://www.acme.com/")
+ c.extract_cookies(res, req)
+
+ req = Request("http://www.acme.com/")
+ c.add_cookie_header(req)
+
+ self.assertEqual(req.get_header("Cookie"), "CUSTOMER=WILE_E_COYOTE")
+ self.assertEqual(req.get_header("Cookie2"), '$Version="1"')
+
+ headers.append("Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/")
+ res = FakeResponse(headers, "http://www.acme.com/")
+ c.extract_cookies(res, req)
+
+ req = Request("http://www.acme.com/foo/bar")
+ c.add_cookie_header(req)
+
+ h = req.get_header("Cookie")
+ self.assert_("PART_NUMBER=ROCKET_LAUNCHER_0001" in h and
+ "CUSTOMER=WILE_E_COYOTE" in h)
+
+ headers.append('Set-Cookie: SHIPPING=FEDEX; path=/foo')
+ res = FakeResponse(headers, "http://www.acme.com")
+ c.extract_cookies(res, req)
+
+ req = Request("http://www.acme.com/")
+ c.add_cookie_header(req)
+
+ h = req.get_header("Cookie")
+ self.assert_("PART_NUMBER=ROCKET_LAUNCHER_0001" in h and
+ "CUSTOMER=WILE_E_COYOTE" in h and
+ "SHIPPING=FEDEX" not in h)
+
+ req = Request("http://www.acme.com/foo/")
+ c.add_cookie_header(req)
+
+ h = req.get_header("Cookie")
+ self.assert_(("PART_NUMBER=ROCKET_LAUNCHER_0001" in h and
+ "CUSTOMER=WILE_E_COYOTE" in h and
+ h.startswith("SHIPPING=FEDEX;")))
+
+ def test_netscape_example_2(self):
+ from cookielib import CookieJar
+ from urllib2 import Request
+
+ # Second Example transaction sequence:
+ #
+ # Assume all mappings from above have been cleared.
+ #
+ # Client receives:
+ #
+ # Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/
+ #
+ # When client requests a URL in path "/" on this server, it sends:
+ #
+ # Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001
+ #
+ # Client receives:
+ #
+ # Set-Cookie: PART_NUMBER=RIDING_ROCKET_0023; path=/ammo
+ #
+ # When client requests a URL in path "/ammo" on this server, it sends:
+ #
+ # Cookie: PART_NUMBER=RIDING_ROCKET_0023; PART_NUMBER=ROCKET_LAUNCHER_0001
+ #
+ # NOTE: There are two name/value pairs named "PART_NUMBER" due to
+ # the inheritance of the "/" mapping in addition to the "/ammo" mapping.
+
+ c = CookieJar()
+ headers = []
+
+ req = Request("http://www.acme.com/")
+ headers.append("Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/")
+ res = FakeResponse(headers, "http://www.acme.com/")
+
+ c.extract_cookies(res, req)
+
+ req = Request("http://www.acme.com/")
+ c.add_cookie_header(req)
+
+ self.assertEquals(req.get_header("Cookie"),
+ "PART_NUMBER=ROCKET_LAUNCHER_0001")
+
+ headers.append(
+ "Set-Cookie: PART_NUMBER=RIDING_ROCKET_0023; path=/ammo")
+ res = FakeResponse(headers, "http://www.acme.com/")
+ c.extract_cookies(res, req)
+
+ req = Request("http://www.acme.com/ammo")
+ c.add_cookie_header(req)
+
+ self.assert_(re.search(r"PART_NUMBER=RIDING_ROCKET_0023;\s*"
+ "PART_NUMBER=ROCKET_LAUNCHER_0001",
+ req.get_header("Cookie")))
+
+ def test_ietf_example_1(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+ #-------------------------------------------------------------------
+ # Then we test with the examples from draft-ietf-http-state-man-mec-03.txt
+ #
+ # 5. EXAMPLES
+
+ c = CookieJar(DefaultCookiePolicy(rfc2965=True))
+
+ #
+ # 5.1 Example 1
+ #
+ # Most detail of request and response headers has been omitted. Assume
+ # the user agent has no stored cookies.
+ #
+ # 1. User Agent -> Server
+ #
+ # POST /acme/login HTTP/1.1
+ # [form data]
+ #
+ # User identifies self via a form.
+ #
+ # 2. Server -> User Agent
+ #
+ # HTTP/1.1 200 OK
+ # Set-Cookie2: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"
+ #
+ # Cookie reflects user's identity.
+
+ cookie = interact_2965(
+ c, 'http://www.acme.com/acme/login',
+ 'Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"')
+ self.assert_(not cookie)
+
+ #
+ # 3. User Agent -> Server
+ #
+ # POST /acme/pickitem HTTP/1.1
+ # Cookie: $Version="1"; Customer="WILE_E_COYOTE"; $Path="/acme"
+ # [form data]
+ #
+ # User selects an item for ``shopping basket.''
+ #
+ # 4. Server -> User Agent
+ #
+ # HTTP/1.1 200 OK
+ # Set-Cookie2: Part_Number="Rocket_Launcher_0001"; Version="1";
+ # Path="/acme"
+ #
+ # Shopping basket contains an item.
+
+ cookie = interact_2965(c, 'http://www.acme.com/acme/pickitem',
+ 'Part_Number="Rocket_Launcher_0001"; '
+ 'Version="1"; Path="/acme"');
+ self.assert_(re.search(
+ r'^\$Version="?1"?; Customer="?WILE_E_COYOTE"?; \$Path="/acme"$',
+ cookie))
+
+ #
+ # 5. User Agent -> Server
+ #
+ # POST /acme/shipping HTTP/1.1
+ # Cookie: $Version="1";
+ # Customer="WILE_E_COYOTE"; $Path="/acme";
+ # Part_Number="Rocket_Launcher_0001"; $Path="/acme"
+ # [form data]
+ #
+ # User selects shipping method from form.
+ #
+ # 6. Server -> User Agent
+ #
+ # HTTP/1.1 200 OK
+ # Set-Cookie2: Shipping="FedEx"; Version="1"; Path="/acme"
+ #
+ # New cookie reflects shipping method.
+
+ cookie = interact_2965(c, "http://www.acme.com/acme/shipping",
+ 'Shipping="FedEx"; Version="1"; Path="/acme"')
+
+ self.assert_(re.search(r'^\$Version="?1"?;', cookie))
+ self.assert_(re.search(r'Part_Number="?Rocket_Launcher_0001"?;'
+ '\s*\$Path="\/acme"', cookie))
+ self.assert_(re.search(r'Customer="?WILE_E_COYOTE"?;\s*\$Path="\/acme"',
+ cookie))
+
+ #
+ # 7. User Agent -> Server
+ #
+ # POST /acme/process HTTP/1.1
+ # Cookie: $Version="1";
+ # Customer="WILE_E_COYOTE"; $Path="/acme";
+ # Part_Number="Rocket_Launcher_0001"; $Path="/acme";
+ # Shipping="FedEx"; $Path="/acme"
+ # [form data]
+ #
+ # User chooses to process order.
+ #
+ # 8. Server -> User Agent
+ #
+ # HTTP/1.1 200 OK
+ #
+ # Transaction is complete.
+
+ cookie = interact_2965(c, "http://www.acme.com/acme/process")
+ self.assert_(
+ re.search(r'Shipping="?FedEx"?;\s*\$Path="\/acme"', cookie) and
+ "WILE_E_COYOTE" in cookie)
+
+ #
+ # The user agent makes a series of requests on the origin server, after
+ # each of which it receives a new cookie. All the cookies have the same
+ # Path attribute and (default) domain. Because the request URLs all have
+ # /acme as a prefix, and that matches the Path attribute, each request
+ # contains all the cookies received so far.
+
+ def test_ietf_example_2(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ # 5.2 Example 2
+ #
+ # This example illustrates the effect of the Path attribute. All detail
+ # of request and response headers has been omitted. Assume the user agent
+ # has no stored cookies.
+
+ c = CookieJar(DefaultCookiePolicy(rfc2965=True))
+
+ # Imagine the user agent has received, in response to earlier requests,
+ # the response headers
+ #
+ # Set-Cookie2: Part_Number="Rocket_Launcher_0001"; Version="1";
+ # Path="/acme"
+ #
+ # and
+ #
+ # Set-Cookie2: Part_Number="Riding_Rocket_0023"; Version="1";
+ # Path="/acme/ammo"
+
+ interact_2965(
+ c, "http://www.acme.com/acme/ammo/specific",
+ 'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"',
+ 'Part_Number="Riding_Rocket_0023"; Version="1"; Path="/acme/ammo"')
+
+ # A subsequent request by the user agent to the (same) server for URLs of
+ # the form /acme/ammo/... would include the following request header:
+ #
+ # Cookie: $Version="1";
+ # Part_Number="Riding_Rocket_0023"; $Path="/acme/ammo";
+ # Part_Number="Rocket_Launcher_0001"; $Path="/acme"
+ #
+ # Note that the NAME=VALUE pair for the cookie with the more specific Path
+ # attribute, /acme/ammo, comes before the one with the less specific Path
+ # attribute, /acme. Further note that the same cookie name appears more
+ # than once.
+
+ cookie = interact_2965(c, "http://www.acme.com/acme/ammo/...")
+ self.assert_(
+ re.search(r"Riding_Rocket_0023.*Rocket_Launcher_0001", cookie))
+
+ # A subsequent request by the user agent to the (same) server for a URL of
+ # the form /acme/parts/ would include the following request header:
+ #
+ # Cookie: $Version="1"; Part_Number="Rocket_Launcher_0001"; $Path="/acme"
+ #
+ # Here, the second cookie's Path attribute /acme/ammo is not a prefix of
+ # the request URL, /acme/parts/, so the cookie does not get forwarded to
+ # the server.
+
+ cookie = interact_2965(c, "http://www.acme.com/acme/parts/")
+ self.assert_("Rocket_Launcher_0001" in cookie and
+ "Riding_Rocket_0023" not in cookie)
+
+ def test_rejection(self):
+ # Test rejection of Set-Cookie2 responses based on domain, path, port.
+ from cookielib import DefaultCookiePolicy, LWPCookieJar
+
+ pol = DefaultCookiePolicy(rfc2965=True)
+
+ c = LWPCookieJar(policy=pol)
+
+ max_age = "max-age=3600"
+
+ # illegal domain (no embedded dots)
+ cookie = interact_2965(c, "http://www.acme.com",
+ 'foo=bar; domain=".com"; version=1')
+ self.assert_(not c)
+
+ # legal domain
+ cookie = interact_2965(c, "http://www.acme.com",
+ 'ping=pong; domain="acme.com"; version=1')
+ self.assertEquals(len(c), 1)
+
+ # illegal domain (host prefix "www.a" contains a dot)
+ cookie = interact_2965(c, "http://www.a.acme.com",
+ 'whiz=bang; domain="acme.com"; version=1')
+ self.assertEquals(len(c), 1)
+
+ # legal domain
+ cookie = interact_2965(c, "http://www.a.acme.com",
+ 'wow=flutter; domain=".a.acme.com"; version=1')
+ self.assertEquals(len(c), 2)
+
+ # can't partially match an IP-address
+ cookie = interact_2965(c, "http://125.125.125.125",
+ 'zzzz=ping; domain="125.125.125"; version=1')
+ self.assertEquals(len(c), 2)
+
+ # illegal path (must be prefix of request path)
+ cookie = interact_2965(c, "http://www.sol.no",
+ 'blah=rhubarb; domain=".sol.no"; path="/foo"; '
+ 'version=1')
+ self.assertEquals(len(c), 2)
+
+ # legal path
+ cookie = interact_2965(c, "http://www.sol.no/foo/bar",
+ 'bing=bong; domain=".sol.no"; path="/foo"; '
+ 'version=1')
+ self.assertEquals(len(c), 3)
+
+ # illegal port (request-port not in list)
+ cookie = interact_2965(c, "http://www.sol.no",
+ 'whiz=ffft; domain=".sol.no"; port="90,100"; '
+ 'version=1')
+ self.assertEquals(len(c), 3)
+
+ # legal port
+ cookie = interact_2965(
+ c, "http://www.sol.no",
+ r'bang=wallop; version=1; domain=".sol.no"; '
+ r'port="90,100, 80,8080"; '
+ r'max-age=100; Comment = "Just kidding! (\"|\\\\) "')
+ self.assertEquals(len(c), 4)
+
+ # port attribute without any value (current port)
+ cookie = interact_2965(c, "http://www.sol.no",
+ 'foo9=bar; version=1; domain=".sol.no"; port; '
+ 'max-age=100;')
+ self.assertEquals(len(c), 5)
+
+ # encoded path
+ # LWP has this test, but unescaping allowed path characters seems
+ # like a bad idea, so I think this should fail:
+## cookie = interact_2965(c, "http://www.sol.no/foo/",
+## r'foo8=bar; version=1; path="/%66oo"')
+ # but this is OK, because '<' is not an allowed HTTP URL path
+ # character:
+ cookie = interact_2965(c, "http://www.sol.no/<oo/",
+ r'foo8=bar; version=1; path="/%3coo"')
+ self.assertEquals(len(c), 6)
+
+ # save and restore
+ filename = test_support.TESTFN
+
+ try:
+ c.save(filename, ignore_discard=True)
+ old = repr(c)
+
+ c = LWPCookieJar(policy=pol)
+ c.load(filename, ignore_discard=True)
+ finally:
+ try: os.unlink(filename)
+ except OSError: pass
+
+ self.assertEquals(old, repr(c))
+
+ def test_url_encoding(self):
+ # Try some URL encodings of the PATHs.
+ # (the behaviour here has changed from libwww-perl)
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ c = CookieJar(DefaultCookiePolicy(rfc2965=True))
+ interact_2965(c, "http://www.acme.com/foo%2f%25/%3c%3c%0Anew%E5/%E5",
+ "foo = bar; version = 1")
+
+ cookie = interact_2965(
+ c, "http://www.acme.com/foo%2f%25/<<%0anewå/æøå",
+ 'bar=baz; path="/foo/"; version=1');
+ version_re = re.compile(r'^\$version=\"?1\"?', re.I)
+ self.assert_("foo=bar" in cookie and version_re.search(cookie))
+
+ cookie = interact_2965(
+ c, "http://www.acme.com/foo/%25/<<%0anewå/æøå")
+ self.assert_(not cookie)
+
+ # unicode URL doesn't raise exception
+ cookie = interact_2965(c, u"http://www.acme.com/\xfc")
+
+ def test_mozilla(self):
+ # Save / load Mozilla/Netscape cookie file format.
+ from cookielib import MozillaCookieJar, DefaultCookiePolicy
+
+ year_plus_one = time.localtime()[0] + 1
+
+ filename = test_support.TESTFN
+
+ c = MozillaCookieJar(filename,
+ policy=DefaultCookiePolicy(rfc2965=True))
+ interact_2965(c, "http://www.acme.com/",
+ "foo1=bar; max-age=100; Version=1")
+ interact_2965(c, "http://www.acme.com/",
+ 'foo2=bar; port="80"; max-age=100; Discard; Version=1')
+ interact_2965(c, "http://www.acme.com/", "foo3=bar; secure; Version=1")
+
+ expires = "expires=09-Nov-%d 23:12:40 GMT" % (year_plus_one,)
+ interact_netscape(c, "http://www.foo.com/",
+ "fooa=bar; %s" % expires)
+ interact_netscape(c, "http://www.foo.com/",
+ "foob=bar; Domain=.foo.com; %s" % expires)
+ interact_netscape(c, "http://www.foo.com/",
+ "fooc=bar; Domain=www.foo.com; %s" % expires)
+
+ def save_and_restore(cj, ignore_discard):
+ try:
+ cj.save(ignore_discard=ignore_discard)
+ new_c = MozillaCookieJar(filename,
+ DefaultCookiePolicy(rfc2965=True))
+ new_c.load(ignore_discard=ignore_discard)
+ finally:
+ try: os.unlink(filename)
+ except OSError: pass
+ return new_c
+
+ new_c = save_and_restore(c, True)
+ self.assertEquals(len(new_c), 6) # none discarded
+ self.assert_("name='foo1', value='bar'" in repr(new_c))
+
+ new_c = save_and_restore(c, False)
+ self.assertEquals(len(new_c), 4) # 2 of them discarded on save
+ self.assert_("name='foo1', value='bar'" in repr(new_c))
+
+ def test_netscape_misc(self):
+ # Some additional Netscape cookies tests.
+ from cookielib import CookieJar
+ from urllib2 import Request
+
+ c = CookieJar()
+ headers = []
+ req = Request("http://foo.bar.acme.com/foo")
+
+ # Netscape allows a host part that contains dots
+ headers.append("Set-Cookie: Customer=WILE_E_COYOTE; domain=.acme.com")
+ res = FakeResponse(headers, "http://www.acme.com/foo")
+ c.extract_cookies(res, req)
+
+ # and that the domain is the same as the host without adding a leading
+ # dot to the domain. Should not quote even if strange chars are used
+ # in the cookie value.
+ headers.append("Set-Cookie: PART_NUMBER=3,4; domain=foo.bar.acme.com")
+ res = FakeResponse(headers, "http://www.acme.com/foo")
+ c.extract_cookies(res, req)
+
+ req = Request("http://foo.bar.acme.com/foo")
+ c.add_cookie_header(req)
+ self.assert_(
+ "PART_NUMBER=3,4" in req.get_header("Cookie") and
+ "Customer=WILE_E_COYOTE" in req.get_header("Cookie"))
+
+ def test_intranet_domains_2965(self):
+ # Test handling of local intranet hostnames without a dot.
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ c = CookieJar(DefaultCookiePolicy(rfc2965=True))
+ interact_2965(c, "http://example/",
+ "foo1=bar; PORT; Discard; Version=1;")
+ cookie = interact_2965(c, "http://example/",
+ 'foo2=bar; domain=".local"; Version=1')
+ self.assert_("foo1=bar" in cookie)
+
+ interact_2965(c, "http://example/", 'foo3=bar; Version=1')
+ cookie = interact_2965(c, "http://example/")
+ self.assert_("foo2=bar" in cookie and len(c) == 3)
+
+ def test_intranet_domains_ns(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+
+ c = CookieJar(DefaultCookiePolicy(rfc2965 = False))
+ interact_netscape(c, "http://example/", "foo1=bar")
+ cookie = interact_netscape(c, "http://example/",
+ 'foo2=bar; domain=.local')
+ self.assertEquals(len(c), 2)
+ self.assert_("foo1=bar" in cookie)
+
+ cookie = interact_netscape(c, "http://example/")
+ self.assert_("foo2=bar" in cookie)
+ self.assertEquals(len(c), 2)
+
+ def test_empty_path(self):
+ from cookielib import CookieJar, DefaultCookiePolicy
+ from urllib2 import Request
+
+ # Test for empty path
+ # Broken web-server ORION/1.3.38 returns to the client response like
+ #
+ # Set-Cookie: JSESSIONID=ABCDERANDOM123; Path=
+ #
+ # ie. with Path set to nothing.
+ # In this case, extract_cookies() must set cookie to / (root)
+ c = CookieJar(DefaultCookiePolicy(rfc2965 = True))
+ headers = []
+
+ req = Request("http://www.ants.com/")
+ headers.append("Set-Cookie: JSESSIONID=ABCDERANDOM123; Path=")
+ res = FakeResponse(headers, "http://www.ants.com/")
+ c.extract_cookies(res, req)
+
+ req = Request("http://www.ants.com/")
+ c.add_cookie_header(req)
+
+ self.assertEquals(req.get_header("Cookie"),
+ "JSESSIONID=ABCDERANDOM123")
+ self.assertEquals(req.get_header("Cookie2"), '$Version="1"')
+
+ # missing path in the request URI
+ req = Request("http://www.ants.com:8080")
+ c.add_cookie_header(req)
+
+ self.assertEquals(req.get_header("Cookie"),
+ "JSESSIONID=ABCDERANDOM123")
+ self.assertEquals(req.get_header("Cookie2"), '$Version="1"')
+
+ def test_session_cookies(self):
+ from cookielib import CookieJar
+ from urllib2 import Request
+
+ year_plus_one = time.localtime()[0] + 1
+
+ # Check session cookies are deleted properly by
+ # CookieJar.clear_session_cookies method
+
+ req = Request('http://www.perlmeister.com/scripts')
+ headers = []
+ headers.append("Set-Cookie: s1=session;Path=/scripts")
+ headers.append("Set-Cookie: p1=perm; Domain=.perlmeister.com;"
+ "Path=/;expires=Fri, 02-Feb-%d 23:24:20 GMT" %
+ year_plus_one)
+ headers.append("Set-Cookie: p2=perm;Path=/;expires=Fri, "
+ "02-Feb-%d 23:24:20 GMT" % year_plus_one)
+ headers.append("Set-Cookie: s2=session;Path=/scripts;"
+ "Domain=.perlmeister.com")
+ headers.append('Set-Cookie2: s3=session;Version=1;Discard;Path="/"')
+ res = FakeResponse(headers, 'http://www.perlmeister.com/scripts')
+
+ c = CookieJar()
+ c.extract_cookies(res, req)
+ # How many session/permanent cookies do we have?
+ counter = {"session_after": 0,
+ "perm_after": 0,
+ "session_before": 0,
+ "perm_before": 0}
+ for cookie in c:
+ key = "%s_before" % cookie.value
+ counter[key] = counter[key] + 1
+ c.clear_session_cookies()
+ # How many now?
+ for cookie in c:
+ key = "%s_after" % cookie.value
+ counter[key] = counter[key] + 1
+
+ self.assert_(not (
+ # a permanent cookie got lost accidently
+ counter["perm_after"] != counter["perm_before"] or
+ # a session cookie hasn't been cleared
+ counter["session_after"] != 0 or
+ # we didn't have session cookies in the first place
+ counter["session_before"] == 0))
+
+
+def test_main(verbose=None):
+ from test import test_sets
+ test_support.run_unittest(
+ DateTimeTests,
+ HeaderTests,
+ CookieTests,
+ LWPCookieTests,
+ )
+
+# Run the tests when this file is executed directly as a script.
+if __name__ == "__main__":
+    test_main(verbose=True)
diff --git a/Lib/test/test_urllib2.py b/Lib/test/test_urllib2.py
index f38a4a3..8a7cf65 100644
--- a/Lib/test/test_urllib2.py
+++ b/Lib/test/test_urllib2.py
@@ -54,6 +54,10 @@ class MockFile:
def readline(self, count=None): pass
def close(self): pass
+class MockHeaders(dict):
+ def getheaders(self, name):
+ return self.values()
+
class MockResponse(StringIO.StringIO):
def __init__(self, code, msg, headers, data, url=None):
StringIO.StringIO.__init__(self, data)
@@ -63,6 +67,12 @@ class MockResponse(StringIO.StringIO):
def geturl(self):
return self.url
+class MockCookieJar:
+ def add_cookie_header(self, request):
+ self.ach_req = request
+ def extract_cookies(self, response, request):
+ self.ec_req, self.ec_r = request, response
+
class FakeMethod:
def __init__(self, meth_name, action, handle):
self.meth_name = meth_name
@@ -474,7 +484,7 @@ class HandlerTests(unittest.TestCase):
for data in "", None: # POST, GET
req = Request("http://example.com/", data)
r = MockResponse(200, "OK", {}, "")
- newreq = h.do_request(req)
+ newreq = h.do_request_(req)
if data is None: # GET
self.assert_("Content-length" not in req.unredirected_hdrs)
self.assert_("Content-type" not in req.unredirected_hdrs)
@@ -491,7 +501,7 @@ class HandlerTests(unittest.TestCase):
req.add_unredirected_header("Content-type", "bar")
req.add_unredirected_header("Host", "baz")
req.add_unredirected_header("Spam", "foo")
- newreq = h.do_request(req)
+ newreq = h.do_request_(req)
self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
self.assertEqual(req.unredirected_hdrs["Host"], "baz")
@@ -514,6 +524,21 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(o.proto, "http") # o.error called
self.assertEqual(o.args, (req, r, 201, "Created", {}))
+ def test_cookies(self):
+ cj = MockCookieJar()
+ h = urllib2.HTTPCookieProcessor(cj)
+ o = h.parent = MockOpener()
+
+ req = Request("http://example.com/")
+ r = MockResponse(200, "OK", {}, "")
+ newreq = h.http_request(req)
+ self.assert_(cj.ach_req is req is newreq)
+ self.assertEquals(req.get_origin_req_host(), "example.com")
+ self.assert_(not req.is_unverifiable())
+ newr = h.http_response(req, r)
+ self.assert_(cj.ec_req is req)
+ self.assert_(cj.ec_r is r is newr)
+
def test_redirect(self):
from_url = "http://example.com/a.html"
to_url = "http://example.com/b.html"
@@ -528,7 +553,8 @@ class HandlerTests(unittest.TestCase):
req.add_header("Nonsense", "viking=withhold")
req.add_unredirected_header("Spam", "spam")
try:
- method(req, MockFile(), code, "Blah", {"location": to_url})
+ method(req, MockFile(), code, "Blah",
+ MockHeaders({"location": to_url}))
except urllib2.HTTPError:
# 307 in response to POST requires user OK
self.assert_(code == 307 and data is not None)
@@ -544,38 +570,65 @@ class HandlerTests(unittest.TestCase):
# loop detection
req = Request(from_url)
- req.origin_req_host = "example.com"
- def redirect(h, req, code, url=to_url):
- method = getattr(h, "http_error_%s" % code)
- method(req, MockFile(), code, "Blah", {"location": url})
+ def redirect(h, req, url=to_url):
+ h.http_error_302(req, MockFile(), 302, "Blah",
+ MockHeaders({"location": url}))
# Note that the *original* request shares the same record of
# redirections with the sub-requests caused by the redirections.
- # once
- redirect(h, req, 302)
- # twice: loop detected
- self.assertRaises(urllib2.HTTPError, redirect, h, req, 302)
- # and again
- self.assertRaises(urllib2.HTTPError, redirect, h, req, 302)
- # but this is a different redirect code, so OK...
- redirect(h, req, 301)
- self.assertRaises(urllib2.HTTPError, redirect, h, req, 301)
- # order doesn't matter
- redirect(h, req, 303)
- redirect(h, req, 307)
- self.assertRaises(urllib2.HTTPError, redirect, h, req, 303)
+
+ # detect infinite loop redirect of a URL to itself
+ req = Request(from_url, origin_req_host="example.com")
+ count = 0
+ try:
+ while 1:
+ redirect(h, req, "http://example.com/")
+ count = count + 1
+ except urllib2.HTTPError:
+ # don't stop until max_repeats, because cookies may introduce state
+ self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)
# detect endless non-repeating chain of redirects
- req = Request(from_url)
- req.origin_req_host = "example.com"
+ req = Request(from_url, origin_req_host="example.com")
count = 0
try:
while 1:
- redirect(h, req, 302, "http://example.com/%d" % count)
+ redirect(h, req, "http://example.com/%d" % count)
count = count + 1
except urllib2.HTTPError:
self.assertEqual(count,
urllib2.HTTPRedirectHandler.max_redirections)
+ def test_cookie_redirect(self):
+ class MockHTTPHandler(urllib2.HTTPHandler):
+ def __init__(self): self._count = 0
+ def http_open(self, req):
+ import mimetools
+ from StringIO import StringIO
+ if self._count == 0:
+ self._count = self._count + 1
+ msg = mimetools.Message(
+ StringIO("Location: http://www.cracker.com/\r\n\r\n"))
+ return self.parent.error(
+ "http", req, MockFile(), 302, "Found", msg)
+ else:
+ self.req = req
+ msg = mimetools.Message(StringIO("\r\n\r\n"))
+ return MockResponse(200, "OK", msg, "", req.get_full_url())
+ # cookies shouldn't leak into redirected requests
+ from cookielib import CookieJar
+ from urllib2 import build_opener, HTTPHandler, HTTPError, \
+ HTTPCookieProcessor
+
+ from test_cookielib import interact_netscape
+
+ cj = CookieJar()
+ interact_netscape(cj, "http://www.example.com/", "spam=eggs")
+ hh = MockHTTPHandler()
+ cp = HTTPCookieProcessor(cj)
+ o = build_opener(hh, cp)
+ o.open("http://www.example.com/")
+ self.assert_(not hh.req.has_header("Cookie"))
+
class MiscTests(unittest.TestCase):
diff --git a/Lib/urllib2.py b/Lib/urllib2.py
index 773cd7d..448e948 100644
--- a/Lib/urllib2.py
+++ b/Lib/urllib2.py
@@ -106,6 +106,7 @@ import sys
import time
import urlparse
import bisect
+import cookielib
try:
from cStringIO import StringIO
@@ -176,7 +177,8 @@ class GopherError(URLError):
class Request:
- def __init__(self, url, data=None, headers={}):
+ def __init__(self, url, data=None, headers={},
+ origin_req_host=None, unverifiable=False):
# unwrap('<URL:type://host/path>') --> 'type://host/path'
self.__original = unwrap(url)
self.type = None
@@ -188,6 +190,10 @@ class Request:
for key, value in headers.items():
self.add_header(key, value)
self.unredirected_hdrs = {}
+ if origin_req_host is None:
+ origin_req_host = cookielib.request_host(self)
+ self.origin_req_host = origin_req_host
+ self.unverifiable = unverifiable
def __getattr__(self, attr):
# XXX this is a fallback mechanism to guard against these
@@ -242,6 +248,12 @@ class Request:
self.host, self.type = host, type
self.__r_host = self.__original
+    def get_origin_req_host(self):
+        # Return the origin_req_host attribute that __init__ stored.
+        return self.origin_req_host
+
+    def is_unverifiable(self):
+        # Return the unverifiable flag that __init__ stored.
+        return self.unverifiable
+
def add_header(self, key, val):
# useful for something like authentication
self.headers[key.capitalize()] = val
@@ -254,6 +266,15 @@ class Request:
return bool(header_name in self.headers or
header_name in self.unredirected_hdrs)
+ def get_header(self, header_name, default=None):
+ return self.headers.get(
+ header_name,
+ self.unredirected_hdrs.get(header_name, default))
+
+ def header_items(self):
+ hdrs = self.unredirected_hdrs.copy()
+ hdrs.update(self.headers)
+ return hdrs.items()
class OpenerDirector:
def __init__(self):
@@ -460,7 +481,11 @@ class HTTPDefaultErrorHandler(BaseHandler):
raise HTTPError(req.get_full_url(), code, msg, hdrs, fp)
class HTTPRedirectHandler(BaseHandler):
- # maximum number of redirections before assuming we're in a loop
+ # maximum number of redirections to any single URL
+ # this is needed because of the state that cookies introduce
+ max_repeats = 4
+ # maximum total number of redirections (regardless of URL) before
+ # assuming we're in a loop
max_redirections = 10
def redirect_request(self, req, fp, code, msg, headers, newurl):
@@ -481,7 +506,10 @@ class HTTPRedirectHandler(BaseHandler):
# from the user (of urllib2, in this case). In practice,
# essentially all clients do redirect in this case, so we
# do the same.
- return Request(newurl, headers=req.headers)
+ return Request(newurl,
+ headers=req.headers,
+ origin_req_host=req.get_origin_req_host(),
+ unverifiable=True)
else:
raise HTTPError(req.get_full_url(), code, msg, headers, fp)
@@ -490,10 +518,12 @@ class HTTPRedirectHandler(BaseHandler):
# have already seen. Do this by adding a handler-specific
# attribute to the Request object.
def http_error_302(self, req, fp, code, msg, headers):
+ # Some servers (incorrectly) return multiple Location headers
+ # (presumably the same applies to URI). Use the first one.
if 'location' in headers:
- newurl = headers['location']
+ newurl = headers.getheaders('location')[0]
elif 'uri' in headers:
- newurl = headers['uri']
+ newurl = headers.getheaders('uri')[0]
else:
return
newurl = urlparse.urljoin(req.get_full_url(), newurl)
@@ -506,20 +536,16 @@ class HTTPRedirectHandler(BaseHandler):
return
# loop detection
- # .redirect_dict has a key (url, code) if url was previously
- # visited as a result of a redirection with that code. The
- # code is needed in addition to the URL because visiting a URL
- # twice isn't necessarily a loop: there is more than one way
- # to redirect (301, 302, 303, 307, refresh).
- key = (newurl, code)
+ # .redirect_dict maps each URL redirected to so far to its visit count.
if hasattr(req, 'redirect_dict'):
visited = new.redirect_dict = req.redirect_dict
- if key in visited or len(visited) >= self.max_redirections:
+ if (visited.get(newurl, 0) >= self.max_repeats or
+ len(visited) >= self.max_redirections):
raise HTTPError(req.get_full_url(), code,
self.inf_msg + msg, headers, fp)
else:
visited = new.redirect_dict = req.redirect_dict = {}
- visited[key] = None
+ visited[newurl] = visited.get(newurl, 0) + 1
# Don't close the fp until we are sure that we won't use it
# with HTTPError.
@@ -912,7 +938,7 @@ class AbstractHTTPHandler(BaseHandler):
def set_http_debuglevel(self, level):
self._debuglevel = level
- def do_request(self, request):
+ def do_request_(self, request):
host = request.get_host()
if not host:
raise URLError('no host given')
@@ -987,7 +1013,7 @@ class HTTPHandler(AbstractHTTPHandler):
def http_open(self, req):
return self.do_open(httplib.HTTPConnection, req)
- http_request = AbstractHTTPHandler.do_request
+ http_request = AbstractHTTPHandler.do_request_
if hasattr(httplib, 'HTTPS'):
class HTTPSHandler(AbstractHTTPHandler):
@@ -995,7 +1021,24 @@ if hasattr(httplib, 'HTTPS'):
def https_open(self, req):
return self.do_open(httplib.HTTPSConnection, req)
- https_request = AbstractHTTPHandler.do_request
+ https_request = AbstractHTTPHandler.do_request_
+
+class HTTPCookieProcessor(BaseHandler):
+ # Handler that passes each HTTP(S) request and response through a cookie jar.
+ def __init__(self, cookiejar=None):
+ # Use the caller's jar if given; otherwise create a fresh CookieJar.
+ if cookiejar is None:
+ cookiejar = CookieJar()
+ self.cookiejar = cookiejar
+
+ def http_request(self, request):
+ # Let the jar add its cookie header to the outgoing request, then
+ # return the same request object.
+ self.cookiejar.add_cookie_header(request)
+ return request
+
+ def http_response(self, request, response):
+ # Let the jar extract cookies from the response (it is also given the
+ # request), then return the same response object.
+ self.cookiejar.extract_cookies(response, request)
+ return response
+
+ # HTTPS requests and responses get the same processing as HTTP ones.
+ https_request = http_request
+ https_response = http_response
class UnknownHandler(BaseHandler):
def unknown_open(self, req):