칼만필터를 사용하여 추적시 파란원의 자잘한 움직임은 무시하고 큰 동선만 추적하도록 해봤습니다.

2024. 3. 3. 최초작성  

코드는 기존에 구현했던 칼만 필터로 웹캠 영상 속 파란원을 추적하는 OpenCV Python 에제를 수정해서 사용했습니다.

칼만 필터로 웹캠 영상 속 파란원을 추적하는 OpenCV Python 예제

https://webnautes.com/kalman-pilteoro-webkaem-yeongsang-sog-paranweoneul-cujeoghaneun-opencv-python-yeje/ 

전체 코드입니다. 기존 코드에서 상태 전이행렬, 프로세스 노이즈 행렬, 측정 노이즈 행렬만 수정했습니다.

        # 상태 전이 행렬 - 관성을 더 크게 (속도 변화를 더 천천히)

        self.kf.transitionMatrix = np.array([[1, 0, 0.1, 0],

                                            [0, 1, 0, 0.1],

                                            [0, 0, 0.95, 0],   # 속도의 95%만 유지 (감쇠 효과)

                                            [0, 0, 0, 0.95]], np.float32)  # 속도의 95%만 유지

       

 # 프로세스 노이즈 - 속도 변화에 대한 노이즈를 낮게 설정하여 급격한 변화 억제

        self.kf.processNoiseCov = np.array([[0.0001, 0, 0, 0],

                                            [0, 0.0001, 0, 0],

                                            [0, 0, 0.001, 0],   # 속도 노이즈를 매우 낮게

                                            [0, 0, 0, 0.001]], np.float32)

        

# 측정 노이즈 - 측정값을 덜 신뢰하도록 설정하여 작은 움직임 무시

        self.kf.measurementNoiseCov = np.array([[0.1, 0],   # 측정 노이즈를 높게 설정

                                                [0, 0.1]], np.float32)

import cv2
import numpy as np
from collections import deque
import time

class KalmanFilter:
    def __init__(self):
        self.kf = cv2.KalmanFilter(4, 2# 상태: (x, y, dx, dy), 측정: (x, y)
       
        # 측정 행렬 (x, y만 측정)
        self.kf.measurementMatrix = np.array([[1, 0, 0, 0],
                                            [0, 1, 0, 0]], np.float32)
       
        # 상태 전이 행렬 - 관성을 더 크게 (속도 변화를 더 천천히)
        self.kf.transitionMatrix = np.array([[1, 0, 0.1, 0],
                                            [0, 1, 0, 0.1],
                                            [0, 0, 0.95, 0],   # 속도의 95%만 유지 (감쇠 효과)
                                            [0, 0, 0, 0.95]], np.float32)  # 속도의 95%만 유지

        # 프로세스 노이즈 - 속도 변화에 대한 노이즈를 낮게 설정하여 급격한 변화 억제
        self.kf.processNoiseCov = np.array([[0.0001, 0, 0, 0],
                                            [0, 0.0001, 0, 0],
                                            [0, 0, 0.001, 0],   # 속도 노이즈를 매우 낮게
                                            [0, 0, 0, 0.001]], np.float32)

        # 측정 노이즈 - 측정값을 덜 신뢰하도록 설정하여 작은 움직임 무시
        self.kf.measurementNoiseCov = np.array([[0.1, 0],   # 측정 노이즈를 높게 설정
                                                [0, 0.1]], np.float32)
               
        self.initialized = False
        self.last_prediction = None

    def init(self, point):
        self.kf.statePre = np.array([[point[0]], [point[1]], [0], [0]], np.float32)
        self.kf.statePost = np.array([[point[0]], [point[1]], [0], [0]], np.float32)
        self.initialized = True

    def predict(self):
        if not self.initialized:
            return None
        predicted = self.kf.predict()
        self.last_prediction = np.array([predicted[0][0], predicted[1][0]], dtype=np.float32)
        return self.last_prediction

    def correct(self, point):
        if not self.initialized:
            self.init(point)
            return point
       
        measurement = np.array([[point[0]], [point[1]]], np.float32)
        corrected = self.kf.correct(measurement)
        return np.array([corrected[0][0], corrected[1][0]], dtype=np.float32)


class TrajectoryDrawer:
    def __init__(self, max_points=200):
        self.points = deque(maxlen=max_points)
        self.kalman_points = deque(maxlen=max_points)
        self.predicted_points = deque(maxlen=max_points)  # 예측 궤적을 위한 별도의 deque
        self.point_times = deque(maxlen=max_points)
        self.max_duration = 5.0
        self.last_update_time = 0
        self.update_interval = 0.02
        self.object_visible = False
        self.waiting_for_object = False
        self.kalman = KalmanFilter()

    def add_point(self, point, current_time):
        if current_time - self.last_update_time >= self.update_interval:
            if not self.waiting_for_object:
                self.object_visible = True
               
                # 먼저 예측
                predicted = self.kalman.predict()
                if predicted is not None:
                    self.predicted_points.append(predicted)
               
                # 측정값으로 보정
                self.points.append(point)
                kalman_point = self.kalman.correct(point)
                self.kalman_points.append(kalman_point)
               
                self.point_times.append(current_time)
               
                # 오래된 점들 제거
                while self.point_times and current_time - self.point_times[0] > self.max_duration:
                    self.points.popleft()
                    if len(self.kalman_points) > 0:
                        self.kalman_points.popleft()
                    if len(self.predicted_points) > 0:
                        self.predicted_points.popleft()
                    self.point_times.popleft()
                   
            self.last_update_time = current_time

    def mark_object_invisible(self):
        if self.object_visible:
            self.object_visible = False
            self.start_new_trajectory()

    def start_new_trajectory(self):
        self.points.clear()
        self.kalman_points.clear()
        self.predicted_points.clear()
        self.point_times.clear()
        self.waiting_for_object = False
        self.kalman = KalmanFilter()

    def draw(self, frame):
        # 실제 궤적 그리기 (파란색)
        points = list(self.points)
        for i in range(len(points) - 1):
            pt1 = tuple(map(int, points[i]))
            pt2 = tuple(map(int, points[i + 1]))
            cv2.line(frame, pt1, pt2, (255, 0, 0), 2)

        # 칼만 필터 예측 궤적 그리기 (빨간색)
        pred_points = list(self.predicted_points)
        for i in range(len(pred_points) - 1):
            pt1 = tuple(map(int, pred_points[i]))
            pt2 = tuple(map(int, pred_points[i + 1]))
            cv2.line(frame, pt1, pt2, (0, 0, 255), 2)


def get_color_hsv_range(cap, guide_rect):
    """선택된 영역의 HSV 분포를 3초간 분석하여 추적할 색상 범위 설정"""
    sampling_time = 3  # 3초 동안 샘플링
    start_time = time.time()
   
    h_values_all = []
    s_values_all = []
    v_values_all = []
   
    x, y, w, h = guide_rect
   
    print("3초간 색상 정보를 수집합니다. 물체를 가만히 두세요...")
   
    while time.time() - start_time < sampling_time:
        ret, current_frame = cap.read()
        if not ret:
            break
           
        current_frame = cv2.flip(current_frame, 1)
       
        # 가이드 사각형 그리기
        cv2.rectangle(current_frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
       
        # ROI 추출 및 HSV 변환
        roi = current_frame[y:y+h, x:x+w]
        hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)
       
        # 현재 ROI의 HSV 값 수집
        h_values_all.extend(hsv[:, :, 0].flatten())
        s_values_all.extend(hsv[:, :, 1].flatten())
        v_values_all.extend(hsv[:, :, 2].flatten())
       
        # 현재 수집된 HSV 범위로 마스크 생성하여 표시 (디버깅용)
        if len(h_values_all) > 0:
            h_min_temp = max(np.percentile(h_values_all, 5), 0)
            h_max_temp = min(np.percentile(h_values_all, 95), 179)
            s_min_temp = max(np.percentile(s_values_all, 5), 0)
            s_max_temp = min(np.percentile(s_values_all, 95), 255)
            v_min_temp = max(np.percentile(v_values_all, 5), 0)
            v_max_temp = min(np.percentile(v_values_all, 95), 255)
           
            temp_mask = cv2.inRange(hsv, np.array([h_min_temp, s_min_temp, v_min_temp]),
                                      np.array([h_max_temp, s_max_temp, v_max_temp]))
            cv2.imshow('Sampling Mask', temp_mask)
       
        # 남은 시간 표시
        remaining_time = int(sampling_time - (time.time() - start_time))
        cv2.putText(current_frame, f"Sampling: {remaining_time}s",
                  (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
       
        cv2.imshow('Tracking', current_frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
   
    # 5~95 퍼센타일 범위 사용 (이상치 제외)
    h_min = max(np.percentile(h_values_all, 5), 0)
    h_max = min(np.percentile(h_values_all, 95), 179)
    s_min = max(np.percentile(s_values_all, 5), 0)
    s_max = min(np.percentile(s_values_all, 95), 255)
    v_min = max(np.percentile(v_values_all, 5), 0)
    v_max = min(np.percentile(v_values_all, 95), 255)

    # 범위를 약간 더 넓게 설정
    h_range = h_max - h_min
    s_range = s_max - s_min
    v_range = v_max - v_min
   
    h_min = max(h_min - h_range * 0.2, 0)
    h_max = min(h_max + h_range * 0.2, 179)
    s_min = max(s_min - s_range * 0.2, 0)
    s_max = min(s_max + s_range * 0.2, 255)
    v_min = max(v_min - v_range * 0.2, 0)
    v_max = min(v_max + v_range * 0.2, 255)

    print(f"\n설정된 HSV 범위:")
    print(f"H: {h_min:.1f} ~ {h_max:.1f}")
    print(f"S: {s_min:.1f} ~ {s_max:.1f}")
    print(f"V: {v_min:.1f} ~ {v_max:.1f}")

    return np.array([h_min, s_min, v_min]), np.array([h_max, s_max, v_max])

def track_blue():
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("카메라를 열 수 없습니다.")
        return

    ret, frame = cap.read()
    if not ret:
        print("프레임을 가져올 수 없습니다.")
        return
   
    height, width = frame.shape[:2]
   
    # 정사각형 가이드 설정 (화면 높이의 10%를 크기로 사용)
    rect_size = int(height * 0.1# 더 작은 정사각형
    x = (width - rect_size) // 2
    y = (height - rect_size) // 2
    guide_rect = (x, y, rect_size, rect_size)

    print("추적할 물체를 가이드 사각형 안에 위치시키고 's' 키를 눌러주세요.")
   
    color_range = None  # (lower_hsv, upper_hsv)
    trajectory_drawer = TrajectoryDrawer(200)
    last_object_found = False  # 이전 프레임에서 물체가 있었는지 여부

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.flip(frame, 1)
        current_time = time.time()

        if color_range is None:
            cv2.rectangle(frame, (x, y), (x + rect_size, y + rect_size),
                        (0, 255, 0), 2)
            cv2.putText(frame, "Place object inside the green box and press 's'",
                      (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
        else:
            hsv = cv2.cvtColor(frame, cv2.COLOR_BGR2HSV)
            mask = cv2.inRange(hsv, color_range[0], color_range[1])
           
            # 노이즈 제거
            mask = cv2.erode(mask, None, iterations=1)
            mask = cv2.dilate(mask, None, iterations=1)
           
            contours, _ = cv2.findContours(mask.copy(), cv2.RETR_EXTERNAL,
                                        cv2.CHAIN_APPROX_SIMPLE)
           
            if contours:
                c = max(contours, key=cv2.contourArea)
                area = cv2.contourArea(c)
               
                if area > 100# 최소 면적 조건
                    M = cv2.moments(c)
                    if M["m00"] > 0:
                        # 중심점 계산
                        center_x = int(M["m10"] / M["m00"])
                        center_y = int(M["m01"] / M["m00"])
                       
                        # 화면 벗어남 검사
                        if (center_x < 0 or center_x > width or
                            center_y < 0 or center_y > height):
                            last_object_found = False
                            trajectory_drawer.mark_object_invisible()
                        else:
                            center_point = np.array([center_x, center_y])
                           
                            # 현재 위치 표시
                            cv2.circle(frame, (center_x, center_y), 5, (255, 0, 0), -1)
                           
                            # 궤적 추가
                            trajectory_drawer.add_point(center_point, current_time)
                           
                            # 감지된 윤곽선만 표시
                            cv2.drawContours(frame, [c], -1, (255, 255, 0), 2)

                            last_object_found = True
                else:
                    last_object_found = False
                    trajectory_drawer.mark_object_invisible()
            else:
                last_object_found = False
                trajectory_drawer.mark_object_invisible()
           
            # 궤적 그리기
            trajectory_drawer.draw(frame)

        cv2.imshow('Tracking', frame)
       
        key = cv2.waitKey(1) & 0xFF
        if key == ord('s'):
            if color_range is None:
                color_range = get_color_hsv_range(cap, guide_rect)
        elif key == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    track_blue()