Hello everyone, here's my blog post on how I built an API that returns traffic conditions from Access Kenya cameras. The API works by specifying a road name at the URL endpoint; the JSON response contains the number of cars that moved and the speed of movement.
To begin, I decided to test whether it is possible to capture an image from each camera. After exploring the website I realized each camera has a URL ending in a JPEG file extension. I figured the cameras write each new image to the JPEG file at that URL. So, can I capture every new image? Yes: I used the urllib library to fetch the image and store it on disk.
I organized the camera urls into a dict so as to have a means of calling and referencing each road.
    cameras = dict(
        museum='http://traffic.accesskenya.com/images/traffic/feeds/purshotam.jpg',
        ojijo='http://traffic.accesskenya.com/images/traffic/feeds/mhcojijo.jpg',
        forest_limuru='http://traffic.accesskenya.com/images/traffic/feeds/forestlimuru.jpg?',
        kenyatta_uhuru_valley='http://traffic.accesskenya.com/images/traffic/feeds/barclaysplaza.jpg',
    )
After that, I wrote a function that looks up a road's camera URL and captures three images, one every 6 seconds.
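    import time
    import urllib

    def capture_images(self):
        # cameras is the dict of feed URLs defined above
        for i in 'abc':
            if self.name in cameras:
                # Python 2 call; on Python 3 use urllib.request.urlretrieve
                urllib.urlretrieve(cameras[self.name], 'img_' + i + '.jpg')
            time.sleep(6)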
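For readers on Python 3, where `urlretrieve` lives in `urllib.request`, the capture loop can be sketched as below. This is a minimal sketch rather than the project's code: the function name `capture_frames` and its `fetch`, `pause`, `labels` and `interval` parameters are assumptions, introduced so the loop can be exercised without hitting the cameras.

```python
import time
import urllib.request


def capture_frames(url, fetch=urllib.request.urlretrieve, pause=time.sleep,
                   labels="abc", interval=6):
    """Save one frame per label from `url`, pausing `interval` seconds between frames.

    `fetch` and `pause` are injectable (an assumption of this sketch) so the
    function can be tried without network access or real waiting.
    """
    saved = []
    for index, label in enumerate(labels):
        filename = "img_" + label + ".jpg"
        fetch(url, filename)  # download the current camera frame to disk
        saved.append(filename)
        if index < len(labels) - 1:
            pause(interval)  # no need to wait after the last frame
    return saved
```

Skipping the pause after the last frame still leaves the first and third frames roughly 12 seconds apart, without adding idle time at the end.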
The images captured from the camera are stored in a folder with the name of the road. The next task involves processing the images for analysis. I utilize two libraries for this task: PIL for loading the images and numpy to convert pixel values to numerical arrays. All of this is wrapped in a function (shown below) that takes the image folder as input, loads all 3 images, converts them to numpy arrays, deletes the images and returns a Python dict holding the arrays for all the images.
    import os
    import shutil
    import numpy as np
    from PIL import Image

    # load images
    def load(self):
        files = os.listdir(self.path)
        b = dict()
        for name in files:
            if name != '.DS_Store':
                # open as grayscale ('L') and convert to a numpy array
                img = Image.open(os.path.join(self.path, name)).convert('L')
                b[name] = np.asarray(img)
        # delete image folder
        shutil.rmtree(self.path)
        return b
The first important step in analysing the images is checking whether there has been movement within the capture period. To achieve this, I used differential imaging, a way of detecting motion by subtracting the pixel values of subsequent images. In my function, I count the number of pixels that have changed, which helps in quantifying the movement (standstill, moderate traffic).
    # differential imaging
    def diffImg(self, img1, img2, img3):
        # calculate absolute difference between consecutive frames
        d1 = cv2.absdiff(img1, img2)
        d2 = cv2.absdiff(img2, img3)
        # bitwise AND of the two difference images
        bit = cv2.bitwise_and(d1, d2)
        ret, thresh = cv2.threshold(bit, 35, 255, cv2.THRESH_BINARY)
        # get number of different pixels
        pixie = int(np.count_nonzero(thresh == 255))
        return pixie
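To see the three-frame differencing at work without a camera, here is a small sketch on synthetic frames using only NumPy. `count_moving_pixels` is a hypothetical stand-in for the OpenCV calls above, and the tiny 4x4 frames are made up for illustration.

```python
import numpy as np


def count_moving_pixels(img1, img2, img3, thresh=35):
    """Count pixels that changed in both frame pairs (three-frame differencing)."""
    # Widen to int16 so the subtraction cannot wrap around like uint8 would.
    a, b, c = (np.asarray(f, dtype=np.int16) for f in (img1, img2, img3))
    d1 = np.abs(a - b).astype(np.uint8)  # |img1 - img2|, like cv2.absdiff
    d2 = np.abs(b - c).astype(np.uint8)  # |img2 - img3|
    both = np.bitwise_and(d1, d2)        # bits set in both difference images
    # THRESH_BINARY keeps pixels strictly above the threshold.
    return int(np.count_nonzero(both > thresh))


# Synthetic 4x4 grayscale frames: one bright pixel that appears in frame 2 only.
frame1 = np.zeros((4, 4), dtype=np.uint8)
frame2 = frame1.copy()
frame2[1, 1] = 255
frame3 = frame1.copy()

moving = count_moving_pixels(frame1, frame2, frame3)  # the bright pixel changed twice
still = count_moving_pixels(frame1, frame1, frame1)   # identical frames, nothing moves
```

Requiring a change in both differences means a pixel only counts when it differs from the frame before and the frame after, which filters out changes that appear in just one pair.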
Once movement is detected, the next step is to quantify traffic speed in km/h. To aid in this calculation I use the optical flow algorithm, a concept in computer vision that allows tracking features across images. I used it to find features to track (cars) in the first image and get their corresponding positions in the second and third images. I then calculate the average distance (Euclidean distance) that the features have moved. Dividing the pixel distance by 12 seconds, the time between the first and third image, gives the speed at which the objects (cars) are moving, in pixels per second. My function returns this value. Expressing it in km/h additionally requires knowing how much road each pixel covers.
    # calculate optical flow of points on images
    def opticalFlow(self, img1, img2, img3):
        # Lucas-Kanade and feature-detection parameters
        lk_params = dict(winSize=(10, 10), maxLevel=5,
                         criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
        features_param = dict(maxCorners=3000, qualityLevel=0.5,
                              minDistance=3, blockSize=3)
        # feature extraction of points to track
        pt = cv2.goodFeaturesToTrack(img1, **features_param)
        if pt is None:
            return 0.0  # no trackable features found
        p0 = np.float32(pt).reshape(-1, 1, 2)
        # track forward (img1 -> img2), then backward to validate each point
        p1, st, err = cv2.calcOpticalFlowPyrLK(img1, img2, p0, None, **lk_params)
        p0r, st, err = cv2.calcOpticalFlowPyrLK(img2, img1, p1, None, **lk_params)
        good = abs(p0 - p0r).reshape(-1, 2).max(-1) < 1
        # Euclidean distance each validated point moved between the two frames
        dist = np.linalg.norm((p1 - p0).reshape(-1, 2)[good], axis=1)
        if dist.size == 0:
            return 0.0
        # largest tracked displacement (pixels), scaled by 10
        return round(float(dist.max()) * 10, 2)
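The arithmetic behind the speed estimate can be sketched as follows. The function names and the `metres_per_pixel` calibration value are hypothetical: the post does not give a scale for any camera, so the figure below is only there to make the units concrete.

```python
import math


def mean_displacement(points_before, points_after):
    """Average Euclidean distance (in pixels) between matched point pairs."""
    distances = [math.hypot(x1 - x0, y1 - y0)
                 for (x0, y0), (x1, y1) in zip(points_before, points_after)]
    return sum(distances) / len(distances)


def speed_kmh(pixels, seconds, metres_per_pixel):
    """Convert a pixel displacement over `seconds` into km/h."""
    metres_per_second = pixels * metres_per_pixel / seconds
    return metres_per_second * 3.6  # 1 m/s = 3.6 km/h


# Two tracked features, each moving 30 px
# (an 18-24-30 right triangle and a straight 30 px shift).
before = [(0.0, 0.0), (10.0, 10.0)]
after = [(18.0, 24.0), (40.0, 10.0)]
avg_px = mean_displacement(before, after)

# Hypothetical calibration: 0.5 m of road per pixel, frames 12 s apart.
estimate = speed_kmh(avg_px, seconds=12, metres_per_pixel=0.5)
```

With these made-up numbers the features move 30 px on average, which is 15 m in 12 s, or 4.5 km/h. A real deployment would need a separate scale per camera, since each one views the road from a different distance and angle.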
The API (under construction) is based on Flask. By specifying a road name at the HTTP endpoint, the API returns the speed of traffic and the level of movement (standstill, moderate rate, no traffic).
    # load required libraries
    import image_processing
    from flask import Flask, jsonify

    # create flask web server
    app = Flask(__name__)

    # create HTTP endpoint
    @app.route('/ImPro/<road>')
    def get_route(road):
        # initialize route class for the requested road
        traffic = image_processing.route(road)
        # setup working directory
        traffic.set_dir()
        # get image from traffic camera
        traffic.capture_images()
        # load image stack
        x = traffic.load()
        # differential imaging
        y = traffic.diffImg(x['img_a.jpg'], x['img_b.jpg'], x['img_c.jpg'])
        # calculate optical flow
        z = traffic.opticalFlow(x['img_a.jpg'], x['img_b.jpg'], x['img_c.jpg'])
        # return both measurements as JSON (key names are illustrative)
        return jsonify(road=road, moving_pixels=y, speed=z)

    if __name__ == "__main__":
        app.run()
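One possible shape for the JSON response is sketched below using only the standard library. The `build_response` helper, the key names and the sample values are assumptions made for illustration; the post does not specify the response format.

```python
import json


def build_response(road, moving_pixels, speed):
    """Serialize one road's measurements; the key names are illustrative."""
    payload = {
        "road": road,
        # Cast to plain Python types so NumPy scalars serialize cleanly.
        "moving_pixels": int(moving_pixels),
        "speed": round(float(speed), 2),
    }
    return json.dumps(payload, sort_keys=True)


# Made-up measurements for the 'museum' camera.
body = build_response("museum", 1240, 37.456)
decoded = json.loads(body)
```

Casting before serializing matters because values coming out of NumPy or OpenCV are often NumPy scalar types, which `json.dumps` does not accept directly.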
The code is available at https://github.com/Python-Nairobi/ImPro. Feel free to modify and improve as you see fit. I hope you enjoyed the article. Feel free to drop me an e-mail at firstname.lastname@example.org for any queries.