Source code for kim.pipelines.base
# kim/pipelines/base.py
# Copyright (C) 2014-2015 the Kim authors and contributors
# <see AUTHORS file>
#
# This module is part of Kim and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from itertools import chain
from functools import wraps
from kim.exception import StopPipelineExecution, FieldError
from kim.utils import attr_or_key, set_attr_or_key, attr_or_key_update
class Session(object):
"""Session objects acts as store for the state passed between
one pipe method to another.
Everytime a :class:`kim.field.Field` is marshaled or serialized a session object
is created for each field that persists across every pipe inside of the fields
MarsahlingPipeline or SerializationPipeline.
Each pipe in a pipeline is able to work with the data set by the previous pipe using
the session object.
Example::
from kim.pipelines.base import pipe
@pipe()
def odds_evens(session):
if session.data is not None:
if session.data % 2:
session.data = 'evens'
else:
session.data = 'odds'
In the example above our pipe changes the data of our pipeline based on the current
value of session.data.
As well as providing the interface to the data in a fields pipeline, the Session
object also provides pipes with access to the overal state of that marshaling or
serialization pipeline.
"""
__slots__ = ('field', 'data', 'output', 'parent', 'mapper_session', 'nested_mapper')
def __init__(self, field=None, data=None, output=None,
parent=None, mapper_session=None, nested_mapper=None):
"""Construct a new session.
:param field: an instance of :class:`kim.field.Field` the scope of this session
is bound too.
:param data: data that has been passed along the pipeline field marshaling or
serialization session.
:param output: An object that contains the output of this fields marshaling
or serialization session.
:param parent: If this is a wrapped field, then the parent kwarg will be a
referrence to the instance of the field wrapping field.
:param mapper_session: The overal mapper marshaling or serialization session
this field session belongs to.
"""
self.field = field
self.data = data
self.output = output
self.parent = parent
self.mapper_session = mapper_session
self.nested_mapper = nested_mapper
@property
def mapper(self):
"""Return the :class:`kim.mapper.Mapper` bound to the scope of this Session.
:returns: The :class:`kim.mapper.Mapper` bound to this Session.
:rtype: :class:`kim.mapper.Mapper`
"""
return self.mapper_session.mapper
[docs]def pipe(**pipe_kwargs):
"""Pipe decorator is provided as a convenience to avoid duplicating logic like
not running pipes when session.data is null.
:param run_if_none: Specify wether the pipe function should be called if session.data
is None.
Usage::
from kim.pipelines.base import pipe
@pipe(run_if_none=True)
def my_pipe(session):
do_stuff(session)
"""
def pipe_decorator(pipe_func):
@wraps(pipe_func)
def inner(session, *args, **kwargs):
if session.data is not None:
return pipe_func(session)
elif session.data is None and pipe_kwargs.get('run_if_none'):
return pipe_func(session)
else:
return session.data
return inner
return pipe_decorator
[docs]class Pipeline(object):
"""Pipelines provide a simple, extensible way of processing data for
a :class:`kim.field.Field`. Each pipeline provides 4 input groups,
``input_pipes``, ``validation_pipes``, ``process_pipes`` and ``output_pipes``.
Each containing `pipe` functions that are called in order
passing data from one pipe to another.
Kim pipes are similar to unix pipes, where each pipe in the chain has a
single role in handling data before passing it on to the next pipe in the
chain.
Pipelines are typically ignorant to whether they
are marhsaling data or serializing data, they simply take data in one end,
this may be a deserialized dict of JSON or an object
that's been populated from the database, and produce an output
at the other.
Usage::
from kim.pipelines.base import Pipeline
class StringIntPipeline(Pipeline):
input_pipes = [get_data_from_json]
validation_pipes = [is_numeric_string]
process_pipes [cast_to_int]
output_pipes = [update_output]
"""
input_pipes = []
validation_pipes = []
process_pipes = []
output_pipes = []
__slots__ = ()
@classmethod
def get_pipeline(cls, **extra_pipes):
chain = []
chain.extend(cls.input_pipes + extra_pipes.get('input', []))
chain.extend(cls.validation_pipes + extra_pipes.get('validation', []))
chain.extend(cls.process_pipes + extra_pipes.get('process', []))
chain.extend(cls.output_pipes + extra_pipes.get('output', []))
return chain
def run_pipeline(pipeline, session, field, **opts):
""" Iterate over all of the defined ``pipes`` for this pipeline.
:param parent_session: The field being processed by this Pipeline is wrapped,
parent_session will be passed and set on the fields session.
:returns: Returns the output of the pipelines session.
:rtype: mixed
"""
# chain all the pipelines pipes together and process them until the all the
# pipe groups have been exhausted or until
# :class:`kim.exception.StopPipelineExecution` is raised.
try:
for pipe_func in pipeline:
pipe_func(session)
return session.output
except StopPipelineExecution:
return session.output
@pipe(run_if_none=True)
[docs]def get_data_from_name(session):
"""Extracts a specific key from data using ``field.name``. This pipe is
typically used as the entry point to a chain of input pipes.
:param session: Kim pipeline session instance
:rtype: mixed
:returns: the key found in data using field.name
"""
# If the field is wrapped by another field then the relevant data
# will have already been pulled from the name.
if session.field.opts._is_wrapped:
return session.data
value = attr_or_key(session.data, session.field.name)
if value is None:
if session.field.opts.required and session.field.opts.default is None:
raise session.field.invalid(error_type='required')
elif session.field.opts.default is not None:
session.data = session.field.opts.default
return session.data
elif not session.field.opts.allow_none:
raise session.field.invalid(error_type='none_not_allowed')
session.data = value
return session.data
@pipe()
[docs]def get_data_from_source(session):
"""Extracts a specific key from data using ``field.source``. This pipe is
typically used as the entry point to a chain of output pipes.
:param session: Kim pipeline session instance
:rtype: mixed
:returns: the key found in data using field.source
"""
source = session.field.opts.source
# If the field is wrapped by another field then the relevant data
# will have already been pulled from the source.
if session.field.opts._is_wrapped or source == '__self__':
return session.data
value = attr_or_key(session.data, source)
session.data = value
return session.data
@pipe(run_if_none=True)
[docs]def get_field_if_required(session):
if session.data is None:
session.data = session.field.opts.default
return session.data
@pipe()
[docs]def read_only(session):
"""End processing of a pipeline if a Field is marked as read_only.
:param session: Kim pipeline session instance
:raises StopPipelineExecution:
"""
if session.field.opts.read_only:
raise StopPipelineExecution('read_only field')
return session.data
@pipe()
[docs]def is_valid_choice(session):
"""End processing of a pipeline if a Field is marked as read_only.
:param session: Kim pipeline session instance
:raises StopPipelineExecution:
"""
choices = session.field.opts.choices
if choices is not None and session.data not in choices:
raise session.field.invalid('invalid_choice')
return session.data
@pipe(run_if_none=True)
[docs]def update_output_to_name(session):
"""Store ``data`` at ``field[name]`` for a ``field`` inside
of ``output``
:param session: Kim pipeline session instance
:returns: None
"""
session.output[session.field.name] = session.data
@pipe(run_if_none=True)
[docs]def update_output_to_source(session):
"""Store ``data`` at field.opts.source for a ``field`` inside
of ``output``
:param session: Kim pipeline session instance
:raises: FieldError
:returns: None
"""
source = session.field.opts.source
try:
if source == '__self__':
attr_or_key_update(session.output, session.data)
else:
set_attr_or_key(session.output, session.field.opts.source, session.data)
except (TypeError, AttributeError):
raise FieldError('output does not support attribute or '
'key based set operations')
@pipe(run_if_none=True)
def set_default(session):
"""If ``data`` is None, set default if it is set.
:param session: Kim pipeline session instance
:returns: None
"""
if session.data is None:
session.data = session.field.opts.default
return session.data