import requests
import re
import os
import sys
import json
import time
import subprocess
import xml.etree.ElementTree as ET
# 当前程序只能一键对所有课程进行刷课,如需选择课程请自行修改代码
# cookie 请替换为你的登录 Cookie,可在页面请求头中获取
# 您可根据视频心跳请求中的负载参数修改json缓存从而达到跳过指定课程的目的,十分银杏
# [警告] 下方的 cookie 包含敏感登录信息,请勿泄露给他人!
cookie = ""
url = "https://zjbc.cjnep.net/lms/web/course/index"
cache_file = "course_ids.json"
items_cache_file = "course_items.json"
progress_cache_file = "progress.json"
# [配置] 心跳间隔(秒)
# 警告:建议设置在 60 秒以上,过快的心跳可能面临风险!
HEARTBEAT_INTERVAL = 5
# [配置] 每次心跳增加的进度时间(秒)
ADD_TIME = 120
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Cookie": cookie
}
# -------------------------------------------------------------------------
# 辅助函数:进度保存与读取
# -------------------------------------------------------------------------
def load_progress():
if os.path.exists(progress_cache_file):
try:
with open(progress_cache_file, "r", encoding="utf-8") as f:
return json.load(f)
except:
return {}
return {}
def save_progress(course_id, item_id, current_time, total_time, finished):
data = load_progress()
key = f"{course_id}_{item_id}"
data[key] = {
"current_time": current_time,
"total_time": total_time,
"finished": finished,
"timestamp": time.time()
}
try:
with open(progress_cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f" [Warning] 保存进度失败: {e}")
# -------------------------------------------------------------------------
# 辅助函数:模拟刷课逻辑
# -------------------------------------------------------------------------
def get_video_duration(video_url):
"""
使用 ffprobe 获取视频时长
"""
try:
cmd = [
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_url
]
# Run ffprobe
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30)
if result.returncode == 0:
try:
duration = float(result.stdout.strip())
return duration
except ValueError:
print(f" [Error] ffprobe output invalid: {result.stdout}")
return None
else:
print(f" [Error] ffprobe failed: {result.stderr}")
return None
except Exception as e:
print(f" [Error] get_video_duration exception: {e}")
return None
def process_video(course_id, item_id, current_index=0, total_items=0):
"""
处理单个视频:
1. 获取视频页,提取 config URL
2. 请求 config XML,获取 userId, updStatusUrl, totalTime, historyId 等
3. 循环发送心跳包,直到视频看完
"""
# 优化日志头部
separator = "-" * 60
print(f"\n{separator}")
print(f"[+] 正在处理视频 | CourseID: {course_id} | ItemID: {item_id}")
if total_items > 0:
print(f"[*] 总体进度: 第 {current_index}/{total_items} 集")
# 1. 请求视频播放页
video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}"
try:
resp = requests.get(video_page_url, headers=headers)
resp.raise_for_status()
except Exception as e:
print(f"[!] 无法访问视频页: {e}")
return
# 2. 提取 config URL
# 示例: config: '/lms/web/course/startupxml?courseid=677&itemid=142573&historyid=5740083&is_subcourse=0'
config_match = re.search(r"config:\s*'([^']+)'", resp.text)
if not config_match:
print("[!] 未找到 config URL,跳过此视频。")
return
config_path = config_match.group(1)
config_url = "https://zjbc.cjnep.net" + config_path
# print(f" Config URL: {config_url}") # 减少冗余日志
# 3. 请求 XML 配置
try:
xml_resp = requests.get(config_url, headers=headers)
xml_resp.raise_for_status()
except Exception as e:
print(f"[!] 无法获取 XML 配置: {e}")
return
# 4. 解析 XML
try:
root = ET.fromstring(xml_resp.text)
# 提取关键参数
user_id = root.findtext("userId")
upd_status_url = root.findtext("updStatusUrl")
total_time_str = root.findtext("totalTime")
history_id = root.findtext("historyId")
finish_status = root.findtext("finish")
sco_title = root.findtext("scoTitle")
if sco_title:
print(f"[*] 课程名称: {sco_title}")
# 尝试获取视频 URL
video_url = None
file_url_node = root.find("fileUrl")
if file_url_node is not None:
video_url = file_url_node.findtext("normal")
# 如果 XML 里是相对路径,需要补全
if upd_status_url and not upd_status_url.startswith("http"):
upd_status_url = "https://zjbc.cjnep.net" + upd_status_url
# 检查必要字段 (totalTime 除外)
if not (user_id and upd_status_url and history_id):
print("[!] XML 解析缺少关键字段 (userId/updStatusUrl/historyId)")
return
# 读取本地缓存
local_progress = load_progress().get(f"{course_id}_{item_id}", {})
local_finished = local_progress.get("finished", False)
# 优先检查是否已完成,如果已完成则跳过获取时长(避免 ffprobe)
if local_finished or finish_status == "true":
print(
f"[-] 视频已完成,跳过 (XML: {finish_status}, Local: {local_finished})")
return
# 获取时长
total_time = 0.0
if total_time_str:
total_time = float(total_time_str)
elif video_url:
print("[*] XML 中未找到 totalTime,尝试通过 ffprobe 获取视频时长...")
duration = get_video_duration(video_url)
if duration:
total_time = duration
print(f"[*] 获取到视频时长: {total_time}s")
else:
print("[!] 无法获取视频时长,跳过此视频")
return
else:
print("[!] XML 中既无 totalTime 也无 videoUrl,无法继续")
return
print(
f"[*] 视频信息: User={user_id} | History={history_id} | Duration={total_time}s")
except ET.ParseError:
print("[!] XML 解析失败")
return
# 5. 循环发送心跳
# 初始进度:优先从本地缓存读取,其次从 XML 读取
local_time = local_progress.get("current_time", 0.0)
# 读取 XML 进度
last_viewed_time_str = root.findtext("lastViewedTime")
xml_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0
# 选取最大的进度
current_time = max(local_time, xml_time)
# 如果进度已达标,则跳过
if current_time >= total_time:
print(f"[-] 视频已完成 (Progress: {current_time}/{total_time})")
return
print(f"[>] 初始进度: {current_time}s / {total_time}s")
is_first = "true"
while True:
# 如果已经看完
if current_time >= total_time:
print("[+] 视频已看完,发送完成包...")
# 发送最后一次 finished=1 的包
send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=0, current_time=0, total_time=total_time,
finished=1, first_update=is_first)
save_progress(course_id, item_id, total_time, total_time, True)
break
# 模拟观看
step = ADD_TIME
# 如果剩余时间不足
if current_time + step >= total_time:
step = int(total_time - current_time) + 1 # 稍微多一点确保覆盖
current_time = total_time
finished_flag = 1
post_current_time = 0 # 完成时传 0
else:
current_time += step
finished_flag = 0
post_current_time = current_time
# 发送心跳
success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=step, current_time=post_current_time, total_time=total_time,
finished=finished_flag, first_update=is_first)
if not success:
print("[!] 心跳发送失败,停止当前视频。")
break
# 保存进度
save_progress(course_id, item_id, current_time,
total_time, finished_flag == 1)
if finished_flag == 1:
print("[+] 视频播放结束。")
break
is_first = "false"
# 模拟等待
percent = min(100, int((current_time / total_time) * 100))
print(
f" [Running] 进度: {current_time:.1f}/{total_time}s (+{step}s) | {percent}%")
time.sleep(HEARTBEAT_INTERVAL)
def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update):
"""
发送单个心跳包,带重试机制
"""
has_check_three = "true" if first_update == "true" else "false"
has_check_two = "false"
try:
if float(total_time) > 0 and float(current_time) > (float(total_time) / 2):
has_check_two = "true"
except:
pass
has_check_one = "true" if str(finished) == "1" else "false"
payload = {
"userId": user_id,
"courseId": course_id,
"scoId": sco_id,
"historyId": history_id,
"addTime": str(add_time),
"totalTime": str(total_time),
"finished": finished,
"currentTime": str(current_time),
"hasCheckOne": has_check_one,
"hasCheckTwo": has_check_two,
"hasCheckThree": has_check_three,
"firstUpdate": first_update
}
max_retries = 5
for attempt in range(max_retries):
try:
# 注意:这里使用 data=payload 发送 application/x-www-form-urlencoded
resp = requests.post(
url, data=payload, headers=headers, timeout=10)
resp.raise_for_status()
# 检查响应内容,有些服务器返回 XML 状态
#