95 lines
1.9 KiB
Go
95 lines
1.9 KiB
Go
|
package database
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
|
||
|
"github.com/jackc/pgx/v4"
|
||
|
"github.com/jackc/pgx/v4/pgxpool"
|
||
|
"github.com/pkg/errors"
|
||
|
)
|
||
|
|
||
|
type VersionResolver struct {
|
||
|
pool *pgxpool.Pool
|
||
|
}
|
||
|
|
||
|
func (r *VersionResolver) Current(ctx context.Context) (string, error) {
|
||
|
var version string
|
||
|
|
||
|
err := WithTx(ctx, r.pool, func(ctx context.Context, tx pgx.Tx) error {
|
||
|
err := tx.QueryRow(ctx, `SELECT version FROM database_schema WHERE is_current = true;`).
|
||
|
Scan(&version)
|
||
|
|
||
|
if errors.Is(err, pgx.ErrNoRows) {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
return err
|
||
|
})
|
||
|
|
||
|
if err != nil {
|
||
|
return "", errors.Wrap(err, "could execute version resolver init transaction")
|
||
|
}
|
||
|
|
||
|
return version, nil
|
||
|
}
|
||
|
|
||
|
func (r *VersionResolver) Set(ctx context.Context, version string) error {
|
||
|
err := WithTx(ctx, r.pool, func(ctx context.Context, tx pgx.Tx) error {
|
||
|
if version != "" {
|
||
|
_, err := tx.Exec(ctx, `
|
||
|
INSERT INTO database_schema (version, is_current, migrated_at)
|
||
|
VALUES
|
||
|
(
|
||
|
$1,
|
||
|
true,
|
||
|
now()
|
||
|
)
|
||
|
ON CONFLICT ON CONSTRAINT unique_version
|
||
|
DO UPDATE SET migrated_at = now(), is_current = true;
|
||
|
`, version)
|
||
|
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
_, err := tx.Exec(ctx, `
|
||
|
UPDATE database_schema SET is_current = false, migrated_at = null WHERE version <> $1;
|
||
|
`, version)
|
||
|
|
||
|
return err
|
||
|
})
|
||
|
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, "could not update schema version")
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (r *VersionResolver) Init(ctx context.Context) error {
|
||
|
err := WithTx(ctx, r.pool, func(ctx context.Context, tx pgx.Tx) error {
|
||
|
_, err := tx.Exec(ctx, `
|
||
|
CREATE TABLE IF NOT EXISTS database_schema(
|
||
|
version TEXT NOT NULL,
|
||
|
migrated_at TIME,
|
||
|
is_current BOOLEAN,
|
||
|
CONSTRAINT unique_version UNIQUE(version)
|
||
|
);`)
|
||
|
|
||
|
return err
|
||
|
})
|
||
|
|
||
|
if err != nil {
|
||
|
return errors.Wrap(err, "could execute version resolver init transaction")
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func NewVersionResolver(pool *pgxpool.Pool) *VersionResolver {
|
||
|
return &VersionResolver{
|
||
|
pool: pool,
|
||
|
}
|
||
|
}
|