gradesoop/gradesoop/catsoop.py

304 lines
8.1 KiB
Python

import io
import os
import ast
import json
import random
from PIL import Image
from pypdf import PdfReader, PdfWriter
import catsoop.cslog as cslog
# meta stuff
def get_base_location(ctx):
return os.path.join(
ctx["cs_data_root"], "courses", ctx["cs_course"], "_gradesoop_scans"
)
def get_exams(ctx):
try:
return set(os.listdir(get_base_location(ctx)))
except FileNotFoundError:
return set()
def get_exam_location(ctx, exam):
return os.path.join(get_base_location(ctx), exam)
def get_submissions(ctx, exam):
try:
return {
i[:-4]
for i in os.listdir(
os.path.join(get_exam_location(ctx, exam), "submissions")
)
if i.endswith(".pdf")
}
except FileNotFoundError:
return set()
def get_problem_info(ctx, exam):
try:
with open(os.path.join(get_exam_location(ctx, exam), "problem_info.json")) as f:
return json.load(f)
except:
return {}
# images and pdfs
def get_images(ctx, exam, serial_number, base64=False):
from pypdf import PdfReader
reader = PdfReader(get_pdf_location(ctx, exam, serial_number))
out = [i.images[0].data for i in reader.pages if i.images]
if base64:
from base64 import b64encode
out = [f'data:image/png;base64,{b64encode(i).decode("utf-8")}' for i in out]
return out
def get_problem_image(ctx, exam, serial_number, problem, full_width, base64=False):
probleminfo = get_problem_info(ctx, exam)
regions = probleminfo[problem]["bbox"]
if full_width:
for r in regions:
r["left"] = 0
r["width"] = 1
reader = PdfReader(get_pdf_location(ctx, exam, serial_number))
pages = {i["page"] for i in regions}
images = {page: io.BytesIO() for page in pages}
for p in images:
images[p].write(reader.pages[p - 1].images[0].data)
images[p].seek(0)
images[p] = Image.open(images[p])
total_height = 0
max_width = 0
out_images = []
for r in regions:
im = images[r["page"]]
w, h = im.size
x1 = int(r["left"] * w)
pxw = int(r["width"] * w)
x2 = x1 + pxw
y1 = int(r["top"] * h)
pxh = int(r["height"] * h)
y2 = y1 + pxh
out_images.append(im.crop((x1, y1, x2, y2)))
max_width = max(max_width, pxw)
total_height += pxh
out_im = Image.new("RGB", (max_width, total_height))
height_so_far = 0
for im in out_images:
out_im.paste(im, ((max_width - im.width) // 2, height_so_far))
height_so_far += im.height
out_data = io.BytesIO()
out_im.save(out_data, "PNG")
out = out_data.getvalue()
if base64:
from base64 import b64encode
out = f'data:image/png;base64,{b64encode(out).decode("utf-8")}'
return out
def get_pdf_location(ctx, exam, serial_number):
base = get_exam_location(ctx, exam)
if serial_number == "blank":
return os.path.join(base, "blank.pdf")
else:
return os.path.join(base, "submissions", f"{serial_number}.pdf")
def get_coverless_pdf(ctx, filename):
reader = PdfReader(filename)
writer = PdfWriter()
for i in range(1, len(reader.pages)):
writer.add_page(reader.pages[i])
buf = io.BytesIO()
writer.write(buf)
return buf.getvalue()
def get_image_cache_location(ctx, exam):
output = os.path.join(
ctx["cs_data_root"], "_logs", "_gradesoop", "image_cache", exam
)
os.makedirs(output, exist_ok=True)
return output
# name map
def get_name_map_log(ctx, exam):
return ("_gradesoop", [ctx["cs_course"], exam], "name_map")
def get_name_map(ctx, exam):
return cslog.most_recent(*get_name_map_log(ctx, exam), {})
def get_serial_number(ctx, exam, username):
return {v: k for k, v in get_name_map(ctx, exam).items()}.get(username, None)
def get_username(ctx, exam, serial):
return get_name_map(ctx, exam).get(serial, None)
# grades
def estimate_fraction_graded(ctx, exam, problem):
return len(cslog.most_recent(*get_grade_log(ctx, exam, problem), {})) / len(
get_submissions(ctx, exam)
)
def get_grade_log(ctx, exam, problem):
return ("_gradesoop", [ctx["cs_course"], exam, "_grades"], problem)
def get_all_grades(ctx, exam):
pinfo = get_problem_info(ctx, exam)
pinfo.pop("name_box")
weighted = any(("weight" in i) for i in pinfo.values())
problem_scores = {p: get_grades(ctx, exam, p) for p in pinfo}
user_scores = {}
for serial in get_submissions(ctx, exam):
user_scores[serial] = {}
for p in problem_scores:
user_scores[serial][p] = eval(
problem_scores[p].get(serial, {"score": "None"})["score"]
)
user_scores[serial]["total"] = sum(
i for i in user_scores[serial].values() if i is not None
)
if weighted:
weighted_user_scores = {}
for serial in user_scores:
weighted_user_scores[serial] = {
k: v
if v is None
else (v / pinfo[k]["points"])
* pinfo[k].get("weight", pinfo[k]["points"])
for k, v in user_scores[serial].items()
if k != "total"
}
out = {
"problem_info": pinfo,
"max_points": sum(i["points"] for i in pinfo.values()),
"raw": user_scores,
}
if weighted:
out["weighted"] = weighted_user_scores
for serial, scores in weighted_user_scores.items():
out["weighted"][serial]["total"] = sum(
i for i in scores.values() if i is not None
)
out["max_weighted"] = sum(i.get("weight", i["points"]) for i in pinfo.values())
return out
def get_grades(ctx, exam, problem):
return cslog.most_recent(*get_grade_log(ctx, exam, problem), {})
def get_grade(ctx, exam, problem, serial):
return get_grades(ctx, exam, problem).get(serial, None)
def get_graded_exams(ctx, exam, problem):
return set(get_grades(ctx, exam, problem))
def get_ungraded_exams(ctx, exam, problem):
return get_submissions(ctx, exam) - get_graded_exams(ctx, exam, problem)
def set_grade(ctx, exam, problem, serial, score, comments, image, imageactions):
n = get_grade_log(ctx, exam, problem)
cm = cslog.log_lock([n[0], *n[1], n[2]])
with cm:
current = cslog.most_recent(*n, {}, lock=False)
current[serial] = {
"staff": ctx["cs_username"],
"score": score,
"comments": comments,
"image": image,
"imageactions": imageactions,
}
cslog.update_log(*n, current, lock=False)
return True
# locking
def get_lock_log(ctx, exam, problem):
return ("_gradesoop", [ctx["cs_course"], exam, "_locks"], problem)
def try_acquire_lock(ctx, exam, problem, serial_number=None, force=False):
n = get_lock_log(ctx, exam, problem)
cm = cslog.log_lock([n[0], *n[1], n[2]])
success = False
with cm:
current = cslog.most_recent(*n, {}, lock=False)
available = sorted(
get_ungraded_exams(ctx, exam, problem)
- {k for k, v in current.items() if v != ctx["cs_username"]}
)
if serial_number is None:
if available:
serial_number = random.choice(available)
current_holder = current.get(serial_number, None)
if (serial_number is not None) and (
(current_holder is None) or force or (current_holder == ctx["cs_username"])
):
current[serial_number] = ctx["cs_username"]
success = True
cslog.update_log(*n, current, lock=False)
return {
"ok": success,
"last": current_holder,
"attempted": serial_number,
"pool": available,
}
def relinquish_lock(ctx, exam, problem, serial_number):
n = get_lock_log(ctx, exam, problem)
cm = cslog.log_lock([n[0], *n[1], n[2]])
success = False
with cm:
current = cslog.most_recent(*n, {}, lock=False)
if current.get(serial_number, None) == ctx["cs_username"]:
del current[serial_number]
cslog.update_log(*n, current, lock=False)
success = True
return {"ok": success}