Source code for concurrency.admin

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

import operator
import re
from functools import reduce

from django.contrib import admin, messages
from django.contrib.admin import helpers
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.db.models import Q
from django.forms.formsets import (
    INITIAL_FORM_COUNT, MAX_NUM_FORM_COUNT, TOTAL_FORM_COUNT, ManagementForm
)
from django.forms.models import BaseModelFormSet
from django.http import HttpResponse, HttpResponseRedirect
from django.utils.encoding import force_text
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _, ungettext

from concurrency import core, forms
from concurrency.api import get_revision_of_object
from concurrency.config import CONCURRENCY_LIST_EDITABLE_POLICY_ABORT_ALL, conf
from concurrency.exceptions import RecordModifiedError
from concurrency.forms import ConcurrentForm, VersionWidget

ALL = object()


[docs]class ConcurrencyActionMixin(object): check_concurrent_action = True def action_checkbox(self, obj): """ A list_display column containing a checkbox widget. """ if self.check_concurrent_action: return helpers.checkbox.render(helpers.ACTION_CHECKBOX_NAME, force_text("%s,%s" % (obj.pk, get_revision_of_object(obj)))) else: return super(ConcurrencyActionMixin, self).action_checkbox(obj) action_checkbox.short_description = mark_safe('<input type="checkbox" id="action-toggle" />') action_checkbox.allow_tags = True def get_confirmation_template(self): return "concurrency/delete_selected_confirmation.html" def response_action(self, request, queryset): # noqa """ Handle an admin action. This is called if a request is POSTed to the changelist; it returns an HttpResponse if the action was handled, and None otherwise. """ # There can be multiple action forms on the page (at the top # and bottom of the change list, for example). Get the action # whose button was pushed. try: action_index = int(request.POST.get('index', 0)) except ValueError: # pragma: no cover action_index = 0 # Construct the action form. data = request.POST.copy() data.pop(helpers.ACTION_CHECKBOX_NAME, None) data.pop("index", None) # Use the action whose button was pushed try: data.update({'action': data.getlist('action')[action_index]}) except IndexError: # If we didn't get an action from the chosen form that's invalid # POST data, so by deleting action it'll fail the validation check # below. So no need to do anything here pass action_form = self.action_form(data, auto_id=None) action_form.fields['action'].choices = self.get_action_choices(request) # If the form's valid we can handle the action. if action_form.is_valid(): action = action_form.cleaned_data['action'] func, name, description = self.get_actions(request)[action] # Get the list of selected PKs. If nothing's selected, we can't # perform an action on it, so bail. if int(request.POST.get('select_across', 0)): selected = ALL else: selected = request.POST.getlist(helpers.ACTION_CHECKBOX_NAME) revision_field = self.model._concurrencymeta.field if not selected: return None if self.check_concurrent_action: self.delete_selected_confirmation_template = self.get_confirmation_template() # If select_across we have to avoid the use of concurrency if selected is not ALL: filters = [] for x in selected: try: pk, version = x.split(",") except ValueError: raise ImproperlyConfigured('`ConcurrencyActionMixin` error.' 'A tuple with `primary_key, version_number` ' 'expected: `%s` found' % x) filters.append(Q(**{'pk': pk, revision_field.attname: version})) queryset = queryset.filter(reduce(operator.or_, filters)) if len(selected) != queryset.count(): messages.error(request, 'One or more record were updated. ' '(Probably by other user) ' 'The execution was aborted.') return HttpResponseRedirect(".") else: messages.warning(request, 'Selecting all records, you will avoid the concurrency check') response = func(self, request, queryset) # Actions may return an HttpResponse, which will be used as the # response from the POST. If not, we'll be a good little HTTP # citizen and redirect back to the changelist page. if isinstance(response, HttpResponse): return response else: return HttpResponseRedirect(".")
class ConcurrentManagementForm(ManagementForm): def __init__(self, *args, **kwargs): self._versions = kwargs.pop('versions', []) super(ConcurrentManagementForm, self).__init__(*args, **kwargs) def _html_output(self, normal_row, error_row, row_ender, help_text_html, errors_on_separate_row): ret = super(ConcurrentManagementForm, self)._html_output(normal_row, error_row, row_ender, help_text_html, errors_on_separate_row) v = [] for pk, version in self._versions: v.append('<input type="hidden" name="_concurrency_version_{0}" value="{1}">'.format(pk, version)) return mark_safe("{0}{1}".format(ret, "".join(v))) class ConcurrentBaseModelFormSet(BaseModelFormSet): def _management_form(self): """Returns the ManagementForm instance for this FormSet.""" if self.is_bound: form = ConcurrentManagementForm(self.data, auto_id=self.auto_id, prefix=self.prefix) if not form.is_valid(): raise ValidationError('ManagementForm data is missing or has been tampered with') else: form = ConcurrentManagementForm(auto_id=self.auto_id, prefix=self.prefix, initial={TOTAL_FORM_COUNT: self.total_form_count(), INITIAL_FORM_COUNT: self.initial_form_count(), MAX_NUM_FORM_COUNT: self.max_num}, versions=[(form.instance.pk, get_revision_of_object(form.instance)) for form in self.initial_forms]) return form management_form = property(_management_form)
[docs]class ConcurrencyListEditableMixin(object): list_editable_policy = conf.POLICY def get_changelist_formset(self, request, **kwargs): kwargs['formset'] = ConcurrentBaseModelFormSet return super(ConcurrencyListEditableMixin, self).get_changelist_formset(request, **kwargs) def _add_conflict(self, request, obj): if hasattr(request, '_concurrency_list_editable_errors'): request._concurrency_list_editable_errors.append(obj.pk) else: request._concurrency_list_editable_errors = [obj.pk] def _get_conflicts(self, request): if hasattr(request, '_concurrency_list_editable_errors'): return request._concurrency_list_editable_errors else: return [] def save_model(self, request, obj, form, change): try: if change: version = request.POST.get('_concurrency_version_{0.pk}'.format(obj), None) if version: core._set_version(obj, version) super(ConcurrencyListEditableMixin, self).save_model(request, obj, form, change) except RecordModifiedError: self._add_conflict(request, obj) # If policy is set to 'silent' the user will be informed using message_user # raise Exception if not silent. # NOTE: # list_editable_policy MUST have the LIST_EDITABLE_POLICY_ABORT_ALL # set to work properly if self.list_editable_policy == CONCURRENCY_LIST_EDITABLE_POLICY_ABORT_ALL: raise def log_change(self, request, object, message): if object.pk in self._get_conflicts(request): return super(ConcurrencyListEditableMixin, self).log_change(request, object, message) def log_deletion(self, request, object, object_repr): if object.pk in self._get_conflicts(request): return super(ConcurrencyListEditableMixin, self).log_deletion(request, object, object_repr) def message_user(self, request, message, *args, **kwargs): # This is ugly but we do not want to touch the changelist_view() code. opts = self.model._meta conflicts = self._get_conflicts(request) if conflicts: names = force_text(opts.verbose_name), force_text(opts.verbose_name_plural) pattern = r"(?P<num>\d+) ({0}|{1})".format(*names) rex = re.compile(pattern) m = rex.match(message) concurrency_errros = len(conflicts) if m: updated_record = int(m.group('num')) - concurrency_errros if updated_record == 0: message = _("No %(name)s were changed due conflict errors") % {'name': names[0]} else: ids = ",".join(map(str, conflicts)) messages.error(request, ungettext("Record with pk `{0}` has been modified and was not updated", "Records `{0}` have been modified and were not updated", concurrency_errros).format(ids)) if updated_record == 1: name = force_text(opts.verbose_name) else: name = force_text(opts.verbose_name_plural) message = ungettext("%(count)s %(name)s was changed successfully.", "%(count)s %(name)s were changed successfully.", updated_record) % {'count': updated_record, 'name': name} return super(ConcurrencyListEditableMixin, self).message_user(request, message, *args, **kwargs)
[docs]class ConcurrentModelAdmin(ConcurrencyActionMixin, ConcurrencyListEditableMixin, admin.ModelAdmin): form = ConcurrentForm formfield_overrides = {forms.VersionField: {'widget': VersionWidget}}