本文最后更新于24 天前,其中的信息可能已经过时,如有错误请发送邮件到big_fw@foxmail.com
作为一名移动应用开发者,你是否也曾面临这样的困扰:需要同时维护测试环境和正式环境的多个应用,每个应用又有数十个推广渠道,管理这些应用的版本更新和下载链接简直是一场噩梦?
为了解决这个问题,我开发了一款基于 PyQt5 的 App 版本管理器,能够轻松管理多环境、多应用、多渠道的版本信息,并集成了阿里云 OSS 存储功能,实现了 APK 文件的自动上传与管理。今天就来分享一下这个工具的开发过程和使用方法

工具功能亮点
这款 App 版本管理器具备以下核心功能:
- 支持测试环境 (test) 和正式环境 (prod) 分离管理
- 可添加多个应用,每个应用可管理多个推广渠道
- 自动解析 APK 信息(版本号、渠道信息等)
- 集成阿里云 OSS,实现 APK 文件和图标自动上传
- 支持批量添加 APK 文件,自动处理版本更新
- 可导出渠道信息到 Excel,包含下载链接和分发页面
- 支持批量下载选中渠道的 APK 文件
- 数据自动本地保存,并可同步到云端
核心代码解析
data/channel.json
[
{
"id": "huawei",
"name": "华为应用市场"
},
]
app.py
import sys
import json
import os
import shutil
from datetime import datetime
from PyQt5.QtWidgets import (QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QTableWidget, QTableWidgetItem, QFileDialog,
QInputDialog, QMessageBox, QLabel, QLineEdit, QFormLayout,
QDialog, QHeaderView, QComboBox, QGridLayout, QTextEdit,
QGroupBox, QSizePolicy, QProgressDialog)
from PyQt5.QtCore import Qt, QUrl, QSettings
from PyQt5.QtGui import QPixmap, QIcon, QDesktopServices
import requests;
import oss2;
from dotenv import load_dotenv
from urllib.parse import urlparse
# 加载 .env 文件
load_dotenv()
# --- 配置文件路径 ---
CONFIG_DIR = os.path.join(os.path.expanduser("~"), ".night_sky_workshop")
try:
os.makedirs(CONFIG_DIR, exist_ok=True)
print(f"配置目录创建成功: {CONFIG_DIR}")
except Exception as e:
print(f"创建配置目录失败: {e}")
CONFIG_FILE = os.path.join(CONFIG_DIR, "config.json")
# --- 配置文件路径结束 ---
# 默认数据目录(动态获取)
def get_default_data_dir():
try:
if getattr(sys, 'frozen', False):
data_dir = os.path.join(sys._MEIPASS if hasattr(sys, '_MEIPASS') else os.path.dirname(sys.executable), "data")
else:
data_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
os.makedirs(data_dir, exist_ok=True)
print(f"数据目录创建成功: {data_dir}")
return data_dir
except Exception as e:
print(f"创建数据目录失败: {e}")
fallback_dir = os.path.join(CONFIG_DIR, "data")
os.makedirs(fallback_dir, exist_ok=True)
return fallback_dir
# --- 加载和保存配置 ---
def load_config():
try:
if os.path.exists(CONFIG_FILE):
try:
with open(CONFIG_FILE, 'r', encoding='utf-8') as f:
config = json.load(f)
print(f"配置文件加载成功: {CONFIG_FILE}")
return config
except json.JSONDecodeError:
QMessageBox.warning(None, "配置错误", "配置文件格式错误,将使用默认设置。")
return {"data_dir": get_default_data_dir()}
else:
default_config = {"data_dir": get_default_data_dir()}
save_config(default_config)
QMessageBox.information(None, "配置初始化", f"首次运行,已创建默认配置文件\n配置目录: {CONFIG_DIR}")
return default_config
except Exception as e:
QMessageBox.critical(None, "加载配置失败", f"加载配置文件时出错: {e}")
return {"data_dir": get_default_data_dir()}
def save_config(config):
try:
with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
json.dump(config, f, ensure_ascii=False, indent=4)
print(f"配置文件保存成功: {CONFIG_FILE}")
return True
except Exception as e:
QMessageBox.critical(None, "保存配置失败", f"无法保存配置文件: {e}")
return False
# --- 初始化配置 ---
config = load_config()
DATA_DIR = config["data_dir"]
try:
os.makedirs(DATA_DIR, exist_ok=True)
print(f"数据目录确认成功: {DATA_DIR}")
except Exception as e:
QMessageBox.critical(None, "创建数据目录失败", f"无法创建数据目录: {e}")
sys.exit(1)
# --- 加载渠道名称字典 ---
def load_channel_dict():
channel_file = os.path.join(DATA_DIR, "channel.json")
try:
if os.path.exists(channel_file):
with open(channel_file, 'r', encoding='utf-8') as f:
channels = json.load(f)
channel_dict = {channel["id"]: channel["name"] for channel in channels}
print(f"渠道名称字典加载成功,共 {len(channel_dict)} 个渠道")
return channel_dict
else:
print(f"渠道名称文件 {channel_file} 不存在")
return {}
except Exception as e:
print(f"加载渠道名称字典失败: {e}")
return {}
CHANNEL_DICT = load_channel_dict()
# 图标缓存目录
ICON_CACHE_DIR = os.path.join(DATA_DIR, "icons")
try:
os.makedirs(ICON_CACHE_DIR, exist_ok=True)
except Exception as e:
QMessageBox.warning(None, "创建图标缓存目录失败", f"无法创建图标缓存目录: {e}")
# 数据文件路径
JSON_FILE = os.path.join(DATA_DIR, "app_versions.json")
# --- OSS 配置 ---
OSS_BUCKET_URL = "https://你的ossUrl.cn" # 修改为自己的
OSS_APP_ICON_DIR = "apps/icons/apps" #应用图标存储地址
OSS_APK_ICON_DIR = "apps/icons/apks" #apk图标存储地址
OSS_APK_DIR = "apps" #存储文件根地址
OSS_METADATA_PATH = "apps/metadata/app_versions.json"
access_key_id = os.getenv('access_key_id')
access_key_secret = os.getenv('access_key_secret')
endpoint = 'https://oss-cn-<修改为你得自己>.aliyuncs.com' #shanghai/hangzhou等
bucket_name = 'loan-user' #修改为你的oss bucket 名称
app_name_keywords = [] #apk名称格式化
appid_dict = {
'__UNI__你的appId': 'yqh',
'__UNI__你的appId':'yqy'
}
# --- OSS 配置结束 ---
headers = {
'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/126.0.0.0',
"x-oss-object-acl": "public-read",
"x-oss-forbid-overwrite": "true",
};
# 检查OSS配置
try:
auth = oss2.Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
except Exception as e:
QMessageBox.warning(None, "OSS配置警告", f"OSS配置可能不正确: {e}\n部分功能可能无法正常使用。")
class SettingsDialog(QDialog):
"""设置对话框"""
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("设置")
self.setMinimumWidth(500)
self.config = load_config()
# 数据文件夹设置
self.data_dir_group = QGroupBox("数据存储位置")
self.data_dir_edit = QLineEdit(self.config["data_dir"])
self.data_dir_button = QPushButton("浏览...")
self.data_dir_button.clicked.connect(self.choose_data_dir)
group_layout = QHBoxLayout()
group_layout.addWidget(self.data_dir_edit)
group_layout.addWidget(self.data_dir_button)
self.data_dir_group.setLayout(group_layout)
# 按钮
self.save_button = QPushButton("保存")
self.cancel_button = QPushButton("取消")
self.save_button.clicked.connect(self.accept)
self.cancel_button.clicked.connect(self.reject)
# 主布局
main_layout = QVBoxLayout(self)
main_layout.addWidget(self.data_dir_group)
button_layout = QHBoxLayout()
button_layout.addStretch()
button_layout.addWidget(self.save_button)
button_layout.addWidget(self.cancel_button)
main_layout.addLayout(button_layout)
def choose_data_dir(self):
dir_path = QFileDialog.getExistingDirectory(self, "选择数据存储文件夹", self.data_dir_edit.text())
if dir_path:
self.data_dir_edit.setText(dir_path)
def accept(self):
new_data_dir = self.data_dir_edit.text()
if not new_data_dir:
QMessageBox.warning(self, "警告", "数据存储路径不能为空!")
return
if new_data_dir != self.config["data_dir"]:
move_data = QMessageBox.question(
self,
"路径已更改",
f"数据存储路径已更改。是否将现有数据从 '{self.config['data_dir']}' 移动到新路径 '{new_data_dir}'?",
QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel
)
if move_data == QMessageBox.Cancel:
return
elif move_data == QMessageBox.Yes:
try:
os.makedirs(new_data_dir, exist_ok=True)
if os.path.exists(DATA_DIR):
for item in os.listdir(DATA_DIR):
s = os.path.join(DATA_DIR, item)
d = os.path.join(new_data_dir, item)
if os.path.isdir(s):
if os.path.exists(d):
shutil.rmtree(d)
shutil.move(s, d)
else:
if os.path.exists(d):
os.remove(d)
shutil.move(s, d)
except Exception as e:
QMessageBox.critical(self, "移动数据失败", f"无法移动数据文件: {e}\n请手动移动或重新启动程序。")
return
self.config["data_dir"] = new_data_dir
save_config(self.config)
QMessageBox.information(self, "设置已保存", "设置已成功保存。更改将在重启程序后生效。")
super().accept()
class DataManager:
"""数据管理类,负责读写JSON数据"""
@staticmethod
def load_data():
try:
if os.path.exists(JSON_FILE):
try:
with open(JSON_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, dict) or "test" not in data or "prod" not in data:
QMessageBox.warning(None, "数据格式错误", "数据格式不正确,将创建新的空数据。")
try:
os.remove(JSON_FILE)
QMessageBox.information(None, "旧数据已删除", "检测到旧格式数据,已删除并创建新的空数据。")
except Exception as e:
QMessageBox.warning(None, "删除旧数据失败", f"无法删除旧数据文件: {e}")
return {"test": [], "prod": []}
return data
except json.JSONDecodeError:
QMessageBox.warning(None, "数据错误", "JSON文件格式错误,将创建新文件。")
try:
os.remove(JSON_FILE)
QMessageBox.information(None, "旧数据已删除", "检测到旧格式数据,已删除并创建新的空数据。")
except Exception as e:
QMessageBox.warning(None, "删除旧数据失败", f"无法删除旧数据文件: {e}")
return {"test": [], "prod": []}
else:
default_data = {"test": [], "prod": []}
DataManager.save_data(default_data)
QMessageBox.information(None, "数据初始化", f"首次运行,已创建默认数据文件\n数据文件: {JSON_FILE}")
return default_data
except Exception as e:
QMessageBox.critical(None, "加载数据失败", f"加载数据时出错: {e}")
return {"test": [], "prod": []}
@staticmethod
def save_data(data):
try:
if not isinstance(data, dict) or "test" not in data or "prod" not in data:
raise ValueError("数据格式错误,必须包含test和prod环境")
os.makedirs(os.path.dirname(JSON_FILE), exist_ok=True)
with open(JSON_FILE, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=4)
return True
except Exception as e:
QMessageBox.critical(None, "保存失败", f"保存数据时出错: {str(e)}")
return False
class OSSManager:
"""OSS管理类,阿里云OSS操作"""
@staticmethod
def percentage(consumed_bytes, total_bytes):
if total_bytes:
rate = int(100 * (float(consumed_bytes) / float(total_bytes)))
print(f'\r{rate}%', end='')
if rate == 100:
print()
@staticmethod
def _get_oss_url(oss_object_key, local_file_path):
global OSS_BUCKET_URL
global headers
global bucket
global bucket_name
print(f"正在上传文件: {local_file_path}")
print(f"目标路径: {oss_object_key}")
try:
result = bucket.put_object_from_file(oss_object_key, local_file_path, progress_callback= OSSManager.percentage)
print(f"上传成功!")
print(f"HTTP状态码: {result.status}")
print(f"{OSS_BUCKET_URL}/{oss_object_key}")
return f"{OSS_BUCKET_URL}/{oss_object_key}"
except Exception as e:
QMessageBox.warning(None, "OSS上传失败", f"OSS上传失败: {e}")
return None
@staticmethod
def _del_oss_url(file_path):
try:
parsed_url = urlparse(file_path)
object_key = parsed_url.path.lstrip('/')
result = bucket.delete_object(object_key)
print(f"删除结果: {result.__dict__}")
if result.status == 204 and bucket.object_exists(object_key) == False:
print(f"文件 {file_path} 删除成功")
return True
else:
print(f"文件 {file_path} 删除失败,状态码:{result.status}")
return False
except oss2.exceptions.OssError as e:
print(f"删除文件时发生错误:{e}")
return False
@staticmethod
def upload_file(local_file_path, oss_object_path, show_message=True):
try:
if not os.path.exists(local_file_path):
QMessageBox.warning(None, "文件不存在", f"本地文件不存在: {local_file_path}")
return None
file_name = os.path.basename(local_file_path)
print(f"正在上传文件 '{file_name}' 到 OSS 路径 '{oss_object_path}'")
oss_url = OSSManager._get_oss_url(oss_object_path, local_file_path)
if oss_url and show_message:
QMessageBox.information(None, "上传成功", f"文件已成功上传到OSS!\nURL: {oss_url}")
return oss_url
except Exception as e:
QMessageBox.critical(None, "上传失败", f"上传文件时出错: {str(e)}")
return None
@staticmethod
def upload_app_icon(self,local_icon_path, app_id, show_message=True):
if not local_icon_path:
return None
file_ext = os.path.splitext(local_icon_path)[1].lower()
oss_path = f"{OSS_APK_ICON_DIR}/{self.current_environment}/{app_id}{file_ext}"
return OSSManager.upload_file(local_icon_path, oss_path, show_message)
@staticmethod
def upload_apk_icon(self,local_icon_path, app_id, channel_name, version_code, show_message=True):
if not local_icon_path:
return None
file_ext = os.path.splitext(local_icon_path)[1].lower()
oss_path = f"{OSS_APP_ICON_DIR}/{self.current_environment}/{app_id}/{channel_name}/{version_code}{file_ext}"
return OSSManager.upload_file(local_icon_path, oss_path, show_message)
@staticmethod
def upload_apk(self, file_path, app_id, channel_name, version_code, show_message=True):
file_name = truncate_text(channel_name, app_name_keywords) + '.' + os.path.basename(file_path).split('.')[-1]
app_name = appid_dict[app_id]
oss_path = f"{OSS_APK_DIR}/{self.current_environment}/{app_name}/{file_name}"
return OSSManager.upload_file(file_path, oss_path, show_message)
@staticmethod
def upload_metadata_file(local_json_path, environment=None, show_message=True):
if environment:
oss_path = f"{OSS_METADATA_PATH}_{environment}"
else:
oss_path = OSS_METADATA_PATH
return OSSManager.upload_file(local_json_path, oss_path, show_message)
@staticmethod
def delete_apk(oss_url):
print(f"正在从OSS删除文件: {oss_url}")
result = OSSManager._del_oss_url(oss_url)
if result:
QMessageBox.information(None, "删除成功", f"OSS上的文件已成功删除!\nURL: {oss_url}")
return True
else:
QMessageBox.critical(None, "删除失败")
return False
def truncate_text(text, keywords = []):
for keyword in keywords:
if keyword in text:
return text.split(keyword)[0]
return text
def get_latest_version_info(versions):
if not versions:
return {"name": "N/A", "code": "N/A"}
latest_version = max(versions, key=lambda v: int(v["version_code"]))
return {
"name": latest_version["version_name"],
"code": latest_version["version_code"]
}
def get_latest_enabled_version_info(versions):
enabled_versions = [v for v in versions if v.get("is_enabled", False)]
if not enabled_versions:
return {"name": "N/A", "code": "N/A"}
latest_enabled_version = max(enabled_versions, key=lambda v: int(v["version_code"]))
return {
"name": latest_enabled_version["version_name"],
"code": latest_enabled_version["version_code"]
}
def get_apk_info(apk_file_path):
try:
from androguard.core.apk import APK
a = APK(apk_file_path)
app_name = a.get_app_name()
version_name = a.get_androidversion_name()
version_code = a.get_androidversion_code()
channel_name = ""
channel_code = ""
from apkutils import APK
from string_get import extract_channel_values
b = APK.from_file(apk_file_path).parse_resource()
meta_info = b.get_manifest()
print(f'{meta_info}')
b.close()
channel_list = extract_channel_values(meta_info)
if channel_list:
channel_code = channel_list[0]
channel_name = channel_list[0]
return {
"app_name": app_name if app_name else "",
"version_name": version_name if version_name else "",
"version_code": str(version_code) if version_code is not None else "",
"channel_name": channel_name if channel_name else "",
"channel_code": channel_code if channel_code else ""
}
except Exception as e:
QMessageBox.warning(None, "APK解析失败", f"无法解析APK文件信息: {e}\n将使用手动输入。")
return {
"app_name": "",
"version_name": "",
"version_code": ""
}
class IconLineEdit(QWidget):
"""自定义组件,用于显示和上传图标"""
def __init__(self, parent=None, icon_url=""):
super().__init__(parent)
self.icon_url = icon_url
self.local_path = ""
self.layout = QHBoxLayout(self)
self.layout.setContentsMargins(0, 0, 0, 0)
self.label = QLabel(self)
self.label.setFixedSize(64, 64)
self.label.setAlignment(Qt.AlignCenter)
self.label.setStyleSheet("border: 1px solid #ccc;")
self.line_edit = QLineEdit(self)
self.line_edit.setReadOnly(True)
self.button = QPushButton("选择图标", self)
self.button.clicked.connect(self.choose_icon)
self.layout.addWidget(self.label)
self.layout.addWidget(self.line_edit)
self.layout.addWidget(self.button)
if self.icon_url:
self.line_edit.setText(self.icon_url)
self.label.setText("线上图标")
elif self.local_path:
self.update_icon_display()
def choose_icon(self):
file_path, _ = QFileDialog.getOpenFileName(self, "选择图标", "", "Image Files (*.png *.jpg *.jpeg *.bmp)")
if file_path:
self.local_path = file_path
self.icon_url = ""
self.update_icon_display()
def update_icon_display(self):
if self.local_path and os.path.exists(self.local_path):
pixmap = QPixmap(self.local_path)
scaled_pixmap = pixmap.scaled(64, 64, Qt.KeepAspectRatio, Qt.SmoothTransformation)
self.label.setPixmap(scaled_pixmap)
self.line_edit.setText(os.path.basename(self.local_path))
else:
self.label.setText("无图标")
self.line_edit.clear()
def get_local_path(self):
return self.local_path
def set_icon_url(self, url):
self.icon_url = url
self.local_path = ""
self.line_edit.setText(url)
self.label.setText("线上图标")
class AppDetailDialog(QDialog):
"""应用详情对话框"""
def __init__(self, parent=None, app_data=None):
super().__init__(parent)
self.setWindowTitle("应用详情" if app_data else "新增应用")
self.setMinimumWidth(500)
self.app_data = app_data if app_data else {
"id": None, "name": "", "description": "", "app_id": "", "app_code": "", "icon_url": "", "channels": []
}
self.icon_edit = IconLineEdit(self, self.app_data.get("icon_url", ""))
self.name_edit = QLineEdit(self.app_data.get("name", ""))
self.description_edit = QLineEdit(self.app_data.get("description", ""))
self.app_id_edit = QLineEdit(self.app_data.get("app_id", ""))
self.app_code_edit = QLineEdit(self.app_data.get("app_code", ""))
form_layout = QFormLayout()
form_layout.addRow("应用图标:", self.icon_edit)
form_layout.addRow("应用名称:", self.name_edit)
form_layout.addRow("应用描述:", self.description_edit)
form_layout.addRow("App ID:", self.app_id_edit)
form_layout.addRow("App Code:", self.app_code_edit)
button_layout = QHBoxLayout()
self.save_button = QPushButton("保存")
self.cancel_button = QPushButton("取消")
self.save_button.clicked.connect(self.accept)
self.cancel_button.clicked.connect(self.reject)
button_layout.addWidget(self.save_button)
button_layout.addWidget(self.cancel_button)
main_layout = QVBoxLayout(self)
main_layout.addLayout(form_layout)
main_layout.addLayout(button_layout)
def get_data(self):
self.app_data["name"] = self.name_edit.text()
self.app_data["description"] = self.description_edit.text()
self.app_data["app_id"] = self.app_id_edit.text()
self.app_data["app_code"] = self.app_code_edit.text()
self.app_data["icon_url"] = self.icon_edit.icon_url
return self.app_data
class ChannelDetailDialog(QDialog):
"""渠道详情对话框(合并了APK信息)"""
def __init__(self, parent=None, channel_data=None, app_id=None):
super().__init__(parent)
self.setWindowTitle("渠道详情" if channel_data else "新增渠道")
self.setMinimumWidth(500)
self.channel_data = channel_data if channel_data else {
"id": None, "name": "", "version_name": "", "version_code": "",
"download_url": "", "icon_url": "", "is_enabled": True, "notes": ""
}
self.app_id = app_id
self.icon_edit = IconLineEdit(self, self.channel_data.get("icon_url", ""))
self.name_edit = QLineEdit(self.channel_data.get("name", ""))
self.version_name_edit = QLineEdit(self.channel_data.get("version_name", ""))
self.version_code_edit = QLineEdit(self.channel_data.get("version_code", ""))
self.download_url_edit = QLineEdit(self.channel_data.get("download_url", ""))
self.is_enabled_checkbox = QComboBox(self)
self.is_enabled_checkbox.addItems(["否", "是"])
self.is_enabled_checkbox.setCurrentIndex(0 if not self.channel_data.get("is_enabled", False) else 1)
self.notes_edit = QTextEdit(self)
self.notes_edit.setText(self.channel_data.get("notes", ""))
self.upload_button = QPushButton("上传APK文件")
self.upload_button.clicked.connect(lambda: self.upload_apk(self.parent()))
form_layout = QFormLayout()
form_layout.addRow("APK图标:", self.icon_edit)
form_layout.addRow("渠道名称:", self.name_edit)
form_layout.addRow("版本名称:", self.version_name_edit)
form_layout.addRow("版本号:", self.version_code_edit)
form_layout.addRow("下载链接:", self.download_url_edit)
form_layout.addRow("是否启用:", self.is_enabled_checkbox)
form_layout.addRow("备注:", self.notes_edit)
form_layout.addRow(self.upload_button)
button_layout = QHBoxLayout()
self.save_button = QPushButton("保存")
self.cancel_button = QPushButton("取消")
self.save_button.clicked.connect(self.accept)
self.cancel_button.clicked.connect(self.reject)
button_layout.addWidget(self.save_button)
button_layout.addWidget(self.cancel_button)
main_layout = QVBoxLayout(self)
main_layout.addLayout(form_layout)
main_layout.addLayout(button_layout)
def upload_apk(self, parent_window):
if not self.app_id:
QMessageBox.warning(self, "警告", "无法获取应用ID,无法上传APK")
return
file_path, _ = QFileDialog.getOpenFileName(self, "选择APK文件", "", "APK Files (*.apk)")
if file_path:
apk_info = get_apk_info(file_path)
if apk_info["app_name"]:
self.name_edit.setText(apk_info["app_name"])
if apk_info["version_name"]:
self.version_name_edit.setText(apk_info["version_name"])
if apk_info["version_code"]:
self.version_code_edit.setText(apk_info["version_code"])
if apk_info["channel_name"]:
self.name_edit.setText(apk_info["channel_name"])
QMessageBox.information(self, "渠道信息", f"从APK中获取到渠道信息:\n渠道名称: {apk_info['channel_name']}\n渠道代码: {apk_info['channel_code']}")
version_code = self.version_code_edit.text() or apk_info["version_code"]
if not version_code:
QMessageBox.warning(self, "警告", "无法获取版本号,请手动填写后再上传!")
return
oss_url = OSSManager.upload_apk(parent_window,file_path, self.app_id, self.name_edit.text(), version_code)
if oss_url:
self.download_url_edit.setText(oss_url)
def get_data(self):
version_code_text = self.version_code_edit.text()
if not version_code_text.isdigit():
QMessageBox.warning(self, "输入错误", "版本号必须是纯数字!")
return None
self.channel_data["name"] = self.name_edit.text()
self.channel_data["version_name"] = self.version_name_edit.text()
self.channel_data["version_code"] = version_code_text
self.channel_data["download_url"] = self.download_url_edit.text()
self.channel_data["icon_url"] = self.icon_edit.icon_url
self.channel_data["is_enabled"] = (self.is_enabled_checkbox.currentIndex() == 1)
self.channel_data["notes"] = self.notes_edit.toPlainText()
icon_local_path = self.icon_edit.get_local_path()
if icon_local_path and self.app_id:
icon_url = OSSManager.upload_apk_icon(self,icon_local_path, self.app_id, self.name_edit.text(), version_code_text)
if icon_url:
self.channel_data["icon_url"] = icon_url
return self.channel_data
class VersionManager(QMainWindow):
"""主窗口类"""
def __init__(self):
super().__init__()
self.setWindowTitle("夜空工坊 - App版本管理器")
self.setGeometry(100, 100, 1200, 800)
# 加载数据
self.all_data = DataManager.load_data()
# 当前环境和层级
self.current_environment = "test" # 默认环境
self.current_level = "app"
self.current_app_index = None
self.current_channel_index = None
# 创建UI
self.init_ui()
# 显示应用列表
self.show_app_list()
def init_ui(self):
"""初始化UI"""
central_widget = QWidget()
self.setCentralWidget(central_widget)
self.main_layout = QVBoxLayout(central_widget)
# 导航栏
self.nav_layout = QHBoxLayout()
self.back_button = QPushButton("返回上一级")
self.back_button.clicked.connect(self.back_to_previous)
self.back_button.setEnabled(False)
self.nav_label = QLabel("应用列表")
# 环境选择下拉框
self.environment_label = QLabel("当前环境:")
self.environment_combo = QComboBox()
self.environment_combo.addItems(["test (测试环境)", "prod (正式环境)"])
self.environment_combo.currentIndexChanged.connect(self.on_environment_changed)
# 设置按钮
self.settings_button = QPushButton("设置")
self.settings_button.clicked.connect(self.open_settings)
self.nav_layout.addWidget(self.back_button)
self.nav_layout.addWidget(self.nav_label)
self.nav_layout.addStretch()
self.nav_layout.addWidget(self.environment_label)
self.nav_layout.addWidget(self.environment_combo)
self.nav_layout.addWidget(self.settings_button)
# 操作按钮
self.action_layout = QHBoxLayout()
self.add_button = QPushButton("新增")
self.batch_add_button = QPushButton("批量添加")
self.edit_button = QPushButton("修改")
self.delete_button = QPushButton("删除")
self.upload_button = QPushButton("更新到OSS")
self.download_button = QPushButton("下载")
# 导出Excel按钮
self.export_excel_button = QPushButton("导出Excel")
self.export_excel_button.clicked.connect(self.export_to_excel)
self.add_button.clicked.connect(self.add_item)
self.batch_add_button.clicked.connect(self.batch_add_items)
self.edit_button.clicked.connect(self.edit_item)
self.delete_button.clicked.connect(self.delete_item)
self.upload_button.clicked.connect(self.upload_to_oss)
self.download_button.clicked.connect(self.download_file)
self.action_layout.addWidget(self.add_button)
self.action_layout.addWidget(self.batch_add_button)
self.action_layout.addWidget(self.edit_button)
self.action_layout.addWidget(self.delete_button)
self.action_layout.addWidget(self.upload_button)
self.action_layout.addWidget(self.download_button)
self.action_layout.addWidget(self.export_excel_button)
self.table = QTableWidget()
self.table.cellDoubleClicked.connect(self.on_cell_double_clicked)
self.table.setEditTriggers(QTableWidget.NoEditTriggers)
self.table.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
self.main_layout.addLayout(self.nav_layout)
self.main_layout.addLayout(self.action_layout)
self.main_layout.addWidget(self.table)
self.update_action_buttons()
def on_environment_changed(self, index):
"""环境切换事件处理"""
new_environment = "test" if index == 0 else "prod"
# 如果数据有未保存的更改,询问用户是否保存
if self.is_data_modified():
save_changes = QMessageBox.question(
self,
"切换环境",
f"当前环境({self.current_environment.upper()})的数据可能已更改,是否保存?",
QMessageBox.Yes | QMessageBox.No | QMessageBox.Cancel,
QMessageBox.Yes
)
if save_changes == QMessageBox.Cancel:
# 取消切换,恢复原来的选择
self.environment_combo.setCurrentIndex(0 if self.current_environment == "test" else 1)
return
elif save_changes == QMessageBox.Yes:
DataManager.save_data(self.all_data)
# 更新当前环境并刷新界面
self.current_environment = new_environment
self.show_app_list()
# 切换环境后清除选择状态
self.table.clearSelection()
QMessageBox.information(self, "环境切换", f"已切换到{new_environment.upper()}环境")
def is_data_modified(self):
"""检查数据是否已修改"""
return False
def open_settings(self):
"""打开设置对话框"""
dialog = SettingsDialog(self)
dialog.exec_()
def update_action_buttons(self):
has_selection = self.table.selectionModel().hasSelection()
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
if self.current_level == "app":
self.back_button.setEnabled(False)
self.add_button.setEnabled(True)
self.batch_add_button.setEnabled(False)
self.edit_button.setEnabled(has_selection)
self.delete_button.setEnabled(has_selection)
self.upload_button.setEnabled(True)
self.download_button.setEnabled(False)
self.export_excel_button.setEnabled(False)
self.nav_label.setText(f"应用列表 [{self.current_environment.upper()}]")
elif self.current_level == "channel":
self.back_button.setEnabled(True)
self.add_button.setEnabled(True)
self.batch_add_button.setEnabled(True)
# 检查是否有选中的复选框
has_checked = False
for row in range(self.table.rowCount()):
item = self.table.item(row, 0)
if item and item.checkState() == Qt.Checked:
has_checked = True
break
# 检查是否有选中的行
has_selection = self.table.selectionModel().hasSelection()
# 编辑按钮只在单选时可用
self.edit_button.setEnabled(has_selection and not has_checked)
# 删除和下载按钮在有选中的行或复选框时可用
self.delete_button.setEnabled(has_checked or has_selection)
self.upload_button.setEnabled(False)
self.download_button.setEnabled(has_checked or has_selection)
self.export_excel_button.setEnabled(True)
app_name = current_data[self.current_app_index]["name"]
self.nav_label.setText(f"应用列表 [{self.current_environment.upper()}] > {app_name} > 渠道管理")
try:
self.table.selectionModel().selectionChanged.disconnect(self.update_action_buttons)
except TypeError:
pass
self.table.selectionModel().selectionChanged.connect(self.update_action_buttons)
def show_app_list(self):
self.current_level = "app"
self.current_app_index = None
self.current_channel_index = None
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
self.table.clear()
self.table.setRowCount(len(current_data))
self.table.setColumnCount(6)
self.table.setHorizontalHeaderLabels(["图标", "应用名称", "描述", "AppId", "AppCode", "渠道数量"])
for row, app in enumerate(current_data):
# 图标
icon_item = QTableWidgetItem()
icon_url = app.get("icon_url", "")
if icon_url:
pixmap = QPixmap()
try:
if pixmap.loadFromData(requests.get(icon_url).content):
icon_item.setIcon(QIcon(pixmap))
else:
icon_item.setIcon(QIcon.fromTheme("image-x-generic", QIcon()))
except Exception as e:
print(f"加载图标失败: {e}")
icon_item.setIcon(QIcon.fromTheme("image-x-generic", QIcon()))
self.table.setItem(row, 0, icon_item)
# 应用名称
name_item = QTableWidgetItem(app.get("name", ""))
self.table.setItem(row, 1, name_item)
# 应用描述
desc_item = QTableWidgetItem(app.get("description", ""))
self.table.setItem(row, 2, desc_item)
# AppId
self.table.setItem(row, 3, QTableWidgetItem(app.get("app_id", "")))
# AppCode
self.table.setItem(row, 4, QTableWidgetItem(app.get("app_code", "")))
# 显示渠道数量
channel_count = len(app.get("channels", []))
self.table.setItem(row, 5, QTableWidgetItem(str(channel_count)))
# 设置列宽
self.table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(2, QHeaderView.Stretch)
self.table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(4, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(5, QHeaderView.ResizeToContents)
# 设置最大宽度限制
self.table.horizontalHeader().resizeSection(1, 150)
self.table.horizontalHeader().setMinimumSectionSize(100)
# 清理并重新绑定双击事件
try:
self.table.cellDoubleClicked.disconnect(self.on_cell_double_clicked)
except TypeError:
pass
self.table.cellDoubleClicked.connect(self.on_cell_double_clicked)
self.update_action_buttons()
def show_channel_list(self, app_index):
self.current_level = "channel"
self.current_app_index = app_index
self.current_channel_index = None
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[app_index]
channels = app.get("channels", [])
self.table.clear()
self.table.setRowCount(len(channels))
self.table.setColumnCount(10)
self.table.setHorizontalHeaderLabels(["选择", "图标", "渠道名称", "版本名称", "版本号", "下载链接", "复制链接", "是否启用", "操作", "备注"])
for row, channel in enumerate(channels):
# 复选框
checkbox_item = QTableWidgetItem()
checkbox_item.setFlags(Qt.ItemIsUserCheckable | Qt.ItemIsEnabled)
checkbox_item.setCheckState(Qt.Unchecked)
self.table.setItem(row, 0, checkbox_item)
# 图标
icon_item = QTableWidgetItem()
icon_url = channel.get("icon_url", "")
if icon_url:
pixmap = QPixmap()
try:
if pixmap.loadFromData(requests.get(icon_url).content):
icon_item.setIcon(QIcon(pixmap))
else:
icon_item.setIcon(QIcon.fromTheme("image-x-generic", QIcon()))
except Exception as e:
print(f"加载图标失败: {e}")
icon_item.setIcon(QIcon.fromTheme("image-x-generic", QIcon()))
self.table.setItem(row, 1, icon_item)
# 渠道名称
name_item = QTableWidgetItem(channel.get("name", ""))
self.table.setItem(row, 2, name_item)
# 版本信息
self.table.setItem(row, 3, QTableWidgetItem(channel.get("version_name", "")))
self.table.setItem(row, 4, QTableWidgetItem(channel.get("version_code", "")))
# 下载链接
url_item = QTableWidgetItem("点击访问")
url_item.setData(Qt.UserRole, channel.get("download_url", ""))
url_item.setForeground(Qt.blue)
url_item.setTextAlignment(Qt.AlignCenter)
self.table.setItem(row, 5, url_item)
# 复制链接按钮
copy_btn = QPushButton("复制")
copy_btn.setStyleSheet("background-color: #165DFF; color: white; border: none; padding: 4px 8px; border-radius: 4px;")
copy_btn.clicked.connect(lambda checked, r=row: self.copy_url(r))
self.table.setCellWidget(row, 6, copy_btn)
# 是否启用
is_enabled_item = QTableWidgetItem("是" if channel.get("is_enabled", False) else "否")
is_enabled_item.setTextAlignment(Qt.AlignCenter)
if channel.get("is_enabled", False):
is_enabled_item.setForeground(Qt.green)
else:
is_enabled_item.setForeground(Qt.red)
self.table.setItem(row, 7, is_enabled_item)
# 启用/禁用切换按钮
toggle_btn = QPushButton("禁用" if channel.get("is_enabled", False) else "启用")
if channel.get("is_enabled", False):
toggle_btn.setStyleSheet("background-color: #F53F3F; color: white; border: none; padding: 4px 8px; border-radius: 4px;")
else:
toggle_btn.setStyleSheet("background-color: #00B42A; color: white; border: none; padding: 4px 8px; border-radius: 4px;")
toggle_btn.clicked.connect(lambda checked, r=row: self.toggle_channel_status(r))
self.table.setCellWidget(row, 8, toggle_btn)
# 备注
self.table.setItem(row, 9, QTableWidgetItem(channel.get("notes", "")))
# 设置列宽
self.table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(2, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(3, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(4, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(5, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(6, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(7, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(8, QHeaderView.ResizeToContents)
self.table.horizontalHeader().setSectionResizeMode(9, QHeaderView.Stretch)
# 添加表头复选框全选/反选功能
self.table.horizontalHeader().sectionClicked.connect(self.on_header_clicked)
# 设置最大宽度限制
self.table.horizontalHeader().resizeSection(1, 150)
self.table.horizontalHeader().setMinimumSectionSize(100)
# 先移除可能存在的旧绑定,再添加新绑定
try:
self.table.cellClicked.disconnect(self.on_cell_clicked)
except TypeError:
pass # 如果没有绑定过,忽略错误
self.table.cellClicked.connect(self.on_cell_clicked)
self.update_action_buttons()
def on_cell_clicked(self, row, column):
if self.current_level == "channel":
if column == 0:
# 点击复选框后更新按钮状态
self.update_action_buttons()
elif column == 5:
# 点击下载链接
item = self.table.item(row, column)
url = item.data(Qt.UserRole)
if url:
QDesktopServices.openUrl(QUrl(url))
def on_header_clicked(self, logical_index):
"""表头头点击事件处理,实现全选/反选功能"""
if self.current_level != "channel" or logical_index != 0:
return
# 获取当前选中状态
all_checked = True
for row in range(self.table.rowCount()):
item = self.table.item(row, 0)
if item and item.checkState() != Qt.Checked:
all_checked = False
break
# 设置新的选中状态
new_state = Qt.Unchecked if all_checked else Qt.Checked
for row in range(self.table.rowCount()):
item = self.table.item(row, 0)
if item:
item.setCheckState(new_state)
# 更新按钮状态
self.update_action_buttons()
def copy_url(self, row):
"""复制APK链接到剪贴板(修复:获取正确的列索引)"""
if self.current_level != "channel":
return
# 修复:获取第5列(索引5)的数据,而不是第4列
item = self.table.item(row, 5)
url = item.data(Qt.UserRole) if item else ""
if url:
clipboard = QApplication.clipboard()
clipboard.setText(url)
QMessageBox.information(self, "复制成功", "APK链接已复制到剪贴板")
else:
QMessageBox.warning(self, "复制失败", "没有可用的APK链接")
def toggle_channel_status(self, row):
"""切换渠道的启用/禁用状态"""
if self.current_level != "channel":
return
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[self.current_app_index]
channel = app["channels"][row]
# 切换状态
channel["is_enabled"] = not channel["is_enabled"]
# 保存数据
DataManager.save_data(self.all_data)
# 更新表格显示
self.show_channel_list(self.current_app_index)
# 显示提示
status = "启用" if channel["is_enabled"] else "禁用"
QMessageBox.information(self, "状态更新", f"渠道 '{channel.get('name', '')}' 已{status}")
def on_cell_double_clicked(self, row, column):
if self.current_level == "app":
self.show_channel_list(row)
elif self.current_level == "channel":
# 双击渠道行直接进入编辑模式
self.edit_item()
def back_to_previous(self):
if self.current_level == "channel":
self.show_app_list()
def add_item(self):
if self.current_level == "app":
dialog = AppDetailDialog(self)
if dialog.exec_():
new_app = dialog.get_data()
icon_local_path = dialog.icon_edit.get_local_path()
if icon_local_path:
icon_url = OSSManager.upload_app_icon(self,icon_local_path, new_app["app_id"])
if icon_url:
new_app["icon_url"] = icon_url
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
new_app["id"] = len(current_data) + 1
new_app["channels"] = []
current_data.append(new_app)
DataManager.save_data(self.all_data)
self.show_app_list()
elif self.current_level == "channel":
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[self.current_app_index]
# 使用合并的渠道详情对话框
dialog = ChannelDetailDialog(self, app_id=app["app_id"])
if dialog.exec_():
new_channel = dialog.get_data()
if new_channel:
new_channel["id"] = len(app["channels"]) + 1
app["channels"].append(new_channel)
DataManager.save_data(self.all_data)
self.show_channel_list(self.current_app_index)
def batch_add_items(self):
"""批量添加APK文件"""
if self.current_level != "channel":
return
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[self.current_app_index]
# 选择文件夹
folder_path = QFileDialog.getExistingDirectory(self, "选择APK文件夹", "")
if not folder_path:
return
# 获取文件夹下所有APK文件
apk_files = []
for root, dirs, files in os.walk(folder_path):
for file in files:
if file.lower().endswith(".apk"):
apk_files.append(os.path.join(root, file))
if not apk_files:
QMessageBox.warning(self, "无APK文件", "选择的文件夹中没有找到APK文件")
return
# 创建进度对话框
progress_dialog = QProgressDialog("正在处理APK文件...", "取消", 0, len(apk_files), self)
progress_dialog.setWindowTitle("批量添加APK")
progress_dialog.setWindowModality(Qt.WindowModal)
progress_dialog.setMinimumDuration(0)
# 处理每个APK文件
success_count = 0
replace_count = 0
error_count = 0
for i, apk_path in enumerate(apk_files):
# 更新进度
progress_dialog.setValue(i)
QApplication.processEvents() # 刷新UI
if progress_dialog.wasCanceled():
break
try:
# 获取APK信息
apk_info = get_apk_info(apk_path)
if not apk_info["version_code"]:
error_count += 1
continue
# 确保渠道名称存在
if not apk_info["channel_name"]:
# 如果APK中没有渠道信息,使用文件名作为渠道名称
apk_info["channel_name"] = os.path.splitext(os.path.basename(apk_path))[0]
# 检查是否已存在相同渠道的APK
existing_channel_index = None
for j, channel in enumerate(app["channels"]):
if channel["name"] == apk_info["channel_name"]:
existing_channel_index = j
break
# 上传APK到OSS,批量上传时不显示单个文件的成功消息
oss_url = OSSManager.upload_apk(
self,
apk_path,
app["app_id"],
apk_info["channel_name"],
apk_info["version_code"],
show_message=False
)
if not oss_url:
error_count += 1
continue
# 创建或更新渠道信息
channel_data = {
"id": len(app["channels"]) + 1 if existing_channel_index is None else app["channels"][existing_channel_index]["id"],
"name": apk_info["channel_name"],
"version_name": apk_info["version_name"],
"version_code": apk_info["version_code"],
"download_url": oss_url,
"icon_url": "",
"is_enabled": True,
"notes": f"批量添加于 {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
}
# 如果是更新现有渠道,删除旧的APK文件
if existing_channel_index is not None:
old_download_url = app["channels"][existing_channel_index].get("download_url", "")
if old_download_url and old_download_url != oss_url:
OSSManager.delete_apk(old_download_url)
app["channels"][existing_channel_index] = channel_data
replace_count += 1
else:
app["channels"].append(channel_data)
success_count += 1
except Exception as e:
print(f"处理APK文件 {apk_path} 时出错: {e}")
error_count += 1
# 完成进度
progress_dialog.setValue(len(apk_files))
# 保存数据
DataManager.save_data(self.all_data)
# 更新表格显示
self.show_channel_list(self.current_app_index)
# 显示结果
result_message = f"批量添加完成!\n"
result_message += f"成功添加: {success_count} 个\n"
result_message += f"替换更新: {replace_count} 个\n"
result_message += f"处理失败: {error_count} 个"
QMessageBox.information(self, "批量添加结果", result_message)
def edit_item(self):
selected_rows = self.table.selectionModel().selectedRows()
if not selected_rows:
QMessageBox.warning(self, "警告", "请先选择要修改的项目")
return
row = selected_rows[0].row()
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
if self.current_level == "app":
dialog = AppDetailDialog(self, current_data[row])
if dialog.exec_():
updated_app = dialog.get_data()
icon_local_path = dialog.icon_edit.get_local_path()
if icon_local_path:
icon_url = OSSManager.upload_app_icon(self,icon_local_path, updated_app["app_id"])
if icon_url:
updated_app["icon_url"] = icon_url
current_data[row] = updated_app
DataManager.save_data(self.all_data)
self.show_app_list()
elif self.current_level == "channel":
app = current_data[self.current_app_index]
channel = app["channels"][row]
# 使用合并的渠道详情对话框
dialog = ChannelDetailDialog(self, channel_data=channel, app_id=app["app_id"])
if dialog.exec_():
updated_channel = dialog.get_data()
if updated_channel:
# 保存旧的下载链接用于删除
old_download_url = channel.get("download_url", "")
# 更新渠道信息
channel["name"] = updated_channel["name"]
channel["version_name"] = updated_channel["version_name"]
channel["version_code"] = updated_channel["version_code"]
channel["download_url"] = updated_channel["download_url"]
channel["icon_url"] = updated_channel["icon_url"]
channel["is_enabled"] = updated_channel["is_enabled"]
channel["notes"] = updated_channel["notes"]
# 如果上传了新的APK文件,删除旧的APK文件
if old_download_url and old_download_url != updated_channel["download_url"]:
OSSManager.delete_apk(old_download_url)
DataManager.save_data(self.all_data)
self.show_channel_list(self.current_app_index)
def delete_item(self):
# 检查是否有选中的项目
if self.current_level == "channel":
# 对于渠道列表,使用复选框选择
selected_rows = []
for row in range(self.table.rowCount()):
item = self.table.item(row, 0)
if item and item.checkState() == Qt.Checked:
selected_rows.append(row)
if not selected_rows:
QMessageBox.warning(self, "警告", "请先选择要删除的渠道")
return
# 按逆序删除,避免索引混乱
selected_rows.sort(reverse=True)
confirm = QMessageBox.question(self, "确认", f"确定要删除选中的 {len(selected_rows)} 个渠道吗?\n(当前环境: {self.current_environment.upper()})",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if confirm == QMessageBox.Yes:
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[self.current_app_index]
# 删除选中的渠道
for row in selected_rows:
channel_to_delete = app["channels"][row]
# 删除OSS上的APK文件
if channel_to_delete.get("download_url"):
OSSManager.delete_apk(channel_to_delete["download_url"])
del app["channels"][row]
# 更新ID
for i, channel in enumerate(app["channels"]):
channel["id"] = i + 1
DataManager.save_data(self.all_data)
self.show_channel_list(self.current_app_index)
QMessageBox.information(self, "删除成功", f"已成功删除 {len(selected_rows)} 个渠道")
else:
# 对于应用列表,保持原有的单选删除功能
selected_rows = self.table.selectionModel().selectedRows()
if not selected_rows:
QMessageBox.warning(self, "警告", "请先选择要删除的项目")
return
row = selected_rows[0].row()
confirm = QMessageBox.question(self, "确认", f"确定要删除选中的项目吗?\n(当前环境: {self.current_environment.upper()})",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if confirm == QMessageBox.Yes:
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
del current_data[row]
for i, app in enumerate(current_data):
app["id"] = i + 1
self.show_app_list()
DataManager.save_data(self.all_data)
def upload_to_oss(self):
if self.current_level != "app":
return
# 保存当前数据
DataManager.save_data(self.all_data)
# 确认上传
confirm = QMessageBox.question(
self,
"上传到OSS",
"您确定要将所有环境(test和prod)的数据合并上传到OSS吗?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if confirm == QMessageBox.Yes:
# 上传所有环境到一个JSON文件
import tempfile
import json
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False, encoding='utf-8') as f:
json.dump(self.all_data, f, ensure_ascii=False, indent=4)
temp_file = f.name
# 上传到OSS,不添加环境后缀
OSSManager.upload_metadata_file(temp_file)
os.unlink(temp_file)
QMessageBox.information(self, "上传成功", "所有环境的数据已合并上传到OSS!")
def download_file(self):
if self.current_level != "channel":
return
# 首先检查是否有选中的复选框
selected_checkbox_rows = []
for row in range(self.table.rowCount()):
item = self.table.item(row, 0)
if item and item.checkState() == Qt.Checked:
selected_checkbox_rows.append(row)
if selected_checkbox_rows:
# 有选中的复选框,执行批量下载
self.batch_download(selected_checkbox_rows)
else:
# 没有选中的复选框,检查是否有选中的行
selected_rows = self.table.selectionModel().selectedRows()
if not selected_rows:
QMessageBox.warning(self, "警告", "请先选择要下载的渠道")
return
# 单选选中的行,执行单选下载
row = selected_rows[0].row()
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[self.current_app_index]
channel = app["channels"][row]
if channel.get("download_url"):
QMessageBox.information(self, "下载提示", f"正在尝试用系统默认方式打开下载链接...\nURL: {channel['download_url']}")
QDesktopServices.openUrl(QUrl(channel['download_url']))
else:
QMessageBox.warning(self, "警告", "该APK没有下载链接")
def batch_download(self, selected_rows=None):
"""批量下载选中的渠道APK文件"""
if self.current_level != "channel":
return
# 如果没有传入选中的行,则从复选框获取
if selected_rows is None:
selected_rows = []
for row in range(self.table.rowCount()):
item = self.table.item(row, 0)
if item and item.checkState() == Qt.Checked:
selected_rows.append(row)
if not selected_rows:
QMessageBox.warning(self, "警告", "请先选择要下载的渠道")
return
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[self.current_app_index]
# 选择下载目录
download_dir = QFileDialog.getExistingDirectory(self, "选择下载目录", "")
if not download_dir:
return
# 创建进度对话框
progress_dialog = QProgressDialog("正在批量下载APK文件...", "取消", 0, len(selected_rows), self)
progress_dialog.setWindowTitle("批量下载")
progress_dialog.setWindowModality(Qt.WindowModal)
progress_dialog.setMinimumDuration(0)
# 下载每个选中的渠道
success_count = 0
error_count = 0
for i, row in enumerate(selected_rows):
# 更新进度
progress_dialog.setValue(i)
QApplication.processEvents() # 刷新UI
if progress_dialog.wasCanceled():
break
try:
channel = app["channels"][row]
download_url = channel.get("download_url", "")
if not download_url:
error_count += 1
continue
# 从URL中获取文件名
file_name = os.path.basename(urlparse(download_url).path)
if not file_name:
file_name = f"{channel.get('name', 'unknown')}_{channel.get('version_code', 'unknown')}.apk"
# 下载文件
response = requests.get(download_url, stream=True)
response.raise_for_status()
# 保存文件
file_path = os.path.join(download_dir, file_name)
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
success_count += 1
except Exception as e:
print(f"下载渠道 '{app['channels'][row].get('name', '')}' 时出错: {e}")
error_count += 1
# 完成进度
progress_dialog.setValue(len(selected_rows))
# 显示结果
result_message = f"批量下载完成!\n"
result_message += f"成功下载: {success_count} 个\n"
result_message += f"下载失败: {error_count} 个"
QMessageBox.information(self, "批量下载结果", result_message)
def export_to_excel(self):
"""导出渠道信息到Excel表格"""
if self.current_level != "channel":
return
# 获取当前环境的数据
current_data = self.all_data[self.current_environment]
app = current_data[self.current_app_index]
all_channels = app.get("channels", [])
if not all_channels:
QMessageBox.warning(self, "导出失败", "当前应用没有渠道信息可导出")
return
# 检查是否有选中的渠道
selected_rows = []
for row in range(self.table.rowCount()):
item = self.table.item(row, 0)
if item and item.checkState() == Qt.Checked:
selected_rows.append(row)
# 如果有选中的渠道,则只导出选中的,否则导出全部
if selected_rows:
channels = [all_channels[row] for row in selected_rows]
export_type = "选中的"
default_filename = f"{app.get('name', '应用')}_渠道信息_选中.xlsx"
else:
channels = all_channels
export_type = "全部"
default_filename = f"{app.get('name', '应用')}_渠道信息.xlsx"
# 让用户选择保存路径
file_path, _ = QFileDialog.getSaveFileName(
self,
"导出Excel文件",
default_filename,
"Excel Files (*.xlsx)"
)
if not file_path:
return
try:
# 导入openpyxl库
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
# 创建工作簿
wb = Workbook()
ws = wb.active
ws.title = "渠道信息"
# 定义表头
headers = [
"应用名称",
"渠道code",
"渠道名称",
"版本名称",
"下载链接",
"下载页链接",
"分发页链接"
]
# 写入表头
for col, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col, value=header)
# 设置表头样式
cell.font = Font(bold=True, color="FFFFFF")
cell.fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
cell.alignment = Alignment(horizontal="center", vertical="center")
# 根据环境使用不同的Web地址
if self.current_environment == "test":
web_download_url = "https://h5 下载地址可不配置"
web_distribution_url = "https://h5 下载地址可不配置"
else: # prod
web_download_url = "https://h5 下载地址可不配置"
web_distribution_url = "https://h5 下载地址可不配置"
# 写入渠道数据
for row, channel in enumerate(channels, 2):
# 应用名称
ws.cell(row=row, column=1, value=app.get("name", ""))
# 渠道code
channel_code = channel.get("name", "")
ws.cell(row=row, column=2, value=channel_code)
# 渠道名称(从字典中获取)
channel_name = CHANNEL_DICT.get(channel_code, channel_code)
ws.cell(row=row, column=3, value=channel_name)
# 版本名称
ws.cell(row=row, column=4, value=channel.get("version_name", ""))
# 下载链接
download_url = channel.get("download_url", "")
ws.cell(row=row, column=5, value=download_url)
# 下载页链接
app_code = app.get("app_code", "")
download_page_url = f"{web_download_url}/?appCode={app_code}&channelCode={channel_code}"
ws.cell(row=row, column=6, value=download_page_url)
# 分发页链接
distribution_page_url = f"{web_distribution_url}/?appCode={app_code}&channelCode={channel_code}"
ws.cell(row=row, column=7, value=distribution_page_url)
# 设置列宽
column_widths = [15, 15, 20, 15, 40, 40, 40]
for col, width in enumerate(column_widths, 1):
ws.column_dimensions[chr(64 + col)].width = width
# 设置边框
thin_border = Border(
left=Side(style='thin'),
right=Side(style='thin'),
top=Side(style='thin'),
bottom=Side(style='thin')
)
for row in ws.iter_rows(min_row=1, max_row=len(channels) + 1, min_col=1, max_col=len(headers)):
for cell in row:
cell.border = thin_border
# 保存文件
wb.save(file_path)
QMessageBox.information(self, "导出成功", f"渠道信息已成功导出到:\n{file_path}")
except Exception as e:
QMessageBox.critical(self, "导出失败", f"导出Excel文件时出错: {str(e)}")
if __name__ == '__main__':
app = QApplication(sys.argv)
app.setApplicationName("夜空工坊-app版本管理器")
window = VersionManager()
window.show()
sys.exit(app.exec_())
string_get.py
import re
import os
from typing import List, Tuple, Optional, Generator, Any
class StringExtractor:
"""
字符串提取器类,用于从字符串中提取特定模式的字段
主要功能:
- 从单个字符串中提取目标字段
- 从字符串列表中批量提取
- 从文件中读取并提取
- 从目录中的多个文件批量提取
支持智能模式,可以识别多种格式变体
"""
def __init__(self):
"""
初始化字符串提取器
"""
# 预编译正则表达式模式
self.pattern = re.compile(
r'<meta-data\s+android:name="DCLOUD_STREAMAPP_CHANNEL"\s+android:value="[^|"]+\|[^|"]+\|[^|"]+\|([^|"]+)"/>',
re.IGNORECASE
)
# 智能模式的模式列表
self.smart_patterns = [
# 主要模式
re.compile(r'android:value="[^|"]+\|[^|"]+\|[^|"]+\|([^|"]+)"/>'),
# 变体模式1:引号类型变化
re.compile(r"android:value='[^|']+\|[^|']+\|[^|']+\|([^|']+)'/>"),
# 变体模式2:属性顺序变化
re.compile(r'<meta-data\s+android:value="[^|"]+\|[^|"]+\|[^|"]+\|([^|"]+)"\s+android:name="DCLOUD_STREAMAPP_CHANNEL"/>'),
# 变体模式3:更多分隔符
re.compile(r'android:value="[^|"]+\|[^|"]+\|[^|"]+\|[^|"]+\|([^|"]+)"/>'),
# 变体模式4:更少分隔符
re.compile(r'android:value="[^|"]+\|[^|"]+\|([^|"]+)"/>'),
]
def extract_single_string(self, input_str: str, smart_mode: bool = False) -> Optional[str]:
"""
从单个字符串中提取目标字段
Args:
input_str: 输入字符串
smart_mode: 是否启用智能模式
Returns:
提取的字段,如果没有找到则返回None
"""
try:
# 首先尝试匹配主要模式
match = self.pattern.search(input_str)
if match:
return match.group(1)
# 如果启用智能模式,尝试其他模式
if smart_mode:
for pattern in self.smart_patterns:
match = pattern.search(input_str)
if match:
return match.group(1)
return None
except Exception as e:
print(f"处理字符串时出错: {e}")
return None
def extract_from_list(self, input_list: List[str], smart_mode: bool = False) -> Generator[Tuple[str, Optional[str]], None, None]:
"""
从字符串列表中提取目标字段
Args:
input_list: 输入字符串列表
smart_mode: 是否启用智能模式
Returns:
生成器,产生(原始字符串, 提取结果)的元组
"""
for item in input_list:
result = self.extract_single_string(item.strip(), smart_mode)
yield (item.strip(), result)
def extract_from_file(self, file_path: str, smart_mode: bool = False, encoding: str = 'utf-8') -> Generator[Tuple[str, Optional[str], int], None, None]:
"""
从文件中提取目标字段
Args:
file_path: 文件路径
smart_mode: 是否启用智能模式
encoding: 文件编码
Returns:
生成器,产生(原始字符串, 提取结果, 行号)的元组
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"文件 {file_path} 不存在")
with open(file_path, 'r', encoding=encoding, errors='ignore') as file:
for line_num, line in enumerate(file, 1):
stripped_line = line.strip()
if stripped_line: # 跳过空行
result = self.extract_single_string(stripped_line, smart_mode)
yield (stripped_line, result, line_num)
def batch_extract_from_files(self, file_paths: List[str], smart_mode: bool = False, encoding: str = 'utf-8') -> Generator[Tuple[str, Optional[str], int, str], None, None]:
"""
从多个文件中批量提取目标字段
Args:
file_paths: 文件路径列表
smart_mode: 是否启用智能模式
encoding: 文件编码
Returns:
生成器,产生(原始字符串, 提取结果, 行号, 文件名)的元组
"""
for file_path in file_paths:
try:
for line, result, line_num in self.extract_from_file(file_path, smart_mode, encoding):
yield (line, result, line_num, file_path)
except Exception as e:
print(f"处理文件 {file_path} 时出错: {e}")
continue
def get_extracted_values(self, results: Generator) -> List[str]:
"""
从结果生成器中提取所有成功提取的值
Args:
results: 结果生成器
Returns:
成功提取的值列表
"""
extracted_values = []
for result in results:
if len(result) >= 2 and result[1] is not None:
extracted_values.append(result[1])
return extracted_values
def process_and_collect(self, input_data: Any, smart_mode: bool = False, encoding: str = 'utf-8') -> Tuple[List[str], List[Tuple[str, Optional[str], Any]]]:
"""
统一处理方法,可以处理字符串、列表、文件路径或文件路径列表
Args:
input_data: 输入数据,可以是字符串、列表、文件路径或文件路径列表
smart_mode: 是否启用智能模式
encoding: 文件编码
Returns:
元组,包含(提取的值列表, 详细结果列表)
"""
detailed_results = []
extracted_values = []
try:
if isinstance(input_data, str):
# 检查是否是文件路径
if os.path.isfile(input_data):
# 处理单个文件
results = self.extract_from_file(input_data, smart_mode, encoding)
for line, result, line_num in results:
detailed_results.append((line, result, line_num))
if result is not None:
extracted_values.append(result)
else:
# 处理单个字符串
result = self.extract_single_string(input_data, smart_mode)
detailed_results.append((input_data, result, None))
if result is not None:
extracted_values.append(result)
elif isinstance(input_data, list):
# 检查列表中的元素是否是文件路径
if input_data and isinstance(input_data[0], str) and os.path.isfile(input_data[0]):
# 处理文件列表
results = self.batch_extract_from_files(input_data, smart_mode, encoding)
for line, result, line_num, file_path in results:
detailed_results.append((line, result, (file_path, line_num)))
if result is not None:
extracted_values.append(result)
else:
# 处理字符串列表
results = self.extract_from_list(input_data, smart_mode)
for item, result in results:
detailed_results.append((item, result, None))
if result is not None:
extracted_values.append(result)
else:
raise TypeError(f"不支持的输入类型: {type(input_data)}")
except Exception as e:
print(f"处理数据时出错: {e}")
return extracted_values, detailed_results
# 方便集成的快捷函数
def extract_channel(input_data: Any, smart_mode: bool = False, encoding: str = 'utf-8') -> Tuple[List[str], List[Tuple[str, Optional[str], Any]]]:
"""
快捷函数,用于快速创建提取器并处理数据
Args:
input_data: 输入数据,可以是字符串、列表、文件路径或文件路径列表
smart_mode: 是否启用智能模式
encoding: 文件编码
Returns:
元组,包含(提取的值列表, 详细结果列表)
"""
extractor = StringExtractor()
return extractor.process_and_collect(input_data, smart_mode, encoding)
def extract_channel_values(input_data: Any, smart_mode: bool = False, encoding: str = 'utf-8') -> List[str]:
"""
更简洁的快捷函数,只返回提取的值列表
Args:
input_data: 输入数据,可以是字符串、列表、文件路径或文件路径列表
smart_mode: 是否启用智能模式
encoding: 文件编码
Returns:
提取的值列表
"""
values, _ = extract_channel(input_data, smart_mode, encoding)
return values
# 示例用法
if __name__ == "__main__":
# 示例1:处理字符串列表
test_list = [
'<meta-data android:name="DCLOUD_STREAMAPP_CHANNEL" android:value="com.example|__UNI__123456|789012345678|huawei"/>',
'<meta-data android:name="DCLOUD_STREAMAPP_CHANNEL" android:value="com.test|__UNI__ABCDEF|112233445566|xiaomi"/>'
]
results = extract_channel_values(test_list)
print(f"示例2 - 字符串列表提取结果: {results}")
# 示例2:处理文件(需要确保文件存在)
try:
file_results, detailed = extract_channel("test_input.txt", smart_mode=True)
print(f"示例3 - 文件提取结果数量: {len(file_results)}")
print(f"示例3 - 前5个提取结果: {file_results[:5]}")
except Exception as e:
print(f"示例3 - 处理文件时出错: {e}")
界面展示
工具采用层级化界面设计:
- 应用列表界面:展示当前环境下所有应用,包含应用名称、描述、AppID 等基本信息,以及该应用包含的渠道数量。
- 渠道管理界面:选中某个应用后进入,展示该应用的所有渠道信息,包括渠道名称、版本号、下载链接等,支持启用 / 禁用渠道、复制链接、批量操作等功能。
- 编辑对话框:用于添加或修改应用 / 渠道信息,支持上传图标和 APK 文件。
使用说明
- 环境准备:
- 配置云存储信息(替换代码中的 “你的云存储 XX” 部分)
- 安装必要依赖:
pip install PyQt5 requests oss2 python-dotenv openpyxl
- 基础操作:
- 首次运行会自动创建配置文件和数据目录
- 在应用列表页点击 “新增” 添加应用信息
- 双击应用进入渠道管理,可添加单个渠道或批量导入 APK
- 高级功能:
- 批量导入:选择存放多个 APK 的文件夹,工具会自动解析并上传
- 导出 Excel:将渠道信息导出为包含下载链接的表格
- 云同步:点击 “更新到 OSS” 可将数据同步到云端
个性化修改建议
- 根据你的应用情况修改
appid_dict映射表 - 调整
app_name_keywords以适应你的文件名规则 - 如需支持更多环境,可修改代码中的环境相关部分
- 可扩展
get_apk_info函数以支持更多渠道信息解析规则
