이 글은 잠을 참고 정리하는 글이므로, 정리가 대충되어있습니다.
ROS package 설계
ㅇ 요약
현재 진행상황 요약 : 가제보상에서 토픽으로 이동요청시 이동함.
2개의 패키지를 제작했습니다.
- minibot_indoor 패키지 (실내 경로생성 알고리즘이 들어있는 패키지 입니다.)
- minibot_msg 패키지 (메시지의 모음입니다)
1. 각 패키지에 대한 설명
minibot_indoor 는 실내 경로생성 알고리즘이 들어 있는 패키지 입니다. 해당 패키지에 대한 설명을 하면서, 메시지 타입은 msg에 작성되어 있습니다.
해당 패키지에는 현재 두 알고리즘이 들어가있습니다.
라이다에서 전방 물체를 판단하는 방법 (tistory.com)
실내 경로생성 알고리즘의 아이디어 및 코드 설명 (tistory.com)
이 글을 따라가도 장애물 탐지는 작동하지 않습니다.
패키지에 대한 정보는 여기서 받을 수 있으며,
AIrobot-repo-2/baby_bot at main · addinedu-amr/AIrobot-repo-2
패키지를 받은 경우 dependency 해결을 위해 다음을 입력 해주어야 합니다.
pip install opencv-python
export PYTHONPATH=/$your_path$/mini_bot/src/pinklab_minibot_robot/minibot_indoor/minibot_indoor:$PYTHONPATH
여튼 minibot_indoor에는 다음과 같은 파일들이 존재합니다.
ROS와 연관이 없이 python코드만 들어있는 파일은
image_processing.py와 Astar.py입니다.
우선 Astar.py에 대한 설명은 굳이 필요가 없을 것이라 생각되며, 자세한 코드는 블로그 글을 참고합니다. 여기에 적으면 엄청 길어집니다
.Fleet management을 직접 구현해보자. (A* 알고리즘 탐구) (tistory.com)
image_processing.py 파일의 경우 다음과 같으며 주석으로 코드를 설명합니다.
앞서 설명한 map을 행렬로 변환하는 코드립니다.
import cv2
import sys
import yaml
import numpy as np
import matplotlib.pyplot as plt
# 클래스 : pgm에서 matrix를 추출하는 클래스
# pgm_path :pgm 파일의 경로
# yaml_path : yaml 파일의 경로
# origin_x , origin_y = 원점의 x값, 원점의 y값
# matrix : 추출된 matrix
# cell_height : 그리드(영역)의 세로
# cell_width : 그리드(영역)의 가로
# resolution : yaml에서 얻은 resolution값
# garo_m : 그리드(영역)의 가로 길이
# sero_m : 그리드(영역)의 세로 길이
class pgm_to_matrix:
def __init__(self, pgm_path, yaml_path, hegiht, width, origin_x, origin_y):
self.pgm_path = pgm_path
self.yaml_path = yaml_path
self.hegiht = hegiht
self.width = width
self.origin_x = origin_x
self.origin_y = origin_y
self.matrix = None
self.cell_height = None
self.cell_width = None
self.resolution = None
self.garo_m = None
self.sero_m = None
# pgm에서 png로 변환하는 함수 pgm 자체로 opencv가 불가능함
def change_pgmtopng(self):
pgm_img = cv2.imread(self.pgm_path, cv2.IMREAD_GRAYSCALE)
# PNG로 저장
self.pgm_path = self.pgm_path[:-3]+"png"
print(self.pgm_path)
cv2.imwrite(self.pgm_path, pgm_img)
# map 이미지에서 그리드 영역을 계산하고 matrix를 반환하는 함수
def image_processing(self):
map_img = cv2.imread(self.pgm_path,cv2.COLOR_BGR2GRAY )
img_height, img_width = map_img.shape
grid_size = (self.hegiht, self.width) # 세로, 가로
# 그리드 영역 크기 계산
self.cell_height = img_height // grid_size[0]
self.cell_width = img_width // grid_size[1]
map_cost = []
map_col = []
#각각의 그리드 영역에서 흰색 픽셀 비율 계산
for i in range(grid_size[0]):
map_col = []
for j in range(grid_size[1]):
# 그리드 영역의 범위를 지정하는거고
cell_top = i * self.cell_height
cell_left = j * self.cell_width
cell_bottom = cell_top + self.cell_height
cell_right = cell_left + self.cell_width
if i == self.hegiht-1:
cell_bottom = img_height
# 그 맵의 모든 픽셀을 가져와서
cell = map_img[cell_top:cell_bottom, cell_left:cell_right]
wall = 0
load = 0
for row in range(len(cell)):
for col in range(len(cell[0])):
if cell[row][col] <= 210:
wall += 1;
else:
load += 1
if load > wall:
map_col.append(0)
else:
map_col.append(1)
map_cost.append(map_col)
self.matrix = map_cost
# yaml에서 resolution 값을 추출
def get_resolution(self):
with open(self.yaml_path, "r") as stream:
yaml_data = yaml.safe_load(stream)
# resolution 값 가져오기
self.resolution = yaml_data["resolution"]
self.garo_m = self.resolution * self.cell_width
self.sero_m = self.resolution * self.cell_height
# 실행함수
def run(self):
self.change_pgmtopng()
self.image_processing()
self.get_resolution()
if __name__ == '__main__':
a = pgm_to_matrix("/home/du/mini_bot/baby_map.pgm", "/home/du/mini_bot/baby_map.yaml" ,3 , 12, -1.1, 2.2)
a.run()
print(a.matrix)
print(a.resolution)
print(a.garo_m)
print(a.sero_m)
발행하는 코드
obstacle_detection.py
import rclpy as rp
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
from minibot_msgs.msg import IsObstacle
# 라이다의 전방의 거리와 장애물충돌위험신호를 보냄
class ObstacleDetect(Node):
# /scan topic을 구독하고 장애물이 가까이 있는지 파악해서 알려줌
def __init__(self):
super().__init__("obstacle_detector")
self.sub_scan = self.create_subscription(LaserScan, "/scan", self.callback_scan, 10)
self.timer_period = 0.5
self.publisher = self.create_publisher(IsObstacle, "/obstacle_detect", 10)
self.timer = self.create_timer(self.timer_period, self.timer_callback)
self.is_osbstacle = IsObstacle()
def callback_scan(self, msg):
min_data = min(msg.ranges[170:210]) # ladar의 앞 부분 인식
self.is_osbstacle.min_distance = min_data
if min_data < 0.2: # 0.2m 이하이면 주행 멈추라고 신호 줌
self.is_osbstacle.check_obstacle = True
else:
self.is_osbstacle.check_obstacle = False
def timer_callback(self):
self.publisher.publish(self.is_osbstacle)
def main(args=None):
rp.init(args=args)
obstacle_detector = ObstacleDetect()
rp.spin(obstacle_detector)
obstacle_detector.destroy_node()
rp.shutdown()
if __name__ == "__main__":
main()
해당 파일이 사용하는 메시지 형식은 다음과 같습니다.
minibot_msgs / IsObstacle.msg
bool check_obstacle
float32 min_distance
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from minibot_msgs.msg import StartEnd
# 좌표를 입력해서 그 지점으로 가게 해달라는 신호를 보냅니다.
class Publisher(Node):
def __init__(self):
super().__init__('test_pub')
self.publisher_ = self.create_publisher(StartEnd, 'test_topic', 10)
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
def timer_callback(self):
msg = StartEnd()
msg.end_x = 11
msg.end_y = 2
self.publisher_.publish(msg)
#self.get_logger().info('Publishing: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
test_pub = Publisher()
rclpy.spin(test_pub)
test_pub.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
해당 파일이 사용하는 메시지 형식은 다음과 같습니다.
minibot_msgs / StartEnd.msg
int64 end_x
int64 end_y
메시지파일의 이름은 다음과 같은 이름형식을 지켜야합니다. 대문자로 시작을 해야하며 _과 같은 문자는 들어갈 수 없습니다.
메시지 파일 내부에는 대문자 변수를 넣을 수 없습니다.
minibot_msgs / CMakeLists.txt
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/IsObstacle.msg"
"msg/StartEnd.msg"
)
구독하는 코드
path_planning.py
from nav2_simple_commander.robot_navigator import BasicNavigator
from geometry_msgs.msg import PoseWithCovarianceStamped
from geometry_msgs.msg import PoseStamped
from rclpy.duration import Duration
from minibot_indoor import image_processing
from minibot_indoor import Astar
import rclpy as rp
from rclpy.node import Node
from std_msgs.msg import String
from minibot_msgs.msg import IsObstacle
from minibot_msgs.msg import StartEnd
rp.init()
nav = BasicNavigator()
# 장애물을 탐지하는 부분..
class Subscriber(Node):
def __init__(self):
super().__init__('test_sub')
self.subscription1 = self.create_subscription(IsObstacle, '/obstacle_detect', self.obstacle_callback, 10)
self.subscription2 = self.create_subscription(StartEnd, 'test_topic', self.test_callback, 10)
def obstacle_callback(self, msg):
print("move : ", msg.check_obstacle, ", distance : ", msg.min_distance)
def test_callback(self, msg):
go_my_robot(get_my_map_coordinate(), (1,0), (msg.end_y, msg.end_x))
def go_my_robot(my_map_coordinate, start, end):
image = image_processing.pgm_to_matrix("/home/du/mini_bot/baby_map.pgm", "/home/du/mini_bot/baby_map.yaml" ,3 , 12, -1.1, 2.2)
image.run()
my_map = image.matrix
make_route = Astar.Astar()
result, path = make_route.run(my_map , start, end)
print(result)
print("경로 생성 맵입니다. 숫자 2는 경로입니다.")
for i in result:
print(i)
if path == None:
print("이동할 수 없는 지점으로 이동을 명령하고 있습니다.")
return
else:
path_0 = path[0]
path = path[1:]
for route in path:
print("명령을 이수합니다." + str(route) + "좌표로 이동합니다." )
#북쪽
if route[1] - path_0[1] == 1:
my_map_coordinate[route[0]][route[1]].pose.orientation.x = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.y = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.z = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.w = 1.0
#남쪽
elif route[1] - path_0[1] == -1:
my_map_coordinate[route[0]][route[1]].pose.orientation.x = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.y = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.z = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.w = -1.0
# 서쪽
elif route[0] - path_0[0] == -1:
my_map_coordinate[route[0]][route[1]].pose.orientation.x = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.y = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.z = 0.7
my_map_coordinate[route[0]][route[1]].pose.orientation.w = 0.7
# 동쪽
elif route[0] - path_0[0] == 1:
my_map_coordinate[route[0]][route[1]].pose.orientation.x = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.y = 0.0
my_map_coordinate[route[0]][route[1]].pose.orientation.z = -0.7
my_map_coordinate[route[0]][route[1]].pose.orientation.w = 0.7
nav.goToPose(my_map_coordinate[route[0]][route[1]])
i=0
while not nav.isTaskComplete():
i = i+1
feedback = nav.getFeedback()
if feedback and i % 5 == 0:
print("Distance remaining : " + str(feedback.distance_remaining) + " m")
if feedback.distance_remaining > 0.1:
pass
else:
print("Distance remaining : " + str(feedback.distance_remaining) + " m")
nav.cancelTask()
print("명령을 이수했습니다.")
print(str(route) + "좌표로 이동했습니다." )
print()
path_0 = route
def get_my_map_coordinate():
image = image_processing.pgm_to_matrix("/home/du/mini_bot/baby_map.pgm", "/home/du/mini_bot/baby_map.yaml" ,3 , 12, -1.1, 2.2)
image.run()
my_map_coordinate = []
my_map_layer = []
cor_x = image.origin_x #1.1씩 더하세요
cor_y = image.origin_y #1.1씩 빼세요.
for i in range(len(image.matrix)):
cor_x = image.origin_x
cor_y = cor_y - image.sero_m
for j in range(len(image.matrix[0])):
cor_x = cor_x + image.garo_m
if image.matrix[i][j] == 1:
my_map_layer.append(None)
elif image.matrix[i][j] == 0:
goal = PoseStamped()
goal.header.frame_id = 'map'
goal.header.stamp = nav.get_clock().now().to_msg()
goal.pose.position.x = cor_x
goal.pose.position.y = cor_y
goal.pose.position.z = 0.006378173828125
goal.pose.orientation.x = 0.0
goal.pose.orientation.y = 0.0
goal.pose.orientation.z = 0.0
goal.pose.orientation.w = 1.0
my_map_layer.append(goal)
my_map_coordinate.append(my_map_layer)
my_map_layer = []
return my_map_coordinate
def main(args=None):
# args = None 아무런 의미가 없는말이다.
# init 에 뭔가를 넣을 수 있으니 만약에 하고 싶다면 이걸 수정해라 라고 해서 args = None이라고 한거다.
test_sub = Subscriber()
rp.spin(test_sub)
test_sub.destroy_node()
rp.shutdown()
# 종료해달라
if __name__ == '__main__':
main()
문제는 함수가 너무깁니다. 다른 코드가 동작을 못합니다
minibot_indoor / setup.py
entry_points={
'console_scripts': [
'obstacle_detection = minibot_indoor.obstacle_detection:main',
'path_planning = minibot_indoor.path_planning:main',
'order = minibot_indoor.order:main',
'image_processing = minibot_indoor.image_processing:main',
'Astar = minibot_indoor.Astar:main',
],
},
entry_points 에 다음처럼 추가합니다.
현재 예상되는 문제점
작성된 코드는 현재 ROS상에서 돌아가는지 검증을 하기 위한 코드..
class order_Subscriber(Node):
def __init__(self):
super().__init__('test_sub')
self.subscription = self.create_subscription(StartEnd, 'test_topic', self.listener_callback, 10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
go_my_robot(get_my_map_coordinate(), (1,0), (msg.end_y, msg.end_x))
문제는 go_my_robot함수는 하나의 명령을 완료할때까지 실행되기 때문에 lifecycle이 너무 길다. 그 때까지 다른 입력을 받을 수 없을 수도 있다.
맞다.. 라이다에 대한 정보를 보내도 구독은 하고 있지만 콜백함수가 동작하지 못한다
그러면 당연히 장애물이 멈추라는 신호를 보내도 멈출 수 없을 것이다.
해결 방법은 다음과 같다.
- path를 한가지씩만 수행하게 해서 함수가 적은 시간동안 실행되도록 한다. (중요)
- 노드를 분리한다. (X)
- 스레드 개념을 도입한다. (X)
콜백함수에서는 명령이수 단계로 변경 (false에서 true로 변경) ← 가능할듯 이걸로 낙찰
그러면 장애물탐지는 스레드로 실행하고.. 콜백함수에서 지속적으로 받음
그래서 처음에 진행한 것이 함수를 작게 쪼개는 것이였다. 현재는 Nav2가 모든 지점을 다 이동할때까지 callback함수를 물고 늘어지고 있어서 가장 가능한 것은 하나의 좌표를 이행할때까지 분리하는 것이였고
추가적으로 필요했던 조치는 라이다를 구독하는 콜백함수를 스레드 형식으로 만드는 것이다.
다음처럼 앞부분의 코드가 길어졌지만 주목할 점은 groups가 등장하면서 라이다의 값을 스레드 형식으로 받아볼 수 있게 되었다.
(해당 코드에는 장애물 처리에 관한 코드도 들어있다.)
from nav2_simple_commander.robot_navigator import BasicNavigator
from geometry_msgs.msg import PoseWithCovarianceStamped
from geometry_msgs.msg import PoseStamped
from rclpy.duration import Duration
from minibot_indoor import image_processing
from minibot_indoor import Astar
import rclpy as rp
import threading
from rclpy.node import Node
from std_msgs.msg import String
from minibot_msgs.msg import IsObstacle
from minibot_msgs.msg import StartEnd
from minibot_msgs.msg import NextMission
import time
from geometry_msgs.msg import Twist
rp.init()
nav = BasicNavigator()
# 장애물을 탐지하는 부분..
class Subscriber(Node):
def __init__(self):
super().__init__('test_sub')
self.callback_group = rp.callback_groups.ReentrantCallbackGroup()
self.subscription1 = self.create_subscription(IsObstacle, '/obstacle_detect', self.obstacle_callback, 10, callback_group=self.callback_group)
self.subscription2 = self.create_subscription(StartEnd, 'Robot_order', self.test_callback, 10)
self.publisher = self.create_publisher(Twist, 'base_controller/cmd_vel_unstamped', 10)
self.publisher_mission = self.create_publisher(NextMission, 'Mission_complete', 10)
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
self.order_start = False
self.path_count = 1
self.path = None
self.now_location = (0, 1)
self.pre_order_x = None
self.pre_order_y = None
self.result = None
self.check_obstacle = False
self.obstacle_area = None
self.mission_ing = False
self.restart = False
self.my_map_coordinate = get_my_map_coordinate()
def timer_callback(self):
msg = NextMission()
msg.mission = self.mission_ing
self.publisher_mission.publish(msg)
def obstacle_callback(self, msg):
threading.Thread(target=self.process_message_b, args=(msg,)).start()
def process_message_b(self, msg):
check_obstacle_front(msg.check_obstacle)
print("move : ", msg.check_obstacle, ", distance : ", msg.min_distance)
def test_callback(self, msg):
print("함수를 새로시작합니다.")
if self.order_start == False:
if self.pre_order_y == msg.end_y and self.pre_order_x == msg.end_x:
pass
else:
self.result, self.path = my_robot_path("/home/du/mini_bot/baby_real_map_cut.pgm", "/home/du/mini_bot/baby_real_map_cut.yaml", self.now_location, (msg.end_y, msg.end_x), self.obstacle_area )
self.pre_order_y = msg.end_y
self.pre_order_x = msg.end_x
self.mission_ing = True
self.order_start = True
print(self.result)
if self.path != None:
if len(self.path) > self.path_count:
self.now_location, success = go_my_robot(self.my_map_coordinate, self.now_location, self.path[self.path_count])
if success == True:
self.path_count = self.path_count + 1
else:
print("로봇이 후진합니다.")
self.back_robot()
print("경로를 재탐색합니다.")
self.restart = True
self.obstacle_area = self.path[self.path_count]
self.order_start = False
self.path = None
self.pre_order_y = None
self.pre_order_x = None
self.path_count = 1
else:
self.order_start = False
self.mission_ing = False
self.path = None
self.path_count = 1
else:
if self.order_start == True:
print("이동할 수 없는 지점으로 명령하고 있습니다.")
self.order_start = False
def spin(self):
rp.spin(self)
# 후진하는 부분
def back_robot(self):
print("후진을 실시합니다.")
for i in range(11):
twist = Twist()
twist.linear.x = 0.0 # 속도 값을 -0.2로 수정
twist.angular.z = 0.628
self.publisher.publish(twist)
time.sleep(1)
for i in range(5):
twist = Twist()
twist.linear.x = 0.08 # 속도 값을 -0.2로 수정
twist.angular.z = 0.0
self.publisher.publish(twist)
time.sleep(1)
그리고 라이다에서 주는 정보를 전역변수로 계속 공유하면 된다.
# 실시간 구독한 장애물 정보를 전역변수로 선언하면 될듯..
check_obstacle = False
def check_obstacle_front(check):
global check_obstacle
check_obstacle = check
다음처럼 작성하면 함수만 호출되면 전역변수가 수정되고, 조회가 가능해진다.
# 경로를 생성하는 함수
def my_robot_path(pgm_path, yaml_path, start, end, obstacle_area):
image = image_processing.pgm_to_matrix(pgm_path, yaml_path ,12 , 3, -1.3, 0.6)
image.run()
my_map = image.matrix
if obstacle_area != None:
my_map[obstacle_area[0]][obstacle_area[1]] = 1
make_route = Astar.Astar()
result, path = make_route.run(my_map , start, end)
return result, path
# 0인 지점의 좌표값을 구하는 함수
def get_my_map_coordinate():
image = image_processing.pgm_to_matrix("/home/du/mini_bot/baby_real_map_cut.pgm", "/home/du/mini_bot/baby_real_map_cut.yaml" ,12 , 3, -1.3, 0.6)
image.run()
my_map_coordinate = []
my_map_layer = []
cor_x = image.origin_x #1.1씩 더하세요
cor_y = image.origin_y #1.1씩 빼세요.
for i in range(len(image.matrix)):
cor_x = image.origin_x
cor_y = cor_y - image.sero_m
for j in range(len(image.matrix[0])):
cor_x = cor_x + image.garo_m
if image.matrix[i][j] == 1:
my_map_layer.append(None)
elif image.matrix[i][j] == 0:
goal = PoseStamped()
goal.header.frame_id = 'map'
goal.header.stamp = nav.get_clock().now().to_msg()
goal.pose.position.x = cor_x
goal.pose.position.y = cor_y
goal.pose.position.z = 0.006378173828125
goal.pose.orientation.x = 0.0
goal.pose.orientation.y = 0.0
goal.pose.orientation.z = 0.0
goal.pose.orientation.w = 1.0
my_map_layer.append(goal)
my_map_coordinate.append(my_map_layer)
my_map_layer = []
return my_map_coordinate
# 이동을 수행하는 코드로 start와 end는 (0,1) (0,2) 같이 1만 움직이게 설정했다.
# 결과적으로 함수를 하나의 경로만큼으로 줄였지만, Nav2로 찍은 동안은 어쩔 수 없이 물려있어야한다.
# 스레드의 개념을 들고와서 그 부분을 해결했다.
def go_my_robot(my_map_coordinate, start, end):
global check_obstacle
watiting_second = False
print("명령을 이수합니다." + str(start) + "좌표로 이동합니다." )
#북쪽
if end[1] - start[1] == 1:
my_map_coordinate[end[0]][end[1]].pose.orientation.x = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.y = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.z = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.w = 1.0
#남쪽
elif end[1] - start[1] == -1:
my_map_coordinate[end[0]][end[1]].pose.orientation.x = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.y = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.z = -1.0
my_map_coordinate[end[0]][end[1]].pose.orientation.w = 0.0
# 서쪽
elif end[0] - start[0] == -1:
my_map_coordinate[end[0]][end[1]].pose.orientation.x = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.y = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.z = 0.7
my_map_coordinate[end[0]][end[1]].pose.orientation.w = 0.7
# 동쪽
elif end[0] - start[0] == 1:
my_map_coordinate[end[0]][end[1]].pose.orientation.x = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.y = 0.0
my_map_coordinate[end[0]][end[1]].pose.orientation.z = -0.7
my_map_coordinate[end[0]][end[1]].pose.orientation.w = 0.7
nav.goToPose(my_map_coordinate[end[0]][end[1]])
i=0
while not nav.isTaskComplete():
if check_obstacle == True:
# 그래서 우리가 원했던 장애물 탐지가 가능하게 되었다.
print("전방에 장애물이 있습니다. 5초간 정지합니다.")
else:
watiting_second = False
i = i+1
feedback = nav.getFeedback()
if feedback and i % 5 == 0:
print("Distance remaining : " + str(feedback.distance_remaining) + " m")
if feedback.distance_remaining > 0.15:
pass
else:
print("Distance remaining : " + str(feedback.distance_remaining) + " m")
nav.cancelTask()
print("명령을 이수했습니다.")
print(str(end) + "좌표로 이동했습니다." )
print()
if check_obstacle == True:
return start, False
else:
return end, True
이렇게 설명을 대충 마무리하고, 장애물 탐지는 다음처럼 진행한다. 추가적으로 로봇이 자신이 현재 움직이는 상태인지 멈춘상태인지 발행도 한다.
장애물을 판단하는 과정
1. 장애물 토픽을 구독하고, 여기서 장애물이 있다고 판단되는 경우
2. 5초간 정지하며, 장애물이 정적장애물인지, 동적장애물인지 확인합니다.
3. 5초뒤 전방에 장애물이 없다고 판단되면, 계속 이동
4. 5초뒤에도 전방의 장애물이 없어지지 않는다면 뒤로 후진하고, 전방의 영역을
장애물이 있는 상태로 변경하고 경로를 재탐색하여 임무 수행
5. 장애물에 막힌경우 Rviz상에서는 벽으로 인식된다. 이를 우회를 통해서 들어가려한다면, remain Distance가 길어지게 된다. 이 경우 취소시킨다.
6. 재탐색에도 경로가 없으면 이동할 수 없음을 알림
5번을 제외하고 구현이 된 상태의 코드이며, 위의 코드에서 장애물을 탐지하였습니다에 다음과 같은 코드가 들어간다.
if check_obstacle == True:
if watiting_second == False:
print("전방에 장애물이 있습니다. 5초간 정지합니다.")
time.sleep(5)
watiting_second = True
else:
print("5초간 정지해보니 고정된 물체로 판단되어 경로를 재탐색합니다.")
nav.cancelTask()
그냥 설명보다도 코드는 깃헙을 확인하고, 실제 작동을 가제보상에서 확인해보자.
addinedu-amr/AIrobot-repo-2 (github.com)
우선 장애물이 없는 경우의 경로를 찾아가는 경우를 보자.
이번에는 장애물을 스폰하고 찾아가는 것을 확인해보자.
다음은 order.py와 path_planning.py 파일이 서로 토픽으로 자신의 상태를 주고받게 하여 연속된 일처리가 가능하게 하자.
order.py 에서 path_planning.py에서 로봇이 현재 작업을 수행중인지에 대한 토픽을 보낸다.
order.py 에서 작업이 종료되었다는 소식이 들려오면 리스트의 다음 값을 보내는 방식으로 진행하고, 리스트에 있는 모든 일을 처리한 경우 집으로 귀환 시킨다.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from minibot_msgs.msg import StartEnd
from minibot_msgs.msg import NextMission
class Publisher(Node):
def __init__(self):
super().__init__('test_pub')
self.publisher_ = self.create_publisher(StartEnd, 'Robot_order', 10)
self.subscriber = self.create_subscription(NextMission, 'Mission_complete', self.mission_callback, 10 )
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
self.i = 0
self.end_point_x = [0, 0]
self.end_point_y = [2, 6]
self.end_point_count = 0
self.now_status = False
def timer_callback(self):
if len(self.end_point_x) > self.end_point_count:
print(str(self.end_point_x[self.end_point_count]) + "," + str(self.end_point_y[self.end_point_count]) + "로 이동을 시작합니다." )
msg = StartEnd()
msg.end_x = self.end_point_x[self.end_point_count]
msg.end_y = self.end_point_y[self.end_point_count]
self.publisher_.publish(msg)
elif len(self.end_point_x) == self.end_point_count:
print("집(0,1)으로 이동을 시작합니다." )
msg = StartEnd()
msg.end_x = 1
msg.end_y = 0
self.publisher_.publish(msg)
def mission_callback(self, msg):
if msg.mission == True:
self.now_status = True
elif msg.mission == False and self.now_status == True:
self.now_status = False
if len(self.end_point_x) > self.end_point_count:
print("명령을 완료했습니다. 다음 명령을 진행합니다.")
self.end_point_count = self.end_point_count + 1
else:
print("모든 명령을 완료했습니다. 집으로 복귀 시킵니다.")
def main(args=None):
rclpy.init(args=args)
test_pub = Publisher()
rclpy.spin(test_pub)
test_pub.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
실제 로봇의 동작
1. 장거리 배송과 장애물을 피해가는 모습
화질 왜이래..
2. 목표지점을 2곳 방문한 후 홈으로 복귀하는 모습
이것으로 로봇 프로젝트에서 실내로봇 자율주행 경로생성 파트를 맡아서 부족하지만 성공적으로 프로젝트 수행을 완료했습니다.
다음 이야기는 실외팀의 코드를 리뷰해보면서 그들이 어떻게 작업을 수행했는지 정리를 하도록 하겠습니다.
'ㅇ 프로젝트 > TEAM_실내외배송로봇' 카테고리의 다른 글
라이다에서 전방 물체를 판단하는 방법 (1) | 2023.03.21 |
---|---|
실내 경로생성 알고리즘의 아이디어 및 코드 설명 (0) | 2023.03.21 |
실내 경로 생성 파이썬으로 해보자 (Nav2를 이용한) (0) | 2023.03.14 |
도로의 차선을 검출해보자. (image segmentation 활용) (0) | 2023.03.09 |
Visual SLAM 구역, SLAM구 ROS 패키지 설계 및 FSM 설계 (0) | 2023.03.07 |