#!/usr/bin/env python
__author__ = 'solivr'
__license__ = "GPL"
import tensorflow as tf
import numpy as np
import csv
from .config import Params, CONST
from typing import Tuple, Union, List
def random_rotation(img: tf.Tensor, max_rotation: float = 0.1, crop: bool = True) -> tf.Tensor:
    # adapted from SeguinBe
    """
    Rotates an image with a random angle drawn uniformly from [-max_rotation, max_rotation].

    See https://stackoverflow.com/questions/16702966/rotate-image-and-crop-out-black-borders
    for the cropping formulae.

    :param img: image Tensor (assumed h x w x c -- TODO confirm rank with callers)
    :param max_rotation: maximum angle to rotate (radians)
    :param crop: if True, crop the rotated image to remove the black borders
    :return: the rotated (and possibly cropped) image Tensor
    """
    with tf.name_scope('RandomRotation'):
        rotation = tf.random_uniform([], -max_rotation, max_rotation, name='pick_random_angle')
        rotated_image = tf.contrib.image.rotate(img, rotation, interpolation='BILINEAR')
        if crop:
            rotation = tf.abs(rotation)
            original_shape = tf.shape(rotated_image)[:2]
            h, w = original_shape[0], original_shape[1]
            # Work with the long (old_l) and short (old_s) sides so the same
            # formulae apply to both landscape and portrait images.
            old_l, old_s = tf.cond(h > w, lambda: [h, w], lambda: [w, h])
            old_l, old_s = tf.cast(old_l, tf.float32), tf.cast(old_s, tf.float32)
            # Largest axis-aligned rectangle fully contained in the rotated image
            new_l = (old_l * tf.cos(rotation) - old_s * tf.sin(rotation)) / tf.cos(2 * rotation)
            new_s = (old_s - tf.sin(rotation) * new_l) / tf.cos(rotation)
            new_h, new_w = tf.cond(h > w, lambda: [new_l, new_s], lambda: [new_s, new_l])
            new_h, new_w = tf.cast(new_h, tf.int32), tf.cast(new_w, tf.int32)
            bb_begin = tf.cast(tf.ceil((h - new_h) / 2), tf.int32), tf.cast(tf.ceil((w - new_w) / 2), tf.int32)
            # Only slice when the computed indices are valid; otherwise fall back
            # to the original (un-rotated) image.
            rotated_image_crop = tf.cond(
                tf.logical_and(bb_begin[0] < h - bb_begin[0], bb_begin[1] < w - bb_begin[1]),
                true_fn=lambda: rotated_image[bb_begin[0]:h - bb_begin[0], bb_begin[1]:w - bb_begin[1], :],
                false_fn=lambda: img,
                name='check_slices_indices'
            )
            # If the crop removed the entire image, keep the original image
            rotated_image = tf.cond(tf.equal(tf.size(rotated_image_crop), 0),
                                    true_fn=lambda: img,
                                    false_fn=lambda: rotated_image_crop,
                                    name='check_size_crop')
        return rotated_image
def random_padding(image: tf.Tensor, max_pad_w: int = 5, max_pad_h: int = 10) -> tf.Tensor:
    """
    Pads the borders of an image with a random number of rows and columns,
    mirror-reflecting the image content ('REFLECT' mode).

    :param image: image Tensor to pad (h x w x c)
    :param max_pad_w: maximum padding in width (exclusive upper bound of the random draw)
    :param max_pad_h: maximum padding in height (exclusive upper bound of the random draw)
    :return: the padded image Tensor
    """
    # NOTE(review): these np.random draws are evaluated at graph-construction time,
    # so if this function is traced once (e.g. inside a tf.data pipeline) every
    # example receives the *same* padding; tf.random_uniform would give per-example
    # randomness -- confirm the intended behavior before changing.
    w_pad = list(np.random.randint(0, max_pad_w, size=[2]))
    h_pad = list(np.random.randint(0, max_pad_h, size=[2]))
    paddings = [h_pad, w_pad, [0, 0]]
    return tf.pad(image, paddings, mode='REFLECT', name='random_padding')
def augment_data(image: tf.Tensor, max_rotation: float = 0.1) -> tf.Tensor:
    """
    Data augmentation on an image: random padding, brightness, contrast, rotation,
    and (for images with at least 3 channels) hue and saturation.

    :param image: image Tensor (h x w x c)
    :param max_rotation: float, maximum permitted rotation (in radians)
    :return: the augmented image Tensor
    """
    with tf.name_scope('DataAugmentation'):
        # Random padding
        image = random_padding(image)
        # TODO : add random scaling
        image = tf.image.random_brightness(image, max_delta=0.1)
        image = tf.image.random_contrast(image, 0.5, 1.5)
        image = random_rotation(image, max_rotation, crop=True)
        # Hue/saturation only make sense for color images
        if image.shape[-1] >= 3:
            image = tf.image.random_hue(image, 0.2)
            image = tf.image.random_saturation(image, 0.5, 1.5)
        return image
# def preprocess_image_for_prediction(fixed_height: int=32, min_width: int=8):
# """
# Input function to use when exporting the model for making predictions (see estimator.export_savedmodel)
# :param fixed_height: height of the input image after resizing
# :param min_width: minimum width of image after resizing
# :return:
# """
#
# def serving_input_fn():
# # define placeholder for input image
# image = tf.placeholder(dtype=tf.float32, shape=[None, None, 1])
#
# shape = tf.shape(image)
# # Assert shape is h x w x c with c = 1
#
# ratio = tf.divide(shape[1], shape[0])
# increment = CONST.DIMENSION_REDUCTION_W_POOLING
# new_width = tf.cast(tf.round((ratio * fixed_height) / increment) * increment, tf.int32)
#
# resized_image = tf.cond(new_width < tf.constant(min_width, dtype=tf.int32),
# true_fn=lambda: tf.image.resize_images(image, size=(fixed_height, min_width)),
# false_fn=lambda: tf.image.resize_images(image, size=(fixed_height, new_width))
# )
#
# # Features to serve
# features = {'images': resized_image[None], # cast to 1 x h x w x c
# 'images_widths': new_width[None] # cast to tensor
# }
#
# # Inputs received
# receiver_inputs = {'images': image}
#
# return tf.estimator.export.ServingInputReceiver(features, receiver_inputs)
#
# return serving_input_fn
def data_loader(csv_filename: Union[List[str], str], params: Params, labels=True, batch_size: int = 64,
                data_augmentation: bool = False, num_epochs: int = None, image_summaries: bool = False):
    """
    Loads, preprocesses (data augmentation, padding) and feeds the data.

    :param csv_filename: filename or list of filenames of the input csv
    :param params: Params object containing all the parameters
    :param labels: whether the csv also contains transcription labels (second column)
    :param params: Params object containing all the parameters
    :param batch_size: size of the generated batches
    :param data_augmentation: flag to select or not data augmentation
    :param num_epochs: feeds the data 'num_epochs' times (None repeats indefinitely)
    :param image_summaries: flag to show image summaries or not
    :return: input_fn to be used with a tf.estimator.Estimator
    """
    padding = True

    def input_fn():
        # csv layout: (path, label) when labels=True, (path,) otherwise
        if labels:
            csv_types = [['None'], ['None']]
        else:
            csv_types = [['None']]
        dataset = tf.contrib.data.CsvDataset(csv_filename, record_defaults=csv_types, header=False,
                                             field_delim=params.csv_delimiter, use_quote_delim=True)
        dataset = dataset.apply(tf.contrib.data.shuffle_and_repeat(buffer_size=1024, count=num_epochs))

        def _image_reading_preprocessing(path, label=None) -> dict:
            # `label` defaults to None so this also works when the csv has a
            # single (path) column, i.e. labels=False.
            image_content = tf.read_file(path, name='filename_reader')
            # tf.image.decode_image is not used because it seems the shape is not set...
            image = tf.cond(
                tf.image.is_jpeg(image_content),
                lambda: tf.image.decode_jpeg(image_content, channels=params.input_channels, name='image_decoding_op',
                                             try_recover_truncated=True),
                lambda: tf.image.decode_png(image_content, channels=params.input_channels, name='image_decoding_op'))
            # Data augmentation
            if data_augmentation:
                image = augment_data(image, params.data_augmentation_max_rotation)
            # Padding (keeps aspect ratio) or plain resize
            if padding:
                with tf.name_scope('padding'):
                    image, img_width = padding_inputs_width(image, target_shape=params.input_shape,
                                                            increment=CONST.DIMENSION_REDUCTION_W_POOLING)
            else:
                image = tf.image.resize_images(image, size=params.input_shape)
                img_width = tf.shape(image)[1]
            features = {'filenames': path, 'images': image, 'images_widths': img_width}
            # tf.data cannot carry None values, so only add labels when present
            if label is not None:
                features['labels'] = label
            return features

        dataset = dataset.map(_image_reading_preprocessing, num_parallel_calls=params.input_data_n_parallel_calls)
        dataset = dataset.batch(batch_size).prefetch(32)
        prepared_batch = dataset.make_one_shot_iterator().get_next()

        if image_summaries:
            tf.summary.image('input/image', prepared_batch['images'], max_outputs=1)
        if labels:
            tf.summary.text('input/labels', prepared_batch.get('labels')[:10])
        return prepared_batch, prepared_batch.get('labels')

    return input_fn
# TODO serving function for batches
def serving_batch_filenames_fn(input_shape=(32, 100), n_channels: int = 1, padding=True):
    """
    Serving input function for batch inference using filenames as inputs.

    The batch size is not a Python argument: it is a placeholder ('batch_size')
    fed together with the list of filenames when the exported model is called.

    :param input_shape: shape of the input after resizing/padding
    :param n_channels: number of channels of images
    :param padding: if True, keeps the image ratio and pads it to get to 'input_shape' shape,
                    if False will resize the image using bilinear interpolation
    :return: serving input function
    """
    def serving_input_fn():
        # Placeholders for batch size and filenames, fed at inference time
        batch_size = tf.placeholder(dtype=tf.int64, name='batch_size')
        image_filenames = tf.placeholder(dtype=tf.string, shape=[None], name='list_image_filenames')

        # Create dataset from the list of filenames
        dataset = tf.data.Dataset.from_tensor_slices(image_filenames)

        def _image_reading_preprocessing(image_filename) -> Tuple[tf.Tensor, tf.Tensor]:
            # Load and decode (tf.image.decode_image is avoided because it does
            # not set the static shape; see tensorflow v1.8 change)
            image_content = tf.read_file(image_filename, name='filename_reader')
            image = tf.cond(
                tf.image.is_jpeg(image_content),
                lambda: tf.image.decode_jpeg(image_content, channels=n_channels, name='image_decoding_op',
                                             try_recover_truncated=True),
                lambda: tf.image.decode_png(image_content, channels=n_channels, name='image_decoding_op'))
            # Padding (keeps aspect ratio) or plain resize
            if padding:
                with tf.name_scope('padding'):
                    image, img_width = padding_inputs_width(image, target_shape=input_shape,
                                                            increment=CONST.DIMENSION_REDUCTION_W_POOLING)
            else:
                image = tf.image.resize_images(image, size=input_shape)
                img_width = tf.shape(image)[1]
            return image, img_width

        dataset = dataset.map(_image_reading_preprocessing)
        dataset = dataset.batch(batch_size)

        # Build the Iterator this way in order to be able to initialize it when the saved_model will be loaded
        # From http://vict0rsch.github.io/2018/05/17/restore-tf-model-dataset/
        iterator = tf.data.Iterator.from_structure(dataset.output_types, dataset.output_shapes)
        dataset_init_op = iterator.make_initializer(dataset, name='dataset_init')
        features_images, features_widths = iterator.get_next()

        # Features to serve: 'images', 'images_widths'
        features = {'images': features_images, 'images_widths': features_widths}
        return tf.estimator.export.ServingInputReceiver(features, receiver_tensors={'list_filenames': image_filenames,
                                                                                    'batch_size': batch_size})
    return serving_input_fn
# TODO serving function from url...