Python Bridge (FastAPI)
Тонкий мост между Telegram, PostgreSQL и LLM провайдерами
🌉Принцип Thin Bridge
Python Bridge — это тонкий слой, который НЕ содержит бизнес-логики. Он только маршрутизирует, аутентифицирует и обеспечивает коммуникацию.
✓ Bridge ДЕЛАЕТ
- • Telegram long-polling
- • FastAPI endpoints
- • Auth & RBAC
- • Connection pooling (asyncpg)
- • Queue & backpressure
- • Provider clients
- • Message split (4096)
- • Prometheus metrics
✗ Bridge НЕ ДЕЛАЕТ
- • Business logic
- • Process definitions
- • Agent capability maps
- • Policy values
- • Execution decisions
- • Data transformation
- • State management
- • Workflow logic
Архитектура Bridge
Entry Points
Telegram
📨
Long-Poll
:28000
HTTP
🤖
FastAPI
:8000
Internal
🗄️
Internal API
↓
Handlers Layer
ChatHandler
сообщения
CmdHandler
команды
OperHandler
оператор
IntAPIHandler
internal
↓
Services Layer
UserService
ChatService
AgentService
ProcessService
↓
Provider Clients
🧠
Ollama
🧠
GLM
🌐
OpenAI
⚡
n8n
↓
Core Infrastructure
asyncpg
PostgreSQL pool
chat_queue
Backpressure
metrics
Prometheus
logging
structlog
Компоненты
Telegram Long-Polling
Получает обновления от Telegram API через long-polling. Не использует webhooks для простоты деплоя.
- •
TELEGRAM_BOT_TOKEN— основной токен - •
TELEGRAM_BOT_TOKENS— дополнительные боты - •
DISABLE_TELEGRAM_POLLING— отключить
# bot/telegram/poller.py
async def poll_updates():
offset = 0
while True:
updates = await bot.get_updates(
offset=offset,
timeout=30,
allowed_updates=["message", ...]
)
for update in updates:
await handle_update(update)
offset = update.update_id + 1Queue & Backpressure
Управляет параллелизмом и защищает от перегрузки.
- •
MAX_CONCURRENT_CHAT_JOBS= 2 - •
MAX_PENDING_CHAT_JOBS_PER_CHAT= 8
# Queue structure
chat_queue = {
"global_semaphore": Semaphore(2),
"per_chat": {
chat_id: Semaphore(8)
}
}
# Backpressure handling
if queue.full():
return "Занят, попробуйте позже"Connection Pool (asyncpg)
Пул соединений к PostgreSQL для высокой производительности.
- • Min connections: 5
- • Max connections: 20
- • Command timeout: 60s
# Database connection
pool = await asyncpg.create_pool(
host="postgres",
port=5432,
database="clowbot",
user="clowbot",
password=os.getenv("POSTGRES_PASSWORD"),
min_size=5,
max_size=20,
command_timeout=60
)Message Split (4096 limit)
Telegram ограничивает сообщения 4096 символами. Bridge автоматически разбивает длинные сообщения.
- • Разбивает по абзацам
- • Сохраняет форматирование
- • Добавляет "(продолжение)"
def split_message(text: str, limit=4096):
if len(text) <= limit:
return [text]
chunks = []
while text:
# Split at paragraph boundary
chunk = text[:limit]
last_nl = chunk.rfind('\n\n')
if last_nl > limit // 2:
chunk = text[:last_nl]
chunks.append(chunk)
text = text[len(chunk):]
return chunksHandler Pattern
Все хендлеры следуют единому паттерну: получают HandlerContext, выполняют работу через сервисы, отправляют ответ через outbox.
# Handler pattern
class ChatHandler:
async def handle(self, ctx: HandlerContext) -> None:
# 1. Load context from DB (via service)
user = await self.user_service.get_or_create(ctx.user_id)
chat = await self.chat_service.get_or_create(ctx.chat_id)
# 2. Get agent selection (via DB policy)
agent = await self.agent_service.select_agent(
intent=ctx.intent,
user_id=user.id
)
# 3. Call LLM (via provider client)
response = await self.llm_client.generate(
agent=agent,
messages=await self.get_context_messages(chat.id)
)
# 4. Send response (via outbox)
await self.outbox.send(
chat_id=ctx.chat_id,
text=response.text,
parse_mode="Markdown"
)
# 5. Log to DB (via service)
await self.message_service.save(
chat_id=chat.id,
role="assistant",
content=response.text
)Internal API
Bridge предоставляет Internal API для n8n и других внутренних сервисов. Все запросы требуют INTERNAL_API_TOKEN.
| Endpoint | Метод | Назначение |
|---|---|---|
| /health | GET | Health check |
| /internal/godmodetask | POST | Operator entrypoint |
| /internal/agent/select | POST | Agent selection |
| /internal/process/events/run | POST | Process run events |
| /internal/process/events/step | POST | Process step events |
| /internal/process/artifacts | POST | Process artifacts |
Конфигурация (.env)
# Telegram TELEGRAM_BOT_TOKEN=your_token TELEGRAM_BOT_TOKENS=token1,token2,token3 # Multi-bot # PostgreSQL POSTGRES_DB=clowbot POSTGRES_USER=clowbot POSTGRES_PASSWORD=clowbot_pass POSTGRES_PORT=25432 # App BOT_PORT=28000 INTERNAL_API_TOKEN=your_internal_token RUNTIME_MODE=docker DISABLE_TELEGRAM_POLLING=false # Queue MAX_CONCURRENT_CHAT_JOBS=2 MAX_PENDING_CHAT_JOBS_PER_CHAT=8 # Timeouts HTTP_TIMEOUT_SECONDS=420 N8N_TIMEOUT_SECONDS=240 PROCESS_LLM_TIMEOUT_SECONDS=300
Связанные страницы
Навыки для понимания
Python asyncioFastAPIasyncpgTelegram Bot APIQueue patterns