Wiki/Architecture/Python Bridge

Python Bridge (FastAPI)

Тонкий мост между Telegram, PostgreSQL и LLM провайдерами

🌉Принцип Thin Bridge

Python Bridge — это тонкий слой, который НЕ содержит бизнес-логики. Он только маршрутизирует, аутентифицирует и обеспечивает коммуникацию.

✓ Bridge ДЕЛАЕТ
  • • Telegram long-polling
  • • FastAPI endpoints
  • • Auth & RBAC
  • • Connection pooling (asyncpg)
  • • Queue & backpressure
  • • Provider clients
  • • Message split (4096)
  • • Prometheus metrics
✗ Bridge НЕ ДЕЛАЕТ
  • • Business logic
  • • Process definitions
  • • Agent capability maps
  • • Policy values
  • • Execution decisions
  • • Data transformation
  • • State management
  • • Workflow logic

Архитектура Bridge

Entry Points

Telegram
📨
Long-Poll
:28000
HTTP
🤖
FastAPI
:8000
Internal
🗄️
Internal API

Handlers Layer

ChatHandler
сообщения
CmdHandler
команды
OperHandler
оператор
IntAPIHandler
internal

Services Layer

UserService
ChatService
AgentService
ProcessService

Provider Clients

🧠
Ollama
🧠
GLM
🌐
OpenAI
n8n

Core Infrastructure

asyncpg
PostgreSQL pool
chat_queue
Backpressure
metrics
Prometheus
logging
structlog

Компоненты

Telegram Long-Polling

Получает обновления от Telegram API через long-polling. Не использует webhooks для простоты деплоя.

  • TELEGRAM_BOT_TOKEN — основной токен
  • TELEGRAM_BOT_TOKENS — дополнительные боты
  • DISABLE_TELEGRAM_POLLING — отключить
# bot/telegram/poller.py
async def poll_updates():
    offset = 0
    while True:
        updates = await bot.get_updates(
            offset=offset,
            timeout=30,
            allowed_updates=["message", ...]
        )
        for update in updates:
            await handle_update(update)
            offset = update.update_id + 1

Queue & Backpressure

Управляет параллелизмом и защищает от перегрузки.

  • MAX_CONCURRENT_CHAT_JOBS = 2
  • MAX_PENDING_CHAT_JOBS_PER_CHAT = 8
# Queue structure
chat_queue = {
    "global_semaphore": Semaphore(2),
    "per_chat": {
        chat_id: Semaphore(8)
    }
}

# Backpressure handling
if queue.full():
    return "Занят, попробуйте позже"

Connection Pool (asyncpg)

Пул соединений к PostgreSQL для высокой производительности.

  • • Min connections: 5
  • • Max connections: 20
  • • Command timeout: 60s
# Database connection
pool = await asyncpg.create_pool(
    host="postgres",
    port=5432,
    database="clowbot",
    user="clowbot",
    password=os.getenv("POSTGRES_PASSWORD"),
    min_size=5,
    max_size=20,
    command_timeout=60
)

Message Split (4096 limit)

Telegram ограничивает сообщения 4096 символами. Bridge автоматически разбивает длинные сообщения.

  • • Разбивает по абзацам
  • • Сохраняет форматирование
  • • Добавляет "(продолжение)"
def split_message(text: str, limit=4096):
    if len(text) <= limit:
        return [text]

    chunks = []
    while text:
        # Split at paragraph boundary
        chunk = text[:limit]
        last_nl = chunk.rfind('\n\n')
        if last_nl > limit // 2:
            chunk = text[:last_nl]
        chunks.append(chunk)
        text = text[len(chunk):]

    return chunks

Handler Pattern

Все хендлеры следуют единому паттерну: получают HandlerContext, выполняют работу через сервисы, отправляют ответ через outbox.

# Handler pattern
class ChatHandler:
    async def handle(self, ctx: HandlerContext) -> None:
        # 1. Load context from DB (via service)
        user = await self.user_service.get_or_create(ctx.user_id)
        chat = await self.chat_service.get_or_create(ctx.chat_id)

        # 2. Get agent selection (via DB policy)
        agent = await self.agent_service.select_agent(
            intent=ctx.intent,
            user_id=user.id
        )

        # 3. Call LLM (via provider client)
        response = await self.llm_client.generate(
            agent=agent,
            messages=await self.get_context_messages(chat.id)
        )

        # 4. Send response (via outbox)
        await self.outbox.send(
            chat_id=ctx.chat_id,
            text=response.text,
            parse_mode="Markdown"
        )

        # 5. Log to DB (via service)
        await self.message_service.save(
            chat_id=chat.id,
            role="assistant",
            content=response.text
        )

Internal API

Bridge предоставляет Internal API для n8n и других внутренних сервисов. Все запросы требуют INTERNAL_API_TOKEN.

EndpointМетодНазначение
/healthGETHealth check
/internal/godmodetaskPOSTOperator entrypoint
/internal/agent/selectPOSTAgent selection
/internal/process/events/runPOSTProcess run events
/internal/process/events/stepPOSTProcess step events
/internal/process/artifactsPOSTProcess artifacts

Конфигурация (.env)

# Telegram
TELEGRAM_BOT_TOKEN=your_token
TELEGRAM_BOT_TOKENS=token1,token2,token3  # Multi-bot

# PostgreSQL
POSTGRES_DB=clowbot
POSTGRES_USER=clowbot
POSTGRES_PASSWORD=clowbot_pass
POSTGRES_PORT=25432

# App
BOT_PORT=28000
INTERNAL_API_TOKEN=your_internal_token
RUNTIME_MODE=docker
DISABLE_TELEGRAM_POLLING=false

# Queue
MAX_CONCURRENT_CHAT_JOBS=2
MAX_PENDING_CHAT_JOBS_PER_CHAT=8

# Timeouts
HTTP_TIMEOUT_SECONDS=420
N8N_TIMEOUT_SECONDS=240
PROCESS_LLM_TIMEOUT_SECONDS=300

Связанные страницы

Навыки для понимания

Python asyncioFastAPIasyncpgTelegram Bot APIQueue patterns