Skip to content

Commit

Permalink
Included b2b
Browse files Browse the repository at this point in the history
Signed-off-by: Anderson Ignacio <anderson@aignacio.com>
  • Loading branch information
aignacio committed Oct 16, 2023
1 parent 2df28bc commit ee15971
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cocotbext/ahb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
# ex. with this:
# from cocotbext.ahb import AHBLiteMaster, AHBBus
from .ahb_master import AHBLiteMaster, AHBMaster
from .ahb_slave import AHBLiteSlave
from .ahb_slave import AHBLiteSlave, AHBSlave
from .ahb_bus import AHBBus
88 changes: 70 additions & 18 deletions cocotbext/ahb/ahb_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ def _create_vector(self, vec: Sequence[int],
# Data
# | default val | DATA 0 | ... | DATA N |

if phase == 'address_phase':
if phase == 'address_ph':
vec_out = vec
vec_out.append(self._get_def(width))
elif phase == 'data_phase':
elif phase == 'data_ph':
vec_out = vec
vec_out.insert(0, self._get_def(width))
else:
Expand All @@ -110,11 +110,11 @@ def _create_vector(self, vec: Sequence[int],
# Data
# | default val | DATA 0 | default value | DATA 1 |

if phase == 'address_phase':
if phase == 'address_ph':
for i in vec:
vec_out.append(i)
vec_out.append(self._get_def(width))
elif phase == 'data_phase':
elif phase == 'data_ph':
for i in vec:
vec_out.append(self._get_def(width))
vec_out.append(i)
Expand All @@ -124,21 +124,21 @@ def _create_vector(self, vec: Sequence[int],
async def _send_txn(self, address: Sequence[int],
value: Sequence[int],
size: Sequence[int],
mode: str = 'write') -> Sequence[AHBResp]:
mode: Sequence[str]) -> Sequence[AHBResp]:
"""Drives the AHB transaction into the bus."""
response = []
first_txn = True

for index, (txn_addr, txn_data, txn_size) in enumerate(zip(address,
value,
size)):
for index, (txn_addr, txn_data, txn_size, txn_mode) in enumerate(
zip(address, value, size, mode)
):
if index == len(address) - 1:
self._init_bus()
else:
self._addr_phase(txn_addr, txn_size, mode)
self._addr_phase(txn_addr, txn_size, txn_mode)
if txn_addr != self.def_val:
if not isinstance(txn_addr, LogicArray):
self.log.info(f"AHB {mode} txn:\n"
self.log.info(f"AHB {txn_mode} txn:\n"
f"\tADDR = 0x{txn_addr:x}\n"
f"\tDATA = 0x{value[index+1]:x}\n"
f"\tSIZE = {txn_size}")
Expand Down Expand Up @@ -205,13 +205,14 @@ async def write(self, address: Union[int, Sequence[int]],
t_size = copy.deepcopy(size)

width = len(self.bus.haddr)
t_address = self._create_vector(t_address, width, 'address_phase', pip)
t_address = self._create_vector(t_address, width, 'address_ph', pip)
width = len(self.bus.hwdata)
t_value = self._create_vector(t_value, width, 'data_phase', pip)
t_value = self._create_vector(t_value, width, 'data_ph', pip)
width = len(self.bus.hsize)
t_size = self._create_vector(t_size, width, 'address_phase', pip)
t_size = self._create_vector(t_size, width, 'address_ph', pip)
t_mode = ['write' for _ in range(len(t_address))]

return await self._send_txn(t_address, t_value, t_size, 'write')
return await self._send_txn(t_address, t_value, t_size, t_mode)

@cocotb.coroutine
async def read(self, address: Union[int, Sequence[int]],
Expand Down Expand Up @@ -242,13 +243,64 @@ async def read(self, address: Union[int, Sequence[int]],
t_size = copy.deepcopy(size)

width = len(self.bus.haddr)
t_address = self._create_vector(t_address, width, 'address_phase', pip)
t_address = self._create_vector(t_address, width, 'address_ph', pip)
width = len(self.bus.hwdata)
t_value = self._create_vector(t_value, width, 'data_phase', pip)
t_value = self._create_vector(t_value, width, 'data_ph', pip)
width = len(self.bus.hsize)
t_size = self._create_vector(t_size, width, 'address_phase', pip)
t_size = self._create_vector(t_size, width, 'address_ph', pip)
t_mode = ['read' for _ in range(len(t_address))]

return await self._send_txn(t_address, t_value, t_size, 'read')
return await self._send_txn(t_address, t_value, t_size, t_mode)

@cocotb.coroutine
async def b2b(self, address: Union[int, Sequence[int]],
value: Optional[Union[int, Sequence[int]]],
size: Optional[Union[int, Sequence[int]]] = None,
mode: Optional[str] = 'rd_after_wr') -> Sequence[AHBResp]:
"""Back-to-Back operation - RAW or WAR"""

if len(address) != 2:
raise Exception(f"Length address {len(address)} is diff. than 2!")

if len(value) != 2:
raise Exception(f"Length value {len(value)} is bigger diff. 2!")

if size is None:
size = [self.bus._data_width // 8 for _ in range(len(address))]
else:
if len(size) != 2:
raise Exception(f"Length size {len(size)} is diff. than 2!")
for sz in size:
AHBLiteMaster._check_size(sz, len(self.bus.hwdata) // 8)

# Convert all inputs into lists, if not already
if not isinstance(address, list):
address = [address]
if not isinstance(size, list):
size = [size]
if not isinstance(value, list):
value = [value]

# Need to copy data as we'll have to shift address/size
t_address = copy.deepcopy(address)
t_value = copy.deepcopy(value)
t_size = copy.deepcopy(size)

width = len(self.bus.haddr)
t_address = self._create_vector(t_address, width, 'address_ph', True)
width = len(self.bus.hwdata)
t_value = self._create_vector(t_value, width, 'data_ph', True)
width = len(self.bus.hsize)
t_size = self._create_vector(t_size, width, 'address_ph', True)

if mode == 'rd_after_wr':
t_mode = ['write', 'read', 'read']
elif mode == 'wr_after_rd':
t_mode = ['read', 'write', 'write']
else:
raise Exception('Illegal mode')

return await self._send_txn(t_address, t_value, t_size, t_mode)


class AHBMaster(AHBLiteMaster):
Expand Down
7 changes: 7 additions & 0 deletions cocotbext/ahb/ahb_slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ def _init_bus(self) -> None:
def _get_def(self, width: int = 1) -> BinaryValue:
"""Return a handle obj with the default value"""
return LogicArray([self.def_val for _ in range(width)])


class AHBSlave(AHBLiteSlave):
def __init__(self, bus: AHBBus, clock: str, reset: str,
timeout: int = 100, def_val: Union[int, str] = 'Z', **kwargs):
super().__init__(bus, clock, reset, timeout,
def_val, 'ahb_full', **kwargs)
8 changes: 6 additions & 2 deletions tests/test_ahb_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,13 @@ async def run_test(dut):
# resp = await ahb_lite_master.write(address, value, size, pip=False)
# resp = await ahb_lite_master.write(address, value, size, pip=False)
# resp = await ahb_lite_master.write(address, value, size, pip=True)
resp = await ahb_lite_master.write(address, value, size, pip=True)

address = [rnd_val(32) for _ in range(2)]
value = [rnd_val(32) for _ in range(2)]
size = [pick_random_value([1, 2, 4]) for _ in range(2)]
resp = await ahb_lite_master.b2b(address, value, size, mode='rd_after_wr')
print(resp)
resp = await ahb_lite_master.read(address, pip=True)
resp = await ahb_lite_master.b2b(address, value, size, mode='wr_after_rd')
print(resp)


Expand Down

0 comments on commit ee15971

Please sign in to comment.