-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmover.py
142 lines (110 loc) · 4.73 KB
/
mover.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copy this file into your homework repository and fill in the ${?} blanks in the document strings
"""
Publishes twist that will move a robot back and forth in the x direction
while randomly providing an angular velocity about the z-axis.
PUBLISHERS:
+ /turtle1/cmd_vel (geometry_msgs/msg/Twist) - The velocity of an erratic turtle path
SERVICES:
+ /switch (crazy_turtle_interfaces/srv/Switch) - Position of the new turtle
PARAMETERS:
+ velocity (double) - Velocity driving the robot
"""
from enum import Enum, auto
from crazy_turtle_interfaces.srv import Switch
from geometry_msgs.msg import Twist, Vector3
from math import pi
from random import uniform
import rclpy
from rclpy.node import Node
from turtlesim.srv import Spawn, Kill
from rcl_interfaces.msg import ParameterDescriptor
class State(Enum):
""" Current state of the system.
Determines what the main timer function should be doing on each iteration
"""
KILLING = auto(),
SPAWNING = auto(),
HURTLING = auto()
def turtle_twist(xdot, omega):
""" Create a twist suitable for a turtle
Args:
xdot (float) : the forward velocity
omega (float) : the angular velocity
Returns:
Twist - a 2D twist object corresponding to linear/angular velocity
"""
return Twist(linear = Vector3(x = xdot, y = 0.0, z = 0.0),
angular = Vector3(x = 0.0, y = 0.0, z = omega))
class Mover(Node):
""" Publishes movement geometry_msgs/Twist commands at a fixed rate
"""
def __init__(self):
super().__init__("mover")
self.state = State.HURTLING
self.declare_parameter("velocity", 1.0,
ParameterDescriptor(description="The velocity of the turtle"))
self.kill_future = 0
self.nsteps = 0
self.direction = 1
self.newx = 0
self.newy = 0
self.velocity = self.get_parameter("velocity").get_parameter_value().double_value
self.pub = self.create_publisher(Twist, "cmd_vel", 10)
self.switch = self.create_service(Switch, "switch", self.switch_callback)
self.kill = self.create_client(Kill, "kill")
self.spawn = self.create_client(Spawn,"spawn")
self.timer = self.create_timer(0.008333, self.timer_callback)
if not self.kill.wait_for_service(timeout_sec=1.0):
raise RuntimeError('Timeout waiting for "kill" service to become available')
if not self.spawn.wait_for_service(timeout_sec=1.0):
raise RuntimeError('Timeout waiting for "spawn" service to become available')
def switch_callback(self, request, response):
""" Callback function for the /switch service
Kills turtle1 and respawns it an a new location
Args:
request (SwitchRequest): the mixer field contains
x, y, linear and angular velocity components
that are used to determine the new turtle location
response (SwitchResponse): the response object
Returns:
A SwitchResponse, containing the new x and y position
"""
self.get_logger().info("Killing turtle1")
self.kill_future = self.kill.call_async(Kill.Request(name="turtle1"))
self.state = State.KILLING
# The new position of the turtle is intentionally scrambled from a weird message
self.newx = request.mixer.y + request.mixer.angular_velocity
self.newy = request.mixer.x * request.mixer.linear_velocity
response.x = self.newx
response.y = self.newy
return response
def timer_callback(self):
""" Hurtle the turtle
"""
if self.state == State.HURTLING:
twist = turtle_twist(self.direction * self.velocity, uniform(-20.0, 20.0))
self.nsteps += 1
if self.nsteps > 200:
self.nsteps = 0
self.direction *= -1
self.pub.publish(twist)
elif self.state == State.KILLING:
if self.kill_future.done():
self.get_logger().info("turtle1 is dead :(")
self.get_logger().info("respawning turtle1")
self.spawn_future = self.spawn.call_async(Spawn.Request(x = self.newx, y = self.newy, theta = uniform(-pi, pi), name = "turtle1"))
self.state = State.SPAWNING
elif self.state == State.SPAWNING:
if self.spawn_future.done():
self.state = State.HURTLING
self.get_logger().info("turtle1 lives!")
else:
raise RuntimeException("Invalid State")
def main(args=None):
""" The main() function. """
rclpy.init(args=args)
mymove = Mover()
rclpy.spin(mymove)
rclpy.shutdown()
if __name__ == '__main__':
main()