SDK Custom Tools Guide
版本要求:本文档针对 CodeBuddy Agent SDK v0.1.24 及以上版本。
功能状态:SDK Custom Tools 是 CodeBuddy Agent SDK 的一项 Preview 功能。
本文档介绍如何在 CodeBuddy Agent SDK 中创建和使用自定义工具。自定义工具允许你定义专属的功能,让 Agent 能够调用它们来完成特定任务。
概述
Custom Tools 是 CodeBuddy Agent SDK 提供的一种通过 MCP(Model Context Protocol)创建自定义工具的方式。与配置外部 MCP 服务器不同,Custom Tools 允许你直接在应用程序中定义工具,无需单独的进程或服务器。
核心优势
- 内进程执行:工具在应用程序内执行,无需创建独立进程
- 类型安全:支持 TypeScript 完整的类型检查和类型推断
- 简化部署:无需单独部署 MCP 服务器,一切随应用部署
- 紧密集成:与应用程序共享内存和状态
- 零额外依赖:利用现有的 SDK 基础设施
快速开始
TypeScript
创建一个简单的计算器工具:
typescript
import { query, createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
// 创建 MCP 服务器并定义工具
const calculatorServer = createSdkMcpServer('calculator', {
tools: [
tool({
name: 'add',
description: 'Add two numbers',
schema: z.object({
a: z.number().describe('First number'),
b: z.number().describe('Second number'),
}),
handler: async ({ a, b }) => {
return { result: a + b };
},
}),
tool({
name: 'multiply',
description: 'Multiply two numbers',
schema: z.object({
a: z.number().describe('First number'),
b: z.number().describe('Second number'),
}),
handler: async ({ a, b }) => {
return { result: a * b };
},
}),
],
});
// 在 SDK 中使用自定义工具
const result = query({
prompt: 'Calculate 15 + 27 and then multiply the result by 3',
options: {
mcpServers: {
'calculator': calculatorServer,
},
},
});
for await (const message of result) {
console.log(message);
}Python
Python SDK 使用装饰器模式定义工具:
python
from codebuddy_agent_sdk import query, create_sdk_mcp_server, tool
from typing import Optional
# 创建 MCP 服务器并使用装饰器定义工具
calculator_server = create_sdk_mcp_server('calculator')
@calculator_server.tool()
def add(a: float, b: float) -> dict:
"""Add two numbers"""
return {'result': a + b}
@calculator_server.tool()
def multiply(a: float, b: float) -> dict:
"""Multiply two numbers"""
return {'result': a * b}
# 在 SDK 中使用自定义工具
async def calculate():
result = query(
prompt='Calculate 15 + 27 and then multiply the result by 3',
options={
'mcp_servers': {
'calculator': calculator_server,
},
},
)
async for message in result:
print(message)
# 运行
import asyncio
asyncio.run(calculate())创建自定义工具
TypeScript - 基本工具定义
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const myServer = createSdkMcpServer('my-tools', {
tools: [
tool({
name: 'my_tool',
description: 'Description of what the tool does',
schema: z.object({
parameter1: z.string().describe('Description of parameter1'),
parameter2: z.number().optional().describe('Optional parameter'),
}),
handler: async (input) => {
// 实现工具逻辑
return {
result: 'Tool output',
details: input,
};
},
}),
],
});TypeScript - 完整示例:文件分析工具
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
import * as fs from 'fs/promises';
import * as path from 'path';
const fileAnalysisServer = createSdkMcpServer('file-analysis', {
tools: [
tool({
name: 'count_lines',
description: 'Count lines in a file',
schema: z.object({
filePath: z.string().describe('Path to the file'),
}),
handler: async ({ filePath }) => {
try {
const content = await fs.readFile(filePath, 'utf-8');
const lineCount = content.split('\n').length;
return {
success: true,
filePath,
lineCount,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'list_files',
description: 'List all files in a directory',
schema: z.object({
dirPath: z.string().describe('Path to the directory'),
pattern: z.string().optional().describe('Optional glob pattern'),
}),
handler: async ({ dirPath, pattern }) => {
try {
const files = await fs.readdir(dirPath);
let filtered = files;
if (pattern) {
const minimatch = require('minimatch').minimatch;
filtered = files.filter(f => minimatch(f, pattern));
}
return {
success: true,
dirPath,
files: filtered,
count: filtered.length,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'get_file_info',
description: 'Get information about a file',
schema: z.object({
filePath: z.string().describe('Path to the file'),
}),
handler: async ({ filePath }) => {
try {
const stats = await fs.stat(filePath);
return {
success: true,
filePath,
size: stats.size,
created: stats.birthtime,
modified: stats.mtime,
isDirectory: stats.isDirectory(),
isFile: stats.isFile(),
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
],
});
export default fileAnalysisServer;Python - 装饰器模式
python
from codebuddy_agent_sdk import create_sdk_mcp_server
import os
from pathlib import Path
file_analysis_server = create_sdk_mcp_server('file-analysis')
@file_analysis_server.tool()
def count_lines(file_path: str) -> dict:
"""Count lines in a file"""
try:
with open(file_path, 'r') as f:
line_count = len(f.readlines())
return {
'success': True,
'file_path': file_path,
'line_count': line_count,
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@file_analysis_server.tool()
def list_files(dir_path: str, pattern: str = None) -> dict:
"""List all files in a directory"""
try:
files = os.listdir(dir_path)
if pattern:
import fnmatch
files = [f for f in files if fnmatch.fnmatch(f, pattern)]
return {
'success': True,
'dir_path': dir_path,
'files': files,
'count': len(files),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@file_analysis_server.tool()
def get_file_info(file_path: str) -> dict:
"""Get information about a file"""
try:
stat = os.stat(file_path)
return {
'success': True,
'file_path': file_path,
'size': stat.st_size,
'created': stat.st_ctime,
'modified': stat.st_mtime,
'is_file': os.path.isfile(file_path),
'is_dir': os.path.isdir(file_path),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}多个工具管理
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const multiToolServer = createSdkMcpServer('multi-tools', {
tools: [
tool({
name: 'tool_one',
description: 'First tool',
schema: z.object({ input: z.string() }),
handler: async ({ input }) => ({ result: `Tool 1: ${input}` }),
}),
tool({
name: 'tool_two',
description: 'Second tool',
schema: z.object({ data: z.number() }),
handler: async ({ data }) => ({ result: `Tool 2: ${data * 2}` }),
}),
tool({
name: 'tool_three',
description: 'Third tool',
schema: z.object({
name: z.string(),
age: z.number().optional(),
}),
handler: async ({ name, age }) => ({
result: `Tool 3: ${name}, age ${age ?? 'unknown'}`,
}),
}),
],
});
// 在 SDK 中使用
const result = query({
prompt: 'Use all the available tools',
options: {
mcpServers: {
'multi-tools': multiToolServer,
},
},
});Python
python
from codebuddy_agent_sdk import create_sdk_mcp_server
server = create_sdk_mcp_server('multi-tools')
@server.tool()
def tool_one(input: str) -> dict:
"""First tool"""
return {'result': f'Tool 1: {input}'}
@server.tool()
def tool_two(data: int) -> dict:
"""Second tool"""
return {'result': f'Tool 2: {data * 2}'}
@server.tool()
def tool_three(name: str, age: int = None) -> dict:
"""Third tool"""
age_str = age if age else 'unknown'
return {'result': f'Tool 3: {name}, age {age_str}'}类型安全
TypeScript - 使用 Zod 模式
Zod 提供运行时类型验证和强大的类型推断:
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const dataProcessingServer = createSdkMcpServer('data-processing', {
tools: [
tool({
name: 'process_user_data',
description: 'Process and validate user data',
schema: z.object({
userId: z.number().int().positive().describe('User ID'),
email: z.string().email().describe('User email'),
tags: z.array(z.string()).describe('User tags'),
preferences: z.object({
notifications: z.boolean().default(true),
theme: z.enum(['light', 'dark', 'auto']).default('auto'),
}).optional(),
}),
handler: async (input) => {
// input 类型完全由 Zod schema 推断
// TypeScript 知道所有字段的类型
const result = {
userId: input.userId,
email: input.email,
tagCount: input.tags.length,
hasPreferences: !!input.preferences,
};
return result;
},
}),
],
});Python - 类型注解
Python SDK 使用标准类型注解:
python
from codebuddy_agent_sdk import create_sdk_mcp_server
from typing import Optional, List, Dict, Any
from enum import Enum
class Theme(str, Enum):
LIGHT = 'light'
DARK = 'dark'
AUTO = 'auto'
server = create_sdk_mcp_server('data-processing')
@server.tool()
def process_user_data(
user_id: int,
email: str,
tags: List[str],
notifications: bool = True,
theme: Theme = Theme.AUTO,
) -> Dict[str, Any]:
"""Process and validate user data"""
return {
'user_id': user_id,
'email': email,
'tag_count': len(tags),
'theme': theme.value,
'notifications': notifications,
}完整示例:数据库查询工具
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
interface QueryResult {
rows: Record<string, any>[];
rowCount: number;
}
interface Database {
query(sql: string, params?: any[]): Promise<QueryResult>;
}
// 假设你已有数据库连接
const db: Database = new Database();
const databaseServer = createSdkMcpServer('database', {
tools: [
tool({
name: 'execute_query',
description: 'Execute a read-only SQL query',
schema: z.object({
sql: z.string().describe('SQL query to execute'),
params: z.array(z.any()).optional().describe('Query parameters'),
}),
handler: async ({ sql, params }) => {
try {
// 防止危险操作
const upperSql = sql.toUpperCase();
if (
upperSql.includes('DROP') ||
upperSql.includes('DELETE') ||
upperSql.includes('UPDATE') ||
upperSql.includes('INSERT')
) {
return {
success: false,
error: 'Only SELECT queries are allowed',
};
}
const result = await db.query(sql, params);
return {
success: true,
rows: result.rows,
rowCount: result.rowCount,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Query execution failed',
};
}
},
}),
tool({
name: 'get_table_schema',
description: 'Get the schema of a table',
schema: z.object({
tableName: z.string().describe('Name of the table'),
}),
handler: async ({ tableName }) => {
try {
const result = await db.query(
`SELECT column_name, data_type FROM information_schema.columns WHERE table_name = $1`,
[tableName]
);
return {
success: true,
tableName,
columns: result.rows,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Schema retrieval failed',
};
}
},
}),
],
});Python
python
from codebuddy_agent_sdk import create_sdk_mcp_server
from typing import List, Dict, Any, Optional
server = create_sdk_mcp_server('database')
class Database:
"""Simplified database wrapper"""
async def query(self, sql: str, params: List[Any] = None) -> Dict[str, Any]:
# 实现实际的数据库查询
pass
db = Database()
@server.tool()
async def execute_query(
sql: str,
params: List[Any] = None,
) -> Dict[str, Any]:
"""Execute a read-only SQL query"""
try:
# 防止危险操作
dangerous_keywords = ['DROP', 'DELETE', 'UPDATE', 'INSERT']
if any(keyword in sql.upper() for keyword in dangerous_keywords):
return {
'success': False,
'error': 'Only SELECT queries are allowed',
}
result = await db.query(sql, params)
return {
'success': True,
'rows': result.get('rows', []),
'row_count': result.get('row_count', 0),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@server.tool()
async def get_table_schema(table_name: str) -> Dict[str, Any]:
"""Get the schema of a table"""
try:
result = await db.query(
'SELECT column_name, data_type FROM information_schema.columns WHERE table_name = %s',
[table_name]
)
return {
'success': True,
'table_name': table_name,
'columns': result.get('rows', []),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}完整示例:API 集成工具
TypeScript
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const apiGatewayServer = createSdkMcpServer('api-gateway', {
tools: [
tool({
name: 'stripe_create_payment',
description: 'Create a payment through Stripe',
schema: z.object({
amount: z.number().positive().describe('Amount in cents'),
currency: z.string().default('usd').describe('Currency code'),
description: z.string().optional().describe('Payment description'),
}),
handler: async ({ amount, currency, description }) => {
try {
const response = await fetch('https://api.stripe.com/v1/payment_intents', {
method: 'POST',
headers: {
'Authorization': `Bearer ${process.env.STRIPE_API_KEY}`,
'Content-Type': 'application/x-www-form-urlencoded',
},
body: new URLSearchParams({
amount: amount.toString(),
currency,
...(description && { description }),
}),
});
if (!response.ok) {
const error = await response.json();
return {
success: false,
error: error.error?.message || 'Payment creation failed',
};
}
const data = await response.json();
return {
success: true,
paymentId: data.id,
status: data.status,
clientSecret: data.client_secret,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'github_search_repos',
description: 'Search repositories on GitHub',
schema: z.object({
query: z.string().describe('Search query'),
language: z.string().optional().describe('Programming language'),
sort: z.enum(['stars', 'forks', 'updated']).default('stars'),
}),
handler: async ({ query, language, sort }) => {
try {
const searchQuery = language
? `${query} language:${language}`
: query;
const response = await fetch(
`https://api.github.com/search/repositories?q=${encodeURIComponent(
searchQuery
)}&sort=${sort}`,
{
headers: {
'Authorization': `Bearer ${process.env.GITHUB_TOKEN}`,
},
}
);
if (!response.ok) {
return {
success: false,
error: `GitHub API error: ${response.status}`,
};
}
const data = await response.json();
return {
success: true,
repos: data.items.map((repo: any) => ({
name: repo.name,
url: repo.html_url,
stars: repo.stargazers_count,
language: repo.language,
description: repo.description,
})),
total: data.total_count,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
tool({
name: 'slack_send_message',
description: 'Send a message to a Slack channel',
schema: z.object({
channel: z.string().describe('Channel ID or name'),
text: z.string().describe('Message text'),
thread_ts: z.string().optional().describe('Thread timestamp (for replies)'),
}),
handler: async ({ channel, text, thread_ts }) => {
try {
const payload: Record<string, any> = {
channel,
text,
};
if (thread_ts) {
payload.thread_ts = thread_ts;
}
const response = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Authorization': `Bearer ${process.env.SLACK_BOT_TOKEN}`,
'Content-Type': 'application/json',
},
body: JSON.stringify(payload),
});
const data = await response.json();
if (!data.ok) {
return {
success: false,
error: data.error || 'Failed to send message',
};
}
return {
success: true,
messageTs: data.ts,
channel: data.channel,
};
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'Unknown error',
};
}
},
}),
],
});
export default apiGatewayServer;Python
python
from codebuddy_agent_sdk import create_sdk_mcp_server
from typing import Optional, List, Dict, Any
import requests
import os
server = create_sdk_mcp_server('api-gateway')
@server.tool()
def stripe_create_payment(
amount: float,
currency: str = 'usd',
description: Optional[str] = None,
) -> Dict[str, Any]:
"""Create a payment through Stripe"""
try:
headers = {
'Authorization': f"Bearer {os.environ.get('STRIPE_API_KEY')}",
}
data = {
'amount': int(amount),
'currency': currency,
}
if description:
data['description'] = description
response = requests.post(
'https://api.stripe.com/v1/payment_intents',
headers=headers,
data=data,
)
if response.status_code >= 400:
error = response.json().get('error', {})
return {
'success': False,
'error': error.get('message', 'Payment creation failed'),
}
data = response.json()
return {
'success': True,
'payment_id': data['id'],
'status': data['status'],
'client_secret': data.get('client_secret'),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@server.tool()
def github_search_repos(
query: str,
language: Optional[str] = None,
sort: str = 'stars',
) -> Dict[str, Any]:
"""Search repositories on GitHub"""
try:
search_query = f"{query} language:{language}" if language else query
response = requests.get(
'https://api.github.com/search/repositories',
params={
'q': search_query,
'sort': sort,
},
headers={
'Authorization': f"Bearer {os.environ.get('GITHUB_TOKEN')}",
},
)
if response.status_code >= 400:
return {
'success': False,
'error': f"GitHub API error: {response.status_code}",
}
data = response.json()
repos = [
{
'name': repo['name'],
'url': repo['html_url'],
'stars': repo['stargazers_count'],
'language': repo['language'],
'description': repo['description'],
}
for repo in data.get('items', [])
]
return {
'success': True,
'repos': repos,
'total': data.get('total_count', 0),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
@server.tool()
def slack_send_message(
channel: str,
text: str,
thread_ts: Optional[str] = None,
) -> Dict[str, Any]:
"""Send a message to a Slack channel"""
try:
payload = {
'channel': channel,
'text': text,
}
if thread_ts:
payload['thread_ts'] = thread_ts
response = requests.post(
'https://slack.com/api/chat.postMessage',
headers={
'Authorization': f"Bearer {os.environ.get('SLACK_BOT_TOKEN')}",
'Content-Type': 'application/json',
},
json=payload,
)
data = response.json()
if not data.get('ok'):
return {
'success': False,
'error': data.get('error', 'Failed to send message'),
}
return {
'success': True,
'message_ts': data['ts'],
'channel': data['channel'],
}
except Exception as e:
return {
'success': False,
'error': str(e),
}选择性地允许工具
你可以选择性地允许特定工具被调用:
TypeScript
typescript
import { query, createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const result = query({
prompt: 'Search for popular repositories and send a message to Slack',
options: {
mcpServers: {
'api-gateway': apiGatewayServer,
},
canUseTool: (toolCall) => {
// 只允许 GitHub 搜索工具
const allowedTools = [
'mcp__api-gateway__github_search_repos',
];
if (!allowedTools.includes(toolCall.name)) {
return false;
}
return true;
},
},
});Python
python
from codebuddy_agent_sdk import query
from api_gateway_server import server as api_gateway_server
result = query(
prompt='Search for popular repositories and send a message to Slack',
options={
'mcp_servers': {
'api-gateway': api_gateway_server,
},
'can_use_tool': lambda toolCall: (
# 只允许 GitHub 搜索工具
toolCall.get('name') == 'mcp__api-gateway__github_search_repos'
),
},
)错误处理
TypeScript - API 调用错误处理
typescript
import { createSdkMcpServer, tool } from '@tencent-ai/agent-sdk';
import { z } from 'zod';
const apiServer = createSdkMcpServer('api-tools', {
tools: [
tool({
name: 'fetch_data',
description: 'Fetch data from an API',
schema: z.object({
endpoint: z.string().url().describe('API endpoint URL'),
}),
handler: async ({ endpoint }) => {
try {
const response = await fetch(endpoint);
if (!response.ok) {
return {
content: [{
type: 'text',
text: `API error: ${response.status} ${response.statusText}`,
}],
};
}
const data = await response.json();
return {
content: [{
type: 'text',
text: JSON.stringify(data, null, 2),
}],
};
} catch (error) {
return {
content: [{
type: 'text',
text: `Failed to fetch data: ${error instanceof Error ? error.message : String(error)}`,
}],
};
}
},
}),
],
});Python - API 调用错误处理
python
from codebuddy_agent_sdk import create_sdk_mcp_server
import aiohttp
import json
from typing import Any
server = create_sdk_mcp_server('api-tools')
@server.tool()
async def fetch_data(endpoint: str) -> dict[str, Any]:
"""Fetch data from an API"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(endpoint) as response:
if response.status != 200:
return {
'content': [{
'type': 'text',
'text': f'API error: {response.status} {response.reason}'
}]
}
data = await response.json()
return {
'content': [{
'type': 'text',
'text': json.dumps(data, indent=2)
}]
}
except Exception as e:
return {
'content': [{
'type': 'text',
'text': f'Failed to fetch data: {str(e)}'
}]
}最佳实践
1. 使用明确的参数类型和描述
为工具参数提供清晰的类型和描述,帮助 Agent 理解如何调用工具:
typescript
tool({
name: 'process_data',
schema: z.object({
data: z.array(z.string()).describe('Data to process'),
format: z.enum(['json', 'csv']).describe('Output format'),
}),
handler: async ({ data, format }) => {
// 处理逻辑
},
})2. 提供有意义的错误反馈
始终返回明确的错误信息,以便 Agent 和用户理解发生了什么:
typescript
handler: async (input) => {
try {
// 执行操作
} catch (error) {
return {
content: [{
type: 'text',
text: `Operation failed: ${error instanceof Error ? error.message : String(error)}`,
}],
};
}
}3. 验证输入参数
确保输入符合预期的格式和范围:
typescript
handler: async ({ userId, email }) => {
if (!Number.isInteger(userId) || userId <= 0) {
return {
content: [{
type: 'text',
text: 'Error: User ID must be a positive integer',
}],
};
}
if (!email.includes('@')) {
return {
content: [{
type: 'text',
text: 'Error: Invalid email format',
}],
};
}
// 继续处理
}