eclipseimport+pydev from multiprocessing import Pool, cpu_count 报错

Python多进程multiprocessing使用示例 - 为程序员服务
为程序员服务
Python多进程multiprocessing使用示例
由于要做把一个多线程改成多进程,看一下相关方面的东西,总结一下,主要是以下几个相关的标准库
subprocess
multiprocessing
mutilprocess简介
像线程一样管理进程,这个是mutilprocess的核心,他与threading很是相像,对多核CPU的利用率会比threading好的多。
简单的创建进程
import multiprocessing
def worker(num):
&&&thread worker function&&&
print 'Worker:', num
if __name__ == '__main__':
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
确定当前的进程,即是给进程命名,方便标识区分,跟踪
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(2)
print name, 'Exiting'
def my_service():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(3)
print name, 'Exiting'
if __name__ == '__main__':
service = multiprocessing.Process(name='my_service',
target=my_service)
worker_1 = multiprocessing.Process(name='worker 1',
target=worker)
worker_2 = multiprocessing.Process(target=worker) # default name
worker_1.start()
worker_2.start()
service.start()
守护进程就是不阻挡主程序退出,自己干自己的
mutilprocess.setDaemon(True)
等待守护进程退出,要加上join,join可以传入浮点数值,等待n久就不等了
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
time.sleep(2)
print 'Exiting :', name
def non_daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
print 'Exiting :', name
if __name__ == '__main__':
d = multiprocessing.Process(name='daemon',
target=daemon)
d.daemon = True
n = multiprocessing.Process(name='non-daemon',
target=non_daemon)
n.daemon = False
print 'd.is_alive()', d.is_alive()
最好使用 poison pill,强制的使用terminate()
注意 terminate之后要join,使其可以更新状态
import multiprocessing
import time
def slow_worker():
print 'Starting worker'
time.sleep(0.1)
print 'Finished worker'
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print 'BEFORE:', p, p.is_alive()
print 'DURING:', p, p.is_alive()
p.terminate()
print 'TERMINATED:', p, p.is_alive()
print 'JOINED:', p, p.is_alive()
进程的退出状态
== 0 未生成任何错误
进程有一个错误,并以该错误码退出
进程由一个-1 * exitcode信号结束
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
def return_value():
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
for f in [exit_error, exit_ok, return_value, raises, terminated]:
print 'Starting process for', f.func_name
j = multiprocessing.Process(target=f, name=f.func_name)
jobs.append(j)
jobs[-1].terminate()
for j in jobs:
print '%15s.exitcode = %s' % (j.name, j.exitcode)
方便的调试,可以用logging
import multiprocessing
import logging
import sys
def worker():
print 'Doing some work'
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target=worker)
利用class来创建进程,定制子类
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print 'In %s' % self.name
if __name__ == '__main__':
for i in range(5):
p = Worker()
jobs.append(p)
for j in jobs:
python进程间传递消息
这一块我之前结合SocketServer写过一点,见Python多进程
一般的情况是Queue来传递。
import multiprocessing
class MyFancyClass(object):
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print 'Doing something fancy in %s for %s!' % \
(proc_name, self.name)
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % proc_name
self.task_queue.task_done()
print '%s: %s' % (proc_name, next_task)
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print 'Creating %d consumers' % num_consumers
consumers = [ Consumer(tasks, results)
for i in xrange(num_consumers) ]
for w in consumers:
# Enqueue jobs
num_jobs = 10
for i in xrange(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in xrange(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print 'Result:', result
num_jobs -= 1
进程间信号传递
Event提供一种简单的方法,可以在进程间传递状态信息。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置。
import multiprocessing
import time
def wait_for_event(e):
&&&Wait for the event to be set before doing anything&&&
print 'wait_for_event: starting'
print 'wait_for_event: e.is_set()-&', e.is_set()
def wait_for_event_timeout(e, t):
&&&Wait t seconds and then timeout&&&
print 'wait_for_event_timeout: starting'
print 'wait_for_event_timeout: e.is_set()-&', e.is_set()
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(name='block',
target=wait_for_event,
args=(e,))
w1.start()
w2 = multiprocessing.Process(name='nonblock',
target=wait_for_event_timeout,
args=(e, 2))
w2.start()
print 'main: waiting before calling Event.set()'
time.sleep(3)
print 'main: event is set'
您可能的代码
相关聚客文章
荣誉:1553
相关专栏文章Access denied | www.programcreek.com used Cloudflare to restrict access
Please enable cookies.
What happened?
The owner of this website (www.programcreek.com) has banned your access based on your browser's signature (d53c0-ua98).1 进程池Pool基本概述
在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量时间,如果操作的对象数目不大时,还可以直接适用Process类动态生成多个进程,几十个尚可,若上百个甚至更多时,手动限制进程数量就显得特别繁琐,此时进程池就显得尤为重要。
进程池Pool类可以提供指定数量的进程供用户调用,当有新的请求提交至Pool中时,若进程池尚未满,就会创建一个新的进程来执行请求;若进程池中的进程数已经达到规定的最大数量,则该请求就会等待,直到进程池中有进程结束,才会创建新的进程来处理该请求。
进程池不用频繁创建和销毁进程
2 进程池Pool的语法
&Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])&
processes:使用的工作进程的数量;若processes是None,默认适用os.cpu_count()返回的数量。
initializer:若initializer是None,则每一个工作进程在开始的时候就会调用initializer(*initargs)。
maxtasksperchild:工作进程退出前可以完成的任务数,完成后用一个新的工作进程来替代原进程,让闲置的资源释放,maxtasksperchild默认是None,此意味只要Pool存在工作进程就一直存活
context: 用在制定工作进程启动时的上下文,一般使用multiprocessing.Pool()或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
如果主进程退出,则进程池中的所有进程均退出。
使用Pool创建进程池对象,同时进程池中进程已经启动,向进程池中添加事件时,事件排队执行。
实例方法:
p为进程池对象
p.apply():
&apply(func[, args=()[, kwds={}]])&
该函数用于传递不定参数,主进程会被阻塞直到函数执行结束,实际上这也就说所谓的同步执行。
同步执行,按照加入进程池的顺序执行事件,每次执行完一个再执行另一个,无法获取返回值。
p.apply_async()
&apply_async(func[, args=()[, kwds={}[, callback=None]]])&
与apply用法一样,但它是非阻塞且支持结果返回进行回调;实际上也就是异步执行。
异步执行,同时启动进程池中多个进程执行事件,可以获取事件返回值 —&&multiprocessing.pool.ApplyResult object at 0x7f7f6e4357f0&。
&map(func, iterable[, chunksize=None])&
Pool类中的map方法,与内置map函数用法基本一致,它融合了map函数和apply_async()函数的功能;它会使进程阻塞直到返回结果。
注意:虽然第二个参数是一个迭代器,但实际应用中,必须在整个队列就绪后,程序才会运行子进程。
p.close():关闭进程池,阻止更多的任务提交到进程池Pool,待任务完成后,工作进程会退出
p.terminate():结束工作进程,不再处理未完成的任务
p.join():等待工作线程的退出,必须在close()或terminate()之后使用,因被终止的进程需要被父进程调用wait(join等价于wait),否则进程会成为僵尸进程。
(1)使用Pool创建进程池对象,同时进程池中进程已经启动
(2)向进程池对象中添加事件,事件排队执行
(3)如果主进程退出,则进程池中所有进程都退出
3.1 基础实例
import multiprocessing as mp
def test():
p = mp.Pool(processes = 5) # 创建5条进程
for i in range(10):
p.apply_async(test) # 向进程池添加任务
p.close() # 关闭进程池,不再接受请求
p.join() # 等待所有的子进程结束
(1)进程池Pool被创建出来后,&p.apply_async(test)&语句不停地循环执行,相当于向进程池中提交了10个请求,它们会被放到一个队列中。
(2)&p = mp.Pool(5)&执行完毕后创建了5条进程,但尚未给它们分配各自的任务;也就意味着,无论有多少任务,实际的进程数只有5条,每次最多5条进程并行。
(3)当Pool中有进程任务执行完毕后,这条进程资源会被释放,Pool会按先进先出的原则取出一个新的请求给空闲的进程继续执行。
(4)当Pool所有的进程任务完成后,会产生5个僵尸进程,如果主进程/主线程不结束,系统不会自动回收资源,需要调用join函数负责回收。
(5)在创建Pool进程池时,若不指定进程的最大数量,默认创建的进程数为系统的内核数量
(6)如果采用p.apply(test)阻塞方式添加任务,其每次只能向进程池中添加一条任务,然后for循环会被阻塞等待,直到添加的任务被执行完毕,进程池中的5个进程交替执行新来的任务,此时相当于单进程。——该语句需要再深刻理解,尚未完全明白
3.2 apply方式添加任务
multiprocessing as mp
from time import sleep
def worker(msg):
print(os.getpid())
print(msg)
return msg
#创建进程池对象
p = mp.Pool(processes = 4)#创建4条进程
pool_result = []
for i in range(10):
msg = 'hello-%d'%i
r = p.apply(worker,(msg,))
#向进程池中添加事件,该语句为同步执行,
#没有返回值,这种方法用的比较少   r = p.apply(worker,(msg,))
pool_result.append(r)
#获取事件函数的返回值
for r in pool_result:
print('return:',r)
p.close()# 关闭进程池,不再接受请求,不能再向里面添加事件
p.join() # 等待进程池中的事件执行完毕,回收进程池
hello-<span style="color: #
return: hello-<span style="color: #
return: hello-1
return: hello-2
return: hello-3
return: hello-4
return: hello-5
return: hello-6
return: hello-7
return: hello-8
return: hello-9
这段代码运行较慢,和进程阻塞有关。相当于单线程!
当将代码(22行)中的&print('return:',r) &修改为&print('return:',r.get()) &时
hello-<span style="color: #
Traceback (most recent call last):
File "test1.py", line 22, in &module&
print('return:',r.get())
AttributeError: 'str' object has no attribute 'get'
最后报错:&AttributeError: 'str' object has no attribute 'get'&
3.3 applay_async方式添加任务
import multiprocessing as mp
from time import sleep
def worker(msg):
print(os.getpid())
print(msg)
return msg
#创建进程池对象
p = mp.Pool(processes = 4) #创建4条进程
pool_result = []
for i in range(10):
msg = 'hello-%d'%i
r = p.apply_async(worker,(msg,)) #向进程池中添加事件
pool_result.append(r)
#获取事件函数的返回值
for r in pool_result:
print('return:',r)
p.close()#关闭进程池,不再接受请求
p.join()# 等待进程池中的事件执行完毕,回收进程池
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e37d68&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e37e80&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e37f98&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e410f0&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e41208&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e41320&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e41438&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e41550&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e41668&
return: &multiprocessing.pool.ApplyResult object at 0x7f66d0e41780&
hello-<span style="color: #
(1)由于这个是异步方式添加任务,所以运行非常快
(2)由于for是内置循环函数,执行效率较高,所以在结果的前10行均为for语句执行结果
(3)&r = p.apply_async(worker,(msg,))&执行结果为进度对象。
(4)由于任务是异步执行,所以在结果中是“乱序”;并不像applay那样有序打印。
同样将代码(22行)中的&print('return:',r)&修改为&print('return:',r.get()) &时,
hello-<span style="color: #
return: hello-<span style="color: #
return: hello-1
return: hello-2
return: hello-3
return: hello-4
return: hello-5
return: hello-6
return: hello-7
return: hello-8
return: hello-9
&在结果中出现“顺序混乱”
这与进程调度及运行时间有所差别有关,当有多个进程并行执行时,每个进程得到的时间片时间不一样,哪个进程接受那个求情以及执行完成时间都死不定的,所以输出会出现乱序的情况。
有时候还会出现两行数据出现在同一行,而下一行却为空行的情况,该情况可能时再执行第一个进程时,刚要打印换行符时,另一个进程也打印出来,这样就有可能本来两行的数据却在同一行打印出来,而两个换行符却次第打印出来,所以就会出现空行的情况。
注意:apply_async()函数本身就可以返回被进程调用的函数返回值。在创建子进程的代码中,若在被调用函数中返回一个值,那么pool.apply_async(func, (msg,))的结果就是返回pool中所有进程的“值的对象”(注意是对象,而不是值本身);同时对比不难发现,pool.apply的结果返回的是被调用函数的返回值,这里是值而不是对象。
比较使用进程池与函数之间
import time
from multiprocessing import Pool
def run(fn):
time.sleep(1)
return fn*fn
test = [1,2,3,4,5,6,7,8]
s = time.time()
for fn in test:
e = time.time()
print('执行时间:',e - s)
pool = Pool(3)
#使用该模块中的map融合了原map函数和该模块中apply_async函数
r = pool.map(run,test)
pool.close()
pool.join()
e1 = time.time()
print('执行时间:',e1 - e)
执行时间: 8.746
执行时间: 3.6904
阅读(...) 评论()在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
例1:使用进程池
#coding: utf-8
import multiprocessing
import time
def func(msg):
print "msg:", msg
time.sleep(3)
print "end"
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
pool.close()
pool.join()
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print "Sub-process(es) done."
一次执行结果
mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ello 0
msg: hello 1
msg: hello 2
msg: hello 3
Sub-process(es) done.
函数解释:
apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
close() & &关闭pool,使其不在接受新的任务。
terminate() & &结束工作进程,不在处理未完成的任务。
join() & &主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出&msg: hello 3&出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出&mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~&,主程序在pool.join()处等待各个进程的结束。
例2:使用进程池(阻塞)
#coding: utf-8
import multiprocessing
import time
def func(msg):
print "msg:", msg
time.sleep(3)
print "end"
if __name__ == "__main__":
pool = multiprocessing.Pool(processes = 3)
for i in xrange(4):
msg = "hello %d" %(i)
pool.apply(func, (msg, ))
#维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
pool.close()
pool.join()
#调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print "Sub-process(es) done."
一次执行的结果
msg: hello 0
msg: hello 1
msg: hello 2
msg: hello 3
Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~
Sub-process(es) done. 
例3:使用进程池,并关注结果
import multiprocessing
import time
def func(msg):
print "msg:", msg
time.sleep(3)
print "end"
return "done" + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(3):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print ":::", res.get()
print "Sub-process(es) done."
一次执行结果
msg: hello 0
msg: hello 1
msg: hello 2
::: donehello 0
::: donehello 1
::: donehello 2
Sub-process(es) done.
&注:get()函数得出每个返回结果的值
例4:使用多个进程池
#coding: utf-8
import multiprocessing
import os, time, random
def Lee():
print "\nRun task Lee-%s" %(os.getpid()) #os.getpid()获取当前的进程的ID
start = time.time()
time.sleep(random.random() * 10) #random.random()随机生成0-1之间的小数
end = time.time()
print 'Task Lee, runs %0.2f seconds.' %(end - start)
def Marlon():
print "\nRun task Marlon-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 40)
end=time.time()
print 'Task Marlon runs %0.2f seconds.' %(end - start)
def Allen():
print "\nRun task Allen-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 30)
end = time.time()
print 'Task Allen runs %0.2f seconds.' %(end - start)
def Frank():
print "\nRun task Frank-%s" %(os.getpid())
start = time.time()
time.sleep(random.random() * 20)
end = time.time()
print 'Task Frank runs %0.2f seconds.' %(end - start)
if __name__=='__main__':
function_list=
[Lee, Marlon, Allen, Frank]
print "parent process %s" %(os.getpid())
pool=multiprocessing.Pool(4)
for func in function_list:
pool.apply_async(func)
#Pool执行函数,apply执行函数,当有一个进程执行完毕后,会添加一个新的进程到pool中
print 'Waiting for all subprocesses done...'
pool.close()
pool.join()
#调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到pool,join函数等待素有子进程结束
print 'All subprocesses done.'
一次执行结果
parent process 7704
Waiting for all subprocesses done...
Run task Lee-6948
Run task Marlon-2896
Run task Allen-7304
Run task Frank-3052
Task Lee, runs 1.59 seconds.
Task Marlon runs 8.48 seconds.
Task Frank runs 15.68 seconds.
Task Allen runs 18.08 seconds.
All subprocesses done.
#coding: utf-8
import multiprocessing
def m1(x):
print x * x
if __name__ == '__main__':
pool = multiprocessing.Pool(multiprocessing.cpu_count())
i_list = range(8)
pool.map(m1, i_list)
一次执行结果
 参考:http://www.dotblogs.com.tw/rickyteng/archive//69635.aspx 
问题:http://bbs.chinaunix.net/thread--1.html
#coding: utf-8
import multiprocessing
import logging
def create_logger(i):
class CreateLogger(object):
def __init__(self, func):
self.func = func
if __name__ == '__main__':
ilist = range(10)
cl = CreateLogger(create_logger)
pool = multiprocessing.Pool(multiprocessing.cpu_count())
pool.map(cl.func, ilist)
print "hello------------&"
一次执行结果
hello------------&
阅读(...) 评论()在 SegmentFault,学习技能、解决问题
每个月,我们帮助 1000 万的开发者解决各种各样的技术问题。并助力他们在技术能力、职业生涯、影响力上获得提升。
问题对人有帮助,内容完整,我也想知道答案
问题没有实际价值,缺少关键内容,没有改进余地
def calcutype(dataframe,model,xiangguandict):
'''主函数'''
typelist = {}
xiangguan = {}
pool = multiprocessing.Pool(40)
for index, row in dataframe.iterrows():
scorearr = []
name = row['data_name1'].split(',') #data_name
descrip = row['data_descrip1'].split(',') #data_descrip
#计算得出分类,与相关度
res[index]=pool.apply_async(calcumodelnum, (name,descrip,model,xiangguandict))
pool.close()
pool.join()
for i in res:
#本条分类相关度计算完毕
typelist[i] = res[i].get()[0]
xiangguan[i] = res[i].get()[1]
return typelist,xiangguan
def calcumodelnum(***省略***):
logging.info(multiprocessing.current_process().name+'计算完了')
return 结果
def main():
data1 = **省略***
model = **省略***
dict1 = **省略***
calcutype(data1,model,dict1)
if __name__ == '__main__':
大家看到我上面的代码了,因为在40核的机器上跑,所以我启动了40个进程,但是看cpu情况
明明任务很多,却根本没有打满,有很多核空着,
像这些时间那就根本动也不动。
log显示确实启动了40个进程,可是那些进程好像启动之后就不动了似的。
运行的log如下:
我又试了试启动10个进程,最后程序运行时间与启用40个几乎一样长!!!
这是怎么回事?求大神指点。(那个for循环很长,计算强度够大)
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
代码修改下, 应该能跑满CPU
from multiprocessing import Pool, cpu_count, current_process
data1 = '...'
# dataframe
model = '...'
dict1 = '...'
typelist = {}
xiangguan = {}
def calcutype(row):
index, data = row
name = data['data_name1'].split(',')
# data_name
descrip = data['data_descrip1'].split(',')
# data_descrip
res[index] = calcumodelnum(name, descrip)
logging.info(current_process().name + '计算完了')
def calcumodelnum(name, des):
"""你的计算逻辑"""
def main():
pool = Pool(cpu_count())
pool.map(calcutype, data1.iterrows())
pool.close()
pool.join()
for i in res:
# 本条分类相关度计算完毕
typelist[i] = res[i].get()[0]
xiangguan[i] = res[i].get()[1]
return typelist, xiangguan
if __name__ == '__main__':
答案对人有帮助,有参考价值
答案没帮助,是错误的答案,答非所问
multiprocessing.Pool 只是用来启动多个进程而不是在每个core上启动一个进程。换句话说Python解释器本身不会去在每个core或者processor去做负载均衡。这个是由操作系统决定的。如果你的工作特别的计算密集型的话,操作系统确实会分配更多的core,但这也不是Python或者代码所能控制的或指定的。
multiprocessing.Pool(num)中的num可以很小也可以很大,比如I/O密集型的操作,这个值完全可以大于cpu的个数。
硬件系统的资源分配是由操作系统决定的,如果你希望每个core都在工作,就需要更多的从操作系统出发了~
分享到微博?
关闭理由:
删除理由:
忽略理由:
推广(招聘、广告、SEO 等)方面的内容
与已有问题重复(请编辑该提问指向已有相同问题)
答非所问,不符合答题要求
宜作评论而非答案
带有人身攻击、辱骂、仇恨等违反条款的内容
无法获得确切结果的问题
非开发直接相关的问题
非技术提问的讨论型问题
其他原因(请补充说明)
我要该,理由是:
在 SegmentFault,学习技能、解决问题
每个月,我们帮助 1000 万的开发者解决各种各样的技术问题。并助力他们在技术能力、职业生涯、影响力上获得提升。

我要回帖

更多关于 eclipseimport报错 的文章

 

随机推荐