feat(backend): add data export

This commit is contained in:
Sam 2023-03-15 15:24:51 +01:00
parent ded9d06e4a
commit 15109819df
No known key found for this signature in database
GPG Key ID: B4EF20DDE721CAA1
13 changed files with 559 additions and 4 deletions

View File

@ -221,10 +221,26 @@ func (db *DB) DeleteMemberAvatar(ctx context.Context, memberID xid.ID, hash stri
return errors.Wrap(err, "deleting webp avatar")
}
err = db.minio.RemoveObject(ctx, db.minioBucket, "/members/"+memberID.String()+"/"+hash+".webp", minio.RemoveObjectOptions{})
err = db.minio.RemoveObject(ctx, db.minioBucket, "/members/"+memberID.String()+"/"+hash+".jpg", minio.RemoveObjectOptions{})
if err != nil {
return errors.Wrap(err, "deleting jpeg avatar")
}
return nil
}
func (db *DB) UserAvatar(ctx context.Context, userID xid.ID, hash string) (io.ReadCloser, error) {
obj, err := db.minio.GetObject(ctx, db.minioBucket, "/users/"+userID.String()+"/"+hash+".webp", minio.GetObjectOptions{})
if err != nil {
return nil, errors.Wrap(err, "getting object")
}
return obj, nil
}
func (db *DB) MemberAvatar(ctx context.Context, memberID xid.ID, hash string) (io.ReadCloser, error) {
obj, err := db.minio.GetObject(ctx, db.minioBucket, "/members/"+memberID.String()+"/"+hash+".webp", minio.GetObjectOptions{})
if err != nil {
return nil, errors.Wrap(err, "getting object")
}
return obj, nil
}

104
backend/db/export.go Normal file
View File

@ -0,0 +1,104 @@
package db
import (
"bytes"
"context"
"time"
"emperror.dev/errors"
"github.com/georgysavva/scany/pgxscan"
"github.com/jackc/pgx/v4"
"github.com/minio/minio-go/v7"
"github.com/rs/xid"
)
type DataExport struct {
ID int64
UserID xid.ID
Filename string
CreatedAt time.Time
}
func (de DataExport) Path() string {
return "/exports/" + de.UserID.String() + "/" + de.Filename + ".zip"
}
const ErrNoExport = errors.Sentinel("no data export exists")
const KeepExportTime = 7 * 24 * time.Hour
func (db *DB) UserExport(ctx context.Context, userID xid.ID) (de DataExport, err error) {
sql, args, err := sq.Select("*").
From("data_exports").
Where("user_id = ?", userID).
OrderBy("id DESC").
Limit(1).ToSql()
if err != nil {
return de, errors.Wrap(err, "building query")
}
err = pgxscan.Get(ctx, db, &de, sql, args...)
if err != nil {
if errors.Cause(err) == pgx.ErrNoRows {
return de, ErrNoExport
}
return de, errors.Wrap(err, "executing sql")
}
return de, nil
}
const recentExport = 24 * time.Hour
func (db *DB) HasRecentExport(ctx context.Context, userID xid.ID) (hasExport bool, err error) {
err = db.QueryRow(ctx,
"SELECT EXISTS(SELECT * FROM data_exports WHERE user_id = $1 AND created_at > $2)",
userID, time.Now().Add(-recentExport)).Scan(&hasExport)
if err != nil {
return false, errors.Wrap(err, "executing query")
}
return hasExport, nil
}
func (db *DB) CreateExport(ctx context.Context, userID xid.ID, filename string, file *bytes.Buffer) (de DataExport, err error) {
de = DataExport{
UserID: userID,
Filename: filename,
}
_, err = db.minio.PutObject(ctx, db.minioBucket, de.Path(), file, int64(file.Len()), minio.PutObjectOptions{
ContentType: "application/zip",
})
if err != nil {
return de, errors.Wrap(err, "writing export file")
}
sql, args, err := sq.Insert("data_exports").Columns("user_id", "filename").Values(userID, filename).ToSql()
if err != nil {
return de, errors.Wrap(err, "building query")
}
pgxscan.Get(ctx, db, &de, sql, args...)
if err != nil {
return de, errors.Wrap(err, "executing sql")
}
return de, nil
}
func (db *DB) DeleteExport(ctx context.Context, de DataExport) (err error) {
sql, args, err := sq.Delete("data_exports").Where("id = ?", de.ID).ToSql()
if err != nil {
return errors.Wrap(err, "building query")
}
err = db.minio.RemoveObject(ctx, db.minioBucket, de.Path(), minio.RemoveObjectOptions{})
if err != nil {
return errors.Wrap(err, "deleting export zip")
}
_, err = db.Exec(ctx, sql, args...)
if err != nil {
return errors.Wrap(err, "executing sql")
}
return nil
}

View File

@ -61,7 +61,7 @@ func (db *DB) UserMember(ctx context.Context, userID xid.ID, memberRef string) (
// UserMembers returns all of a user's members, sorted by name.
func (db *DB) UserMembers(ctx context.Context, userID xid.ID) (ms []Member, err error) {
sql, args, err := sq.Select("id", "user_id", "name", "display_name", "bio", "avatar", "names", "pronouns").
sql, args, err := sq.Select("*").
From("members").Where("user_id = ?", userID).
OrderBy("name", "id").ToSql()
if err != nil {

View File

@ -0,0 +1,265 @@
package exporter
import (
"archive/zip"
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"encoding/json"
"io"
"net/http"
"os"
"os/signal"
"sync"
"codeberg.org/u1f320/pronouns.cc/backend/db"
"codeberg.org/u1f320/pronouns.cc/backend/log"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/rs/xid"
"github.com/urfave/cli/v2"
)
var Command = &cli.Command{
Name: "exporter",
Usage: "Data exporter service",
Action: run,
}
type server struct {
Router chi.Router
DB *db.DB
exporting map[xid.ID]struct{}
exportingMu sync.Mutex
}
func run(c *cli.Context) error {
port := ":" + os.Getenv("EXPORTER_PORT")
db, err := db.New()
if err != nil {
log.Fatalf("creating database: %v", err)
return err
}
s := &server{
Router: chi.NewRouter(),
DB: db,
exporting: make(map[xid.ID]struct{}),
}
// set up middleware + the single route
s.Router.Use(middleware.Recoverer)
s.Router.Get("/start/{id}", s.startExport)
e := make(chan error)
// run server in another goroutine (for gracefully shutting down, see below)
go func() {
e <- http.ListenAndServe(port, s.Router)
}()
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
log.Infof("API server running at %v!", port)
select {
case <-ctx.Done():
log.Info("Interrupt signal received, shutting down...")
s.DB.Close()
return nil
case err := <-e:
log.Fatalf("Error running server: %v", err)
}
return nil
}
func (s *server) startExport(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
id, err := xid.FromString(chi.URLParam(r, "id"))
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
u, err := s.DB.User(ctx, id)
if err != nil {
log.Errorf("getting user %v: %v", id, err)
w.WriteHeader(http.StatusInternalServerError)
return
}
go s.doExport(u)
w.WriteHeader(http.StatusAccepted)
}
func (s *server) doExport(u db.User) {
s.exportingMu.Lock()
if _, ok := s.exporting[u.ID]; ok {
s.exportingMu.Unlock()
log.Debugf("user %v is already being exported, aborting", u.ID)
return
}
s.exporting[u.ID] = struct{}{}
s.exportingMu.Unlock()
defer func() {
s.exportingMu.Lock()
delete(s.exporting, u.ID)
s.exportingMu.Unlock()
}()
ctx := context.Background()
log.Debugf("[%v] starting export of user", u.ID)
outBuffer := new(bytes.Buffer)
zw := zip.NewWriter(outBuffer)
defer zw.Close()
w, err := zw.Create("user.json")
if err != nil {
log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
return
}
log.Debugf("[%v] getting user fields", u.ID)
fields, err := s.DB.UserFields(ctx, u.ID)
if err != nil {
log.Errorf("[%v] getting user fields: %v", u.ID, err)
return
}
log.Debugf("[%v] writing user json", u.ID)
ub, err := json.Marshal(dbUserToExport(u, fields))
if err != nil {
log.Errorf("[%v] marshaling user: %v", u.ID, err)
return
}
_, err = w.Write(ub)
if err != nil {
log.Errorf("[%v] writing user: %v", u.ID, err)
return
}
if u.Avatar != nil {
log.Debugf("[%v] getting user avatar", u.ID)
w, err := zw.Create("user_avatar.webp")
if err != nil {
log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
return
}
r, err := s.DB.UserAvatar(ctx, u.ID, *u.Avatar)
if err != nil {
log.Errorf("[%v] getting user avatar: %v", u.ID, err)
return
}
defer r.Close()
_, err = io.Copy(w, r)
if err != nil {
log.Errorf("[%v] writing user avatar: %v", u.ID, err)
return
}
log.Debugf("[%v] exported user avatar", u.ID)
}
members, err := s.DB.UserMembers(ctx, u.ID)
if err != nil {
log.Errorf("[%v] getting user members: %v", u.ID, err)
return
}
for _, m := range members {
log.Debugf("[%v] starting export for member %v", u.ID, m.ID)
fields, err := s.DB.MemberFields(ctx, m.ID)
if err != nil {
log.Errorf("[%v] getting fields for member %v: %v", u.ID, m.ID, err)
return
}
w, err := zw.Create("members/" + m.Name + "-" + m.ID.String() + ".json")
if err != nil {
log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
return
}
mb, err := json.Marshal(dbMemberToExport(m, fields))
if err != nil {
log.Errorf("[%v] marshaling member %v: %v", u.ID, m.ID, err)
return
}
_, err = w.Write(mb)
if err != nil {
log.Errorf("[%v] writing member %v json: %v", u.ID, m.ID, err)
return
}
if m.Avatar != nil {
log.Debugf("[%v] getting member %v avatar", u.ID, m.ID)
w, err := zw.Create("members/" + m.Name + "-" + m.ID.String() + "-avatar.webp")
if err != nil {
log.Errorf("[%v] creating file in zip archive: %v", u.ID, err)
return
}
r, err := s.DB.MemberAvatar(ctx, m.ID, *m.Avatar)
if err != nil {
log.Errorf("[%v] getting member %v avatar: %v", u.ID, m.ID, err)
return
}
defer r.Close()
_, err = io.Copy(w, r)
if err != nil {
log.Errorf("[%v] writing member %v avatar: %v", u.ID, m.ID, err)
return
}
log.Debugf("[%v] exported member %v avatar", u.ID, m.ID)
}
log.Debugf("[%v] finished export for member %v", u.ID, m.ID)
}
log.Debugf("[%v] finished export, uploading to object storage and saving in database", u.ID)
err = zw.Close()
if err != nil {
log.Errorf("[%v] closing zip file: %v", u.ID, err)
return
}
de, err := s.DB.CreateExport(ctx, u.ID, randomFilename(), outBuffer)
if err != nil {
log.Errorf("[%v] writing export: %v", u.ID, err)
return
}
log.Debugf("[%v] finished writing export. path: %q", u.ID, de.Path())
}
func randomFilename() string {
b := make([]byte, 32)
_, err := rand.Read(b)
if err != nil {
panic(err)
}
return base64.RawURLEncoding.EncodeToString(b)
}

64
backend/exporter/types.go Normal file
View File

@ -0,0 +1,64 @@
package exporter
import (
"codeberg.org/u1f320/pronouns.cc/backend/db"
"github.com/rs/xid"
)
type userExport struct {
ID xid.ID `json:"id"`
Username string `json:"name"`
DisplayName *string `json:"display_name"`
Bio *string `json:"bio"`
Links []string `json:"links"`
Names []db.FieldEntry `json:"names"`
Pronouns []db.PronounEntry `json:"pronouns"`
Fields []db.Field `json:"fields"`
Discord *string `json:"discord"`
DiscordUsername *string `json:"discord_username"`
MaxInvites int `json:"max_invites"`
}
func dbUserToExport(u db.User, fields []db.Field) userExport {
return userExport{
ID: u.ID,
Username: u.Username,
DisplayName: u.DisplayName,
Bio: u.Bio,
Links: u.Links,
Names: u.Names,
Pronouns: u.Pronouns,
Fields: fields,
Discord: u.Discord,
DiscordUsername: u.DiscordUsername,
MaxInvites: u.MaxInvites,
}
}
type memberExport struct {
ID xid.ID `json:"id"`
Name string `json:"name"`
DisplayName *string `json:"display_name"`
Bio *string `json:"bio"`
Links []string `json:"links"`
Names []db.FieldEntry `json:"names"`
Pronouns []db.PronounEntry `json:"pronouns"`
Fields []db.Field `json:"fields"`
}
func dbMemberToExport(m db.Member, fields []db.Field) memberExport {
return memberExport{
ID: m.ID,
Name: m.Name,
DisplayName: m.DisplayName,
Bio: m.Bio,
Links: m.Links,
Names: m.Names,
Pronouns: m.Pronouns,
Fields: fields,
}
}

View File

@ -12,7 +12,7 @@ var SugaredLogger *zap.SugaredLogger
func init() {
zcfg := zap.NewProductionConfig()
zcfg.Level.SetLevel(zap.InfoLevel)
zcfg.Level.SetLevel(zap.DebugLevel)
zcfg.Encoding = "console"
zcfg.EncoderConfig.EncodeLevel = zapcore.CapitalColorLevelEncoder
zcfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder

View File

@ -0,0 +1,74 @@
package auth
import (
"net/http"
"time"
"codeberg.org/u1f320/pronouns.cc/backend/db"
"codeberg.org/u1f320/pronouns.cc/backend/log"
"codeberg.org/u1f320/pronouns.cc/backend/server"
"github.com/go-chi/render"
)
func (s *Server) startExport(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
claims, _ := server.ClaimsFromContext(ctx)
hasExport, err := s.DB.HasRecentExport(ctx, claims.UserID)
if err != nil {
log.Errorf("checking if user has recent export: %v", err)
return server.APIError{Code: server.ErrInternalServerError}
}
if hasExport {
return server.APIError{Code: server.ErrRecentExport}
}
req, err := http.NewRequestWithContext(ctx, "GET", s.ExporterPath+"/start/"+claims.UserID.String(), nil)
if err != nil {
log.Errorf("creating start export request: %v", err)
return server.APIError{Code: server.ErrInternalServerError}
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Errorf("executing start export request: %v", err)
return server.APIError{Code: server.ErrInternalServerError}
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusAccepted {
log.Errorf("got non-%v code: %v", http.StatusAccepted, resp.StatusCode)
return server.APIError{
Code: server.ErrInternalServerError,
}
}
render.JSON(w, r, map[string]any{"started": true})
return nil
}
type dataExportResponse struct {
Path string `json:"path"`
CreatedAt time.Time `json:"created_at"`
}
func (s *Server) getExport(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
claims, _ := server.ClaimsFromContext(ctx)
de, err := s.DB.UserExport(ctx, claims.UserID)
if err != nil {
if err == db.ErrNoExport {
return server.APIError{Code: server.ErrNotFound}
}
log.Errorf("getting export for user %v: %v", claims.UserID, err)
return err
}
render.JSON(w, r, dataExportResponse{
Path: de.Path(),
CreatedAt: de.CreatedAt,
})
return nil
}

View File

@ -17,6 +17,7 @@ type Server struct {
*server.Server
RequireInvite bool
ExporterPath string
}
type userResponse struct {
@ -54,6 +55,7 @@ func Mount(srv *server.Server, r chi.Router) {
s := &Server{
Server: srv,
RequireInvite: os.Getenv("REQUIRE_INVITE") == "true",
ExporterPath: "http://127.0.0.1:" + os.Getenv("EXPORTER_PORT"),
}
r.Route("/auth", func(r chi.Router) {
@ -79,6 +81,9 @@ func Mount(srv *server.Server, r chi.Router) {
r.With(server.MustAuth).Post("/tokens", server.WrapHandler(s.createToken))
r.With(server.MustAuth).Delete("/tokens/{id}", server.WrapHandler(s.deleteToken))
r.With(server.MustAuth).Get("/export/start", server.WrapHandler(s.startExport))
r.With(server.MustAuth).Get("/export", server.WrapHandler(s.getExport))
// cancel user delete
// uses a special token, so handled in the function itself
r.Get("/cancel-delete", server.WrapHandler(s.cancelDelete))

View File

@ -92,6 +92,7 @@ const (
ErrInviteLimitReached = 1009 // invite limit reached (when creating invites)
ErrInviteAlreadyUsed = 1010 // invite already used (when signing up)
ErrDeletionPending = 1011 // own user deletion pending, returned with undo code
ErrRecentExport = 1012 // latest export is too recent
// User-related error codes
ErrUserNotFound = 2001
@ -126,6 +127,7 @@ var errCodeMessages = map[int]string{
ErrInviteLimitReached: "Your account has reached the invite limit",
ErrInviteAlreadyUsed: "That invite code has already been used",
ErrDeletionPending: "Your account is pending deletion",
ErrRecentExport: "Your latest data export is less than 1 day old",
ErrUserNotFound: "User not found",
@ -157,6 +159,7 @@ var errCodeStatuses = map[int]int{
ErrInviteLimitReached: http.StatusForbidden,
ErrInviteAlreadyUsed: http.StatusBadRequest,
ErrDeletionPending: http.StatusBadRequest,
ErrRecentExport: http.StatusBadRequest,
ErrUserNotFound: http.StatusNotFound,

View File

@ -102,6 +102,7 @@ export enum ErrorCode {
InvitesDisabled = 1008,
InviteLimitReached = 1009,
InviteAlreadyUsed = 1010,
RecentExport = 1012,
UserNotFound = 2001,
@ -109,6 +110,7 @@ export enum ErrorCode {
MemberLimitReached = 3002,
RequestTooBig = 4001,
MissingPermissions = 4002,
}
export const pronounDisplay = (entry: Pronoun) => {

View File

@ -5,6 +5,7 @@ import (
"os"
"codeberg.org/u1f320/pronouns.cc/backend"
"codeberg.org/u1f320/pronouns.cc/backend/exporter"
"codeberg.org/u1f320/pronouns.cc/backend/server"
"codeberg.org/u1f320/pronouns.cc/scripts/cleandb"
"codeberg.org/u1f320/pronouns.cc/scripts/migrate"
@ -18,6 +19,7 @@ var app = &cli.App{
Version: server.Tag,
Commands: []*cli.Command{
backend.Command,
exporter.Command,
{
Name: "database",
Aliases: []string{"db"},

View File

@ -46,6 +46,26 @@ func run(c *cli.Context) error {
fmt.Printf("deleted %v invalidated or expired tokens\n", ct.RowsAffected())
fmt.Println("deleting expired export files")
var exports []dbpkg.DataExport
err = pgxscan.Select(ctx, db, &exports, "SELECT * FROM data_exports WHERE created_at < $1", time.Now().Add(-dbpkg.KeepExportTime))
if err != nil {
fmt.Println("error getting to-be-deleted export files:", err)
return err
}
for _, de := range exports {
err = db.DeleteExport(ctx, de)
if err != nil {
fmt.Printf("error deleting export %v: %v\n", de.ID, err)
continue
}
fmt.Println("deleted export", de.ID)
}
fmt.Printf("deleted %v expired exports\n", len(exports))
var users []dbpkg.User
err = pgxscan.Select(ctx, db, &users, `SELECT * FROM users WHERE
deleted_at IS NOT NULL AND

View File

@ -5,6 +5,6 @@
create table data_exports (
id serial primary key,
user_id text not null references users (id) on delete cascade,
hash text not null,
filename text not null,
created_at timestamptz not null default now()
);