summaryrefslogtreecommitdiffstats
path: root/Lib/tempfile.py
blob: 3e1e08dba21bb1f80549a09f7d550c225591bac3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Temporary files and filenames."""

# XXX This tries to be not UNIX specific, but I don't know beans about
# how to choose a temp directory or filename on MS-DOS or other
# systems so it may have to be changed...

import os

# Parameters that the caller may set to override the defaults
tempdir = None
template = None

def gettempdir():
    """Function to calculate the directory to use."""
    global tempdir
    if tempdir is not None:
        return tempdir
    try:
        pwd = os.getcwd()
    except (AttributeError, os.error):
        pwd = os.curdir
    attempdirs = ['/var/tmp', '/usr/tmp', '/tmp', pwd]
    if os.name == 'nt':
        attempdirs.insert(0, 'C:\\TEMP')
        attempdirs.insert(0, '\\TEMP')
    elif os.name == 'mac':
        import macfs, MACFS
        try:
             refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk,
                                              MACFS.kTemporaryFolderType, 1)
             dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname()
             attempdirs.insert(0, dirname)
        except macfs.error:
            pass
    for envname in 'TMPDIR', 'TEMP', 'TMP':
        if os.environ.has_key(envname):
            attempdirs.insert(0, os.environ[envname])
    testfile = gettempprefix() + 'test'
    for dir in attempdirs:
        try:
           filename = os.path.join(dir, testfile)
           if os.name == 'posix':
               try:
                   fd = os.open(filename, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700)
               except OSError:
                   pass
               else:
                   fp = os.fdopen(fd, 'w')
                   fp.write('blat')
                   fp.close()
                   os.unlink(filename)
                   del fp, fd
                   tempdir = dir
                   break
           else:
               fp = open(filename, 'w')
               fp.write('blat')
               fp.close()
               os.unlink(filename)
               tempdir = dir
               break
        except IOError:
            pass
    if tempdir is None:
        msg = "Can't find a usable temporary directory amongst " + `attempdirs`
        raise IOError, msg
    return tempdir


_pid = None

def gettempprefix():
    """Function to calculate a prefix of the filename to use."""
    global template, _pid
    if os.name == 'posix' and _pid and _pid != os.getpid():
        # Our pid changed; we must have forked -- zap the template
        template = None
    if template is None:
        if os.name == 'posix':
            _pid = os.getpid()
            template = '@' + `_pid` + '.'
        elif os.name == 'nt':
            template = '~' + `os.getpid()` + '-'
        elif os.name == 'mac':
            template = 'Python-Tmp-'
        else:
            template = 'tmp' # XXX might choose a better one
    return template


def mktemp(suffix=""):
    """User-callable function to return a unique temporary file name."""
    dir = gettempdir()
    pre = gettempprefix()
    while 1:
        i = _counter.get_next()
        file = os.path.join(dir, pre + str(i) + suffix)
        if not os.path.exists(file):
            return file


class TemporaryFileWrapper:
    """Temporary file wrapper

    This class provides a wrapper around files opened for temporary use.
    In particular, it seeks to automatically remove the file when it is
    no longer needed.
    """
    def __init__(self, file, path):
        self.file = file
        self.path = path

    def close(self):
        self.file.close()
        os.unlink(self.path)

    def __del__(self):
        try: self.close()
        except: pass

    def __getattr__(self, name):
        file = self.__dict__['file']
        a = getattr(file, name)
        if type(a) != type(0):
            setattr(self, name, a)
        return a


def TemporaryFile(mode='w+b', bufsize=-1, suffix=""):
    """Create and return a temporary file (opened read-write by default)."""
    name = mktemp(suffix)
    if os.name == 'posix':
        # Unix -- be very careful
        fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700)
        try:
            os.unlink(name)
            return os.fdopen(fd, mode, bufsize)
        except:
            os.close(fd)
            raise
    else:
        # Non-unix -- can't unlink file that's still open, use wrapper
        file = open(name, mode, bufsize)
        return TemporaryFileWrapper(file, name)

# In order to generate unique names, mktemp() uses _counter.get_next().
# This returns a unique integer on each call, in a threadsafe way (i.e.,
# multiple threads will never see the same integer).  The integer will
# usually be a Python int, but if _counter.get_next() is called often
# enough, it will become a Python long.
# Note that the only name that survives this next block of code
# is "_counter".

class _ThreadSafeCounter:
    def __init__(self, mutex, initialvalue=0):
        self.mutex = mutex
        self.i = initialvalue

    def get_next(self):
        self.mutex.acquire()
        result = self.i
        try:
            newi = result + 1
        except OverflowError:
            newi = long(result) + 1
        self.i = newi
        self.mutex.release()
        return result

try:
    import thread

except ImportError:
    class _DummyMutex:
        def acquire(self):
            pass

        release = acquire

    _counter = _ThreadSafeCounter(_DummyMutex())
    del _DummyMutex

else:
    _counter = _ThreadSafeCounter(thread.allocate_lock())
    del thread

del _ThreadSafeCounter