Skip to content

Commit

Permalink
Add PTO & h-bridge to Amiga PDO1 (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackerman342 authored Mar 21, 2023
1 parent 2402b7b commit 8bad34c
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 16 deletions.
1 change: 1 addition & 0 deletions py/examples/vehicle_twist/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
async def request_generator(twist: canbus_pb2.Twist2d) -> iter[canbus_pb2.SendVehicleTwistCommandReply]:
while True:
yield canbus_pb2.SendVehicleTwistCommandRequest(command=twist)
await asyncio.sleep(0.1) # Limit to 10 hz


async def main(config: ClientConfig, twist: canbus_pb2.Twist2d) -> None:
Expand Down
113 changes: 97 additions & 16 deletions py/farm_ng/canbus/packet.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ class AmigaControlState(IntEnum):
STATE_ESTOPPED = 6


class ActuatorCommands(IntEnum):
passive = 0x0
forward = 0x1
stopped = 0x2
reverse = 0x3


def actuator_bits_cmd(
a0=ActuatorCommands.passive, a1=ActuatorCommands.passive, a2=ActuatorCommands.passive, a3=ActuatorCommands.passive
):
return a0.value + (a1.value << 2) + (a2.value << 4) + (a3.value << 6)


def actuator_bits_read(bits):
a0 = ActuatorCommands(bits & 0x3)
a1 = ActuatorCommands((bits >> 2) & 0x3)
a2 = ActuatorCommands((bits >> 4) & 0x3)
a3 = ActuatorCommands((bits >> 6) & 0x3)
return (a0, a1, a2, a3)


class Packet:
"""Base class inherited by all CAN message data structures."""

Expand All @@ -67,7 +88,7 @@ def age(self):


def make_amiga_rpdo1_proto(
state_req: AmigaControlState, cmd_speed: float, cmd_ang_rate: float
state_req: AmigaControlState, cmd_speed: float, cmd_ang_rate: float, pto_bits: int = 0x0, hbridge_bits: int = 0x0
) -> canbus_pb2.RawCanbusMessage:
"""Creates a canbus_pb2.RawCanbusMessage.
Expand All @@ -85,12 +106,21 @@ def make_amiga_rpdo1_proto(
# TODO: add some checkers, or make python CHECK_API
return canbus_pb2.RawCanbusMessage(
id=AmigaRpdo1.cob_id + DASHBOARD_NODE_ID,
data=AmigaRpdo1(state_req=state_req, cmd_speed=cmd_speed, cmd_ang_rate=cmd_ang_rate).encode(),
data=AmigaRpdo1(
state_req=state_req,
cmd_speed=cmd_speed,
cmd_ang_rate=cmd_ang_rate,
pto_bits=pto_bits,
hbridge_bits=hbridge_bits,
).encode(),
)


class AmigaRpdo1(Packet):
"""State, speed, and angular rate command (request) sent to the Amiga vehicle control unit (VCU)"""
"""State, speed, and angular rate command (request) sent to the Amiga vehicle control unit (VCU).
New in fw v0.1.9 / farm-ng-amiga v0.0.7: Add pto & hbridge control. Message data is now 8 bytes (was 5).
"""

cob_id = 0x200

Expand All @@ -99,32 +129,59 @@ def __init__(
state_req: AmigaControlState = AmigaControlState.STATE_ESTOPPED,
cmd_speed: float = 0.0,
cmd_ang_rate: float = 0.0,
pto_bits: int = 0x0,
hbridge_bits: int = 0x0,
):
self.format = "<Bhh"
self.format = "<BhhBBx"
self.legacy_format = "<Bhh"

self.state_req = state_req
self.cmd_speed = cmd_speed
self.cmd_ang_rate = cmd_ang_rate
self.pto_bits = pto_bits
self.hbridge_bits = hbridge_bits

self.stamp_packet(time.monotonic())

def encode(self):
"""Returns the data contained by the class encoded as CAN message data."""
return pack(self.format, self.state_req, int(self.cmd_speed * 1000.0), int(self.cmd_ang_rate * 1000.0))
return pack(
self.format,
self.state_req,
int(self.cmd_speed * 1000.0),
int(self.cmd_ang_rate * 1000.0),
self.pto_bits,
self.hbridge_bits,
)

def decode(self, data):
"""Decodes CAN message data and populates the values of the class."""
(self.state_req, cmd_speed, cmd_ang_rate) = unpack(self.format, data)
self.cmd_speed = cmd_speed / 1000.0
self.cmd_ang_rate = cmd_ang_rate / 1000.0
if len(data) == 5:
# TODO: Instate warning when dashboard fw v0.1.9 is released
# warnings.warn(
# "Please update dashboard firmware to >= v0.1.9."
# " New AmigaTpdo1 packets include more data. Support will be removed in farm_ng_amiga v0.0.9",
# stacklevel=2,
# )
(self.state_req, cmd_speed, cmd_ang_rate) = unpack(self.legacy_format, data)
self.cmd_speed = cmd_speed / 1000.0
self.cmd_ang_rate = cmd_ang_rate / 1000.0
else:
(self.state_req, cmd_speed, cmd_ang_rate, self.pto_bits, self.hbridge_bits) = unpack(self.format, data)
self.cmd_speed = cmd_speed / 1000.0
self.cmd_ang_rate = cmd_ang_rate / 1000.0

def __str__(self):
return "AMIGA RPDO1 Request state {} Command speed {:0.3f} Command angular rate {:0.3f}".format(
self.state_req, self.cmd_speed, self.cmd_ang_rate
)
) + " Command PTO bits 0x{:x} Command h-bridge bits 0x{:x}".format(self.pto_bits, self.hbridge_bits)


class AmigaTpdo1(Packet):
"""State, speed, and angular rate of the Amiga vehicle control unit (VCU)"""
"""State, speed, and angular rate of the Amiga vehicle control unit (VCU).
New in fw v0.1.9 / farm-ng-amiga v0.0.7: Add pto & hbridge control. Message data is now 8 bytes (was 5).
"""

cob_id = 0x180

Expand All @@ -133,28 +190,52 @@ def __init__(
state: AmigaControlState = AmigaControlState.STATE_ESTOPPED,
meas_speed: float = 0.0,
meas_ang_rate: float = 0.0,
pto_bits: int = 0x0,
hbridge_bits: int = 0x0,
):
self.format = "<Bhh"
self.format = "<BhhBBx"
self.legacy_format = "<Bhh"

self.state = state
self.meas_speed = meas_speed
self.meas_ang_rate = meas_ang_rate
self.pto_bits = pto_bits
self.hbridge_bits = hbridge_bits

self.stamp_packet(time.monotonic())

def encode(self):
"""Returns the data contained by the class encoded as CAN message data."""
return pack(self.format, self.state, int(self.meas_speed * 1000.0), int(self.meas_ang_rate * 1000.0))
return pack(
self.format,
self.state,
int(self.meas_speed * 1000.0),
int(self.meas_ang_rate * 1000.0),
self.pto_bits,
self.hbridge_bits,
)

def decode(self, data):
"""Decodes CAN message data and populates the values of the class."""
(self.state, meas_speed, meas_ang_rate) = unpack(self.format, data)
self.meas_speed = meas_speed / 1000.0
self.meas_ang_rate = meas_ang_rate / 1000.0
if len(data) == 5:
# TODO: Instate warning when dashboard fw v0.1.9 is released
# warnings.warn(
# "Please update dashboard firmware to >= v0.1.9."
# " New AmigaTpdo1 packets include more data. Support will be removed in farm_ng_amiga v0.0.9",
# stacklevel=2,
# )
(self.state, meas_speed, meas_ang_rate) = unpack(self.legacy_format, data)
self.meas_speed = meas_speed / 1000.0
self.meas_ang_rate = meas_ang_rate / 1000.0
else:
(self.state, meas_speed, meas_ang_rate, self.pto_bits, self.hbridge_bits) = unpack(self.format, data)
self.meas_speed = meas_speed / 1000.0
self.meas_ang_rate = meas_ang_rate / 1000.0

def __str__(self):
return "AMIGA TPDO1 Amiga state {} Measured speed {:0.3f} Measured angular rate {:0.3f} @ time {}".format(
self.state, self.meas_speed, self.meas_ang_rate, self.stamp.stamp
)
) + " PTO bits 0x{:x} h-bridge bits 0x{:x}".format(self.pto_bits, self.hbridge_bits)


def parse_amiga_tpdo1_proto(message: canbus_pb2.RawCanbusMessage) -> AmigaTpdo1 | None:
Expand Down

0 comments on commit 8bad34c

Please sign in to comment.