Intro

This is a follow-up to my last blog post, ASGI from scratch - Let’s build an ASGI web framework, where I walked through the steps to build a simple but somewhat functional ASGI application framework. That post only implemented HTTP; in this post, I’m going to go through the steps to add WebSocket support to the framework.

Goal

There are many ways to add WebSocket support to an ASGI framework, and I think ASGI’s WebSocket specification is already a good enough abstraction over the WebSocket protocol. How it is implemented depends largely on what kind of interface I want to provide to the framework’s users.

Assume I want to provide an interface as shown below, where the user can accept the connection and then read data from the client in an async generator.

# example-ws.py
from aaf import aaf
from aaf.routing import Router

router = Router()

@router.route('/ws_echo')
async def ws_echo(connection):
    await connection.accept()
    async for message in connection.iter_messages():
        if message == "QUIT":
            await connection.finish(close_code=1000)
            break
        await connection.send(message)
    print("disconnected")


app = aaf([router])

The implementation of the Router class and the aaf function was covered in the previous post.

WebSocket in ASGI

Here’s a simple diagram of the lifecycle of a WebSocket connection in the context of ASGI.

sequenceDiagram
  Note over Client,Server: WebSocket
  Note over Server,Application: ASGI
  Client->>Server: Handshake
  Server->>Application: "websocket.connect"
  alt Should accept connection?
    Application->>Server: "websocket.accept"
    Server->>Client: HTTP 101
  else
    Application->>Server: "websocket.close"
    Server->>Client: HTTP 403
  end
  rect aliceblue
    loop heartbeat
      Client-->Server: ping
      Client-->Server: pong
    end
    Client-->>Server: data frames
    Server-->>Application: "websocket.receive"
    Application-->>Server: "websocket.send"
    Server-->>Client: data frames
  end
  alt Client/Server disconnect
    Client->Server: Closed/lost connection
  else Application closes connection
    Application->>Server: "websocket.close"
    Server->Client: Disconnect
  end
  Server->>Application: "websocket.disconnect"

The diagram shows when each type of ASGI message is sent and received. Every WebSocket connection starts with a websocket.connect message. When this message is received, the application can send a websocket.accept message to accept the connection or a websocket.close message to reject it. After this handshake, data frames can be exchanged using websocket.send and websocket.receive messages. Either the application or the client can initiate a disconnection; the application does this by sending a websocket.close message. The server sends a websocket.disconnect message to the application whenever the connection between client and server is closed or lost.
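
To make the message flow concrete, here is a minimal sketch of a bare ASGI application that echoes text frames without any framework. The names echo_app and run_scripted are made up for this illustration, and run_scripted only imitates a server by replaying a fixed list of events, so treat it as a rough model rather than how a real server behaves.

```python
import asyncio


async def echo_app(scope, receive, send):
    """Bare ASGI application that echoes text frames (binary frames are ignored)."""
    assert scope["type"] == "websocket"

    # The first event on a WebSocket scope is "websocket.connect".
    event = await receive()
    assert event["type"] == "websocket.connect"
    await send({"type": "websocket.accept"})

    while True:
        event = await receive()
        if event["type"] == "websocket.disconnect":
            break
        if event.get("text") is not None:
            await send({"type": "websocket.send", "text": event["text"]})


async def run_scripted(app, events):
    """Hypothetical helper: feed `app` a fixed list of events, return what it sent."""
    incoming = list(events)
    sent = []

    async def receive():
        return incoming.pop(0)

    async def send(message):
        sent.append(message)

    await app({"type": "websocket", "path": "/ws_echo"}, receive, send)
    return sent


sent = asyncio.run(
    run_scripted(
        echo_app,
        [
            {"type": "websocket.connect"},
            {"type": "websocket.receive", "text": "hi"},
            {"type": "websocket.disconnect", "code": 1000},
        ],
    )
)
print([message["type"] for message in sent])  # ['websocket.accept', 'websocket.send']
```

The rest of this post is essentially about hiding this receive/send loop behind a friendlier interface.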

Implementation

Accept or Reject Connection

To accept a connection, we can simply send an ASGI message with type websocket.accept, along with the subprotocol and headers:

class Connection:
    ...
    async def accept(self, subprotocol=None):
        headers = [
            [k.encode("ascii"), v.encode("ascii")] for k, v in self.resp_headers.items()
        ]
        await self.asgi_send(
            {"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers}
        )
    ...

While this method works as a way to accept the connection, it cannot be called immediately after we get the connection, as I want in the Goal section, because nothing ensures it is called after the websocket.connect message has been received.

To make sure we are ready to accept the connection, we need to know the status of the connection. One simple way to do it is by having a ws_status property on Connection, like this:

from enum import Enum


class WSStatus(Enum):
    init = "init"  # got the scope, but haven't received any messages. The initial status
    connecting = "connecting"  # got the "websocket.connect" message
    accepted = "accepted"  # connection is accepted
    closing = "closing"  # application initiated a close of connection.
    finished = "finished"  # connection is closed


class Connection:
    ...
    ws_status = WSStatus.init
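
One way to read these statuses is as a small state machine. The sketch below is only an illustration: ALLOWED_TRANSITIONS and can_transition are hypothetical names that are not part of the framework, and the table encodes an assumption about which moves make sense rather than anything the framework enforces.

```python
from enum import Enum


class WSStatus(Enum):
    init = "init"
    connecting = "connecting"
    accepted = "accepted"
    closing = "closing"
    finished = "finished"


# Assumed transition table, for illustration only (not part of the framework).
ALLOWED_TRANSITIONS = {
    WSStatus.init: {WSStatus.connecting},
    WSStatus.connecting: {WSStatus.accepted, WSStatus.closing},
    WSStatus.accepted: {WSStatus.closing, WSStatus.finished},
    WSStatus.closing: {WSStatus.finished},
    WSStatus.finished: set(),
}


def can_transition(current: WSStatus, new: WSStatus) -> bool:
    """Return True if moving from `current` to `new` is allowed by the table."""
    return new in ALLOWED_TRANSITIONS[current]


print(can_transition(WSStatus.init, WSStatus.accepted))        # False
print(can_transition(WSStatus.connecting, WSStatus.accepted))  # True
```

The first call returns False for the same reason accept cannot be called blindly: the connection has to pass through connecting first.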

Then, in order to set ws_status based on received ASGI messages, we can wrap self.asgi_receive in another method:

class Connection:
    ...
    async def _ws_receive(self):
        if self.ws_status == WSStatus.connecting:
            raise RuntimeError(
                "Unable to receive. Expect to accept or reject connection."
            )
        message = await self.asgi_receive()
        if self.ws_status == WSStatus.init:
            assert message["type"] == "websocket.connect"
            self.ws_status = WSStatus.connecting
        if message["type"] == "websocket.disconnect":
            self.ws_status = WSStatus.finished
            self.ws_close_code = message["code"]
            self.finished = True
        return message

Finally, back in the accept method, we can use a simple if statement to make sure a connection is ready to be accepted before accepting it:

    async def accept(self, subprotocol=None):
        if self.ws_status == WSStatus.init:
            message = await self._ws_receive()
            assert message["type"] == "websocket.connect"
        ...

To reject the connection, we can simply modify the finish method we have from the previous post. Just replace raise NotImplementedError() with these lines:

        await self.asgi_send({"type": "websocket.close", "code": close_code})
        self.ws_status = WSStatus.closing
        self.ws_close_code = close_code
        self.finished = True

Note that rejecting a connection and closing a connection from the application are the same operation. The only difference is whether the "websocket.close" message is sent before or after the connection is accepted.
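
The before/after distinction can be seen in a bare ASGI application. This is a rough sketch under made-up assumptions: guarded_app, run_once, and the token=secret query string are invented for the example, and the in-memory send only records message types instead of talking to a real server.

```python
import asyncio


async def guarded_app(scope, receive, send):
    """Accept only clients that present the expected (made-up) token."""
    event = await receive()
    assert event["type"] == "websocket.connect"

    if scope.get("query_string", b"") != b"token=secret":
        # "websocket.close" before "websocket.accept": the handshake is refused.
        await send({"type": "websocket.close", "code": 1008})
        return

    await send({"type": "websocket.accept"})
    await send({"type": "websocket.send", "text": "welcome"})
    # "websocket.close" after "websocket.accept": an open connection is closed.
    await send({"type": "websocket.close", "code": 1000})


async def run_once(query_string):
    """Hypothetical helper: call guarded_app once and return the sent message types."""
    sent = []

    async def receive():
        return {"type": "websocket.connect"}

    async def send(message):
        sent.append(message["type"])

    scope = {"type": "websocket", "query_string": query_string}
    await guarded_app(scope, receive, send)
    return sent


rejected = asyncio.run(run_once(b""))
accepted = asyncio.run(run_once(b"token=secret"))
print(rejected)  # ['websocket.close']
print(accepted)  # ['websocket.accept', 'websocket.send', 'websocket.close']
```

In both runs the application sends the same "websocket.close" message type; only its position relative to "websocket.accept" differs.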

Receive and Send Data

As described in the Goal section, I want to receive messages through an async generator. To do that, we can use a while loop to repeatedly receive ASGI messages, yielding the content of each message of type "websocket.receive" and stopping the generator when "websocket.disconnect" is received.

    async def iter_messages(self):
        if self.ws_status != WSStatus.accepted:
            return
        while True:
            message = await self._ws_receive()
            if message["type"] == "websocket.disconnect":
                break
            if "bytes" in message and message["bytes"] is not None:
                data = message["bytes"]
            else:
                data = message["text"]
            yield data
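
The same loop can be tried outside the framework. The following is a simplified sketch, not the method above: it is a free function that takes a receive callable directly, skips the status check, and is fed by a scripted receive that exists only for this example.

```python
import asyncio


async def iter_messages(receive):
    """Yield the payload of each "websocket.receive" event until disconnect."""
    while True:
        message = await receive()
        if message["type"] == "websocket.disconnect":
            return
        if message.get("bytes") is not None:
            yield message["bytes"]
        else:
            yield message["text"]


async def main():
    # Scripted events standing in for what a server would deliver.
    events = iter(
        [
            {"type": "websocket.receive", "text": "hi"},
            {"type": "websocket.receive", "bytes": b"\x00\x01"},
            {"type": "websocket.disconnect", "code": 1000},
        ]
    )

    async def receive():
        return next(events)

    return [data async for data in iter_messages(receive)]


collected = asyncio.run(main())
print(collected)  # ['hi', b'\x00\x01']
```

The generator ends cleanly on disconnect, which is what lets the handler in the Goal section fall through to its final print.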

The Connection.send method can just be a thin wrapper around self.asgi_send that sends messages of type "websocket.send" and includes either text or bytes, depending on the type of the data.

    async def send(self, data: Union[bytes, str] = b"", finish: Optional[bool] = False):
        if self.finished:
            raise ValueError("No message can be sent when connection closed")
        if isinstance(data, str):
            await self.asgi_send({"type": "websocket.send", "text": data})
        else:
            await self.asgi_send({"type": "websocket.send", "bytes": data})
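
The text-or-bytes decision can also be pulled out into a small pure function, which makes it easy to check in isolation. This is only a sketch: build_send_event is a hypothetical helper that is not part of the framework, and raising TypeError for other types is an extra assumption that the send method above does not make.

```python
from typing import Union


def build_send_event(data: Union[bytes, str]) -> dict:
    """Build the "websocket.send" event for a text or binary payload."""
    if isinstance(data, str):
        return {"type": "websocket.send", "text": data}
    if isinstance(data, (bytes, bytearray)):
        return {"type": "websocket.send", "bytes": bytes(data)}
    # Assumption for this sketch: anything else is a programming error.
    raise TypeError(f"expected str or bytes, got {type(data).__name__}")


print(build_send_event("hi"))   # {'type': 'websocket.send', 'text': 'hi'}
print(build_send_event(b"hi"))  # {'type': 'websocket.send', 'bytes': b'hi'}
```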

Conclusion

Now, with the ability to accept/reject connections and send/receive messages, the WebSocket implementation is finished. The implementation is quite simple because WebSocket itself is simple: it is just a way for server and client to exchange information. What information is exchanged and what it means is up to the application to decide. And ASGI is already a fairly good abstraction.

To test WebSocket support, we start the server with example-ws.py, open a browser, and run the following JavaScript code in the console:

ws = new WebSocket('ws://localhost:8000/ws_echo');
ws.addEventListener("message",(msg) => console.log(msg.data));
ws.addEventListener("open", () => {
  ws.send("hi");
});

You should see “hi” echoed back in the browser console.

Clean up

As an afterthought, I think it makes sense for WebSocket connections to have their own class. Apart from making the code easier to read, this lets us remove a lot of the if self.type == ConnectionType.HTTP statements, because when a connection is created we already know its type, and a connection never changes its type during its lifetime. To put it all together, here’s the code for the WebSocketConnection class:

from typing import Optional, Union


class WebSocketConnection(Connection):
    def __init__(self, scope, *, send, receive):
        super().__init__(scope, send=send, receive=receive)
        self.ws_status = WSStatus.init
        self.ws_close_code = None
        self.started = True

    async def _ws_receive(self):
        # Check the status before awaiting, so no message is consumed and then lost.
        if self.ws_status == WSStatus.connecting:
            raise RuntimeError(
                "Unable to receive. Expect to accept or reject connection."
            )
        message = await self.asgi_receive()
        if self.ws_status == WSStatus.init:
            assert message["type"] == "websocket.connect"
            self.ws_status = WSStatus.connecting
        if message["type"] == "websocket.disconnect":
            self.ws_status = WSStatus.finished
            self.ws_close_code = message["code"]
            self.finished = True
        return message

    async def accept(self, subprotocol=None):
        if self.ws_status == WSStatus.init:
            message = await self._ws_receive()
            assert message["type"] == "websocket.connect"
        headers = [
            [k.encode("ascii"), v.encode("ascii")] for k, v in self.resp_headers.items()
        ]
        await self.asgi_send(
            {"type": "websocket.accept", "subprotocol": subprotocol, "headers": headers}
        )
        self.ws_status = WSStatus.accepted

    async def send(self, data: Union[bytes, str] = b"", finish: Optional[bool] = False):
        if self.finished:
            raise ValueError("No message can be sent when connection closed")
        if isinstance(data, str):
            await self.asgi_send({"type": "websocket.send", "text": data})
        else:
            await self.asgi_send({"type": "websocket.send", "bytes": data})

    async def finish(self, close_code: Optional[int] = 1000):
        if self.finished:
            raise ValueError("Connection already finished")
        await self.asgi_send({"type": "websocket.close", "code": close_code})
        self.ws_status = WSStatus.closing
        self.ws_close_code = close_code
        self.finished = True

    async def iter_messages(self):
        if self.ws_status != WSStatus.accepted:
            return
        while True:
            message = await self._ws_receive()
            if message["type"] == "websocket.disconnect":
                break
            if "bytes" in message and message["bytes"] is not None:
                data = message["bytes"]
            else:
                data = message["text"]
            yield data

Now we instantiate the connection with the correct class in __init__.py:

...
        conn_class = Connection
        if scope["type"] == "websocket":
            conn_class = WebSocketConnection
        conn = conn_class(scope, send=send, receive=receive)
...
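
If more scope types ever need their own class, the if statement could be swapped for a lookup table. This is just one possible sketch: CONNECTION_CLASSES and make_connection are hypothetical names, and the two classes here are empty stand-ins for the real ones so the example runs on its own.

```python
class Connection:
    """Stand-in for the framework's base connection class."""

    def __init__(self, scope, *, send, receive):
        self.scope = scope
        self.asgi_send = send
        self.asgi_receive = receive


class WebSocketConnection(Connection):
    """Stand-in for the WebSocket subclass."""


# Hypothetical mapping from ASGI scope type to connection class.
CONNECTION_CLASSES = {"http": Connection, "websocket": WebSocketConnection}


def make_connection(scope, *, send, receive):
    """Pick the class for this scope type, falling back to Connection."""
    conn_class = CONNECTION_CLASSES.get(scope["type"], Connection)
    return conn_class(scope, send=send, receive=receive)


conn = make_connection({"type": "websocket"}, send=None, receive=None)
print(type(conn).__name__)  # WebSocketConnection
```

With only two connection types, the plain if statement above is arguably just as readable.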