python多线程与单线程采集毛选对比
目录
前言
对于这种简单的电子书式网站采集,文章数不多,串行完全也足够快,没有必要单独去考虑多线程的实现方式。
但我们主要是借这个例子,梳理下,如何将单线程串行的多IO的业务,转换成采用多线程的方式,将多个IO分解异步进行,提高IO处理能力(这里的业务逻辑不是CPU密集计算型),也就是目的:
- 了解多线程解决IO异步问题,提高效率
- 了解多线程与单线程编码差异
- 了解计算密集型任务与IO密集型任务
- 采集markdown格式化毛选文章,以备自己阅读
两种思路
-
单线程阻塞
- 采集网站目录页面所有文章地址
- 遍历每一个文章地址,串行采集、markdown格式化、写入本地文件,组织md文章
- 程序结束退出
-
多线程非阻塞
- 采集网站目录页面获取所有文章地址
- 多线程采集与markdown格式化页面内容,写入内容队列
- 多线程读取内容队列,并写入本地文件,组织成md文章
- 采集线程与写文件线程退出,主线程退出
单线程采集毛选
单线程阻塞的编码方式很自然简单,处理完一个所有环节后,再处理下一个,逻辑详见入口方法run:
# coding:utf-8
## 从https://weiyinfu.cn/MaoZeDongAnthology/目录.html 获取毛泽东选集所有文章
import requests
from bs4 import BeautifulSoup
import os
class GetBooksMarkdown:
__desDir = "I:\\9ong\\毛泽东选集\\" #目的文件目录
__content = "" # 文章内容
__urlList = {}
def __init__(self):
print("get books ...\n")
# 获取目录页面是所有文章的标题与url地址,组织成字典__urlList
def __getUrlList(self):
page = requests.get("https://weiyinfu.cn/MaoZeDongAnthology/")
soupObj = BeautifulSoup(page.content,"html.parser")
bookItems = soupObj.select("li.chapter-item")
urls = {}
for item in bookItems:
title = item.select("a")[0].attrs["href"]
url = 'https://weiyinfu.cn/MaoZeDongAnthology/' + title
title = title[:-5]
if len(title) > 0 and len(url) > 0:
urls[title] = url
self.__urlList = urls
return self.__urlList
def __getDesDir(self):
return self.__desDir
# 采集指定url地址及区域,并转换成markdown格式,返回内容
def __toMarkdown(self,__pageUrl):
url = "http://www.9ong.com/xxx/api" # 演示demo
postdata = {
"url":__pageUrl,
"selector":"main",
"strip_tags":True,
"remove_nodes":"script",
"save_file":"false",
}
response = requests.post(url,data=postdata)
return response
# 以标题作为文件名,将md内容写入文件
def __writePage(self,desFilePath):
desDirPath = os.path.dirname(desFilePath)
# print("##Final Path:"+desFilePath)
# return
if not os.path.exists(desDirPath):
os.makedirs(desDirPath)
with open(desFilePath,"w",encoding="utf-8") as nf:
nf.write(self.__content)
if os.path.exists(desFilePath):
print("写入文章成功:",desFilePath,"\n")
else:
print("!!!写入新文件失败\n")
# 入口
def run(self):
# 遍历urls
urls = self.__getUrlList()
for title in urls:
print("开始处理文章:",title,"\n")
# 请求转换成markdown
print("请求转换markdown...\n")
res = self.__toMarkdown(urls[title])
# print(res.text)
# 写入文件
print("文章准备写入文件...","\n")
desFilePath = self.__getDesDir()+title+".md"
self.__content = res.text
self.__writePage(desFilePath)
# 循环下一个url
self.__content = ""
# return False # 测试一次
return True
if __name__ == '__main__':
gbm = GetBooksMarkdown()
gbm.run()
单线程阻塞的方式简单易理解,对于采集的数据不多的情况下,可快速无多思考的实现,时间效率(执行效率)换空间效率(编码设计效率)。
多线程采集毛选
具体逻辑详见run方法,实现两个线程池,一个处理采集IO任务,一个处理写文件IO任务,通过队列的方式,将两个任务分离异步处理,相当于IO并行,理论上效率至少会比单线程提升一倍(由于python有个全局解释器锁GIL,最终还是单线程运行的,所以多线程并不会提高cpu的利用率):
# coding:utf-8
## 从https://weiyinfu.cn/MaoZeDongAnthology/目录.html 获取毛泽东选集所有文章
import queue
import requests,os,time
from bs4 import BeautifulSoup
from queue import Queue
import threading
class GetBooksMarkdown:
__desDir = "I:\\9ong\\毛泽东选集\\" #目的文件目录
__urlList = {} # 所有文章标题与url地址 字典
__urlQueue = Queue() #type: Queue # 文章标题与地址队列,用于采集
__contentQueue = Queue() #type: Queue # 采集处理后的内容队列,生产于采集线程,用于写文件线程
__stopWriteThreadFlag = False # 用于通知写文件线程退出
def __init__(self):
print("get books ...\n")
# 获取所有文章标题与url地址 字典
def __getUrlList(self):
page = requests.get("https://weiyinfu.cn/MaoZeDongAnthology/")
soupObj = BeautifulSoup(page.content,"html.parser")
bookItems = soupObj.select("li.chapter-item")
urls = {}
for item in bookItems:
title = item.select("a")[0].attrs["href"]
url = 'https://weiyinfu.cn/MaoZeDongAnthology/' + title
title = title[:-5]
if len(title) > 0 and len(url) > 0:
urls[title] = url
self.__urlList = urls
return self.__urlList
# 获取文章写入文件目录
def __getDesDir(self):
return self.__desDir
# 远程文章获取并转换成markdown格式
def __toMarkdownThread(self):
try:
print('当前工作的线程为:',threading.currentThread().name,"\n")
while True:
if self.__urlQueue.empty():
print('当前工作的线程为:',threading.currentThread().name," __urlQueue队列已空\n")
break
else:
serverUrl = "http://www.9ong.com/xxx/api" # 演示demo
pageUrlItem = self.__urlQueue.get() #type: map # {"title":"","url":""}
print('当前工作的线程为:',threading.currentThread().name," 正在采集与md转换:",pageUrlItem["title"],"\n")
postdata = {
"url":pageUrlItem["url"],
"selector":"main",
"strip_tags":True,
"remove_nodes":"script",
"save_file":"false",
}
time.sleep(1)
response = requests.post(serverUrl,data=postdata)
if response.ok:
self.__contentQueue.put({"title":pageUrlItem["title"],"content":response.text})
except Exception as ex:
print("toMarkdownThread采集异常:",ex)
# finally:
# return True
# return response
# md格式文章写入本地文件
def __writePageThread(self):
while not self.__stopWriteThreadFlag:
try:
#if not self.__contentQueue.empty():
item = self.__contentQueue.get(False)
if not item:
pass
else:
desFilePath = self.__getDesDir() + item["title"] + ".md"
desDirPath = os.path.dirname(desFilePath)
print('当前工作的线程为:',threading.current_thread().name," 正在写入文件:",desFilePath,"\n")
# print("##Final Path:"+desFilePath)
# return
if not os.path.exists(desDirPath):
os.makedirs(desDirPath)
with open(desFilePath,"w",encoding="utf-8") as nf:
nf.write(item["content"])
if os.path.exists(desFilePath):
print("写入文章成功:",desFilePath,"\n")
else:
print("!!!写入新文件失败\n")
except Exception as ex:
pass
# print("writePageThread异常:",ex)
def run(self):
# 遍历urls
print("获取所有文章标题与地址...\n")
urls = self.__getUrlList()
urlLen = len(urls) # 2
print("共获取",urlLen,"篇文章地址信息...\n")
# print(urls)
# return False
# 构造文章url队列
print("构造文章地址信息队列...\n")
self.__urlQueue = Queue(urlLen)
count = 0
for title in urls:
# if count == 4:
# break
self.__urlQueue.put({"title":title,"url":urls[title]})
count += 1
# 初始化采集thread
print("初始化采集与md转换线程池...\n")
markdownThreads = []
markdownThreadIdList = ["md1","md2","md3","md4","md5"]
for tId in markdownThreadIdList:
mdThread = threading.Thread(target=self.__toMarkdownThread,name=tId)
mdThread.start()
markdownThreads.append(mdThread)
# 初始化写入文件thread
print("初始化文章md内容写入文件线程池...\n")
writeThreads = []
writeThreadIdList = ["write1","write2","write3"]
for tId in writeThreadIdList:
mdThread = threading.Thread(target=self.__writePageThread,name=tId)
mdThread.start()
writeThreads.append(mdThread)
# 等待队列,先进行采集与md转换
print("等待文章地址信息写入队列__urlQueue...\n")
while not self.__urlQueue.empty():
pass #不为空,则继续阻塞
for t in markdownThreads:
# print(t)
t.join()
# 等待队列,写入文件
print("等待文章写入文章内容队列__contentQueue...\n")
while not self.__contentQueue.empty():
pass
# contentQueue空时,通知所有write线程退出
print("通知写入文件线程退出...\n")
self.__stopWriteThreadFlag = True
for t in writeThreads:
t.join()
print("主线程结束.\n")
if __name__ == '__main__':
gbm = GetBooksMarkdown()
gbm.run()
两种思路小结
-
执行效率
单线程串行200+篇文章的采集、markdown、写文件,无sleep的实现,从第一篇文章22:47完成,到最后一篇文章22:54完成,共用时7~8分钟(480秒)。
多线程同样内容的采集(5个线程)、markdown、写文件(3个线程),每次采集sleep(1),从第一篇文章10:40完成,到最后一篇文章10:45完成,共用时5~6分钟(360秒),扣除sleep的230秒,实际执行用时 360 - 230 = 130秒左右,比单线程串行效率提高了3~4倍。
-
编码效率
单线程串行不到100行能完成采集、写入(注:markdown由远端接口完成)
多线程虽然看似只要不到200行,但更多精力会耗在设计上,毕竟解决方案思想需要人去深思,特别是追求高效、简单、快速,而这三个往往是不能同时存在的,高效、简单编码,就需要思考复杂方案,简单、快速编码,大概率不高效,快速高效编码,肯定不简单。
python多线程是真多线程吗
Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
python多线程适合IO密集型任务
计算密集型任务,主要是进行大量的计算,消耗cpu资源,比如圆周率计算、视频高清解码、图片处理、机器学习等,全靠CPU(GPU)的运算能力,最典型的就是深度学习,cpu负责调度,gpu就负责计算(任务单一,高效率),因此对于这些消耗cpu的任务,数据结构、算法、代码效率异常重要。
除了计算密集型的任务外,我们经常看到的更多的是IO密集型任务,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。
IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言不一定是最好的。
参考
- 多线程 - 廖雪峰的官方网站
- 为什么有人说 Python 的多线程是鸡肋呢? - 知乎
- python中sleep引起的程序假死现象 - 知乎
- 整理phper初学python想知道的知识点 - 9ong
- 目录 - 毛泽东选集