初识Python并行编程§
咸鱼上接了一单,任务就是统计某个文件夹里的所有txt文件,获取某些信息然后统一放到excel里。 这并不难。 但是,我不满意。为什么呢?太慢了,由于需要大规模读取文件,io压力比较大,同时遍历文件内容也是很cost的,我们认为这玩意既是io密集型也是cpu密集型。 看了看文件数量`45988`真的蛮多的,这些玩意加起来都1GB了。 实际上各个文件处理之间是没有联系的,那么我们为什么不采取并行编程呢? 说干就干!在这之前先展示一下我们的暴力代码。使用`pandas`写入`excel`就不用说了,常规操作罢了。 看下面几个函数。`remove_punctuation` 这个函数是为了移除字符串中的标点符号和特殊字符,只保留字母、数字以及中日韩文字。 为了使用正则表达式这里我们要`import re`,'[^a-zA-Z0-9\u4e00-\u9fff]'这个正则表达式模式匹配所有不是字母、数字或者中日韩统一表意文字的字符。 我们通过`sub`方法把这个正则表达式模式匹配到的内容替换为空就行了。`count_characters_in_file`我不解释了,很简单。`process_files`函数就是处理文件的整体逻辑,在这里为了可视化进度,我们使用了`tqdm`。import os import re from tqdm import tqdm import pandas as pd def remove_punctuation(text): pattern = r'[^a-zA-Z0-9\u4e00-\u9fff]' cleaned_text = re.sub(pattern, '', text) return cleaned_text def count_characters_in_file(file_path): with open(file_path, 'r', encoding='utf-8') as file: content = file.read() return len(remove_punctuation(content)) def process_files(directory): dict = { '股票代码' : [], '年份' : [], '字数' : [], } progress_bar = tqdm([file for file in os.listdir(directory) if file.endswith('.txt')]) for file in progress_bar: progress_bar.set_description(f"Processing : {file}") tokens = file.split('_') dict['股票代码'].append(tokens[0]) dict['年份'].append(tokens[1]) dict['字数'].append(count_characters_in_file(os.path.join(directory, file))) return dict directory = r'./年报(08-22)' df = pd.DataFrame(process_files(directory)) output_file = './output.xlsx' df.to_excel(output_file, index=False) print(f'DataFrame 已成功写入 {output_file}')接下来,我要开始大展拳脚了。 我会更改上方代码,在业务逻辑不变的情况下使用Python的`concurrent.futures`模块。这里就不进行代码讲解了,毕竟我也是比葫芦画瓢弄的,不过代码的思路很清晰哦。性能提升很显著,在最大线程数为10的情况下,性能峰值提升了15倍,之后会降低到5倍。 我跑代码的机子很烂,内存也很少,cpu和内存跑满了。 性能损失是可以预见的,但如果cpu性能足够,内存也够多,咱再开大点最大线程数,这提升的性能真的会很恐怖。最后评价一下我的工作,完美符号预期!哦对了,因为处理数据很大,而且还要照顾人家的隐私,这里就不提供了。 不过作为代码思路参考,还是很合适的^_^。import os import re from tqdm import tqdm import pandas as pd import concurrent.futures def remove_punctuation(text): pattern = r'[^a-zA-Z0-9\u4e00-\u9fff]' cleaned_text = re.sub(pattern, '', text) return cleaned_text def count_characters_in_file(file_path): with open(file_path, 'r', encoding='utf-8') as file: content = file.read() return len(remove_punctuation(content)) def process_file(file): file_path = os.path.join(directory, file) tokens = file.split('_') stock_code = tokens[0] year = tokens[1] char_count = count_characters_in_file(file_path) return {'股票代码': stock_code, '年份': year, '字数': char_count} def process_files(directory): dict_list = [] files = [file for file in os.listdir(directory) if file.endswith('.txt')] with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(process_file, file) for file in files] progress_bar = tqdm(concurrent.futures.as_completed(futures), total=len(files), colour='red') for future in progress_bar: dict_list.append(future.result()) dict_list = sorted(dict_list, key=lambda x: (x['股票代码'], x['年份'])) return dict_list directory = r'./年报(08-22)' df = pd.DataFrame(process_files(directory)) output_file = './output.xlsx' df.to_excel(output_file, index=False) print(f'DataFrame 已成功写入 {output_file}')