"""Detect blue regions in an image and publish their coordinates via shared memory."""

import datetime
import time
from multiprocessing import shared_memory

import cv2
import numpy


def detect(frame):
    """Find blue regions in a BGR image and return their anchor points as text.

    A red bounding rectangle is drawn onto ``frame`` (in place) around every
    blue contour that is processed.

    Args:
        frame: BGR image as returned by ``cv2.imread`` / ``VideoCapture.read``.

    Returns:
        A string of space-terminated ``(x, y)`` tuples, one per processed
        contour, where the point is the bottom-left corner ``(x, y + h)`` of
        the contour's bounding rectangle. Empty if nothing blue was found.
    """
    # Convert the source image to HSV so that "blue" is a simple hue range.
    hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)

    # Range of the colour blue in HSV.
    lower_border = numpy.array([110, 50, 50])
    upper_border = numpy.array([130, 255, 255])

    # Blue pixels become white, everything else black.
    mask = cv2.inRange(hsv, lower_border, upper_border)

    # Threshold the mask and extract the contours of the blue elements.
    _, threshed_img = cv2.threshold(mask, 0, 255, cv2.THRESH_BINARY)
    # OpenCV 3.x returns (image, contours, hierarchy) while 4.x returns
    # (contours, hierarchy); taking the last two items works for both.
    contours, _ = cv2.findContours(
        threshed_img, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE
    )[-2:]

    # Draw a bounding rectangle around every blue element and collect its
    # bottom-left corner.
    s = ''
    for c in contours:
        x, y, w, h = cv2.boundingRect(c)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 1)
        s += str((x, y + h)) + ' '
        # Stop early so the string stays small enough for the shared memory
        # block; the check runs after appending, so s may exceed 50 chars by
        # at most one entry.
        if len(s) > 50:
            break
    return s


def getVideoFrame():
    """Grab a single frame from camera 0, show it in the "frame" window and return it.

    The capture device is opened and released on every call, so calling this
    in a loop does not accumulate open camera handles.

    Returns:
        The captured BGR frame.

    Raises:
        RuntimeError: If no frame could be read from the camera.
    """
    cam = cv2.VideoCapture(0)
    try:
        ret_val, frame = cam.read()
    finally:
        # Always give the device back, even if read() raised.
        cam.release()
    if not ret_val:
        raise RuntimeError("Could not read a frame from camera 0")
    cv2.imshow("frame", frame)
    return frame


def shareMemory(memName, memString, memSize, memTime):
    """Publish a string in a named shared memory block for a limited time.

    Creates the block, writes the UTF-8 encoding of ``memString`` at offset 0,
    keeps the block alive for ``memTime`` seconds so another application can
    read it, then closes and removes it.

    Args:
        memName: Name of the shared memory block to create.
        memString: Text to publish.
        memSize: Requested size of the block in bytes.
        memTime: Seconds to keep the block available before removing it.

    Raises:
        FileExistsError: If a block named ``memName`` already exists.

    Note:
        If the encoded string does not fit (its length is not strictly less
        than ``memSize``), nothing is written, but the block is still created
        and kept for ``memTime`` seconds.
    """
    shm = shared_memory.SharedMemory(name=memName, create=True, size=memSize)
    try:
        encoded = memString.encode('utf-8')
        # Strictly-less comparison leaves at least one byte after the payload
        # unwritten.
        if len(encoded) < memSize:
            shm.buf[:len(encoded)] = encoded

        # Give the other application time to read the memory before cleanup.
        time.sleep(memTime)
    finally:
        # close() only detaches this process; unlink() actually removes the
        # named block so the next run can create it again.
        shm.close()
        shm.unlink()


if __name__ == '__main__':
    # Get the image from a file. To process video instead, call
    # getVideoFrame() in a loop.
    frame = cv2.imread("Bubbles_640x480.jpg")
    if frame is None:
        # cv2.imread signals a missing/unreadable file by returning None.
        raise SystemExit("Could not read image file 'Bubbles_640x480.jpg'")

    # Append a time stamp to the detected coordinates.
    s = detect(frame) + str(datetime.datetime.now())
    shareMemory(memName='MySharedMemory', memString=s, memSize=100, memTime=60)