最近在做视频帧处理(前景提取),显然这个要求视频帧按照顺序输出。帧的处理顺序没有关系,因为没有使用推测功能。(要是开了帧预测的话,显然就只能顺序处理了,由于帧之间有依赖关系,所以最多只能做单帧内部的处理优化)

所以本次的思路基本是,寻找一个可以乱序执行,但是一定要能够按照任务的加入顺序获取执行结果的模型。看了一下,直接用threading.Thread,没有看到简洁地获取执行结果的方法;而用multithreading.pool.Pool,使用map函数的callback参数注册回调,在之前的尝试中没有成功过。然后在StackOverflow上看到有人推荐concurrent.futures.ThreadPoolExecutor。在Python里看到future这个词往往都是有惊喜的,这次也不例外。大致流程如下:

使用pool = ThreadPoolExecutor(maxThreadCount)注册一个全新的线程池 -> 使用pool.submit(function, args...)提交一个任务,并获得类似于线程句柄一类的返回对象,可以用来控制线程,包括在运行之前取消该任务、获取运行状态(是否完成)以及获取完成的返回值等。这几乎满足了之前提出的需求:只需要再自建一个满足先进先出的队列,就可以实现按帧的顺序读取了。

框架大致如下:

import multiprocessing as mp
from threading import Thread
from concurrent.futures import ProcessPoolExecutor

processPool = ProcessPoolExecutor(max(mp.cpu_count(), 1))
procList = []
maxQueueLength = 10

def produce(...):
    # 用来执行具体工作,比如处理单帧视频
    ...

def producer_func(...):
    # 用来控制多线程生成和执行情况的“主线程”,调用produce函数
    <condition-loop>
        ....
        # 确保队列不会过长,节省资源,时间换空间
        while len(procList) > maxQueueLength:
            continue
        procList.append(processPool.submit(produce, (...)))

if __name__ == '__main__':
    # 不阻塞主线程,将后续的任务还要放在这里执行,比如图像的显示
    Thread(target=producer_func, ...).start()

    while True:
        if len(procList) <= 0:
            continue
        elif not procList[0].done():
            continue
        res = procList[0].result()
        ...
        if <condition>:
            break

标签: none

添加新评论