ASGI from scratch - WebSocket
Intro
This is a follow-up to my last blog post, ASGI from scratch - Let’s build an ASGI web framework, where I walked through the steps to build a simple but somewhat functional ASGI application framework. That post only implemented HTTP; in this post, I’m going to go through the steps to add WebSocket support to the framework.
Goal
There are many ways to add WebSocket support to an ASGI framework, and I think ASGI’s WebSocket specification is already a good enough abstraction around the WebSocket protocol. How it’s implemented depends heavily on what kind of interface I want to provide to the framework’s users.
Assume I want to provide an interface as shown below, where the user can accept the connection and then read data from the client in an async generator.
# example-ws.py
from aaf import aaf
from aaf.routing import Router

router = Router()

@router.route('/ws_echo')
async def ws_echo(connection):
    await connection.accept()
    async for message in connection.iter_messages():
        if message == "QUIT":
            await connection.finish(close_code=1000)
            break
        await connection.send(message)
    print("disconnected")

app = aaf([router])
The implementation of the Router class and the aaf function was covered in the previous post.
WebSocket in ASGI
Here’s a simple diagram of the lifecycle of a WebSocket connection in the context of ASGI.
sequenceDiagram
Note over Client,Server: WebSocket
Note over Server,Application: ASGI
Client->>Server: Handshake
Server->>Application: "websocket.connect"
alt Should accept connection?
Application->>Server: "websocket.accept"
Server->>Client: HTTP 101
else
Application->>Server: "websocket.close"
Server->>Client: HTTP 403
end
rect aliceblue
loop heartbeat
Client-->Server: ping
Client-->Server: pong
end
Client-->>Server: data frames
Server-->>Application: "websocket.receive"
Application-->>Server: "websocket.send"
Server-->>Client: data frames
end
alt Client/Server disconnect
Client->Server: Closed/lost connection
else Application closes connection
Application->>Server: "websocket.close"
Server->Client: Disconnect
end
Server->>Application: "websocket.disconnect"
The diagram shows when each type of ASGI message is sent and received. Every WebSocket connection starts with a websocket.connect message. When this message is received, the application can send a websocket.accept message to accept the connection or a websocket.close message to reject it. After this handshake, data frames can be exchanged using websocket.send and websocket.receive messages. Both the application and the client can initiate a disconnection; the application does so by sending a websocket.close message. The server sends a websocket.disconnect message to the application whenever the connection between client and server is closed or lost.
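To make the message flow concrete, here is a minimal sketch of a raw ASGI echo application that does not use the framework at all. The run_with_fake_server helper is hypothetical: it stands in for a real ASGI server by feeding scripted events to the app and collecting whatever the app sends.

```python
import asyncio


async def echo_app(scope, receive, send):
    """Raw ASGI WebSocket echo app, no framework involved."""
    assert scope["type"] == "websocket"

    # The first event on a WebSocket connection is "websocket.connect".
    event = await receive()
    assert event["type"] == "websocket.connect"

    # Accept the handshake; sending "websocket.close" here would reject it.
    await send({"type": "websocket.accept"})

    while True:
        event = await receive()
        if event["type"] == "websocket.disconnect":
            break
        # "websocket.receive" carries either "text" or "bytes".
        if event.get("text") is not None:
            await send({"type": "websocket.send", "text": event["text"]})
        else:
            await send({"type": "websocket.send", "bytes": event["bytes"]})


async def run_with_fake_server(app, incoming):
    """Hypothetical helper: replay scripted events and record the app's output."""
    queue = list(incoming)
    sent = []

    async def receive():
        return queue.pop(0)

    async def send(event):
        sent.append(event)

    await app({"type": "websocket", "path": "/ws_echo"}, receive, send)
    return sent


sent = asyncio.run(
    run_with_fake_server(
        echo_app,
        [
            {"type": "websocket.connect"},
            {"type": "websocket.receive", "text": "hi"},
            {"type": "websocket.disconnect", "code": 1000},
        ],
    )
)
print([event["type"] for event in sent])
```

The app sends one accept and one echo, then returns once the scripted disconnect arrives.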
Implementation
Accept or Reject Connection
To accept a connection, we can simply send an ASGI message of type websocket.accept, along with subprotocol and headers:
class Connection:
    ...
    async def accept(self, subprotocol=None):
        headers = [
            [k.encode("ascii"), v.encode("ascii")] for k, v in self.resp_headers.items()
        ]
        await self.asgi_send(
            {"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers}
        )
    ...
While this method works as a way to accept the connection, it cannot be called immediately after we get the connection, as I want it to be in the goal section, because nothing ensures that it is called after the websocket.connect message is received.
To make sure we are ready to accept the connection, we need to know the status of the connection. One simple way to do that is to have a ws_status property on Connection, like this:
from enum import Enum

class WSStatus(Enum):
    init = "init"  # got the scope, but haven't received any messages. The initial status
    connecting = "connecting"  # got the "websocket.connect" message
    accepted = "accepted"  # connection is accepted
    closing = "closing"  # application initiated a close of connection.
    finished = "finished"  # connection is closed

class Connection:
    ...
    ws_status = WSStatus.init
Then, in order to set ws_status based on received ASGI messages, we can wrap self.asgi_receive in another method:
class Connection:
    ...
    async def _ws_receive(self):
        if self.ws_status == WSStatus.connecting:
            raise RuntimeError(
                "Unable to receive. Expect to accept or reject connection."
            )
        message = await self.asgi_receive()
        if self.ws_status == WSStatus.init:
            assert message["type"] == "websocket.connect"
            self.ws_status = WSStatus.connecting
        if message["type"] == "websocket.disconnect":
            self.ws_status = WSStatus.finished
            self.ws_close_code = message["code"]
            self.finished = True
        return message
Finally, back in the accept method, we can use a simple if statement to make sure a connection is ready to be accepted before accepting it:
async def accept(self, subprotocol=None):
    if self.ws_status == WSStatus.init:
        message = await self._ws_receive()
        assert message["type"] == "websocket.connect"
    ...
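The status check can be tried in isolation. The sketch below uses a stripped-down stand-in called MiniConnection (a hypothetical name, not part of the framework) with only three statuses, and fake receive/send callables that log what happens, to show that calling accept() right away still consumes websocket.connect before sending websocket.accept.

```python
import asyncio
from enum import Enum


class WSStatus(Enum):
    init = "init"
    connecting = "connecting"
    accepted = "accepted"


class MiniConnection:
    """Stripped-down stand-in for Connection, for illustration only."""

    def __init__(self, receive, send):
        self.asgi_receive = receive
        self.asgi_send = send
        self.ws_status = WSStatus.init

    async def accept(self):
        if self.ws_status == WSStatus.init:
            # Consume "websocket.connect" first so the handshake order holds.
            message = await self.asgi_receive()
            assert message["type"] == "websocket.connect"
            self.ws_status = WSStatus.connecting
        await self.asgi_send({"type": "websocket.accept"})
        self.ws_status = WSStatus.accepted


async def demo():
    log = []

    async def receive():
        log.append("received websocket.connect")
        return {"type": "websocket.connect"}

    async def send(message):
        log.append("sent " + message["type"])

    conn = MiniConnection(receive, send)
    await conn.accept()  # called immediately, as in the goal section
    return conn.ws_status, log


status, log = asyncio.run(demo())
print(status, log)
```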
To reject the connection, we can simply modify the finish method we have from the previous post: just replace raise NotImplementedError() with these lines:
await self.asgi_send({"type": "websocket.close", "code": close_code})
self.ws_status = WSStatus.closing
self.ws_close_code = close_code
self.finished = True
Note that rejecting a connection and closing a connection from the application are the same thing. The only difference is whether the "websocket.close" message is sent before or after the connection is accepted.
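The before/after distinction can be sketched at the raw ASGI level. In the example below, the token rule and the run helper are hypothetical and exist only for the demonstration: the same "websocket.close" message acts as a rejection when it is sent before "websocket.accept" and as a normal close when it is sent afterwards.

```python
import asyncio


async def guarded_app(scope, receive, send):
    event = await receive()
    assert event["type"] == "websocket.connect"

    # Hypothetical rule: only clients that pass ?token=secret are let in.
    if scope.get("query_string", b"") != b"token=secret":
        # Sent before "websocket.accept": the handshake is rejected.
        await send({"type": "websocket.close", "code": 1008})
        return

    await send({"type": "websocket.accept"})
    await send({"type": "websocket.send", "text": "welcome"})
    # Sent after "websocket.accept": an ordinary close of an open connection.
    await send({"type": "websocket.close", "code": 1000})


async def run(query_string):
    """Hypothetical helper: drive the app once and record sent message types."""
    sent = []

    async def receive():
        return {"type": "websocket.connect"}

    async def send(event):
        sent.append(event["type"])

    scope = {"type": "websocket", "query_string": query_string}
    await guarded_app(scope, receive, send)
    return sent


rejected = asyncio.run(run(b""))
closed = asyncio.run(run(b"token=secret"))
print(rejected, closed)
```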
Receive and Send Data
As described in the goal section, I want to get received messages from an async generator. To do that, we can use a while loop to repeatedly receive ASGI messages, yielding the content of each message of type "websocket.receive" and stopping the generator when "websocket.disconnect" is received.
async def iter_messages(self):
    if self.ws_status != WSStatus.accepted:
        return
    while True:
        message = await self._ws_receive()
        if message["type"] == "websocket.disconnect":
            break
        if "bytes" in message and message["bytes"] is not None:
            data = message["bytes"]
        else:
            data = message["text"]
        yield data
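The same loop can be exercised without a server. This is a minimal sketch that assumes a standalone function, iter_payloads (a hypothetical name), which takes the ASGI receive callable directly instead of living on Connection; the scripted events replace a real client.

```python
import asyncio


async def iter_payloads(receive):
    """Yield the payload of each "websocket.receive" event until disconnect."""
    while True:
        message = await receive()
        if message["type"] == "websocket.disconnect":
            return
        # Prefer bytes when present, otherwise fall back to text.
        if message.get("bytes") is not None:
            yield message["bytes"]
        else:
            yield message["text"]


async def collect():
    scripted = [
        {"type": "websocket.receive", "text": "hello"},
        {"type": "websocket.receive", "bytes": b"\x00\x01"},
        {"type": "websocket.disconnect", "code": 1001},
        {"type": "websocket.receive", "text": "never seen"},
    ]

    async def receive():
        return scripted.pop(0)

    return [payload async for payload in iter_payloads(receive)]


payloads = asyncio.run(collect())
print(payloads)
```

The generator stops at the disconnect event, so the last scripted message is never read.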
The Connection.send method can just be a thin wrapper around self.asgi_send that sends messages of type "websocket.send" and includes either text or bytes, depending on the type of the data.
from typing import Optional, Union

async def send(self, data: Union[bytes, str] = b"", finish: Optional[bool] = False):
    if self.finished:
        raise ValueError("No message can be sent when connection closed")
    if isinstance(data, str):
        await self.asgi_send({"type": "websocket.send", "text": data})
    else:
        await self.asgi_send({"type": "websocket.send", "bytes": data})
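The text-versus-bytes choice can also be pulled out into a small pure function, which makes it easy to check on its own. build_send_event is a hypothetical helper used only for this sketch, not something the framework defines.

```python
from typing import Union


def build_send_event(data: Union[bytes, str]) -> dict:
    """Build a "websocket.send" event with the key that matches the payload type."""
    key = "text" if isinstance(data, str) else "bytes"
    return {"type": "websocket.send", key: data}


print(build_send_event("hi"))
print(build_send_event(b"\x01\x02"))
```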
Conclusion
Now, with the ability to accept/reject connections and send/receive messages, the WebSocket implementation is finished. The implementation is quite simple because WebSocket itself is simple: it is just a way for server and client to exchange information, and what information is exchanged and what it means is up to the application to decide. And ASGI is already a fairly good abstraction.
To test WebSocket support, we start the server with the app in example-ws.py, open a browser, and run the following JavaScript code in the console:
const ws = new WebSocket('ws://localhost:8000/ws_echo');
ws.addEventListener("message", (msg) => console.log(msg.data));
ws.addEventListener("open", () => {
  ws.send("hi");
});
You should see “hi” echoed back in the browser console.
Clean up
As an afterthought, I think it makes sense for WebSocket connections to have their own class. Apart from making the code easier to read, we can remove a lot of the if self.type == ConnectionType.HTTP statements, because when a connection is created we already know its type, and a connection never changes type during its lifetime. To put it all together, here’s the code for the WebSocketConnection class:
from typing import Optional, Union

class WebSocketConnection(Connection):
    def __init__(self, scope, *, send, receive):
        super().__init__(scope, send=send, receive=receive)
        self.ws_status = WSStatus.init
        self.ws_close_code = None
        self.started = True

    async def _ws_receive(self):
        # Check the status before awaiting, so no message is consumed
        # while the connection still has to be accepted or rejected.
        if self.ws_status == WSStatus.connecting:
            raise RuntimeError(
                "Unable to receive. Expect to accept or reject connection."
            )
        message = await self.asgi_receive()
        if self.ws_status == WSStatus.init:
            assert message["type"] == "websocket.connect"
            self.ws_status = WSStatus.connecting
        if message["type"] == "websocket.disconnect":
            self.ws_status = WSStatus.finished
            self.ws_close_code = message["code"]
            self.finished = True
        return message

    async def accept(self, subprotocol=None):
        # Make sure "websocket.connect" has been received before accepting.
        if self.ws_status == WSStatus.init:
            message = await self._ws_receive()
            assert message["type"] == "websocket.connect"
        headers = [
            [k.encode("ascii"), v.encode("ascii")] for k, v in self.resp_headers.items()
        ]
        await self.asgi_send(
            {"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers}
        )
        self.ws_status = WSStatus.accepted

    async def send(self, data: Union[bytes, str] = b"", finish: Optional[bool] = False):
        if self.finished:
            raise ValueError("No message can be sent when connection closed")
        if isinstance(data, str):
            await self.asgi_send({"type": "websocket.send", "text": data})
        else:
            await self.asgi_send({"type": "websocket.send", "bytes": data})

    async def finish(self, close_code: Optional[int] = 1000):
        # Rejects the connection if called before accept(), closes it otherwise.
        if self.finished:
            raise ValueError("Connection already finished")
        await self.asgi_send({"type": "websocket.close", "code": close_code})
        self.ws_status = WSStatus.closing
        self.ws_close_code = close_code
        self.finished = True

    async def iter_messages(self):
        if self.ws_status != WSStatus.accepted:
            return
        while True:
            message = await self._ws_receive()
            if message["type"] == "websocket.disconnect":
                break
            if "bytes" in message and message["bytes"] is not None:
                data = message["bytes"]
            else:
                data = message["text"]
            yield data
Now we instantiate the connection with the correct class in __init__.py:
...
conn_class = Connection
if scope["type"] == "websocket":
    conn_class = WebSocketConnection
conn = conn_class(scope, send=send, receive=receive)
...
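If more connection types appear later, the same selection could be written as a lookup table. The sketch below is one possible variation, not the framework's code: the two classes are empty stand-ins, and CONNECTION_CLASSES and make_connection are hypothetical names.

```python
class Connection:
    """Empty stand-in for the framework's HTTP connection class."""

    def __init__(self, scope, *, send, receive):
        self.scope = scope
        self.asgi_send = send
        self.asgi_receive = receive


class WebSocketConnection(Connection):
    """Empty stand-in for the WebSocket connection class."""


# Map ASGI scope types to connection classes; anything else uses Connection.
CONNECTION_CLASSES = {"websocket": WebSocketConnection}


def make_connection(scope, *, send, receive):
    conn_class = CONNECTION_CLASSES.get(scope["type"], Connection)
    return conn_class(scope, send=send, receive=receive)


ws_conn = make_connection({"type": "websocket"}, send=None, receive=None)
http_conn = make_connection({"type": "http"}, send=None, receive=None)
print(type(ws_conn).__name__, type(http_conn).__name__)
```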