Click here to Skip to main content
15,881,139 members
Articles / Internet of Things / Raspberry-Pi
Article

Arm-Powered Driver Distraction Detection

Rate me:
Please Sign up or sign in to vote.
5.00/5 (3 votes)
7 Feb 2022CPOL8 min read 5.2K   95   5   1
How to create a driver distraction detector and how to run it on an Arm-powered device like a Raspberry Pi or Jetson Nano
In this article, we demonstrate how straightforward it is to develop an AI computer vision application for a portable Arm-powered device. We choose this solution for practicality since our driver distraction detection system must function autonomously in a driving car. We show that the application can run on the Arm-powered device in real-time mode, reaching about two FPS processing speed.

This article is a sponsored article. Articles such as these are intended to provide you with information on products and services that we consider useful and of value to developers

Introduction

When discussing computer vision (CV) based on artificial intelligence (AI) or deep learning (DL), we often imagine a powerful desktop machine or server processing images or videos. But sometimes, we need to run a complicated CV algorithm on a portable device.

For example, to create a computer system to prevent a driver from becoming distracted, the most practical solution is a stand-alone device with specialized software. A driver, fleet manager, or manufacturer could then place such a device into a vehicle to alert the driver when they are likely distracted.

So, can we run a complicated algorithm on portable Arm-powered devices? In this article, we will demonstrate how to create a distracted driver detector and show how to run it on a Raspberry Pi device. We will use Python to develop our program, OpenCV for the computer vision algorithms, and convolutional neural networks (CNN) to detect possible driver distraction.

Inventing the Algorithm

We will use a simple detection type that checks if eyes are closed for short intervals. We could characterize many other distraction symptoms, but this one is likely the most reliable.

Modern AI algorithms can complete this task with minimal effort. One approach uses a special CNN for detecting so-called facial landmarks. The image below features an extremely common 68-point facial landmarks diagram.

Image 1

Using the eye point landmark coordinates, we can calculate the eye’s height-to-width ratio. When an eye is closed, this ratio is significantly lower. Through tracking this data, we can detect the moment of potential distraction.

A common approach to obtaining facial landmarks is detecting a face bounding box (a box around a face) and locating the landmark coordinates within it. Therefore, this algorithm requires two ingredients — a face detector and a landmark evaluator. We will use deep neural networks (DNN) for both subtasks. You can find the face detection TensorFlow model on GitLab. For a facial landmark evaluator, we will use this Caffe model.

Detecting Facial Landmarks

Let us start writing code for our facial landmarks detection algorithm. We begin with the face detector based on the DNN model.

Python
class TF_FD:    
    def __init__(self, model, graph, min_size, min_confidence):
        self.min_size = min_size
        self.min_confidence = min_confidence
        self.detector = cv2.dnn.readNetFromTensorflow(model, graph)
        l_names = self.detector.getLayerNames()
        if len(l_names)>0:
            print('Face detector loaded:')
        else:
            print('Face detector loading FAILED')
        
    def detect(self, frame):
        width = frame.shape[1]
        height = frame.shape[0]
        
        inputBlob = cv2.dnn.blobFromImage(frame, 1.0, (300, 300), \
                                          (104.0, 177.0, 123.0), True, False)

        self.detector.setInput(inputBlob, 'data');
        detection = self.detector.forward('detection_out');

        n = detection.shape[2]
        
        detected = []
        for i in range(n):
            conf = detection[0, 0, i, 2]
            if conf >= self.min_confidence:
                x1 = detection[0, 0, i, 3]
                y1 = detection[0, 0, i, 4]
                x2 = detection[0, 0, i, 5]
                y2 = detection[0, 0, i, 6]
                # skip faces out of the frame
                if Utils.point_is_out(x1,y1) or Utils.point_is_out(x2, y2):
                    continue
                fw = (x2-x1)*width
                fh = (y2-y1)*height
                if (fw>=self.min_size) and (fh>=self.min_size):
                    r = (x1, y1, x2, y2)
                    d = (conf, r)
                    detected.append(d)
        
        return detected

This simple class provides the constructor for loading a TensorFlow neural network from the specified model and graph files. The OpenCV framework’s cv2.dnn module delivers methods for loading DNN models of many popular formats. The constructor has two additional arguments: the minimum face size and minimum detection confidence.

The class’s detect method receives one parameter, frame, which is an image or a video frame. The function creates a blob object (a special 4D array that we use as input data for the detector).

Note that we used some specific values for the parameters in our model’s blobFromImage function. If you use another face detection model, remember to change the values as needed.

Next, we run the detector calling the forward method and extract data (detection confidence and bounding boxes) for all faces satisfying our criteria (the minimum size and confidence).

We next develop the second class, the facial landmarks detector:

Python
class CAFFE_FLD:    
    def __init__(self, model, proto):
        self.detector = cv2.dnn.readNetFromCaffe(proto, model)
        l_names = self.detector.getLayerNames()
        if len(l_names)>0:
            print('Face landmarks detector loaded:')
        else:
            print('Face landmarks detector loading FAILED')
    
    def get_face_rect(self, frame, face):
        width = frame.shape[1]
        height = frame.shape[0]
        
        (conf, rect) =  face
        (x1, y1, x2, y2) = rect
        fw = (x2-x1)*width
        fh = (y2-y1)*height
        
        if fw>fh:
            dx = (fw-fh)/(2*width)
            x1 = x1+dx
            x2 = x2-dx
        else:
            dy = (fh-fw)/(2*height)
            y1 = y1+dy
            y2 = y2-dy
        
        x1 = Utils.fit(x1)
        y1 = Utils.fit(y1)
        x2 = Utils.fit(x2)
        y2 = Utils.fit(y2)
        
        rect = (x1, y1, x2, y2)
        
        return rect
    
    def get_frame_points(self, face_rect, face_points):
        (x1, y1, x2, y2) = face_rect
        fw = (x2-x1)
        fh = (y2-y1)
        
        n = len(face_points)
        frame_points = []
        
        for i in range(n):
            v = face_points[i]
            if (i % 2) == 0:
                dv = x1
                df = fw
            else:
                dv = y1
                df = fh
            v = dv+v*df
            frame_points.append(v)
            
        return frame_points
    
    def get_face_image(self, frame, face):
        width = frame.shape[1]
        height = frame.shape[0]
        
        (conf, rect) =  face
        (x1, y1, x2, y2) = rect
        
        rect = self.get_face_rect(frame, face)
        (xi1, yi1, xi2, yi2) = Utils.rect_to_abs(rect, width, height)
        
        roi = frame[yi1:yi2, xi1:xi2]
        gray = cv2.cvtColor(roi, cv2.COLOR_RGB2GRAY)
        resized = cv2.resize(gray, (60, 60), 0.0, 0.0, interpolation=cv2.INTER_CUBIC)
        
        return (rect, gray, resized)
    
    def detect(self, f_img):
        width = f_img.shape[1]
        height = f_img.shape[0]
        
        inputBlob = cv2.dnn.blobFromImage(f_img, 1/127.5, (60, 60), (127.5))
        self.detector.setInput(inputBlob, 'data');
        
        detection = self.detector.forward();
        points = detection[0]
        
        return points

This class also loads a DNN model on initialization, but it uses another function because this model is in Caffe’s specific format. The primary method, detect, again creates a blob and runs the neural network to get facial landmarks. In this case, the detect method receives not an entire frame but a specially processed part of the frame containing one face.

We can generate this “face image” using the get_face_image method, specially designed for this purpose. It finds the square box containing the face, crops it from the frame, converts the blue, green, red (BGR) data to a gray-scale image (because we have trained our DNN model on gray images), and resizes the image to 60x60 pixels using a high-quality interpolation method.

Running Landmark Detection on Raspberry Pi

Now that we have designed our face landmark detector, we should test it on an Arm-powered device to verify that it can run the algorithm with enough frames per second (FPS). We will perform tests on a Raspberry Pi 4 Model B device. We have installed the Python OpenCV framework on the device using pre-compiled binary packages. If you use another device, you should follow the suitable guides to install its packages.

In this article, we won’t use special AI frameworks, and the neural networks are processed without acceleration on GPU or TPU. So, all the ML workloads run only on the device’s CPU.

We’ll do all tests using video files to ensure the experiments’ repeatability. The video is set in an office but imitates the scene when driving a car.

The following class runs facial landmark detection on a video file:

Python
class VideoFLD:    
    def __init__(self, fd, fld):
        self.fd = fd
        self.fld = fld
    
    def process(self, video):
        frame_count = 0
        detection_num = 0;
        dt = 0
        dt_l = 0
        
        capture = cv2.VideoCapture(video)
        img = None

        dname = 'Arm-Powered Driver Distraction Detection'
        cv2.namedWindow(dname, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(dname, 720, 720)
        
        # Capture all frames
        while(True):    
            (ret, frame) = capture.read()
            if frame is None:
                break
            frame_count = frame_count+1
            
            # work with square images
            width = frame.shape[1]
            height = frame.shape[0]
            if not (width == height):
                dx = int((width-height)/2)
                frame = frame[0:height, dx:dx+height]
            
            t1 = time.time()
            faces = self.fd.detect(frame)
            t2 = time.time()
            dt = dt + (t2-t1)
            
            f_count = len(faces)
            detection_num += f_count
            
            draw_points = []
            if (f_count>0):
                for (i, face) in enumerate(faces):
                    t1 = time.time()
                    (fi_rect, fi_gray, fi_resized) = self.fld.get_face_image(frame, face)
                    points = self.fld.detect(fi_resized)
                    frame_points = self.fld.get_frame_points(fi_rect, points)
                    t2 = time.time()
                    dt_l = dt_l + (t2-t1)
                    draw_points.append(frame_points)
    
            if len(faces)>0:
                Utils.draw_faces(faces, (255, 0, 0), 1, frame, True)
            if len(draw_points)>0:
                for (i, points) in enumerate(draw_points):
                    Utils.draw_points(points, (0, 0, 255), 1, frame)
            
            # Display the resulting frame
            cv2.imshow(dname,frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            
        capture.release()
        cv2.destroyAllWindows()    
        
        fps = 0.0
        if dt>0:
            fps = frame_count/dt
            
        fps_l = 0.0
        if dt_l>0:
            fps_l = detection_num/dt_l
        
        return (detection_num, fps, fps_l)

Here, we use our face and landmark detectors to provide the primary function. We use the VideoCapture class from the OpenCV library to read frames from a video file and feed them to the detectors.

Now we can run the algorithm with the following code:

Python
w_path = '/home/pi/Desktop/PI_DD'
n_path = os.path.join(w_path, 'net')
fd_model = os.path.join(n_path, 'opencv_face_detector_uint8.pb')
fd_graph = os.path.join(n_path, 'opencv_face_detector.pbtxt')
fd = TF_FD(fd_model, fd_graph, 30, 0.5)

fld_model = os.path.join(n_path, 'face_landmarks.caffemodel')
fld_proto = os.path.join(n_path, 'face_landmarks.prototxt')
fld = CAFFE_FLD(fld_model, fld_proto)

v_path = os.path.join(w_path, 'video')
v_name = 'v_1.mp4'
v_file = os.path.join(v_path, v_name)
vfld = VideoFLD(fd, fld)

(detection_num, fps, fps_l) = vfld.process(v_file)

print("Face detections: "+str(detection_num))
print("Detection FPS: "+str(fps))
print("Landmarks FPS: "+str(fps_l))

You can see the screened results in the following video:

Our facial landmark detection algorithm works well and locates the reference points with reasonable accuracy. It gave us a face detection speed of about 2 FPS and a landmark evaluation speed of about 60 FPS. That’s definitely usable, and not bad considering we’re only using the Pi’s CPU.

This speed should be sufficient for detecting closed eyes within one to three seconds, applicable in real situations of driver distraction. So, it should be good enough for our distraction detection task.

Implementing Driver Distraction Detection

We are only one step away from the completed algorithm of distracted driver detection: writing the algorithm for evaluating the eye’s height-to-width ratio and tracking it to evaluate the moment of possible distraction.

First, we add two simple methods to the CAFFE_FLD class:

Python
def get_eye_points(self, face_points, eye_id):
     i0 = 72
     i1 = i0+12*(eye_id-1)
     i2 = i1+12

     eye_points = face_points[i1:i2]
     return eye_points

 def get_eye_ratio(self, eye):
     n = int(len(eye)/2)
     pts = np.array(eye, dtype=np.float32)
     pts = pts.reshape([n, 2])

     rect = cv2.minAreaRect(pts)
     (w, h) = rect[1]

     if (w>h):
         ratio = h/w
     else:
         ratio = w/h

     return ratio

The get_eye_points method extracts points of an eye from the array of 68 face landmarks. The get_eye_ratio method evaluates the eye’s height-to-width ratio.

Now we can write the code to track the ratio value and detect moments of possible distraction.

Python
class DERD:    
    def __init__(self, ratio_thresh, delta_time, eyes=2):
        self.ratio_thresh = ratio_thresh
        self.delta_time = delta_time
        self.eyes = eyes
        self.eye_closed_time = 0.0
        self.last_time = 0.0
    
    def start(self, time):
        self.eye_closed_time = 0.0
        self.last_time = time
    
    def detect(self, eye1_ratio, eye2_ratio, time):
        dt = time - self.last_time
        distraction = False
        
        d1 = (eye1_ratio<self.ratio_thresh)
        d2 = (eye2_ratio<self.ratio_thresh)
        
        if self.eyes == 2:
            d = d1 and d2
        else:
            d = d1 or d2
        
        if d:
            self.eye_closed_time += dt
        else:
            self.eye_closed_time -= dt
            
        if self.eye_closed_time<0.0:
            self.eye_closed_time = 0.0
            
        print('Eye 1: '+str(eye1_ratio))
        print('Eye 2: '+str(eye2_ratio))
        print('Eye closed time = '+str(self.eye_closed_time))
            
        if self.eye_closed_time>=self.delta_time:
            distraction = True
            self.start(time)
        
        self.last_time = time
        return distraction

The ratio_thresh argument is the minimum value of the height-to-width ratio to assume the eye is closed. The delta_time parameter denotes how long the eye must be closed to decide whether a distraction occurred. The eyes parameter determines whether one or both eyes must be closed to consider this a distraction.

Finally, we slightly modified our video detector to include this distraction detection algorithm in the code and generate an alarm when a detection happens.

Python
class VideoDDD:    
    def __init__(self, fd, fld, eye_ratio_thresh=0.2, eyes=2, delta_time=2.0):
        self.fd = fd
        self.fld = fld
        self.derd = DERD(eye_ratio_thresh, delta_time, eyes)
    
    def process(self, video):
        frame_count = 0
        detection_num = 0;
        dt = 0
        dt_l = 0
        
        capture = cv2.VideoCapture(video)
        img = None

        dname = 'Arm-Powered Driver Distraction Detection'
        cv2.namedWindow(dname, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(dname, 720, 720)
        
        # just suppose FPS=25
        delta = 0.040
        
        dd_time = -1000
        
        draw_points = []
        faces = []
        
        # Capture all frames
        while(True):    
            frame_t1 = time.time()
            
            (ret, frame) = capture.read()
            if frame is None:
                break
            frame_count = frame_count+1
            frame_time = (frame_count-1)*delta
            
            if frame_count==1:
                self.derd.start(frame_time)
            
            # work with square images
            width = frame.shape[1]
            height = frame.shape[0]
            if not (width == height):
                dx = int((width-height)/2)
                frame = frame[0:height, dx:dx+height]
            
            f_count = 0
            if (frame_count % 10) == 0:
                faces = []
                draw_points = []
                t1 = time.time()
                faces = self.fd.detect(frame)
                t2 = time.time()
                dt = dt + (t2-t1)
                f_count = len(faces)
                detection_num += 1
            
            distraction = False
            
            if (f_count>0):
                # supposed one face at the camera
                face = faces[0]
                t1 = time.time()
                (fi_rect, fi_gray, fi_resized) = self.fld.get_face_image(frame, face)
                points = self.fld.detect(fi_resized)
                frame_points = self.fld.get_frame_points(fi_rect, points)
                t2 = time.time()
                dt_l = dt_l + (t2-t1)
                    
                draw_points.append(frame_points)
                    
                eye1 = self.fld.get_eye_points(frame_points, 1)
                eye2 = self.fld.get_eye_points(frame_points, 2)
                #draw_points.append(eye1)
                #draw_points.append(eye2)
                    
                r1 = self.fld.get_eye_ratio(eye1)
                r2 = self.fld.get_eye_ratio(eye2)
                    
                distraction = self.derd.detect(r1, r2, frame_time)
    
            if len(faces)>0:
                Utils.draw_faces(faces, (255, 0, 0), 1, frame, True)
            if len(draw_points)>0:
                for (i, points) in enumerate(draw_points):
                    Utils.draw_points(points, (0, 0, 255), 1, frame)
            
            # Show distraction alarm for 1 second
            if distraction:
                dd_time = frame_time
            
            if dd_time>0:
                text = "ALARM! DRIVER DISTRACTION"
                xd1 = 10
                yd1 = 50
                cv2.putText(frame, text, (xd1, yd1), \
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 1, cv2.LINE_AA)
                if (frame_time-dd_time)>1.0:
                    dd_time = -1000
                
            
            # Display the resulting frame
            cv2.imshow(dname,frame)
            
            frame_t2 = time.time()
            frame_dt = frame_t2 - frame_t1
            if frame_dt<delta:
                frame_dt = delta-frame_dt
                #print('Sleep='+str(frame_dt))
                time.sleep(frame_dt)
            
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            
        capture.release()
        cv2.destroyAllWindows()    
        
        fps = 0.0
        if dt>0:
            fps = detection_num/dt
            
        fps_l = 0.0
        if dt_l>0:
            fps_l = detection_num/dt_l
        
        return (detection_num, fps, fps_l)

In addition to using the DERD class, we slightly changed the frame processing algorithm. We added a timestamp comparison of frames to estimate time intervals of possible distraction. Also, we now process only every tenth frame to imitate the near real-time processing.

Now we can run the completed driver distraction detection algorithm with the following code:

Python
w_path = '/home/pi/Desktop/PI_DD'
n_path = os.path.join(w_path, 'net')
fd_model = os.path.join(n_path, 'opencv_face_detector_uint8.pb')
fd_graph = os.path.join(n_path, 'opencv_face_detector.pbtxt')
fd = TF_FD(fd_model, fd_graph, 30, 0.5)

fld_model = os.path.join(n_path, 'face_landmarks.caffemodel')
fld_proto = os.path.join(n_path, 'face_landmarks.prototxt')
fld = CAFFE_FLD(fld_model, fld_proto)

v_path = os.path.join(w_path, 'video')
v_name = 'v_1.mp4'
v_file = os.path.join(v_path, v_name)
vddd = VideoDDD(fd, fld, 0.3, 1, 2.0)

(detection_num, fps, fps_l) = vddd.process(v_file)

print("Face detections: "+str(detection_num))
print("Detection FPS: "+str(fps))
print("Landmarks FPS: "+str(fps_l))

You can see that the algorithm correctly handles the situation when the eyes appear to be closed during a long enough time interval and generates the alarm.

This helps us catch one cause of driver distraction — when a driver is looking down at a mobile device in their lap, our drowsiness detector identifies them as distracted because it only see their eyelids. As jurisdictions around the world ban mobile device use while driving, drivers have tried to adapt by holding their devices out of sight. But our distraction detector will catch them by detecting when their eyes don’t appear to be fully open.

Conveniently, this algorithm can also work to detect driver drowsiness. Our device should raise the alarm whether a driver's eyes merely appear closed because they are looking down at a mobile device, or they actually are closed because the driver is drowsy or asleep.

The algorithm also correctly handles the situation when the eyes are closed for a short time interval (for example, a driver blinks) or the head is slightly turned for a short time.

Next Steps

We’ve implemented one driver distraction algorithm using facial landmarks - but we could add others! For example, we might want to detect when a driver's head is turned by measuring the angle of the lines between nose landmarks. We might also check if a driver's mouth appears to be opening and closing by comparing the distance between upper and lower mouth landmarks over time. If they are, it likely means the driver is talking or eating while driving.

To take things even further, we might consider upgrading to an ML model that can do iris detection, and try to determine when a driver's eyes aren't looking at the road.

In this article, we demonstrated how straightforward it is to develop an AI computer vision application for a portable Arm-powered device. We chose this solution for practicality since our driver distraction detection system must function autonomously in a driving car. We showed that the application could run on the Arm-powered device in real-time mode, reaching about two FPS processing speed.

Nevertheless, we can still investigate many aspects to improve this driver distraction detection system. For instance, can we increase the FPS? To answer this, we should pay attention to the application’s slowest part — face detection with the TensorFlow neural network. Can we improve this model’s performance? Yes. We can use Arm’s Arm NN library, which they specially developed to accelerate processing DNN models on Arm-powered devices.

With the Arm NN library, we also could run the NN model on connected GPU or NPU units to achieve near real-time speed. This would give us more flexibility to invent advanced algorithms of driver distraction detection or to use other face detection DNN models, like the BlazeFace neural model.

Other improvements to our solution might involve generating new distraction criteria. For example, we can infer that drivers are likely distracted if their eyes or head are directed elsewhere for more than a determined time interval.

We hope these ideas have piqued your interest. We encourage you to expand on this solution or create your own portable AI solutions on Arm-powered devices.

History

  • 7th February, 2022: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Team Leader VIPAKS
Russian Federation Russian Federation
EDUCATION:

Master’s degree in Mechanics.

PhD degree in Mathematics and Physics.



PROFESSIONAL EXPERIENCE:

15 years’ experience in developing scientific programs
(C#, C++, Delphi, Java, Fortran).



SCIENTIFIC INTERESTS:

Mathematical modeling, symbolic computer algebra, numerical methods, 3D geometry modeling, artificial intelligence, differential equations, boundary value problems.

Comments and Discussions

 
GeneralMy vote of 5 Pin
tugrulGtx14-Feb-22 4:21
tugrulGtx14-Feb-22 4:21 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.