# Copyright (C) 2018 Alban Gruin
#
# celcatsanitizer is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# celcatsanitizer is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with celcatsanitizer. If not, see <https://www.gnu.org/licenses/>.
from datetime import datetime, timedelta
import asyncio
import calendar
import json
from django.utils import timezone
import lxml.html
import requests
from ...models import Course, Group, Room
from ...utils import get_current_week, get_week
from .abstractparser import AbstractParser
# Prefix of the JavaScript statement that assigns the events array.
VARNAME = "v.events.list = "


def find_events_list(soup):
    """Return the events list assigned to ``v.events.list`` in a page.

    Every ``<script>`` text node of *soup* is scanned line by line.  A line
    that starts with ``VARNAME`` has that prefix and its last two characters
    removed, and the remainder is decoded as JSON.  When several lines
    match, the last one wins; when none does, an empty list is returned.

    :param soup: object exposing an lxml-style ``xpath()`` method.
    :returns: the decoded JSON value, or ``[]`` if no assignment was found.
    :raises ValueError: if a matching line does not contain valid JSON.
    """
    events = []
    prefix_length = len(VARNAME)

    for text in soup.xpath("//script/text()"):
        # Cheap containment test before splitting the script into lines.
        if VARNAME not in text:
            continue

        for line in text.split('\n'):
            if not line.startswith(VARNAME):
                continue

            # The slice drops the final two characters of the line
            # (presumably the closing ";" and a carriage return -- confirm
            # against a real page).
            events = json.loads(line[prefix_length:-2])

    return events
def get_next_month(dt):
    """Return *dt* moved to the first day of the following month.

    Only the date part changes; any other field carried by *dt* (time of
    day, tzinfo) is left as produced by ``replace()`` and ``timedelta``
    arithmetic.

    :param dt: a ``date`` or ``datetime``.
    :returns: an object of the same type, on day 1 of the next month.
    """
    first_of_month = dt.replace(day=1)
    # No month is longer than 31 days, so 32 days after the 1st always
    # falls inside the next month.
    inside_next_month = first_of_month + timedelta(days=32)
    return inside_next_month.replace(day=1)
class Parser(AbstractParser):
    """Parser for a CELCAT calendar page embedding its events as JSON.

    The page at ``source.url`` is fetched once on construction.  The month
    labels are read from its ``<option>`` elements, and the events of each
    month are extracted with ``find_events_list()``.  ``get_source()`` must
    be called before ``get_events()``, because the latter reads
    ``self.events``.
    """

    # A description line starting with one of these prefixes is the line
    # listing the groups attending the course.
    _GROUP_PREFIXES = (
        "L1 ", "L2 ", "L3 ", "L3P ", "M1 ", "M2 ", "DEUST ", "MAG1 ",
        "1ERE ANNEE ", "2EME ANNEE ", "3EME ANNEE ",
        "MAT-Agreg Interne ",
    )

    def __init__(self, source):
        """Fetch the calendar page of *source* and list the months to parse.

        :param source: the source object; its ``url`` attribute is fetched.
        """
        super().__init__(source)

        # Header borrowed from the author's Firefox.  Presumably it makes
        # the server emit English month names, which the "%B, %Y" parsing
        # below relies on -- confirm.
        base_req = self._make_request(
            source.url, headers={"Accept-Language": "en-US,en;q=0.5"}
        )

        parser = lxml.html.HTMLParser(encoding="utf-8")
        self.soup = lxml.html.document_fromstring(
            base_req.content, parser=parser
        )

        # Keep the first selected <option> and every option following it.
        self.months = []
        for option in self.soup.xpath("//option"):
            if option.get("selected") is not None or self.months:
                self.months.append(option.text)

    @staticmethod
    def _unique(items):
        """Return *items* without duplicates, keeping first-seen order."""
        seen = set()
        result = []
        for item in items:
            if item not in seen:
                seen.add(item)
                result.append(item)
        return result

    def __get_event(self, event, today,
                    beginning_of_month, end_of_month,
                    year, week):
        """Build a ``Course`` from one raw event, or return ``None``.

        The event is skipped (``None`` is returned) when it starts outside
        ``[beginning_of_month, end_of_month)``, when it starts before
        *today*, when it is not in the requested ISO week, or when its first
        description line is ``"Global Event"``.

        The description lines are read relative to the group line, i.e. the
        first line after line 0 starting with a known prefix: the line
        before it is the name, the one before that the type, the one after
        it the rooms and the next one the notes.

        :param event: mapping with ``"start"``, ``"end"`` and ``"text"``.
        :param today: aware datetime; earlier events are skipped.  ``None``
            disables this filter.
        :param beginning_of_month: inclusive lower bound for the start.
        :param end_of_month: exclusive upper bound for the start.
        :param year: ISO year to keep, or ``None``.
        :param week: ISO week to keep, or ``None``.  The week filter only
            applies when both *year* and *week* are given.
        :returns: the created ``Course``, or ``None`` if the event is
            skipped.
        :raises IndexError: if no description line names a group.
        """
        begin = timezone.make_aware(
            datetime.strptime(event["start"], "%Y-%m-%dT%H:%M:%S")
        )
        end = timezone.make_aware(
            datetime.strptime(event["end"], "%Y-%m-%dT%H:%M:%S")
        )

        if begin < beginning_of_month or begin >= end_of_month or \
           (today is not None and begin < today):
            return None

        if year is not None and week is not None:
            event_year, event_week, _ = begin.isocalendar()
            if event_year != year or event_week != week:
                return None

        # NOTE(review): the separator literal was broken across two lines
        # in the source; it is restored here as an HTML line break.
        # Confirm against an actual payload.
        data = event["text"].split("<br>")
        if data[0] == "Global Event":
            return None

        # Locate the group line.  All the parsing that can fail happens
        # before the database row is created, so that a skipped or
        # malformed event does not leave an orphan course behind.
        i = 1
        while i < len(data) and not data[i].startswith(self._GROUP_PREFIXES):
            i += 1
        group_names = data[i].split(';')

        course = Course.objects.create(
            source=self.source, begin=begin, end=end
        )

        if i > 1:
            # Order-preserving de-duplication keeps the name stable from
            # one run to the next.
            course.name = ", ".join(self._unique(data[i - 1].split(';')))
        else:
            course.name = "Sans nom"

        if i > 2:
            course.type = data[i - 2]

        rooms = data[i + 1] if len(data) >= i + 2 else None
        if len(data) >= i + 3:
            course.notes = data[i + 2]

        groups = [
            Group.objects.get_or_create(
                source=self.source, celcat_name=name
            )[0]
            for name in group_names
        ]
        course.groups.add(*groups)

        if rooms is not None:
            # Evaluate the queryset once instead of counting then iterating.
            rooms_objs = list(Room.objects.filter(name__in=rooms.split(';')))
            if rooms_objs:
                course.rooms.add(*rooms_objs)
            elif course.notes:
                # No known room matched: keep the raw text in the notes.
                course.notes = "{0}\n{1}".format(rooms, course.notes)
            else:
                course.notes = rooms

        if course.notes is not None:
            course.notes = course.notes.strip()

        return course

    def get_events(self, today, year=None, week=None):
        """Yield the courses built from the events stored in ``self.events``.

        Each entry of ``self.events`` is paired with the month label at the
        same position in ``self.months``; a month without a label is
        ignored.

        :param today: aware datetime; events starting earlier are skipped.
            ``None`` disables this filter.
        :param year: ISO year to keep, or ``None``.
        :param week: ISO week to keep, or ``None``.
        """
        for month_label, month_events in zip(self.months, self.events):
            beginning_of_month = timezone.make_aware(
                datetime.strptime(month_label, "%B, %Y")
            )
            end_of_month = get_next_month(beginning_of_month)

            for event in month_events:
                course = self.__get_event(event, today,
                                          beginning_of_month, end_of_month,
                                          year, week)
                if course is not None:
                    yield course

    def get_update_date(self):
        """Return ``None``: this format carries no update date."""
        return None

    def get_weeks(self):
        """Return (and store in ``self.weeks``) the week mapping.

        Only the current week is listed, under the key ``"1"``.
        """
        # FIXME: detect the weeks automatically from the events present.
        beginning, _ = get_week(*get_current_week())
        self.weeks = {"1": beginning}
        return self.weeks

    def ajax_req(self, month):
        """Fetch the events list of one month.

        :param month: month label in the ``"%B, %Y"`` format.
        :returns: the value extracted by ``find_events_list()``.
        """
        month = datetime.strptime(month, "%B, %Y")
        # monthcalendar() pads the days outside the month with zeros, so
        # the smallest positive Monday entry is the first Monday.
        first_monday = min(
            week[calendar.MONDAY]
            for week in calendar.monthcalendar(month.year, month.month)
            if week[calendar.MONDAY] > 0
        )
        month_str = month.replace(day=first_monday).strftime("%Y%m%d")

        req = self._make_request(
            self.source.url,
            headers={
                "Accept-Language": "en-US,en;q=0.5",
            },
            params={"Date": month_str},
        )
        req.raise_for_status()

        parser = lxml.html.HTMLParser(encoding="utf8")
        soup = lxml.html.document_fromstring(req.content, parser=parser)

        return find_events_list(soup)

    async def get_months_async(self):
        """Fetch every month after the first one concurrently.

        Each ``ajax_req()`` call runs in the default executor of the
        running event loop.

        :returns: list of events lists, in the order of ``self.months[1:]``.
        """
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(None, self.ajax_req, month)
            for month in self.months[1:]
        ]
        return await asyncio.gather(*futures)

    def get_source_from_months(self, use_async=True, **kwargs):
        """Return the events lists of every month after the first one.

        :param use_async: when true, the requests are issued concurrently
            through ``get_months_async()``; otherwise one after another.
        :param kwargs: only the legacy key ``"async"`` is accepted, as an
            alias of *use_async*.  The parameter used to have that name,
            which is a reserved word since Python 3.7.
        :raises TypeError: on any other keyword argument.
        """
        if "async" in kwargs:
            use_async = kwargs.pop("async")
        if kwargs:
            raise TypeError(
                "get_source_from_months() got unexpected keyword "
                "arguments: " + ", ".join(sorted(kwargs))
            )

        if not use_async:
            return [self.ajax_req(month) for month in self.months[1:]]

        # A private loop avoids depending on an implicit global one.
        loop = asyncio.new_event_loop()
        try:
            return loop.run_until_complete(self.get_months_async())
        finally:
            loop.close()

    def get_source(self):
        """Collect the events of all months into ``self.events``.

        The first entry comes from the page fetched on construction; the
        following ones are requested from the server.

        :returns: ``self.events``.
        """
        self.events = [
            find_events_list(self.soup)
        ] + self.get_source_from_months()

        return self.events