#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Remove superfluous line breaks from plain-text (.txt) files.

A line break is kept only when it directly follows sentence-ending
punctuation (。 ! ? …), a middle dot, or a quotation mark / corner bracket;
every other line break is removed so hard-wrapped paragraphs are joined.

Files and folders can be passed on the command line (e.g. by drag and drop)
for batch processing, or chosen from an interactive menu.  Files are
rewritten in place; a ``.bak`` copy of the original is written next to every
modified file on a best-effort basis.  ``chardet`` is used for encoding
detection when it is installed, but the tool also works without it (useful on
constrained environments such as Termux).
"""
import codecs
import os
import re
import shutil
import sys
import tempfile
import time
from typing import List, Optional, Tuple

try:
    import chardet
except ImportError:  # optional dependency: fall back to trial decoding
    chardet = None

# Characters after which a line break is preserved.  Both the ASCII and the
# fullwidth/curly variants are listed so Chinese and Western text behave alike.
_KEEP_AFTER = '。!?!?…·「」『』"“”\'‘’'

# A removable break is an LF or a CRLF pair that does not follow one of the
# characters above.  CRLF is matched as a unit; the extra ``\r`` in the first
# look-behind stops the LF of a preserved CRLF pair from being stripped alone.
_REMOVABLE_BREAK = re.compile(
    r'(?<![{keep}\r])\n|(?<![{keep}])\r\n'.format(keep=re.escape(_KEEP_AFTER))
)

# Encodings tried (strictly) when detection is unavailable or unreliable.
_FALLBACK_ENCODINGS = ('utf-8', 'gbk', 'gb2312', 'gb18030')

# Detected names that are replaced by a superset / likelier encoding.
_ENCODING_ALIASES = {
    'ascii': 'utf-8',         # ASCII is a subset of UTF-8
    'gb2312': 'gbk',          # GB2312 is a subset of GBK
    'iso-8859-1': 'gbk',      # commonly a misdetection on Chinese Windows
    'windows-1252': 'gbk',    # commonly a misdetection on Chinese Windows
}

_MIN_CONFIDENCE = 0.7
_LARGE_FILE_BYTES = 10 * 1024 * 1024

# Substrings of a result label (text before the first ':') marking a failure.
_ERROR_MARKERS = ('出错', '无法解码', '不存在')


def _decode_first_match(raw_data: bytes, encodings) -> Optional[Tuple[str, str]]:
    """Return (encoding, text) for the first encoding that decodes strictly."""
    for encoding in encodings:
        try:
            return encoding, raw_data.decode(encoding)
        except (UnicodeDecodeError, LookupError):
            continue
    return None


def detect_file_encoding(file_path):
    """Read *file_path* and work out its text encoding.

    Returns:
        ``(encoding, text)`` on success, ``(None, "")`` for an empty file and
        ``(None, None)`` when the file cannot be read (the error is printed).
    """
    try:
        with open(file_path, 'rb') as f:
            raw_data = f.read()
    except OSError as e:
        print(f"编码检测错误: {file_path} - {e}")
        return None, None

    if not raw_data:
        return None, ""

    detected, confidence = None, 0.0
    if chardet is not None:
        guess = chardet.detect(raw_data)
        detected = guess.get('encoding')
        confidence = guess.get('confidence') or 0.0
    if detected is None:
        detected = 'utf-8'
    detected = _ENCODING_ALIASES.get(detected.lower(), detected)

    # Only offer 'utf-8-sig' when a BOM is really present; otherwise a
    # BOM-less UTF-8 file would be reported as 'utf-8-sig' and gain a BOM
    # when written back.
    if raw_data.startswith(codecs.BOM_UTF8):
        candidates = ('utf-8-sig',) + _FALLBACK_ENCODINGS
    else:
        candidates = _FALLBACK_ENCODINGS

    # Low confidence: prefer a common encoding that decodes without errors.
    if confidence < _MIN_CONFIDENCE:
        decoded = _decode_first_match(raw_data, candidates)
        if decoded is not None:
            return decoded

    # Trust the detector; undecodable bytes are dropped deliberately so that
    # a mostly-valid file can still be processed.
    try:
        return detected, raw_data.decode(detected, errors='ignore')
    except LookupError:
        pass  # the detector named a codec Python does not know

    decoded = _decode_first_match(raw_data, candidates)
    if decoded is not None:
        return decoded
    return 'utf-8', raw_data.decode('utf-8', errors='ignore')


def remove_extra_newlines(text):
    """Remove every line break that does not follow "keep" punctuation.

    Line breaks (``\\n`` or ``\\r\\n``) are preserved after 。 ! ? (ASCII or
    fullwidth), the ellipsis …, the middle dot ·, corner brackets 「」『』 and
    straight or curly quotation marks.  Runs of ``…`` and runs of three or
    more ``.`` are first normalized to a single ``…``.
    """
    text = re.sub(r'…{1,}', '…', text)   # collapse repeated ellipsis chars
    text = re.sub(r'\.{3,}', '…', text)  # ASCII "..." -> "…"
    return _REMOVABLE_BREAK.sub('', text)


def write_file_with_fallback(file_path, content, original_encoding):
    """Write *content* to a temporary file next to *file_path*.

    Encodings are tried in order: the original encoding, strict UTF-8, then
    UTF-8 ignoring unencodable characters.

    Returns:
        ``(temp_path, encoding_used, True)`` on success, or
        ``(None, None, False)`` on failure (the temp file is removed).
    """
    attempts = (
        (original_encoding, 'strict'),
        ('utf-8', 'strict'),
        ('utf-8', 'ignore'),
    )
    temp_path = None
    try:
        # Same directory as the target so the later replace stays on one
        # filesystem.
        fd, temp_path = tempfile.mkstemp(
            suffix='.tmp', dir=os.path.dirname(file_path) or '.'
        )
        os.close(fd)
        for encoding, errors in attempts:
            try:
                # newline='' keeps the text's own line endings untouched
                # (no "\n" -> "\r\n" translation on Windows).
                with open(temp_path, 'w', encoding=encoding, errors=errors,
                          newline='') as f:
                    f.write(content)
                return temp_path, encoding, True
            except UnicodeEncodeError:
                continue
    except Exception:
        # Any other failure (I/O error, unknown codec, ...) is reported to
        # the caller through the return value below.
        pass

    if temp_path and os.path.exists(temp_path):
        try:
            os.unlink(temp_path)
        except OSError:
            pass
    return None, None, False


def process_file(file_path, file_index=None, total_files=None):
    """Process one file in place and return a one-line status message.

    When both *file_index* and *total_files* are given, a progress line is
    printed before processing and terminated before returning.
    """
    show_progress = file_index is not None and total_files is not None

    def finish(message):
        # Terminate the "\r..." progress line before handing back the result.
        if show_progress:
            print()
        return message

    if show_progress:
        print(f"\r正在处理文件 [{file_index}/{total_files}]: {os.path.basename(file_path)}", end="", flush=True)

    if not os.path.exists(file_path):
        return finish(f"文件不存在: {file_path}")

    if not file_path.lower().endswith('.txt'):
        return finish(f"跳过非txt文件: {file_path}")

    file_size = os.path.getsize(file_path)
    if file_size == 0:
        return finish(f"跳过空文件: {file_path}")

    if file_size > _LARGE_FILE_BYTES:
        if show_progress:
            print()
        confirm = input(f"警告: 文件 {os.path.basename(file_path)} 大小超过10MB,处理可能较慢。继续吗?(y/N): ").strip().lower()
        if confirm != 'y':
            return f"已跳过大文件: {file_path}"

    encoding, content = detect_file_encoding(file_path)
    if content is None or encoding is None:
        return finish(f"无法解码文件: {file_path}")

    try:
        new_content = remove_extra_newlines(content)
    except Exception as e:  # boundary: report instead of aborting a batch
        return finish(f"处理文本时出错: {file_path} - {e}")

    if new_content == content:
        return finish(f"无需处理: {file_path}")

    temp_path, used_encoding, success = write_file_with_fallback(
        file_path, new_content, encoding
    )
    if not success or temp_path is None:
        return finish(f"写入文件时出错: {file_path} - 无法创建临时文件")

    try:
        # Best-effort backup; a failed backup does not stop processing.
        try:
            shutil.copy2(file_path, file_path + '.bak')
        except OSError:
            pass
        # The temp file is created with restrictive permissions; carry the
        # original file's mode over so the replace does not change it.
        try:
            shutil.copymode(file_path, temp_path)
        except OSError:
            pass
        # Atomic on the same filesystem, and overwrites on Windows too.
        os.replace(temp_path, file_path)
    except OSError as e:
        return finish(f"替换文件时出错: {file_path} - {e}")
    finally:
        # After a successful replace the temp path no longer exists.
        if os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass

    encoding_info = ""
    if used_encoding != encoding:
        encoding_info = f" (编码从{encoding}改为{used_encoding})"
    return finish(f"处理完成: {file_path}{encoding_info}")


def process_path(path, show_progress=True):
    """Process a file, or every ``.txt`` file below a folder.

    Returns:
        A list of status messages, one per processed file (or a single
        message when the folder has no txt files or the path does not exist).
    """
    results = []

    if os.path.isfile(path):
        results.append(process_file(path))
    elif os.path.isdir(path):
        all_files = [
            os.path.join(root, name)
            for root, _dirs, files in os.walk(path)
            for name in files
            if name.lower().endswith('.txt')
        ]
        total_files = len(all_files)
        if total_files == 0:
            return ["文件夹中没有txt文件"]

        if show_progress:
            print(f"找到 {total_files} 个txt文件")

        for i, file_path in enumerate(all_files, 1):
            if show_progress:
                result = process_file(file_path, i, total_files)
            else:
                result = process_file(file_path)
            results.append(result)

            # Periodic summary line every 10 files.
            if show_progress and i % 10 == 0:
                print(f"已处理 {i}/{total_files} 个文件")
    else:
        results.append(f"路径不存在: {path}")

    return results


def _result_label(result: str) -> str:
    """Return the status label of a result message (text before the first ':').

    Classifying on the label only keeps file paths that happen to contain a
    status word from being miscounted.
    """
    return result.split(':', 1)[0]


def _is_error(result: str) -> bool:
    """Tell whether a result message reports a failure."""
    label = _result_label(result)
    return any(marker in label for marker in _ERROR_MARKERS)


def _count_results(results: List[str]) -> Tuple[int, int, int, int]:
    """Return (success, unchanged, skipped, errors) counts for *results*."""
    labels = [_result_label(r) for r in results]
    success = sum('处理完成' in label for label in labels)
    unchanged = sum('无需处理' in label for label in labels)
    skipped = sum('跳过' in label for label in labels)
    errors = sum(_is_error(r) for r in results)
    return success, unchanged, skipped, errors


def _print_error_details(results: List[str]) -> None:
    """Print every failed result, if there is any."""
    failed = [r for r in results if _is_error(r)]
    if failed:
        print("\n错误详情:")
        for result in failed:
            print(f" ✗ {result}")


def interactive_mode():
    """Run the menu-driven interactive mode until the user chooses to exit."""
    print("多余回车去除工具 (直接覆盖原文件)")
    print("-" * 40)
    print("规则:只保留指定标点后的回车")
    print("指定标点包括:。!?……「」『'\"")
    print("-" * 40)

    while True:
        print("\n请选择操作:")
        print("1. 处理单个文件")
        print("2. 处理文件夹")
        print("3. 退出")
        choice = input("请输入选项 (1/2/3): ").strip()

        if choice == "1":
            # Paths dropped onto a terminal are often wrapped in quotes.
            file_path = input("请输入txt文件路径: ").strip().strip('"\'')
            if not file_path:
                print("路径不能为空。")
                continue
            print(process_file(file_path))

        elif choice == "2":
            dir_path = input("请输入文件夹路径: ").strip().strip('"\'')
            if not dir_path:
                print("路径不能为空。")
                continue
            if not os.path.isdir(dir_path):
                print("路径不是有效的文件夹。")
                continue

            confirm = input("警告:这将覆盖文件夹内所有txt文件,是否继续?(y/N): ").strip().lower()
            if confirm != 'y':
                print("已取消操作。")
                continue

            print(f"开始处理文件夹: {dir_path}")
            start_time = time.time()
            results = process_path(dir_path, show_progress=True)
            end_time = time.time()

            success_count, no_change_count, skip_count, error_count = (
                _count_results(results)
            )
            print(f"\n处理完成,用时 {end_time-start_time:.2f} 秒")
            print(f"共找到 {len(results)} 个文件:")
            print(f" ✓ 成功处理: {success_count}")
            print(f" ○ 无需处理: {no_change_count}")
            print(f" - 跳过: {skip_count}")
            print(f" ✗ 错误: {error_count}")
            _print_error_details(results)

        elif choice == "3":
            print("退出程序。")
            break
        else:
            print("无效选项,请重新输入。")


def batch_mode():
    """Process every path given on the command line and print a summary."""
    print("多余回车去除工具 - 批量处理模式 (直接覆盖原文件)")
    print("-" * 50)
    print("规则:只保留指定标点后的回车")
    print("指定标点包括:。!?……「」『'\"")
    print("-" * 50)

    # Everything after the script name; strip quotes a shell may have left.
    paths = [p.strip('"\'') for p in sys.argv[1:]]
    if not paths:
        print("没有提供文件或文件夹路径。")
        return

    # NOTE: a single folder argument is processed without confirmation; this
    # matches the tool's existing behavior, which callers may rely on.
    if len(paths) > 1 or os.path.isfile(paths[0]):
        confirm = input("警告:这将直接覆盖原文件,是否继续?(y/N): ").strip().lower()
        if confirm != 'y':
            print("已取消操作。")
            # main() already pauses on Windows, so no extra prompt here.
            return

    all_results = []
    start_time = time.time()
    for i, path in enumerate(paths, 1):
        print(f"\n处理路径 {i}/{len(paths)}: {path}")
        all_results.extend(process_path(path, show_progress=True))
    end_time = time.time()

    success_count, no_change_count, skip_count, error_count = (
        _count_results(all_results)
    )
    print(f"\n{'='*50}")
    print(f"批量处理完成! 总用时: {end_time-start_time:.2f} 秒")
    print(f"成功处理: {success_count} 个文件")
    print(f"无需处理: {no_change_count} 个文件")
    print(f"跳过: {skip_count} 个文件")
    print(f"错误: {error_count} 个")
    _print_error_details(all_results)


def main():
    """Entry point: batch mode with arguments, interactive mode without."""
    if len(sys.argv) > 1:
        batch_mode()
        # Keep the console window open when launched by double click or
        # drag and drop on Windows.
        if sys.platform == "win32":
            input("\n按回车键退出...")
    else:
        interactive_mode()


if __name__ == "__main__":
    main()