304 lines
8.1 KiB
Python
304 lines
8.1 KiB
Python
import io
|
|
import os
|
|
import ast
|
|
import json
|
|
import random
|
|
|
|
from PIL import Image
|
|
from pypdf import PdfReader, PdfWriter
|
|
|
|
import catsoop.cslog as cslog
|
|
|
|
|
|
# meta stuff
|
|
|
|
|
|
def get_base_location(ctx):
|
|
return os.path.join(
|
|
ctx["cs_data_root"], "courses", ctx["cs_course"], "_gradesoop_scans"
|
|
)
|
|
|
|
|
|
def get_exams(ctx):
|
|
try:
|
|
return set(os.listdir(get_base_location(ctx)))
|
|
except FileNotFoundError:
|
|
return set()
|
|
|
|
|
|
def get_exam_location(ctx, exam):
|
|
return os.path.join(get_base_location(ctx), exam)
|
|
|
|
|
|
def get_submissions(ctx, exam):
|
|
try:
|
|
return {
|
|
i[:-4]
|
|
for i in os.listdir(
|
|
os.path.join(get_exam_location(ctx, exam), "submissions")
|
|
)
|
|
if i.endswith(".pdf")
|
|
}
|
|
except FileNotFoundError:
|
|
return set()
|
|
|
|
|
|
def get_problem_info(ctx, exam):
|
|
try:
|
|
with open(os.path.join(get_exam_location(ctx, exam), "problem_info.json")) as f:
|
|
return json.load(f)
|
|
except:
|
|
return {}
|
|
|
|
|
|
# images and pdfs
|
|
|
|
|
|
def get_images(ctx, exam, serial_number, base64=False):
|
|
from pypdf import PdfReader
|
|
|
|
reader = PdfReader(get_pdf_location(ctx, exam, serial_number))
|
|
out = [i.images[0].data for i in reader.pages if i.images]
|
|
if base64:
|
|
from base64 import b64encode
|
|
|
|
out = [f'data:image/png;base64,{b64encode(i).decode("utf-8")}' for i in out]
|
|
return out
|
|
|
|
|
|
def get_problem_image(ctx, exam, serial_number, problem, full_width, base64=False):
|
|
probleminfo = get_problem_info(ctx, exam)
|
|
regions = probleminfo[problem]["bbox"]
|
|
if full_width:
|
|
for r in regions:
|
|
r["left"] = 0
|
|
r["width"] = 1
|
|
reader = PdfReader(get_pdf_location(ctx, exam, serial_number))
|
|
pages = {i["page"] for i in regions}
|
|
images = {page: io.BytesIO() for page in pages}
|
|
for p in images:
|
|
images[p].write(reader.pages[p - 1].images[0].data)
|
|
images[p].seek(0)
|
|
images[p] = Image.open(images[p])
|
|
|
|
total_height = 0
|
|
max_width = 0
|
|
out_images = []
|
|
for r in regions:
|
|
im = images[r["page"]]
|
|
w, h = im.size
|
|
x1 = int(r["left"] * w)
|
|
pxw = int(r["width"] * w)
|
|
x2 = x1 + pxw
|
|
y1 = int(r["top"] * h)
|
|
pxh = int(r["height"] * h)
|
|
y2 = y1 + pxh
|
|
out_images.append(im.crop((x1, y1, x2, y2)))
|
|
max_width = max(max_width, pxw)
|
|
total_height += pxh
|
|
|
|
out_im = Image.new("RGB", (max_width, total_height))
|
|
height_so_far = 0
|
|
for im in out_images:
|
|
out_im.paste(im, ((max_width - im.width) // 2, height_so_far))
|
|
height_so_far += im.height
|
|
|
|
out_data = io.BytesIO()
|
|
out_im.save(out_data, "PNG")
|
|
out = out_data.getvalue()
|
|
if base64:
|
|
from base64 import b64encode
|
|
|
|
out = f'data:image/png;base64,{b64encode(out).decode("utf-8")}'
|
|
return out
|
|
|
|
|
|
def get_pdf_location(ctx, exam, serial_number):
|
|
base = get_exam_location(ctx, exam)
|
|
if serial_number == "blank":
|
|
return os.path.join(base, "blank.pdf")
|
|
else:
|
|
return os.path.join(base, "submissions", f"{serial_number}.pdf")
|
|
|
|
|
|
def get_coverless_pdf(ctx, filename):
|
|
reader = PdfReader(filename)
|
|
writer = PdfWriter()
|
|
for i in range(1, len(reader.pages)):
|
|
writer.add_page(reader.pages[i])
|
|
buf = io.BytesIO()
|
|
writer.write(buf)
|
|
return buf.getvalue()
|
|
|
|
|
|
def get_image_cache_location(ctx, exam):
|
|
output = os.path.join(
|
|
ctx["cs_data_root"], "_logs", "_gradesoop", "image_cache", exam
|
|
)
|
|
os.makedirs(output, exist_ok=True)
|
|
return output
|
|
|
|
|
|
# name map
|
|
|
|
|
|
def get_name_map_log(ctx, exam):
|
|
return ("_gradesoop", [ctx["cs_course"], exam], "name_map")
|
|
|
|
|
|
def get_name_map(ctx, exam):
|
|
return cslog.most_recent(*get_name_map_log(ctx, exam), {})
|
|
|
|
|
|
def get_serial_number(ctx, exam, username):
|
|
return {v: k for k, v in get_name_map(ctx, exam).items()}.get(username, None)
|
|
|
|
|
|
def get_username(ctx, exam, serial):
|
|
return get_name_map(ctx, exam).get(serial, None)
|
|
|
|
|
|
# grades
|
|
|
|
|
|
def estimate_fraction_graded(ctx, exam, problem):
|
|
return len(cslog.most_recent(*get_grade_log(ctx, exam, problem), {})) / len(
|
|
get_submissions(ctx, exam)
|
|
)
|
|
|
|
|
|
def get_grade_log(ctx, exam, problem):
|
|
return ("_gradesoop", [ctx["cs_course"], exam, "_grades"], problem)
|
|
|
|
|
|
def get_all_grades(ctx, exam):
|
|
pinfo = get_problem_info(ctx, exam)
|
|
pinfo.pop("name_box")
|
|
|
|
weighted = any(("weight" in i) for i in pinfo.values())
|
|
|
|
problem_scores = {p: get_grades(ctx, exam, p) for p in pinfo}
|
|
user_scores = {}
|
|
for serial in get_submissions(ctx, exam):
|
|
user_scores[serial] = {}
|
|
for p in problem_scores:
|
|
user_scores[serial][p] = eval(
|
|
problem_scores[p].get(serial, {"score": "None"})["score"]
|
|
)
|
|
user_scores[serial]["total"] = sum(
|
|
i for i in user_scores[serial].values() if i is not None
|
|
)
|
|
|
|
if weighted:
|
|
weighted_user_scores = {}
|
|
for serial in user_scores:
|
|
weighted_user_scores[serial] = {
|
|
k: v
|
|
if v is None
|
|
else (v / pinfo[k]["points"])
|
|
* pinfo[k].get("weight", pinfo[k]["points"])
|
|
for k, v in user_scores[serial].items()
|
|
if k != "total"
|
|
}
|
|
|
|
out = {
|
|
"problem_info": pinfo,
|
|
"max_points": sum(i["points"] for i in pinfo.values()),
|
|
"raw": user_scores,
|
|
}
|
|
if weighted:
|
|
out["weighted"] = weighted_user_scores
|
|
for serial, scores in weighted_user_scores.items():
|
|
out["weighted"][serial]["total"] = sum(
|
|
i for i in scores.values() if i is not None
|
|
)
|
|
out["max_weighted"] = sum(i.get("weight", i["points"]) for i in pinfo.values())
|
|
|
|
return out
|
|
|
|
|
|
def get_grades(ctx, exam, problem):
|
|
return cslog.most_recent(*get_grade_log(ctx, exam, problem), {})
|
|
|
|
|
|
def get_grade(ctx, exam, problem, serial):
|
|
return get_grades(ctx, exam, problem).get(serial, None)
|
|
|
|
|
|
def get_graded_exams(ctx, exam, problem):
|
|
return set(get_grades(ctx, exam, problem))
|
|
|
|
|
|
def get_ungraded_exams(ctx, exam, problem):
|
|
return get_submissions(ctx, exam) - get_graded_exams(ctx, exam, problem)
|
|
|
|
|
|
def set_grade(ctx, exam, problem, serial, score, comments, image, imageactions):
|
|
n = get_grade_log(ctx, exam, problem)
|
|
cm = cslog.log_lock([n[0], *n[1], n[2]])
|
|
with cm:
|
|
current = cslog.most_recent(*n, {}, lock=False)
|
|
current[serial] = {
|
|
"staff": ctx["cs_username"],
|
|
"score": score,
|
|
"comments": comments,
|
|
"image": image,
|
|
"imageactions": imageactions,
|
|
}
|
|
cslog.update_log(*n, current, lock=False)
|
|
return True
|
|
|
|
|
|
# locking
|
|
|
|
|
|
def get_lock_log(ctx, exam, problem):
|
|
return ("_gradesoop", [ctx["cs_course"], exam, "_locks"], problem)
|
|
|
|
|
|
def try_acquire_lock(ctx, exam, problem, serial_number=None, force=False):
|
|
n = get_lock_log(ctx, exam, problem)
|
|
cm = cslog.log_lock([n[0], *n[1], n[2]])
|
|
success = False
|
|
with cm:
|
|
current = cslog.most_recent(*n, {}, lock=False)
|
|
|
|
available = sorted(
|
|
get_ungraded_exams(ctx, exam, problem)
|
|
- {k for k, v in current.items() if v != ctx["cs_username"]}
|
|
)
|
|
if serial_number is None:
|
|
if available:
|
|
serial_number = random.choice(available)
|
|
|
|
current_holder = current.get(serial_number, None)
|
|
if (serial_number is not None) and (
|
|
(current_holder is None) or force or (current_holder == ctx["cs_username"])
|
|
):
|
|
current[serial_number] = ctx["cs_username"]
|
|
success = True
|
|
cslog.update_log(*n, current, lock=False)
|
|
|
|
return {
|
|
"ok": success,
|
|
"last": current_holder,
|
|
"attempted": serial_number,
|
|
"pool": available,
|
|
}
|
|
|
|
|
|
def relinquish_lock(ctx, exam, problem, serial_number):
|
|
n = get_lock_log(ctx, exam, problem)
|
|
cm = cslog.log_lock([n[0], *n[1], n[2]])
|
|
|
|
success = False
|
|
with cm:
|
|
current = cslog.most_recent(*n, {}, lock=False)
|
|
|
|
if current.get(serial_number, None) == ctx["cs_username"]:
|
|
del current[serial_number]
|
|
cslog.update_log(*n, current, lock=False)
|
|
success = True
|
|
|
|
return {"ok": success}
|