Using Socket for TCP/UDP Communication with MaixPy MaixCAM

Introduction to Sockets

A socket is the software abstraction that the operating system provides for network communication. Through the socket interface, a program can send and receive data over TCP or UDP.

Since MaixPy is based on Python, we can directly use Python's built-in socket library for communication. For more details, see the official Python socket module documentation, or search online for tutorials.

Here, we introduce simple usage methods. With these example codes, you can perform basic TCP and UDP communication on MaixPy MaixCAM. Remember to modify the IP address and port number according to your actual situation.

Socket TCP Client

This example connects to a TCP server, sends a message, waits for a response, and then closes the connection.

import socket

def tcp_client(ip, port):
    """Send a greeting to a TCP server and print its reply.

    Connects to ``(ip, port)``, sends ``'Hello, Server!'`` encoded as UTF-8,
    waits for one response of up to 1024 bytes, prints it decoded as UTF-8,
    and closes the connection.

    Args:
        ip: Host name or IP address of the server.
        port: TCP port number of the server.

    Raises:
        OSError: If connecting, sending, or receiving fails.
        UnicodeDecodeError: If the reply is not valid UTF-8.
    """
    # The with-block closes the socket on every exit path, including a
    # failed connect(), so no descriptor is leaked when the server is down.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
        client_socket.connect((ip, port))

        # Send data to the server
        message = 'Hello, Server!'
        print("Send:", message)
        client_socket.sendall(message.encode('utf-8'))

        # Receive the server's response (a single read of at most 1024 bytes)
        data = client_socket.recv(1024)
        print('Received:', data.decode('utf-8'))

if __name__ == "__main__":
    # Address of the TCP server to contact; change these for your network.
    server_ip, server_port = "10.228.104.1", 8080
    tcp_client(server_ip, server_port)

Socket TCP Server

This example creates a socket server that continuously waits for client connections. Once a client connects, a thread is created to communicate with the client, reading the client's message and echoing it back.

import socket
import threading

# Address and port the server binds to; "0.0.0.0" accepts connections on
# every IPv4 interface of the device.
local_ip, local_port = "0.0.0.0", 8080

def receiveThread(conn, addr):
    """Echo everything received from one client back to it.

    Reads from ``conn`` in chunks of up to 1024 bytes, prints each chunk and
    sends it back unchanged. The loop ends when ``recv`` returns an empty
    bytes object, which means the peer has closed its side of the connection.

    Args:
        conn: Connected socket for this client.
        addr: Address of the client, used only in the log message.

    Raises:
        OSError: If receiving or sending on ``conn`` fails.
    """
    # Closing the connection here (on normal exit and on errors alike) keeps
    # a long-running server from leaking one descriptor per client.
    with conn:
        while True:
            print('Reading...')
            client_data = conn.recv(1024)
            if not client_data:
                break
            print(client_data)
            conn.sendall(client_data)
    print(f"Client {addr} disconnected")

ip_port = (local_ip, local_port)
sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# The with-block closes the listening socket when the accept loop is left
# through an exception (for example Ctrl+C or a failed bind).
with sk:
    # Allow rebinding the port immediately after the script is restarted
    sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sk.bind(ip_port)
    sk.listen(50)

    print("Waiting for clients...")
    while True:
        conn, addr = sk.accept()
        print(f"Client {addr} connected")
        # Create a new thread to communicate with this client; daemon threads
        # do not keep the process alive once the main loop stops.
        t = threading.Thread(target=receiveThread, args=(conn, addr))
        t.daemon = True
        t.start()

Socket UDP Client

import socket

def udp_send(ip, port):
    """Send a single 'Hello, Server!' datagram (UTF-8) to ``(ip, port)``.

    Args:
        ip: Host name or IP address of the receiver.
        port: UDP port number of the receiver.
    """
    payload = 'Hello, Server!'.encode('utf-8')

    # The context manager closes the socket whether or not sendto() succeeds.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (ip, port))

# Send one datagram to the example receiver; change the address for your network.
target_ip, target_port = "10.228.104.1", 8080
udp_send(target_ip, target_port)

Socket UDP Server

import socket

def udp_receive(ip, port):
    """Receive UDP datagrams forever and print each one with its sender.

    Binds a UDP socket to ``(ip, port)`` and then loops, reading datagrams of
    up to 1024 bytes and printing their text and source address. The function
    only returns by raising (for example on Ctrl+C).

    Args:
        ip: Local address to bind to.
        port: Local UDP port to bind to.

    Raises:
        OSError: If binding or receiving fails.
    """
    # The loop below never ends normally, so cleanup placed after it would be
    # unreachable; the with-block closes the socket on any exception instead.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket:
        # Bind the port
        udp_socket.bind((ip, port))

        print('Waiting for a message...')

        while True:
            data, address = udp_socket.recvfrom(1024)
            # Datagrams may come from any sender; replace undecodable bytes
            # rather than letting one malformed packet stop the server.
            print('Received:', data.decode('utf-8', errors='replace'))
            print('From:', address)

# Start receiving on UDP port 8080 of every local IPv4 interface.
listen_ip, listen_port = '0.0.0.0', 8080
udp_receive(listen_ip, listen_port)