# Copyright (C) 2017 Alban Gruin # # celcatsanitizer is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # celcatsanitizer is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # # You should have received a copy of the GNU Affero General Public License # along with celcatsanitizer. If not, see . import datetime from django.core.management.base import BaseCommand from django.db import transaction from django.db.models import Min from django.utils import timezone from edt.models import Course, Timetable from edt.utils import get_week from ._private import delete_courses_in_week, get_events, get_update_date, get_weeks, get_xml @transaction.atomic def process_timetable_week(timetable, soup, weeks_in_soup, force, year=None, week=None): criteria = {} today = timezone.make_aware(datetime.datetime.now()) if year is not None and week is not None: begin, end = get_week(year, week) criteria["begin__gte"] = begin criteria["begin__lt"] = end last_update_date = Course.objects.filter(timetable=timetable, **criteria) \ .aggregate(Min("last_update")) \ ["last_update__min"] new_update_date = get_update_date(soup) if not force and last_update_date is not None and new_update_date is not None and \ last_update_date >= new_update_date: return if year is not None and week is not None: delete_courses_in_week(timetable, year, week, today) else: delete_from = max(min(weeks_in_soup.values()), today) Course.objects.filter(timetable=timetable, begin__gte=delete_from).delete() for course in get_events(timetable, soup, weeks_in_soup, today, year, week): course.save() timetable.last_update_date = new_update_date timetable.save() def process_timetable(timetable, force, year=None, weeks=None): soup = get_xml(timetable.url) weeks_in_soup = get_weeks(soup) if year is not None and weeks is not None: for week in weeks: process_timetable_week(timetable, soup, weeks_in_soup, force, year, week) else: process_timetable_week(timetable, soup, weeks_in_soup, force) class Command(BaseCommand): help = "Fetches registered celcat timetables" def add_arguments(self, parser): parser.add_argument("--all", const=True, default=False, action="store_const") parser.add_argument("--force", const=True, default=False, action="store_const") parser.add_argument("--week", type=int, choices=range(1, 54), nargs="+") parser.add_argument("--year", type=int, nargs=1) def handle(self, *args, **options): year = None errcount = 0 if options["all"]: weeks = None elif options["week"] is None: _, week, day = timezone.now().isocalendar() if day >= 6: year, week, _ = (timezone.now() + datetime.timedelta(weeks=1)).isocalendar() weeks = [week] else: weeks = options["week"] if not options["all"]: if options["year"] is None and year is None: year = timezone.now().year elif year is None: year = options["year"][0] for timetable in Timetable.objects.all(): self.stdout.write("Processing {0}".format(timetable)) try: process_timetable(timetable, options["force"], year, weeks) except Exception as exc: self.stderr.write( self.style.ERROR("Failed to process {0}: {1}".format(timetable, exc))) errcount += 1 if errcount == 0: self.stdout.write(self.style.SUCCESS("Done.")) else: self.stdout.write(self.style.ERROR("Done with {0} errors.".format(errcount)))