Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/etl/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ func (e *Indexer) Run() error {
e.pool = pool
e.db = db.New(pool)

// Initialize entity manager dispatcher
// Initialize entity manager dispatcher and register handlers
e.dispatcher = em.NewDispatcher(e.logger)
e.dispatcher.Register(em.UserCreate())

// Initialize pubsub instances
e.blockPubsub = NewPubsub[*db.EtlBlock]()
Expand Down
6 changes: 3 additions & 3 deletions pkg/etl/processors/entity_manager/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ const (

// ID offsets matching discovery-provider constants.
const (
UserIDOffset = 1_000_000
TrackIDOffset = 1_000_000
PlaylistIDOffset = 1_000_000
PlaylistIDOffset = 400_000
TrackIDOffset = 2_000_000
UserIDOffset = 3_000_000
)

// Character limit constants matching discovery-provider.
Expand Down
167 changes: 167 additions & 0 deletions pkg/etl/processors/entity_manager/user_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package entity_manager

import (
"context"
"strings"

"github.com/OpenAudio/go-openaudio/etl/db"
)

type userCreateHandler struct{}

func (h *userCreateHandler) EntityType() string { return EntityTypeUser }
func (h *userCreateHandler) Action() string { return ActionCreate }

func (h *userCreateHandler) Handle(ctx context.Context, params *Params) error {
if err := validateUserCreate(ctx, params); err != nil {
return err
}
return insertUser(ctx, params)
}

func validateUserCreate(ctx context.Context, params *Params) error {
// Stateless: entity type
if params.EntityType != EntityTypeUser {
return NewValidationError("wrong entity type %s", params.EntityType)
}

// Stateless: user_id offset
if params.UserID < UserIDOffset {
return NewValidationError("user id %d below offset %d", params.UserID, UserIDOffset)
}

// Stateless: bio length (check from metadata if present)
if bio := params.MetadataString("bio"); bio != "" {
if err := ValidateBio(bio); err != nil {
return err
}
}

// Stateless: handle format (if present in metadata)
handle := params.MetadataString("handle")
if handle != "" {
if err := ValidateHandle(handle); err != nil {
return err
}
}

// Stateless: name length
if name := params.MetadataString("name"); name != "" {
if err := ValidateUserName(name); err != nil {
return err
}
}

// Stateful: user must not already exist
exists, err := userExists(ctx, params.DBTX, params.UserID)
if err != nil {
return err
}
if exists {
return NewValidationError("user %d already exists", params.UserID)
}

// Stateful: wallet must not already be in use
walletUsed, err := walletExists(ctx, params.DBTX, params.Signer)
if err != nil {
return err
}
if walletUsed {
return NewValidationError("wallet %s already in use", params.Signer)
}

// Stateful: signer must not be a developer app
isDeveloperApp, err := developerAppExists(ctx, params.DBTX, params.Signer)
if err != nil {
return err
}
if isDeveloperApp {
return NewValidationError("developer app %s cannot create user", params.Signer)
}

// Stateful: handle uniqueness
if handle != "" {
handleTaken, err := handleExists(ctx, params.DBTX, strings.ToLower(handle))
if err != nil {
return err
}
if handleTaken {
return NewValidationError("handle %q already exists", handle)
}
}

return nil
}

func insertUser(ctx context.Context, params *Params) error {
handle := params.MetadataString("handle")
name := params.MetadataString("name")
bio := params.MetadataString("bio")
location := params.MetadataString("location")
profilePicture := params.MetadataString("profile_picture")
profilePictureSizes := params.MetadataString("profile_picture_sizes")
coverPhoto := params.MetadataString("cover_photo")
coverPhotoSizes := params.MetadataString("cover_photo_sizes")

handleLC := ""
if handle != "" {
handleLC = strings.ToLower(handle)
}

_, err := params.DBTX.Exec(ctx, `
INSERT INTO users (
user_id, handle, handle_lc, wallet, name, bio, location,
profile_picture, profile_picture_sizes, cover_photo, cover_photo_sizes,
is_current, is_verified, is_deactivated, is_available, is_storage_v2,
created_at, updated_at, txhash, blockhash, blocknumber
) VALUES (
$1, $2, $3, $4, $5, $6, $7,
$8, $9, $10, $11,
true, false, false, true, true,
$12, $12, $13, '', $14
)
`,
params.UserID,
handle,
handleLC,
strings.ToLower(params.Signer),
name,
bio,
location,
profilePicture,
profilePictureSizes,
coverPhoto,
coverPhotoSizes,
params.BlockTime,
params.TxHash,
params.BlockNumber,
)
return err
}

func userExists(ctx context.Context, dbtx db.DBTX, userID int64) (bool, error) {
var exists bool
err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE user_id = $1 AND is_current = true)", userID).Scan(&exists)
return exists, err
}

func walletExists(ctx context.Context, dbtx db.DBTX, wallet string) (bool, error) {
var exists bool
err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE wallet = $1)", strings.ToLower(wallet)).Scan(&exists)
return exists, err
}

func developerAppExists(ctx context.Context, dbtx db.DBTX, address string) (bool, error) {
var exists bool
err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM developer_apps WHERE address = $1 AND is_delete = false)", strings.ToLower(address)).Scan(&exists)
return exists, err
}

func handleExists(ctx context.Context, dbtx db.DBTX, handleLC string) (bool, error) {
var exists bool
err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE handle_lc = $1 AND is_current = true)", handleLC).Scan(&exists)
return exists, err
}

// UserCreate returns the User Create handler.
func UserCreate() Handler { return &userCreateHandler{} }
163 changes: 163 additions & 0 deletions pkg/etl/processors/entity_manager/user_create_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package entity_manager

import (
"context"
"encoding/json"
"strings"
"testing"
)

func TestUserCreate_TxType(t *testing.T) {
h := UserCreate()
if h.EntityType() != EntityTypeUser {
t.Errorf("EntityType() = %q, want %q", h.EntityType(), EntityTypeUser)
}
if h.Action() != ActionCreate {
t.Errorf("Action() = %q, want %q", h.Action(), ActionCreate)
}
}

func TestUserCreate_StatelessValidation(t *testing.T) {
tests := []struct {
name string
entityType string
userID int64
metadata string
wantErr string
}{
{
name: "wrong entity type",
entityType: EntityTypeTrack,
userID: UserIDOffset + 1,
metadata: `{"handle":"alice","name":"Alice"}`,
wantErr: "wrong entity type",
},
{
name: "user_id below offset",
entityType: EntityTypeUser,
userID: 999,
metadata: `{"handle":"alice","name":"Alice"}`,
wantErr: "below offset",
},
{
name: "bio too long",
entityType: EntityTypeUser,
userID: UserIDOffset + 1,
metadata: `{"handle":"alice","name":"Alice","bio":"` + strings.Repeat("x", CharacterLimitUserBio+1) + `"}`,
wantErr: "bio exceeds",
},
{
name: "name too long",
entityType: EntityTypeUser,
userID: UserIDOffset + 1,
metadata: `{"handle":"alice","name":"` + strings.Repeat("x", CharacterLimitUserName+1) + `"}`,
wantErr: "name exceeds",
},
{
name: "handle too long",
entityType: EntityTypeUser,
userID: UserIDOffset + 1,
metadata: `{"handle":"` + strings.Repeat("a", CharacterLimitHandle+1) + `","name":"Alice"}`,
wantErr: "exceeds",
},
{
name: "handle illegal characters",
entityType: EntityTypeUser,
userID: UserIDOffset + 1,
metadata: `{"handle":"alice@#$","name":"Alice"}`,
wantErr: "illegal characters",
},
{
name: "handle reserved word",
entityType: EntityTypeUser,
userID: UserIDOffset + 1,
metadata: `{"handle":"admin","name":"Admin"}`,
wantErr: "reserved",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params := &Params{
UserID: tt.userID,
EntityType: tt.entityType,
Action: ActionCreate,
Signer: "0xabc123",
}

if tt.metadata != "" {
params.RawMetadata = tt.metadata
var meta map[string]any
if err := json.Unmarshal([]byte(tt.metadata), &meta); err == nil {
params.Metadata = meta
}
}

err := validateUserCreate(context.Background(), params)
if err == nil {
t.Fatal("expected validation error, got nil")
}
if !IsValidationError(err) {
t.Fatalf("expected ValidationError, got %T: %v", err, err)
}
if tt.wantErr != "" && !strings.Contains(err.Error(), tt.wantErr) {
t.Fatalf("error %q does not contain %q", err.Error(), tt.wantErr)
}
})
}
}

// Tests below require a database. They are skipped unless ETL_TEST_DB_URL is set.

func TestUserCreate_Success(t *testing.T) {
pool := setupTestDB(t)
h := UserCreate()
params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+1, UserIDOffset+1, "0xNewWallet123", `{"handle":"alice","name":"Alice","bio":"hello world"}`)
mustHandle(t, h, params)

var handle string
err := pool.QueryRow(context.Background(), "SELECT handle FROM users WHERE user_id = $1 AND is_current = true", UserIDOffset+1).Scan(&handle)
if err != nil {
t.Fatalf("failed to query inserted user: %v", err)
}
if handle != "alice" {
t.Errorf("handle = %q, want %q", handle, "alice")
}
}

func TestUserCreate_RejectsExistingUser(t *testing.T) {
pool := setupTestDB(t)
seedUser(t, pool, UserIDOffset+1, "0xexistingwallet", "existing")
params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+1, UserIDOffset+1, "0xNewWallet999", `{"handle":"newhandle","name":"New User"}`)
mustReject(t, UserCreate(), params, "already exists")
}

func TestUserCreate_RejectsDuplicateWallet(t *testing.T) {
pool := setupTestDB(t)
seedUser(t, pool, UserIDOffset+1, "0xsharedwallet", "existinguser")
params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+2, UserIDOffset+2, "0xSharedWallet", `{"handle":"newuser","name":"New"}`)
mustReject(t, UserCreate(), params, "wallet")
}

func TestUserCreate_RejectsDuplicateHandle(t *testing.T) {
pool := setupTestDB(t)
seedUser(t, pool, UserIDOffset+1, "0xwallet1", "alice")
params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+2, UserIDOffset+2, "0xwallet2", `{"handle":"Alice","name":"Alice 2"}`)
mustReject(t, UserCreate(), params, "handle")
}

func TestUserCreate_NoMetadata(t *testing.T) {
pool := setupTestDB(t)
h := UserCreate()
params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+1, UserIDOffset+1, "0xNewWallet", "")
mustHandle(t, h, params)

var wallet string
err := pool.QueryRow(context.Background(), "SELECT wallet FROM users WHERE user_id = $1 AND is_current = true", UserIDOffset+1).Scan(&wallet)
if err != nil {
t.Fatalf("failed to query inserted user: %v", err)
}
if wallet != "0xnewwallet" {
t.Errorf("wallet = %q, want %q", wallet, "0xnewwallet")
}
}