2023年8月23日星期三

python两种方法实现异步的mysql

python两种方法实现异步的mysql

本次实验中将用两种方法实现异步的mysql

  1. 步骤1.创建(插入)表stutable
  2. 步骤2.创建(插入)表stutable
  3. 步骤3.在前两步完成后进行查询

方法1

import asyncio
import pymysql
import concurrent.futures

host = "127.0.0.1"
usr = "root"
pwd = "123456"
db = 'testaio'
port = 3306
def gatherTables(results:tuple[tuple[str]])->list[str]:
    ret = []
    for row in results:
        ret.append(row[0])
    return ret

def gatherTables(cur)->list[str]:
    cur.execute("SHOW TABLES;")
    results:tuple = cur.fetchall()
    ret = []
    for row in results:
        ret.append(row[0])
    return ret
       

'''
classic
'''
def classicfunc1(cur):
    table_list = gatherTables(cur)
    if "stutable" not in table_list:
        cur.execute("""CREATE TABLE stutable(
            name varchar(32),
            age int,
            hobby varchar(32));
        """)
        pass
    cur.execute("INSERT INTO stutable VALUES('epicmo',18,'program'),('wisdomgo',18,'cybersecurity'),('scarletborder',19,'sleep')")


def classicfunc2(cur):
    table_list = gatherTables(cur)
    if "stutable" not in table_list:
        cur.execute("""CREATE TABLE stutable(
            name varchar(32),
            age int,
            hobby varchar(32));
        """)
        pass
    cur.execute("INSERT INTO stutable VALUES('niuniu',18,'football'),('chenbao',18,'guandan'),('xiaowang',19,'cookrice')")
    pass

def classicfunc3(cur)->tuple[tuple]:
    table_list = gatherTables(cur)
    if 'stutable' in table_list:
        cur.execute("SELECT * FROM stutable")
        return cur.fetchall()
    else:
        return None

async def main():
    conn = pymysql.Connection(host=host,user=usr,password=pwd,port=port,autocommit=True)
    conn.select_db(db=db)

    cur = conn.cursor()
    cur.execute("DROP TABLE IF EXISTS stutable")

    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        fut1 = loop.run_in_executor(pool,classicfunc1,cur)
        fut2 = loop.run_in_executor(pool,classicfunc2,cur)
        await fut1
        await fut2

    print(classicfunc3(cur=cur))
    pass

    cur.close()
    conn.close()

asyncio.run(main())

方法2

import aiomysql
import asyncio

host = "127.0.0.1"
usr = "root"
pwd = "123456"
db = 'testaio'
port = 3306
async def aiogatherTables(cur:aiomysql.Cursor)->list[str]:
    await cur.execute("SHOW TABLES;")
    results:tuple = await cur.fetchall()
    ret = []
    for row in results:
        ret.append(row[0])
    return ret
    
async def delexisted(cur:aiomysql.Cursor):
    await cur.execute("DROP TABLE IF EXISTS stutable")

if __name__ == '__main__':
    async def aiofunc1(cur):
        table_list = await aiogatherTables(cur)
        if "stutable" not in table_list:
            await cur.execute("""CREATE TABLE stutable(
                name varchar(32),
                age int,
                hobby varchar(32));
            """)
            pass
        await cur.execute("INSERT INTO stutable VALUES('epicmo',18,'program'),('wisdomgo',18,'cybersecurity'),('scarletborder',19,'sleep')")


    async def aiofunc2(cur):
        await asyncio.sleep(0.5)
        table_list = await aiogatherTables(cur)
        if "stutable" not in table_list:
            await cur.execute("""CREATE TABLE stutable(
                name varchar(32),
                age int,
                hobby varchar(32));
            """)
            pass
        await cur.execute("INSERT INTO stutable VALUES('niuniu',18,'football'),('chenbao',18,'guandan'),('xiaowang',19,'cookrice')")
        pass

    async def aiofunc3(cur)->tuple[tuple]:
        table_list = await aiogatherTables(cur)
        if 'stutable' in table_list:
            await cur.execute("SELECT * FROM stutable")
            return cur.fetchall()
        else:
            return None
        
        pass
    async def main():
        conn1 = await aiomysql.connect(host=host,user=usr,password=pwd,port=port,autocommit=True,db=db)
        conn2 = await aiomysql.connect(host=host,user=usr,password=pwd,port=port,autocommit=True,db=db)

        cur1 = await conn1.cursor()
        cur2 = await conn2.cursor()
        await delexisted(cur1)

        tasks = [aiofunc1(cur1),aiofunc2(cur2)]
        await asyncio.gather(*tasks)
        results = await aiofunc3(cur1)
        print(results)

        await cur1.close()
        await cur2.close()
        conn1.close()
        conn2.close()



    asyncio.run(main=main())

值得注意的是,方法2中需要开两个connect,否则会导致

RuntimeError: readexactly() called while another coroutine is already waiting for incoming data

见网上教程说是能用连接池,这里mark一下

转载内容

Aiomysql 与 Sqlalchemy 的使用 - 知乎 (zhihu.com)

所以这里我们需要用两个不同的连接, 当然可以在每个函数中都重新对mysql数据进行连接,在执行完查询操作以后再关闭,但是这样就会造成之前说有频繁的创建连接会造成一些资源的浪费,同时网站的性能也会受到影响。

所以这时我们需要使用连接池,连接池会保存一定数量的连接对象,每个函数在需要使用的时候从池子中拿一个连接对象, 使用完以后再将连接对象放到池子中, 这样避免了频繁的和mysql数据库进行打开关闭操作,同时也避免出现上面的同个连接在不同的协程对象中使用而出现的异常。

loop = asyncio.get_event_loop()

async def test():
    pool = await aiomysql.create_pool(
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        db='mytest',
        minsize=1,
        maxsize=2,
        echo=True,
        autocommit=True,
        loop=loop
    )

async def get_user():
    async with pool.acquire() as conn:
        print(id(conn), 'in get user')
        async with conn.cursor() as cur:
            count = await cur.execute("select * from user")
            if not count:
                return
            r = await cur.fetchall()
            print("get data from user")
            for i in r:
                print(i)

async def get_jobs():
    async with pool.acquire() as conn:
        print(id(conn), 'in get jobs')
        async with conn.cursor() as cur:
            count = await cur.execute("select * from jobs")
            if not count:
                return
            r = await cur.fetchall()
            print("get data from jobs......")
            for i in r:
                print(i)

async def get_email():
    async with pool.acquire() as conn:
        print(id(conn), 'in get email')
        async with conn.cursor() as cur:
            count = await cur.execute("select * from email")
            if not count:
                return
            r = await cur.fetchall()
            print("get data from email......")
            for i in r:
                print(i)

await asyncio.gather(get_jobs(), get_user(), get_email())


loop.run_until_complete(test())

连接池的初始化函数 aiomysql.create_pool 和 aiomysql.connect 参数差不多,数据库的基本信息, 这里多了两个参数 minsize,maxsize, 最少连接数和最大连接数,我这里为了实验,将最大连接数设置为2,然后下面用了三个函数来获取连接池,我们将连接对象conn的id信息打印出来看下

2977786527496 in get jobs
2977786527496 in get user
2977786590984 in get email

上面的脚本也不再报错,并且可以正常的获取到数据库里的信息,且都是异步的进行查询

0 评论:

发表评论