aboutsummaryrefslogtreecommitdiff
path: root/db.py
diff options
context:
space:
mode:
Diffstat (limited to 'db.py')
-rw-r--r--db.py80
1 files changed, 0 insertions, 80 deletions
diff --git a/db.py b/db.py
deleted file mode 100644
index 94df376..0000000
--- a/db.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright (C) 2017 Alban Gruin
-#
-# celcatsanitizer is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 2 of the License, or
-# (at your option) any later version.
-#
-# celcatsanitizer is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with celcatsanitizer; if not, write to the Free Software Foundation, Inc.,
-# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-
-from django.db import connections
-from django.db.models import Manager
-from django.db.models.functions import Extract
-from django.db.models.sql.compiler import SQLCompiler
-from django.db.models.sql.query import Query
-from django.db.models.sql.where import WhereNode
-from django.db.models.query import QuerySet
-
-class ExtractWeek(Extract):
- lookup_name = "week"
-
-
-class GroupedCompiler(SQLCompiler):
- def get_group_by(self, select, order_by):
- result = super(GroupedCompiler, self).get_group_by(select, order_by)
- expressions = []
- for expr in self.query.real_group_by:
- ref = expr if hasattr(expr, "as_sql") else self.query.resolve_ref(expr)
- sql, params = self.compile(ref)
- result.append((sql, params))
-
- return result
-
-
-class GroupedQuery(Query):
- def __init__(self, model, where=WhereNode):
- super(GroupedQuery, self).__init__(model, where)
- self.real_group_by = []
-
- def clone(self, klass=None, memo=None, **kwargs):
- obj = super(GroupedQuery, self).clone(klass, memo, **kwargs)
- obj.real_group_by = self.real_group_by[:]
- return obj
-
- def add_grouping(self, *grouping):
- self.real_group_by.extend(grouping)
-
- def clear_grouping(self):
- self.real_group_by = []
-
- def get_compiler(self, using=None, connection=None):
- if using is None and connection is None:
- raise ValueError("Need either using or connection")
- if using:
- connection = connections[using]
- return GroupedCompiler(self, connection, using)
-
-
-class GroupedQuerySet(QuerySet):
- def __init__(self, model=None, query=None, using=None, hints=None):
- super(GroupedQuerySet, self).__init__(model, query, using, hints)
- self.query = query or GroupedQuery(self.model)
-
- def group_by(self, *field_names):
- obj = self._clone()
- obj.query.clear_grouping()
- obj.query.add_grouping(*field_names)
- return obj
-
-
-class GroupedManager(Manager):
- def __init__(self):
- super(GroupedManager, self).__init__()
- self._queryset_class = GroupedQuerySet