Source code for kim.pipelines.collection

# kim/pipelines/collection.py
# Copyright (C) 2014-2015 the Kim authors and contributors
# <see AUTHORS file>
#
# This module is part of Kim and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php

from kim.exception import FieldInvalid
from kim.utils import attr_or_key

from .base import pipe
from .marshaling import MarshalPipeline
from .serialization import SerializePipeline


@pipe(run_if_none=True)
def marshall_collection(session):
    """Marshal each item in ``session.data`` through the wrapped field.

    Every item is marshalled in its own mapper session by the field stored
    at ``session.field.opts.field``.  The per-item results are collected into
    a list that replaces ``session.data``.  When ``session.data`` is ``None``
    the result is an empty list.

    :param session: Kim pipeline session instance
    :raises: the error built by ``session.field.invalid('type_error')`` when
        ``session.data`` is a string/bytes value or is not iterable.
    :returns: the list of marshalled items (also stored on ``session.data``).

    TODO(mike) this should be called marshal_collection
    """
    wrapped_field = session.field.opts.field
    existing_value = attr_or_key(session.output, session.field.opts.source)

    output = []

    if session.data is not None:
        # Text values expose ``__iter__`` on Python 3, so the attribute check
        # alone would let a plain string through and marshal it one character
        # at a time.  Reject text explicitly; it is never a valid collection.
        is_text = isinstance(session.data, (str, bytes))
        if is_text or not hasattr(session.data, '__iter__'):
            raise session.field.invalid('type_error')

        for i, datum in enumerate(session.data):
            _output = {}

            # If the object already exists, try to match up the existing
            # elements with those in the input json
            if existing_value is not None:
                try:
                    _output[wrapped_field.opts.source] = existing_value[i]
                except IndexError:
                    # The input has more items than the existing value;
                    # this item simply starts from an empty output.
                    pass

            mapper_session = session.mapper.get_mapper_session(datum, _output)
            wrapped_field.marshal(mapper_session, parent_session=session)

            # The wrapped field writes its result into ``_output`` under its
            # own source key.
            result = _output[wrapped_field.opts.source]
            output.append(result)

    session.data = output

    return session.data
@pipe()
def serialize_collection(session):
    """Serialize each item in ``session.data`` through the wrapped field.

    The field stored at ``session.field.opts.field`` serializes every item
    in turn; the values it writes under its own ``name`` are gathered into a
    list that replaces ``session.data``.

    :param session: Kim pipeline session instance
    :returns: the list of serialized items (also stored on ``session.data``).
    """
    item_field = session.field.opts.field
    output_key = item_field.name
    serialized = []

    # One mapper session is shared by all items; only its ``data`` and
    # ``output`` are reset on each iteration.
    item_session = session.mapper.get_mapper_session(None, {})

    # If the wrapped field uses a mapper, fetch it once to avoid looking up
    # the mapper from the registry for each item in the collection.  Fields
    # without a ``get_mapper`` attribute leave ``nested_mapper`` as ``None``.
    try:
        fetch_mapper = item_field.get_mapper
    except AttributeError:
        session.nested_mapper = None
    else:
        session.nested_mapper = fetch_mapper(as_class=True)

    for item in session.data:
        item_session.data = item
        item_session.output = {}
        item_field.serialize(item_session, parent_session=session)
        serialized.append(item_session.output[output_key])

    session.data = serialized

    return session.data
@pipe()
def check_duplicates(session):
    """Reject collections whose items repeat the ``unique_on`` value.

    The check only runs when the ``unique_on`` FieldOpt has been set on this
    Collection field; otherwise the data is returned untouched.

    :param session: Kim pipeline session instance
    :raises: the error built by
        ``session.field.invalid(error_type='duplicates')`` when two or more
        items share the same ``unique_on`` value.
    :returns: ``session.data``, unchanged.

    TODO(mike) This should only run if the wrapped field is a
    nested collection
    """
    items = session.data
    unique_key = session.field.opts.unique_on

    if not unique_key:
        return items

    # Collect the unique_on value of every item; collapsing them into a set
    # shrinks the count whenever a value occurs more than once.
    seen_values = [attr_or_key(item, unique_key) for item in items]
    distinct_values = set(seen_values)
    if len(seen_values) != len(distinct_values):
        raise session.field.invalid(error_type='duplicates')

    return items
class CollectionMarshalPipeline(MarshalPipeline):
    """Marshal pipeline for collection fields.

    Extends the ``input_pipes`` of ``MarshalPipeline`` with duplicate
    detection followed by per-item marshalling of the collection.

    .. seealso::
        :func:`kim.pipelines.collection.check_duplicates`
        :func:`kim.pipelines.collection.marshall_collection`
        :class:`kim.pipelines.marshaling.MarshalPipeline`
    """

    # Collection pipes run after every inherited input pipe, in this order.
    input_pipes = MarshalPipeline.input_pipes + [
        check_duplicates,
        marshall_collection,
    ]
class CollectionSerializePipeline(SerializePipeline):
    """Serialize pipeline for collection fields.

    Places per-item serialization ahead of the ``process_pipes`` inherited
    from ``SerializePipeline``.

    .. seealso::
        :func:`kim.pipelines.collection.serialize_collection`
        :class:`kim.pipelines.serialization.SerializePipeline`
    """

    # serialize_collection runs before every inherited process pipe.
    process_pipes = [
        serialize_collection,
    ] + SerializePipeline.process_pipes