2023年8月1日星期二

myProcess

pyProcess

参考视频

多道技术

单核实现并发的效果,
并发:看起来像同时运行的就可以叫做并发
并行:真正意义上的同时运行

空间与时间上的复用

  • 空间上
    多个程序公用一套计算机硬件
  • 时间上
    切换+保存状态

切换分为两种

  1. 当一个程序遇到IO操作,操作系统会立刻剥夺该程序的cpu执行权限(提供了cpu利用率并且不影响程序的执行效率)
  2. 当一个程序长时间占用cpu,操作系统也会立刻剥夺该程序的cpu执行权限(降低了程序的运行效率但是有并发的效果)

进程理论

进程调度

先来先服务调度算法对长作业有利,对短作业无益
短作业优先调度算法对短作业有利,多长作业无益
时间片轮转法和多级反馈队列:
时间片:将固定的时间切分成N多份,每一份就表示一个时间片
越往下说明该任务需要的时间越长,越往下任务的执行优先级越低
当第一个队列中出现了新的任务,那么cpu会立刻停止当前执行的任务去执行新添加进来的第一层队列中的任务
在linux中可以给任务设置优先级,一次性分配好几个时间片

进程运行的三状态图

三态
三态解说

两对重要概念

  • 同步异步
    描述任务的提交方式
    • 同步:任务提交后,原地等待任务的返回结果,等待过程中不做任何事。
      程序层面上表现出来的感觉就是卡住了
    • 异步:任务提及之后,不原地等待任务的返回结果,直接去做其他事情。
      提交任务结果如何获取?任务的返回结果会有一个异步回调机制自动处理
  • 阻塞非阻塞
    描述程序的运行状态
    • 阻塞:阻塞态
    • 非阻塞:就绪态、运行态
      理想状态下,我们的代码永远处于就绪态和运行态之间切换

上述最高效一种组合是异步非阻塞

开启进程两种方式

定心丸:代码开启进程和线程的方式,代码书写基本是一样的,你学会了如何开启进程就学会了如何开启线程

# 开启进程两种方式
# 1创建对象
from multiprocessing import Process, freeze_support
import time

def task(name):
    print(f"{name} is running")
    print(1)
    time.sleep(3)
    print(f"{name} is over")

"""
windows下,创建进程一定要main下创建
因为windows下创建进程类似于模块导入的方式
会从上往下以此执行代码 
linux则是完整拷贝一份代码
"""
# 1.创建一个对象
p = Process(target=task, args=("touhou",))
# 2.开启一个进程
p.start()  # 告诉操作系统创建进程,异步
print("main process is over")
# 2。类的继承
from multiprocessing import Process
import time
class MyProcess(Process):
    def run(self):
        # 名字固定必须是run函数
        print("hello world")
        time.sleep(1)
        print("awake")

p = MyProcess()
p.start()
print("main")

总结
创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去
一个进程对应在内存中就是一块独立的内存空间
多个进程对应在内存中就是多块独立的内存空间
进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块

join方法

join是让主进程等待子进程代码运行结束之后,再继续运行。不影响其他子进程的执行

from multiprocessing import Process, freeze_support
import time


def task(name,t):
    print(f"{name} is running")
    time.sleep(t)
    print(f"{name} is over")


if __name__ == "__main__":
    # 1.创建一个对象
    p1 = Process(target=task, args=("reimu",1))
    p2 = Process(target=task, args=("hakurei",2))
    # 2.开启一个进程
    p1.start()
    p2.start()

    '''
    如果有多个进程,那么start即使有顺序但是操作系统执行不按顺序,仅仅是告诉操作系统要创建进程
    '''
    # 主进程等待子进程p运行结束之后再继续往后执行.但是执行开始是从start开始(计时)的
    p1.join()  
    p2.join()
    print("main process is over")

结果,多次程序运行,reimu和hakurei输出顺序会不同。此外程序共运行了3秒

进程之间数据相互隔离

from multiprocessing import Process, freeze_support
import time

money = 100

def task():
    global money
    money = 66

if __name__ == "__main__":
    p = Process(target=task)
    p.start()
    p.join()
    print(money) # 主进程中money=100,子进程是66,但这里输出的是主进程不能被子进程修改

关于target参数传入的方法带不带括号,如果不带,则是调用的是方法本身,或者说是这个方法的内存地址。
带括号则是调用方法的返回值,子进程阻塞地执行,直到返回

进程对象及其他方法

一台计算机上面运行看很多进程,那么计算机是如何区分并管理这些进程服务端的呢
计算机会给每一个运行的进程分配一个PID号
如何查看
windows电脑
进入cmd输入tasklist即可查看
tasklist | findstr PID查看具体的进程

py中使用current_process().pidos.getpid()查看当前进程号
主主:例如在pycharm解释器中执行代码,可以通过os.getppid()得到父进程的pid

杀死当前进程,使用Process.terminate()非阻塞的方法
是告诉操作系统帮你去杀死当前进程,但是需要一定的时间,而代码的运行速度极快,在terminate()起效前已经判断进程是否存活,所有需要sleep(time)
判断当前进程是否存活Process.is_alive()

一般情况下我们会默认将存储布尔值的变量名和返回的结果是布尔值的方法名都起成以is_开头

僵尸进程与孤儿进程

僵尸进程:死了但没死透

def run():
    print("hello")
    time.sleep(1)
    print("exit")

if __name__ == "__main__":
    print("main start")
    p = Process(target=run)
    p.start()
    print("main end")

例如在程序中,开启子进程后,该进程死后不会立刻释放占用的进程号
因为要让父进程能够查看到开设的子进程的一些基本信息,如占用的pid号 运行时间
所有的进程都会步入僵尸进程

  • 父进程不死并且在无限制的创建子进程并且子进程也不结束
  • 回收子进程占用的pid号
    = 父进程等待子进程运行结束
    = 父进程调用join方法

孤儿进程:子进程存活,父进程意外死亡
操作系统会开设一个”儿童福利院”专门管理孤儿进程回收相关资源

守护进程

daemon

def task(name: str):
    print(f"{name}总管正常活着")
    time.sleep(3)
    print(f"{name}总管正常死亡")

if __name__ == "__main__":
    p = Process(target=task, args=("reimu",))
    p.daemon = True  # 将进程p设置成守护进程,需要在start之前设置这里
    p.start()
    time.sleep(1)
    print("main end")

输出

reimu总管正常活着
main end

互斥锁

首先请看此模拟抢票问题,如下代码

# 模拟抢票功能
def query(i):
    # 文件操作,读取票数
    with open("./data", "r", encoding="utf-8") as f:
        data: dict = json.load(f)
    print(f"用户{i}查询余票{data['ticket_num']}")  # 字典取值不要用[]的形式,推荐使用get。代码追求不能报错

# 买票,先查询,再买票
def buy(i):
    with open("./data", "r", encoding="utf-8") as f:
        data: dict = json.load(f)
    # simulate delay of internet
    time.sleep(random.randint(1, 3))
    # judge whether there are tickets
    if data.get("ticket_num") > 0:
        # 有票,修改数据库
        data["ticket_num"] -= 1
        # 写入数据
        with open("./data", "w", encoding="utf-8") as f:
            json.dump(data, f)
        print(f"{i} buy successfully")
    else:
        print(f"{i} buy failed")

def run(i):
    query(i)
    buy(i)

if __name__ == "__main__":
    for i in range(1, 10):
        p = Process(target=run, args=(i,))
        p.start()
# data
{"ticket_num":1}

输出结果为每个用户在查票时,返回的都是成功购票前的票数导致read的都是1,write都是0.导致一票多买

针对多个进程操作同一份数据的时候会出现数据错乱的问题,
针对上述问题,解决方式就是加锁处理:将并发变成串行,牺牲效率但是保证了数据的安全

解决办法

from multiprocessing import Lock
def run(i, mutex: " Lock"):
    query(i)
    # 买票时需要排队,给买票环节加锁处理
    # 抢锁
    mutex.acquire()
    buy(i)
    # 释放锁
    mutex.release()

if __name__ == "__main__":
    # 在主进程里生成一把锁。让所有子进程抢,谁抢到谁买
    mutex = Lock()
    for i in range(1, 10):
        p = Process(target=run, args=(i, mutex))
        p.start()

结果,串行,只有一个successfully。其他依次执行,也会delay但抢不到票

拓展:行锁,表锁。仅能有一人操作一条数据和一个表的区别

注息
1.锁不要轻易的使用,容易造成死锁现象(我们写代码一般不会用到,都是内部封装好的)
2.锁只在处理数据的部分加来保证数据安全(只在争抢数据的环节加锁处理即可)

进程间通信IPC机制

前文中所述,Process间互不干扰,那么怎么在进程间传递数据
借助消息队列(管道)

队列

queue模块
用法类似stl的queue

from queue import Queue

q = Queue()  # 括号内可以传数字,表示maxsize,默认为0(32767)
# 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出来
# 存数据,emplace
q.put(1)

# 取,top() + pop()
# 取如果已没数据的队列也会堵塞
print(q.get())
# 这个不会堵塞
q.get_nowait()
q.get(timeout=3)# 没有数据,三秒后报错queue empty

q.full()
q.empty()
q.get_nowait()
在多进程的情况下是不精确,因为可能一个进程操作完时另一个进程进行操作

队列:管道+锁

IPC机制

  1. 主进程跟子进程借助于队列通信
def producer(q: Queue):
    q.put("12345")
    print("hello world")

if __name__ == "__main__":
    q = Queue()
    p = Process(target=producer, args=(q,))
    p.start()
    # p.join() 这里不需要同步,因为get如果get不到数据会阻塞
    print(q.get())
  1. 子进程跟子进程借助于队列通信
def producer(q: Queue):
    q.put("12345")
    print("hello world")

def consumer(q: Queue):
    print(q.get())

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()

这个queue就是消息队列

生产者消费者模型

  • 生产者:生产/制造东西的
  • 消费者:消费/处理东西的

该模型除了上述两个之外还需要一个媒介
生活中的例子做包子的将包子做好后放在蒸笼(媒介)里面,买包子的取蒸笼里面拿
厨师做菜做完之后用盘子装着给你消费者端过去
生产者和消费者之间不是直接做交互的,而是借助于媒介做交互
生产者(做包子的) + 消息队列(蒸笼) +消费者(吃包子的)

示例

from multiprocessing import Process, Queue
import time
import random


def Producer(cooker, food, q: Queue):
    for nu in range(1, 5):
        time.sleep(random.randint(1, 2))
        print(f"{cooker} made {food}{nu}")
        q.put(food + str(nu))


def Consumer(player, q: Queue):
    while True:
        food = q.get() # 没有数据就会卡住
        if food is None:
            break
        print(f"{player} eat {food}")


if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=Producer, args=("a1", "baozi", q))
    p2 = Process(target=Producer, args=("b2", "jiaozi", q))
    c1 = Process(target=Consumer, args=("v1", q))
    c2 = Process(target=Consumer, args=("v2", q))

    p1.start()
    p2.start()
    c1.start()
    c2.start()

    # 等待所有生产者生产完毕后,往队列中添加特定的结束符号
    p1.join()
    p2.join()

    q.put(None) # 但是不方便统计消费者的数量,怎么处理
    q.put(None)

使用joinablequeue 和 守护进程改良程序
joinablequeue类型的队列,
队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
方法介绍:JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:

  • q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
  • q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止
from multiprocessing import Process, Queue, JoinableQueue
import time
import random


def Producer(cooker, food, q: Queue):
    for nu in range(1, 5):
        time.sleep(random.randint(1, 2))
        print(f"{cooker} made {food}{nu}")
        q.put(food + str(nu))


def Consumer(player, q: JoinableQueue):
    while True:
        food = q.get()
        print(f"{player} eat {food}")
        q.task_done()  # 告诉队列你已经从里面取出了一个数据并且处理完毕了


if __name__ == "__main__":
    q = JoinableQueue()
    p1 = Process(target=Producer, args=("a1", "baozi", q))
    p2 = Process(target=Producer, args=("b2", "jiaozi", q))
    c1 = Process(target=Consumer, args=("v1", q))
    c2 = Process(target=Consumer, args=("v2", q))

    c1.daemon = True
    c2.daemon = True
    p1.start()
    p2.start()
    c1.start()
    c2.start()

    # 等待所有生产者生产完毕后,往队列中添加特定的结束符号
    p1.join()
    p2.join()

    q.join()  # 等待队列中所有的数据被取完再执行往下执行代码

线程相关知识点

线程理论

什么是线程:
进程:资源单位,起一个进程仅仅只是在内存空间中开辟一块独立的空间
线程:执行单位,真正被cpu执行的其实是进程里面的线程,线程指的就是代码的执行过程,执行代码中所需要使用到的资源都找所在的进程索要

将操作系统比喻为一个工厂,那么进程就是工厂里的车间,线程是工厂里的流水线

所以进程在工作时,干活的实际是进程里的线程,所以每个进程要自带一个线程,每个线程所需要的资料如变量名向进程要。
每一个进程肯定自带一个线程

进程和线程都是虚拟单位,只是为了我们更加方便的描述问题

为何要有线程
开设进程

  • 申请内存空间,耗资源
  • “拷贝代码”,耗资源

开线程

  • 一个进程内可以开设多个线程,在用一个进程内开设多个线程无需再次申请内存空间操作
  • 开设线程的开销要远远的小于进程的开销
  • 同一个进程下的多个线程数据是共享的

例如,我们要开发一款文本编辑器

  • 获取用户输入的功能
  • 实时展示到屏幕的功能
  • 自动保存到硬盘的功能
  • 针对上面这三个功能,开设进程还是线程合适???
    开三个线程处理上面的三个功能更加的合理

0 评论:

发表评论