多线程批量翻译
合理爬取,不恶意扩大站点压力 本文章仅作示例,请勿用作非法用途
 
 
效果  
需求  自己的项目 需要一个功能,要求有大量的中英对照翻译,中文词很好找,网上随便搜一下就有了   但是英文翻译呢? 手动翻译自然可以,但身为程序猿,用代码实现不是更加优雅吗? 开干!
分析 经过梳理后,我们可以把整个过程分为以下几个步骤:
加载xlsx文件,解析为中文词的列表(元组) 
一一翻译,将结果储存在一个字典中 
保存为json文本 
 
开始 加载xlsx 由于不是本篇重点,直接上代码好了
1 2 3 4 5    xls = xlrd.open_workbook("D:/Downloads/7000hanzi.xlsx" )    sheet = xls.sheet_by_index(0 )    col = sheet.col_values(1 )    col.pop(0 )  
 
爬取翻译 旧方法 如果按照之前的方法,只需要遍历一下列表,然后一个一个翻译就好了。 但是!这个效率问题值得我们考虑。 假设翻译一个单词需要1s,连续翻译7000个也就要7000s;换句话说,将近两个小时! 这时候,就需要多线程出场了!
新思路 使用多线程有什么好处呢?
假设现在你面前有一个桌子,上面放着一堆苹果,有很多人等待着吃苹果。 规则是每次每人只能拿一个苹果,且拿到苹果的人吃完之后其他人才能拿下一个。 我们现在来看一看两种不同情况
 
单线程:由于吃苹果是一个很耗时间的过程,于是你的面前出现了一条长龙,每个人都在干巴巴的等着那个吃苹果的人,眼巴巴的看着他吃完苹果再换人。
 
多线程:一堆人一字排开,同时拿着苹果开吃,谁吃完了就换一个顶替。这样不一会儿苹果就被吃完了。
 
上面是我一个简单的比喻,希望你可以理解。 换到代码上面怎么实现呢?
使用多线程的例子 Python里面使用多线程很简单,主要用到threading.Thread 来一个小例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import  threadingimport  randomclass  FunnyThread (threading.Thread):    def  __init__ (self,id  ):         threading.Thread.__init__(self)         self.id  = id      def  run (self ) -> None :         print (f"线程 {self.id }  开始执行" )         sum_number = 0          for  i in  range (random.randint(100 ,10000000 )):             sum_number += i         print (f"线程 {self.id }  算完了" ) if  __name__ == "__main__" :    thread1 = FunnyThread("thread1" )     thread2 = FunnyThread("thread2" )     thread3 = FunnyThread("thread3" )     thread1.start()     thread2.start()     thread3.start() 
 
在上面这个例子中,我们定义了一个类继承自Thread,然后重写run方法,该方法就是线程执行时干的事。 上述例子中,每个线程会计算一定数量的整数和(吃苹果),具体算多少个数是随机的,以模拟不同线程执行不同耗时任务(也就是吃苹果的速度不一样) 运行上述例子,你会得到以下结果:(因随机数产生不同,结果以实际运行为准)  三条线程分别计算三组结果,互不干扰。关于多线程,就介绍到这里;有兴趣的话您可以先收藏着本帖,去其他地方深入了解相关知识再回来。这里简单的知识就够用了  
使用Queue 多线程翻译是可以了,但是怎么知道当前的线程应该去翻译哪个单词了呢? 这里我们用到Queue 还是拿苹果比喻。
假设有一个竖着的管子,管子的宽度只能容纳单个苹果,且单项开口,你最后放进去的最先拿出来。每个人走到管子前面,伸手抓一个,到旁边吃,换个人再抓一个……直到管子里面没有苹果,结束。
 
值得注意的是,Python中的Queue是线程安全的,也就是说,即使108个好汉都想同时上梁山,它也能确保梁山一次只被一个好汉上,不会因为多个线程同时访问而冲突。
放到代码中如下:
1 2 words_queue = Queue(7000 ) 
 
1 2 3 4 while  not  self.queue.empty():           cur_word = self.queue.get()            print (f"-->线程 {self.id }  正在翻译 “{cur_word} ” " )            cur_translation = self.translate(cur_word) 
 
保存为json 在保存之前,我们需要确保所有线程都已翻译完毕。 使用Thread的join()方法实现。
完整代码 请注意,不像之前的几个教程,这份代码不能直接执行。请对照注释自行修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 from  time import  sleepimport  xlrdimport  jsonimport  threadingimport  requestsfrom  queue import  Queuefrom  youdao3 import  YoudaoFanyiclass  Crawl_thread (threading.Thread):    def  __init__ (self, id , queue ) -> None :         threading.Thread.__init__(self)         self.id  = id          self.queue = queue     def  run (self ) -> None :         print ("->启动线程" , self.id )         self.start_crawl()         print ("-<线程" , self.id , "结束" )     def  start_crawl (self ):         while  not  self.queue.empty():             cur_word = self.queue.get()             print (f"-->线程 {self.id }  正在翻译 “{cur_word} ” " )             cur_translation = self.translate(cur_word)             words[cur_word] = cur_translation             sleep(0.5 )     def  translate (self, word ):         try :                                                    return  youdao.fanyi(word)         except  Exception as  e:             print (f"在翻译 {word}  时出错,错误是 {e}  " )             return  "错误"  words = {} THREAD_NUMS = 10   youdao = YoudaoFanyi() def  main ():              xls = xlrd.open_workbook("D:/Downloads/7000hanzi.xlsx" )     sheet = xls.sheet_by_index(0 )     col = sheet.col_values(1 )     col.pop(0 )            words_queue = Queue(7000 )     for  word in  col:         words_queue.put(word)          crawl_threads = []     crawl_names = []     for  i in  range (THREAD_NUMS):         crawl_names.append(f"thread-{i+1 } " )     for  name in  crawl_names:         thread = Crawl_thread(name, words_queue)         thread.start()         crawl_threads.append(thread)          for  t in  crawl_threads:         t.join()     print ("爬取完毕!" )          final_text = json.dumps(words)     with  open ("D:\projects\others\words.json" , "w+" ) as  f:         f.write(final_text)     print ("完成!" ) if  __name__ == "__main__" :    main() 
 
顺带一提,软件运行起来的效果看起来真的非常爽 刷刷刷的~
其他