玩命加载中🤣🤣🤣

基于playwright的工具


记一次基于playwright-python的工具

配置封装

依赖安装

pip install configparser

配置文件设计(config.ini)

[Browser]
port = 9222
title = 开始 | 钉钉宜搭
url = https://nozkyz.aliwork.com/workPlatform
;path = C:\Chrome\App\chrome.exe

[Excel]
live_room_path = ./files/直播间录入.xlsx
infringement_song_path = ./files/侵权歌曲录入.xlsx

# 直播间,侵权歌曲
[Strategy]
enabled_strategies = 直播间,侵权歌曲

; 下面的省略...

配置的加载与验证(util/config_utils.py)

  • 配置的校验
    • 必填配置项的校验
    • 定制化配置参数的数据处理可在此实现
def _validate_config(config: configparser.ConfigParser) -> Dict[str, Any]:
    config_dict = {k: dict(config.items(k)) for k in config.sections()}
    require_sections = {
        "Browser": ["path", "data"],
    }
    for section, keys in require_sections.items():
        for key in keys:
            if key not in config_dict[section]:
                raise ValueError(f"配置文件缺少 {section} 配置项 {key}")


    # 转换enabled_strategies 为列表
    config_dict["Strategy"]["enabled_strategies"] = [
        s.strip() for s in config["Strategy"]["enabled_strategies"].split(",")
    ]
    return config_dict
  • 配置文件的加载
def _load_config_file(config_file: Path) -> Dict[str, Any]:
    config = configparser.ConfigParser()
    config.optionxform = str
    try:
        config.read(config_file, encoding="utf-8")
        return _validate_config(config)
    except Exception as e:
        raise ValueError(f"无法读取配置文件 {config_file}: {e}")

    pass
  • 对外暴露的util方法
def load_config(config_path: str = None) -> Dict[str, Any]:
    """
    加载配置文件,明确区分开发环境和生产环境

    规则:
    1. 如果指定了config_path参数,优先使用该路径
    2. 开发环境使用项目根目录下的config.ini
    3. 生产环境使用exe同级目录下的config.ini
    4. 如果都找不到,抛出明确的错误
    """
    # 1. 如果指定了config_path参数,优先使用
    if config_path:
        logger.info(f"加载配置文件: {config_path}")
        config_file = Path(config_path)
        if config_file.exists():
            return _load_config_file(config_file)


    # 2. 判断环境并确定默认配置文件路径
    if getattr(sys, 'frozen', False):
        # 生产环境 - 使用exe同级目录下的config.ini
        config_file = Path(sys.executable).parent / "config.ini"
        env_type = "生产环境"
    else:
        # 开发环境 - 使用项目根目录下的config.ini
        config_file = Path(__file__).parent.parent / "config.ini"
        env_type = "开发环境"

    # 调试信息(生产环境可移除)
    logger.info(f"{env_type}配置文件路径: {config_file}")
    logger.info(f"当前工作目录: {Path.cwd()}")

    # 3. 检查并加载配置文件
    if config_file.exists():
        return _load_config_file(config_file)

    # 4. 配置文件不存在,抛出详细错误
    error_msg = (
        f"未找到{env_type}配置文件\n"
        f"期望路径: {config_file}\n"
        f"当前工作目录: {Path.cwd()}\n"
    )
    if env_type == "生产环境":
        error_msg += (
            "解决方案:\n"
            "1. 请确保config.ini文件与可执行文件在同一目录\n"
            "2. 或者通过命令行参数指定配置文件路径"
        )
    raise FileNotFoundError(error_msg)
  • 调用方(伪代码)
from util import config_utils, browser_utils

# 加载配置
config = config_utils.load_config(f"{RESOURCE_PATH}/config.ini")
# 配置的使用
download_dir = config["Platform"]["taobao_download"]
excel_out_dir = config["Platform"]["taobao_excel_out"]

钩子函数获取环境路径

# hook/runtime_hook.py
from pathlib import Path

def _fix_resource_path():
    # 钩子路径
    dev_path = Path(__file__).parent.parent

    if dev_path.exists():
        return str(dev_path)

# 设置全局变量
RESOURCE_PATH = _fix_resource_path()
  • 变量的的使用
config_utils.load_config(f"{RESOURCE_PATH}/config.ini")

附:说明

getattr(sys, ‘frozen’, False) 是 Python 中一个常用的检查代码是否被打包成独立可执行文件的惯用方法。它的作用如下:

解释:
sys 模块:Python 的内置系统模块,提供与 Python 解释器交互的功能。
getattr() 函数:获取对象的属性值,如果属性不存在则返回默认值。
语法:getattr(object, name, default)
‘frozen’ 属性:
当 Python 程序被工具(如 PyInstaller、cx_Freeze、py2exe 等)打包成独立可执行文件时,这些工具通常会在 sys 模块中设置 sys.frozen = True。
在普通 Python 解释器环境中运行时,sys 模块默认没有 frozen 属性。
False:默认值,表示如果 sys.frozen 属性不存在,则返回 False。

字典推导式

{key_expr: value_expr for item in iterable if condition}

  • key_expr: 生成字典键值的表达式
  • value_expr: 生成字典值的表达式
  • item: 可迭代对象(如列表、元组、集合等)中的每个元素
  • if condition(可选):过滤条件,仅复合条件的元素会被包含在字典中
## 从列表生成字典
words = ["apple", "banana", "pear"]
dict1 = {word: len(word) for word in words}
print(dict1) # {'apple': 5, 'banana': 6, 'pear': 4}

## 带条件的推导式
dict2 = {word: len(word) for word in words if len(word) > 5}
print(dict2) # {'banana': 6}

## 从2个数组创建字典
keys = ["a", "b", "c"]
values = [1, 2, 3]
dict3 = {k: v for k, v in zip(keys, values)}
print(dict3) # {'a': 1, 'b': 2, 'c': 3}

## 转换现有字典
o_dict = {"a": 1, "b": 2, "c": 3}
n_dict = {k: v for k, v in o_dict.items() if v > 1}
print(n_dict) # {'b': 2, 'c': 3}

## 与for循环的对比
squares = {k: k**2 for k in range(1, 6)}
print(squares) # {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
squares2 = {}
for x in range(1, 6):
    squares2[x] = x**2
print(squares2) # {1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

日志系统

配置文件 config/logging_config.py

# config/logging_config.py
import logging
import sys
from pathlib import Path


def setup_logging():
    """配置全局日志系统,兼容打包和开发环境"""
    # 确定基础路径
    if getattr(sys, 'frozen', False):
        # 打包环境 - 使用exe所在目录
        base_path = Path(sys.executable).parent
    else:
        # 开发环境 - 使用项目根目录
        base_path = Path(__file__).parent.parent

    # 创建日志目录(确保可写)
    log_dir = base_path / "logs"
    try:
        log_dir.mkdir(exist_ok=True, mode=0o755)
    except Exception as e:
        print(f"无法创建日志目录: {e}")
        log_dir = Path.cwd() / "logs"  # 回退到当前工作目录
        log_dir.mkdir(exist_ok=True, mode=0o755)

    # 配置日志
    log_file = log_dir / "application.log"
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler(sys.stdout)
        ]
    )

    # 设置第三方库的日志级别
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("PIL").setLevel(logging.WARNING)

初始化及使用

# gui加载日志系统
class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        # 加载日志系统
        logging_config.setup_logging()
        self.logger = logging.getLogger(__name__)  # 使用统一日志系统

其他类需要引用仅需在最上层初始化即可,如下:

import logging

logger = logging.getLogger(__name__)

gui 日志组件

  • 日志处理器 config/gui_log_handler.py
import logging

from PyQt6.QtGui import QColor


class GuiLogHandler(logging.Handler):
    """自定义日志处理器,将日志输出到GUI文本框"""

    def __init__(self, text_widget):
        super().__init__()
        self.text_widget = text_widget
        self.setFormatter(logging.Formatter(
            '%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d - %(message)s'  # 统一格式
        ))
        self.default_color = self.text_widget.textColor()  # 保存初始颜色

    def emit(self, record):
        msg = self.format(record)
        try:
            # 先保存当前颜色(避免影响其他代码)
            current_color = self.text_widget.textColor()

            # 设置颜色
            if record.levelno >= logging.ERROR:
                self.text_widget.setTextColor(QColor("red"))
            elif record.levelno >= logging.WARNING:
                self.text_widget.setTextColor(QColor("orange"))
            else:
                self.text_widget.setTextColor(self.default_color)

            self.text_widget.append(msg)

            # 恢复原来的颜色(避免影响后续文本)
            self.text_widget.setTextColor(current_color)

        except Exception as e:
            print(f"Error in GuiLogHandler: {e}")
  • 主窗口新增组件
def setup_ui(self):    
    # 日志显示区域
    self.log_output = QTextEdit()
    self.log_output.setReadOnly(True)
    layout.addWidget(self.log_output)
  • setup_ui 之后绑定组件
from config import logging_config, gui_log_handler

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        ## 其他逻辑
        self.setup_ui()  # UI设置
        # 设置GUI日志处理器
        self.setup_logging_handler()

    def setup_logging_handler(self):
        """设置GUI专用的日志处理器"""
        self.gui_handler = gui_log_handler.GuiLogHandler(self.log_output)
        self.gui_handler.setLevel(logging.INFO)
        logging.getLogger().addHandler(self.gui_handler)
        self.logger.info("GUI日志处理器初始化完成")

策略模式

策略接口(strategies.base_stratety.py)

from abc import ABC, abstractmethod
from threading import Event
from typing import Dict, Any, Optional

from playwright.sync_api import BrowserContext, Page


class DataProcessingStrategy(ABC):
    """数据处理策略接口"""

    @abstractmethod
    def process(
            self,
            context: BrowserContext,
            page: Page,
            config: Dict[str, Any],
            search_mode: str,
            keyword: Optional[str] = None,
            image_path: Optional[str] = None,
            stop_event: Optional[Event] = None
    ):
        """执行数据处理"""
        pass

策略工厂(strategies._init_.py)

from .base_strategy import DataProcessingStrategy
from .infringement_song_strategy import InfringementSongStrategy
from .live_room_strategy import LiveRoomStrategy


class StrategyFactory:
    """策略工厂(集中管理所有策略)"""

    @staticmethod
    def create_strategy(strategy_name: str) -> DataProcessingStrategy:
        strategies = {
            '直播间': LiveRoomStrategy,
            '侵权歌曲': InfringementSongStrategy
            # 可以继续添加其他策略...
        }

        if strategy_name not in strategies:
            raise ValueError(f"未知策略: {strategy_name}")

        return strategies[strategy_name]()

具体实现类(如:直播间)

`from typing import Dict, Any

import pandas as pd
from playwright.sync_api import Page, BrowserContext

from strategies.base_strategy import DataProcessingStrategy


class LiveRoomStrategy(DataProcessingStrategy):
    """直播间录入策略"""

    def process(self, context: BrowserContext, page: Page, config: Dict[str, Any]):
        ## 处理逻辑

    def get_excel_path(self, config: Dict[str, Any]) -> str:
        return config['Excel']['live_room_path']

    def validate_data(self, data: 'pd.DataFrame') -> bool:
        required_columns = {'直播间名称ID', '直播间名称', '蓝V平台认证公司名称', '状态'}
        return required_columns.issubset(set(data.columns))

    def _load_data(self, config: Dict[str, Any]) -> 'pd.DataFrame':
        excel_path = self.get_excel_path(config)
        data = pd.read_excel(excel_path)
        if not self.validate_data(data):
            raise ValueError("Excel文件格式不符合要求")
        print(f"成功加载 excel {excel_path}")
        return data

策略的使用

from strategies import StrategyFactory

def main():
  # 拿到所有配置的策略
  enabled_strategies = config['Strategy']['enabled_strategies']
  for stratety_name in enabled_strategies:
    strategy = StrategyFactory.create_strategy(strategy_name.strip())
    strategy.process(context, page, config)

Playwright浏览器复用模板

util.browser_utils.py

import logging
import socket
import subprocess
from typing import Optional, Callable, TypeVar

from playwright.sync_api import BrowserContext, Playwright, Page

logger = logging.getLogger(__name__)


def is_port_in_use(port: int | str) -> bool:
    """检查浏览器端口是否被占用"""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            if type(port) == str:
                port = int(port)
            s.bind(("localhost", port))
            return False
        except socket.error:
            return True


def connect_browser_with_port(p: Playwright, port: int) -> BrowserContext:
    """根据端口初始化浏览器"""
    if not is_port_in_use(port):
        raise ConnectionError(f'端口 {port} 未被监听,请确保浏览器已经启动并监听该端口')
    try:
        browser = p.chromium.connect_over_cdp(endpoint_url=f'http://localhost:{port}')
        context = browser.contexts[0] if browser.contexts else browser.new_context()
        context.set_default_timeout(10000)
        logger.info(f"成功连接到浏览器, 端口: {port}")
        return context
    except Exception as e:
        raise ConnectionError(f"连接失败, {e}")


def kill_chrome_processes():
    # 先杀死现有的 Chrome 进程(Windows)
    try:
        subprocess.run(["taskkill", "/f", "/im", "chrome.exe"], check=True)
    except subprocess.CalledProcessError:
        print("没有找到 Chrome 进程或杀死进程失败")


def launch_chrome_with_options(
        p: Playwright,
        path: str,
        data_dir: str,
        port: str = "9333",
) -> BrowserContext:
    """自定义打开新的浏览器"""
    kill_chrome_processes()

    context = p.chromium.launch_persistent_context(
        user_data_dir=data_dir,
        executable_path=path,
        headless=False,
        args=[
            f"--remote-debugging-port={port}",
            "--start-maximized"
        ],
        no_viewport=True,
        ignore_default_args=["--enable-automation"], # 避免被检测为自动化工具
    )
    context.set_default_timeout(10000)
    logger.info(f"成功启动浏览器, 端口: {port}")
    return context

"""
    只保留浏览器1个页面
"""
def retain_only_one_page(context: BrowserContext):
    for page in context.pages:
        if len(context.pages) > 1:
            page.bring_to_front()
            page.wait_for_timeout(500)
            page.close()


def safe_extract(page: Page, xpath, index=0, timeout=500) -> str | None:
    """安全提取元素"""
    try:
        return page.locator(xpath).nth(index).text_content(timeout=timeout) or None
    except Exception:
        return None

T = TypeVar("T")

def safe_page_query(
        page: Page,
        action: Callable[[Page], T],
        *,
        default: Optional[T] = None,
) -> Optional[T]:
    """安全执行页面"""
    try:
        return action(page)
    except Exception:
        return default

快速使用

def _execute_playwright(self):
    playwright = None
    browser_context = None
    try:
        playwright = sync_playwright().start()
        
        if browser_utils.is_port_in_use(self.config["Browser"]["port"]):
            # 初始化浏览器
            browser_context = browser_utils.connect_browser_with_port(playwright, 【port】)
            # 保留一个浏览器
            browser_utils.retain_only_one_page(browser_context)
        else:
            browser_context = browser_utils.launch_chrome_with_options(
                playwright,
                【path】,
                【data】,
                【port】
            )
        page = browser_context.pages[0]
        
        
    finally:
        # 释放资源
        try:
            if browser_context:
                browser_context.close()
        except Exception as e:
            self.logger.error(f"关闭浏览器上下文时发生错误: {e}")

        try:
            if playwright:
                playwright.stop()
        except Exception as e:
            self.logger.error(f"停止Playwright实例时发生错误: {e}")

threading.Event 的事件监听

threading.Event() 详解

threading.Event() 是 Python 线程同步的一个基本工具,它提供了一个简单的线程间通信机制。以下是它的核心特性和用法:

  • 基本特性
    • 二进制标志:Event 对象内部维护一个布尔标志,初始为 False
    • 线程安全:所有操作都是原子性的,适合多线程环境
    • 轻量级:相比锁机制更轻量,适合简单的线程协调
  • 主要方法
event = threading.Event()

# 设置标志为 True
event.set()

# 重置标志为 False
event.clear()

# 检查标志状态(非阻塞)
event.is_set()

# 等待标志变为 True(阻塞)
event.wait(timeout=None)

基于gui场景的实际应用

监听工具类 util/cancel_check.py

"""
cancel_check.py
任务取消检查工具模块
"""

import logging
from typing import Optional
import threading

logger = logging.getLogger(__name__)

class TaskCancelledError(RuntimeError):
    """自定义任务取消异常"""
    def __init__(self, msg="任务已被取消"):
        super().__init__(msg)
        logger.warning(msg)

class CancellationChecker:
    """
    取消检查工具类
    使用示例:
    checker = CancellationChecker(stop_event)
    checker.raise_if_cancelled()
    """

    def __init__(self, stop_event: Optional[threading.Event] = None):
        self.stop_event = stop_event

    def raise_if_cancelled(self):
        """如果任务已取消则抛出异常"""
        if self.stop_event and self.stop_event.is_set():
            raise TaskCancelledError()

    @property
    def is_cancelled(self) -> bool:
        """检查是否已取消(不抛出异常)"""
        return bool(self.stop_event and self.stop_event.is_set())

    def set_stop_event(self):
        """设置取消事件"""
        self.stop_event.set()
        
    def clear_stop_event(self):
        """清除取消事件"""
        self.stop_event.clear()

gui 使用

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        
        ## 其余初始化
        # 新增停止标志
        self.stop_event = threading.Event()
        self.cancel_checker = cancel_check.CancellationChecker(self.stop_event)
        ## 其余逻辑...
        
	def cancel_task(self):
        """取消任务"""
        self.cancel_checker.set_stop_event()
        self.logger.warning("取消任务...")
        self.cancel_btn.setEnabled(False)
        self.run_btn.setEnabled(True)
        
    def run_strategy(self):
        """执行策略"""
        # 重置停止标记
        self.cancel_checker.clear_stop_event()
        # 禁用执行按钮
        self.run_btn.setEnabled(False)
        # 启用取消按钮
        self.cancel_btn.setEnabled(True)
        
    def _execute_playwright(self):
        try:
            # 检查点
            self.cancel_checker.raise_if_cancelled()
            ## 其他逻辑
        except TaskCancelledError:
            return
        finally:
            # 启用执行按钮
            self.run_btn.setEnabled(True)
            # 禁用取消按钮
            self.cancel_btn.setEnabled(False)

动态表头映射

背景:为了兼容excel中的表头动态变化,因此这里可以在配置文件做一个动态映射

配置文件 config.ini

[ExcelHeaders.InfringementSong]
COMPANY = 隶属公司名称
ACCOUNT_ID = 账号ID
DATE = 取证日期
SONG = 歌曲
PERFORMER = 表演者
TIME_POINT = 所在证据时间点
EVIDENCE_LINK = 证据链接
EXTRACTION_CODE = 提取码
STATUS = 状态

[ExcelHeaders.LiveRoom]
ROOM_ID = 直播间名称ID
ROOM_NAME = 直播间名称
PLATFORM = 所在平台
COMPANY = 蓝V平台认证公司名称
STATUS = 状态

配置文件 config.excel_header.py

from typing import Dict, Any, ClassVar


class ExcelHeaderConfig:
    """Excel表头配置中心"""

    _headers_loaded: ClassVar[bool] = False

    # 侵权歌曲录入表头
    INFRINGEMENT_SONG: ClassVar[Dict[str, str]] = {}

    # 直播间录入表头
    LIVE_ROOM: ClassVar[Dict[str, str]] = {}

    @classmethod
    def load_headers(cls, config: Dict[str, Any]):
        """从配置文件加载表头映射"""
        if cls._headers_loaded:
            return

        # 加载侵权歌曲表头
        cls.INFRINGEMENT_SONG = {
            key: value
            for key, value in config.items("ExcelHeaders.InfringementSong")
        }

        # 加载直播间表头
        cls.LIVE_ROOM = {
            key: value
            for key, value in config.items("ExcelHeaders.LiveRoom")
        }

        cls._headers_loaded = True


class HeaderMapper:
    """表头映射工具类"""

    def __init__(self, headers: dict):
        # headers 结构应为 {字段键: 表头文字}
        self.headers = {k.lower(): v for k, v in headers.items()}
        # 创建反向映射 {表头文字: 字段键}
        self.reverse_map = {v: k for k, v in headers.items()}

    def get_excel_header(self, field_key: str) -> str:
        # 大小写不敏感
        key = field_key.lower()
        """通过字段键获取Excel表头文字"""
        if key not in self.headers:
            raise ValueError(f"未知字段键: {field_key} (尝试使用 {key})")
        return self.headers[key]  # 返回表头文字

    def get_field_key(self, excel_header: str) -> str:
        """通过Excel表头文字获取字段键"""
        return self.reverse_map.get(excel_header, excel_header)

配置的加载与使用

## 配置的加载(在主方法配置)
ExcelHeaderConfig.load_headers(config)

## mapper的实例化
class InfringementSongStrategy(DataProcessingStrategy):
    def __init__(self):
        ## 初始化mapper
        self.mapper = HeaderMapper(ExcelHeaderConfig.INFRINGEMENT_SONG)

    def process(self, context: BrowserContext, page: Page, config: Dict[str, Any]):
        ## 获取字段 表头文字
        company_header = self.mapper.get_excel_header("COMPANY")

打包与部署

安装 pyinstaller

pip install pyinstaller

打包配置文件 build.spec

单文件打包
# buildsingle.spec
# -*- mode: python -*-
from PyInstaller.utils.hooks import collect_all
import shutil
from pathlib import Path

# 自动收集 fitz 的所有依赖(包括 DLL)
datas, binaries, hiddenimports = collect_all("fitz")

exe_name = "知识产权工具"

a = Analysis(
    ["main.py"], # 主程序入口文件(必填)
    pathex=["."], # Python模块搜索路径(列表形式)
    binaries=binaries, # 收集的二进制依赖文件(如DLL)
    hiddenimports=hiddenimports, # 显式声明隐藏导入的模块
    hookspath=[], # 自定义hook文件路径
    runtime_hooks=[], # 运行时hook脚本
    win_no_prefer_redirects=False, # Windows禁用重定向
    win_private_assemblies=False, # 不使用私有程序集
    cipher=None, # 加密密钥(用于防反编译)
    noarchive=False, # 是否禁用单文件打包
    optimize=1,           # 优化级别(0-2)
    excludes=[  # 排除不需要的模块(减少体积)
        "PyQt6.QtWebEngine",
        "PyQt6.QtSql",
        "PyQt6.QtNetwork",
    ],
)

pyz = PYZ(a.pure, a.zipped_data, cipher=None)
exe = EXE(
    pyz,
    a.scripts,
    a.binaries,
    a.datas,
    name=f"{exe_name}", # 输出exe文件名(不带后缀)
    debug=False,
    strip=False, # 是否去除符号表(Linux/Mac)
    upx=False, # 是否用UPX压缩可执行文件(减小体积)
    console=False, # 是否显示控制台窗口(False为GUI程序)
    icon="icon/favicon.ico", # 图标
)

dist_dir = Path('dist')
exe_dir = dist_dir / exe_name # dist/{exe_name}/ ##方便打压缩包

# 将编译文件移动至【dist/工具名/工具名.exe】
def _post_build():
    if not exe_dir.exists():
        exe_dir.mkdir(parents=True)

    src = dist_dir / f'{exe_name}.exe' # 编译后文件
    dst = exe_dir / f'{exe_name}.exe' # 上次编译后文件
    if dst.exists(): # 如果上次编译文件残留
        dst.unlink() # 删除旧文件
    # 移动文件到子目录
    src.rename(dst)


# 此处根据需求添加代码
def _config_build():
    # 打包完成后自动复制config.ini到dist目录
    shutil.copy('config.ini', exe_dir)
#    file_dir = Path('files')
#    # 将文件夹下的文件复制到打包目录
#    shutil.copytree(file_dir, exe_dir / 'files', dirs_exist_ok=True)

_post_build()
_config_build()
文件夹形式打包
# buildcoll.spec
# -*- mode: python -*-
from PyInstaller.utils.hooks import collect_all
import shutil
from pathlib import Path

# 自动收集 fitz 的所有依赖(包括 DLL)
datas, binaries, hiddenimports = collect_all("fitz")

exe_name = "知识产权工具"

a = Analysis(
    ["main.py"], # 主程序入口文件(必填)
    pathex=["."], # Python模块搜索路径(列表形式)
    binaries=binaries, # 收集的二进制依赖文件(如DLL)
    datas=[("config.ini", "."),], # 额外数据文件(格式:(源路径, 目标目录))
    hiddenimports=hiddenimports, # 显式声明隐藏导入的模块
    noarchive=True, # 是否禁用单文件打包
    optimize=1,           # 优化级别(0-2)
    excludes=[  # 排除不需要的模块(减少体积)
        "PyQt6.QtWebEngine",
        "PyQt6.QtSql",
        "PyQt6.QtNetwork",
    ],
)
pyz = PYZ(a.pure, a.zipped_data, cipher=None)

# 先构建EXE对象
exe = EXE(
    pyz,
    a.scripts,
    name=exe_name,
    debug=False,
    strip=False,
    upx=False,
    console=False,
    icon="icon/favicon.ico"
)


# 添加 COLLECT 构建文件夹结构
coll = COLLECT(
    exe,  # exe
    a.binaries,
    a.datas,
    name=exe_name,  # 输出文件夹名
    strip=False,
    upx=False,
)

文件夹形式要另外配置钩子函数获取相对路径

hook/runtime_hook.py

# hook/runtime_hook.py
import sys
from pathlib import Path

def _fix_resource_path():
    # 钩子路径
    dev_path = Path(__file__).parent.parent

    # 生产环境 - 使用exe同级目录下的config.ini
    dist_path = Path(sys.executable).parent

    # 临时解压路径(单文件模式)
    temp_path = Path(getattr(sys, '_MEIPASS', ''))

    if dev_path.exists():
        return str(dev_path)
    elif dist_path.exists():
        return str(dist_path)
    elif temp_path.exists():
        return str(temp_path)
    else:
        raise FileNotFoundError("找不到 资源路径")

# 设置全局变量
RESOURCE_PATH = _fix_resource_path()

钩子函数的使用

# 获取资源路径下的配置文件
self.config = config_utils.load_config(f"{RESOURCE_PATH}/config.ini")
打包
pyinstaller --clean buildsingle.spec
pyinstaller --clean buildcoll.spec

附:创建虚拟环境

python -m venv .venv
.\.venv\Scripts\activate  # Windows
# 或 source bin/activate (Linux/Mac)

附:环境维护

  • 生成 requirements.txt
pip freeze > requirements.txt
  • 新环境初始化项目
# 在新环境中测试
python -m venv .venv
.\.venv\Scripts\activate
pip install -r requirements.txt
python main.py  # 运行程序验证功能

enumerate 用法与 set

enumerate

enumerate() 是 Python 内置的一个非常有用的函数,用于在遍历序列(如列表、元组、字符串等)时同时获取元素的索引和值。

  1. 遍历列表获取索引和值
fruits = ['apple', 'banana', 'cherry']

for index, fruit in enumerate(fruits):
    print(index, fruit)

输出:

0 apple
1 banana
2 cherry
  1. 指定起始值
for index, fruit in enumerate(fruits, start=1):
    print(index, fruit)

输出:

1 apple
2 banana
3 cherry
  1. 本次使用方式
for song_index, (index, row) in enumerate(group.iterrows(), start=1):
    print(f"正在添加歌曲: {row.to_dict()}")
    if song_index > 1:
        lc.locator("//span[text()='新增一项']").click()
        n_page.wait_for_load_state("networkidle")

集合 set {}

required_columns = {'直播间名称ID', '直播间名称', '蓝V平台认证公司名称', '状态'}
# 确认是否为子集合
required_columns.issubset(set(data.columns))

文章作者: 👑Dee👑
版权声明: 本博客所有文章除特別声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来源 👑Dee👑 !
  目录