first commit
This commit is contained in:
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,33 @@
|
||||
# LVGL图片转换工具
|
||||
|
||||
这个目录包含两个用于处理和转换图片为LVGL格式的Python脚本:
|
||||
|
||||
## 1. LVGLImage (LVGLImage.py)
|
||||
|
||||
引用自LVGL[官方repo](https://github.com/lvgl/lvgl)的转换脚本[LVGLImage.py](https://github.com/lvgl/lvgl/blob/master/scripts/LVGLImage.py)
|
||||
|
||||
## 2. LVGL图片转换工具 (lvgl_tools_gui.py)
|
||||
|
||||
调用`LVGLImage.py`,将图片批量转换为LVGL图片格式
|
||||
可用于修改小智的默认表情,具体修改教程[在这里](https://www.bilibili.com/video/BV12FQkYeEJ3/)
|
||||
|
||||
### 特性
|
||||
|
||||
- 图形化操作,界面更友好
|
||||
- 支持批量转换图片
|
||||
- 自动识别图片格式并选择最佳的颜色格式转换
|
||||
- 多分辨率支持
|
||||
|
||||
### 使用方法
|
||||
|
||||
安装Pillow
|
||||
|
||||
```bash
|
||||
pip install Pillow # 处理图像需要
|
||||
```
|
||||
|
||||
运行转换工具
|
||||
|
||||
```bash
|
||||
python lvgl_tools_gui.py
|
||||
```
|
||||
@@ -0,0 +1,253 @@
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, filedialog, messagebox
|
||||
from PIL import Image
|
||||
import os
|
||||
import tempfile
|
||||
import sys
|
||||
from LVGLImage import LVGLImage, ColorFormat, CompressMethod
|
||||
|
||||
HELP_TEXT = """LVGL图片转换工具使用说明:
|
||||
|
||||
1. 添加文件:点击“添加文件”按钮选择需要转换的图片,支持批量导入
|
||||
|
||||
2. 移除文件:在列表中选中文件前的复选框“[ ]”(选中后会变成“[√]”),点击“移除选中”可删除选定文件
|
||||
|
||||
3. 设置分辨率:选择需要的分辨率,如128x128
|
||||
建议根据自己的设备的屏幕分辨率来选择。过大和过小都会影响显示效果。
|
||||
|
||||
4. 颜色格式:选择“自动识别”会根据图片是否透明自动选择,或手动指定
|
||||
除非你了解这个选项,否则建议使用自动识别,不然可能会出现一些意想不到的问题……
|
||||
|
||||
5. 压缩方式:选择NONE或RLE压缩
|
||||
除非你了解这个选项,否则建议保持默认NONE不压缩
|
||||
|
||||
6. 输出目录:设置转换后文件的保存路径
|
||||
默认为程序所在目录下的output文件夹
|
||||
|
||||
7. 转换:点击“转换全部”或“转换选中”开始转换
|
||||
"""
|
||||
|
||||
class ImageConverterApp:
|
||||
def __init__(self, root):
|
||||
self.root = root
|
||||
self.root.title("LVGL图片转换工具")
|
||||
self.root.geometry("750x650")
|
||||
|
||||
# 初始化变量
|
||||
self.output_dir = tk.StringVar(value=os.path.abspath("output"))
|
||||
self.resolution = tk.StringVar(value="128x128")
|
||||
self.color_format = tk.StringVar(value="自动识别")
|
||||
self.compress_method = tk.StringVar(value="NONE")
|
||||
|
||||
# 创建UI组件
|
||||
self.create_widgets()
|
||||
self.redirect_output()
|
||||
|
||||
def create_widgets(self):
|
||||
# 参数设置框架
|
||||
settings_frame = ttk.LabelFrame(self.root, text="转换设置")
|
||||
settings_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew")
|
||||
|
||||
# 分辨率设置
|
||||
ttk.Label(settings_frame, text="分辨率:").grid(row=0, column=0, padx=2)
|
||||
ttk.Combobox(settings_frame, textvariable=self.resolution,
|
||||
values=["128x128", "64x64", "32x32"], width=8).grid(row=0, column=1, padx=2)
|
||||
|
||||
# 颜色格式
|
||||
ttk.Label(settings_frame, text="颜色格式:").grid(row=0, column=2, padx=2)
|
||||
ttk.Combobox(settings_frame, textvariable=self.color_format,
|
||||
values=["自动识别", "RGB565", "RGB565A8"], width=10).grid(row=0, column=3, padx=2)
|
||||
|
||||
# 压缩方式
|
||||
ttk.Label(settings_frame, text="压缩方式:").grid(row=0, column=4, padx=2)
|
||||
ttk.Combobox(settings_frame, textvariable=self.compress_method,
|
||||
values=["NONE", "RLE"], width=8).grid(row=0, column=5, padx=2)
|
||||
|
||||
# 文件操作框架
|
||||
file_frame = ttk.LabelFrame(self.root, text="输入文件")
|
||||
file_frame.grid(row=1, column=0, padx=10, pady=5, sticky="nsew")
|
||||
|
||||
# 文件操作按钮
|
||||
btn_frame = ttk.Frame(file_frame)
|
||||
btn_frame.pack(fill=tk.X, pady=2)
|
||||
ttk.Button(btn_frame, text="添加文件", command=self.select_files).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_frame, text="移除选中", command=self.remove_selected).pack(side=tk.LEFT, padx=2)
|
||||
ttk.Button(btn_frame, text="清空列表", command=self.clear_files).pack(side=tk.LEFT, padx=2)
|
||||
|
||||
# 文件列表(Treeview)
|
||||
self.tree = ttk.Treeview(file_frame, columns=("selected", "filename"),
|
||||
show="headings", height=10)
|
||||
self.tree.heading("selected", text="选中", anchor=tk.W)
|
||||
self.tree.heading("filename", text="文件名", anchor=tk.W)
|
||||
self.tree.column("selected", width=60, anchor=tk.W)
|
||||
self.tree.column("filename", width=600, anchor=tk.W)
|
||||
self.tree.pack(fill=tk.BOTH, expand=True)
|
||||
self.tree.bind("<ButtonRelease-1>", self.on_tree_click)
|
||||
|
||||
# 输出目录
|
||||
output_frame = ttk.LabelFrame(self.root, text="输出目录")
|
||||
output_frame.grid(row=2, column=0, padx=10, pady=5, sticky="ew")
|
||||
ttk.Entry(output_frame, textvariable=self.output_dir, width=60).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Button(output_frame, text="浏览", command=self.select_output_dir).pack(side=tk.RIGHT, padx=5)
|
||||
|
||||
# 转换按钮和帮助按钮
|
||||
convert_frame = ttk.Frame(self.root)
|
||||
convert_frame.grid(row=3, column=0, padx=10, pady=10)
|
||||
ttk.Button(convert_frame, text="转换全部文件", command=lambda: self.start_conversion(True)).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Button(convert_frame, text="转换选中文件", command=lambda: self.start_conversion(False)).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Button(convert_frame, text="帮助", command=self.show_help).pack(side=tk.RIGHT, padx=5)
|
||||
|
||||
# 日志区域(新增清空按钮部分)
|
||||
log_frame = ttk.LabelFrame(self.root, text="日志")
|
||||
log_frame.grid(row=4, column=0, padx=10, pady=5, sticky="nsew")
|
||||
|
||||
# 添加按钮框架
|
||||
log_btn_frame = ttk.Frame(log_frame)
|
||||
log_btn_frame.pack(fill=tk.X, side=tk.BOTTOM)
|
||||
|
||||
# 清空日志按钮
|
||||
ttk.Button(log_btn_frame, text="清空日志", command=self.clear_log).pack(side=tk.RIGHT, padx=5, pady=2)
|
||||
|
||||
self.log_text = tk.Text(log_frame, height=15)
|
||||
self.log_text.pack(fill=tk.BOTH, expand=True)
|
||||
|
||||
# 布局配置
|
||||
self.root.columnconfigure(0, weight=1)
|
||||
self.root.rowconfigure(1, weight=1)
|
||||
self.root.rowconfigure(4, weight=1)
|
||||
|
||||
def clear_log(self):
|
||||
"""清空日志内容"""
|
||||
self.log_text.delete(1.0, tk.END)
|
||||
|
||||
def show_help(self):
|
||||
messagebox.showinfo("帮助", HELP_TEXT)
|
||||
|
||||
def redirect_output(self):
|
||||
class StdoutRedirector:
|
||||
def __init__(self, text_widget):
|
||||
self.text_widget = text_widget
|
||||
self.original_stdout = sys.stdout
|
||||
|
||||
def write(self, message):
|
||||
self.text_widget.insert(tk.END, message)
|
||||
self.text_widget.see(tk.END)
|
||||
self.original_stdout.write(message)
|
||||
|
||||
def flush(self):
|
||||
self.original_stdout.flush()
|
||||
|
||||
sys.stdout = StdoutRedirector(self.log_text)
|
||||
|
||||
def on_tree_click(self, event):
|
||||
region = self.tree.identify("region", event.x, event.y)
|
||||
if region == "cell":
|
||||
col = self.tree.identify_column(event.x)
|
||||
item = self.tree.identify_row(event.y)
|
||||
if col == "#1": # 点击的是选中列
|
||||
current_val = self.tree.item(item, "values")[0]
|
||||
new_val = "[√]" if current_val == "[ ]" else "[ ]"
|
||||
self.tree.item(item, values=(new_val, self.tree.item(item, "values")[1]))
|
||||
|
||||
def select_output_dir(self):
|
||||
path = filedialog.askdirectory()
|
||||
if path:
|
||||
self.output_dir.set(path)
|
||||
|
||||
def select_files(self):
|
||||
files = filedialog.askopenfilenames(filetypes=[("图片文件", "*.png;*.jpg;*.jpeg;*.bmp;*.gif")])
|
||||
for f in files:
|
||||
self.tree.insert("", tk.END, values=("[ ]", os.path.basename(f)), tags=(f,))
|
||||
|
||||
def remove_selected(self):
|
||||
to_remove = []
|
||||
for item in self.tree.get_children():
|
||||
if self.tree.item(item, "values")[0] == "[√]":
|
||||
to_remove.append(item)
|
||||
for item in reversed(to_remove):
|
||||
self.tree.delete(item)
|
||||
|
||||
def clear_files(self):
|
||||
for item in self.tree.get_children():
|
||||
self.tree.delete(item)
|
||||
|
||||
def start_conversion(self, convert_all):
|
||||
input_files = [
|
||||
self.tree.item(item, "tags")[0]
|
||||
for item in self.tree.get_children()
|
||||
if convert_all or self.tree.item(item, "values")[0] == "[√]"
|
||||
]
|
||||
|
||||
if not input_files:
|
||||
msg = "没有找到可转换的文件" if convert_all else "没有选中任何文件"
|
||||
messagebox.showwarning("警告", msg)
|
||||
return
|
||||
|
||||
os.makedirs(self.output_dir.get(), exist_ok=True)
|
||||
|
||||
# 解析转换参数
|
||||
width, height = map(int, self.resolution.get().split('x'))
|
||||
compress = CompressMethod.RLE if self.compress_method.get() == "RLE" else CompressMethod.NONE
|
||||
|
||||
# 执行转换
|
||||
self.convert_images(input_files, width, height, compress)
|
||||
|
||||
def convert_images(self, input_files, width, height, compress):
|
||||
success_count = 0
|
||||
total_files = len(input_files)
|
||||
|
||||
for idx, file_path in enumerate(input_files):
|
||||
try:
|
||||
print(f"正在处理: {os.path.basename(file_path)}")
|
||||
|
||||
with Image.open(file_path) as img:
|
||||
# 调整图片大小
|
||||
img = img.resize((width, height), Image.Resampling.LANCZOS)
|
||||
|
||||
# 处理颜色格式
|
||||
color_format_str = self.color_format.get()
|
||||
if color_format_str == "自动识别":
|
||||
# 检测透明通道
|
||||
has_alpha = img.mode in ('RGBA', 'LA') or (img.mode == 'P' and 'transparency' in img.info)
|
||||
if has_alpha:
|
||||
img = img.convert('RGBA')
|
||||
cf = ColorFormat.RGB565A8
|
||||
else:
|
||||
img = img.convert('RGB')
|
||||
cf = ColorFormat.RGB565
|
||||
else:
|
||||
if color_format_str == "RGB565A8":
|
||||
img = img.convert('RGBA')
|
||||
cf = ColorFormat.RGB565A8
|
||||
else:
|
||||
img = img.convert('RGB')
|
||||
cf = ColorFormat.RGB565
|
||||
|
||||
# 保存调整后的图片
|
||||
base_name = os.path.splitext(os.path.basename(file_path))[0]
|
||||
output_image_path = os.path.join(self.output_dir.get(), f"{base_name}_{width}x{height}.png")
|
||||
img.save(output_image_path, 'PNG')
|
||||
|
||||
# 创建临时文件
|
||||
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmpfile:
|
||||
temp_path = tmpfile.name
|
||||
img.save(temp_path, 'PNG')
|
||||
|
||||
# 转换为LVGL C数组
|
||||
lvgl_img = LVGLImage().from_png(temp_path, cf=cf)
|
||||
output_c_path = os.path.join(self.output_dir.get(), f"{base_name}.c")
|
||||
lvgl_img.to_c_array(output_c_path, compress=compress)
|
||||
|
||||
success_count += 1
|
||||
os.unlink(temp_path)
|
||||
print(f"成功转换: {base_name}.c\n")
|
||||
|
||||
except Exception as e:
|
||||
print(f"转换失败: {str(e)}\n")
|
||||
|
||||
print(f"转换完成! 成功 {success_count}/{total_files} 个文件\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
root = tk.Tk()
|
||||
app = ImageConverterApp(root)
|
||||
root.mainloop()
|
||||
@@ -0,0 +1,54 @@
|
||||
import socket
|
||||
import wave
|
||||
import argparse
|
||||
|
||||
|
||||
'''
|
||||
Create a UDP socket and bind it to the server's IP:8000.
|
||||
Listen for incoming messages and print them to the console.
|
||||
Save the audio to a WAV file.
|
||||
'''
|
||||
def main(samplerate, channels):
|
||||
# Create a UDP socket
|
||||
server_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||
server_socket.bind(('0.0.0.0', 8000))
|
||||
|
||||
# Create WAV file with parameters
|
||||
filename = f"{samplerate}_{channels}.wav"
|
||||
wav_file = wave.open(filename, "wb")
|
||||
wav_file.setnchannels(channels) # channels parameter
|
||||
wav_file.setsampwidth(2) # 2 bytes per sample (16-bit)
|
||||
wav_file.setframerate(samplerate) # samplerate parameter
|
||||
|
||||
print(f"Start saving audio from 0.0.0.0:8000 to {filename}...")
|
||||
|
||||
try:
|
||||
while True:
|
||||
# Receive a message from the client
|
||||
message, address = server_socket.recvfrom(8000)
|
||||
|
||||
# Write PCM data to WAV file
|
||||
wav_file.writeframes(message)
|
||||
|
||||
# Print length of the message
|
||||
print(f"Received {len(message)} bytes from {address}")
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopping recording...")
|
||||
|
||||
finally:
|
||||
# Close files and socket
|
||||
wav_file.close()
|
||||
server_socket.close()
|
||||
print(f"WAV file '{filename}' saved successfully")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='UDP音频数据接收器,保存为WAV文件')
|
||||
parser.add_argument('--samplerate', '-s', type=int, default=16000,
|
||||
help='采样率 (默认: 16000)')
|
||||
parser.add_argument('--channels', '-c', type=int, default=2,
|
||||
help='声道数 (默认: 2)')
|
||||
|
||||
args = parser.parse_args()
|
||||
main(args.samplerate, args.channels)
|
||||
@@ -0,0 +1,2 @@
|
||||
#!/bin/sh
|
||||
esptool.py -p /dev/ttyACM0 -b 2000000 write_flash 0 ../releases/v0.9.9_bread-compact-wifi/merged-binary.bin
|
||||
@@ -0,0 +1,91 @@
|
||||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
|
||||
HEADER_TEMPLATE = """// Auto-generated language config
|
||||
#pragma once
|
||||
|
||||
#include <string_view>
|
||||
|
||||
#ifndef {lang_code_for_font}
|
||||
#define {lang_code_for_font} // 預設語言
|
||||
#endif
|
||||
|
||||
namespace Lang {{
|
||||
// 语言元数据
|
||||
constexpr const char* CODE = "{lang_code}";
|
||||
|
||||
// 字符串资源
|
||||
namespace Strings {{
|
||||
{strings}
|
||||
}}
|
||||
|
||||
// 音效资源
|
||||
namespace Sounds {{
|
||||
{sounds}
|
||||
}}
|
||||
}}
|
||||
"""
|
||||
|
||||
def generate_header(input_path, output_path):
|
||||
with open(input_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
# 验证数据结构
|
||||
if 'language' not in data or 'strings' not in data:
|
||||
raise ValueError("Invalid JSON structure")
|
||||
|
||||
lang_code = data['language']['type']
|
||||
|
||||
# 生成字符串常量
|
||||
strings = []
|
||||
sounds = []
|
||||
for key, value in data['strings'].items():
|
||||
value = value.replace('"', '\\"')
|
||||
strings.append(f' constexpr const char* {key.upper()} = "{value}";')
|
||||
|
||||
# 生成音效常量
|
||||
for file in os.listdir(os.path.dirname(input_path)):
|
||||
if file.endswith('.p3'):
|
||||
base_name = os.path.splitext(file)[0]
|
||||
sounds.append(f'''
|
||||
extern const char p3_{base_name}_start[] asm("_binary_{base_name}_p3_start");
|
||||
extern const char p3_{base_name}_end[] asm("_binary_{base_name}_p3_end");
|
||||
static const std::string_view P3_{base_name.upper()} {{
|
||||
static_cast<const char*>(p3_{base_name}_start),
|
||||
static_cast<size_t>(p3_{base_name}_end - p3_{base_name}_start)
|
||||
}};''')
|
||||
|
||||
# 生成公共音效
|
||||
for file in os.listdir(os.path.join(os.path.dirname(output_path), 'common')):
|
||||
if file.endswith('.p3'):
|
||||
base_name = os.path.splitext(file)[0]
|
||||
sounds.append(f'''
|
||||
extern const char p3_{base_name}_start[] asm("_binary_{base_name}_p3_start");
|
||||
extern const char p3_{base_name}_end[] asm("_binary_{base_name}_p3_end");
|
||||
static const std::string_view P3_{base_name.upper()} {{
|
||||
static_cast<const char*>(p3_{base_name}_start),
|
||||
static_cast<size_t>(p3_{base_name}_end - p3_{base_name}_start)
|
||||
}};''')
|
||||
|
||||
# 填充模板
|
||||
content = HEADER_TEMPLATE.format(
|
||||
lang_code=lang_code,
|
||||
lang_code_for_font=lang_code.replace('-', '_').lower(),
|
||||
strings="\n".join(sorted(strings)),
|
||||
sounds="\n".join(sorted(sounds))
|
||||
)
|
||||
|
||||
# 写入文件
|
||||
os.makedirs(os.path.dirname(output_path), exist_ok=True)
|
||||
with open(output_path, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--input", required=True, help="输入JSON文件路径")
|
||||
parser.add_argument("--output", required=True, help="输出头文件路径")
|
||||
args = parser.parse_args()
|
||||
|
||||
generate_header(args.input, args.output)
|
||||
@@ -0,0 +1,95 @@
|
||||
# P3音频格式转换与播放工具
|
||||
|
||||
这个目录包含两个用于处理P3格式音频文件的Python脚本:
|
||||
|
||||
## 1. 音频转换工具 (convert_audio_to_p3.py)
|
||||
|
||||
将普通音频文件转换为P3格式(4字节header + Opus数据包的流式结构)并进行响度标准化。
|
||||
|
||||
### 使用方法
|
||||
|
||||
```bash
|
||||
python convert_audio_to_p3.py <输入音频文件> <输出P3文件> [-l LUFS] [-d]
|
||||
```
|
||||
|
||||
其中,可选选项 `-l` 用于指定响度标准化的目标响度,默认为 -16 LUFS;可选选项 `-d` 可以禁用响度标准化。
|
||||
|
||||
如果输入的音频文件符合下面的任一条件,建议使用 `-d` 禁用响度标准化:
|
||||
- 音频过短
|
||||
- 音频已经调整过响度
|
||||
- 音频来自默认 TTS (小智当前使用的 TTS 的默认响度已是 -16 LUFS)
|
||||
|
||||
例如:
|
||||
```bash
|
||||
python convert_audio_to_p3.py input.mp3 output.p3
|
||||
```
|
||||
|
||||
## 2. P3音频播放工具 (play_p3.py)
|
||||
|
||||
播放P3格式的音频文件。
|
||||
|
||||
### 特性
|
||||
|
||||
- 解码并播放P3格式的音频文件
|
||||
- 在播放结束或用户中断时应用淡出效果,避免破音
|
||||
- 支持通过命令行参数指定要播放的文件
|
||||
|
||||
### 使用方法
|
||||
|
||||
```bash
|
||||
python play_p3.py <P3文件路径>
|
||||
```
|
||||
|
||||
例如:
|
||||
```bash
|
||||
python play_p3.py output.p3
|
||||
```
|
||||
|
||||
## 3. 音频转回工具 (convert_p3_to_audio.py)
|
||||
|
||||
将P3格式转换回普通音频文件。
|
||||
|
||||
### 使用方法
|
||||
|
||||
```bash
|
||||
python convert_p3_to_audio.py <输入P3文件> <输出音频文件>
|
||||
```
|
||||
|
||||
输出音频文件需要有扩展名。
|
||||
|
||||
例如:
|
||||
```bash
|
||||
python convert_p3_to_audio.py input.p3 output.wav
|
||||
```
|
||||
## 4. 音频/P3批量转换工具
|
||||
|
||||
一个图形化的工具,支持批量转换音频到P3,P3到音频
|
||||
|
||||

|
||||
|
||||
### 使用方法:
|
||||
```bash
|
||||
python batch_convert_gui.py
|
||||
```
|
||||
|
||||
## 依赖安装
|
||||
|
||||
在使用这些脚本前,请确保安装了所需的Python库:
|
||||
|
||||
```bash
|
||||
pip install librosa opuslib numpy tqdm sounddevice pyloudnorm soundfile
|
||||
```
|
||||
|
||||
或者使用提供的requirements.txt文件:
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## P3格式说明
|
||||
|
||||
P3格式是一种简单的流式音频格式,结构如下:
|
||||
- 每个音频帧由一个4字节的头部和一个Opus编码的数据包组成
|
||||
- 头部格式:[1字节类型, 1字节保留, 2字节长度]
|
||||
- 采样率固定为16000Hz,单声道
|
||||
- 每帧时长为60ms
|
||||
@@ -0,0 +1,221 @@
|
||||
import tkinter as tk
|
||||
from tkinter import ttk, filedialog, messagebox
|
||||
import os
|
||||
import threading
|
||||
import sys
|
||||
from convert_audio_to_p3 import encode_audio_to_opus
|
||||
from convert_p3_to_audio import decode_p3_to_audio
|
||||
|
||||
class AudioConverterApp:
|
||||
def __init__(self, master):
|
||||
self.master = master
|
||||
master.title("音频/P3 批量转换工具")
|
||||
master.geometry("680x600") # 调整窗口高度
|
||||
|
||||
# 初始化变量
|
||||
self.mode = tk.StringVar(value="audio_to_p3")
|
||||
self.output_dir = tk.StringVar()
|
||||
self.output_dir.set(os.path.abspath("output"))
|
||||
self.enable_loudnorm = tk.BooleanVar(value=True)
|
||||
self.target_lufs = tk.DoubleVar(value=-16.0)
|
||||
|
||||
# 创建UI组件
|
||||
self.create_widgets()
|
||||
self.redirect_output()
|
||||
|
||||
def create_widgets(self):
|
||||
# 模式选择
|
||||
mode_frame = ttk.LabelFrame(self.master, text="转换模式")
|
||||
mode_frame.grid(row=0, column=0, padx=10, pady=5, sticky="ew")
|
||||
|
||||
ttk.Radiobutton(mode_frame, text="音频转P3", variable=self.mode,
|
||||
value="audio_to_p3", command=self.toggle_settings,
|
||||
width=12).grid(row=0, column=0, padx=5)
|
||||
ttk.Radiobutton(mode_frame, text="P3转音频", variable=self.mode,
|
||||
value="p3_to_audio", command=self.toggle_settings,
|
||||
width=12).grid(row=0, column=1, padx=5)
|
||||
|
||||
# 响度设置
|
||||
self.loudnorm_frame = ttk.Frame(self.master)
|
||||
self.loudnorm_frame.grid(row=1, column=0, padx=10, pady=5, sticky="ew")
|
||||
|
||||
ttk.Checkbutton(self.loudnorm_frame, text="启用响度调整",
|
||||
variable=self.enable_loudnorm, width=15
|
||||
).grid(row=0, column=0, padx=2)
|
||||
ttk.Entry(self.loudnorm_frame, textvariable=self.target_lufs,
|
||||
width=6).grid(row=0, column=1, padx=2)
|
||||
ttk.Label(self.loudnorm_frame, text="LUFS").grid(row=0, column=2, padx=2)
|
||||
|
||||
# 文件选择
|
||||
file_frame = ttk.LabelFrame(self.master, text="输入文件")
|
||||
file_frame.grid(row=2, column=0, padx=10, pady=5, sticky="nsew")
|
||||
|
||||
# 文件操作按钮
|
||||
ttk.Button(file_frame, text="选择文件", command=self.select_files,
|
||||
width=12).grid(row=0, column=0, padx=5, pady=2)
|
||||
ttk.Button(file_frame, text="移除选中", command=self.remove_selected,
|
||||
width=12).grid(row=0, column=1, padx=5, pady=2)
|
||||
ttk.Button(file_frame, text="清空列表", command=self.clear_files,
|
||||
width=12).grid(row=0, column=2, padx=5, pady=2)
|
||||
|
||||
# 文件列表(使用Treeview)
|
||||
self.tree = ttk.Treeview(file_frame, columns=("selected", "filename"),
|
||||
show="headings", height=8)
|
||||
self.tree.heading("selected", text="选中", anchor=tk.W)
|
||||
self.tree.heading("filename", text="文件名", anchor=tk.W)
|
||||
self.tree.column("selected", width=60, anchor=tk.W)
|
||||
self.tree.column("filename", width=600, anchor=tk.W)
|
||||
self.tree.grid(row=1, column=0, columnspan=3, sticky="nsew", padx=5, pady=2)
|
||||
self.tree.bind("<ButtonRelease-1>", self.on_tree_click)
|
||||
|
||||
# 输出目录
|
||||
output_frame = ttk.LabelFrame(self.master, text="输出目录")
|
||||
output_frame.grid(row=3, column=0, padx=10, pady=5, sticky="ew")
|
||||
|
||||
ttk.Entry(output_frame, textvariable=self.output_dir, width=60
|
||||
).grid(row=0, column=0, padx=5, sticky="ew")
|
||||
ttk.Button(output_frame, text="浏览", command=self.select_output_dir,
|
||||
width=8).grid(row=0, column=1, padx=5)
|
||||
|
||||
# 转换按钮区域
|
||||
button_frame = ttk.Frame(self.master)
|
||||
button_frame.grid(row=4, column=0, padx=10, pady=10, sticky="ew")
|
||||
|
||||
ttk.Button(button_frame, text="转换全部文件", command=lambda: self.start_conversion(True),
|
||||
width=15).pack(side=tk.LEFT, padx=5)
|
||||
ttk.Button(button_frame, text="转换选中文件", command=lambda: self.start_conversion(False),
|
||||
width=15).pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 日志区域
|
||||
log_frame = ttk.LabelFrame(self.master, text="日志")
|
||||
log_frame.grid(row=5, column=0, padx=10, pady=5, sticky="nsew")
|
||||
|
||||
self.log_text = tk.Text(log_frame, height=14, width=80)
|
||||
self.log_text.pack(fill=tk.BOTH, expand=True)
|
||||
|
||||
# 配置布局权重
|
||||
self.master.columnconfigure(0, weight=1)
|
||||
self.master.rowconfigure(2, weight=1)
|
||||
self.master.rowconfigure(5, weight=3)
|
||||
file_frame.columnconfigure(0, weight=1)
|
||||
file_frame.rowconfigure(1, weight=1)
|
||||
|
||||
def toggle_settings(self):
|
||||
if self.mode.get() == "audio_to_p3":
|
||||
self.loudnorm_frame.grid()
|
||||
else:
|
||||
self.loudnorm_frame.grid_remove()
|
||||
|
||||
def select_files(self):
|
||||
file_types = [
|
||||
("音频文件", "*.wav *.mp3 *.ogg *.flac") if self.mode.get() == "audio_to_p3"
|
||||
else ("P3文件", "*.p3")
|
||||
]
|
||||
|
||||
files = filedialog.askopenfilenames(filetypes=file_types)
|
||||
for f in files:
|
||||
self.tree.insert("", tk.END, values=("[ ]", os.path.basename(f)), tags=(f,))
|
||||
|
||||
def on_tree_click(self, event):
|
||||
"""处理复选框点击事件"""
|
||||
region = self.tree.identify("region", event.x, event.y)
|
||||
if region == "cell":
|
||||
col = self.tree.identify_column(event.x)
|
||||
item = self.tree.identify_row(event.y)
|
||||
if col == "#1": # 点击的是选中列
|
||||
current_val = self.tree.item(item, "values")[0]
|
||||
new_val = "[√]" if current_val == "[ ]" else "[ ]"
|
||||
self.tree.item(item, values=(new_val, self.tree.item(item, "values")[1]))
|
||||
|
||||
def remove_selected(self):
|
||||
"""移除选中的文件"""
|
||||
to_remove = []
|
||||
for item in self.tree.get_children():
|
||||
if self.tree.item(item, "values")[0] == "[√]":
|
||||
to_remove.append(item)
|
||||
for item in reversed(to_remove):
|
||||
self.tree.delete(item)
|
||||
|
||||
def clear_files(self):
|
||||
"""清空所有文件"""
|
||||
for item in self.tree.get_children():
|
||||
self.tree.delete(item)
|
||||
|
||||
def select_output_dir(self):
|
||||
path = filedialog.askdirectory()
|
||||
if path:
|
||||
self.output_dir.set(path)
|
||||
|
||||
def redirect_output(self):
|
||||
class StdoutRedirector:
|
||||
def __init__(self, text_widget):
|
||||
self.text_widget = text_widget
|
||||
self.original_stdout = sys.stdout
|
||||
|
||||
def write(self, message):
|
||||
self.text_widget.insert(tk.END, message)
|
||||
self.text_widget.see(tk.END)
|
||||
self.original_stdout.write(message)
|
||||
|
||||
def flush(self):
|
||||
self.original_stdout.flush()
|
||||
|
||||
sys.stdout = StdoutRedirector(self.log_text)
|
||||
|
||||
def start_conversion(self, convert_all):
|
||||
"""开始转换"""
|
||||
input_files = []
|
||||
for item in self.tree.get_children():
|
||||
if convert_all or self.tree.item(item, "values")[0] == "[√]":
|
||||
input_files.append(self.tree.item(item, "tags")[0])
|
||||
|
||||
if not input_files:
|
||||
msg = "没有找到可转换的文件" if convert_all else "没有选中任何文件"
|
||||
messagebox.showwarning("警告", msg)
|
||||
return
|
||||
|
||||
os.makedirs(self.output_dir.get(), exist_ok=True)
|
||||
|
||||
try:
|
||||
if self.mode.get() == "audio_to_p3":
|
||||
target_lufs = self.target_lufs.get() if self.enable_loudnorm.get() else None
|
||||
thread = threading.Thread(target=self.convert_audio_to_p3, args=(target_lufs, input_files))
|
||||
else:
|
||||
thread = threading.Thread(target=self.convert_p3_to_audio, args=(input_files,))
|
||||
|
||||
thread.start()
|
||||
except Exception as e:
|
||||
print(f"转换初始化失败: {str(e)}")
|
||||
|
||||
def convert_audio_to_p3(self, target_lufs, input_files):
|
||||
"""音频转P3转换逻辑"""
|
||||
for input_path in input_files:
|
||||
try:
|
||||
filename = os.path.basename(input_path)
|
||||
base_name = os.path.splitext(filename)[0]
|
||||
output_path = os.path.join(self.output_dir.get(), f"{base_name}.p3")
|
||||
|
||||
print(f"正在转换: {filename}")
|
||||
encode_audio_to_opus(input_path, output_path, target_lufs)
|
||||
print(f"转换成功: {filename}\n")
|
||||
except Exception as e:
|
||||
print(f"转换失败: {str(e)}\n")
|
||||
|
||||
def convert_p3_to_audio(self, input_files):
|
||||
"""P3转音频转换逻辑"""
|
||||
for input_path in input_files:
|
||||
try:
|
||||
filename = os.path.basename(input_path)
|
||||
base_name = os.path.splitext(filename)[0]
|
||||
output_path = os.path.join(self.output_dir.get(), f"{base_name}.wav")
|
||||
|
||||
print(f"正在转换: {filename}")
|
||||
decode_p3_to_audio(input_path, output_path)
|
||||
print(f"转换成功: {filename}\n")
|
||||
except Exception as e:
|
||||
print(f"转换失败: {str(e)}\n")
|
||||
|
||||
if __name__ == "__main__":
|
||||
root = tk.Tk()
|
||||
app = AudioConverterApp(root)
|
||||
root.mainloop()
|
||||
@@ -0,0 +1,62 @@
|
||||
# convert audio files to protocol v3 stream
|
||||
import librosa
|
||||
import opuslib
|
||||
import struct
|
||||
import sys
|
||||
import tqdm
|
||||
import numpy as np
|
||||
import argparse
|
||||
import pyloudnorm as pyln
|
||||
|
||||
def encode_audio_to_opus(input_file, output_file, target_lufs=None):
|
||||
# Load audio file using librosa
|
||||
audio, sample_rate = librosa.load(input_file, sr=None, mono=False, dtype=np.float32)
|
||||
|
||||
# Convert to mono if stereo
|
||||
if audio.ndim == 2:
|
||||
audio = librosa.to_mono(audio)
|
||||
|
||||
if target_lufs is not None:
|
||||
print("Note: Automatic loudness adjustment is enabled, which may cause", file=sys.stderr)
|
||||
print(" audio distortion. If the input audio has already been ", file=sys.stderr)
|
||||
print(" loudness-adjusted or if the input audio is TTS audio, ", file=sys.stderr)
|
||||
print(" please use the `-d` parameter to disable loudness adjustment.", file=sys.stderr)
|
||||
meter = pyln.Meter(sample_rate)
|
||||
current_loudness = meter.integrated_loudness(audio)
|
||||
audio = pyln.normalize.loudness(audio, current_loudness, target_lufs)
|
||||
print(f"Adjusted loudness: {current_loudness:.1f} LUFS -> {target_lufs} LUFS")
|
||||
|
||||
# Convert sample rate to 16000Hz if necessary
|
||||
target_sample_rate = 16000
|
||||
if sample_rate != target_sample_rate:
|
||||
audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=target_sample_rate)
|
||||
sample_rate = target_sample_rate
|
||||
|
||||
# Convert audio data back to int16 after processing
|
||||
audio = (audio * 32767).astype(np.int16)
|
||||
|
||||
# Initialize Opus encoder
|
||||
encoder = opuslib.Encoder(sample_rate, 1, opuslib.APPLICATION_AUDIO)
|
||||
|
||||
# Encode and save
|
||||
with open(output_file, 'wb') as f:
|
||||
duration = 60 # 60ms per frame
|
||||
frame_size = int(sample_rate * duration / 1000)
|
||||
for i in tqdm.tqdm(range(0, len(audio) - frame_size, frame_size)):
|
||||
frame = audio[i:i + frame_size]
|
||||
opus_data = encoder.encode(frame.tobytes(), frame_size=frame_size)
|
||||
packet = struct.pack('>BBH', 0, 0, len(opus_data)) + opus_data
|
||||
f.write(packet)
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description='Convert audio to Opus with loudness normalization')
|
||||
parser.add_argument('input_file', help='Input audio file')
|
||||
parser.add_argument('output_file', help='Output .opus file')
|
||||
parser.add_argument('-l', '--lufs', type=float, default=-16.0,
|
||||
help='Target loudness in LUFS (default: -16)')
|
||||
parser.add_argument('-d', '--disable-loudnorm', action='store_true',
|
||||
help='Disable loudness normalization')
|
||||
args = parser.parse_args()
|
||||
|
||||
target_lufs = None if args.disable_loudnorm else args.lufs
|
||||
encode_audio_to_opus(args.input_file, args.output_file, target_lufs)
|
||||
@@ -0,0 +1,51 @@
|
||||
import struct
|
||||
import sys
|
||||
import opuslib
|
||||
import numpy as np
|
||||
from tqdm import tqdm
|
||||
import soundfile as sf
|
||||
|
||||
|
||||
def decode_p3_to_audio(input_file, output_file):
|
||||
sample_rate = 16000
|
||||
channels = 1
|
||||
decoder = opuslib.Decoder(sample_rate, channels)
|
||||
|
||||
pcm_frames = []
|
||||
frame_size = int(sample_rate * 60 / 1000)
|
||||
|
||||
with open(input_file, "rb") as f:
|
||||
f.seek(0, 2)
|
||||
total_size = f.tell()
|
||||
f.seek(0)
|
||||
|
||||
with tqdm(total=total_size, unit="B", unit_scale=True) as pbar:
|
||||
while True:
|
||||
header = f.read(4)
|
||||
if not header or len(header) < 4:
|
||||
break
|
||||
|
||||
pkt_type, reserved, opus_len = struct.unpack(">BBH", header)
|
||||
opus_data = f.read(opus_len)
|
||||
if len(opus_data) != opus_len:
|
||||
break
|
||||
|
||||
pcm = decoder.decode(opus_data, frame_size)
|
||||
pcm_frames.append(np.frombuffer(pcm, dtype=np.int16))
|
||||
|
||||
pbar.update(4 + opus_len)
|
||||
|
||||
if not pcm_frames:
|
||||
raise ValueError("No valid audio data found")
|
||||
|
||||
pcm_data = np.concatenate(pcm_frames)
|
||||
|
||||
sf.write(output_file, pcm_data, sample_rate, subtype="PCM_16")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 3:
|
||||
print("Usage: python convert_p3_to_audio.py <input.p3> <output.wav>")
|
||||
sys.exit(1)
|
||||
|
||||
decode_p3_to_audio(sys.argv[1], sys.argv[2])
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 142 KiB |
@@ -0,0 +1,241 @@
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog, messagebox
|
||||
import threading
|
||||
import time
|
||||
import opuslib
|
||||
import struct
|
||||
import numpy as np
|
||||
import sounddevice as sd
|
||||
import os
|
||||
|
||||
|
||||
def play_p3_file(input_file, stop_event=None, pause_event=None):
|
||||
"""
|
||||
播放p3格式的音频文件
|
||||
p3格式: [1字节类型, 1字节保留, 2字节长度, Opus数据]
|
||||
"""
|
||||
# 初始化Opus解码器
|
||||
sample_rate = 16000 # 采样率固定为16000Hz
|
||||
channels = 1 # 单声道
|
||||
decoder = opuslib.Decoder(sample_rate, channels)
|
||||
|
||||
# 帧大小 (60ms)
|
||||
frame_size = int(sample_rate * 60 / 1000)
|
||||
|
||||
# 打开音频流
|
||||
stream = sd.OutputStream(
|
||||
samplerate=sample_rate,
|
||||
channels=channels,
|
||||
dtype='int16'
|
||||
)
|
||||
stream.start()
|
||||
|
||||
try:
|
||||
with open(input_file, 'rb') as f:
|
||||
print(f"正在播放: {input_file}")
|
||||
|
||||
while True:
|
||||
if stop_event and stop_event.is_set():
|
||||
break
|
||||
|
||||
if pause_event and pause_event.is_set():
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
# 读取头部 (4字节)
|
||||
header = f.read(4)
|
||||
if not header or len(header) < 4:
|
||||
break
|
||||
|
||||
# 解析头部
|
||||
packet_type, reserved, data_len = struct.unpack('>BBH', header)
|
||||
|
||||
# 读取Opus数据
|
||||
opus_data = f.read(data_len)
|
||||
if not opus_data or len(opus_data) < data_len:
|
||||
break
|
||||
|
||||
# 解码Opus数据
|
||||
pcm_data = decoder.decode(opus_data, frame_size)
|
||||
|
||||
# 将字节转换为numpy数组
|
||||
audio_array = np.frombuffer(pcm_data, dtype=np.int16)
|
||||
|
||||
# 播放音频
|
||||
stream.write(audio_array)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n播放已停止")
|
||||
finally:
|
||||
stream.stop()
|
||||
stream.close()
|
||||
print("播放完成")
|
||||
|
||||
|
||||
class P3PlayerApp:
|
||||
def __init__(self, root):
|
||||
self.root = root
|
||||
self.root.title("P3 文件简易播放器")
|
||||
self.root.geometry("500x400")
|
||||
|
||||
self.playlist = []
|
||||
self.current_index = 0
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.stop_event = threading.Event()
|
||||
self.pause_event = threading.Event()
|
||||
self.loop_playback = tk.BooleanVar(value=False) # 循环播放复选框的状态
|
||||
|
||||
# 创建界面组件
|
||||
self.create_widgets()
|
||||
|
||||
def create_widgets(self):
|
||||
# 播放列表
|
||||
self.playlist_label = tk.Label(self.root, text="播放列表:")
|
||||
self.playlist_label.pack(pady=5)
|
||||
|
||||
self.playlist_frame = tk.Frame(self.root)
|
||||
self.playlist_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
|
||||
|
||||
self.playlist_listbox = tk.Listbox(self.playlist_frame, selectmode=tk.SINGLE)
|
||||
self.playlist_listbox.pack(fill=tk.BOTH, expand=True)
|
||||
|
||||
# 复选框和移除按钮
|
||||
self.checkbox_frame = tk.Frame(self.root)
|
||||
self.checkbox_frame.pack(pady=5)
|
||||
|
||||
self.remove_button = tk.Button(self.checkbox_frame, text="移除文件", command=self.remove_files)
|
||||
self.remove_button.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 循环播放复选框
|
||||
self.loop_checkbox = tk.Checkbutton(self.checkbox_frame, text="循环播放", variable=self.loop_playback)
|
||||
self.loop_checkbox.pack(side=tk.LEFT, padx=5)
|
||||
|
||||
# 控制按钮
|
||||
self.control_frame = tk.Frame(self.root)
|
||||
self.control_frame.pack(pady=10)
|
||||
|
||||
self.add_button = tk.Button(self.control_frame, text="添加文件", command=self.add_file)
|
||||
self.add_button.grid(row=0, column=0, padx=5)
|
||||
|
||||
self.play_button = tk.Button(self.control_frame, text="播放", command=self.play)
|
||||
self.play_button.grid(row=0, column=1, padx=5)
|
||||
|
||||
self.pause_button = tk.Button(self.control_frame, text="暂停", command=self.pause)
|
||||
self.pause_button.grid(row=0, column=2, padx=5)
|
||||
|
||||
self.stop_button = tk.Button(self.control_frame, text="停止", command=self.stop)
|
||||
self.stop_button.grid(row=0, column=3, padx=5)
|
||||
|
||||
# 状态标签
|
||||
self.status_label = tk.Label(self.root, text="未在播放", fg="blue")
|
||||
self.status_label.pack(pady=10)
|
||||
|
||||
def add_file(self):
|
||||
files = filedialog.askopenfilenames(filetypes=[("P3 文件", "*.p3")])
|
||||
if files:
|
||||
self.playlist.extend(files)
|
||||
self.update_playlist()
|
||||
|
||||
def update_playlist(self):
|
||||
self.playlist_listbox.delete(0, tk.END)
|
||||
for file in self.playlist:
|
||||
self.playlist_listbox.insert(tk.END, os.path.basename(file)) # 仅显示文件名
|
||||
|
||||
def update_status(self, status_text, color="blue"):
|
||||
"""更新状态标签的内容"""
|
||||
self.status_label.config(text=status_text, fg=color)
|
||||
|
||||
def play(self):
|
||||
if not self.playlist:
|
||||
messagebox.showwarning("警告", "播放列表为空!")
|
||||
return
|
||||
|
||||
if self.is_paused:
|
||||
self.is_paused = False
|
||||
self.pause_event.clear()
|
||||
self.update_status(f"正在播放:{os.path.basename(self.playlist[self.current_index])}", "green")
|
||||
return
|
||||
|
||||
if self.is_playing:
|
||||
return
|
||||
|
||||
self.is_playing = True
|
||||
self.stop_event.clear()
|
||||
self.pause_event.clear()
|
||||
self.current_index = self.playlist_listbox.curselection()[0] if self.playlist_listbox.curselection() else 0
|
||||
self.play_thread = threading.Thread(target=self.play_audio, daemon=True)
|
||||
self.play_thread.start()
|
||||
self.update_status(f"正在播放:{os.path.basename(self.playlist[self.current_index])}", "green")
|
||||
|
||||
def play_audio(self):
|
||||
while True:
|
||||
if self.stop_event.is_set():
|
||||
break
|
||||
|
||||
if self.pause_event.is_set():
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
|
||||
# 检查当前索引是否有效
|
||||
if self.current_index >= len(self.playlist):
|
||||
if self.loop_playback.get(): # 如果勾选了循环播放
|
||||
self.current_index = 0 # 回到第一首
|
||||
else:
|
||||
break # 否则停止播放
|
||||
|
||||
file = self.playlist[self.current_index]
|
||||
self.playlist_listbox.selection_clear(0, tk.END)
|
||||
self.playlist_listbox.selection_set(self.current_index)
|
||||
self.playlist_listbox.activate(self.current_index)
|
||||
self.update_status(f"正在播放:{os.path.basename(self.playlist[self.current_index])}", "green")
|
||||
play_p3_file(file, self.stop_event, self.pause_event)
|
||||
|
||||
if self.stop_event.is_set():
|
||||
break
|
||||
|
||||
if not self.loop_playback.get(): # 如果没有勾选循环播放
|
||||
break # 播放完当前文件后停止
|
||||
|
||||
self.current_index += 1
|
||||
if self.current_index >= len(self.playlist):
|
||||
if self.loop_playback.get(): # 如果勾选了循环播放
|
||||
self.current_index = 0 # 回到第一首
|
||||
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.update_status("播放已停止", "red")
|
||||
|
||||
def pause(self):
|
||||
if self.is_playing:
|
||||
self.is_paused = not self.is_paused
|
||||
if self.is_paused:
|
||||
self.pause_event.set()
|
||||
self.update_status("播放已暂停", "orange")
|
||||
else:
|
||||
self.pause_event.clear()
|
||||
self.update_status(f"正在播放:{os.path.basename(self.playlist[self.current_index])}", "green")
|
||||
|
||||
def stop(self):
|
||||
if self.is_playing or self.is_paused:
|
||||
self.is_playing = False
|
||||
self.is_paused = False
|
||||
self.stop_event.set()
|
||||
self.pause_event.clear()
|
||||
self.update_status("播放已停止", "red")
|
||||
|
||||
def remove_files(self):
|
||||
selected_indices = self.playlist_listbox.curselection()
|
||||
if not selected_indices:
|
||||
messagebox.showwarning("警告", "请先选择要移除的文件!")
|
||||
return
|
||||
|
||||
for index in reversed(selected_indices):
|
||||
self.playlist.pop(index)
|
||||
self.update_playlist()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
root = tk.Tk()
|
||||
app = P3PlayerApp(root)
|
||||
root.mainloop()
|
||||
@@ -0,0 +1,71 @@
|
||||
# 播放p3格式的音频文件
|
||||
import opuslib
|
||||
import struct
|
||||
import numpy as np
|
||||
import sounddevice as sd
|
||||
import argparse
|
||||
|
||||
def play_p3_file(input_file):
|
||||
"""
|
||||
播放p3格式的音频文件
|
||||
p3格式: [1字节类型, 1字节保留, 2字节长度, Opus数据]
|
||||
"""
|
||||
# 初始化Opus解码器
|
||||
sample_rate = 16000 # 采样率固定为16000Hz
|
||||
channels = 1 # 单声道
|
||||
decoder = opuslib.Decoder(sample_rate, channels)
|
||||
|
||||
# 帧大小 (60ms)
|
||||
frame_size = int(sample_rate * 60 / 1000)
|
||||
|
||||
# 打开音频流
|
||||
stream = sd.OutputStream(
|
||||
samplerate=sample_rate,
|
||||
channels=channels,
|
||||
dtype='int16'
|
||||
)
|
||||
stream.start()
|
||||
|
||||
try:
|
||||
with open(input_file, 'rb') as f:
|
||||
print(f"正在播放: {input_file}")
|
||||
|
||||
while True:
|
||||
# 读取头部 (4字节)
|
||||
header = f.read(4)
|
||||
if not header or len(header) < 4:
|
||||
break
|
||||
|
||||
# 解析头部
|
||||
packet_type, reserved, data_len = struct.unpack('>BBH', header)
|
||||
|
||||
# 读取Opus数据
|
||||
opus_data = f.read(data_len)
|
||||
if not opus_data or len(opus_data) < data_len:
|
||||
break
|
||||
|
||||
# 解码Opus数据
|
||||
pcm_data = decoder.decode(opus_data, frame_size)
|
||||
|
||||
# 将字节转换为numpy数组
|
||||
audio_array = np.frombuffer(pcm_data, dtype=np.int16)
|
||||
|
||||
# 播放音频
|
||||
stream.write(audio_array)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n播放已停止")
|
||||
finally:
|
||||
stream.stop()
|
||||
stream.close()
|
||||
print("播放完成")
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='播放p3格式的音频文件')
|
||||
parser.add_argument('input_file', help='输入的p3文件路径')
|
||||
args = parser.parse_args()
|
||||
|
||||
play_p3_file(args.input_file)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,7 @@
|
||||
librosa>=0.9.2
|
||||
opuslib>=3.0.1
|
||||
numpy>=1.20.0
|
||||
tqdm>=4.62.0
|
||||
sounddevice>=0.4.4
|
||||
pyloudnorm>=0.1.1
|
||||
soundfile>=0.13.1
|
||||
@@ -0,0 +1,141 @@
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import zipfile
|
||||
import argparse
|
||||
|
||||
# 切换到项目根目录
|
||||
os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
def get_board_type():
|
||||
with open("build/compile_commands.json") as f:
|
||||
data = json.load(f)
|
||||
for item in data:
|
||||
if not item["file"].endswith("main.cc"):
|
||||
continue
|
||||
command = item["command"]
|
||||
# extract -DBOARD_TYPE=xxx
|
||||
board_type = command.split("-DBOARD_TYPE=\\\"")[1].split("\\\"")[0].strip()
|
||||
return board_type
|
||||
return None
|
||||
|
||||
def get_project_version():
|
||||
with open("CMakeLists.txt") as f:
|
||||
for line in f:
|
||||
if line.startswith("set(PROJECT_VER"):
|
||||
return line.split("\"")[1].split("\"")[0].strip()
|
||||
return None
|
||||
|
||||
def merge_bin():
|
||||
if os.system("idf.py merge-bin") != 0:
|
||||
print("merge bin failed")
|
||||
sys.exit(1)
|
||||
|
||||
def zip_bin(board_type, project_version):
|
||||
if not os.path.exists("releases"):
|
||||
os.makedirs("releases")
|
||||
output_path = f"releases/v{project_version}_{board_type}.zip"
|
||||
if os.path.exists(output_path):
|
||||
os.remove(output_path)
|
||||
with zipfile.ZipFile(output_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
|
||||
zipf.write("build/merged-binary.bin", arcname="merged-binary.bin")
|
||||
print(f"zip bin to {output_path} done")
|
||||
|
||||
|
||||
def release_current():
|
||||
merge_bin()
|
||||
board_type = get_board_type()
|
||||
print("board type:", board_type)
|
||||
project_version = get_project_version()
|
||||
print("project version:", project_version)
|
||||
zip_bin(board_type, project_version)
|
||||
|
||||
def get_all_board_types():
|
||||
board_configs = {}
|
||||
with open("main/CMakeLists.txt", encoding='utf-8') as f:
|
||||
lines = f.readlines()
|
||||
for i, line in enumerate(lines):
|
||||
# 查找 if(CONFIG_BOARD_TYPE_*) 行
|
||||
if "if(CONFIG_BOARD_TYPE_" in line:
|
||||
config_name = line.strip().split("if(")[1].split(")")[0]
|
||||
# 查找下一行的 set(BOARD_TYPE "xxx")
|
||||
next_line = lines[i + 1].strip()
|
||||
if next_line.startswith("set(BOARD_TYPE"):
|
||||
board_type = next_line.split('"')[1]
|
||||
board_configs[config_name] = board_type
|
||||
return board_configs
|
||||
|
||||
def release(board_type, board_config, config_filename="config.json"):
|
||||
config_path = f"main/boards/{board_type}/{config_filename}"
|
||||
if not os.path.exists(config_path):
|
||||
print(f"跳过 {board_type} 因为 {config_filename} 不存在")
|
||||
return
|
||||
|
||||
# Print Project Version
|
||||
project_version = get_project_version()
|
||||
print(f"Project Version: {project_version}", config_path)
|
||||
|
||||
with open(config_path, "r") as f:
|
||||
config = json.load(f)
|
||||
target = config["target"]
|
||||
builds = config["builds"]
|
||||
|
||||
for build in builds:
|
||||
name = build["name"]
|
||||
if not name.startswith(board_type):
|
||||
raise ValueError(f"name {name} 必须以 {board_type} 开头")
|
||||
output_path = f"releases/v{project_version}_{name}.zip"
|
||||
if os.path.exists(output_path):
|
||||
print(f"跳过 {board_type} 因为 {output_path} 已存在")
|
||||
continue
|
||||
|
||||
sdkconfig_append = [f"{board_config}=y"]
|
||||
for append in build.get("sdkconfig_append", []):
|
||||
sdkconfig_append.append(append)
|
||||
print(f"name: {name}")
|
||||
print(f"target: {target}")
|
||||
for append in sdkconfig_append:
|
||||
print(f"sdkconfig_append: {append}")
|
||||
# unset IDF_TARGET
|
||||
os.environ.pop("IDF_TARGET", None)
|
||||
# Call set-target
|
||||
if os.system(f"idf.py set-target {target}") != 0:
|
||||
print("set-target failed")
|
||||
sys.exit(1)
|
||||
# Append sdkconfig
|
||||
with open("sdkconfig", "a") as f:
|
||||
f.write("\n")
|
||||
for append in sdkconfig_append:
|
||||
f.write(f"{append}\n")
|
||||
# Build with macro BOARD_NAME defined to name
|
||||
if os.system(f"idf.py -DBOARD_NAME={name} build") != 0:
|
||||
print("build failed")
|
||||
sys.exit(1)
|
||||
# Call merge-bin
|
||||
if os.system("idf.py merge-bin") != 0:
|
||||
print("merge-bin failed")
|
||||
sys.exit(1)
|
||||
# Zip bin
|
||||
zip_bin(name, project_version)
|
||||
print("-" * 80)
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("board", nargs="?", default=None, help="板子类型或 all")
|
||||
parser.add_argument("-c", "--config", default="config.json", help="指定 config 文件名,默认 config.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.board:
|
||||
board_configs = get_all_board_types()
|
||||
found = False
|
||||
for board_config, board_type in board_configs.items():
|
||||
if args.board == 'all' or board_type == args.board:
|
||||
release(board_type, board_config, config_filename=args.config)
|
||||
found = True
|
||||
if not found:
|
||||
print(f"未找到板子类型: {args.board}")
|
||||
print("可用的板子类型:")
|
||||
for board_type in board_configs.values():
|
||||
print(f" {board_type}")
|
||||
else:
|
||||
release_current()
|
||||
@@ -0,0 +1,207 @@
|
||||
#! /usr/bin/env python3
|
||||
from dotenv import load_dotenv
|
||||
load_dotenv()
|
||||
|
||||
import os
|
||||
import struct
|
||||
import zipfile
|
||||
import oss2
|
||||
import json
|
||||
import requests
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
# 切换到项目根目录
|
||||
os.chdir(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
def get_chip_id_string(chip_id):
|
||||
return {
|
||||
0x0000: "esp32",
|
||||
0x0002: "esp32s2",
|
||||
0x0005: "esp32c3",
|
||||
0x0009: "esp32s3",
|
||||
0x000C: "esp32c2",
|
||||
0x000D: "esp32c6",
|
||||
0x0010: "esp32h2",
|
||||
0x0011: "esp32c5",
|
||||
0x0012: "esp32p4",
|
||||
0x0017: "esp32c5",
|
||||
}[chip_id]
|
||||
|
||||
def get_flash_size(flash_size):
|
||||
MB = 1024 * 1024
|
||||
return {
|
||||
0x00: 1 * MB,
|
||||
0x01: 2 * MB,
|
||||
0x02: 4 * MB,
|
||||
0x03: 8 * MB,
|
||||
0x04: 16 * MB,
|
||||
0x05: 32 * MB,
|
||||
0x06: 64 * MB,
|
||||
0x07: 128 * MB,
|
||||
}[flash_size]
|
||||
|
||||
def get_app_desc(data):
|
||||
magic = struct.unpack("<I", data[0x00:0x04])[0]
|
||||
if magic != 0xabcd5432:
|
||||
raise Exception("Invalid app desc magic")
|
||||
version = data[0x10:0x30].decode("utf-8").strip('\0')
|
||||
project_name = data[0x30:0x50].decode("utf-8").strip('\0')
|
||||
time = data[0x50:0x60].decode("utf-8").strip('\0')
|
||||
date = data[0x60:0x70].decode("utf-8").strip('\0')
|
||||
idf_ver = data[0x70:0x90].decode("utf-8").strip('\0')
|
||||
elf_sha256 = data[0x90:0xb0].hex()
|
||||
return {
|
||||
"name": project_name,
|
||||
"version": version,
|
||||
"compile_time": date + "T" + time,
|
||||
"idf_version": idf_ver,
|
||||
"elf_sha256": elf_sha256,
|
||||
}
|
||||
|
||||
def get_board_name(folder):
|
||||
basename = os.path.basename(folder)
|
||||
if basename.startswith("v0.2"):
|
||||
return "bread-simple"
|
||||
if basename.startswith("v0.3") or basename.startswith("v0.4") or basename.startswith("v0.5") or basename.startswith("v0.6"):
|
||||
if "ML307" in basename:
|
||||
return "bread-compact-ml307"
|
||||
elif "WiFi" in basename:
|
||||
return "bread-compact-wifi"
|
||||
elif "KevinBox1" in basename:
|
||||
return "kevin-box-1"
|
||||
if basename.startswith("v0.7") or basename.startswith("v0.8") or basename.startswith("v0.9") or basename.startswith("v1.") or basename.startswith("v2."):
|
||||
return basename.split("_")[1]
|
||||
raise Exception(f"Unknown board name: {basename}")
|
||||
|
||||
def read_binary(dir_path):
|
||||
merged_bin_path = os.path.join(dir_path, "merged-binary.bin")
|
||||
merged_bin_data = open(merged_bin_path, "rb").read()
|
||||
|
||||
# find app partition
|
||||
if merged_bin_data[0x100000] == 0xE9:
|
||||
data = merged_bin_data[0x100000:]
|
||||
elif merged_bin_data[0x200000] == 0xE9:
|
||||
data = merged_bin_data[0x200000:]
|
||||
elif merged_bin_data[0xe0000] == 0xE9:
|
||||
data = merged_bin_data[0xe0000:]
|
||||
else:
|
||||
print(dir_path, "is not a valid image")
|
||||
return
|
||||
# get flash size
|
||||
flash_size = get_flash_size(data[0x3] >> 4)
|
||||
chip_id = get_chip_id_string(data[0xC])
|
||||
# get segments
|
||||
segment_count = data[0x1]
|
||||
segments = []
|
||||
offset = 0x18
|
||||
for i in range(segment_count):
|
||||
segment_size = struct.unpack("<I", data[offset + 4:offset + 8])[0]
|
||||
offset += 8
|
||||
segment_data = data[offset:offset + segment_size]
|
||||
offset += segment_size
|
||||
segments.append(segment_data)
|
||||
assert offset < len(data), "offset is out of bounds"
|
||||
|
||||
# extract bin file
|
||||
bin_path = os.path.join(dir_path, "xiaozhi.bin")
|
||||
if not os.path.exists(bin_path):
|
||||
print("extract bin file to", bin_path)
|
||||
open(bin_path, "wb").write(data)
|
||||
|
||||
# The app desc is in the first segment
|
||||
desc = get_app_desc(segments[0])
|
||||
return {
|
||||
"chip_id": chip_id,
|
||||
"flash_size": flash_size,
|
||||
"board": get_board_name(dir_path),
|
||||
"application": desc,
|
||||
"firmware_size": len(data),
|
||||
}
|
||||
|
||||
def extract_zip(zip_path, extract_path):
|
||||
if not os.path.exists(extract_path):
|
||||
os.makedirs(extract_path)
|
||||
print(f"Extracting {zip_path} to {extract_path}")
|
||||
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||||
zip_ref.extractall(extract_path)
|
||||
|
||||
def upload_dir_to_oss(source_dir, target_dir):
|
||||
auth = oss2.Auth(os.environ['OSS_ACCESS_KEY_ID'], os.environ['OSS_ACCESS_KEY_SECRET'])
|
||||
bucket = oss2.Bucket(auth, os.environ['OSS_ENDPOINT'], os.environ['OSS_BUCKET_NAME'])
|
||||
for filename in os.listdir(source_dir):
|
||||
oss_key = os.path.join(target_dir, filename)
|
||||
print('uploading', oss_key)
|
||||
bucket.put_object(oss_key, open(os.path.join(source_dir, filename), 'rb'))
|
||||
|
||||
def post_info_to_server(info):
|
||||
"""
|
||||
将固件信息发送到服务器
|
||||
|
||||
Args:
|
||||
info: 包含固件信息的字典
|
||||
"""
|
||||
try:
|
||||
# 从环境变量获取服务器URL和token
|
||||
server_url = os.environ.get('VERSIONS_SERVER_URL')
|
||||
server_token = os.environ.get('VERSIONS_TOKEN')
|
||||
|
||||
if not server_url or not server_token:
|
||||
raise Exception("Missing SERVER_URL or TOKEN in environment variables")
|
||||
|
||||
# 准备请求头和数据
|
||||
headers = {
|
||||
'Authorization': f'Bearer {server_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
# 发送POST请求
|
||||
response = requests.post(
|
||||
server_url,
|
||||
headers=headers,
|
||||
json={'jsonData': json.dumps(info)}
|
||||
)
|
||||
|
||||
# 检查响应状态
|
||||
response.raise_for_status()
|
||||
|
||||
print(f"Successfully uploaded version info for tag: {info['tag']}")
|
||||
|
||||
except RequestException as e:
|
||||
if hasattr(e.response, 'json'):
|
||||
error_msg = e.response.json().get('error', str(e))
|
||||
else:
|
||||
error_msg = str(e)
|
||||
print(f"Failed to upload version info: {error_msg}")
|
||||
raise
|
||||
except Exception as e:
|
||||
print(f"Error uploading version info: {str(e)}")
|
||||
raise
|
||||
|
||||
def main():
|
||||
release_dir = "releases"
|
||||
# look for zip files startswith "v"
|
||||
for name in os.listdir(release_dir):
|
||||
if name.startswith("v") and name.endswith(".zip"):
|
||||
tag = name[:-4]
|
||||
folder = os.path.join(release_dir, tag)
|
||||
info_path = os.path.join(folder, "info.json")
|
||||
if not os.path.exists(info_path):
|
||||
if not os.path.exists(folder):
|
||||
os.makedirs(folder)
|
||||
extract_zip(os.path.join(release_dir, name), folder)
|
||||
info = read_binary(folder)
|
||||
target_dir = os.path.join("firmwares", tag)
|
||||
info["tag"] = tag
|
||||
info["url"] = os.path.join(os.environ['OSS_BUCKET_URL'], target_dir, "xiaozhi.bin")
|
||||
open(info_path, "w").write(json.dumps(info, indent=4))
|
||||
# upload all file to oss
|
||||
upload_dir_to_oss(folder, target_dir)
|
||||
# read info.json
|
||||
info = json.load(open(info_path))
|
||||
# post info.json to server
|
||||
post_info_to_server(info)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user