Using WebSocket with MaixPy MaixCAM
Introduction
Similar to sockets, WebSocket enables long-lived communication connections and supports communication with web pages.
Since MaixPy is based on Python, you can use the commonly available Python websockets
and asyncio
modules. For more detailed information, please refer to the documentation and tutorials available online.
WebSocket Client
The following example connects to a server, sends a message 10 times, and then ends the connection:
import asyncio
import websockets
import time
async def send_msg(websocket):
count = 1
while count <= 10:
msg = f"hello {count}"
await websocket.send(msg)
recv_text = await websocket.recv()
print(f"Received: {recv_text}", end="\n")
count += 1
time.sleep(1)
await websocket.close(reason="client exit")
async def main_logic(ip, port):
async with websockets.connect(f'ws://{ip}:{port}') as websocket:
await send_msg(websocket)
ip = "10.228.104.100"
port = 5678
asyncio.get_event_loop().run_until_complete(main_logic(ip, port))
WebSocket Server
The following example accepts client connections and responds with ack for msg:
followed by the received message.
import asyncio
import websockets
import functools
async def recv_msg(websocket):
print("New client connected, recv_msg start")
while True:
try:
recv_text = await websocket.recv()
except Exception as e:
print("Receive failed")
break
print("Received:", recv_text)
response_text = f"ack for msg: {recv_text}"
await websocket.send(response_text)
print("recv_msg end")
async def main_logic(websocket, path, other_param):
await recv_msg(websocket)
ip = "0.0.0.0"
port = 5678
start_server = websockets.serve(functools.partial(main_logic, other_param="test_value"), ip, port)
print("Start server")
asyncio.get_event_loop().run_until_complete(start_server)
print("Start server loop")
asyncio.get_event_loop().run_forever()