464 lines
17 KiB
Python
464 lines
17 KiB
Python
import requests
|
||
import re
|
||
import os
|
||
import sys
|
||
import json
|
||
import time
|
||
import subprocess
|
||
import xml.etree.ElementTree as ET
|
||
|
||
# 当前程序只能一键对所有课程进行刷课,如需选择课程请自行修改代码
|
||
|
||
# cookie 请替换为你的登录 Cookie,在视频页从控制台执行Job.updateStatus()请求头中获取,其中的负载参数有很多字段,包括课程id等信息,您可参考字段删除内容以防止已完成课程的刷取,即使代码中包含了完成判断逻辑
|
||
|
||
# 您可根据视频心跳请求中的负载参数修改json缓存从而达到跳过指定课程的目的,十分银杏
|
||
|
||
# [警告] 下方的 cookie 包含敏感登录信息,请勿泄露给他人!
|
||
cookie = ""
|
||
|
||
url = "https://zjbc.cjnep.net/lms/web/course/index"
|
||
cache_file = "course_ids.json"
|
||
items_cache_file = "course_items.json"
|
||
progress_cache_file = "progress.json"
|
||
|
||
# [配置] 心跳间隔(秒)
|
||
# 警告:建议设置在 60 秒以上,过快的心跳可能面临风险!
|
||
HEARTBEAT_INTERVAL = 0
|
||
|
||
# [配置] 每次心跳增加的进度时间(秒)
|
||
ADD_TIME = 120
|
||
|
||
headers = {
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||
"Cookie": cookie
|
||
}
|
||
|
||
# -------------------------------------------------------------------------
|
||
# 辅助函数:进度保存与读取
|
||
# -------------------------------------------------------------------------
|
||
|
||
|
||
def load_progress():
|
||
if os.path.exists(progress_cache_file):
|
||
try:
|
||
with open(progress_cache_file, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except:
|
||
return {}
|
||
return {}
|
||
|
||
|
||
def save_progress(course_id, item_id, current_time, total_time, finished):
|
||
data = load_progress()
|
||
key = f"{course_id}_{item_id}"
|
||
data[key] = {
|
||
"current_time": current_time,
|
||
"total_time": total_time,
|
||
"finished": finished,
|
||
"timestamp": time.time()
|
||
}
|
||
try:
|
||
with open(progress_cache_file, "w", encoding="utf-8") as f:
|
||
json.dump(data, f, indent=2)
|
||
except Exception as e:
|
||
print(f" [Warning] 保存进度失败: {e}")
|
||
|
||
# -------------------------------------------------------------------------
|
||
# 辅助函数:模拟刷课逻辑
|
||
# -------------------------------------------------------------------------
|
||
|
||
|
||
def get_video_duration(video_url):
|
||
"""
|
||
使用 ffprobe 获取视频时长
|
||
"""
|
||
try:
|
||
cmd = [
|
||
"ffprobe",
|
||
"-v", "error",
|
||
"-show_entries", "format=duration",
|
||
"-of", "default=noprint_wrappers=1:nokey=1",
|
||
video_url
|
||
]
|
||
# Run ffprobe
|
||
result = subprocess.run(
|
||
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30)
|
||
if result.returncode == 0:
|
||
try:
|
||
duration = float(result.stdout.strip())
|
||
return duration
|
||
except ValueError:
|
||
print(f" [Error] ffprobe output invalid: {result.stdout}")
|
||
return None
|
||
else:
|
||
print(f" [Error] ffprobe failed: {result.stderr}")
|
||
return None
|
||
except Exception as e:
|
||
print(f" [Error] get_video_duration exception: {e}")
|
||
return None
|
||
|
||
|
||
def process_video(course_id, item_id, current_index=0, total_items=0):
|
||
"""
|
||
处理单个视频:
|
||
1. 获取视频页,提取 config URL
|
||
2. 请求 config XML,获取 userId, updStatusUrl, totalTime, historyId 等
|
||
3. 循环发送心跳包,直到视频看完
|
||
"""
|
||
print(f"\n>>> 开始处理视频: CourseId={course_id}, ItemId={item_id}")
|
||
if total_items > 0:
|
||
print(f" 当前进度: 第 {current_index} 集 / 共 {total_items} 集")
|
||
|
||
# 1. 请求视频播放页
|
||
video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}"
|
||
try:
|
||
resp = requests.get(video_page_url, headers=headers)
|
||
resp.raise_for_status()
|
||
except Exception as e:
|
||
print(f" [Error] 无法访问视频页: {e}")
|
||
return
|
||
|
||
# 2. 提取 config URL
|
||
# 示例: config: '/lms/web/course/startupxml?courseid=677&itemid=142573&historyid=5740083&is_subcourse=0'
|
||
config_match = re.search(r"config:\s*'([^']+)'", resp.text)
|
||
if not config_match:
|
||
print(" [Error] 未找到 config URL,跳过此视频。")
|
||
return
|
||
|
||
config_path = config_match.group(1)
|
||
config_url = "https://zjbc.cjnep.net" + config_path
|
||
print(f" Config URL: {config_url}")
|
||
|
||
# 3. 请求 XML 配置
|
||
try:
|
||
xml_resp = requests.get(config_url, headers=headers)
|
||
xml_resp.raise_for_status()
|
||
except Exception as e:
|
||
print(f" [Error] 无法获取 XML 配置: {e}")
|
||
return
|
||
|
||
# 4. 解析 XML
|
||
try:
|
||
root = ET.fromstring(xml_resp.text)
|
||
|
||
# 提取关键参数
|
||
user_id = root.findtext("userId")
|
||
upd_status_url = root.findtext("updStatusUrl")
|
||
total_time_str = root.findtext("totalTime")
|
||
history_id = root.findtext("historyId")
|
||
finish_status = root.findtext("finish")
|
||
sco_title = root.findtext("scoTitle")
|
||
|
||
if sco_title:
|
||
print(f" [课程名称] {sco_title}")
|
||
|
||
# 尝试获取视频 URL
|
||
video_url = None
|
||
file_url_node = root.find("fileUrl")
|
||
if file_url_node is not None:
|
||
video_url = file_url_node.findtext("normal")
|
||
|
||
# 如果 XML 里是相对路径,需要补全
|
||
if upd_status_url and not upd_status_url.startswith("http"):
|
||
upd_status_url = "https://zjbc.cjnep.net" + upd_status_url
|
||
|
||
# 检查必要字段 (totalTime 除外)
|
||
if not (user_id and upd_status_url and history_id):
|
||
print(" [Error] XML 解析缺少关键字段 (userId/updStatusUrl/historyId)")
|
||
return
|
||
|
||
# 读取本地缓存
|
||
local_progress = load_progress().get(f"{course_id}_{item_id}", {})
|
||
local_finished = local_progress.get("finished", False)
|
||
|
||
# 优先检查是否已完成,如果已完成则跳过获取时长(避免 ffprobe)
|
||
if local_finished or finish_status == "true":
|
||
print(
|
||
f" [Skip] 视频已完成 (XML Finish: {finish_status}, Local Finish: {local_finished})")
|
||
return
|
||
|
||
# 获取时长
|
||
total_time = 0.0
|
||
if total_time_str:
|
||
total_time = float(total_time_str)
|
||
elif video_url:
|
||
print(" [Info] XML 中未找到 totalTime,尝试通过 ffprobe 获取视频时长...")
|
||
duration = get_video_duration(video_url)
|
||
if duration:
|
||
total_time = duration
|
||
print(f" [Info] 获取到视频时长: {total_time}s")
|
||
else:
|
||
print(" [Error] 无法获取视频时长,跳过此视频")
|
||
return
|
||
else:
|
||
print(" [Error] XML 中既无 totalTime 也无 videoUrl,无法继续")
|
||
return
|
||
|
||
print(
|
||
f" [Info] UserId={user_id}, HistoryId={history_id}, TotalTime={total_time}")
|
||
|
||
except ET.ParseError:
|
||
print(" [Error] XML 解析失败")
|
||
return
|
||
|
||
# 5. 循环发送心跳
|
||
# 初始进度:优先从本地缓存读取,其次从 XML 读取
|
||
|
||
local_time = local_progress.get("current_time", 0.0)
|
||
|
||
# 读取 XML 进度
|
||
last_viewed_time_str = root.findtext("lastViewedTime")
|
||
xml_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0
|
||
|
||
# 选取最大的进度
|
||
current_time = max(local_time, xml_time)
|
||
|
||
# 如果进度已达标,则跳过
|
||
if current_time >= total_time:
|
||
print(
|
||
f" [Skip] 视频已完成 (Progress: {current_time}/{total_time})")
|
||
return
|
||
|
||
print(f" [Start] 当前进度: {current_time}s / {total_time}s")
|
||
|
||
is_first = "true"
|
||
|
||
while True:
|
||
# 如果已经看完
|
||
if current_time >= total_time:
|
||
print(" [Done] 视频已看完。")
|
||
# 发送最后一次 finished=1 的包
|
||
send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
|
||
add_time=0, current_time=0, total_time=total_time,
|
||
finished=1, first_update=is_first)
|
||
save_progress(course_id, item_id, total_time, total_time, True)
|
||
break
|
||
|
||
# 模拟观看
|
||
step = ADD_TIME
|
||
# 如果剩余时间不足
|
||
if current_time + step >= total_time:
|
||
step = int(total_time - current_time) + 1 # 稍微多一点确保覆盖
|
||
current_time = total_time
|
||
finished_flag = 1
|
||
post_current_time = 0 # 完成时传 0
|
||
else:
|
||
current_time += step
|
||
finished_flag = 0
|
||
post_current_time = current_time
|
||
|
||
# 发送心跳
|
||
success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
|
||
add_time=step, current_time=post_current_time, total_time=total_time,
|
||
finished=finished_flag, first_update=is_first)
|
||
|
||
if not success:
|
||
print(" [Error] 心跳发送失败,停止当前视频。")
|
||
break
|
||
|
||
# 保存进度
|
||
save_progress(course_id, item_id, current_time,
|
||
total_time, finished_flag == 1)
|
||
|
||
if finished_flag == 1:
|
||
print(" [Done] 视频播放结束。")
|
||
break
|
||
|
||
is_first = "false"
|
||
|
||
# 模拟等待
|
||
print(
|
||
f" [Progress] 进度更新: {current_time:.1f}/{total_time} (add {step}s)...")
|
||
time.sleep(HEARTBEAT_INTERVAL)
|
||
|
||
|
||
def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update):
|
||
"""
|
||
发送单个心跳包,带重试机制
|
||
"""
|
||
has_check_three = "true" if first_update == "true" else "false"
|
||
|
||
has_check_two = "false"
|
||
try:
|
||
if float(total_time) > 0 and float(current_time) > (float(total_time) / 2):
|
||
has_check_two = "true"
|
||
except:
|
||
pass
|
||
|
||
has_check_one = "true" if str(finished) == "1" else "false"
|
||
|
||
payload = {
|
||
"userId": user_id,
|
||
"courseId": course_id,
|
||
"scoId": sco_id,
|
||
"historyId": history_id,
|
||
"addTime": str(add_time),
|
||
"totalTime": str(total_time),
|
||
"finished": finished,
|
||
"currentTime": str(current_time),
|
||
"hasCheckOne": has_check_one,
|
||
"hasCheckTwo": has_check_two,
|
||
"hasCheckThree": has_check_three,
|
||
"firstUpdate": first_update
|
||
}
|
||
|
||
max_retries = 5
|
||
for attempt in range(max_retries):
|
||
try:
|
||
# 注意:这里使用 data=payload 发送 application/x-www-form-urlencoded
|
||
resp = requests.post(
|
||
url, data=payload, headers=headers, timeout=10)
|
||
resp.raise_for_status()
|
||
# 检查响应内容,有些服务器返回 XML 状态
|
||
# <root><status>1</status></root>
|
||
if "<status>1</status>" in resp.text:
|
||
return True
|
||
elif "<status>-1</status>" in resp.text:
|
||
print(" [Warning] 服务器返回 status -1 (可能多端登录或异常)")
|
||
return False
|
||
return True
|
||
except Exception as e:
|
||
print(f" [Warning] 请求异常 (尝试 {attempt+1}/{max_retries}): {e}")
|
||
if attempt < max_retries - 1:
|
||
time.sleep(2) # 重试前等待
|
||
else:
|
||
print(" [Error] 重试次数耗尽,放弃本次心跳。")
|
||
return False
|
||
return False
|
||
|
||
|
||
# -------------------------------------------------------------------------
|
||
# 主程序逻辑
|
||
# -------------------------------------------------------------------------
|
||
|
||
print("\n" + "="*60)
|
||
print("欢迎使用自动刷课脚本 v1.0")
|
||
print("此脚本通过视频心跳请求模拟观看课程视频,以达到刷课目的。功耗极低")
|
||
print("注意:请确保 Cookie 有效,且已安装 ffprobe")
|
||
print(f"当前心跳间隔: {HEARTBEAT_INTERVAL}秒 (警告:请勿设置过低,以免封号)")
|
||
print(f"每次进度增加: {ADD_TIME}秒")
|
||
print("您需要安装ffmpeg工具集,确保ffprobe命令可用,可从https://ffmpeg.org/download.html 下载")
|
||
print("脚本仅供学习交流使用,请勿用于商业用途!")
|
||
print("作者:NCJOAQ & Github Compilot")
|
||
print("="*60 + "\n")
|
||
|
||
course_data = []
|
||
|
||
# 1. Try to load existing items data
|
||
if os.path.exists(items_cache_file) and os.path.getsize(items_cache_file) > 0:
|
||
try:
|
||
with open(items_cache_file, "r", encoding="utf-8") as f:
|
||
data = json.load(f)
|
||
|
||
if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict) and "courseId" in data[0]:
|
||
print("检测到课程项目缓存文件且有效。")
|
||
course_data = data
|
||
except json.JSONDecodeError:
|
||
print("课程项目缓存文件损坏,已忽略。")
|
||
|
||
# 2. 如果没有缓存数据,则执行之前的抓取逻辑 (此处省略抓取代码,假设已有缓存或手动抓取)
|
||
# 为了保持代码简洁,这里假设你已经运行过一次脚本生成了 course_items.json
|
||
# 如果 course_data 为空,说明需要先生成缓存。
|
||
|
||
if not course_data:
|
||
print("未找到有效的课程数据缓存 (course_items.json)。")
|
||
print("请先运行脚本生成缓存,或检查网络连接。")
|
||
|
||
# --- 原有的抓取逻辑 (简化版) ---
|
||
# (此处保留你之前的抓取代码,如果需要的话,可以把之前的抓取逻辑放回来)
|
||
# ...
|
||
# -----------------------------
|
||
|
||
# 临时:如果没有数据就退出,提示用户
|
||
# sys.exit(1)
|
||
|
||
# 或者,为了完整性,保留之前的抓取逻辑:
|
||
print("开始执行抓取流程...")
|
||
# ... (这里可以粘贴之前的抓取代码,为了不让文件太长,我先假设你已经有了 json) ...
|
||
# 如果你需要我把抓取代码也完整合并进来,请告诉我。
|
||
|
||
# 这里为了演示,我把之前的抓取逻辑简单复原一下:
|
||
course_ids = []
|
||
# 尝试读取课程ID缓存
|
||
if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
|
||
print(f"正在读取课程ID缓存: {cache_file}")
|
||
try:
|
||
with open(cache_file, "r", encoding="utf-8") as f:
|
||
course_ids = json.load(f)
|
||
except Exception as e:
|
||
print(f"[Warning] 读取课程ID缓存失败: {e}")
|
||
|
||
# 如果没有缓存,从网络获取
|
||
if not course_ids:
|
||
print("正在从服务器获取课程列表...")
|
||
try:
|
||
resp = requests.get(url, headers=headers)
|
||
resp.raise_for_status()
|
||
# 正则匹配课程ID
|
||
course_ids = re.findall(
|
||
r"window\.location\s*=\s*['\"]/lms/web/course/detail\?id=(\d+)['\"]", resp.text)
|
||
|
||
if course_ids:
|
||
print(f"成功获取到 {len(course_ids)} 门课程。")
|
||
with open(cache_file, "w", encoding="utf-8") as f:
|
||
json.dump(course_ids, f, indent=2)
|
||
else:
|
||
print("[Error] 未能获取到任何课程ID,请检查Cookie是否失效或页面结构变更。")
|
||
except Exception as e:
|
||
print(f"[Error] 获取课程列表请求失败: {e}")
|
||
|
||
# 遍历课程获取视频Item ID
|
||
if course_ids:
|
||
print(f"开始获取每门课程的视频列表 (共 {len(course_ids)} 门)...")
|
||
for index, cid in enumerate(course_ids):
|
||
print(f"[{index+1}/{len(course_ids)}] 正在解析课程 ID: {cid} ...",
|
||
end="", flush=True)
|
||
|
||
try:
|
||
detail_url = f"https://zjbc.cjnep.net/lms/web/course/detail?id={cid}"
|
||
resp = requests.get(detail_url, headers=headers)
|
||
resp.raise_for_status()
|
||
|
||
# 提取 itemid
|
||
item_ids = re.findall(r'[?&]itemid=(\d+)', resp.text)
|
||
|
||
# 去重
|
||
seen = set()
|
||
unique_item_ids = [x for x in item_ids if not (
|
||
x in seen or seen.add(x))]
|
||
|
||
course_data.append(
|
||
{"courseId": cid, "itemIds": unique_item_ids})
|
||
print(f" 发现 {len(unique_item_ids)} 个视频")
|
||
|
||
except Exception as e:
|
||
print(f"\n [Error] 获取课程 {cid} 详情失败: {e}")
|
||
|
||
# 避免请求过快
|
||
time.sleep(1)
|
||
|
||
# 保存最终结果
|
||
print(f"正在保存课程数据到 {items_cache_file} ...")
|
||
with open(items_cache_file, "w", encoding="utf-8") as f:
|
||
json.dump(course_data, f, indent=2)
|
||
print("课程数据抓取完成。")
|
||
else:
|
||
print("没有课程ID,跳过视频抓取步骤。")
|
||
|
||
|
||
# 3. 开始刷课
|
||
print(f"\n>>> 开始刷课任务,共 {len(course_data)} 门课程")
|
||
|
||
for course in course_data:
|
||
cid = course['courseId']
|
||
items = course['itemIds']
|
||
print(f"\n=== 正在处理课程 {cid},共 {len(items)} 个视频 ===")
|
||
|
||
for idx, item_id in enumerate(items):
|
||
process_video(cid, item_id, current_index=idx +
|
||
1, total_items=len(items))
|
||
# 视频之间休息一下
|
||
time.sleep(2)
|
||
|
||
print("\n所有任务完成。")
|