diff --git a/pkg/etl/indexer.go b/pkg/etl/indexer.go index e69a305..ec67987 100644 --- a/pkg/etl/indexer.go +++ b/pkg/etl/indexer.go @@ -66,8 +66,9 @@ func (e *Indexer) Run() error { e.pool = pool e.db = db.New(pool) - // Initialize entity manager dispatcher + // Initialize entity manager dispatcher and register handlers e.dispatcher = em.NewDispatcher(e.logger) + e.dispatcher.Register(em.UserCreate()) // Initialize pubsub instances e.blockPubsub = NewPubsub[*db.EtlBlock]() diff --git a/pkg/etl/processors/entity_manager/handler.go b/pkg/etl/processors/entity_manager/handler.go index a707bec..9dc6845 100644 --- a/pkg/etl/processors/entity_manager/handler.go +++ b/pkg/etl/processors/entity_manager/handler.go @@ -80,9 +80,9 @@ const ( // ID offsets matching discovery-provider constants. const ( - UserIDOffset = 1_000_000 - TrackIDOffset = 1_000_000 - PlaylistIDOffset = 1_000_000 + PlaylistIDOffset = 400_000 + TrackIDOffset = 2_000_000 + UserIDOffset = 3_000_000 ) // Character limit constants matching discovery-provider. diff --git a/pkg/etl/processors/entity_manager/user_create.go b/pkg/etl/processors/entity_manager/user_create.go new file mode 100644 index 0000000..e7cd24a --- /dev/null +++ b/pkg/etl/processors/entity_manager/user_create.go @@ -0,0 +1,167 @@ +package entity_manager + +import ( + "context" + "strings" + + "github.com/OpenAudio/go-openaudio/etl/db" +) + +type userCreateHandler struct{} + +func (h *userCreateHandler) EntityType() string { return EntityTypeUser } +func (h *userCreateHandler) Action() string { return ActionCreate } + +func (h *userCreateHandler) Handle(ctx context.Context, params *Params) error { + if err := validateUserCreate(ctx, params); err != nil { + return err + } + return insertUser(ctx, params) +} + +func validateUserCreate(ctx context.Context, params *Params) error { + // Stateless: entity type + if params.EntityType != EntityTypeUser { + return NewValidationError("wrong entity type %s", params.EntityType) + } + + // Stateless: user_id offset + if params.UserID < UserIDOffset { + return NewValidationError("user id %d below offset %d", params.UserID, UserIDOffset) + } + + // Stateless: bio length (check from metadata if present) + if bio := params.MetadataString("bio"); bio != "" { + if err := ValidateBio(bio); err != nil { + return err + } + } + + // Stateless: handle format (if present in metadata) + handle := params.MetadataString("handle") + if handle != "" { + if err := ValidateHandle(handle); err != nil { + return err + } + } + + // Stateless: name length + if name := params.MetadataString("name"); name != "" { + if err := ValidateUserName(name); err != nil { + return err + } + } + + // Stateful: user must not already exist + exists, err := userExists(ctx, params.DBTX, params.UserID) + if err != nil { + return err + } + if exists { + return NewValidationError("user %d already exists", params.UserID) + } + + // Stateful: wallet must not already be in use + walletUsed, err := walletExists(ctx, params.DBTX, params.Signer) + if err != nil { + return err + } + if walletUsed { + return NewValidationError("wallet %s already in use", params.Signer) + } + + // Stateful: signer must not be a developer app + isDeveloperApp, err := developerAppExists(ctx, params.DBTX, params.Signer) + if err != nil { + return err + } + if isDeveloperApp { + return NewValidationError("developer app %s cannot create user", params.Signer) + } + + // Stateful: handle uniqueness + if handle != "" { + handleTaken, err := handleExists(ctx, params.DBTX, strings.ToLower(handle)) + if err != nil { + return err + } + if handleTaken { + return NewValidationError("handle %q already exists", handle) + } + } + + return nil +} + +func insertUser(ctx context.Context, params *Params) error { + handle := params.MetadataString("handle") + name := params.MetadataString("name") + bio := params.MetadataString("bio") + location := params.MetadataString("location") + profilePicture := params.MetadataString("profile_picture") + profilePictureSizes := params.MetadataString("profile_picture_sizes") + coverPhoto := params.MetadataString("cover_photo") + coverPhotoSizes := params.MetadataString("cover_photo_sizes") + + handleLC := "" + if handle != "" { + handleLC = strings.ToLower(handle) + } + + _, err := params.DBTX.Exec(ctx, ` + INSERT INTO users ( + user_id, handle, handle_lc, wallet, name, bio, location, + profile_picture, profile_picture_sizes, cover_photo, cover_photo_sizes, + is_current, is_verified, is_deactivated, is_available, is_storage_v2, + created_at, updated_at, txhash, blockhash, blocknumber + ) VALUES ( + $1, $2, $3, $4, $5, $6, $7, + $8, $9, $10, $11, + true, false, false, true, true, + $12, $12, $13, '', $14 + ) + `, + params.UserID, + handle, + handleLC, + strings.ToLower(params.Signer), + name, + bio, + location, + profilePicture, + profilePictureSizes, + coverPhoto, + coverPhotoSizes, + params.BlockTime, + params.TxHash, + params.BlockNumber, + ) + return err +} + +func userExists(ctx context.Context, dbtx db.DBTX, userID int64) (bool, error) { + var exists bool + err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE user_id = $1 AND is_current = true)", userID).Scan(&exists) + return exists, err +} + +func walletExists(ctx context.Context, dbtx db.DBTX, wallet string) (bool, error) { + var exists bool + err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE wallet = $1)", strings.ToLower(wallet)).Scan(&exists) + return exists, err +} + +func developerAppExists(ctx context.Context, dbtx db.DBTX, address string) (bool, error) { + var exists bool + err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM developer_apps WHERE address = $1 AND is_delete = false)", strings.ToLower(address)).Scan(&exists) + return exists, err +} + +func handleExists(ctx context.Context, dbtx db.DBTX, handleLC string) (bool, error) { + var exists bool + err := dbtx.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM users WHERE handle_lc = $1 AND is_current = true)", handleLC).Scan(&exists) + return exists, err +} + +// UserCreate returns the User Create handler. +func UserCreate() Handler { return &userCreateHandler{} } diff --git a/pkg/etl/processors/entity_manager/user_create_test.go b/pkg/etl/processors/entity_manager/user_create_test.go new file mode 100644 index 0000000..668908e --- /dev/null +++ b/pkg/etl/processors/entity_manager/user_create_test.go @@ -0,0 +1,163 @@ +package entity_manager + +import ( + "context" + "encoding/json" + "strings" + "testing" +) + +func TestUserCreate_TxType(t *testing.T) { + h := UserCreate() + if h.EntityType() != EntityTypeUser { + t.Errorf("EntityType() = %q, want %q", h.EntityType(), EntityTypeUser) + } + if h.Action() != ActionCreate { + t.Errorf("Action() = %q, want %q", h.Action(), ActionCreate) + } +} + +func TestUserCreate_StatelessValidation(t *testing.T) { + tests := []struct { + name string + entityType string + userID int64 + metadata string + wantErr string + }{ + { + name: "wrong entity type", + entityType: EntityTypeTrack, + userID: UserIDOffset + 1, + metadata: `{"handle":"alice","name":"Alice"}`, + wantErr: "wrong entity type", + }, + { + name: "user_id below offset", + entityType: EntityTypeUser, + userID: 999, + metadata: `{"handle":"alice","name":"Alice"}`, + wantErr: "below offset", + }, + { + name: "bio too long", + entityType: EntityTypeUser, + userID: UserIDOffset + 1, + metadata: `{"handle":"alice","name":"Alice","bio":"` + strings.Repeat("x", CharacterLimitUserBio+1) + `"}`, + wantErr: "bio exceeds", + }, + { + name: "name too long", + entityType: EntityTypeUser, + userID: UserIDOffset + 1, + metadata: `{"handle":"alice","name":"` + strings.Repeat("x", CharacterLimitUserName+1) + `"}`, + wantErr: "name exceeds", + }, + { + name: "handle too long", + entityType: EntityTypeUser, + userID: UserIDOffset + 1, + metadata: `{"handle":"` + strings.Repeat("a", CharacterLimitHandle+1) + `","name":"Alice"}`, + wantErr: "exceeds", + }, + { + name: "handle illegal characters", + entityType: EntityTypeUser, + userID: UserIDOffset + 1, + metadata: `{"handle":"alice@#$","name":"Alice"}`, + wantErr: "illegal characters", + }, + { + name: "handle reserved word", + entityType: EntityTypeUser, + userID: UserIDOffset + 1, + metadata: `{"handle":"admin","name":"Admin"}`, + wantErr: "reserved", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + params := &Params{ + UserID: tt.userID, + EntityType: tt.entityType, + Action: ActionCreate, + Signer: "0xabc123", + } + + if tt.metadata != "" { + params.RawMetadata = tt.metadata + var meta map[string]any + if err := json.Unmarshal([]byte(tt.metadata), &meta); err == nil { + params.Metadata = meta + } + } + + err := validateUserCreate(context.Background(), params) + if err == nil { + t.Fatal("expected validation error, got nil") + } + if !IsValidationError(err) { + t.Fatalf("expected ValidationError, got %T: %v", err, err) + } + if tt.wantErr != "" && !strings.Contains(err.Error(), tt.wantErr) { + t.Fatalf("error %q does not contain %q", err.Error(), tt.wantErr) + } + }) + } +} + +// Tests below require a database. They are skipped unless ETL_TEST_DB_URL is set. + +func TestUserCreate_Success(t *testing.T) { + pool := setupTestDB(t) + h := UserCreate() + params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+1, UserIDOffset+1, "0xNewWallet123", `{"handle":"alice","name":"Alice","bio":"hello world"}`) + mustHandle(t, h, params) + + var handle string + err := pool.QueryRow(context.Background(), "SELECT handle FROM users WHERE user_id = $1 AND is_current = true", UserIDOffset+1).Scan(&handle) + if err != nil { + t.Fatalf("failed to query inserted user: %v", err) + } + if handle != "alice" { + t.Errorf("handle = %q, want %q", handle, "alice") + } +} + +func TestUserCreate_RejectsExistingUser(t *testing.T) { + pool := setupTestDB(t) + seedUser(t, pool, UserIDOffset+1, "0xexistingwallet", "existing") + params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+1, UserIDOffset+1, "0xNewWallet999", `{"handle":"newhandle","name":"New User"}`) + mustReject(t, UserCreate(), params, "already exists") +} + +func TestUserCreate_RejectsDuplicateWallet(t *testing.T) { + pool := setupTestDB(t) + seedUser(t, pool, UserIDOffset+1, "0xsharedwallet", "existinguser") + params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+2, UserIDOffset+2, "0xSharedWallet", `{"handle":"newuser","name":"New"}`) + mustReject(t, UserCreate(), params, "wallet") +} + +func TestUserCreate_RejectsDuplicateHandle(t *testing.T) { + pool := setupTestDB(t) + seedUser(t, pool, UserIDOffset+1, "0xwallet1", "alice") + params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+2, UserIDOffset+2, "0xwallet2", `{"handle":"Alice","name":"Alice 2"}`) + mustReject(t, UserCreate(), params, "handle") +} + +func TestUserCreate_NoMetadata(t *testing.T) { + pool := setupTestDB(t) + h := UserCreate() + params := buildParams(t, pool, EntityTypeUser, ActionCreate, UserIDOffset+1, UserIDOffset+1, "0xNewWallet", "") + mustHandle(t, h, params) + + var wallet string + err := pool.QueryRow(context.Background(), "SELECT wallet FROM users WHERE user_id = $1 AND is_current = true", UserIDOffset+1).Scan(&wallet) + if err != nil { + t.Fatalf("failed to query inserted user: %v", err) + } + if wallet != "0xnewwallet" { + t.Errorf("wallet = %q, want %q", wallet, "0xnewwallet") + } +}