Source code for queryable_properties.properties.subquery

# -*- coding: utf-8 -*-
import six
from django.db.models import Q

from ..managers import QueryablePropertiesQuerySetMixin
from ..utils import get_queryable_property
from ..utils.internal import QueryPath, get_output_field, get_queryable_property_descriptor
from .base import QueryableProperty, QueryablePropertyReference
from .mixins import IgnoreCacheMixin, SubqueryMixin


[docs] class SubqueryFieldProperty(SubqueryMixin, QueryableProperty): """ A property that returns a field value contained in a subquery, extracting it from the first row of the subquery's result set. """
[docs] def __init__(self, queryset, field_name, output_field=None, **kwargs): """ Initialize a new property that returns a field value from a subqery. :param queryset: The internal queryset to use as the subquery or a callable that takes no arguments or the outer model class as its sole argument and generates the internal queryset. :type queryset: django.db.models.QuerySet | function :param str field_name: The name of the subquery field whose value should be returned. May refer to an annotated field or queryable property inside the subquery. :param output_field: The output field to use for the subquery expression. Only required in cases where Django cannot determine the field type on its own. :type output_field: django.db.models.Field | None """ self.field_name = field_name self.output_field = output_field super(SubqueryFieldProperty, self).__init__(queryset, **kwargs)
def get_annotation(self, cls): from django.db.models import Subquery return Subquery(self._get_inner_queryset(cls).values(self.field_name)[:1], output_field=self.output_field)
[docs] class SubqueryExistenceCheckProperty(SubqueryMixin, QueryableProperty): """ A property that checks whether certain objects exist in the database using a custom subquery. """
[docs] def __init__(self, queryset, negated=False, **kwargs): """ Initialize a new property that checks for the existence of database records using a custom subquery. :param queryset: The internal queryset to use as the subquery or a callable that takes no arguments or the outer model class as its sole argument and generates the internal queryset. :type queryset: django.db.models.QuerySet | function :param bool negated: Whether to negate the ``EXISTS`` subquery (i.e. the property will return ``True`` if no objects exist when using ``negated=True``). """ self.negated = negated super(SubqueryExistenceCheckProperty, self).__init__(queryset, **kwargs)
def get_annotation(self, cls): from django.db.models import Exists subquery = Exists(self._get_inner_queryset(cls)) if self.negated: subquery = ~subquery return subquery
[docs] class SubqueryObjectProperty(IgnoreCacheMixin, SubqueryFieldProperty): """ A property that allows to fetch an entire model object from the first row of a given subquery. Each field value of the subquery object is queried using a :class:`SubqueryFieldProperty`. A model instance is reconstructed from the individual field values. """
[docs] def __init__(self, model, queryset, field_names=None, property_names=(), **kwargs): """ Initialize a new property that allows to fetch an entire model object from the first row of a given subquery. :param model: The model class whose instances are being queried via the subquery. Can be either a concrete model class or a lazy reference to a model class (see foreign keys). :type model: type | str :param queryset: The internal queryset to use as the subquery or a callable that takes no arguments or the outer model class as its sole argument and generates the internal queryset. :type queryset: django.db.models.QuerySet | function :param field_names: The names of the fields that should be queried for the subquery object. Fields not present in this sequence will be deferred. If not provided, all concrete fields of the model will be queried. :type field_names: collections.Sequence[str] | None :param property_names: Optional names of queryable properties on the subquery model whose values should be retrieved along with the other fields. If not provided, no queryable property values will be selected. :type property_names: collections.Sequence[str] """ kwargs.pop('output_field', None) super(SubqueryObjectProperty, self).__init__(queryset, None, **kwargs) self._subquery_model = model self._field_names = field_names self._property_names = property_names self._managed_refs = {} self._field_aliases = {} self._pk_field_names = None
def _finalize_setup(self, model, subquery_model): """ Finalize the setup of this property by constructing the sub-properties, attaching them to the model class and populating attributes. """ pk_fields = getattr(subquery_model._meta, 'pk_fields', [subquery_model._meta.pk]) sub_field_names = set(self._field_names) if self._field_names is not None else None self._subquery_model = subquery_model self._pk_field_names = [pk_field.attname for pk_field in pk_fields] self.field_name = self._pk_field_names[0] self._managed_refs[self.field_name] = QueryablePropertyReference(self, self.model, QueryPath()) if pk_fields[0].name != self.field_name: self._field_aliases[pk_fields[0].name] = self.field_name if len(pk_fields) == 1: self._field_aliases['pk'] = self.field_name elif sub_field_names is not None: sub_field_names.update(pk_field.name for pk_field in pk_fields[1:]) def add_sub_property(name, queryset, output_field=None): prop = SubqueryFieldProperty(queryset, name, output_field=output_field, cached=self.cached) prop.contribute_to_class(model, '-'.join((self.name, name))) self._managed_refs[name] = prop._resolve()[0] for field in subquery_model._meta.concrete_fields: if field is pk_fields[0] or (sub_field_names is not None and field.name not in sub_field_names): continue add_sub_property(field.attname, self._inner_queryset) if field.name != field.attname: self._field_aliases[field.name] = field.attname for property_name in self._property_names: remote_ref = get_queryable_property(subquery_model, property_name)._resolve(subquery_model)[0] add_sub_property( property_name, lambda: QueryablePropertiesQuerySetMixin.apply_to( self._get_inner_queryset(model)).select_properties(property_name), get_output_field(remote_ref.get_annotation()), ) def _resolve(self, model=None, relation_path=QueryPath(), remaining_path=QueryPath()): if remaining_path: first = self._field_aliases.get(remaining_path[0], remaining_path[0]) if first in self._managed_refs: # Reference to one of the fields represented by the managed # properties. ref = self._managed_refs[first]._replace(model=model or self.model, relation_path=relation_path) return ref, remaining_path[1:] return SubqueryObjectPropertyReference(self, model or self.model, relation_path), remaining_path def contribute_to_class(self, cls, name): from django.db.models.fields.related import lazy_related_operation super(SubqueryObjectProperty, self).contribute_to_class(cls, name) # Finalize the setup of this property after the subquery model was # constructed. lazy_related_operation(self._finalize_setup, self.model, self._subquery_model) def get_value(self, obj): values = {} if self._descriptor.has_cached_value(obj): cached_value = self._descriptor.get_cached_value(obj) if cached_value is None or isinstance(cached_value, self._subquery_model): # The cached value is already the final model object or None, # so it can be returned as-is. return cached_value # The cached value is a raw primary key. Use this value and the # present cache values of all managed properties to construct the # final model instance. for attname, ref in six.iteritems(self._managed_refs): if ref.descriptor.has_cached_value(obj): values[ref.property.name] = ref.descriptor.get_cached_value(obj) elif attname in self._pk_field_names: # For composite PKs, all fields contributing to the PK must have # a value, otherwise the cached values can't be used. values.clear() break if not values: # No/insufficient cached values: perform a single query to fetch # the values for all fields and populate the cache for all managed # properties if configured as cached. names = [ref.property.name for ref in six.itervalues(self._managed_refs)] values = self.get_queryset_for_object(obj).select_properties(*names).values(*names).get() if self.cached: for ref in six.itervalues(self._managed_refs): ref.descriptor.set_cached_value(obj, values[ref.property.name]) if values[self.name] is None: # The subquery didn't return a row, so no instance can be # constructed. return None field_names, field_values = [], [] for field in self._subquery_model._meta.concrete_fields: if field.attname in self._managed_refs and self._managed_refs[field.attname].property.name in values: field_names.append(field.attname) field_values.append(values[self._managed_refs[field.attname].property.name]) subquery_obj = self._subquery_model.from_db( self._get_inner_queryset(obj.__class__).db, field_names, field_values) # Populate any queryable properties whose values were queried for the # subquery object. for property_name in self._property_names: sub_name = self._managed_refs[property_name].property.name if sub_name in values: get_queryable_property_descriptor(self._subquery_model, property_name).set_cached_value( subquery_obj, values[sub_name]) if self.cached or self._descriptor.has_cached_value(obj): self._descriptor.set_cached_value(obj, subquery_obj) return subquery_obj def get_filter(self, cls, lookup, value): if isinstance(value, self._subquery_model): value = value.pk if len(self._pk_field_names) > 1 and isinstance(value, tuple): # Build individual filter clauses for each field of a composite PK. base_path = QueryPath(self.name) conditions = {(base_path + lookup).as_str(): value[0]} for attname, pk_part in zip(self._pk_field_names[1:], value[1:]): conditions[(base_path + attname + lookup).as_str()] = pk_part return Q(**conditions) return super(SubqueryObjectProperty, self).get_filter(cls, lookup, value)
class SubqueryObjectPropertyReference(QueryablePropertyReference): """ A specialized property reference that allows the parts of a :class:`SubqueryObjectProperty` to be annotated properly. """ __slots__ = () def annotate_query(self, query, full_group_by, select=False, remaining_path=QueryPath()): if select: # A selection of the main property via .select_properties() # should lead to the selection of all sub-properties to be able to # populate the subquery object with all values. for ref in six.itervalues(self.property._managed_refs): if ref.property is not self.property: ref = ref._replace(model=self.model, relation_path=self.relation_path) ref.annotate_query(query, full_group_by, select) return super(SubqueryObjectPropertyReference, self).annotate_query(query, full_group_by, select, remaining_path)