summaryrefslogtreecommitdiffstats
path: root/Source/cmQtAutoGenerator.h
blob: ff4c4c942af1b65caf857d38d481ad12becd7a90 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/* Distributed under the OSI-approved BSD 3-Clause License.  See accompanying
   file Copyright.txt or https://cmake.org/licensing for details.  */
#ifndef cmQtAutoGenerator_h
#define cmQtAutoGenerator_h

#include "cmConfigure.h" // IWYU pragma: keep

#include "cmFileTime.h"
#include "cmQtAutoGen.h"

#include <mutex>
#include <string>
#include <vector>

class cmMakefile;

/** \class cmQtAutoGenerator
 * \brief Base class for QtAutoGen generators
 */
class cmQtAutoGenerator : public cmQtAutoGen
{
public:
  // -- Types

  /**
   * Thread safe logger
   */
  class Logger
  {
  public:
    // -- Construction
    Logger();
    ~Logger();
    // -- Verbosity
    unsigned int Verbosity() const { return this->Verbosity_; }
    void SetVerbosity(unsigned int value) { this->Verbosity_ = value; }
    void RaiseVerbosity(std::string const& value);
    bool Verbose() const { return (this->Verbosity_ != 0); }
    void SetVerbose(bool value) { this->Verbosity_ = value ? 1 : 0; }
    // -- Color output
    bool ColorOutput() const { return this->ColorOutput_; }
    void SetColorOutput(bool value);
    // -- Log info
    void Info(GenT genType, std::string const& message) const;
    // -- Log warning
    void Warning(GenT genType, std::string const& message) const;
    void WarningFile(GenT genType, std::string const& filename,
                     std::string const& message) const;
    // -- Log error
    void Error(GenT genType, std::string const& message) const;
    void ErrorFile(GenT genType, std::string const& filename,
                   std::string const& message) const;
    void ErrorCommand(GenT genType, std::string const& message,
                      std::vector<std::string> const& command,
                      std::string const& output) const;

  private:
    static std::string HeadLine(std::string const& title);

  private:
    mutable std::mutex Mutex_;
    unsigned int Verbosity_ = 0;
    bool ColorOutput_ = false;
  };

  // -- File system methods
  static bool MakeParentDirectory(std::string const& filename);
  static bool FileRead(std::string& content, std::string const& filename,
                       std::string* error = nullptr);
  static bool FileWrite(std::string const& filename,
                        std::string const& content,
                        std::string* error = nullptr);
  static bool FileDiffers(std::string const& filename,
                          std::string const& content);

public:
  // -- Constructors
  cmQtAutoGenerator();
  virtual ~cmQtAutoGenerator();

  cmQtAutoGenerator(cmQtAutoGenerator const&) = delete;
  cmQtAutoGenerator& operator=(cmQtAutoGenerator const&) = delete;

  // -- Run
  bool Run(std::string const& infoFile, std::string const& config);

  // -- InfoFile
  std::string const& InfoFile() const { return InfoFile_; }
  cmFileTime const& InfoFileTime() const { return InfoFileTime_; }
  std::string const& InfoDir() const { return InfoDir_; }
  std::string const& InfoConfig() const { return InfoConfig_; }

  // -- Utility
  static std::string SettingsFind(std::string const& content, const char* key);

protected:
  // -- Abstract processing interface
  virtual bool Init(cmMakefile* makefile) = 0;
  virtual bool Process() = 0;

private:
  // -- Info settings
  std::string InfoFile_;
  cmFileTime InfoFileTime_;
  std::string InfoDir_;
  std::string InfoConfig_;
};

#endif
#n520'>520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
"""Unit tests for io.py."""

import sys
import unittest
from itertools import chain
from test import test_support

import io  # The module under test


class MockRawIO(io.RawIOBase):

    def __init__(self, read_stack=()):
        self._read_stack = list(read_stack)
        self._write_stack = []

    def read(self, n=None):
        try:
            return self._read_stack.pop(0)
        except:
            return b""

    def write(self, b):
        self._write_stack.append(b[:])
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence):
        pass

    def tell(self):
        return 42


class MockFileIO(io.BytesIO):

    def __init__(self, data):
        self.read_history = []
        io.BytesIO.__init__(self, data)

    def read(self, n=None):
        res = io.BytesIO.read(self, n)
        self.read_history.append(None if res is None else len(res))
        return res


class MockNonBlockWriterIO(io.RawIOBase):

    def __init__(self, blocking_script):
        self._blocking_script = list(blocking_script)
        self._write_stack = []

    def write(self, b):
        self._write_stack.append(b[:])
        n = self._blocking_script.pop(0)
        if (n < 0):
            raise io.BlockingIOError(0, "test blocking", -n)
        else:
            return n

    def writable(self):
        return True


class IOTest(unittest.TestCase):

    def tearDown(self):
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        self.assertEqual(f.write(b"blah."), 5)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write(b"Hello."), 6)
        self.assertEqual(f.tell(), 6)
        self.assertEqual(f.seek(-1, 1), 5)
        self.assertEqual(f.tell(), 5)
        self.assertEqual(f.write(" world\n\n\n"), 9)
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.write("h"), 1)
        self.assertEqual(f.seek(-1, 2), 13)
        self.assertEqual(f.tell(), 13)
        self.assertEqual(f.truncate(12), 12)
        self.assertEqual(f.tell(), 12)

    def read_ops(self, f, buffered=False):
        data = f.read(5)
        self.assertEqual(data, b"hello")
        self.assertEqual(f.readinto(data), 5)
        self.assertEqual(data, b" worl")
        self.assertEqual(f.readinto(data), 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        self.assertEqual(f.seek(0), 0)
        self.assertEqual(f.read(20), b"hello world\n")
        self.assertEqual(f.read(1), b"")
        self.assertEqual(f.readinto(b"x"), 0)
        self.assertEqual(f.seek(-6, 2), 6)
        self.assertEqual(f.read(5), b"world")
        self.assertEqual(f.read(0), b"")
        self.assertEqual(f.readinto(b""), 0)
        self.assertEqual(f.seek(-6, 1), 5)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)
        if buffered:
            f.seek(0)
            self.assertEqual(f.read(), b"hello world\n")
            f.seek(6)
            self.assertEqual(f.read(), b"world\n")
            self.assertEqual(f.read(), b"")

    LARGE = 2**31

    def large_file_ops(self, f):
        assert f.readable()
        assert f.writable()
        self.assertEqual(f.seek(self.LARGE), self.LARGE)
        self.assertEqual(f.tell(), self.LARGE)
        self.assertEqual(f.write(b"xxx"), 3)
        self.assertEqual(f.tell(), self.LARGE + 3)
        self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
        self.assertEqual(f.truncate(), self.LARGE + 2)
        self.assertEqual(f.tell(), self.LARGE + 2)
        self.assertEqual(f.seek(0, 2), self.LARGE + 2)
        self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
        self.assertEqual(f.tell(), self.LARGE + 1)
        self.assertEqual(f.seek(0, 2), self.LARGE + 1)
        self.assertEqual(f.seek(-1, 2), self.LARGE)
        self.assertEqual(f.read(2), b"x")

    def test_raw_file_io(self):
        f = io.open(test_support.TESTFN, "wb", buffering=0)
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb", buffering=0)
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f)
        f.close()

    def test_buffered_file_io(self):
        f = io.open(test_support.TESTFN, "wb")
        self.assertEqual(f.readable(), False)
        self.assertEqual(f.writable(), True)
        self.assertEqual(f.seekable(), True)
        self.write_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.readable(), True)
        self.assertEqual(f.writable(), False)
        self.assertEqual(f.seekable(), True)
        self.read_ops(f, True)
        f.close()

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f, True)

    def test_large_file_ops(self):
        # On Windows and Mac OSX this test comsumes large resources; It takes
        # a long time to build the >2GB file and takes >2GB of disk space
        # therefore the resource must be enabled to run this test.
        if sys.platform[:3] == 'win' or sys.platform == 'darwin':
            if not test_support.is_resource_enabled("largefile"):
                print("\nTesting large file ops skipped on %s." % sys.platform,
                      file=sys.stderr)
                print("It requires %d bytes and a long time." % self.LARGE,
                      file=sys.stderr)
                print("Use 'regrtest.py -u largefile test_io' to run it.",
                      file=sys.stderr)
                return
        f = io.open(test_support.TESTFN, "w+b", 0)
        self.large_file_ops(f)
        f.close()
        f = io.open(test_support.TESTFN, "w+b")
        self.large_file_ops(f)
        f.close()

    def test_with_open(self):
        for bufsize in (0, 1, 100):
            f = None
            with open(test_support.TESTFN, "wb", bufsize) as f:
                f.write("xxx")
            self.assertEqual(f.closed, True)
            f = None
            try:
                with open(test_support.TESTFN, "wb", bufsize) as f:
                    1/0
            except ZeroDivisionError:
                self.assertEqual(f.closed, True)
            else:
                self.fail("1/0 didn't raise an exception")

    def test_destructor(self):
        record = []
        class MyFileIO(io.FileIO):
            def __del__(self):
                record.append(1)
                io.FileIO.__del__(self)
            def close(self):
                record.append(2)
                io.FileIO.close(self)
            def flush(self):
                record.append(3)
                io.FileIO.flush(self)
        f = MyFileIO(test_support.TESTFN, "w")
        f.write("xxx")
        del f
        self.assertEqual(record, [1, 2, 3])

    def test_close_flushes(self):
        f = io.open(test_support.TESTFN, "wb")
        f.write("xxx")
        f.close()
        f = io.open(test_support.TESTFN, "rb")
        self.assertEqual(f.read(), b"xxx")
        f.close()


class MemorySeekTestMixin:

    def testInit(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

    def testRead(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEquals(buf[:1], bytesIo.read(1))
        self.assertEquals(buf[1:5], bytesIo.read(4))
        self.assertEquals(buf[5:], bytesIo.read(900))
        self.assertEquals(self.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEquals(buf, bytesIo.read())
        self.assertEquals(self.EOF, bytesIo.read())

    def testSeek(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEquals(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEquals(buf[3:], bytesIo.read())

    def testTell(self):
        buf = self.buftype("1234567890")
        bytesIo = self.ioclass(buf)

        self.assertEquals(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEquals(5, bytesIo.tell())
        bytesIo.seek(10000)
        self.assertEquals(10000, bytesIo.tell())


class BytesIOTest(MemorySeekTestMixin, unittest.TestCase):
    buftype = bytes
    ioclass = io.BytesIO
    EOF = b""


class StringIOTest(MemorySeekTestMixin, unittest.TestCase):
    buftype = str
    ioclass = io.StringIO
    EOF = ""


class BufferedReaderTest(unittest.TestCase):

    def testRead(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcdef", bufio.read(6))

    def testBuffering(self):
        data = b"abcdefghi"
        dlen = len(data)

        tests = [
            [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
            [ 100, [ 3, 3, 3],     [ dlen ]    ],
            [   4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
        ]

        for bufsize, buf_read_sizes, raw_read_sizes in tests:
            rawio = MockFileIO(data)
            bufio = io.BufferedReader(rawio, buffer_size=bufsize)
            pos = 0
            for nbytes in buf_read_sizes:
                self.assertEquals(bufio.read(nbytes), data[pos:pos+nbytes])
                pos += nbytes
            self.assertEquals(rawio.read_history, raw_read_sizes)

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawio = MockRawIO((b"abc", b"d", None, b"efg", None, None))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcd", bufio.read(6))
        self.assertEquals(b"e", bufio.read(1))
        self.assertEquals(b"fg", bufio.read())
        self.assert_(None is bufio.read())
        self.assertEquals(b"", bufio.read())

    def testReadToEof(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcdefg", bufio.read(9000))

    def testReadNoArgs(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(b"abcdefg", bufio.read())

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedReader(rawio)

        self.assertEquals(42, bufio.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass


class BufferedWriterTest(unittest.TestCase):

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")

        self.assertFalse(writer._write_stack)

    def testWriteOverflow(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.write(b"defghijkl")

        self.assertEquals(b"abcdefghijkl", writer._write_stack[0])

    def testWriteNonBlocking(self):
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufio = io.BufferedWriter(raw, 8, 16)

        bufio.write(b"asdf")
        bufio.write(b"asdfa")
        self.assertEquals(b"asdfasdfa", raw._write_stack[0])

        bufio.write(b"asdfasdfasdf")
        self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
        bufio.write(b"asdfasdfasdf")
        self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
        self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])

        bufio.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor
        # later.

    def testFileno(self):
        rawio = MockRawIO((b"abc", b"d", b"efg"))
        bufio = io.BufferedWriter(rawio)

        self.assertEquals(42, bufio.fileno())

    def testFlush(self):
        writer = MockRawIO()
        bufio = io.BufferedWriter(writer, 8)

        bufio.write(b"abc")
        bufio.flush()

        self.assertEquals(b"abc", writer._write_stack[0])


class BufferedRWPairTest(unittest.TestCase):

    def testRWPair(self):
        r = MockRawIO(())
        w = MockRawIO()
        pair = io.BufferedRWPair(r, w)

        # XXX need implementation


class BufferedRandomTest(unittest.TestCase):

    def testReadAndWrite(self):
        raw = MockRawIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._write_stack) # Buffer writes
        self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
        self.assertEquals(b"dddeee", raw._write_stack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEquals(b"as", rw.read(2))
        self.assertEquals(2, rw.tell())
        rw.seek(0, 0)
        self.assertEquals(b"asdf", rw.read(4))

        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEquals(b"asdfasdfl", rw.read())
        self.assertEquals(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEquals(5, rw.tell())
        rw.seek(2, 1)
        self.assertEquals(7, rw.tell())
        self.assertEquals(b"fl", rw.read(11))


class TextIOWrapperTest(unittest.TestCase):

##     def tearDown(self):
##         test_support.unlink(test_support.TESTFN)

    def testNewlines(self):
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '\n', input_lines ],
            [ '\r\n', input_lines ],
        ]

        encodings = ('utf-8', 'bz2')

        # Try a range of pad sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            for do_reads in (False, True):
                for padlen in chain(range(10), range(50, 60)):
                    pad = '.' * padlen
                    data_lines = [ pad + line for line in input_lines ]
                    # XXX: str.encode() should return bytes
                    data = bytes(''.join(data_lines).encode(encoding))

                    for newline, exp_line_ends in tests:
                        exp_lines = [ pad + line for line in exp_line_ends ]
                        bufio = io.BufferedReader(io.BytesIO(data))
                        textio = io.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEquals(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEquals(got_line, exp_line)
                        self.assertEquals(len(got_lines), len(exp_lines))

    # Systematic tests of the text I/O API

    def testBasicIO(self):
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
                f = io.open(test_support.TESTFN, "w+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEquals(f.write("abc"), 3)
                f.close()
                f = io.open(test_support.TESTFN, "r+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEquals(f.tell(), 0)
                self.assertEquals(f.read(), "abc")
                cookie = f.tell()
                self.assertEquals(f.seek(0), 0)
                self.assertEquals(f.read(2), "ab")
                self.assertEquals(f.read(1), "c")
                self.assertEquals(f.read(1), "")
                self.assertEquals(f.read(), "")
                self.assertEquals(f.tell(), cookie)
                self.assertEquals(f.seek(0), 0)
                self.assertEquals(f.seek(0, 2), cookie)
                self.assertEquals(f.write("def"), 3)
                self.assertEquals(f.seek(cookie), cookie)
                self.assertEquals(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
                f.close()

    def multi_line_test(self, f, enc):
        f.seek(0)
        f.truncate()
        sample = u"s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65,
                     100, 200, 300, 400, 500, 1000):
            chars = []
            for i in xrange(size):
                chars.append(sample[i % len(sample)])
            line = u"".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        wendpos = f.tell()
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                rendpos = pos
                break
            rlines.append((pos, line))
        self.assertEquals(rendpos, wendpos)
        self.assertEquals(rlines, wlines)


# XXX Tests for open()

def test_main():
    test_support.run_unittest(IOTest, BytesIOTest, StringIOTest,
                              BufferedReaderTest,
                              BufferedWriterTest, BufferedRWPairTest,
                              BufferedRandomTest, TextIOWrapperTest)

if __name__ == "__main__":
    unittest.main()