Traffic sign recognition. A simpler RNN. Part 4.

02.07.2017
By

Here is a simpler RNN architecture built with TensorFlow (the model below is a single fully connected layer), but its result on the Russian traffic sign dataset is poor: about 70% of the images are recognized after 100,000 epochs, which takes about 15 hours of training. I will show the Python program anyway, for reference.

"""
Modification of the traffic_ru dataset program with image preprocessing
(make normalized YUV images) from http://jokla.me/robotics/traffic-signs/
"""
from __future__ import division

from tensorflow.python.framework import graph_util
from tensorflow.python.platform import gfile

import os
import random
import cv2
import skimage.data
import skimage.transform
import skimage.exposure
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

def load_data(data_dir):
    """
    Loads a data set and returns two lists:

    images: a list of Numpy arrays, each representing an image.
    labels: a list of numbers that represent the images labels.

    Every sub-directory of data_dir is treated as one class. The directory
    name is converted with int() and used as the label, so a sub-directory
    with a non-numeric name raises ValueError once it contains an image.
    Only files ending in ".jpg" or ".jpeg" are loaded; plain files located
    directly in data_dir are ignored.
    """
    # Get all subdirectories of data_dir. Each represents a label.
    directories = [d for d in os.listdir(data_dir)
                   if os.path.isdir(os.path.join(data_dir, d))]
    # Loop through the label directories and collect the data in
    # two lists, labels and images.
    labels = []
    images = []
    for d in directories:
        label_dir = os.path.join(data_dir, d)
        file_names = [os.path.join(label_dir, f)
                      for f in os.listdir(label_dir)
                      if f.endswith((".jpg", ".jpeg"))]
        # For each label, load its images and add them to the images list.
        # And add the label number (i.e. directory name) to the labels list.
        for f in file_names:
            images.append(skimage.data.imread(f))
            labels.append(int(d))
        # Progress output, one line per processed class directory.
        # The function-call form works on both Python 2 and Python 3.
        print(d)
    return images, labels

def pre_processing_single_img(img):
    """
    Convert one colour image to a normalized single-channel (luma) image.

    img: 3-channel image array in RGB channel order, as produced by the
         skimage loader used in load_data (assumed to hold 0..255 values,
         since the luma is divided by 255 below - TODO confirm).

    Returns a float array of shape (height, width, 1) with values shifted
    to the range -0.5 .. +0.5.
    """
    # The images are read with skimage, which delivers RGB data, so the RGB
    # conversion code must be used. COLOR_BGR2YUV would swap the red and blue
    # weights when computing the Y (luma) channel.
    img_y = cv2.cvtColor(img, cv2.COLOR_RGB2YUV)[:, :, 0]
    img_y = (img_y / 255.).astype(np.float32)
    # Adaptive histogram equalization, then shift so that the
    # image will be from -0.5 to +0.5.
    img_y = skimage.exposure.equalize_adapthist(img_y) - 0.5
    # Append a trailing channel axis: (height, width) -> (height, width, 1).
    img_y = img_y.reshape(img_y.shape + (1,))

    return img_y

def display_images_and_labels(images, labels):
    """
    Display the first image of each label.

    images: list of images, parallel to labels.
    labels: list of label numbers; labels[k] belongs to images[k].

    At most 64 labels are shown because the figure uses an 8 x 8 grid.
    Each title contains the label and the number of images carrying it.
    """
    max_plots = 64  # the 8 x 8 subplot grid below has only 64 cells
    plt.figure(figsize=(15, 15))
    for i, label in enumerate(set(labels), 1):
        if i > max_plots:
            # Grid is full; the remaining labels cannot be displayed.
            break
        # Pick the first image for each label.
        image = images[labels.index(label)]
        plt.subplot(8, 8, i)  # A grid of 8 rows x 8 columns
        plt.axis('off')
        plt.title("Label {0} ({1})".format(label, labels.count(label)))
        plt.imshow(image)
    plt.show()

def display_label_images(images, label, all_labels=None):
    """
    Display images of a specific label.

    images:     list of images.
    label:      the label whose images should be shown.
    all_labels: list of labels parallel to images. When omitted, the
                module-level ``labels`` list is used, which is what this
                function always read before the parameter existed.

    Raises ValueError when no image carries the requested label.
    """
    if all_labels is None:
        # Backward-compatible default: fall back to the global training labels.
        all_labels = labels

    limit = 24  # show a max of 24 images (3 rows x 8 columns)
    # Select by label instead of slicing one contiguous run, so the result is
    # also correct when images of one class are not stored next to each other.
    matching = [image for image, image_label in zip(images, all_labels)
                if image_label == label][:limit]
    if not matching:
        raise ValueError("{0} is not in list".format(label))

    plt.figure(figsize=(15, 5))
    for i, image in enumerate(matching, 1):
        plt.subplot(3, 8, i)  # 3 rows, 8 per row
        plt.axis('off')
        plt.imshow(image)
    plt.show()

# Locations of the training/testing datasets and of the training artefacts.
ROOT_PATH = "/home/tensorflow/python_prog/traffic_ru"
ROOT_PATH_SAVES = "/home/tensorflow/python_prog/traffic_ru/graph_aug_saves"

# "Testing" is the cleaned testing directory without fake images.
train_data_dir, test_data_dir = (
    os.path.join(ROOT_PATH, sub_dir)
    for sub_dir in ("mod_train_cleaned_augmented", "Testing")
)

# Checkpoint prefix and the text file that receives one label per line.
output_graph, output_labels = (
    os.path.join(ROOT_PATH_SAVES, file_name)
    for file_name in ("traffic_ru.chkp", "traffic_ru_lbl.txt")
)

# Load the training images together with their numeric labels.
images, labels = load_data(train_data_dir)

# Text form of every distinct label; written to the labels file at the end of
# the script, one label per line. The labels are sorted explicitly because
# set iteration order is not guaranteed, and the prediction script picks a
# line of that file by the predicted class number.
# NOTE(review): that lookup is only correct when the labels are exactly
# 0 .. N-1 without gaps - confirm against the dataset directories.
unique_labels_set = set(labels)
unique_labels_set_string = [str(label) for label in sorted(unique_labels_set)]

print("Unique Labels: {0}\nTotal Images: {1}".format(len(unique_labels_set), len(images)))

# Convert every training image to its normalized single-channel form,
# reporting progress after each 1000 images.
images_yuv = []
i = 0
for i, img in enumerate(images, 1):
    images_yuv.append(pre_processing_single_img(img))
    if i % 1000 == 0:
        print("Training images processed: ", i)

# Optional visual checks, disabled by default:
#for img in images_yuv:
    #cv2.imshow("img",img)
    #cv2.waitKey(0)
    #cv2.destroyAllWindows()
#display_images_and_labels(images_yuv, labels)
#display_label_images(images_yuv, 32)

# Shape and value range of the first few preprocessed images.
for image in images_yuv[:5]:
    print("shape: {0}, min: {1}, max: {2}".format(image.shape, image.min(), image.max()))

# Bring all images to the 32 x 32 size expected by the input placeholder.
images32 = []
for image in images_yuv:
    images32.append(skimage.transform.resize(image, (32, 32)))
#display_images_and_labels(images32, labels)

# Same report again, now for the resized images.
for image in images32[:5]:
    print("shape: {0}, min: {1}, max: {2}".format(image.shape, image.min(), image.max()))

# Numpy versions of both lists, used as the training feed.
images_a = np.array(images32)
labels_a = np.array(labels)
print("labels: ", labels_a.shape, "\nimages: ", images_a.shape)

# Create a graph to hold the model.
graph = tf.Graph()

# Create model in the graph: a single fully connected layer on top of the
# flattened image, trained with softmax cross-entropy and Adam.
with graph.as_default():
    # Placeholders for inputs and labels.
    # NOTE(review): neither placeholder gets an explicit name, so they receive
    # TensorFlow's default names; the prediction script in this post looks the
    # image input up as 'Placeholder:0' - keep the creation order in sync.
    #images_ph = tf.placeholder(tf.float32, [None, 32, 32, 1], name = "image_jpeg")
    #images_ph = tf.placeholder(tf.float32, [None, 32, 32, 3]) # for colored RGB images with 3 channels
    images_ph = tf.placeholder(tf.float32, [None, 32, 32, 1]) # for preprocessed images with 1 channel
    labels_ph = tf.placeholder(tf.int32, [None])

    # Flatten input from: [None, height, width, channels]
    # To: [None, height * width * channels] == [None, 1024] for the
    # 32 x 32 x 1 input used here (3072 would apply to 3-channel images).
    images_flat = tf.contrib.layers.flatten(images_ph)

    # Fully connected layer.
    # Generates logits of size [None, 67] 67 - numbers of classes in training dataset
    # NOTE(review): tf.nn.relu is passed as the layer activation, so the
    # values used as logits are never negative - confirm this is intended.
    logits = tf.contrib.layers.fully_connected(images_flat, 67, tf.nn.relu)

    # Convert logits to label indexes (int).
    # Shape [None], which is a 1D vector of length == batch_size.
    # The op is named "predicted" so it can be fetched by name later.
    predicted_labels = tf.argmax(logits, 1, name = "predicted")
    #predicted_labels = tf.argmax(logits, 1)

    # Define the loss function.
    # Cross-entropy is a good choice for classification.
    # The sparse variant takes integer class ids (labels_ph), not one-hot vectors.
    #loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels = labels_ph), name = "loss_func")
    loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits, labels = labels_ph))

    # Create training op. Earlier optimizer settings are kept for reference.
    #train = tf.train.AdamOptimizer(learning_rate=0.001, epsilon=1.0).minimize(loss)
    #train = tf.train.AdamOptimizer(learning_rate=0.001, epsilon=0.00000001, name = "train_func").minimize(loss)
    #train = tf.train.AdamOptimizer(learning_rate=0.001, epsilon=0.00000001).minimize(loss)
    train = tf.train.AdamOptimizer(learning_rate=0.02, epsilon=0.01).minimize(loss) # learning rate 0.02: chosen for the big dataset to reduce the number of epochs

    # Let's create a Saver object
    # By default, the Saver handles every Variables related to the default graph
    all_saver = tf.train.Saver()

    # And, finally, an initialization op to execute before training.
    init = tf.global_variables_initializer()

# Show the tensors that make up the model (printed as tensor descriptions).
print("images_flat: ", images_flat)
print("logits: ", logits)
print("loss: ", loss)
print("predicted_labels: ", predicted_labels)

# Create a session to run the graph we created.
session = tf.Session(graph=graph)

# First step is always to initialize all variables.
# The return value of running the init op is not needed.
session.run(init)

# We can list operations
#for op in session.graph.get_operations():
    #print(op.name)

# Make sure the checkpoint directory exists BEFORE the (possibly very long)
# training starts: saving the checkpoint fails when the parent directory is
# missing, which would throw the whole training run away.
save_dir = os.path.dirname(output_graph)
if save_dir and not os.path.isdir(save_dir):
    os.makedirs(save_dir)

# Results observed by the author for different numbers of epochs:
#100000 epoch give 0.45 loss and 70% prediction
#10000 epoch give 0.77 loss and 74% of prediction
#2001 epoch give 0,80 loss and 74% of prediction
#1001 epoch give 1,06 loss and 74% of prediction
epoch = 1001
# Each iteration is one optimizer step on the complete training set
# (all images are fed at once), reported every 10 steps.
for i in range(epoch):
    _, loss_value = session.run([train, loss],
                                feed_dict={images_ph: images_a, labels_ph: labels_a})
    if i % 10 == 0:
        print("Loss: ", loss_value)
        print("Epoch: ", i)

#Save result to graph (checkpoint files prefixed with output_graph)
all_saver.save(session, output_graph, global_step = epoch)

# Pick 20 random training images (or all of them when the data set is smaller;
# random.sample raises ValueError if asked for more items than are available).
sample_size = min(20, len(images32))
sample_indexes = random.sample(range(len(images32)), sample_size)
sample_images = [images32[i] for i in sample_indexes]
sample_labels = [labels[i] for i in sample_indexes]

# Run the "predicted_labels" op on the sample; session.run returns a list
# with one entry per fetched op, hence the [0].
predicted = session.run([predicted_labels],
                        feed_dict={images_ph: sample_images})[0]
# Ground truth first, predictions second, for a quick visual comparison.
print(sample_labels)
print(predicted)

"""
# Display the predictions and the ground truth visually.
fig = plt.figure(figsize=(10, 10))
for i in range(len(sample_images)):
    truth = sample_labels[i]
    prediction = predicted[i]
    plt.subplot(5, 2,1+i)
    plt.axis('off')
    color='green' if truth == prediction else 'red'
    plt.text(40, 10, "Truth:        {0}\nPrediction: {1}".format(truth, prediction),
             fontsize=12, color=color)
    plt.imshow(sample_images[i])
plt.show()
"""

# Read the held-out test images and their labels.
test_images, test_labels = load_data(test_data_dir)

# Same preprocessing as for the training images, with a progress message
# after every 1000 images.
test_images_yuv = []
i = 0
for i, img in enumerate(test_images, 1):
    test_images_yuv.append(pre_processing_single_img(img))
    if i % 1000 == 0:
        print("Test images processed: ", i)

# Resize to 32 x 32, exactly as was done for the training set.
test_images32 = []
for image in test_images_yuv:
    test_images32.append(skimage.transform.resize(image, (32, 32)))
#display_images_and_labels(test_images32, test_labels)

# Predict a class for every test image in one run.
predicted = session.run([predicted_labels],
                        feed_dict={images_ph: test_images32})[0]

#print(test_labels)
#print(predicted)

# Count the predictions that agree with the ground truth.
match_count = 0
for y, y_ in zip(test_labels, predicted):
    if y == y_:
        match_count += 1
print(match_count)
print(len(test_labels))
# Fraction of correctly classified test images.
accuracy = match_count/len(test_labels)
print(accuracy)

# Exporting the trained graph with its weights frozen as constants is
# disabled here; only the label list is written.
#output_graph_def = graph_util.convert_variables_to_constants(session, graph.as_graph_def(), ["predicted"])
#with gfile.FastGFile(output_graph, 'wb') as f:
    #f.write(output_graph_def.SerializeToString())

# One label per line, with a newline after the last one.
labels_text = '\n'.join(unique_labels_set_string) + '\n'
with gfile.FastGFile(output_labels, 'w') as f:
    f.write(labels_text)

# Close the session. This will destroy the trained model.
session.close()

 

However, with 2,000 epochs and with 10,000 epochs the training result is practically the same – about 68%.

We can check the result of the training with this program:

"""
Program to predict the class of a single JPEG image of a traffic sign.
The model can be trained with the traffic_ru.py program, and the
protobuf files (*.pb) are prepared by the traffic_ru_make_pb.py program.
Andrey Surkov 2017

"""

import numpy as np
import tensorflow as tf
import skimage.data
import skimage.transform
from numpy import array

# Image to classify.
imagePath = '/home/tensorflow/python_prog/traffic_ru/check_jpg/test2.jpg'
# Serialized GraphDef (*.pb) of the trained model.
# NOTE(review): this points into "graph_saves", while the training script
# above writes into "graph_aug_saves" - confirm which model is intended.
modelFullPath = '/home/tensorflow/python_prog/traffic_ru/graph_saves/traffic_ru.pb'
# Text file with one class label per line.
labelsFullPath = '/home/tensorflow/python_prog/traffic_ru/graph_saves/traffic_ru_lbl.txt'

def create_graph():
    """Import the GraphDef stored at modelFullPath into the current default graph.

    The nodes are imported without a name prefix (name=''), so tensors keep
    the names they had when the graph was saved. Nothing is returned.
    """
    graph_def = tf.GraphDef()
    # Read the serialized *.pb file as raw bytes, then parse it.
    with tf.gfile.FastGFile(modelFullPath, 'rb') as model_file:
        serialized = model_file.read()
    graph_def.ParseFromString(serialized)
    tf.import_graph_def(graph_def, name='')

def run_predict_image():
    """Predict the class label of the image stored at imagePath.

    The image is resized to 32 x 32, fed to the graph loaded by
    create_graph(), and the predicted class number is translated to text
    through the labels file (labelsFullPath, one label per line).

    Returns the label string, or None when the image file does not exist.
    """
    # checking if jpg file existed
    if not tf.gfile.Exists(imagePath):
        tf.logging.fatal('File does not exist %s', imagePath)
        return None

    # reading image data
    image_data = skimage.data.imread(imagePath)

    # Resize images
    image32 = skimage.transform.resize(image_data, (32, 32))
    #print("shape: {0}, min: {1}, max: {2}".format(image32.shape, image32.min(), image32.max()))

    # The placeholder receives a batch of images, so wrap the single image
    # into a batch of one.
    # NOTE(review): the reshape assumes a model trained on 3-channel images
    # without the YUV preprocessing; a model with a [None, 32, 32, 1] input
    # would need the same preprocessing as in training - confirm.
    image_a = array(image32).reshape(1,32,32,3)

    # Creates graph from saved GraphDef.
    create_graph()

    with tf.Session() as sess:

        # We can list operations
        #for op in sess.graph.get_operations():
            #print(op.name)

        # Output of the argmax op, looked up by the name 'predicted'.
        argmax_tensor = sess.graph.get_tensor_by_name('predicted:0')
        # Input placeholder for the image batch, looked up by the name
        # 'Placeholder'.
        image_ph = sess.graph.get_tensor_by_name('Placeholder:0')

        # make predictions; [0] picks the result for the only image fed
        predictions = sess.run(argmax_tensor, feed_dict={image_ph: image_a})[0]

    # Plain Python int, usable as a list index.
    class_index = int(np.squeeze(predictions))

    #print(class_index)

    # Read the labels as text and close the file afterwards. Opening in binary
    # mode would yield bytes on Python 3, and str() of bytes is "b'...'"
    # rather than the label itself.
    with open(labelsFullPath, 'r') as f:
        labels = [line.strip() for line in f]

    return labels[class_index]

# When run as a script: classify the configured image and print the label
# returned by run_predict_image().
if __name__ == '__main__':
    print(run_predict_image())

 

Before testing a single traffic sign image, we need to convert it to a NumPy array so that the program works correctly.

Tags: ,

Добавить комментарий