from fastapi import FastAPI, Response, Request, HTTPException
from fastapi.responses import StreamingResponse
from typing import Dict, Any
import asyncio
import json
import time
import uvicorn
app = FastAPI()
def generate_sse_event(
event_type: str = "message",
data: Any = None,
event_id: str = None,
retry: int = None
) -> str:
"""Форматирование события в формате SSE"""
lines = []
if event_id is not None:
lines.append(f"id: {event_id}")
if event_type != "message":
lines.append(f"event: {event_type}")
if data is not None:
if isinstance(data, dict):
data_str = json.dumps(data)
else:
data_str = str(data)
# Многострочные данные
for line in data_str.split('\n'):
lines.append(f"data: {line}")
if retry is not None:
lines.append(f"retry: {retry}")
lines.append("") # Пустая строка завершает событие
lines.append("") # Двойной перевод строки
return "\n".join(lines)
async def event_stream(request: Request, start_id: int = 0):
"""Генератор событий для SSE"""
event_id = start_id
# Отправка начального события
yield generate_sse_event(
event_type="connected",
data={"message": "Подключено к потоку событий", "start_id": start_id},
event_id=str(event_id)
)
event_id += 1
# Установка интервала повтора
yield generate_sse_event(retry=5000)
try:
while True:
# Проверка отключения клиента
if await request.is_disconnected():
print("Клиент отключился")
break
# Генерация события
event_data = {
"id": event_id,
"timestamp": time.time(),
"message": f"Событие #{event_id}"
}
yield generate_sse_event(
event_type="tick",
data=event_data,
event_id=str(event_id)
)
event_id += 1
# Отправка комментария
yield f": Серверное время {time.strftime('%H:%M:%S')}\n\n"
await asyncio.sleep(2)
except asyncio.CancelledError:
print("Поток отменён")
raise
except Exception as e:
print(f"Ошибка в потоке: {e}")
yield generate_sse_event(
event_type="error",
data={"message": str(e)}
)
finally:
# Финальное событие
yield generate_sse_event(
event_type="disconnected",
data={"message": "Поток завершён"}
)
@app.get("/events")
async def sse_endpoint(request: Request):
"""Эндпоинт для SSE потока"""
# Получение последнего ID события
last_event_id = request.headers.get("Last-Event-ID")
start_id = int(last_event_id) if last_event_id else 0
print(f"Подключение клиента, восстановление с события #{start_id}")
return StreamingResponse(
event_stream(request, start_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Отключение буферизации nginx
}
)
# Симуляция уведомлений
notifications_store: Dict[int, Dict] = {}
notification_counter = 0
async def notification_stream(request: Request):
"""Поток уведомлений"""
global notification_counter
last_id = 0
# Получение последнего события
last_event_id = request.headers.get("Last-Event-ID")
if last_event_id:
last_id = int(last_event_id)
print(f"Восстановление уведомлений с #{last_id}")
try:
while True:
if await request.is_disconnected():
break
# Проверка новых уведомлений
new_notifications = [
(nid, payload) for nid, payload in notifications_store.items()
if nid > last_id
]
for nid, payload in sorted(new_notifications):
yield generate_sse_event(
event_type="notification",
data=payload,
event_id=str(nid)
)
last_id = nid
await asyncio.sleep(1)
except asyncio.CancelledError:
raise
@app.get("/notifications")
async def notifications_endpoint(request: Request):
"""Эндпоинт для потока уведомлений"""
return StreamingResponse(
notification_stream(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
@app.post("/notify")
async def create_notification(notification: Dict[str, Any]):
"""Создание нового уведомления"""
global notification_counter
notification_counter += 1
notification_data = {
"id": notification_counter,
"type": notification.get("type", "info"),
"title": notification.get("title", "Уведомление"),
"body": notification.get("body", ""),
"timestamp": time.time()
}
notifications_store[notification_counter] = notification_data
return {"status": "created", "id": notification_counter}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
from fastapi import FastAPI, Response, Request, HTTPException
from fastapi.responses import StreamingResponse
from typing import Dict, Any
import asyncio
import json
import time
import uvicorn
app = FastAPI()
def generate_sse_event(
event_type: str = "message",
data: Any = None,
event_id: str = None,
retry: int = None
) -> str:
"""Форматирование события в формате SSE"""
lines = []
if event_id is not None:
lines.append(f"id: {event_id}")
if event_type != "message":
lines.append(f"event: {event_type}")
if data is not None:
if isinstance(data, dict):
data_str = json.dumps(data)
else:
data_str = str(data)
# Многострочные данные
for line in data_str.split('\n'):
lines.append(f"data: {line}")
if retry is not None:
lines.append(f"retry: {retry}")
lines.append("") # Пустая строка завершает событие
lines.append("") # Двойной перевод строки
return "\n".join(lines)
async def event_stream(request: Request, start_id: int = 0):
"""Генератор событий для SSE"""
event_id = start_id
# Отправка начального события
yield generate_sse_event(
event_type="connected",
data={"message": "Подключено к потоку событий", "start_id": start_id},
event_id=str(event_id)
)
event_id += 1
# Установка интервала повтора
yield generate_sse_event(retry=5000)
try:
while True:
# Проверка отключения клиента
if await request.is_disconnected():
print("Клиент отключился")
break
# Генерация события
event_data = {
"id": event_id,
"timestamp": time.time(),
"message": f"Событие #{event_id}"
}
yield generate_sse_event(
event_type="tick",
data=event_data,
event_id=str(event_id)
)
event_id += 1
# Отправка комментария
yield f": Серверное время {time.strftime('%H:%M:%S')}\n\n"
await asyncio.sleep(2)
except asyncio.CancelledError:
print("Поток отменён")
raise
except Exception as e:
print(f"Ошибка в потоке: {e}")
yield generate_sse_event(
event_type="error",
data={"message": str(e)}
)
finally:
# Финальное событие
yield generate_sse_event(
event_type="disconnected",
data={"message": "Поток завершён"}
)
@app.get("/events")
async def sse_endpoint(request: Request):
"""Эндпоинт для SSE потока"""
# Получение последнего ID события
last_event_id = request.headers.get("Last-Event-ID")
start_id = int(last_event_id) if last_event_id else 0
print(f"Подключение клиента, восстановление с события #{start_id}")
return StreamingResponse(
event_stream(request, start_id),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # Отключение буферизации nginx
}
)
# Симуляция уведомлений
notifications_store: Dict[int, Dict] = {}
notification_counter = 0
async def notification_stream(request: Request):
"""Поток уведомлений"""
global notification_counter
last_id = 0
# Получение последнего события
last_event_id = request.headers.get("Last-Event-ID")
if last_event_id:
last_id = int(last_event_id)
print(f"Восстановление уведомлений с #{last_id}")
try:
while True:
if await request.is_disconnected():
break
# Проверка новых уведомлений
new_notifications = [
(nid, payload) for nid, payload in notifications_store.items()
if nid > last_id
]
for nid, payload in sorted(new_notifications):
yield generate_sse_event(
event_type="notification",
data=payload,
event_id=str(nid)
)
last_id = nid
await asyncio.sleep(1)
except asyncio.CancelledError:
raise
@app.get("/notifications")
async def notifications_endpoint(request: Request):
"""Эндпоинт для потока уведомлений"""
return StreamingResponse(
notification_stream(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
@app.post("/notify")
async def create_notification(notification: Dict[str, Any]):
"""Создание нового уведомления"""
global notification_counter
notification_counter += 1
notification_data = {
"id": notification_counter,
"type": notification.get("type", "info"),
"title": notification.get("title", "Уведомление"),
"body": notification.get("body", ""),
"timestamp": time.time()
}
notifications_store[notification_counter] = notification_data
return {"status": "created", "id": notification_counter}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)