Files
QuickBrush/main.py
2025-12-12 11:19:14 +08:00

415 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import re
import os
import sys
import json
import time
import subprocess
import xml.etree.ElementTree as ET
# 当前程序只能一键对所有课程进行刷课,如需选择课程请自行修改代码
# cookie 请替换为你的登录 Cookie在视频页从控制台执行Job.updateStatus()请求头中获取其中的负载参数有很多字段包括课程id等信息您可参考字段删除内容以防止已完成课程的刷取即使代码中包含了完成判断逻辑
# 您可根据视频心跳请求中的负载参数修改json缓存从而达到跳过指定课程的目的十分银杏
# [警告] 下方的 cookie 包含敏感登录信息,请勿泄露给他人!
cookie = ""
url = "https://zjbc.cjnep.net/lms/web/course/index"
cache_file = "course_ids.json"
items_cache_file = "course_items.json"
progress_cache_file = "progress.json"
# [配置] 心跳间隔(秒)
# 警告:建议设置在 60 秒以上,过快的心跳可能面临风险!
HEARTBEAT_INTERVAL = 0
# [配置] 每次心跳增加的进度时间(秒)
ADD_TIME = 120
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Cookie": cookie
}
# -------------------------------------------------------------------------
# 辅助函数:进度保存与读取
# -------------------------------------------------------------------------
def load_progress():
if os.path.exists(progress_cache_file):
try:
with open(progress_cache_file, "r", encoding="utf-8") as f:
return json.load(f)
except:
return {}
return {}
def save_progress(course_id, item_id, current_time, total_time, finished):
data = load_progress()
key = f"{course_id}_{item_id}"
data[key] = {
"current_time": current_time,
"total_time": total_time,
"finished": finished,
"timestamp": time.time()
}
try:
with open(progress_cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f" [Warning] 保存进度失败: {e}")
# -------------------------------------------------------------------------
# 辅助函数:模拟刷课逻辑
# -------------------------------------------------------------------------
def get_video_duration(video_url):
"""
使用 ffprobe 获取视频时长
"""
try:
cmd = [
"ffprobe",
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_url
]
# Run ffprobe
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30)
if result.returncode == 0:
try:
duration = float(result.stdout.strip())
return duration
except ValueError:
print(f" [Error] ffprobe output invalid: {result.stdout}")
return None
else:
print(f" [Error] ffprobe failed: {result.stderr}")
return None
except Exception as e:
print(f" [Error] get_video_duration exception: {e}")
return None
def process_video(course_id, item_id):
"""
处理单个视频:
1. 获取视频页,提取 config URL
2. 请求 config XML获取 userId, updStatusUrl, totalTime, historyId 等
3. 循环发送心跳包,直到视频看完
"""
print(f"\n>>> 开始处理视频: CourseId={course_id}, ItemId={item_id}")
# 1. 请求视频播放页
video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}"
try:
resp = requests.get(video_page_url, headers=headers)
resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法访问视频页: {e}")
return
# 2. 提取 config URL
# 示例: config: '/lms/web/course/startupxml?courseid=677&itemid=142573&historyid=5740083&is_subcourse=0'
config_match = re.search(r"config:\s*'([^']+)'", resp.text)
if not config_match:
print(" [Error] 未找到 config URL跳过此视频。")
return
config_path = config_match.group(1)
config_url = "https://zjbc.cjnep.net" + config_path
print(f" Config URL: {config_url}")
# 3. 请求 XML 配置
try:
xml_resp = requests.get(config_url, headers=headers)
xml_resp.raise_for_status()
except Exception as e:
print(f" [Error] 无法获取 XML 配置: {e}")
return
# 4. 解析 XML
try:
root = ET.fromstring(xml_resp.text)
# 提取关键参数
user_id = root.findtext("userId")
upd_status_url = root.findtext("updStatusUrl")
total_time_str = root.findtext("totalTime")
history_id = root.findtext("historyId")
finish_status = root.findtext("finish")
# 尝试获取视频 URL
video_url = None
file_url_node = root.find("fileUrl")
if file_url_node is not None:
video_url = file_url_node.findtext("normal")
# 如果 XML 里是相对路径,需要补全
if upd_status_url and not upd_status_url.startswith("http"):
upd_status_url = "https://zjbc.cjnep.net" + upd_status_url
# 检查必要字段 (totalTime 除外)
if not (user_id and upd_status_url and history_id):
print(" [Error] XML 解析缺少关键字段 (userId/updStatusUrl/historyId)")
return
# 读取本地缓存
local_progress = load_progress().get(f"{course_id}_{item_id}", {})
local_finished = local_progress.get("finished", False)
# 优先检查是否已完成,如果已完成则跳过获取时长(避免 ffprobe
if local_finished or finish_status == "true":
print(
f" [Skip] 视频已完成 (XML Finish: {finish_status}, Local Finish: {local_finished})")
return
# 获取时长
total_time = 0.0
if total_time_str:
total_time = float(total_time_str)
elif video_url:
print(" [Info] XML 中未找到 totalTime尝试通过 ffprobe 获取视频时长...")
duration = get_video_duration(video_url)
if duration:
total_time = duration
print(f" [Info] 获取到视频时长: {total_time}s")
else:
print(" [Error] 无法获取视频时长,跳过此视频")
return
else:
print(" [Error] XML 中既无 totalTime 也无 videoUrl无法继续")
return
print(
f" [Info] UserId={user_id}, HistoryId={history_id}, TotalTime={total_time}")
except ET.ParseError:
print(" [Error] XML 解析失败")
return
# 5. 循环发送心跳
# 初始进度:优先从本地缓存读取,其次从 XML 读取
local_time = local_progress.get("current_time", 0.0)
# 读取 XML 进度
last_viewed_time_str = root.findtext("lastViewedTime")
xml_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0
# 选取最大的进度
current_time = max(local_time, xml_time)
# 如果进度已达标,则跳过
if current_time >= total_time:
print(
f" [Skip] 视频已完成 (Progress: {current_time}/{total_time})")
return
print(f" [Start] 当前进度: {current_time}s / {total_time}s")
is_first = "true"
while True:
# 如果已经看完
if current_time >= total_time:
print(" [Done] 视频已看完。")
# 发送最后一次 finished=1 的包
send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=0, current_time=0, total_time=total_time,
finished=1, first_update=is_first)
save_progress(course_id, item_id, total_time, total_time, True)
break
# 模拟观看
step = ADD_TIME
# 如果剩余时间不足
if current_time + step >= total_time:
step = int(total_time - current_time) + 1 # 稍微多一点确保覆盖
current_time = total_time
finished_flag = 1
post_current_time = 0 # 完成时传 0
else:
current_time += step
finished_flag = 0
post_current_time = current_time
# 发送心跳
success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=step, current_time=post_current_time, total_time=total_time,
finished=finished_flag, first_update=is_first)
if not success:
print(" [Error] 心跳发送失败,停止当前视频。")
break
# 保存进度
save_progress(course_id, item_id, current_time,
total_time, finished_flag == 1)
if finished_flag == 1:
print(" [Done] 视频播放结束。")
break
is_first = "false"
# 模拟等待
print(
f" [Progress] 进度更新: {current_time:.1f}/{total_time} (add {step}s)...")
time.sleep(HEARTBEAT_INTERVAL)
def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update):
"""
发送单个心跳包,带重试机制
"""
has_check_three = "true" if first_update == "true" else "false"
has_check_two = "false"
try:
if float(total_time) > 0 and float(current_time) > (float(total_time) / 2):
has_check_two = "true"
except:
pass
has_check_one = "true" if str(finished) == "1" else "false"
payload = {
"userId": user_id,
"courseId": course_id,
"scoId": sco_id,
"historyId": history_id,
"addTime": str(add_time),
"totalTime": str(total_time),
"finished": finished,
"currentTime": str(current_time),
"hasCheckOne": has_check_one,
"hasCheckTwo": has_check_two,
"hasCheckThree": has_check_three,
"firstUpdate": first_update
}
max_retries = 5
for attempt in range(max_retries):
try:
# 注意:这里使用 data=payload 发送 application/x-www-form-urlencoded
resp = requests.post(
url, data=payload, headers=headers, timeout=10)
resp.raise_for_status()
# 检查响应内容,有些服务器返回 XML 状态
# <root><status>1</status></root>
if "<status>1</status>" in resp.text:
return True
elif "<status>-1</status>" in resp.text:
print(" [Warning] 服务器返回 status -1 (可能多端登录或异常)")
return False
return True
except Exception as e:
print(f" [Warning] 请求异常 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2) # 重试前等待
else:
print(" [Error] 重试次数耗尽,放弃本次心跳。")
return False
return False
# -------------------------------------------------------------------------
# 主程序逻辑
# -------------------------------------------------------------------------
print("\n" + "="*60)
print("欢迎使用自动刷课脚本 v1.0")
print("此脚本通过视频心跳请求模拟观看课程视频,以达到刷课目的。功耗极低")
print("注意:请确保 Cookie 有效,且已安装 ffprobe")
print(f"当前心跳间隔: {HEARTBEAT_INTERVAL}秒 (警告:请勿设置过低,以免封号)")
print(f"每次进度增加: {ADD_TIME}")
print("您需要安装ffmpeg工具集确保ffprobe命令可用可从https://ffmpeg.org/download.html 下载")
print("脚本仅供学习交流使用,请勿用于商业用途!")
print("作者NCJOAQ & Github Compilot")
print("="*60 + "\n")
course_data = []
# 1. Try to load existing items data
if os.path.exists(items_cache_file) and os.path.getsize(items_cache_file) > 0:
try:
with open(items_cache_file, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict) and "courseId" in data[0]:
print("检测到课程项目缓存文件且有效。")
course_data = data
except json.JSONDecodeError:
print("课程项目缓存文件损坏,已忽略。")
# 2. 如果没有缓存数据,则执行之前的抓取逻辑 (此处省略抓取代码,假设已有缓存或手动抓取)
# 为了保持代码简洁,这里假设你已经运行过一次脚本生成了 course_items.json
# 如果 course_data 为空,说明需要先生成缓存。
if not course_data:
print("未找到有效的课程数据缓存 (course_items.json)。")
print("请先运行脚本生成缓存,或检查网络连接。")
# --- 原有的抓取逻辑 (简化版) ---
# (此处保留你之前的抓取代码,如果需要的话,可以把之前的抓取逻辑放回来)
# ...
# -----------------------------
# 临时:如果没有数据就退出,提示用户
# sys.exit(1)
# 或者,为了完整性,保留之前的抓取逻辑:
print("开始执行抓取流程...")
# ... (这里可以粘贴之前的抓取代码,为了不让文件太长,我先假设你已经有了 json) ...
# 如果你需要我把抓取代码也完整合并进来,请告诉我。
# 这里为了演示,我把之前的抓取逻辑简单复原一下:
course_ids = []
if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
with open(cache_file, "r", encoding="utf-8") as f:
course_ids = json.load(f)
if not course_ids:
resp = requests.get(url, headers=headers)
course_ids = re.findall(
r"window\.location\s*=\s*['\"]/lms/web/course/detail\?id=(\d+)['\"]", resp.text)
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(course_ids, f, indent=2)
for cid in course_ids:
detail_url = f"https://zjbc.cjnep.net/lms/web/course/detail?id={cid}"
resp = requests.get(detail_url, headers=headers)
item_ids = re.findall(r'[?&]itemid=(\d+)', resp.text)
seen = set()
unique_item_ids = [x for x in item_ids if not (
x in seen or seen.add(x))]
course_data.append({"courseId": cid, "itemIds": unique_item_ids})
time.sleep(1)
with open(items_cache_file, "w", encoding="utf-8") as f:
json.dump(course_data, f, indent=2)
# 3. 开始刷课
print(f"\n>>> 开始刷课任务,共 {len(course_data)} 门课程")
for course in course_data:
cid = course['courseId']
items = course['itemIds']
print(f"\n=== 正在处理课程 {cid},共 {len(items)} 个视频 ===")
for item_id in items:
process_video(cid, item_id)
# 视频之间休息一下
time.sleep(2)
print("\n所有任务完成。")