2023年10月7日星期六

python异步网络传输与协议

python异步网络传输与协议

python和异步网络编程

python 3.12.0

写本文的缘由是笔者最近在做一个需要用到大量异步处理的程序,在网络编程这方面笔者一直烂。
另外,传统的对话软件多采用多线程一个线程用来输出网络流来的数据,另一个线程接受键盘输入并发送。
笔者想用协程解决这个问题,同时解锁更多协程的技能,于是诞生本文。

注意,本文尽可能避免大段摘抄文档,但是在api的说明上是不可避免的,尽情见谅 。

基础

这部分已经有博客很好的阐述了,就不抄录了。

网络基础:ISO网络七层模型

https://blog.csdn.net/annita2019/article/details/109101643

TCP/IP

https://blog.csdn.net/wwy0324/article/details/109310658

IP地址的分类及范围详解:A、B、C、D、E五类是如何划分的

https://zhuanlan.zhihu.com/p/353821843

传输和协议

本blog内容大多来自于官方文档的介绍。
此外,Python socket的一些接口也是本文的基础。

前提

传输和协议会被像 loop.create_connection() 这类 底层 事件循环接口使用。它们使用基于回调的编程风格支持网络或IPC协议(如HTTP)的高性能实现。
传输对象和协议对象总是一对一关系:协议调用传输方法来发送数据,而传输在接收到数据时调用协议方法传递数据。

在最顶层,传输只关心 怎样 传送字节内容,而协议决定传送 哪些 字节内容(还要在一定程度上考虑何时)。
也可以这样说:从传输的角度来看,传输是套接字(或类似的I/O终端)的抽象,而协议是应用程序的抽象。

传输对象和协议对象总是一对一关系:协议调用传输方法来发送数据,而传输在接收到数据时调用协议方法传递数据。

大部分面向连接的事件循环方法(如 loop.create_connection() ) 通常接受 protocol_factory 参数为接收到的链接创建 协议 对象,并用 传输 对象来表示。这些方法一般会返回 (transport, protocol) 元组。

传输

在笔者现阶段code时这部分是无感的,先略过

协议

类别

asyncio 提供了一组抽象基类,它们应当被用于实现网络协议。 这些类被设计为与 传输 配合使用。

抽象基础协议类的子类可以实现其中的部分或全部方法。 所有这些方法都是回调:它们由传输或特定事件调用,例如当数据被接收的时候。 基础协议方法应当由相应的传输来调用。

抽象的类和接口这里就不复制了,在之后的blog内容中会简单介绍所用到的。

示例

UDP

创建一个数据报连接。
这里介绍一些协议与传输的类和方法

基础协议

连接回调:连接回调会在所有协议上被调用,每个成功的连接将恰好调用一次。所有其他协议回调只能在以下两个方法之间被调用。
BaseProtocol.connection_made(transport)
连接建立时被调用。

transport 参数是代表连接的传输。 此协议负责将引用保存至对应的传输。

BaseProtocol.connection_lost(exc)
连接丢失或关闭时将被调用。

方法的参数是一个异常对象或为 None。 后者意味着收到了常规的 EOF,或者连接被连接的一端取消或关闭。

Datagram协议工厂

DatagramProtocol.datagram_received(data, addr)
当接收到数据报时被调用。 data 是包含传入数据的字节串对象。 addr 是发送数据的对等端地址;实际的格式取决于具体传输。

DatagramProtocol.error_received(exc)
当前一个发送或接收操作引发 OSError 时被调用。 exc 是 OSError 的实例。

此方法会在当传输(例如UDP)检测到无法将数据报传给接收方等极少数情况下被调用。 而在大多数情况下,无法送达的数据报将被静默地丢弃。

基础传输

BaseTransport.get_extra_info(name, default=None)
name 是表示要获取传输特定信息的字符串。
default 是在信息不可用或传输不支持第三方事件循环实现或当前平台查询时返回的值。
套接字:

‘peername’: 套接字链接时的远端地址,socket.socket.getpeername() 方法的结果 (出错时为 None )
‘socket’: socket.socket 实例
‘sockname’: 套接字本地地址, socket.socket.getsockname() 方法的结果

更多方法见官方文档,这里不赘述本文用不到的

数据报传输

DatagramTransport.sendto(data, addr=None)
将 data 字节串发送到 addr (基于传输的目标地址) 所给定的远端对等方。 如果 addr 为 None,则将数据发送到传输创建时给定的目标地址。

此方法不会阻塞;它会缓冲数据并安排其被异步地发出。

DatagramTransport.abort()
立即关闭传输,不会等待已提交的操作执行完毕。 已缓存的数据将会丢失。 不会接收更多的数据。 协议的 protocol.connection_lost() 方法最终将附带 None 作为参数被调用。

loop.create_datagram_endpoint() 方法

coroutine loop.create_datagram_endpoint(        
        protocol_factory,        # 协议工厂
        local_addr: tuple[str, int] | str | None = None,
        remote_addr: tuple[str, int] | str | None = None,
        *,
        family: int = 0,
        proto: int = 0,
        flags: int = 0,
        reuse_address: bool | None = None,
        reuse_port: bool | None = None,
        allow_broadcast: bool | None = None,
        sock: socket | None = None)

The socket family can be either AF_INET, AF_INET6, or AF_UNIX, depending on host (or the family argument, if provided).
The socket type will be SOCK_DGRAM.

protocol_factory 必须为一个返回 协议 实现的可调用对象。

成功时返回一个 (transport, protocol) 元组。

其他参数:

如果给出 local_addr,它应当是一个用来在本地绑定套接字的 (local_host, local_port) 元组。 local_host 和 local_port 是使用 getaddrinfo() 来查找的。

remote_addr,如果指定的话,就是一个 (remote_host, remote_port) 元组,用于同一个远程地址连接。remote_host 和 remote_port 是使用 getaddrinfo() 来查找的。

family, proto, flags 是可选的地址族,协议和标志,其会被传递给 getaddrinfo() 来完成 host 的解析。如果要指定的话,这些都应该是来自于 socket 模块的对应常量。

reuse_port tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows and some Unixes. If the SO_REUSEPORT constant is not defined then this capability is unsupported.

allow_broadcast 告知内核允许此端点向广播地址发送消息。

sock 可选择通过指定此值用于使用一个预先存在的,已经处于连接状态的 socket.socket 对象,并将其提供给此传输对象使用。如果指定了这个值, local_addr 和 remote_addr 就应该被忽略 (必须为 None)。

注意事项,protocol_factory 需要是一个可调用对象,所以不能直接传入一个protocol_factory实例进去。

import asyncio
from asyncio import DatagramTransport, transports
from typing import Any
class UDPProtocolFactory(asyncio.DatagramProtocol):
    """
    这里尝试一下1对1
    """

    def connection_made(self, transport: DatagramTransport) -> None:
        self.trans = transport
        print("udp made")

    def datagram_received(self, data: bytes, addr: tuple[str | Any, int]) -> None:
        msg = data.decode()
        print(f"{addr}给你发送信息{msg}")
        self.trans.sendto("收到".encode(), addr=addr)

        pass

    def connection_lost(self, exc: Exception | None) -> None:
        if exc is not None:
            print(exc, " error")
        else:
            print("normal exit")


async def main():
    loop = asyncio.get_running_loop()  # 获得现在的事件循环

    trans, prot = await loop.create_datagram_endpoint(
        protocol_factory=lambda: UDPProtocolFactory(), local_addr=("127.0.0.1", 9961)   
        # 这里官方推荐用匿名函数,实际情况下UDPProtocolFactory这个类也是可调用的
    )

    try:
        # 因为异步,这里睡眠等着,同时防止exception
        await asyncio.sleep(10)
    finally:
        trans.close()

if __name__ == "__main__":
    asyncio.run(main=main())
    pass

这是一个udp监听服务器的实例,其在ip 127.0.0.1上开放9961端口,进行监听10s。

这里是一个实现简单的对话的代码。
在此之前,需要先导入一个第三方库aioconsole

aioconsole

aioconsole doc
aioconsole.ainput()则为本文需要的一个方法。
在获得控制台中的输入(这里假设已经是str的类型)后,欲将其发送到其他的client。这里继续介绍设置回调函数的方法。

协程设置回调函数

Task.add_done_callback( __fn: (Task[None]) -> object, *, context: Context | None = None)
Add a callback to be run when the future becomes done.
The callback is called with a single argument - the future object. If the future is already done when this is called, the callback is scheduled with call_soon.
这里fn必须是可调用的对象,而且在调用时会自动传入future object本身作为唯一的参数。
为了更好的拓展性如何引入更多参数呢,比如在ainput执行完后网络通信。这里介绍两种方法,上下文context

官方文档
Python协程中使用上下文
重要: 上下文变量应该在顶级模块中创建,且永远不要在闭包中创建。 Context 对象拥有对上下文变量的强引用,这可以让上下文变量被垃圾收集器正确回收。
保存在上下文中的变量一定要在使用完成后显示清理,否则会导致内存泄漏。(.set(None))

contextvar.contextvars
set(),get(),reset(),Token,
这里上代码。

from typing import Any
import aioconsole
import asyncio
import contextvars
from asyncio import DatagramProtocol, Transport, transports

context_msg = contextvars.ContextVar("msg")
context_transport = contextvars.ContextVar("transport")

addrList = []

# 我们假设在循环等待中,每次接受命令行输入的message,判断如果不是规定的如'EOF'或者直接Ctrl+z则
# 发送消息至设置的客机,这里的客机指的是一切在程序运行期间和主机创建udp连接的addr


class UDProtocolFactory(DatagramProtocol):
    def connection_made(self, transport) -> None:
        print("Connection successfully made!")

    def datagram_received(self, data: bytes, addr: tuple[str | Any, int]) -> None:
        """
        收到信息时,将addr拉入清单
        """
        global addrList
        if addr not in addrList:
            addrList.append(addr)
        print("收到信息", addr[0], ":", addr[1], ">", data.decode(), "\n>")

    def connection_lost(self, exc: Exception | None) -> None:
        print("close connection")


async def sendmsg2All():
    """
    end of call
    """
    tmpMsg = context_msg.get()
    transport = context_transport.get()
    for addr in addrList:
        transport.sendto(tmpMsg.encode(), addr=addr)  #  此方法不会阻塞;它会缓冲数据并安排其被异步地发出。
        pass
    context_msg.set(None)
    # context_transport.set(None)


async def listen():
    while True:
        try:
            msg = await aioconsole.ainput(">")
            if msg == "EOF":
                raise EOFError
        except EOFError:
            context_transport.get().close()
            context_transport.set(None)
            break
        else:
            context_msg.set(msg)
            # send
            # await sendmsg2All()
            copyCtx = contextvars.copy_context()
            asyncio.get_running_loop().create_task(sendmsg2All(), context=copyCtx)
            del copyCtx


async def main():
    """
    启动
    """
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: UDProtocolFactory(), ("127.0.0.1", 9961),# 可以加上remote_addr来连接别人
    )
    context_transport.set(transport)
    await listen()

asyncio.run(main())

TCP

create_server

coroutine loop.create_server(protocol_factory, host=None, port=None, *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

创建一个 TCP 服务器 (套接字类型 SOCK_STREAM) 在 host 地址的 port 上进行监听。

返回一个 Server 对象。

参数:

protocol_factory 必须为一个返回 协议 实现的可调用对象。

host 形参可被设为几种类型,它确定了服务器所应监听的位置:

如果 host 是一个字符串,则 TCP 服务器会被绑定到 host 所指明的单一网络接口。

如果 host 是一个字符串序列,则 TCP 服务器会被绑定到序列所指明的所有网络接口。

如果 host 是一个空字符串或 None,则会应用所有接口并将返回包含多个套接字的列表(通常是一个 IPv4 的加一个 IPv6 的)。

可以设置 port 参数来指定服务器应该监听哪个端口。如果为 0 或者 None (默认),将选择一个随机的未使用的端口(注意,如果 host 解析到多个网络接口,将为每个接口选择一个不同的随机端口)。

family 可被设为 socket.AF_INET 或 AF_INET6 以强制此套接字使用 IPv4 或 IPv6。 如果未设定,则 family 将通过主机名为确定 (默认为 AF_UNSPEC)。

flags 是用于 getaddrinfo() 的位掩码。

可以选择指定 sock 以便使用预先存在的套接字对象。 如果指定了此参数,则不可再指定 host 和 port。

备注 sock 参数可将套接字的所有权转给所创建的服务器。 要关闭该套接字,请调用服务器的 close() 方法。
backlog 是传递给 listen() 的最大排队连接的数量(默认为100)。

ssl 可被设置为一个 SSLContext 实例以在所接受的连接上启用 TLS。

reuse_address 告知内核要重用一个处于 TIME_WAIT 状态的本地套接字,而不是等待其自然超时失效。 如果未指定此参数则在 Unix 上将自动设置为 True。

reuse_port 告知内核,只要在创建的时候都设置了这个标志,就允许此端点绑定到其它端点列表所绑定的同样的端口上。这个选项在 Windows 上是不支持的。

ssl_handshake_timeout 是(用于 TLS 服务器的)在放弃连接之前要等待 TLS 握手完成的秒数。 如果参数为 (默认值) None 则为 60.0 秒。

ssl_shutdown_timeout 是在放弃连接之前要等待 SSL 关闭完成的秒数。 如为 None (默认值) 则使用 30.0。

start_serving 设置成 True (默认值) 会导致创建server并立即开始接受连接。设置成 False ,用户需要等待 Server.start_serving() 或者 Server.serve_forever() 以使server开始接受连接。

这里的协议工厂需要返回流式协议的工厂(笔者同时看到官方doc上写有带缓冲的协议可与任何支持 流式协议 的事件循环方法配合使用。,不确定能否混合使用)

流式协议

Protocol.eof_received()
当发出信号的另一端不再继续发送数据时(例如通过调用 transport.write_eof(),如果另一端也使用 asyncio 的话)被调用。

此方法可能返回假值 (包括 None),在此情况下传输将会自行关闭。 相反地,如果此方法返回真值,将以所用的协议来确定是否要关闭传输。 由于默认实现是返回 None,因此它会隐式地关闭连接。

某些传输,包括 SSL 在内,并不支持半关闭的连接,在此情况下从该方法返回真值将导致连接被关闭。 一旦 eof_received() 被调用,data_received() 就不会再被调用。

create_connection

coroutine loop.create_connection(protocol_factory, host=None, port=None, *, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None, all_errors=False)

打开一个流式传输连接,连接到由 host 和 port 指定的地址。

套接字族可以是 AF_INET 或 AF_INET6,具体取决于 host (或 family 参数,如果有提供的话)。

套接字类型将为 SOCK_STREAM。

protocol_factory 必须为一个返回 asyncio 协议 实现的可调用对象。

这个方法会尝试在后台创建连接。当创建成功,返回 (transport, protocol) 组合。

底层操作的大致的执行顺序是这样的:

  1. 创建连接并为其创建一个 传输。

  2. 不带参数地调用 protocol_factory 并预期返回一个 协议 实例。

  3. 协议实例通过调用其 connection_made() 方法与传输进行配对。

  4. 成功时返回一个 (transport, protocol) 元组。

创建的传输是一个具体实现相关的双向流。

只读传输

ReadTransport.is_reading()如果传输接收到新数据时返回 True 。

感觉这个方法挺迷的,暂时不了解使用场景。

ReadTransport.pause_reading()
暂停传输的接收端。protocol.data_received() 方法将不会收到数据,除非 resume_reading() 被调用。这个方法幂等的, 它可以在传输已经暂停或关闭时调用。

ReadTransport.resume_reading()
恢复接收端。如果有数据可读取时,协议方法 protocol.data_received() 将再次被调用。这个方法幂等的, 它可以在传输已经准备好读取数据时调用。

只写传输

WriteTransport.can_write_eof()
如果传输支持 write_eof() 返回 True 否则返回 False 。

WriteTransport.write(data)
将一些 data 字节串写入传输。
此方法不会阻塞;它会缓冲数据并安排其被异步地发出。

WriteTransport.writelines(list_of_data)
将数据字节串的列表(或任意可迭代对象)写入传输。 这在功能上等价于在可迭代对象产生的每个元素上调用 write(),但其实现可能更为高效。

WriteTransport.write_eof()
在刷新所有已缓冲数据之后关闭传输的写入端。 数据仍可以被接收。
如果传输(例如 SSL)不支持半关闭的连接,此方法会引发 NotImplementedError。

更多方法见官方文档。

具体实现

服务端

import asyncio
from asyncio import transports
import aioconsole

trSet: set[asyncio.BaseTransport] = set()


class TCPServerProtocolFactory(asyncio.Protocol):
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        self.addr = transport.get_extra_info("peername")
        print(f"NEW {self.addr}")
        self.transport = transport
        trSet.add(transport)

    def eof_received(self) -> bool | None:
        print(f"EXIT {self.addr}")
        trSet.discard(self.transport)
        self.transport.close()

    def data_received(self, data: bytes) -> None:
        print(f"MSG {data.decode()} FROM {self.addr}")


async def chat(server: asyncio.Server):
    while True:
        try:
            msg = await aioconsole.ainput(">")
            if msg == "EOF":
                raise EOFError
        except EOFError:
            print("WARNING closing all connections")
            for t in trSet:
                t.write_eof()

            trSet.clear()
            server.close()
            break
        else:
            for t in trSet:
                t.write(msg.encode())


async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(
        lambda: TCPServerProtocolFactory(), host="127.0.0.1", port=9961
    )
    await server.start_serving()
    await chat(server)


asyncio.run(main())

客户端

import asyncio
from asyncio import transports
import aioconsole

trans = None


class TCPClientProtocol(asyncio.Protocol):
    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        self.remoteaddr = transport.get_extra_info("peername")
        print("Connection made with ", self.remoteaddr)

    def data_received(self, data: bytes) -> None:
        print("MSG ", data.decode())

    def eof_received(self) -> bool | None:
        print("WARNING server is closed")

    def connection_lost(self, exc: Exception | None) -> None:
        print("Connection closed")


async def chat(trans: asyncio.Transport):
    while True:
        try:
            msg = await aioconsole.ainput(">")
            if msg == "EOF":
                raise EOFError
        except EOFError:
            trans.write_eof()
            trans.close()
            break
        else:
            trans.write(msg.encode())


async def main():
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.create_connection(
        lambda: TCPClientProtocol(),
        host="127.0.0.1",
        port=9961,
        local_addr=("127.0.0.1", 12306),
    )
    await chat(transport)


asyncio.run(main())

其他

更高级的接口如
open_connection(),start_server实战放在下一篇博文,并连带弱引用与部分设计模式的实例。
子进程,ssl — 套接字对象的 TLS/SSL 包装器等之后需要用时再写。

0 评论:

发表评论