Source code for dltk.io.abstract_reader

from __future__ import unicode_literals
from __future__ import print_function
from __future__ import division
from __future__ import absolute_import

import tensorflow as tf
import traceback


[docs]class IteratorInitializerHook(tf.train.SessionRunHook): """Hook to initialise data iterator after Session is created.""" def __init__(self): super(IteratorInitializerHook, self).__init__() self.iterator_initializer_func = None
[docs] def after_create_session(self, session, coord): """Initialise the iterator after the session has been created.""" self.iterator_initializer_func(session)
[docs]class Reader(object): """Wrapper for dataset generation given a read function""" def __init__(self, read_fn, dtypes): """Constructs a Reader instance Args: read_fn: Input function returning features which is a dictionary of string feature name to `Tensor` or `SparseTensor`. If it returns a tuple, first item is extracted as features. Prediction continues until `input_fn` raises an end-of-input exception (`OutOfRangeError` or `StopIteration`). dtypes: A nested structure of tf.DType objects corresponding to each component of an element yielded by generator. """ self.dtypes = dtypes self.read_fn = read_fn
[docs] def get_dataset(self, file_references, mode, example_shapes=None, shuffle_cache_size=100, batch_size=4, params=None): """Function to get a tf.data.Dataset from a python generator. Args: file_references: An array like structure that holds the reference to the file to read. It can also be None if not needed. mode: A tf.estimator.ModeKeys. It is passed on to `read_fn` to trigger specific functions there. example_shapes (optional): A nested structure of lists or tuples corresponding to the shape of each component of an element yielded by generator. shuffle_cache_size (int, optional): An `int` determining the number of examples that are held in the shuffle queue. batch_size (int, optional): An `int` specifying the number of examples returned in a batch. params (dict, optional): A `dict` passed on to the `read_fn`. Returns: tf.data.Dataset: The dataset holding the data to look at. """ def f(): def clean_ex(ex, compare): # Clean example dictionary by recursively deleting # non-relevant entries. However, this does not look into # dictionaries nested into lists for k in list(ex.keys()): if k not in list(compare.keys()): del ex[k] elif isinstance(ex[k], dict) and isinstance(compare[k], dict): clean_ex(ex[k], compare[k]) elif (isinstance(ex[k], dict) and not isinstance(compare[k], dict)) or \ (not isinstance(ex[k], dict) and isinstance( compare[k], dict)): raise ValueError('Entries between example and ' 'dtypes incompatible for key {}' ''.format(k)) elif ((isinstance(ex[k], list) and not isinstance( compare[k], list)) or (not isinstance(ex[k], list) and isinstance( compare[k], list)) or (isinstance(ex[k], list) and isinstance(compare[k], list) and not len(ex[k]) == len(compare[k]))): raise ValueError('Entries between example and ' 'dtypes incompatible for key {}' ''.format(k)) for k in list(compare): if k not in list(ex.keys()): raise ValueError('Key {} not found in ex but is ' 'present in dtypes. Found keys: ' '{}'.format(k, ex.keys())) return ex fn = self.read_fn(file_references, mode, params) # iterate over all entries - this loop is terminated by the # tf.errors.OutOfRangeError or StopIteration thrown by the # read_fn while True: try: ex = next(fn) if ex.get('labels') is None: ex['labels'] = None if not isinstance(ex, dict): raise ValueError('The read_fn has to return ' 'dictionaries') ex = clean_ex(ex, self.dtypes) yield ex except (tf.errors.OutOfRangeError, StopIteration): raise except Exception as e: print('got error `{} from `_read_sample`:'.format(e)) print(traceback.format_exc()) raise dataset = tf.data.Dataset.from_generator( f, self.dtypes, example_shapes) dataset = dataset.repeat(None) dataset = dataset.shuffle(shuffle_cache_size) dataset = dataset.batch(batch_size) dataset = dataset.prefetch(tf.contrib.data.AUTOTUNE) return dataset
[docs] def get_inputs(self, file_references, mode, example_shapes=None, shuffle_cache_size=100, batch_size=4, params=None): """ Function to provide the input_fn for a tf.Estimator. Args: file_references: An array like structure that holds the reference to the file to read. It can also be None if not needed. mode: A tf.estimator.ModeKeys. It is passed on to `read_fn` to trigger specific functions there. example_shapes (optional): A nested structure of lists or tuples corresponding to the shape of each component of an element yielded by generator. shuffle_cache_size (int, optional): An `int` determining the number of examples that are held in the shuffle queue. batch_size (int, optional): An `int` specifying the number of examples returned in a batch. params (dict, optional): A `dict` passed on to the `read_fn`. Returns: function: a handle to the `input_fn` to be passed the relevant tf estimator functions. tf.train.SessionRunHook: A hook to initialize the queue within the dataset. """ iterator_initializer_hook = IteratorInitializerHook() def inputs(): dataset = self.get_dataset(file_references, mode, example_shapes, shuffle_cache_size, batch_size, params) iterator = dataset.make_initializable_iterator() next_dict = iterator.get_next() # Set runhook to initialize iterator iterator_initializer_hook.iterator_initializer_func = \ lambda sess: sess.run(iterator.initializer) # Return batched (features, labels) return next_dict['features'], next_dict.get('labels') # Return function and hook return inputs, iterator_initializer_hook
[docs] def serving_input_receiver_fn(self, placeholder_shapes): """Build the serving inputs. Args: placeholder_shapes: A nested structure of lists or tuples corresponding to the shape of each component of the feature elements yieled by the read_fn. Returns: function: A function to be passed to the tf.estimator.Estimator instance when exporting a saved model with estimator.export_savedmodel. """ def f(): inputs = {k: tf.placeholder( shape=[None] + list(placeholder_shapes['features'][k]), dtype=self.dtypes['features'][k]) for k in list(self.dtypes['features'].keys())} return tf.estimator.export.ServingInputReceiver(inputs, inputs) return f