记一次基于playwright-python的工具
配置封装
依赖安装
pip install configparser
配置文件设计(config.ini)
[Browser]
port = 9222
title = 开始 | 钉钉宜搭
url = https://nozkyz.aliwork.com/workPlatform
;path = C:\Chrome\App\chrome.exe
[Excel]
live_room_path = ./files/直播间录入.xlsx
infringement_song_path = ./files/侵权歌曲录入.xlsx
# 直播间,侵权歌曲
[Strategy]
enabled_strategies = 直播间,侵权歌曲
; 下面的省略...
配置的加载与验证(util/config_utils.py)
- 配置的校验
- 必填配置项的校验
- 定制化配置参数的数据处理可在此实现
def _validate_config(config: configparser.ConfigParser) -> Dict[str, Any]:
config_dict = {k: dict(config.items(k)) for k in config.sections()}
require_sections = {
"Browser": ["path", "data"],
}
for section, keys in require_sections.items():
for key in keys:
if key not in config_dict[section]:
raise ValueError(f"配置文件缺少 {section} 配置项 {key}")
# 转换enabled_strategies 为列表
config_dict["Strategy"]["enabled_strategies"] = [
s.strip() for s in config["Strategy"]["enabled_strategies"].split(",")
]
return config_dict
- 配置文件的加载
def _load_config_file(config_file: Path) -> Dict[str, Any]:
config = configparser.ConfigParser()
config.optionxform = str
try:
config.read(config_file, encoding="utf-8")
return _validate_config(config)
except Exception as e:
raise ValueError(f"无法读取配置文件 {config_file}: {e}")
pass
- 对外暴露的util方法
def load_config(config_path: str = None) -> Dict[str, Any]:
"""
加载配置文件,明确区分开发环境和生产环境
规则:
1. 如果指定了config_path参数,优先使用该路径
2. 开发环境使用项目根目录下的config.ini
3. 生产环境使用exe同级目录下的config.ini
4. 如果都找不到,抛出明确的错误
"""
# 1. 如果指定了config_path参数,优先使用
if config_path:
logger.info(f"加载配置文件: {config_path}")
config_file = Path(config_path)
if config_file.exists():
return _load_config_file(config_file)
# 2. 判断环境并确定默认配置文件路径
if getattr(sys, 'frozen', False):
# 生产环境 - 使用exe同级目录下的config.ini
config_file = Path(sys.executable).parent / "config.ini"
env_type = "生产环境"
else:
# 开发环境 - 使用项目根目录下的config.ini
config_file = Path(__file__).parent.parent / "config.ini"
env_type = "开发环境"
# 调试信息(生产环境可移除)
logger.info(f"{env_type}配置文件路径: {config_file}")
logger.info(f"当前工作目录: {Path.cwd()}")
# 3. 检查并加载配置文件
if config_file.exists():
return _load_config_file(config_file)
# 4. 配置文件不存在,抛出详细错误
error_msg = (
f"未找到{env_type}配置文件\n"
f"期望路径: {config_file}\n"
f"当前工作目录: {Path.cwd()}\n"
)
if env_type == "生产环境":
error_msg += (
"解决方案:\n"
"1. 请确保config.ini文件与可执行文件在同一目录\n"
"2. 或者通过命令行参数指定配置文件路径"
)
raise FileNotFoundError(error_msg)
- 调用方(伪代码)
from util import config_utils, browser_utils
# 加载配置
config = config_utils.load_config(f"{RESOURCE_PATH}/config.ini")
# 配置的使用
download_dir = config["Platform"]["taobao_download"]
excel_out_dir = config["Platform"]["taobao_excel_out"]
钩子函数获取环境路径
# hook/runtime_hook.py
from pathlib import Path
def _fix_resource_path():
# 钩子路径
dev_path = Path(__file__).parent.parent
if dev_path.exists():
return str(dev_path)
# 设置全局变量
RESOURCE_PATH = _fix_resource_path()
- 变量的的使用
config_utils.load_config(f"{RESOURCE_PATH}/config.ini")
附:说明
getattr(sys, ‘frozen’, False) 是 Python 中一个常用的检查代码是否被打包成独立可执行文件的惯用方法。它的作用如下:
解释:
sys 模块:Python 的内置系统模块,提供与 Python 解释器交互的功能。
getattr() 函数:获取对象的属性值,如果属性不存在则返回默认值。
语法:getattr(object, name, default)
‘frozen’ 属性:
当 Python 程序被工具(如 PyInstaller、cx_Freeze、py2exe 等)打包成独立可执行文件时,这些工具通常会在 sys 模块中设置 sys.frozen = True。
在普通 Python 解释器环境中运行时,sys 模块默认没有 frozen 属性。
False:默认值,表示如果 sys.frozen 属性不存在,则返回 False。
字典推导式
{key_expr: value_expr for item in iterable if condition}
- key_expr: 生成字典键值的表达式
- value_expr: 生成字典值的表达式
- item: 可迭代对象(如列表、元组、集合等)中的每个元素
- if condition(可选):过滤条件,仅复合条件的元素会被包含在字典中
## 从列表生成字典
words = ["apple", "banana", "pear"]
dict1 = {word: len(word) for word in words}
print(dict1) # {'apple': 5, 'banana': 6, 'pear': 4}
## 带条件的推导式
dict2 = {word: len(word) for word in words if len(word) > 5}
print(dict2) # {'banana': 6}
## 从2个数组创建字典
keys = ["a", "b", "c"]
values = [1, 2, 3]
dict3 = {k: v for k, v in zip(keys, values)}
print(dict3) # {'a': 1, 'b': 2, 'c': 3}
## 转换现有字典
o_dict = {"a": 1, "b": 2, "c": 3}
n_dict = {k: v for k, v in o_dict.items() if v > 1}
print(n_dict) # {'b': 2, 'c': 3}
## 与for循环的对比
squares = {k: k**2 for k in range(1, 6)}
print(squares) # {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
squares2 = {}
for x in range(1, 6):
squares2[x] = x**2
print(squares2) # {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
日志系统
配置文件 config/logging_config.py
# config/logging_config.py
import logging
import sys
from pathlib import Path
def setup_logging():
"""配置全局日志系统,兼容打包和开发环境"""
# 确定基础路径
if getattr(sys, 'frozen', False):
# 打包环境 - 使用exe所在目录
base_path = Path(sys.executable).parent
else:
# 开发环境 - 使用项目根目录
base_path = Path(__file__).parent.parent
# 创建日志目录(确保可写)
log_dir = base_path / "logs"
try:
log_dir.mkdir(exist_ok=True, mode=0o755)
except Exception as e:
print(f"无法创建日志目录: {e}")
log_dir = Path.cwd() / "logs" # 回退到当前工作目录
log_dir.mkdir(exist_ok=True, mode=0o755)
# 配置日志
log_file = log_dir / "application.log"
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler(sys.stdout)
]
)
# 设置第三方库的日志级别
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("PIL").setLevel(logging.WARNING)
初始化及使用
# gui加载日志系统
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
# 加载日志系统
logging_config.setup_logging()
self.logger = logging.getLogger(__name__) # 使用统一日志系统
其他类需要引用仅需在最上层初始化即可,如下:
import logging
logger = logging.getLogger(__name__)
gui 日志组件
- 日志处理器 config/gui_log_handler.py
import logging
from PyQt6.QtGui import QColor
class GuiLogHandler(logging.Handler):
"""自定义日志处理器,将日志输出到GUI文本框"""
def __init__(self, text_widget):
super().__init__()
self.text_widget = text_widget
self.setFormatter(logging.Formatter(
'%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d - %(message)s' # 统一格式
))
self.default_color = self.text_widget.textColor() # 保存初始颜色
def emit(self, record):
msg = self.format(record)
try:
# 先保存当前颜色(避免影响其他代码)
current_color = self.text_widget.textColor()
# 设置颜色
if record.levelno >= logging.ERROR:
self.text_widget.setTextColor(QColor("red"))
elif record.levelno >= logging.WARNING:
self.text_widget.setTextColor(QColor("orange"))
else:
self.text_widget.setTextColor(self.default_color)
self.text_widget.append(msg)
# 恢复原来的颜色(避免影响后续文本)
self.text_widget.setTextColor(current_color)
except Exception as e:
print(f"Error in GuiLogHandler: {e}")
- 主窗口新增组件
def setup_ui(self):
# 日志显示区域
self.log_output = QTextEdit()
self.log_output.setReadOnly(True)
layout.addWidget(self.log_output)
- setup_ui 之后绑定组件
from config import logging_config, gui_log_handler
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
## 其他逻辑
self.setup_ui() # UI设置
# 设置GUI日志处理器
self.setup_logging_handler()
def setup_logging_handler(self):
"""设置GUI专用的日志处理器"""
self.gui_handler = gui_log_handler.GuiLogHandler(self.log_output)
self.gui_handler.setLevel(logging.INFO)
logging.getLogger().addHandler(self.gui_handler)
self.logger.info("GUI日志处理器初始化完成")
策略模式
策略接口(strategies.base_stratety.py)
from abc import ABC, abstractmethod
from threading import Event
from typing import Dict, Any, Optional
from playwright.sync_api import BrowserContext, Page
class DataProcessingStrategy(ABC):
"""数据处理策略接口"""
@abstractmethod
def process(
self,
context: BrowserContext,
page: Page,
config: Dict[str, Any],
search_mode: str,
keyword: Optional[str] = None,
image_path: Optional[str] = None,
stop_event: Optional[Event] = None
):
"""执行数据处理"""
pass
策略工厂(strategies._init_.py)
from .base_strategy import DataProcessingStrategy
from .infringement_song_strategy import InfringementSongStrategy
from .live_room_strategy import LiveRoomStrategy
class StrategyFactory:
"""策略工厂(集中管理所有策略)"""
@staticmethod
def create_strategy(strategy_name: str) -> DataProcessingStrategy:
strategies = {
'直播间': LiveRoomStrategy,
'侵权歌曲': InfringementSongStrategy
# 可以继续添加其他策略...
}
if strategy_name not in strategies:
raise ValueError(f"未知策略: {strategy_name}")
return strategies[strategy_name]()
具体实现类(如:直播间)
`from typing import Dict, Any
import pandas as pd
from playwright.sync_api import Page, BrowserContext
from strategies.base_strategy import DataProcessingStrategy
class LiveRoomStrategy(DataProcessingStrategy):
"""直播间录入策略"""
def process(self, context: BrowserContext, page: Page, config: Dict[str, Any]):
## 处理逻辑
def get_excel_path(self, config: Dict[str, Any]) -> str:
return config['Excel']['live_room_path']
def validate_data(self, data: 'pd.DataFrame') -> bool:
required_columns = {'直播间名称ID', '直播间名称', '蓝V平台认证公司名称', '状态'}
return required_columns.issubset(set(data.columns))
def _load_data(self, config: Dict[str, Any]) -> 'pd.DataFrame':
excel_path = self.get_excel_path(config)
data = pd.read_excel(excel_path)
if not self.validate_data(data):
raise ValueError("Excel文件格式不符合要求")
print(f"成功加载 excel {excel_path}")
return data
策略的使用
from strategies import StrategyFactory
def main():
# 拿到所有配置的策略
enabled_strategies = config['Strategy']['enabled_strategies']
for stratety_name in enabled_strategies:
strategy = StrategyFactory.create_strategy(strategy_name.strip())
strategy.process(context, page, config)
Playwright浏览器复用模板
util.browser_utils.py
import logging
import socket
import subprocess
from typing import Optional, Callable, TypeVar
from playwright.sync_api import BrowserContext, Playwright, Page
logger = logging.getLogger(__name__)
def is_port_in_use(port: int | str) -> bool:
"""检查浏览器端口是否被占用"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
if type(port) == str:
port = int(port)
s.bind(("localhost", port))
return False
except socket.error:
return True
def connect_browser_with_port(p: Playwright, port: int) -> BrowserContext:
"""根据端口初始化浏览器"""
if not is_port_in_use(port):
raise ConnectionError(f'端口 {port} 未被监听,请确保浏览器已经启动并监听该端口')
try:
browser = p.chromium.connect_over_cdp(endpoint_url=f'http://localhost:{port}')
context = browser.contexts[0] if browser.contexts else browser.new_context()
context.set_default_timeout(10000)
logger.info(f"成功连接到浏览器, 端口: {port}")
return context
except Exception as e:
raise ConnectionError(f"连接失败, {e}")
def kill_chrome_processes():
# 先杀死现有的 Chrome 进程(Windows)
try:
subprocess.run(["taskkill", "/f", "/im", "chrome.exe"], check=True)
except subprocess.CalledProcessError:
print("没有找到 Chrome 进程或杀死进程失败")
def launch_chrome_with_options(
p: Playwright,
path: str,
data_dir: str,
port: str = "9333",
) -> BrowserContext:
"""自定义打开新的浏览器"""
kill_chrome_processes()
context = p.chromium.launch_persistent_context(
user_data_dir=data_dir,
executable_path=path,
headless=False,
args=[
f"--remote-debugging-port={port}",
"--start-maximized"
],
no_viewport=True,
ignore_default_args=["--enable-automation"], # 避免被检测为自动化工具
)
context.set_default_timeout(10000)
logger.info(f"成功启动浏览器, 端口: {port}")
return context
"""
只保留浏览器1个页面
"""
def retain_only_one_page(context: BrowserContext):
for page in context.pages:
if len(context.pages) > 1:
page.bring_to_front()
page.wait_for_timeout(500)
page.close()
def safe_extract(page: Page, xpath, index=0, timeout=500) -> str | None:
"""安全提取元素"""
try:
return page.locator(xpath).nth(index).text_content(timeout=timeout) or None
except Exception:
return None
T = TypeVar("T")
def safe_page_query(
page: Page,
action: Callable[[Page], T],
*,
default: Optional[T] = None,
) -> Optional[T]:
"""安全执行页面"""
try:
return action(page)
except Exception:
return default
快速使用
def _execute_playwright(self):
playwright = None
browser_context = None
try:
playwright = sync_playwright().start()
if browser_utils.is_port_in_use(self.config["Browser"]["port"]):
# 初始化浏览器
browser_context = browser_utils.connect_browser_with_port(playwright, 【port】)
# 保留一个浏览器
browser_utils.retain_only_one_page(browser_context)
else:
browser_context = browser_utils.launch_chrome_with_options(
playwright,
【path】,
【data】,
【port】
)
page = browser_context.pages[0]
finally:
# 释放资源
try:
if browser_context:
browser_context.close()
except Exception as e:
self.logger.error(f"关闭浏览器上下文时发生错误: {e}")
try:
if playwright:
playwright.stop()
except Exception as e:
self.logger.error(f"停止Playwright实例时发生错误: {e}")
threading.Event 的事件监听
threading.Event() 详解
threading.Event()
是 Python 线程同步的一个基本工具,它提供了一个简单的线程间通信机制。以下是它的核心特性和用法:
- 基本特性
- 二进制标志:Event 对象内部维护一个布尔标志,初始为 False
- 线程安全:所有操作都是原子性的,适合多线程环境
- 轻量级:相比锁机制更轻量,适合简单的线程协调
- 主要方法
event = threading.Event()
# 设置标志为 True
event.set()
# 重置标志为 False
event.clear()
# 检查标志状态(非阻塞)
event.is_set()
# 等待标志变为 True(阻塞)
event.wait(timeout=None)
基于gui场景的实际应用
监听工具类 util/cancel_check.py
"""
cancel_check.py
任务取消检查工具模块
"""
import logging
from typing import Optional
import threading
logger = logging.getLogger(__name__)
class TaskCancelledError(RuntimeError):
"""自定义任务取消异常"""
def __init__(self, msg="任务已被取消"):
super().__init__(msg)
logger.warning(msg)
class CancellationChecker:
"""
取消检查工具类
使用示例:
checker = CancellationChecker(stop_event)
checker.raise_if_cancelled()
"""
def __init__(self, stop_event: Optional[threading.Event] = None):
self.stop_event = stop_event
def raise_if_cancelled(self):
"""如果任务已取消则抛出异常"""
if self.stop_event and self.stop_event.is_set():
raise TaskCancelledError()
@property
def is_cancelled(self) -> bool:
"""检查是否已取消(不抛出异常)"""
return bool(self.stop_event and self.stop_event.is_set())
def set_stop_event(self):
"""设置取消事件"""
self.stop_event.set()
def clear_stop_event(self):
"""清除取消事件"""
self.stop_event.clear()
gui 使用
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
## 其余初始化
# 新增停止标志
self.stop_event = threading.Event()
self.cancel_checker = cancel_check.CancellationChecker(self.stop_event)
## 其余逻辑...
def cancel_task(self):
"""取消任务"""
self.cancel_checker.set_stop_event()
self.logger.warning("取消任务...")
self.cancel_btn.setEnabled(False)
self.run_btn.setEnabled(True)
def run_strategy(self):
"""执行策略"""
# 重置停止标记
self.cancel_checker.clear_stop_event()
# 禁用执行按钮
self.run_btn.setEnabled(False)
# 启用取消按钮
self.cancel_btn.setEnabled(True)
def _execute_playwright(self):
try:
# 检查点
self.cancel_checker.raise_if_cancelled()
## 其他逻辑
except TaskCancelledError:
return
finally:
# 启用执行按钮
self.run_btn.setEnabled(True)
# 禁用取消按钮
self.cancel_btn.setEnabled(False)
动态表头映射
背景:为了兼容excel中的表头动态变化,因此这里可以在配置文件做一个动态映射
配置文件 config.ini
[ExcelHeaders.InfringementSong]
COMPANY = 隶属公司名称
ACCOUNT_ID = 账号ID
DATE = 取证日期
SONG = 歌曲
PERFORMER = 表演者
TIME_POINT = 所在证据时间点
EVIDENCE_LINK = 证据链接
EXTRACTION_CODE = 提取码
STATUS = 状态
[ExcelHeaders.LiveRoom]
ROOM_ID = 直播间名称ID
ROOM_NAME = 直播间名称
PLATFORM = 所在平台
COMPANY = 蓝V平台认证公司名称
STATUS = 状态
配置文件 config.excel_header.py
from typing import Dict, Any, ClassVar
class ExcelHeaderConfig:
"""Excel表头配置中心"""
_headers_loaded: ClassVar[bool] = False
# 侵权歌曲录入表头
INFRINGEMENT_SONG: ClassVar[Dict[str, str]] = {}
# 直播间录入表头
LIVE_ROOM: ClassVar[Dict[str, str]] = {}
@classmethod
def load_headers(cls, config: Dict[str, Any]):
"""从配置文件加载表头映射"""
if cls._headers_loaded:
return
# 加载侵权歌曲表头
cls.INFRINGEMENT_SONG = {
key: value
for key, value in config.items("ExcelHeaders.InfringementSong")
}
# 加载直播间表头
cls.LIVE_ROOM = {
key: value
for key, value in config.items("ExcelHeaders.LiveRoom")
}
cls._headers_loaded = True
class HeaderMapper:
"""表头映射工具类"""
def __init__(self, headers: dict):
# headers 结构应为 {字段键: 表头文字}
self.headers = {k.lower(): v for k, v in headers.items()}
# 创建反向映射 {表头文字: 字段键}
self.reverse_map = {v: k for k, v in headers.items()}
def get_excel_header(self, field_key: str) -> str:
# 大小写不敏感
key = field_key.lower()
"""通过字段键获取Excel表头文字"""
if key not in self.headers:
raise ValueError(f"未知字段键: {field_key} (尝试使用 {key})")
return self.headers[key] # 返回表头文字
def get_field_key(self, excel_header: str) -> str:
"""通过Excel表头文字获取字段键"""
return self.reverse_map.get(excel_header, excel_header)
配置的加载与使用
## 配置的加载(在主方法配置)
ExcelHeaderConfig.load_headers(config)
## mapper的实例化
class InfringementSongStrategy(DataProcessingStrategy):
def __init__(self):
## 初始化mapper
self.mapper = HeaderMapper(ExcelHeaderConfig.INFRINGEMENT_SONG)
def process(self, context: BrowserContext, page: Page, config: Dict[str, Any]):
## 获取字段 表头文字
company_header = self.mapper.get_excel_header("COMPANY")
打包与部署
安装 pyinstaller
pip install pyinstaller
打包配置文件 build.spec
单文件打包
# buildsingle.spec
# -*- mode: python -*-
from PyInstaller.utils.hooks import collect_all
import shutil
from pathlib import Path
# 自动收集 fitz 的所有依赖(包括 DLL)
datas, binaries, hiddenimports = collect_all("fitz")
exe_name = "知识产权工具"
a = Analysis(
["main.py"], # 主程序入口文件(必填)
pathex=["."], # Python模块搜索路径(列表形式)
binaries=binaries, # 收集的二进制依赖文件(如DLL)
hiddenimports=hiddenimports, # 显式声明隐藏导入的模块
hookspath=[], # 自定义hook文件路径
runtime_hooks=[], # 运行时hook脚本
win_no_prefer_redirects=False, # Windows禁用重定向
win_private_assemblies=False, # 不使用私有程序集
cipher=None, # 加密密钥(用于防反编译)
noarchive=False, # 是否禁用单文件打包
optimize=1, # 优化级别(0-2)
excludes=[ # 排除不需要的模块(减少体积)
"PyQt6.QtWebEngine",
"PyQt6.QtSql",
"PyQt6.QtNetwork",
],
)
pyz = PYZ(a.pure, a.zipped_data, cipher=None)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
name=f"{exe_name}", # 输出exe文件名(不带后缀)
debug=False,
strip=False, # 是否去除符号表(Linux/Mac)
upx=False, # 是否用UPX压缩可执行文件(减小体积)
console=False, # 是否显示控制台窗口(False为GUI程序)
icon="icon/favicon.ico", # 图标
)
dist_dir = Path('dist')
exe_dir = dist_dir / exe_name # dist/{exe_name}/ ##方便打压缩包
# 将编译文件移动至【dist/工具名/工具名.exe】
def _post_build():
if not exe_dir.exists():
exe_dir.mkdir(parents=True)
src = dist_dir / f'{exe_name}.exe' # 编译后文件
dst = exe_dir / f'{exe_name}.exe' # 上次编译后文件
if dst.exists(): # 如果上次编译文件残留
dst.unlink() # 删除旧文件
# 移动文件到子目录
src.rename(dst)
# 此处根据需求添加代码
def _config_build():
# 打包完成后自动复制config.ini到dist目录
shutil.copy('config.ini', exe_dir)
# file_dir = Path('files')
# # 将文件夹下的文件复制到打包目录
# shutil.copytree(file_dir, exe_dir / 'files', dirs_exist_ok=True)
_post_build()
_config_build()
文件夹形式打包
# buildcoll.spec
# -*- mode: python -*-
from PyInstaller.utils.hooks import collect_all
import shutil
from pathlib import Path
# 自动收集 fitz 的所有依赖(包括 DLL)
datas, binaries, hiddenimports = collect_all("fitz")
exe_name = "知识产权工具"
a = Analysis(
["main.py"], # 主程序入口文件(必填)
pathex=["."], # Python模块搜索路径(列表形式)
binaries=binaries, # 收集的二进制依赖文件(如DLL)
datas=[("config.ini", "."),], # 额外数据文件(格式:(源路径, 目标目录))
hiddenimports=hiddenimports, # 显式声明隐藏导入的模块
noarchive=True, # 是否禁用单文件打包
optimize=1, # 优化级别(0-2)
excludes=[ # 排除不需要的模块(减少体积)
"PyQt6.QtWebEngine",
"PyQt6.QtSql",
"PyQt6.QtNetwork",
],
)
pyz = PYZ(a.pure, a.zipped_data, cipher=None)
# 先构建EXE对象
exe = EXE(
pyz,
a.scripts,
name=exe_name,
debug=False,
strip=False,
upx=False,
console=False,
icon="icon/favicon.ico"
)
# 添加 COLLECT 构建文件夹结构
coll = COLLECT(
exe, # exe
a.binaries,
a.datas,
name=exe_name, # 输出文件夹名
strip=False,
upx=False,
)
文件夹形式要另外配置钩子函数获取相对路径
hook/runtime_hook.py
# hook/runtime_hook.py
import sys
from pathlib import Path
def _fix_resource_path():
# 钩子路径
dev_path = Path(__file__).parent.parent
# 生产环境 - 使用exe同级目录下的config.ini
dist_path = Path(sys.executable).parent
# 临时解压路径(单文件模式)
temp_path = Path(getattr(sys, '_MEIPASS', ''))
if dev_path.exists():
return str(dev_path)
elif dist_path.exists():
return str(dist_path)
elif temp_path.exists():
return str(temp_path)
else:
raise FileNotFoundError("找不到 资源路径")
# 设置全局变量
RESOURCE_PATH = _fix_resource_path()
钩子函数的使用
# 获取资源路径下的配置文件
self.config = config_utils.load_config(f"{RESOURCE_PATH}/config.ini")
打包
pyinstaller --clean buildsingle.spec
pyinstaller --clean buildcoll.spec
附:创建虚拟环境
python -m venv .venv
.\.venv\Scripts\activate # Windows
# 或 source bin/activate (Linux/Mac)
附:环境维护
- 生成 requirements.txt
pip freeze > requirements.txt
- 新环境初始化项目
# 在新环境中测试
python -m venv .venv
.\.venv\Scripts\activate
pip install -r requirements.txt
python main.py # 运行程序验证功能
enumerate 用法与 set
enumerate
enumerate() 是 Python 内置的一个非常有用的函数,用于在遍历序列(如列表、元组、字符串等)时同时获取元素的索引和值。
- 遍历列表获取索引和值
fruits = ['apple', 'banana', 'cherry']
for index, fruit in enumerate(fruits):
print(index, fruit)
输出:
0 apple
1 banana
2 cherry
- 指定起始值
for index, fruit in enumerate(fruits, start=1):
print(index, fruit)
输出:
1 apple
2 banana
3 cherry
- 本次使用方式
for song_index, (index, row) in enumerate(group.iterrows(), start=1):
print(f"正在添加歌曲: {row.to_dict()}")
if song_index > 1:
lc.locator("//span[text()='新增一项']").click()
n_page.wait_for_load_state("networkidle")
集合 set {}
required_columns = {'直播间名称ID', '直播间名称', '蓝V平台认证公司名称', '状态'}
# 确认是否为子集合
required_columns.issubset(set(data.columns))