Skip to content

Commit

Permalink
Included option to format AMBA
Browse files Browse the repository at this point in the history
Signed-off-by: aignacio <anderson@aignacio.com>
  • Loading branch information
aignacio committed Oct 24, 2024
1 parent 93b0045 commit 95042bc
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 16 deletions.
22 changes: 19 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ class AHBBus(Bus):
* hexcl
* hmaster
* hsel
* hready_in
* hready_in *(Note a)*

Notes:
a. This signal is driven high during the txn start but it does not follow the **hreadyout** loopback from the slave when the slave is not available, if the loopback is expected, suggestion is to connect the `hready_in` directly to the `hreadyout`

2. AHB Slave signals

Expand Down Expand Up @@ -201,11 +204,14 @@ For writes, the arguments are listed here:

```python
async def write(
self,
self,
address: Union[int, Sequence[int]],
value: Union[int, Sequence[int]],
size: Optional[Union[int, Sequence[int]]] = None,
pip: Optional[bool] = False,
verbose: Optional[bool] = False,
sync: Optional[bool] = False,
format_amba: Optional[bool] = False,
) -> Sequence[dict]:
```

Expand All @@ -214,6 +220,9 @@ For writes, the arguments are listed here:
* value - Single or a list of integer values to be written
* Optional[size] - Integer number of bytes to be written (for instance, in 32-bit bus, 1, 2 or 4 bytes), default is the max bus size
* Optional[pip] - Define if the address/data phase will overlap in a pipeline manner or not, default non-pipelined
* Optional[verbose] - Print a msg on every txn
* Optional[sync] - Drive signals on next clk edge
* Optional[format_amba] - Enforce AMBA data masking/shifting according to spec (Table 6-1 Active byte lanes for a 32-bit little-endian data bus - ARM IHI 0033B.b)

**Return**
* Sequence[dict] - Once all transactions are dispatched and their responses are received, the function returns a list of dict with [AHBResp](cocotbext/ahb/ahb_types.py) responses along with the data.
Expand All @@ -230,13 +239,17 @@ For reads, the arguments are listed here:
address: Union[int, Sequence[int]],
size: Optional[Union[int, Sequence[int]]] = None,
pip: Optional[bool] = False,
verbose: Optional[bool] = False,
sync: Optional[bool] = False,
) -> Sequence[dict]:
```

**Arguments**
* address - Single or a list of integer addresses to be read from
* Optional[size] - Integer number of bytes to be read (for instance, in 32-bit bus, 1, 2 or 4 bytes), default is the max bus size
* Optional[pip] - Define if the address/data phase will overlap in a pipeline manner or not, default non-pipelined
* Optional[sync] - Drive signals on next clk edge
* Optional[verbose] - Print a msg on every txn

**Return**
* Sequence[dict] - Once all transactions are dispatched and their responses are received, the function returns a list of dict with [AHBResp](cocotbext/ahb/ahb_types.py) responses along with the data.
Expand All @@ -254,7 +267,8 @@ A third method provides flexibility in case the user wants to perform read or wr
value: Union[int, Sequence[int]],
mode: Union[int, Sequence[int]],
size: Optional[Union[int, Sequence[int]]] = None,
pip: Optional[bool]
pip: Optional[bool],
verbose: Optional[bool] = False,
) -> Sequence[dict]:
```

Expand All @@ -264,6 +278,8 @@ A third method provides flexibility in case the user wants to perform read or wr
* mode - Single or a list of operation types - 0 (Read) or 1 (Write)
* Optional[size] - Integer number of bytes to be written/read (for instance, in 32-bit bus, 1, 2 or 4 bytes), default is the max bus size
* Optional[pip] - Define if the address/data phase will overlap in a pipeline manner or not, default non-pipelined
* Optional[sync] - Drive signals on next clk edge
* Optional[verbose] - Print a msg on every txn

**Return**
* Sequence[dict] - Once all transactions are dispatched and their responses are received, the function returns a list of dict with [AHBResp](cocotbext/ahb/ahb_types.py) responses along with the data.
Expand Down
27 changes: 25 additions & 2 deletions cocotbext/ahb/ahb_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 02.10.2024
# Last Modified Date: 24.10.2024
import logging
import cocotb
import copy
Expand Down Expand Up @@ -93,6 +93,24 @@ def _check_size(size: int, data_bus_width: int) -> None:
elif size <= 0 or (size & (size - 1)) != 0:
raise ValueError(f"Error -> {size} - Size must" f"be a positive power of 2")

def _fmt_amba(self, address: Sequence[int], size: Sequence[int], value: Sequence[int]) -> Sequence[int]:
"""Format the write data to follow AMBA by shifting / masking data."""
new_val = []
offset = (self.bus.data_width // 8) - 1
for addr, sz, val in zip(address, size, value):
if sz != (self.bus.data_width // 8):
data = 0
if sz == 4:
data = (val & 0xFFFFFFFF) << ((addr & offset) * 8)
elif sz == 2:
data = (val & 0xFFFF) << ((addr & offset) * 8)
elif sz == 1:
data = (val & 0xFF) << ((addr & offset) * 8)
new_val.append(data)
else:
new_val.append(val)
return new_val

def _addr_phase(self, addr: int, size: int, mode: AHBWrite, trans: AHBTrans):
"""Drive the AHB signals of the address phase."""
self.bus.haddr.value = addr
Expand Down Expand Up @@ -198,7 +216,7 @@ async def _send_txn(
f"\tID = {txn_id}\n"
f"\tADDR = 0x{txn_addr:x}\n"
f"\tDATA = 0x{value[index + 1]:x}\n"
f"\tSIZE = {txn_size} bytes"
f"\tSIZE = {txn_size} byte[s]"
)
self.bus.hwdata.value = txn_data
if self.bus.hready_in_exist:
Expand Down Expand Up @@ -275,6 +293,7 @@ async def write(
pip: Optional[bool] = False,
verbose: Optional[bool] = False,
sync: Optional[bool] = False,
format_amba: Optional[bool] = False,
) -> Sequence[dict]:
"""Write data in the AHB bus."""

Expand All @@ -293,6 +312,10 @@ async def write(

if not isinstance(value, list):
value = [value]

if format_amba is True:
value = self._fmt_amba(address, size, value)

# if not isinstance(size, list):
# size = [size]

Expand Down
2 changes: 1 addition & 1 deletion cocotbext/ahb/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.4.3"
__version__ = "0.4.4"
6 changes: 3 additions & 3 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 02.09.2024
# Last Modified Date: 24.10.2024

import nox

Expand Down Expand Up @@ -33,8 +33,8 @@ def run(session):
"--cov=cocotbext",
"--cov-branch",
"--cov-report=xml",
"-rf",
# "-rP",
# "-rf",
"-rP",
"-n",
"auto",
*session.posargs
Expand Down
56 changes: 49 additions & 7 deletions tests/test_ahb_lite_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
# License : MIT license <Check LICENSE>
# Author : Anderson I. da Silva (aignacio) <anderson@aignacio.com>
# Date : 08.10.2023
# Last Modified Date: 09.09.2024
# Last Modified Date: 24.10.2024

import cocotb
import os
import random
import pytest

from const import cfg
from cocotb_test.simulator import run
Expand Down Expand Up @@ -75,27 +76,67 @@ async def run_test(dut, bp_fn=None, pip_mode=False):
resp = await ahb_lite_master.write(address, value, pip=pip_mode)
resp = await ahb_lite_master.read(address, size, pip=pip_mode)
resp = await ahb_lite_master.custom(address, value, mode, size, pip_mode)

type(resp)

# ----------------------
# AMBA format test
# ----------------------
if ahb_lite_master.bus.data_width == 32:
# Byte access
address = [0x00, 0x01, 0x02, 0x3]
value = [rnd_val(32) for _ in range(4)]
size = [1 for _ in range(4)]
resp = await ahb_lite_master.write(address, value, size, format_amba=True, verbose=True)

# Half-word access
address = [0x00, 0x02]
value = [rnd_val(32) for _ in range(2)]
size = [2 for _ in range(2)]
resp = await ahb_lite_master.write(address, value, size, format_amba=True, verbose=True)

# Word access
address = [0x00]
value = [rnd_val(32)]
size = [4]
resp = await ahb_lite_master.write(address, value, size, format_amba=True, verbose=True)
else:
# Byte access
address = [0x00, 0x01, 0x02, 0x3, 0x4, 0x5, 0x6, 0x7]
value = [rnd_val(32) for _ in range(8)]
size = [1 for _ in range(8)]
resp = await ahb_lite_master.write(address, value, size, format_amba=True, verbose=True)

# Half-word access
address = [0x00, 0x02, 0x4, 0x6]
value = [rnd_val(32) for _ in range(4)]
size = [2 for _ in range(4)]
resp = await ahb_lite_master.write(address, value, size, format_amba=True, verbose=True)

# Word access
address = [0x00, 0x04]
rd = rnd_val(32)
value = [rd, rd]
size = [4 for _ in range(2)]
resp = await ahb_lite_master.write(address, value, size, format_amba=True, verbose=True)


if cocotb.SIM_NAME:
factory = TestFactory(run_test)
factory.add_option(
"bp_fn", [slave_back_pressure_generator(), slave_no_back_pressure_generator()]
)
factory.add_option("pip_mode", [False, True])
factory.generate_tests()


def test_ahb_lite_log():
@pytest.mark.parametrize("data_width", [{"DATA_WIDTH": "32"}, {"DATA_WIDTH": "64"}])
def test_ahb_lite_log(data_width):
"""
Test AHB lite log messages
Test ID: 1
"""
module = os.path.splitext(os.path.basename(__file__))[0]
SIM_BUILD = os.path.join(
cfg.TESTS_DIR, f"../run_dir/sim_build_{cfg.SIMULATOR}_{module}"
cfg.TESTS_DIR,
f"../run_dir/sim_build_{cfg.SIMULATOR}_{module}_data_width_{data_width['DATA_WIDTH']}_bits",
)
extra_args_sim = cfg.EXTRA_ARGS

Expand All @@ -105,6 +146,7 @@ def test_ahb_lite_log():
toplevel=cfg.TOPLEVEL,
module=module,
sim_build=SIM_BUILD,
parameters=data_width,
extra_args=extra_args_sim,
extra_env=cfg.EXTRA_ENV,
timescale=cfg.TIMESCALE,
Expand Down

0 comments on commit 95042bc

Please sign in to comment.