发明名称 一种基于多线程编程及消息队列的多线程并行处理方法
摘要 本发明提供一种基于多线程编程及消息队列的多线程并行处理方法,属于计算机高性能计算领域。本发明对传统单线程串行软件的并行化进行改造,利用当前流行的多核CPU计算设备、pthread多线程并行计算技术及消息队列实现线程间通信的技术,其方法内容包括:在单节点内,创建三类pthread线程,分别为读、计算、写线程,并且各类线程数目灵活可配置,开辟多缓存,创建四个队列,用于线程间通信,调配计算任务及管理缓存空间资源。该方法广泛适用于具有多线程并行处理需求的应用场合,可指导软件开发人员对现有软件进行多线程化改造,实现软件对系统资源利用最优化,显著提高硬件资源利用率,提高软件的计算效率和软件整体性能。
申请公布号 CN102902512B 申请公布日期 2015.12.16
申请号 CN201210316211.7 申请日期 2012.08.31
申请人 浪潮电子信息产业股份有限公司 发明人 吴庆;张清;赵开勇
分类号 G06F9/38(2006.01)I 主分类号 G06F9/38(2006.01)I
代理机构 代理人
主权项 一种基于多线程编程及消息队列的多线程并行处理方法,其特征在于在单节点内,创建三类pthread线程,分别为读、计算、写线程,并且各类线程数目灵活可配置,开辟多缓存,创建四个队列,用于线程间通信,调配计算任务及管理缓存空间资源,具体步骤如下:一、基于多缓冲和消息队列建立任务分发机制,包括:1)计算任务的划分:任务划分的基本单位总的计算任务数是TOTAL_JOB,它可以被划分成多个子任务,定义每个子任务大小为JOB_SIZE,定义灵活的任务划分策略,软件有自动配置模式和用户手动配置模式;2)任务分发、执行策略,包括:(1)子任务实际由读线程来生成;读线程定义每个子任务的信息tmp_msg,包括:job_begin,job_size,buf_id;其中:job_begin是该任务计数编号,通过它可以确定该任务的起始LINE号和CMP号;job_size定义了该任务的大小,其上限是预先已经定义好的JOB_SIZE;buf_id指明了该任务所在的BUF编号;(2)任务信息tmp_msg的类型实际上就是消息队列成员的类型,被加入到各个队列中;3)子任务执行所需资源的竞争策略完成一个子任务,需要如下几个步骤:a)读线程根据当前读取进度CURRENT_READ_STEP及总作业大TOTAL_STEP,确定当前任务的起始job_begin,任务大小job_size,并且从空SR_BUF队列SR_BUF_EMPTY_QUEUE中获取一个空的SR_BUF_ID,将数据读入SR_BUF_ID对应的SR_BUF中,即SR_BUF[SR_BUF_ID],然后将新生成的任务信息保存至tmp_msg中,并将tmp_msg加入新计算任务队列SR_BUF_FULL_QUEUE中;b)计算线程需先从新计算任务队列SR_BUF_FULL_QUEUE中获取一个新计算任务,然后再从空闲目标缓冲队列DR_BUF_EMPTY_QUEUE中获取一个空闲DR_BUF_ID,之后才进行计算,计算源数据为SR_BUF[SR_BUF_ID],计算结果存放于DR_BUF[DR_BUF_ID]中,计算结束后,释放SR_BUF_ID对应的源数据缓存,即将SR_BUF_ID加入SR_BUF_EMPTY_QUEUE队列中,并告知写线程进行输出,即将tmp_msg加入到待输出队列DR_BUF_FULL_QUEUE中;c)写线程从待输出任务队列DR_BUF_FULL_QUEUE中获取一个写任务信息tmp_msg,该任务信息定义了数据存放的DR_BUF_ID以及该写任务需要写到的位置信息,即job_begin,以及写任务的规模job_size,写线程完成该输出任务后,需要告知计算线程DR_BUF[DR_BUF_ID]中的数据已经输出完毕,可重新用于存放计算结果,即将DR_BUF_ID加入DR_BUF_EMPTY_QUEUE队列中;二、多缓冲设计设计多个源数据缓冲SR_BUF和目标数据缓冲DR_BUF,缓冲的数目灵活可调,为了以最少的缓冲达到最高的效能,缓冲的个数有一个临限值,理论上,源缓冲与目标缓冲的数目至少为计算线程数的2倍,即:SR_BUF_NUM&gt;=2*COMPUTE_THREAD_NUM,DR_BUF_NUM&gt;=2*COMPUTE_THREAD_NUM考虑到实际生产中网络资源的竟争和不稳定因素,保证计算线程随时都能获得一个源缓冲和一个目标缓冲,软件为每个计算线程预留一个缓冲余量,默认将源数据缓冲和目标缓冲数都设置为计算线程数的3倍;三、环形消息队列设计为了实现上述任务分发策略,设计以下四个队列:<tables num="0001" id="ctbl0001"><table><tgroup cols="5"><colspec colname="c001" colwidth="22%" /><colspec colname="c002" colwidth="17%" /><colspec colname="c003" colwidth="16%" /><colspec colname="c004" colwidth="22%" /><colspec colname="c005" colwidth="23%" /><tbody><row><entry morerows="1">消息队列</entry><entry morerows="1">生产者</entry><entry morerows="1">消费者</entry><entry morerows="1">初始状态</entry><entry morerows="1">备注</entry></row><row><entry morerows="1">SR_BUF_EMPTY_QUEUE</entry><entry morerows="1">COMPUTE_thread</entry><entry morerows="1">READ_thread</entry><entry morerows="1">SR_BUF_ID全部入队</entry><entry morerows="1">空SR_BUF队列</entry></row><row><entry morerows="1">SR_BUF_FULL_QUEUE</entry><entry morerows="1">READ_thread</entry><entry morerows="1">COMPUTE_thread</entry><entry morerows="1">空</entry><entry morerows="1">满SR_BUF队列</entry></row><row><entry morerows="1">DR_BUF_EMPTY_QUEUE</entry><entry morerows="1">WRITE_thread</entry><entry morerows="1">COMPUTE_thread</entry><entry morerows="1">DR_BUF_ID全部入队</entry><entry morerows="1">空DR_BUF队列</entry></row><row><entry morerows="1">DR_BUF_FULL_QUEUE</entry><entry morerows="1">COMPUTE_thread</entry><entry morerows="1">WRITE_thread</entry><entry morerows="1">空</entry><entry morerows="1">满DR_BUF队列</entry></row></tbody></tgroup></table></tables>其中消息队列中存放的消息数据类型定义如下:<img file="FDA0000820529500000021.GIF" wi="1080" he="215" />1)SR_BUF_FULL_QUEUE:新的计算任务队列新计算任务消息队列,记录作业信息JOB_INFO,包括JOB_BEGIN,JOB_SIZE,SR_BUF_ID,由读线程写入(生产),计算线程弹出(消费)当读线程向SR_BUF_ID读入新数据时,将JOB_INFO入队,计算线程弹出JOB_INFO时,计算SR_BUF_ID对应的源数据;2)SR_BUF_EMPTY_QUEUE:存放当前空闲SR_BUF_ID号源缓冲释放消息队列,与SR_BUF_FULL_QUEUE功能相反,由计算线程写入,读线程弹出,当SR_BUF_ID对应的任务计算完毕时,释放SR_BUF_ID,告知读线程可对其更新数据;3)DR_BUF_EMPTY_QUEUE:存放当前空闲DR_BUF_ID号目标缓冲为空消息队列,记录DR_BUF_ID号,由写线程写入,计算线程弹出;当写线程对DR_BUF_ID数据输出完毕时,将DR_BUF_ID入队,告知计算线程,该DR_BUF_ID输出完毕可重新用于计算,计算线程弹出DR_BUF_ID时,启动计算,并将结果写入DR_BUF_ID对应的目标缓冲中;4)DR_BUF_FULL_QUEUE:新的写任务队列新写任务消息队列,记录作业信息JOB_INFO,包括JOB_BEGIN,JOB_SIZE,DR_BUF_ID,由计算线程写入,写线程弹出;当计算线程向DR_BUF_ID读入新数据时,将JOB_INFO入队,写线程弹出JOB_INFO时,对DR_BUF_ID对应的目标数据进行输出;四、线程设计1)主线程设计(1)主线程功能及运行流程如下:1&gt;参数预处理;2&gt;定义读、计算、写线程数;3&gt;定义源、目标缓存数目,与计算线程数相关;4&gt;定义任务划分粒度,即子任务规模JOB_SIZE;5&gt;开辟源、目标缓冲内存空间;6&gt;创建并初始化消息队列、锁、信号量;7&gt;创建并启动读、计算、写线程;8&gt;等待所有线程退出;9&gt;其它处理;10&gt;程序退出;2)读线程设计(1)线程数设计根据实际应用需求,灵活设置读线程数,默认只设置一个读线程;(2)线程功能及运行流程1&gt;参数初始化;2&gt;检查错误标志及用户行为,如果出错或被用户取消,则进入步骤9,否则进入步聚3;3&gt;检查当前任务进度READ_CURRENT_STEP,判断是否完成所有读任务,如果是,则进入步骤9,否则进入步聚4;4&gt;根据当前读进度READ_CURRENT_STEP和总任务数Total_Step,计算剩余任务数left_job,生成新任务起始job_begin及大小信息Job_size,job_size上限为JOB_SIZE,更新任务计数READ_CURRENT_STEP;5&gt;从SR_BUF_EMPTY_QUEUE队列中获得一个空闲SR_BUF_ID;6&gt;从源文件INPUT_FILE中读取任务源数据至源数据缓存SR_BUF[SR_BUF_ID]中;7&gt;将该任务信息tmp_msg加入新计算任务队列SR_BUF_FULL_QUEUE中;8&gt;返回步骤2;9&gt;线程退出;3)计算线程设计(1)线程数设计:默认情况下,计算线程数为系统可用CPU核数,即SYS_CPU_CORE_NUM,用户通过宏COMPUTE_THREAD_NUM来定义计算线程数;(2)线程功能及运行流程:1&gt;参数初始化;2&gt;检查错误标志及用户行为,如果出错或被用户取消,则进入步聚10,否则进入步聚3;3&gt;检查当前任务进度COMPUTE_CURRENT_STEP,判断是否完成所有读任务,如果是,则进入步骤10,否则进入步聚4;4&gt;从新计算任务队列SR_BUF_FULL_QUEUE中获取一个任务信息tmp_msg,其包含了任务的起始信息job_begin,大小信息job_size,以及任务数据存放源缓冲编号SR_BUF_ID,并更新任务计数COMPUTE_CURRENT_STEP;5&gt;从DR_BUF_EMPTY_QUEUE队列中获得一个空闲的DR_BUF_ID;6&gt;以SR_BUF[SR_BUF_ID]为数据输入缓存,以DR_BUF[DR_BUF_ID]为数据输出缓存进行计算;7&gt;将SR_BUF_ID加入SR_BUF_EMPTY_QUEUE中,表示SR_BUF[SR_BUF_ID]所存的数据计算完毕,需要重新加载源数据;8&gt;根据计算任务信息,生成写任务信息,并将其加入到DR_BUF_FULL_QUEUE队列中,表示需要写线程进行输出;9&gt;返回步骤2;10&gt;线程退出;4)写线程设计(1)线程数设计根据实际应用需求,灵活设置写线程数,默认只设置一个写线程;(2)线程功能及运行流程1&gt;参数初始化;2&gt;检查错误标志及用户行为,如果出错或被用户取消,则进入步骤9,否则进入步聚3;3&gt;检查当前任务进度WRITE_CURRENT_STEP,判断是否完成所有读任务,如果是,则进入步骤9,否则进入步聚4;4&gt;从新写任务队列DR_BUF_FULL_QUEUE中获取一个写任务信息tmp_msg,其包含了任务的起始信息job_begin,大小信息job_size,以及任务数据存放目标缓冲编号DR_BUF_ID,并更新任务计数WRITE_CURRENT_STEP;5&gt;将目标缓存DR_BUF[DR_BUF_ID]中的数据输出至OUTPUT_FILE;6&gt;将DR_BUF_ID加入DR_BUF_EMPTY_QUEUE中,表示DR_BUF[DR_BUF_ID]所存的数据输出完毕,需要重新加载计算结果;7&gt;向主线程发送更新进度条信号;8&gt;返回步骤2;9&gt;线程退出。
地址 250014 山东省济南市高新区舜雅路1036号