summaryrefslogtreecommitdiffstats
path: root/Lib/sqlite3/dump.py
blob: 57e6a3b4f1e6eba0d399d09dbe0df157f36d93b9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# Mimic the sqlite3 console shell's .dump command
# Author: Paul Kippes <kippesp@gmail.com>

# Every identifier in the generated SQL is quoted, following this advice
# in the SQLite documentation: "SQLite adds new keywords from time to time
# when it takes on new features. So to prevent your code from being broken
# by future enhancements, you should normally quote any identifier that
# is an English language word, even if you do not have to."

def _quote_name(name):
    """Return *name* wrapped in double quotes for use as an SQL identifier.

    Any double quote inside *name* is escaped by doubling it.
    """
    escaped = name.replace('"', '""')
    return f'"{escaped}"'


def _quote_value(value):
    """Return *value* wrapped in single quotes for use as an SQL string literal.

    Any single quote inside *value* is escaped by doubling it.
    """
    escaped = value.replace("'", "''")
    return f"'{escaped}'"


def _iterdump(connection, *, filter=None):
    """
    Returns an iterator to the dump of the database in an SQL text format.

    Used to produce an SQL dump of the database.  Useful to save an in-memory
    database for later restoration.  This function should not be called
    directly but instead called from the Connection method, iterdump().

    connection: the database connection to dump; a cursor is created from it.
    filter: optional, keyword-only LIKE pattern.  When given (and non-empty),
        only schema objects whose "name" matches the pattern are dumped.

    Yields SQL statements as strings, each terminated by a semicolon.  The
    dump starts with 'BEGIN TRANSACTION;' (preceded by
    'PRAGMA foreign_keys=OFF;' if the foreign key check reports violations)
    and ends with 'COMMIT;'.
    """

    writeable_schema = False
    cu = connection.cursor()
    cu.row_factory = None  # Make sure we get predictable results.
    # Disable foreign key constraints, if there is any foreign key violation.
    violations = cu.execute("PRAGMA foreign_key_check").fetchall()
    if violations:
        yield 'PRAGMA foreign_keys=OFF;'
    yield 'BEGIN TRANSACTION;'

    if filter:
        # Return database objects which match the filter pattern.
        filter_name_clause = 'AND "name" LIKE ?'
        params = [filter]
    else:
        filter_name_clause = ""
        params = []
    # sqlite_master table contains the SQL CREATE statements for the database.
    q = f"""
        SELECT "name", "type", "sql"
        FROM "sqlite_master"
            WHERE "sql" NOT NULL AND
            "type" == 'table'
            {filter_name_clause}
            ORDER BY "name"
        """
    schema_res = cu.execute(q, params)
    sqlite_sequence = []
    for table_name, _type, sql in schema_res.fetchall():
        if table_name == 'sqlite_sequence':
            # Collected here but only yielded just before COMMIT (see below).
            rows = cu.execute('SELECT * FROM "sqlite_sequence";')
            sqlite_sequence = ['DELETE FROM "sqlite_sequence"']
            sqlite_sequence += [
                f'INSERT INTO "sqlite_sequence" VALUES({_quote_value(seq_table)},{seq_value})'
                for seq_table, seq_value in rows.fetchall()
            ]
            continue
        elif table_name == 'sqlite_stat1':
            # No "continue": the rows of sqlite_stat1 are dumped below like
            # those of any other table.
            yield 'ANALYZE "sqlite_master";'
        elif table_name.startswith('sqlite_'):
            # Any other sqlite_* table is left out of the dump.
            continue
        elif sql.startswith('CREATE VIRTUAL TABLE'):
            # Virtual tables are recorded by inserting their definition
            # straight into sqlite_master; the writable_schema pragma is
            # emitted once, before the first such insert.
            if not writeable_schema:
                writeable_schema = True
                yield 'PRAGMA writable_schema=ON;'
            yield ("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
                   "VALUES('table',{0},{0},0,{1});".format(
                       _quote_value(table_name),
                       _quote_value(sql),
                   ))
        else:
            yield '{0};'.format(sql)

        # Build the insert statement for each row of the current table
        table_name_ident = _quote_name(table_name)
        res = cu.execute(f'PRAGMA table_info({table_name_ident})')
        column_names = [str(table_info[1]) for table_info in res.fetchall()]
        # The "INSERT INTO <table> VALUES(" prefix is embedded in the query as
        # an SQL string literal, so it must go through _quote_value(): a
        # single quote in the table name would otherwise end the literal
        # early and produce a malformed query.
        insert_prefix = _quote_value(f'INSERT INTO {table_name_ident} VALUES(')
        quoted_columns = "','".join(
            "||quote({0})||".format(_quote_name(col)) for col in column_names
        )
        q = f"SELECT {insert_prefix}{quoted_columns}')' FROM {table_name_ident};"
        query_res = cu.execute(q)
        for row in query_res:
            yield "{0};".format(row[0])

    # Now when the type is 'index', 'trigger', or 'view'
    q = f"""
        SELECT "name", "type", "sql"
        FROM "sqlite_master"
            WHERE "sql" NOT NULL AND
            "type" IN ('index', 'trigger', 'view')
            {filter_name_clause}
        """
    schema_res = cu.execute(q, params)
    for _name, _type, sql in schema_res.fetchall():
        yield '{0};'.format(sql)

    if writeable_schema:
        yield 'PRAGMA writable_schema=OFF;'

    # gh-79009: Yield statements concerning the sqlite_sequence table at the
    # end of the transaction.
    for row in sqlite_sequence:
        yield '{0};'.format(row)

    yield 'COMMIT;'