#    Copyright (C) 2018  Alban Gruin
#
#    celcatsanitizer is free software: you can redistribute it and/or modify
#    it under the terms of the GNU Affero General Public License as published
#    by the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    celcatsanitizer is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU Affero General Public License for more details.
#
#    You should have received a copy of the GNU Affero General Public License
#    along with celcatsanitizer.  If not, see <https://www.gnu.org/licenses/>.

"""Parser for sources that embed their events in a ``v.events.list`` script
variable, one page per month."""

from datetime import datetime, timedelta

import asyncio
import calendar
import json

from django.utils import timezone

import lxml.html
import requests

from ...models import Course, Group, Room
from ...utils import get_current_week, get_week
from .abstractparser import AbstractParser

# Prefix of the script line that carries the JSON-encoded list of events.
VARNAME = "v.events.list = "


def find_events_list(soup):
    """Extract the events list embedded in the page's inline scripts.

    Every ``<script>`` text node containing ``VARNAME`` is scanned line by
    line; a line that starts with ``VARNAME`` is decoded as JSON.  When
    several lines match, the last one wins.

    :param soup: parsed document exposing an ``xpath()`` method.
    :return: the decoded events list, or ``[]`` if no matching line exists.
    :raises ValueError: if a matching line does not hold valid JSON.
    """
    res = []
    for script in soup.xpath("//script/text()"):
        if VARNAME not in script:
            continue

        for var in script.split('\n'):
            if var.startswith(VARNAME):
                # The last two characters of the line are dropped before
                # decoding -- presumably a trailing ";" and a line-ending
                # character; TODO confirm against a live page.
                res = json.loads(var[len(VARNAME):-2])

    return res


def get_next_month(dt):
    """Return ``dt`` moved to the first day of the following month.

    Adding 32 days to the first of a month always lands in the next month,
    whatever its length; the day is then reset to 1.  Other fields of ``dt``
    (time, tzinfo) are left untouched.
    """
    n = dt.replace(day=1) + timedelta(days=32)
    return n.replace(day=1)


class Parser(AbstractParser):
    """Parser fetching one page per month and reading its events list."""

    # A line of an event's text starting with one of these prefixes is taken
    # as the line listing the groups of the course.
    _GROUP_PREFIXES = (
        "L1 ", "L2 ", "L3 ", "L3P ", "M1 ", "M2 ", "DEUST ", "MAG1 ",
        "1ERE ANNEE ", "2EME ANNEE ", "3EME ANNEE ", "MAT-Agreg Interne ",
    )

    def __init__(self, source):
        """Fetch the source's base page and list the months it offers.

        ``self.months`` receives the text of the selected ``<option>`` and of
        every option that follows it in the document.
        """
        super().__init__(source)

        # Header taken from my Firefox...
        base_req = self._make_request(
            source.url, headers={"Accept-Language": "en-US,en;q=0.5"}
        )

        parser = lxml.html.HTMLParser(encoding="utf-8")
        self.soup = lxml.html.document_fromstring(
            base_req.content, parser=parser
        )

        self.months = []
        for option in self.soup.xpath("//option"):
            # Start collecting at the selected option, then keep everything
            # that comes after it.
            if option.get("selected") is not None or self.months:
                self.months.append(option.text)

    def __get_event(self, event, today, beginning_of_month, end_of_month,
                    year, week):
        """Build a ``Course`` from one raw event, or return ``None`` to skip.

        An event is skipped when it starts outside
        ``[beginning_of_month, end_of_month)``, when it starts before
        ``today`` (if given), when ``year`` and ``week`` are both given and
        do not match the ISO year/week of its start, or when it is a
        "Global Event".

        :param event: mapping with the "start", "end" and "text" keys.
        :return: the created ``Course`` (name, type and notes are set on the
            instance but not saved here), or ``None``.
        :raises IndexError: if no line of the text starts with a known group
            prefix.
        """
        begin = timezone.make_aware(
            datetime.strptime(event["start"], "%Y-%m-%dT%H:%M:%S")
        )
        end = timezone.make_aware(
            datetime.strptime(event["end"], "%Y-%m-%dT%H:%M:%S")
        )

        if begin < beginning_of_month or begin >= end_of_month or \
           (today is not None and begin < today):
            return None

        if year is not None and week is not None:
            event_year, event_week, _ = begin.isocalendar()
            if event_year != year or event_week != week:
                return None

        # NOTE(review): the separator reached this review as a raw line
        # break inside the string literal and is kept as "\n"; confirm it
        # against the real feed (an HTML line-break tag is plausible).
        data = event["text"].split("\n")

        # Parse and filter *before* touching the database, so that a skipped
        # or malformed event does not leave an orphan Course row behind.
        if data[0] == "Global Event":
            return None

        # Locate the line listing the groups; the lines around it are
        # interpreted relative to its position.
        i = 0
        while i < len(data) and \
                not data[i].startswith(self._GROUP_PREFIXES):
            i += 1

        groups = data[i]

        course = Course.objects.create(
            source=self.source, begin=begin, end=end
        )

        if i - 1 >= 0:
            # Drop duplicates while keeping the original order, so the name
            # is the same from one run to the next.
            course.name = ", ".join(dict.fromkeys(data[i - 1].split(';')))
        else:
            course.name = "Sans nom"

        if i - 2 >= 0:
            course.type = data[i - 2]

        rooms = None
        if len(data) >= i + 2:
            rooms = data[i + 1]
        if len(data) >= i + 3:
            course.notes = data[i + 2]

        groups = [
            Group.objects.get_or_create(
                source=self.source, celcat_name=name
            )[0]
            for name in groups.split(';')
        ]
        course.groups.add(*groups)

        if rooms is not None:
            # Evaluate the queryset once instead of counting then iterating.
            rooms_objs = list(
                Room.objects.filter(name__in=rooms.split(';'))
            )
            if rooms_objs:
                course.rooms.add(*rooms_objs)
            elif course.notes:
                # No known room matched: keep the raw text in the notes.
                course.notes = "{0}\n{1}".format(rooms, course.notes)
            else:
                course.notes = rooms

        if course.notes is not None:
            course.notes = course.notes.strip()

        return course

    def get_events(self, today, year=None, week=None):
        """Yield a ``Course`` for every event that passes the filters.

        Reads ``self.events``, which is filled by ``get_source()``; the
        i-th entry is matched with ``self.months[i]``.
        """
        for i, month in enumerate(self.events):
            beginning_of_month = timezone.make_aware(
                datetime.strptime(self.months[i], "%B, %Y")
            )
            end_of_month = get_next_month(beginning_of_month)

            for event in month:
                course = self.__get_event(event, today,
                                          beginning_of_month, end_of_month,
                                          year, week)
                if course is not None:
                    yield course

    def get_update_date(self):
        """Return ``None``: there is no update date in this format."""
        return None

    def get_weeks(self):
        """Store and return the weeks mapping.

        It currently holds a single entry, "1", mapped to the first value
        returned by ``get_week()`` for the current week.
        """
        # FIXME: automatic detection from the events present
        beginning, _ = get_week(*get_current_week())
        self.weeks = {"1": beginning}

        return self.weeks

    def ajax_req(self, month):
        """Fetch the page of one month and return its events list.

        :param month: month label in the "%B, %Y" format (e.g.
            "January, 2018").
        :return: the list decoded by ``find_events_list()``.
        """
        month = datetime.strptime(month, "%B, %Y")
        # monthcalendar() pads the weeks that overlap other months with
        # zeros, so the smallest non-zero Monday is the month's first Monday.
        first_monday = min(
            week[calendar.MONDAY]
            for week in calendar.monthcalendar(month.year, month.month)
            if week[calendar.MONDAY] > 0
        )
        month_str = month.replace(day=first_monday).strftime("%Y%m%d")

        req = self._make_request(
            self.source.url,
            headers={
                "Accept-Language": "en-US,en;q=0.5",
            },
            params={"Date": month_str},
        )
        req.raise_for_status()

        parser = lxml.html.HTMLParser(encoding="utf-8")
        soup = lxml.html.document_fromstring(req.content, parser=parser)

        return find_events_list(soup)

    async def get_months_async(self):
        """Fetch every month after the first one concurrently.

        Each ``ajax_req()`` call runs in the loop's default executor.

        :return: list of events lists, in the order of ``self.months[1:]``.
        """
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(None, self.ajax_req, month)
            for month in self.months[1:]
        ]

        return await asyncio.gather(*futures)

    def get_source_from_months(self, use_async=True, **legacy_kwargs):
        """Return the events lists of every month after the first one.

        :param use_async: fetch the months concurrently when true, one after
            the other otherwise.  This parameter used to be called ``async``,
            which is a reserved keyword since Python 3.7; that spelling is
            still honoured when passed through ``**{"async": ...}``.
        :raises TypeError: on any other unexpected keyword argument.
        """
        if "async" in legacy_kwargs:
            use_async = legacy_kwargs.pop("async")
        if legacy_kwargs:
            raise TypeError(
                "get_source_from_months() got unexpected keyword "
                "argument(s): {0}".format(", ".join(sorted(legacy_kwargs)))
            )

        if use_async:
            return asyncio.run(self.get_months_async())

        return [self.ajax_req(month) for month in self.months[1:]]

    def get_source(self):
        """Gather, store in ``self.events`` and return all events lists.

        The first entry comes from the page fetched by ``__init__``; the
        following ones are fetched month by month.
        """
        self.events = [
            find_events_list(self.soup)
        ] + self.get_source_from_months()

        return self.events