If you've ever tried wiring a Telegram bot to a live exchange feed, you've probably hit the same wall I did — everything works fine until you need both the bot commands and the WebSocket price stream running simultaneously, without one blocking the other. This tutorial walks through the architecture I settled on after a few failed attempts: a single-process asyncio event loop with three clean I/O layers sharing one queue.
This is episode 1 of a series. By the end, you'll have a running skeleton that receives Telegram commands and streams real-time Binance price ticks at the same time — no threads, no multiprocessing, no glue hacks.
The Problem: Three Async Worlds, One Loop
The naive approach is to run bot.polling() in a thread and the WebSocket in another. That works until you need shared state — like a position tracker that both the command handler and the price stream need to write to. Thread locks, race conditions, and cryptic deadlocks follow quickly.
The first thing I tried was concurrent.futures.ThreadPoolExecutor with a shared dictionary protected by a threading.Lock. It worked in testing, but under load the WebSocket reconnect logic and the Telegram handler occasionally grabbed the lock at the same time. The bot would freeze silently, which is the worst failure mode for anything touching real money.
The fix is to stop fighting the GIL and lean into asyncio instead. python-telegram-bot v20+ is fully async-native. Binance's WebSocket stream works fine with the websockets library. Everything can live in one event loop, communicating through a single asyncio.Queue.
pip install "python-telegram-bot[webhooks]" websockets ccxt aiohttp pydantic
Section 1: Event Model First
Before writing any network code, define what events look like. This is the contract between every layer.
# core/events.py
from dataclasses import dataclass, field
from decimal import Decimal
from datetime import datetime
from enum import Enum
class EventType(Enum):
COMMAND = "command"
TICK = "tick"
ORDER_ACK = "order_ack"
@dataclass
class TickEvent:
type: EventType = field(default=EventType.TICK, init=False)
symbol: str
price: Decimal
volume: Decimal
ts: datetime
@dataclass
class CommandEvent:
type: EventType = field(default=EventType.COMMAND, init=False)
user_id: int
command: str
args: list[str]
One thing worth calling out: Decimal instead of float. Floating-point rounding on order quantities will quietly introduce errors. If your bot calculates a position size of 0.1 BTC in float arithmetic, what actually hits the exchange might be 0.09999999999999999. Exchanges reject malformed quantities or, worse, round in their favor. Use Decimal everywhere money is involved.
Section 2: The Two Gateways
Telegram Gateway
# gateway/telegram_gw.py
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes
from core.events import CommandEvent
import asyncio
class TelegramGateway:
def __init__(self, token: str, event_queue: asyncio.Queue):
self.app = Application.builder().token(token).build()
self.queue = event_queue
self._register_handlers()
def _register_handlers(self):
for cmd in ["start", "stop", "status", "buy", "sell"]:
self.app.add_handler(CommandHandler(cmd, self._dispatch))
async def _dispatch(self, update: Update, ctx: ContextTypes.DEFAULT_TYPE):
msg = update.message
args = ctx.args or []
evt = CommandEvent(
user_id=msg.from_user.id,
command=msg.text.split()[0].lstrip("/"),
args=args,
)
await self.queue.put(evt)
await msg.reply_text(f"✓ {evt.command} queued")
async def start(self):
await self.app.initialize()
await self.app.start()
await self.app.updater.start_polling(drop_pending_updates=True)
async def stop(self):
await self.app.updater.stop()
await self.app.stop()
await self.app.shutdown()
The drop_pending_updates=True flag is important. When you restart the bot after a crash, Telegram buffers any commands sent while it was down. Without this flag, those queued commands replay immediately on startup — including any /buy orders a user sent two hours ago while the bot was offline. That's a serious bug in a trading context.
Exchange WebSocket Stream
# gateway/exchange_ws.py
import asyncio, json, websockets
from decimal import Decimal
from datetime import datetime, timezone
from core.events import TickEvent
BINANCE_WS = "wss://stream.binance.com:9443/stream?streams={}"
class ExchangeStream:
def __init__(self, symbols: list[str], event_queue: asyncio.Queue):
self.symbols = [s.lower() for s in symbols]
self.queue = event_queue
self._running = False
def _build_url(self) -> str:
streams = "/".join(f"{s}@bookTicker" for s in self.symbols)
return BINANCE_WS.format(streams)
async def _reconnect_loop(self):
backoff = 1
while self._running:
try:
async with websockets.connect(
self._build_url(),
ping_interval=20,
ping_timeout=10,
) as ws:
backoff = 1
async for raw in ws:
data = json.loads(raw).get("data", {})
symbol = data.get("s", "")
if not symbol:
continue
evt = TickEvent(
symbol=symbol,
price=Decimal(data["b"]), # best bid
volume=Decimal(data["B"]),
ts=datetime.now(timezone.utc),
)
await self.queue.put(evt)
except (websockets.WebSocketException, ConnectionError):
await asyncio.sleep(min(backoff, 60))
backoff *= 2
async def start(self):
self._running = True
asyncio.create_task(self._reconnect_loop())
async def stop(self):
self._running = False
The exponential backoff reconnect loop is not optional. Binance force-closes WebSocket connections server-side every 24 hours. Without reconnect logic, a bot that runs fine all day will go silent overnight. The backoff caps at 60 seconds so you're not hammering a server that's actually down, but you reconnect quickly after transient drops.
Section 3: Router and Main Loop
The router is the glue. It pulls events off the queue and dispatches them to whichever handlers subscribed to that event type.
# core/router.py
import asyncio
from core.events import EventType
class Router:
def __init__(self, queue: asyncio.Queue):
self.queue = queue
self._subscribers: dict[EventType, list] = {}
def subscribe(self, etype: EventType, handler):
self._subscribers.setdefault(etype, []).append(handler)
async def run(self):
while True:
evt = await self.queue.get()
for handler in self._subscribers.get(evt.type, []):
asyncio.create_task(handler(evt))
self.queue.task_done()
Each handler is wrapped in create_task so a slow handler doesn't block the router from processing the next event. This matters when your strategy does any I/O — like checking account balance before deciding to order.
# main.py
import asyncio
from config import Config
from gateway.telegram_gw import TelegramGateway
from gateway.exchange_ws import ExchangeStream
from core.router import Router
from core.events import EventType
async def on_tick(evt):
print(f"[TICK] {evt.symbol} {evt.price}")
async def on_command(evt):
print(f"[CMD] /{evt.command} {evt.args}")
async def main():
queue = asyncio.Queue(maxsize=1000)
router = Router(queue)
router.subscribe(EventType.TICK, on_tick)
router.subscribe(EventType.COMMAND, on_command)
tg = TelegramGateway(Config.TG_TOKEN, queue)
ws = ExchangeStream(["BTCUSDT", "ETHUSDT"], queue)
await tg.start()
await ws.start()
await router.run()
if __name__ == "__main__":
asyncio.run(main())
The maxsize=1000 on the queue is deliberate backpressure. If the strategy engine gets slow — maybe it's doing a heavy calculation — the queue fills up instead of growing without bound. Once it hits 1000 items, queue.put() will block, which signals that something upstream is backed up. In episode 2 I'll add a put_nowait + QueueFull handler that sends a Telegram alert when the queue backs up.
Section 4: Config and Secrets
# config.py
import os
from dataclasses import dataclass
@dataclass(frozen=True)
class Config:
TG_TOKEN: str = os.environ["TG_BOT_TOKEN"]
TG_CHAT_ID: int = int(os.environ["TG_CHAT_ID"])
ENV: str = os.getenv("ENV", "dev")
Inject secrets via environment variables only. Never commit a .env file with real API keys to any repository, even a private one. If you're running locally, export them in your shell session:
export TG_BOT_TOKEN=your_token_here
export TG_CHAT_ID=your_chat_id_here
python main.py
Variations & Gotchas
Webhook vs. polling: Polling works fine for development and low-traffic bots. If you're on a server with a public HTTPS endpoint, switch to webhooks — it cuts Telegram API calls to zero when no commands are pending. The python-telegram-bot[webhooks] extra includes everything you need; just call start_webhook() instead of start_polling().
Multi-exchange: The ExchangeStream class as written is Binance-specific. To support other exchanges, swap the URL builder and field names. ccxt provides a unified REST interface for orders, but for WebSocket streams you'll still need exchange-specific connectors — or use ccxt.pro which wraps them.
Mac vs. Linux event loop: On macOS with Python 3.12+, you may hit a RuntimeError: no running event loop when calling asyncio.run() inside certain frameworks. The fix is explicit: asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) at the top of main.py. On Linux this is never an issue.
Queue blocking gotcha: If router.run() is the last await in main(), the Telegram and WebSocket tasks need to be started as background tasks rather than awaited sequentially, otherwise the queue consumer never starts. The pattern above handles this correctly — tg.start() and ws.start() both return quickly (they launch their own internal tasks), and then router.run() takes over the main coroutine.
Verification
Start the bot and watch the terminal:
[TICK] BTCUSDT 68423.12000
[TICK] ETHUSDT 3641.05000
[TICK] BTCUSDT 68424.00000
Send /status in Telegram:
[CMD] /status []
Both streams running concurrently in one process — that's the skeleton complete. In episode 2, StrategyEngine goes on top of this: a moving-average crossover that reads from the tick stream and fires orders through ccxt when the signal triggers.
🐦 Faster updates on X: @baegseungh7061
📚 More in this series: All posts
💌 Subscribe: Follow on X or grab the RSS
댓글
댓글 쓰기