This repository has been archived by the owner on Mar 11, 2021. It is now read-only.
/
migration.go
268 lines (225 loc) · 8.33 KB
/
migration.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package migration
import (
"bufio"
"bytes"
"database/sql"
"net/http"
"net/url"
"sync"
"text/template"
"github.com/fabric8-services/fabric8-common/log"
"github.com/goadesign/goa"
"github.com/goadesign/goa/client"
errs "github.com/pkg/errors"
"golang.org/x/net/context"
)
// AdvisoryLockID is the key of the PostgreSQL advisory lock that
// migrateToNextVersion acquires before touching the "version" table.
// Anybody else in the application who wants to modify that table should
// take the same lock.
const AdvisoryLockID = 42

type (
	// fn is a single migration step; it runs inside the given transaction
	// and reports failure by returning a non-nil error.
	fn func(tx *sql.Tx) error

	// steps is the ordered list of functions that together make up one
	// version.
	steps []fn

	// migrations is the ordered list of all versions. The index of an entry
	// is the version number it migrates the database to.
	migrations []steps
)

// populateLocker is a mutex meant to guard the population of common types.
var populateLocker = &sync.Mutex{}
// Migrate executes the required migration of the database on startup.
//
// Every version returned by getMigrations is applied in its own transaction.
// For each successful migration an entry is written into the "version" table
// (by migrateToNextVersion), stating that the version was reached.
//
// It returns an error if db is nil, if a transaction cannot be started,
// if a migration step fails (the transaction is rolled back and the step's
// error is returned unchanged), or if the commit fails. If the rollback
// itself fails, the returned error mentions both the rollback failure and
// the migration error that triggered it.
func Migrate(db *sql.DB) error {
	if db == nil {
		return errs.Errorf("Database handle is nil\n")
	}

	m := getMigrations()

	// migrateToNextVersion overwrites nextVersion with "current DB version + 1",
	// so versions that are already recorded in the database are skipped and
	// the loop ends as soon as the database is up to date.
	for nextVersion := int64(0); nextVersion < int64(len(m)); nextVersion++ {
		tx, err := db.Begin()
		if err != nil {
			return errs.Errorf("Failed to start transaction: %s\n", err)
		}

		if err := migrateToNextVersion(tx, &nextVersion, m); err != nil {
			log.Info(nil, map[string]interface{}{
				"next_version": nextVersion,
				"migrations":   m,
				"err":          err,
			}, "Rolling back transaction due to: %v", err)

			if rollbackErr := tx.Rollback(); rollbackErr != nil {
				log.Error(nil, map[string]interface{}{
					"next_version": nextVersion,
					"migrations":   m,
					"err":          rollbackErr,
				}, "error while rolling back transaction: %s", rollbackErr)
				// Keep the original cause in the returned error so the
				// caller can see why the rollback was attempted at all.
				return errs.Errorf("Error while rolling back transaction: %s (rollback was triggered by: %s)\n", rollbackErr, err)
			}
			return err
		}

		if err := tx.Commit(); err != nil {
			log.Error(nil, map[string]interface{}{
				"migrations": m,
				"err":        err,
			}, "error during transaction commit: %v", err)
			return errs.Errorf("Error during transaction commit: %s\n", err)
		}
	}

	return nil
}
// getMigrations returns all the migrations we have, ordered by version.
// The index of an entry in the returned slice is its version number.
//
// To add a migration that only runs a SQL file, append the file name to the
// end of sqlFiles below.
// IMPORTANT: ALWAYS APPEND AT THE END AND DON'T CHANGE THE ORDER OF MIGRATIONS!
func getMigrations() migrations {
	// One SQL file per version; position in this list == version number.
	sqlFiles := []string{
		"000-bootstrap.sql",
		"001-tenant-and-namespaces.sql",
		"002-remove-dup-ns.sql",
		"003-profiles.sql",
		"004-index-tenants-search.sql",
		"005-add-username-column-to-tenant.sql",
		"006-add-ns-base-name-column-to-tenant.sql",
		"007-create-tenants-update-table.sql",
		"008-add-can-continue-column-to-tenants-update.sql",
		"009-index-namespace-name.sql",
		"010-delete-run-stage-jenkins.sql",
	}

	m := make(migrations, 0, len(sqlFiles))
	for _, name := range sqlFiles {
		m = append(m, steps{executeSQLFile(name)})
	}

	// Version N
	//
	// In order to add an upgrade that needs more than one SQL file, append a
	// "steps" value to the end of the "m" slice here, after the loop. The
	// version numbers are determined by the index in the slice. The following
	// code in comments shows how you can do a migration in 3 steps. If one of
	// the steps fails, the others are not executed.
	// If something goes wrong during the migration, all you need to do is
	// return an error that is not nil.

	/*
		m = append(m, steps{
			func(db *sql.Tx) error {
				// Execute random go code
				return nil
			},
			executeSQLFile("YOUR_OWN_FILE.sql"),
			func(db *sql.Tx) error {
				// Execute random go code
				return nil
			},
		})
	*/

	return m
}
// executeSQLFile returns a migration step that loads the given filename via
// Asset (the packaged SQL files) and executes its content on the transaction
// the step is invoked with.
//
// When args are given, the file content is first rendered with Go's
// text/template package; the template receives args as its data (a slice of
// strings). Without args the file content is executed verbatim.
//
// The returned step fails if the asset cannot be loaded, the template cannot
// be parsed or rendered, or the database rejects the script. All errors are
// annotated with a stack trace.
func executeSQLFile(filename string, args ...string) fn {
	return func(tx *sql.Tx) error {
		data, err := Asset(filename)
		if err != nil {
			return errs.WithStack(err)
		}

		script := string(data)
		if len(args) > 0 {
			// Distinct error names are used on purpose: re-declaring "err"
			// with := inside this block would shadow the outer variable, and
			// a failure assigned to the shadow would never reach the caller.
			tmpl, parseErr := template.New("sql").Parse(script)
			if parseErr != nil {
				return errs.WithStack(parseErr)
			}

			var rendered bytes.Buffer
			writer := bufio.NewWriter(&rendered)
			if execErr := tmpl.Execute(writer, args); execErr != nil {
				return errs.WithStack(execErr)
			}
			// The buffered writer must be flushed before the buffer is read.
			if flushErr := writer.Flush(); flushErr != nil {
				return errs.WithStack(flushErr)
			}
			script = rendered.String()
		}

		if _, err := tx.Exec(script); err != nil {
			return errs.WithStack(err)
		}
		return nil
	}
}
// migrateToNextVersion applies exactly one version inside tx.
//
// The version to apply is not taken from the caller: it is always the
// version recorded in the database plus one, and *nextVersion is overwritten
// with that value. If the database is already at the last known version,
// nothing is executed and nil is returned (this is NOT an error).
func migrateToNextVersion(tx *sql.Tx, nextVersion *int64, m migrations) error {
	// Obtain an exclusive transaction-level advisory lock that doesn't depend
	// on any table. Once obtained, the lock is held for the remainder of the
	// current transaction; it is released automatically at transaction end.
	_, lockErr := tx.Exec("SELECT pg_advisory_xact_lock($1)", AdvisoryLockID)
	if lockErr != nil {
		return errs.Errorf("Failed to acquire lock: %s\n", lockErr)
	}

	// Determine the current version and adjust the caller's loop variable.
	currentVersion, err := getCurrentVersion(tx)
	if err != nil {
		return errs.WithStack(err)
	}
	target := currentVersion + 1
	*nextVersion = target

	// logFields builds a fresh field map for every log call.
	logFields := func() map[string]interface{} {
		return map[string]interface{}{
			"next_version":    target,
			"current_version": currentVersion,
		}
	}

	if target >= int64(len(m)) {
		// No further updates to apply.
		log.Info(nil, logFields(), "Current version %d. Nothing to update.", currentVersion)
		return nil
	}

	log.Info(nil, logFields(), "Attempt to update DB to version %v", target)

	// Apply all the steps of the target version, stopping at the first failure.
	for j, step := range m[target] {
		if stepErr := step(tx); stepErr != nil {
			return errs.Errorf("Failed to execute migration of step %d of version %d: %s\n", j, target, stepErr)
		}
	}

	// Record the new version in the same transaction as the steps.
	if _, insertErr := tx.Exec("INSERT INTO version(version) VALUES($1)", target); insertErr != nil {
		return errs.Errorf("Failed to update DB to version %d: %s\n", target, insertErr)
	}

	log.Info(nil, logFields(), "Successfully updated DB to version %v", target)
	return nil
}
// getCurrentVersion returns the highest version from the version
// table or -1 if that table does not exist.
//
// Returning -1 simplifies the logic of the migration process because
// the next version is always the current version + 1 which results
// in -1 + 1 = 0 which is exactly what we want as the first version.
func getCurrentVersion(db *sql.Tx) (int64, error) {
row := db.QueryRow("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name='version')")
var exists bool
if err := row.Scan(&exists); err != nil {
return -1, errs.Errorf("Failed to scan if table \"version\" exists: %s\n", err)
}
if !exists {
// table doesn't exist
return -1, nil
}
row = db.QueryRow("SELECT max(version) as current FROM version")
var current int64 = -1
if err := row.Scan(¤t); err != nil {
return -1, errs.Errorf("Failed to scan max version in table \"version\": %s\n", err)
}
return current, nil
}
// NewMigrationContext derives a goa context from ctx so that the request and
// request-ID context keys are initialized.
//
// The goa context is built around a placeholder HTTP request (host
// "localhost", no response writer, no parameters); a request ID obtained
// from client.ContextWithRequestID is then attached and logged at debug level.
// NOTE: We need this function to initialize the goa.ContextRequest
func NewMigrationContext(ctx context.Context) context.Context {
	goaCtx := goa.NewContext(ctx, nil, &http.Request{Host: "localhost"}, url.Values{})

	// attach a request ID to the context
	ctxWithID, reqID := client.ContextWithRequestID(goaCtx)
	log.Debug(ctxWithID, nil, "Initialized the migration context with Request ID: %v", reqID)

	return ctxWithID
}