v0.71.2 - fix auth route conflict
What's Changed
- fix: auth route conflict by @VishnuSanal in https://github.com/sparckles/Robyn/pull/987
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.71.1...v0.71.2
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.71.1...v0.71.2
"""
Server-Sent Events (SSE) Example for Robyn
This example demonstrates how to implement Server-Sent Events using Robyn. SSE allows real-time server-to-client
communication over a single HTTP connection.
"""
import asyncio
import time
from robyn import Robyn, SSEMessage, SSEResponse
app = Robyn(__file__)
@app.get("/events")
def stream_events(request):
    """Stream ten timestamped messages, pausing one second after each, then an "end" event."""

    def event_generator():
        """Yield the numbered messages followed by the closing "end" event."""
        for seq in range(10):
            stamp = time.strftime("%H:%M:%S")
            yield SSEMessage(f"Message {seq} - {stamp}", id=str(seq))
            # Pace the stream before producing the next message.
            time.sleep(1)

        # Tell the client that nothing further will be sent.
        yield SSEMessage("Stream ended", event="end")

    stream = event_generator()
    return SSEResponse(stream)
@app.get("/events/json")
def stream_json_events(request):
    """Stream five JSON-encoded "notification" events, pausing two seconds after each."""
    import json

    def json_event_generator():
        """Yield one SSE message per payload, serialised with json.dumps."""
        for seq in range(5):
            payload = {
                "id": seq,
                "message": f"JSON message {seq}",
                "timestamp": time.time(),
                "type": "notification",
            }
            encoded = json.dumps(payload)
            yield SSEMessage(encoded, event="notification", id=str(seq))
            time.sleep(2)

    stream = json_event_generator()
    return SSEResponse(stream)
@app.get("/events/named")
def stream_named_events(request):
    """Stream a fixed chat script where every SSE message carries its own event name."""

    def named_event_generator():
        """Yield each scripted (event name, text) pair as an SSE message, 1.5 s apart."""
        script = (
            ("user_joined", "Alice joined the chat"),
            ("message", "Hello everyone!"),
            ("user_left", "Bob left the chat"),
            ("message", "How is everyone doing?"),
            ("user_joined", "Charlie joined the chat"),
        )
        for position, entry in enumerate(script):
            kind, text = entry
            # The position in the script doubles as the SSE message id.
            yield SSEMessage(text, event=kind, id=str(position))
            time.sleep(1.5)

    stream = named_event_generator()
    return SSEResponse(stream)
@app.get("/events/async")
async def stream_async_events(request):
    """Stream eight "async" events produced by an async generator."""

    async def async_event_generator():
        """Await half a second, then yield a timestamped message; repeat eight times."""
        for seq in range(8):
            # Stand-in for real asynchronous work done before each message.
            await asyncio.sleep(0.5)
            stamp = time.strftime("%H:%M:%S")
            yield SSEMessage(f"Async message {seq} - {stamp}", event="async", id=str(seq))

    stream = async_event_generator()
    return SSEResponse(stream)
@app.get("/events/heartbeat")
def stream_heartbeat(request):
    """Stream twenty "heartbeat" events half a second apart, then a final "end" event.

    Returns:
        An SSEResponse wrapping the heartbeat generator.
    """

    def heartbeat_generator():
        """Yield the numbered heartbeat pings followed by the closing event."""
        for counter in range(20):  # Send 20 heartbeats
            # Use SSEMessage, the name this example imports; the previous
            # SSE_Message spelling is not in scope here and raised NameError.
            yield SSEMessage(f"heartbeat {counter}", event="heartbeat", id=str(counter))
            time.sleep(0.5)

        yield SSEMessage("heartbeat ended", event="end")

    return SSEResponse(heartbeat_generator())
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.71.0...v0.71.1
"""
Server-Sent Events (SSE) Example for Robyn
This example demonstrates how to implement Server-Sent Events using Robyn,
similar to FastAPI's implementation. SSE allows real-time server-to-client
communication over a single HTTP connection.
"""
import asyncio
import time
from robyn import Robyn, SSE_Message, SSE_Response
app = Robyn(__file__)
@app.get("/events")
def stream_events(request):
    """Send ten timestamped messages with a one-second pause after each, then an "end" event."""

    def event_generator():
        """Produce the numbered messages and the trailing "end" event."""
        for number in range(10):
            clock = time.strftime("%H:%M:%S")
            message = SSE_Message(f"Message {number} - {clock}", id=str(number))
            yield message
            time.sleep(1)

        # Closing marker so the client knows the stream is finished.
        yield SSE_Message("Stream ended", event="end")

    events = event_generator()
    return SSE_Response(events)
@app.get("/events/json")
def stream_json_events(request):
    """Send five "notification" events whose data is a JSON document."""
    import json

    def json_event_generator():
        """Build each payload dict, serialise it, and yield it; pause 2 s after each."""
        for number in range(5):
            record = {
                "id": number,
                "message": f"JSON message {number}",
                "timestamp": time.time(),
                "type": "notification",
            }
            text = json.dumps(record)
            yield SSE_Message(text, event="notification", id=str(number))
            time.sleep(2)

    events = json_event_generator()
    return SSE_Response(events)
@app.get("/events/named")
def stream_named_events(request):
    """Send a scripted chat sequence in which each message has a named event type."""

    def named_event_generator():
        """Yield the scripted events in order, pausing 1.5 s after each one."""
        timeline = (
            ("user_joined", "Alice joined the chat"),
            ("message", "Hello everyone!"),
            ("user_left", "Bob left the chat"),
            ("message", "How is everyone doing?"),
            ("user_joined", "Charlie joined the chat"),
        )
        for index, item in enumerate(timeline):
            name, body = item
            # The index within the timeline is used as the message id.
            yield SSE_Message(body, event=name, id=str(index))
            time.sleep(1.5)

    events = named_event_generator()
    return SSE_Response(events)
@app.get("/events/async")
async def stream_async_events(request):
    """Send eight "async" events from an async generator."""

    async def async_event_generator():
        """Await 0.5 s before each of the eight timestamped messages."""
        for number in range(8):
            # Placeholder for genuine asynchronous work.
            await asyncio.sleep(0.5)
            clock = time.strftime("%H:%M:%S")
            yield SSE_Message(f"Async message {number} - {clock}", event="async", id=str(number))

    events = async_event_generator()
    return SSE_Response(events)
@app.get("/events/heartbeat")
def stream_heartbeat(request):
    """Send twenty "heartbeat" events half a second apart, then a final "end" event."""

    def heartbeat_generator():
        """Yield the numbered heartbeat pings and then the closing event."""
        for beat in range(20):  # Send 20 heartbeats
            ping = SSE_Message(f"heartbeat {beat}", event="heartbeat", id=str(beat))
            yield ping
            time.sleep(0.5)

        yield SSE_Message("heartbeat ended", event="end")

    events = heartbeat_generator()
    return SSE_Response(events)
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.70.0...v0.71.0
Full write-up: https://sanskar.wtf/posts/the-future-of-robyn
First web framework with built-in AI agents and memory.
from robyn import Robyn
from robyn.ai import agent, memory
app = Robyn(__file__)
@app.post("/chat")
async def chat(request):
    """Run the request's "query" through a simple AI agent and return its response.

    The agent is created per request with an in-memory memory store for the
    fixed user id "user". The "query" value is read from the JSON body and is
    None when the key is absent.
    """
    store = memory(provider="inmemory", user_id="user")
    assistant = agent(runner="simple", memory=store)

    question = request.json().get("query")
    outcome = await assistant.run(question)

    return {"response": outcome["response"]}
Native Model Context Protocol integration.
# MCP Resources
@app.mcp.resource("time://current")
def current_time() -> str:
    """Return the current time as an ISO 8601 string prefixed with "Current time: ".

    Returns:
        A string such as "Current time: 2025-01-01T12:00:00.000000".
    """
    # Imported here because the example never brings `datetime` into scope;
    # without it the call below raises NameError.
    from datetime import datetime

    return f"Current time: {datetime.now().isoformat()}"
@app.mcp.tool(
    name="echo",
    description="Echo back text",
    input_schema={
        "type": "object",
        "properties": {"text": {"type": "string", "description": "Text to echo"}},
        "required": ["text"],
    },
)
def echo_tool(args):
    """Return the "text" argument unchanged, or an empty string when it is absent."""
    echoed = args.get("text", "")
    return echoed
# MCP Prompts
@app.mcp.prompt(
    name="explain_code",
    description="Generate code explanation prompt",
    arguments=[
        {"name": "code", "description": "Code to explain", "required": True},
        {"name": "language", "description": "Programming language", "required": False},
    ],
)
def explain_code_prompt(args):
    """Build a prompt asking for an explanation of a code snippet.

    Args:
        args: Mapping of prompt arguments. "code" is the snippet to explain
            (defaults to an empty string) and "language" names its language
            (defaults to "unknown").

    Returns:
        The prompt text, with the snippet inside a Markdown code fence tagged
        with the language.
    """
    code = args.get("code", "")
    language = args.get("language", "unknown")
    # NOTE(review): the fence below replaces a stray bare "{language}" line that
    # sat directly above the code; confirm this matches the intended prompt.
    return f"""Please explain this {language} code:

```{language}
{code}
```

Include:
1. What it does
2. How it works
3. Key concepts used
"""
These features are very much experimental. Check out the examples/
folder to see the new features in action.
Install: pip install robyn==0.70.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.69.0...v0.70.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.68.0...v0.69.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.67.0...v0.68.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.66.2...v0.67.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.65.0...v0.66.2
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.64.2...v0.65.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.64.1...v0.64.2
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.64.0...v0.64.1
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.63.0...v0.64.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.58.2...v0.63.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.62.0...v0.62.1
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.61.2...v0.62.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.61.1...v0.61.2
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.60.2...v0.61.1
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.60.1...v0.60.2
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.59.0...v0.60.1
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.58.1...v0.58.2
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.58.0...v0.58.1
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.57.1...v0.58.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.57.0...v0.57.1
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.56.3...v0.57.0
`set_cookie` function by @VishnuSanal in https://github.com/sparckles/Robyn/pull/870
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.56.2...v0.56.3
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.56.1...v0.56.2
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.56.0...v0.56.1
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.55.0...v0.56.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.54.5...v0.55.0
Full Changelog: https://github.com/sparckles/Robyn/compare/v0.54.4...v0.54.5