Providers
Providers handle communication between users and the agent. Ash supports multiple communication channels including Telegram and CLI.
Overview
Providers are responsible for:
- Receiving messages from users
- Delivering responses (including streaming)
- Managing message editing and deletion
- Handling platform-specific features
Provider Interface
Location: src/ash/providers/base.py

    from abc import ABC, abstractmethod
    from typing import AsyncIterator, Callable, Awaitable

    MessageHandler = Callable[[IncomingMessage], Awaitable[None]]


    class Provider(ABC):
        @property
        @abstractmethod
        def name(self) -> str:
            """Provider name (e.g., 'telegram')."""

        @abstractmethod
        async def start(self, handler: MessageHandler) -> None:
            """Start receiving messages."""

        @abstractmethod
        async def stop(self) -> None:
            """Stop the provider."""

        @abstractmethod
        async def send(self, message: OutgoingMessage) -> str:
            """Send a message, return message ID."""

        @abstractmethod
        async def send_streaming(
            self,
            chat_id: str,
            stream: AsyncIterator[str],
            *,
            reply_to: str | None = None,
        ) -> str:
            """Send streaming response, updating message."""

        @abstractmethod
        async def edit(self, message_id: str, content: str) -> None:
            """Edit an existing message."""

        @abstractmethod
        async def delete(self, message_id: str) -> None:
            """Delete a message."""

Message Types
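
    from dataclasses import dataclass

    # Shown as dataclasses for illustration; the field lists are what matter.

    @dataclass
    class IncomingMessage:
        id: str
        chat_id: str
        user_id: str
        username: str | None
        content: str
        reply_to: str | None
        metadata: dict


    @dataclass
    class OutgoingMessage:
        chat_id: str
        content: str
        reply_to: str | None
        metadata: dict

Telegram Provider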
Connect Ash to Telegram for mobile access to your assistant.
Configuration

    [telegram]
    bot_token = "123456789:ABCdefGHIjklMNOpqrSTUvwxYZ"
    allowed_users = ["@yourusername", "123456789"]
    allowed_groups = ["-100123456789"]
    group_mode = "mention"
    webhook_url = "https://your-domain.com/webhook"

| Option | Type | Default | Description |
|---|---|---|---|
| bot_token | string | required | Bot token from BotFather |
| allowed_users | list | [] | Authorized usernames or IDs |
| allowed_groups | list | [] | Authorized group chat IDs |
| group_mode | string | "mention" | "mention" or "always" |
| webhook_url | string | null | Webhook URL (polling if not set) |
Creating a Telegram Bot
- Open BotFather: search for @BotFather in Telegram and start a chat.
- Create a new bot: send /newbot and follow the prompts to name your bot.
- Copy the token: BotFather will give you a token like 123456789:ABCdef...
- Configure Ash: add the token to your config or environment:

      export TELEGRAM_BOT_TOKEN=123456789:ABCdef...

User Authorization
Specify who can use the bot:

    [telegram]
    allowed_users = [
        "@yourusername",  # By username
        "123456789",      # By user ID
    ]

Finding Your User ID
Send /start to @userinfobot to get your Telegram user ID.
Group Chats
Allow the bot in specific groups:

    [telegram]
    allowed_groups = ["-100123456789"]
    group_mode = "mention"

| Mode | Behavior |
|---|---|
| mention | Bot responds only when mentioned (@botname) |
| always | Bot responds to all messages in the group |
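
The two modes in the table can be read as a small decision function. The sketch below is a simplified illustration: `should_respond_in_group` is a hypothetical name, and it detects mentions by scanning the message text, whereas a real implementation would more likely rely on the mention entities Telegram attaches to each message.

```python
def should_respond_in_group(group_mode: str, text: str, bot_username: str) -> bool:
    """Decide whether a group message should be answered under the given mode."""
    if group_mode == "always":
        return True
    if group_mode == "mention":
        mention = f"@{bot_username}".lower()
        # Strip trailing punctuation so "@botname," still counts as a mention.
        words = (word.strip(".,!?:;").lower() for word in text.split())
        return mention in words
    raise ValueError(f"unknown group_mode: {group_mode!r}")


print(should_respond_in_group("mention", "hey @AshBot, what's up?", "ashbot"))
print(should_respond_in_group("mention", "just chatting", "ashbot"))
print(should_respond_in_group("always", "just chatting", "ashbot"))
```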
Finding Group IDs
- Add your bot to the group
- Send a message in the group
- Check server logs for the group ID
Polling vs Webhook
Polling (Default)
The bot polls Telegram for updates. Good for local development:

    uv run ash serve

Webhook
For production, use webhooks. Set the URL:

    [telegram]
    webhook_url = "https://your-domain.com/webhook"

Start with webhook mode:

    uv run ash serve --webhook

Implementation
Location: src/ash/providers/telegram/
Uses aiogram 3.x for Telegram Bot API:

    from aiogram import Bot, Dispatcher

    # Base class defined in src/ash/providers/base.py
    from ash.providers.base import Provider


    class TelegramProvider(Provider):
        def __init__(
            self,
            bot_token: str,
            allowed_users: list[str],
            webhook_url: str | None = None,
        ):
            self.bot = Bot(token=bot_token)
            self.dp = Dispatcher()

Features:
- Polling mode - Long polling for development
- Webhook mode - HTTP webhooks for production
- User authorization - Restrict to allowed users
- Group support - Mention-based or always respond
- Streaming - Edit message as response generates
Streaming Implementation
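
    async def send_streaming(self, chat_id: str, stream: AsyncIterator[str]) -> str:
        content = ""
        message = None

        async for chunk in stream:
            if not chunk:
                # Telegram rejects empty messages and edits that change nothing.
                continue
            content += chunk
            if message is None:
                # First chunk: create the message that later chunks will edit.
                message = await self.bot.send_message(chat_id, content)
            else:
                await self.bot.edit_message_text(
                    content,
                    chat_id=chat_id,
                    message_id=message.message_id,
                )

        if message is None:
            # Nothing was ever sent, so there is no message ID to return.
            raise ValueError("stream produced no content")
        return str(message.message_id)

CLI Provider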
The CLI uses direct I/O rather than the provider interface:
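
    import asyncio


    async def chat_loop():
        while True:
            # input() blocks, so run it in a worker thread to keep the event loop free.
            user_input = await asyncio.to_thread(input, "> ")
            async for chunk in agent.process(user_input):
                print(chunk, end="", flush=True)
            print()

Server Configuration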
Configure the FastAPI server for webhooks and health checks:

    [server]
    host = "127.0.0.1"
    port = 8080
    webhook_path = "/webhook"

| Option | Type | Default | Description |
|---|---|---|---|
| host | string | "127.0.0.1" | Bind address |
| port | int | 8080 | Port number |
| webhook_path | string | "/webhook" | Telegram webhook path |
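
Because the Telegram `webhook_url` and the server's `webhook_path` are configured in different sections, a quick consistency check can catch mismatches early. The sketch below uses only the standard library; `check_webhook_settings` is a hypothetical helper, and it assumes the reverse proxy forwards the request path unchanged, so the URL path and `webhook_path` must be identical.

```python
from urllib.parse import urlparse


def check_webhook_settings(webhook_url: str, webhook_path: str = "/webhook") -> list[str]:
    """Return a list of problems; an empty list means the settings agree."""
    problems = []
    parsed = urlparse(webhook_url)
    if parsed.scheme != "https":
        # Telegram only delivers webhook updates to HTTPS endpoints.
        problems.append("webhook_url must use https")
    if parsed.path != webhook_path:
        # Assumption: no path rewriting happens between Telegram and the server.
        problems.append(
            f"URL path {parsed.path!r} does not match webhook_path {webhook_path!r}"
        )
    return problems


print(check_webhook_settings("https://your-domain.com/webhook"))
print(check_webhook_settings("http://your-domain.com/hook"))
```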
Running the Server
Start with default settings:

    uv run ash serve

Override host and port:

    uv run ash serve --host 0.0.0.0 --port 3000

Endpoints
| Path | Method | Description |
|---|---|---|
| /health | GET | Health check |
| /webhook | POST | Telegram webhook |
Production Setup
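For production, bind to all interfaces so the server is reachable from other hosts (behind a reverse proxy on the same machine, the default 127.0.0.1 is enough):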

    [server]
    host = "0.0.0.0"
    port = 8080

Reverse Proxy
Use nginx or Caddy for TLS termination:

    server {
        listen 443 ssl;
        server_name your-domain.com;

        ssl_certificate /path/to/cert.pem;
        ssl_certificate_key /path/to/key.pem;

        location / {
            proxy_pass http://127.0.0.1:8080;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
        }
    }

Session Management
Configure how Ash manages conversation sessions:

    [sessions]
    mode = "persistent"
    max_concurrent = 2

| Option | Type | Default | Description |
|---|---|---|---|
| mode | string | "persistent" | Session mode: "persistent" or "fresh" |
| max_concurrent | int | 2 | Maximum number of sessions processed in parallel |
Session Modes
| Mode | Behavior |
|---|---|
| persistent | Conversation history maintained across messages |
| fresh | Each message starts with a clean slate |
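
To make the difference between the two modes concrete, here is a minimal sketch of what the agent would see on each turn. `SessionStore` and `context_for` are hypothetical names invented for this example, and history is kept as a plain in-memory list; Ash's actual storage is not described here.

```python
from collections import defaultdict


class SessionStore:
    """Hypothetical in-memory store illustrating the two session modes."""

    def __init__(self, mode: str = "persistent") -> None:
        if mode not in ("persistent", "fresh"):
            raise ValueError(f"unknown session mode: {mode!r}")
        self.mode = mode
        self._history: dict[str, list[str]] = defaultdict(list)

    def context_for(self, session_id: str, message: str) -> list[str]:
        """Return the messages the agent would see for this turn."""
        if self.mode == "fresh":
            # Clean slate: only the current message, nothing is remembered.
            return [message]
        self._history[session_id].append(message)
        return list(self._history[session_id])


persistent = SessionStore("persistent")
persistent.context_for("alice", "My name is Alice")
print(persistent.context_for("alice", "What is my name?"))

fresh = SessionStore("fresh")
fresh.context_for("alice", "My name is Alice")
print(fresh.context_for("alice", "What is my name?"))
```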
Concurrency
The max_concurrent setting controls how many different sessions can process messages simultaneously.
- Same session (same user/thread): Messages are serialized and can be “steered” into the running conversation
- Different sessions (different users or forum topics): Process in parallel up to the limit
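
One common way to get this behavior with asyncio is a per-session lock (serializing messages within a session) combined with a global semaphore (capping parallel sessions). The sketch below shows that pattern under those assumptions; `SessionScheduler` is a hypothetical name, and the "steering" of new messages into a running conversation is not modeled.

```python
import asyncio
from collections import defaultdict


class SessionScheduler:
    """Hypothetical scheduler: one lock per session, one global semaphore."""

    def __init__(self, max_concurrent: int = 2) -> None:
        self._slots = asyncio.Semaphore(max_concurrent)
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self.active = 0  # sessions currently processing
        self.peak = 0    # highest value of `active` observed

    async def run(self, session_id: str, work) -> None:
        # Same session: wait for the previous message to finish.
        async with self._locks[session_id]:
            # Different sessions: wait for one of the max_concurrent slots.
            async with self._slots:
                self.active += 1
                self.peak = max(self.peak, self.active)
                try:
                    await work()
                finally:
                    self.active -= 1


async def demo() -> int:
    scheduler = SessionScheduler(max_concurrent=2)

    async def work() -> None:
        await asyncio.sleep(0.01)  # stand-in for the agent processing a message

    sessions = ["alice", "alice", "bob", "carol", "dave"]
    await asyncio.gather(*(scheduler.run(s, work) for s in sessions))
    return scheduler.peak


peak = asyncio.run(demo())
print(peak)
```

Taking the session lock before the semaphore means a queued message from a busy session does not occupy one of the parallel slots while it waits.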
Provider Registry
Location: src/ash/providers/registry.py

    registry = ProviderRegistry()
    registry.register("telegram", TelegramProvider)

    provider = registry.create("telegram", **config)
    await provider.start(handler)

Creating Custom Providers
To add a new provider (e.g., Discord, Slack):
- Implement the Provider interface
- Handle message reception and delivery
- Support streaming if the platform allows
- Register in the provider registry