package admin import ( "context" "errors" "fmt" "git.lethalbits.com/lethalbits/gitea-mcp-extended/pkg/gitea" "git.lethalbits.com/lethalbits/gitea-mcp-extended/pkg/log" "git.lethalbits.com/lethalbits/gitea-mcp-extended/pkg/params" "git.lethalbits.com/lethalbits/gitea-mcp-extended/pkg/to" "git.lethalbits.com/lethalbits/gitea-mcp-extended/pkg/tool" gitea_sdk "code.gitea.io/sdk/gitea" "github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/server" ) var Tool = tool.New() const ( AdminListUsersToolName = "admin_list_users" AdminCreateUserToolName = "admin_create_user" AdminEditUserToolName = "admin_edit_user" AdminDeleteUserToolName = "admin_delete_user" AdminRenameUserToolName = "admin_rename_user" AdminListOrgsToolName = "admin_list_orgs" AdminCreateOrgToolName = "admin_create_org" AdminCreateRepoToolName = "admin_create_repo" AdminListHooksToolName = "admin_list_hooks" AdminGetHookToolName = "admin_get_hook" AdminCreateHookToolName = "admin_create_hook" AdminEditHookToolName = "admin_edit_hook" AdminDeleteHookToolName = "admin_delete_hook" ListCronTasksToolName = "admin_list_cron_tasks" RunCronTaskToolName = "admin_run_cron_task" ListUnadoptedReposToolName = "admin_list_unadopted_repos" AdoptUnadoptedRepoToolName = "admin_adopt_repo" DeleteUnadoptedRepoToolName = "admin_delete_unadopted_repo" AdminListEmailsToolName = "admin_list_emails" AdminSearchEmailsToolName = "admin_search_emails" ListUserBadgesToolName = "admin_list_user_badges" AddUserBadgesToolName = "admin_add_user_badges" DeleteUserBadgeToolName = "admin_delete_user_badge" ) var ( AdminListUsersTool = mcp.NewTool( AdminListUsersToolName, mcp.WithDescription("Admin: List all users"), mcp.WithNumber("page", mcp.Description("page number"), mcp.DefaultNumber(1)), mcp.WithNumber("pageSize", mcp.Description("page size"), mcp.DefaultNumber(100)), ) AdminCreateUserTool = mcp.NewTool( AdminCreateUserToolName, mcp.WithDescription("Admin: Create a new user"), mcp.WithString("username", mcp.Required(), mcp.Description("username")), mcp.WithString("email", mcp.Required(), mcp.Description("email address")), mcp.WithString("password", mcp.Required(), mcp.Description("password")), mcp.WithString("full_name", mcp.Description("full name")), mcp.WithBoolean("must_change_password", mcp.Description("require password change on first login (default: true)")), mcp.WithBoolean("send_notify", mcp.Description("send notification email")), ) AdminEditUserTool = mcp.NewTool( AdminEditUserToolName, mcp.WithDescription("Admin: Edit a user account"), mcp.WithString("username", mcp.Required(), mcp.Description("username to edit")), mcp.WithString("email", mcp.Description("new email")), mcp.WithString("password", mcp.Description("new password")), mcp.WithString("full_name", mcp.Description("new full name")), mcp.WithBoolean("must_change_password", mcp.Description("require password change")), mcp.WithBoolean("admin", mcp.Description("set admin status")), mcp.WithBoolean("active", mcp.Description("set active status")), mcp.WithBoolean("prohibit_login", mcp.Description("prohibit login")), mcp.WithBoolean("restricted", mcp.Description("set restricted status")), mcp.WithBoolean("allow_create_organization", mcp.Description("allow creating organizations")), ) AdminDeleteUserTool = mcp.NewTool( AdminDeleteUserToolName, mcp.WithDescription("Admin: Delete a user account"), mcp.WithString("username", mcp.Required(), mcp.Description("username to delete")), ) AdminRenameUserTool = mcp.NewTool( AdminRenameUserToolName, mcp.WithDescription("Admin: Rename a user"), mcp.WithString("username", mcp.Required(), mcp.Description("current username")), mcp.WithString("new_username", mcp.Required(), mcp.Description("new username")), ) AdminListOrgsTool = mcp.NewTool( AdminListOrgsToolName, mcp.WithDescription("Admin: List all organizations"), mcp.WithNumber("page", mcp.Description("page number"), mcp.DefaultNumber(1)), mcp.WithNumber("pageSize", mcp.Description("page size"), mcp.DefaultNumber(100)), ) AdminCreateOrgTool = mcp.NewTool( AdminCreateOrgToolName, mcp.WithDescription("Admin: Create an organization for a user"), mcp.WithString("username", mcp.Required(), mcp.Description("owner username")), mcp.WithString("name", mcp.Required(), mcp.Description("organization name")), mcp.WithString("full_name", mcp.Description("organization full name")), mcp.WithString("description", mcp.Description("organization description")), mcp.WithString("visibility", mcp.Description("visibility: public, limited, private (default: public)")), ) AdminCreateRepoTool = mcp.NewTool( AdminCreateRepoToolName, mcp.WithDescription("Admin: Create a repository for a user"), mcp.WithString("username", mcp.Required(), mcp.Description("owner username")), mcp.WithString("name", mcp.Required(), mcp.Description("repository name")), mcp.WithString("description", mcp.Description("repository description")), mcp.WithBoolean("private", mcp.Description("make repository private")), mcp.WithBoolean("auto_init", mcp.Description("auto-initialize with README")), ) AdminListHooksTool = mcp.NewTool( AdminListHooksToolName, mcp.WithDescription("Admin: List system webhooks"), mcp.WithNumber("page", mcp.Description("page number"), mcp.DefaultNumber(1)), mcp.WithNumber("pageSize", mcp.Description("page size"), mcp.DefaultNumber(100)), ) AdminGetHookTool = mcp.NewTool( AdminGetHookToolName, mcp.WithDescription("Admin: Get a system webhook by ID"), mcp.WithNumber("id", mcp.Required(), mcp.Description("webhook ID")), ) AdminCreateHookTool = mcp.NewTool( AdminCreateHookToolName, mcp.WithDescription("Admin: Create a system webhook"), mcp.WithString("type", mcp.Required(), mcp.Description("hook type: gitea, gogs, slack, discord, dingtalk, telegram, msteams, feishu, wechatwork, packagist")), mcp.WithString("url", mcp.Required(), mcp.Description("target URL")), mcp.WithString("content_type", mcp.Description("content type: json or form (default: json)")), mcp.WithString("secret", mcp.Description("webhook secret")), mcp.WithBoolean("active", mcp.Description("whether the webhook is active (default: true)")), ) AdminEditHookTool = mcp.NewTool( AdminEditHookToolName, mcp.WithDescription("Admin: Edit a system webhook"), mcp.WithNumber("id", mcp.Required(), mcp.Description("webhook ID")), mcp.WithString("url", mcp.Description("target URL")), mcp.WithString("content_type", mcp.Description("content type: json or form")), mcp.WithString("secret", mcp.Description("webhook secret")), mcp.WithBoolean("active", mcp.Description("whether the webhook is active")), ) AdminDeleteHookTool = mcp.NewTool( AdminDeleteHookToolName, mcp.WithDescription("Admin: Delete a system webhook"), mcp.WithNumber("id", mcp.Required(), mcp.Description("webhook ID")), ) ListCronTasksTool = mcp.NewTool( ListCronTasksToolName, mcp.WithDescription("Admin: List cron tasks"), mcp.WithNumber("page", mcp.Description("page number"), mcp.DefaultNumber(1)), mcp.WithNumber("pageSize", mcp.Description("page size"), mcp.DefaultNumber(100)), ) RunCronTaskTool = mcp.NewTool( RunCronTaskToolName, mcp.WithDescription("Admin: Run a cron task manually"), mcp.WithString("task", mcp.Required(), mcp.Description("cron task name")), ) ListUnadoptedReposTool = mcp.NewTool( ListUnadoptedReposToolName, mcp.WithDescription("Admin: List unadopted repositories"), mcp.WithNumber("page", mcp.Description("page number"), mcp.DefaultNumber(1)), mcp.WithNumber("pageSize", mcp.Description("page size"), mcp.DefaultNumber(100)), ) AdoptUnadoptedRepoTool = mcp.NewTool( AdoptUnadoptedRepoToolName, mcp.WithDescription("Admin: Adopt an unadopted repository"), mcp.WithString("owner", mcp.Required(), mcp.Description("repository owner")), mcp.WithString("repo", mcp.Required(), mcp.Description("repository name")), ) DeleteUnadoptedRepoTool = mcp.NewTool( DeleteUnadoptedRepoToolName, mcp.WithDescription("Admin: Delete an unadopted repository"), mcp.WithString("owner", mcp.Required(), mcp.Description("repository owner")), mcp.WithString("repo", mcp.Required(), mcp.Description("repository name")), ) AdminListEmailsTool = mcp.NewTool( AdminListEmailsToolName, mcp.WithDescription("Admin: List all emails"), mcp.WithNumber("page", mcp.Description("page number"), mcp.DefaultNumber(1)), mcp.WithNumber("pageSize", mcp.Description("page size"), mcp.DefaultNumber(100)), ) AdminSearchEmailsTool = mcp.NewTool( AdminSearchEmailsToolName, mcp.WithDescription("Admin: Search emails"), mcp.WithString("query", mcp.Required(), mcp.Description("search query")), mcp.WithNumber("page", mcp.Description("page number"), mcp.DefaultNumber(1)), mcp.WithNumber("pageSize", mcp.Description("page size"), mcp.DefaultNumber(100)), ) ListUserBadgesTool = mcp.NewTool( ListUserBadgesToolName, mcp.WithDescription("Admin: List badges for a user"), mcp.WithString("username", mcp.Required(), mcp.Description("username")), ) AddUserBadgesTool = mcp.NewTool( AddUserBadgesToolName, mcp.WithDescription("Admin: Add badges to a user"), mcp.WithString("username", mcp.Required(), mcp.Description("username")), mcp.WithString("slugs", mcp.Required(), mcp.Description("comma-separated badge slugs to add")), ) DeleteUserBadgeTool = mcp.NewTool( DeleteUserBadgeToolName, mcp.WithDescription("Admin: Remove badges from a user"), mcp.WithString("username", mcp.Required(), mcp.Description("username")), mcp.WithString("slugs", mcp.Required(), mcp.Description("comma-separated badge slugs to remove")), ) ) func init() { Tool.RegisterRead(server.ServerTool{Tool: AdminListUsersTool, Handler: AdminListUsersFn}) Tool.RegisterRead(server.ServerTool{Tool: AdminListOrgsTool, Handler: AdminListOrgsFn}) Tool.RegisterRead(server.ServerTool{Tool: AdminListHooksTool, Handler: AdminListHooksFn}) Tool.RegisterRead(server.ServerTool{Tool: AdminGetHookTool, Handler: AdminGetHookFn}) Tool.RegisterRead(server.ServerTool{Tool: ListCronTasksTool, Handler: ListCronTasksFn}) Tool.RegisterRead(server.ServerTool{Tool: ListUnadoptedReposTool, Handler: ListUnadoptedReposFn}) Tool.RegisterRead(server.ServerTool{Tool: AdminListEmailsTool, Handler: AdminListEmailsFn}) Tool.RegisterRead(server.ServerTool{Tool: AdminSearchEmailsTool, Handler: AdminSearchEmailsFn}) Tool.RegisterRead(server.ServerTool{Tool: ListUserBadgesTool, Handler: ListUserBadgesFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminCreateUserTool, Handler: AdminCreateUserFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminEditUserTool, Handler: AdminEditUserFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminDeleteUserTool, Handler: AdminDeleteUserFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminRenameUserTool, Handler: AdminRenameUserFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminCreateOrgTool, Handler: AdminCreateOrgFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminCreateRepoTool, Handler: AdminCreateRepoFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminCreateHookTool, Handler: AdminCreateHookFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminEditHookTool, Handler: AdminEditHookFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdminDeleteHookTool, Handler: AdminDeleteHookFn}) Tool.RegisterWrite(server.ServerTool{Tool: RunCronTaskTool, Handler: RunCronTaskFn}) Tool.RegisterWrite(server.ServerTool{Tool: AdoptUnadoptedRepoTool, Handler: AdoptUnadoptedRepoFn}) Tool.RegisterWrite(server.ServerTool{Tool: DeleteUnadoptedRepoTool, Handler: DeleteUnadoptedRepoFn}) Tool.RegisterWrite(server.ServerTool{Tool: AddUserBadgesTool, Handler: AddUserBadgesFn}) Tool.RegisterWrite(server.ServerTool{Tool: DeleteUserBadgeTool, Handler: DeleteUserBadgeFn}) } func AdminListUsersFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminListUsersFn") page := params.GetOptionalInt(req.GetArguments(), "page", 1) pageSize := params.GetOptionalInt(req.GetArguments(), "pageSize", 100) client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } users, _, err := client.AdminListUsers(gitea_sdk.AdminListUsersOptions{ ListOptions: gitea_sdk.ListOptions{Page: int(page), PageSize: int(pageSize)}, }) if err != nil { return to.ErrorResult(fmt.Errorf("admin list users err: %v", err)) } return to.TextResult(users) } func AdminCreateUserFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminCreateUserFn") username, _ := req.GetArguments()["username"].(string) email, _ := req.GetArguments()["email"].(string) password, _ := req.GetArguments()["password"].(string) if username == "" || email == "" || password == "" { return to.ErrorResult(errors.New("username, email, and password are required")) } mustChange := true if v, ok := req.GetArguments()["must_change_password"].(bool); ok { mustChange = v } opt := gitea_sdk.CreateUserOption{ LoginName: username, Username: username, Email: email, Password: password, MustChangePassword: &mustChange, } if v, ok := req.GetArguments()["full_name"].(string); ok { opt.FullName = v } if v, ok := req.GetArguments()["send_notify"].(bool); ok { opt.SendNotify = v } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } user, _, err := client.AdminCreateUser(opt) if err != nil { return to.ErrorResult(fmt.Errorf("admin create user err: %v", err)) } return to.TextResult(user) } func AdminEditUserFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminEditUserFn") username, _ := req.GetArguments()["username"].(string) if username == "" { return to.ErrorResult(errors.New("username is required")) } opt := gitea_sdk.EditUserOption{} if v, ok := req.GetArguments()["email"].(string); ok { opt.Email = &v } if v, ok := req.GetArguments()["password"].(string); ok { opt.Password = v } if v, ok := req.GetArguments()["full_name"].(string); ok { opt.FullName = &v } if v, ok := req.GetArguments()["must_change_password"].(bool); ok { opt.MustChangePassword = &v } if v, ok := req.GetArguments()["admin"].(bool); ok { opt.Admin = &v } if v, ok := req.GetArguments()["active"].(bool); ok { opt.Active = &v } if v, ok := req.GetArguments()["prohibit_login"].(bool); ok { opt.ProhibitLogin = &v } if v, ok := req.GetArguments()["restricted"].(bool); ok { opt.Restricted = &v } if v, ok := req.GetArguments()["allow_create_organization"].(bool); ok { opt.AllowCreateOrganization = &v } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.AdminEditUser(username, opt) if err != nil { return to.ErrorResult(fmt.Errorf("admin edit user err: %v", err)) } return to.TextResult(map[string]string{"status": "updated", "username": username}) } func AdminDeleteUserFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminDeleteUserFn") username, _ := req.GetArguments()["username"].(string) if username == "" { return to.ErrorResult(errors.New("username is required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.AdminDeleteUser(username) if err != nil { return to.ErrorResult(fmt.Errorf("admin delete user err: %v", err)) } return to.TextResult(map[string]string{"status": "deleted", "username": username}) } func AdminRenameUserFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminRenameUserFn") username, _ := req.GetArguments()["username"].(string) newUsername, _ := req.GetArguments()["new_username"].(string) if username == "" || newUsername == "" { return to.ErrorResult(errors.New("username and new_username are required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.AdminRenameUser(username, gitea_sdk.RenameUserOption{NewUsername: newUsername}) if err != nil { return to.ErrorResult(fmt.Errorf("admin rename user err: %v", err)) } return to.TextResult(map[string]string{"status": "renamed", "old": username, "new": newUsername}) } func AdminListOrgsFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminListOrgsFn") page := params.GetOptionalInt(req.GetArguments(), "page", 1) pageSize := params.GetOptionalInt(req.GetArguments(), "pageSize", 100) client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } orgs, _, err := client.AdminListOrgs(gitea_sdk.AdminListOrgsOptions{ ListOptions: gitea_sdk.ListOptions{Page: int(page), PageSize: int(pageSize)}, }) if err != nil { return to.ErrorResult(fmt.Errorf("admin list orgs err: %v", err)) } return to.TextResult(orgs) } func AdminCreateOrgFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminCreateOrgFn") username, _ := req.GetArguments()["username"].(string) name, _ := req.GetArguments()["name"].(string) if username == "" || name == "" { return to.ErrorResult(errors.New("username and name are required")) } opt := gitea_sdk.CreateOrgOption{ Name: name, } if v, ok := req.GetArguments()["full_name"].(string); ok { opt.FullName = v } if v, ok := req.GetArguments()["description"].(string); ok { opt.Description = v } if v, ok := req.GetArguments()["visibility"].(string); ok { opt.Visibility = gitea_sdk.VisibleType(v) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } org, _, err := client.AdminCreateOrg(username, opt) if err != nil { return to.ErrorResult(fmt.Errorf("admin create org err: %v", err)) } return to.TextResult(org) } func AdminCreateRepoFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminCreateRepoFn") username, _ := req.GetArguments()["username"].(string) name, _ := req.GetArguments()["name"].(string) if username == "" || name == "" { return to.ErrorResult(errors.New("username and name are required")) } opt := gitea_sdk.CreateRepoOption{ Name: name, } if v, ok := req.GetArguments()["description"].(string); ok { opt.Description = v } if v, ok := req.GetArguments()["private"].(bool); ok { opt.Private = v } if v, ok := req.GetArguments()["auto_init"].(bool); ok { opt.AutoInit = v } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } repo, _, err := client.AdminCreateRepo(username, opt) if err != nil { return to.ErrorResult(fmt.Errorf("admin create repo err: %v", err)) } return to.TextResult(repo) } func AdminListHooksFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminListHooksFn") page := params.GetOptionalInt(req.GetArguments(), "page", 1) pageSize := params.GetOptionalInt(req.GetArguments(), "pageSize", 100) client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } hooks, _, err := client.ListAdminHooks(gitea_sdk.ListAdminHooksOptions{ ListOptions: gitea_sdk.ListOptions{Page: int(page), PageSize: int(pageSize)}, }) if err != nil { return to.ErrorResult(fmt.Errorf("admin list hooks err: %v", err)) } return to.TextResult(hooks) } func AdminGetHookFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminGetHookFn") id, err := params.GetIndex(req.GetArguments(), "id") if err != nil { return to.ErrorResult(err) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } hook, _, err := client.GetAdminHook(id) if err != nil { return to.ErrorResult(fmt.Errorf("admin get hook err: %v", err)) } return to.TextResult(hook) } func AdminCreateHookFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminCreateHookFn") hookType, _ := req.GetArguments()["type"].(string) url, _ := req.GetArguments()["url"].(string) if hookType == "" || url == "" { return to.ErrorResult(errors.New("type and url are required")) } contentType := "json" if v, ok := req.GetArguments()["content_type"].(string); ok { contentType = v } config := map[string]string{ "url": url, "content_type": contentType, } if v, ok := req.GetArguments()["secret"].(string); ok { config["secret"] = v } opt := gitea_sdk.CreateHookOption{ Type: gitea_sdk.HookType(hookType), Config: config, Active: true, } if v, ok := req.GetArguments()["active"].(bool); ok { opt.Active = v } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } hook, _, err := client.CreateAdminHook(opt) if err != nil { return to.ErrorResult(fmt.Errorf("admin create hook err: %v", err)) } return to.TextResult(hook) } func AdminEditHookFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminEditHookFn") id, err := params.GetIndex(req.GetArguments(), "id") if err != nil { return to.ErrorResult(err) } opt := gitea_sdk.EditHookOption{} config := map[string]string{} if v, ok := req.GetArguments()["url"].(string); ok { config["url"] = v } if v, ok := req.GetArguments()["content_type"].(string); ok { config["content_type"] = v } if v, ok := req.GetArguments()["secret"].(string); ok { config["secret"] = v } if len(config) > 0 { opt.Config = config } if v, ok := req.GetArguments()["active"].(bool); ok { opt.Active = &v } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, _, err = client.EditAdminHook(id, opt) if err != nil { return to.ErrorResult(fmt.Errorf("admin edit hook err: %v", err)) } return to.TextResult(map[string]string{"status": "updated"}) } func AdminDeleteHookFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminDeleteHookFn") id, err := params.GetIndex(req.GetArguments(), "id") if err != nil { return to.ErrorResult(err) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.DeleteAdminHook(id) if err != nil { return to.ErrorResult(fmt.Errorf("admin delete hook err: %v", err)) } return to.TextResult(map[string]string{"status": "deleted"}) } func ListCronTasksFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called ListCronTasksFn") page := params.GetOptionalInt(req.GetArguments(), "page", 1) pageSize := params.GetOptionalInt(req.GetArguments(), "pageSize", 100) client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } tasks, _, err := client.ListCronTasks(gitea_sdk.ListCronTaskOptions{ ListOptions: gitea_sdk.ListOptions{Page: int(page), PageSize: int(pageSize)}, }) if err != nil { return to.ErrorResult(fmt.Errorf("list cron tasks err: %v", err)) } return to.TextResult(tasks) } func RunCronTaskFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called RunCronTaskFn") task, _ := req.GetArguments()["task"].(string) if task == "" { return to.ErrorResult(errors.New("task is required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.RunCronTasks(task) if err != nil { return to.ErrorResult(fmt.Errorf("run cron task err: %v", err)) } return to.TextResult(map[string]string{"status": "triggered", "task": task}) } func ListUnadoptedReposFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called ListUnadoptedReposFn") page := params.GetOptionalInt(req.GetArguments(), "page", 1) pageSize := params.GetOptionalInt(req.GetArguments(), "pageSize", 100) client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } repos, _, err := client.ListUnadoptedRepos(gitea_sdk.ListUnadoptedReposOptions{ ListOptions: gitea_sdk.ListOptions{Page: int(page), PageSize: int(pageSize)}, }) if err != nil { return to.ErrorResult(fmt.Errorf("list unadopted repos err: %v", err)) } return to.TextResult(repos) } func AdoptUnadoptedRepoFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdoptUnadoptedRepoFn") owner, _ := req.GetArguments()["owner"].(string) repo, _ := req.GetArguments()["repo"].(string) if owner == "" || repo == "" { return to.ErrorResult(errors.New("owner and repo are required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.AdoptUnadoptedRepo(owner, repo) if err != nil { return to.ErrorResult(fmt.Errorf("adopt repo err: %v", err)) } return to.TextResult(map[string]string{"status": "adopted"}) } func DeleteUnadoptedRepoFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called DeleteUnadoptedRepoFn") owner, _ := req.GetArguments()["owner"].(string) repo, _ := req.GetArguments()["repo"].(string) if owner == "" || repo == "" { return to.ErrorResult(errors.New("owner and repo are required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.DeleteUnadoptedRepo(owner, repo) if err != nil { return to.ErrorResult(fmt.Errorf("delete unadopted repo err: %v", err)) } return to.TextResult(map[string]string{"status": "deleted"}) } func AdminListEmailsFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminListEmailsFn") page := params.GetOptionalInt(req.GetArguments(), "page", 1) pageSize := params.GetOptionalInt(req.GetArguments(), "pageSize", 100) client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } emails, _, err := client.ListAdminEmails(gitea_sdk.ListAdminEmailsOptions{ ListOptions: gitea_sdk.ListOptions{Page: int(page), PageSize: int(pageSize)}, }) if err != nil { return to.ErrorResult(fmt.Errorf("admin list emails err: %v", err)) } return to.TextResult(emails) } func AdminSearchEmailsFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AdminSearchEmailsFn") query, _ := req.GetArguments()["query"].(string) if query == "" { return to.ErrorResult(errors.New("query is required")) } page := params.GetOptionalInt(req.GetArguments(), "page", 1) pageSize := params.GetOptionalInt(req.GetArguments(), "pageSize", 100) client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } emails, _, err := client.SearchAdminEmails(gitea_sdk.SearchAdminEmailsOptions{ ListOptions: gitea_sdk.ListOptions{Page: int(page), PageSize: int(pageSize)}, Query: query, }) if err != nil { return to.ErrorResult(fmt.Errorf("admin search emails err: %v", err)) } return to.TextResult(emails) } func splitAndTrim(s string) []string { var result []string for _, part := range splitByComma(s) { trimmed := trimSpace(part) if trimmed != "" { result = append(result, trimmed) } } return result } func splitByComma(s string) []string { var result []string start := 0 for i := 0; i < len(s); i++ { if s[i] == ',' { result = append(result, s[start:i]) start = i + 1 } } result = append(result, s[start:]) return result } func trimSpace(s string) string { start, end := 0, len(s) for start < end && (s[start] == ' ' || s[start] == '\t') { start++ } for end > start && (s[end-1] == ' ' || s[end-1] == '\t') { end-- } return s[start:end] } func ListUserBadgesFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called ListUserBadgesFn") username, _ := req.GetArguments()["username"].(string) if username == "" { return to.ErrorResult(errors.New("username is required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } badges, _, err := client.ListUserBadges(username) if err != nil { return to.ErrorResult(fmt.Errorf("list user badges err: %v", err)) } return to.TextResult(badges) } func AddUserBadgesFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called AddUserBadgesFn") username, _ := req.GetArguments()["username"].(string) slugs, _ := req.GetArguments()["slugs"].(string) if username == "" || slugs == "" { return to.ErrorResult(errors.New("username and slugs are required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.AddUserBadges(username, gitea_sdk.UserBadgeOption{ BadgeSlugs: splitAndTrim(slugs), }) if err != nil { return to.ErrorResult(fmt.Errorf("add user badges err: %v", err)) } return to.TextResult(map[string]string{"status": "added"}) } func DeleteUserBadgeFn(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) { log.Debugf("Called DeleteUserBadgeFn") username, _ := req.GetArguments()["username"].(string) slugs, _ := req.GetArguments()["slugs"].(string) if username == "" || slugs == "" { return to.ErrorResult(errors.New("username and slugs are required")) } client, err := gitea.ClientFromContext(ctx) if err != nil { return to.ErrorResult(fmt.Errorf("get gitea client err: %v", err)) } _, err = client.DeleteUserBadge(username, gitea_sdk.UserBadgeOption{ BadgeSlugs: splitAndTrim(slugs), }) if err != nil { return to.ErrorResult(fmt.Errorf("delete user badge err: %v", err)) } return to.TextResult(map[string]string{"status": "removed"}) }