diff --git a/.gitignore b/.gitignore index 0ee9421..056b8ea 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,5 @@ __pycache__/ # build outputs build +/overrides.txt +.python-version diff --git a/chipflow_digital_ip/io/__init__.py b/chipflow_digital_ip/io/__init__.py index 8ce595c..4304b3a 100644 --- a/chipflow_digital_ip/io/__init__.py +++ b/chipflow_digital_ip/io/__init__.py @@ -2,5 +2,13 @@ from ._uart import UARTPeripheral from ._i2c import I2CPeripheral from ._spi import SPIPeripheral +from ._verilog_wrapper import VerilogWrapper, load_wrapper_from_toml -__all__ = ['GPIOPeripheral', 'UARTPeripheral', 'I2CPeripheral', 'SPIPeripheral'] +__all__ = [ + 'GPIOPeripheral', + 'UARTPeripheral', + 'I2CPeripheral', + 'SPIPeripheral', + 'VerilogWrapper', + 'load_wrapper_from_toml', +] diff --git a/chipflow_digital_ip/io/_glasgow_i2c.py b/chipflow_digital_ip/io/_glasgow_i2c.py index c1419ef..8fd746f 100644 --- a/chipflow_digital_ip/io/_glasgow_i2c.py +++ b/chipflow_digital_ip/io/_glasgow_i2c.py @@ -1,4 +1,4 @@ -from amaranth import * +from amaranth import * from amaranth.lib.cdc import FFSynchronizer diff --git a/chipflow_digital_ip/io/_glasgow_iostream.py b/chipflow_digital_ip/io/_glasgow_iostream.py index cd3c4a3..cca7561 100644 --- a/chipflow_digital_ip/io/_glasgow_iostream.py +++ b/chipflow_digital_ip/io/_glasgow_iostream.py @@ -1,9 +1,13 @@ -from amaranth import * +from amaranth import * from amaranth.lib import data, wiring, stream, io from amaranth.lib.wiring import In, Out +from amaranth_types.types import ShapeLike + + __all__ = ["IOStreamer", "PortGroup"] + class PortGroup: """Group of Amaranth library I/O ports. @@ -118,7 +122,7 @@ class IOStreamer(wiring.Component): """ @staticmethod - def o_stream_signature(ioshape, /, *, ratio=1, meta_layout=0): + def o_stream_signature(ioshape, /, *, ratio=1, meta_layout: ShapeLike = 0): return stream.Signature(data.StructLayout({ "port": _map_ioshape("o", ioshape, lambda width: data.StructLayout({ "o": width if ratio == 1 else data.ArrayLayout(width, ratio), @@ -129,7 +133,7 @@ def o_stream_signature(ioshape, /, *, ratio=1, meta_layout=0): })) @staticmethod - def i_stream_signature(ioshape, /, *, ratio=1, meta_layout=0): + def i_stream_signature(ioshape, /, *, ratio=1, meta_layout: ShapeLike = 0): return stream.Signature(data.StructLayout({ "port": _map_ioshape("i", ioshape, lambda width: data.StructLayout({ "i": width if ratio == 1 else data.ArrayLayout(width, ratio), @@ -137,8 +141,8 @@ def i_stream_signature(ioshape, /, *, ratio=1, meta_layout=0): "meta": meta_layout, })) - def __init__(self, ioshape, ports, /, *, ratio=1, init=None, meta_layout=0): - assert isinstance(ioshape, (int, dict)) + def __init__(self, ioshape: dict, ports, /, *, ratio=1, init=None, meta_layout: ShapeLike = 0): + assert isinstance(ioshape, dict) assert ratio in (1, 2) self._ioshape = ioshape @@ -161,7 +165,7 @@ def elaborate(self, platform): buffer_cls, latency = SimulatableDDRBuffer, 3 if isinstance(self._ports, io.PortLike): - m.submodules.buffer = buffer = buffer_cls("io", self._ports) + m.submodules.buffer = buffer = buffer_cls(io.Direction.Bidir, self._ports) if isinstance(self._ports, PortGroup): buffer = {} for name, sub_port in self._ports.__dict__.items(): @@ -231,7 +235,7 @@ def delay(value, name): class IOClocker(wiring.Component): @staticmethod - def i_stream_signature(ioshape, /, *, _ratio=1, meta_layout=0): + def i_stream_signature(ioshape, /, *, _ratio=1, meta_layout: ShapeLike = 0): # Currently the only supported ratio is 1, but this will change in the future for # interfaces like HyperBus. return stream.Signature(data.StructLayout({ @@ -245,10 +249,10 @@ def i_stream_signature(ioshape, /, *, _ratio=1, meta_layout=0): })) @staticmethod - def o_stream_signature(ioshape, /, *, ratio=1, meta_layout=0): + def o_stream_signature(ioshape, /, *, ratio=1, meta_layout: ShapeLike = 0): return IOStreamer.o_stream_signature(ioshape, ratio=ratio, meta_layout=meta_layout) - def __init__(self, ioshape, *, clock, o_ratio=1, meta_layout=0, divisor_width=16): + def __init__(self, ioshape, *, clock, o_ratio=1, meta_layout: ShapeLike = 0, divisor_width=16): assert isinstance(ioshape, dict) assert isinstance(clock, str) assert o_ratio in (1, 2) diff --git a/chipflow_digital_ip/io/_gpio.py b/chipflow_digital_ip/io/_gpio.py index 4fc5994..57417b7 100644 --- a/chipflow_digital_ip/io/_gpio.py +++ b/chipflow_digital_ip/io/_gpio.py @@ -1,4 +1,3 @@ - from amaranth import Module, unsigned from amaranth.lib import wiring from amaranth.lib.wiring import In, Out, flipped, connect diff --git a/chipflow_digital_ip/io/_rfc_uart.py b/chipflow_digital_ip/io/_rfc_uart.py index e08f13e..337d8a8 100644 --- a/chipflow_digital_ip/io/_rfc_uart.py +++ b/chipflow_digital_ip/io/_rfc_uart.py @@ -2,17 +2,26 @@ The Amaranth SoC RFC UART from https://github.com/ChipFlow/chipflow-digital-ip """ +from typing import Generic, TypeVar + from amaranth import * from amaranth.lib import stream, wiring from amaranth.lib.wiring import In, Out, flipped, connect +from amaranth.hdl import ValueCastable + +from amaranth_types.types import HasElaborate, ShapeLike, ValueLike from amaranth_soc import csr __all__ = ["RxPhySignature", "TxPhySignature", "RxPeripheral", "TxPeripheral", "Peripheral"] +_T_ValueOrValueCastable = TypeVar("_T_ValueOrValueCastable", bound=Value | ValueCastable, covariant=True) +_T_ShapeLike = TypeVar("_T_ShapeLike", bound=ShapeLike, covariant=True) +_T_Symbol_ShapeLike = TypeVar("_T_Symbol_ShapeLike", bound=ShapeLike, covariant=True) + -class RxPhySignature(wiring.Signature): +class RxPhySignature(wiring.Signature, Generic[_T_ShapeLike, _T_Symbol_ShapeLike]): """Receiver PHY signature. Parameters @@ -38,7 +47,7 @@ class RxPhySignature(wiring.Signature): Receiver error flag. Pulsed for one clock cycle in case of an implementation-specific error (e.g. wrong parity bit). """ - def __init__(self, phy_config_shape, symbol_shape): + def __init__(self, phy_config_shape: _T_ShapeLike, symbol_shape: _T_Symbol_ShapeLike): super().__init__({ "rst": Out(1), "config": Out(phy_config_shape), @@ -48,7 +57,7 @@ def __init__(self, phy_config_shape, symbol_shape): }) -class TxPhySignature(wiring.Signature): +class TxPhySignature(wiring.Signature, Generic[_T_ShapeLike, _T_Symbol_ShapeLike]): """Transmitter PHY signature. Parameters @@ -68,7 +77,7 @@ class TxPhySignature(wiring.Signature): symbols : :py:`Out(stream.Signature(symbol_shape))` Symbol stream. The shape of its payload is given by the `symbol_shape` parameter. """ - def __init__(self, phy_config_shape, symbol_shape): + def __init__(self, phy_config_shape: _T_ShapeLike, symbol_shape: _T_Symbol_ShapeLike): super().__init__({ "rst": Out(1), "config": Out(phy_config_shape), @@ -98,7 +107,7 @@ def elaborate(self, platform): return m -class RxPeripheral(wiring.Component): +class RxPeripheral(wiring.Component, Generic[_T_ShapeLike, _T_ValueOrValueCastable, _T_Symbol_ShapeLike]): class Config(csr.Register, access="rw"): """Peripheral configuration register. @@ -141,7 +150,7 @@ class PhyConfig(csr.Register, access="rw"): phy_config_init : :class:`int` Initial value of the PHY configuration word. """ - def __init__(self, phy_config_shape, phy_config_init): + def __init__(self, phy_config_shape: _T_ShapeLike, phy_config_init: _T_ValueOrValueCastable): super().__init__(csr.Field(_PhyConfigFieldAction, phy_config_shape, init=phy_config_init)) @@ -199,7 +208,7 @@ class Data(csr.Register, access="r"): symbol_shape : :ref:`shape-like ` Shape of a symbol. """ - def __init__(self, symbol_shape): + def __init__(self, symbol_shape: _T_Symbol_ShapeLike): super().__init__(csr.Field(csr.action.R, symbol_shape)) """UART receiver peripheral. @@ -224,8 +233,8 @@ def __init__(self, symbol_shape): phy : :py:`Out(RxPhySignature(phy_config_shape, symbol_shape))` Interface between the peripheral and its PHY. """ - def __init__(self, *, addr_width, data_width, phy_config_shape=unsigned(16), - phy_config_init=0, symbol_shape=unsigned(8)): + def __init__(self, *, addr_width, data_width, phy_config_shape:_T_ShapeLike = unsigned(16), + phy_config_init: _T_ValueOrValueCastable = Value.cast(0), symbol_shape: _T_Symbol_ShapeLike = unsigned(8)): regs = csr.Builder(addr_width=addr_width, data_width=data_width) self._config = regs.add("Config", self.Config()) @@ -298,7 +307,7 @@ def elaborate(self, platform): return m -class TxPeripheral(wiring.Component): +class TxPeripheral(wiring.Component, Generic[_T_ShapeLike, _T_Symbol_ShapeLike, _T_ValueOrValueCastable]): class Config(csr.Register, access="rw"): """Peripheral configuration register. @@ -341,7 +350,7 @@ class PhyConfig(csr.Register, access="rw"): phy_config_init : :class:`int` Initial value of the PHY configuration word. """ - def __init__(self, phy_config_shape, phy_config_init): + def __init__(self, phy_config_shape: _T_ShapeLike, phy_config_init: _T_ValueOrValueCastable): super().__init__(csr.Field(_PhyConfigFieldAction, phy_config_shape, init=phy_config_init)) @@ -391,7 +400,7 @@ class Data(csr.Register, access="w"): symbol_shape : :ref:`shape-like ` Shape of a symbol. """ - def __init__(self, symbol_shape): + def __init__(self, symbol_shape: _T_Symbol_ShapeLike): super().__init__(csr.Field(csr.action.W, symbol_shape)) """UART transmitter peripheral. @@ -416,8 +425,8 @@ def __init__(self, symbol_shape): phy : :py:`Out(TxPhySignature(phy_config_shape, symbol_shape))` Interface between the peripheral and its PHY. """ - def __init__(self, *, addr_width, data_width=8, phy_config_shape=unsigned(16), - phy_config_init=0, symbol_shape=unsigned(8)): + def __init__(self, *, addr_width, data_width=8, phy_config_shape: _T_ShapeLike = unsigned(16), + phy_config_init: _T_ValueOrValueCastable = Value.cast(0), symbol_shape: _T_Symbol_ShapeLike = unsigned(8)): regs = csr.Builder(addr_width=addr_width, data_width=data_width) self._config = regs.add("Config", self.Config()) @@ -487,7 +496,7 @@ def elaborate(self, platform): return m -class Peripheral(wiring.Component): +class Peripheral(wiring.Component, Generic[_T_ShapeLike, _T_Symbol_ShapeLike, _T_ValueOrValueCastable]): """UART transceiver peripheral. This peripheral is composed of two subordinate peripherals. A :class:`RxPeripheral` occupies @@ -522,8 +531,8 @@ class Peripheral(wiring.Component): :exc:`TypeError` If ``addr_width`` is not a positive integer. """ - def __init__(self, *, addr_width, data_width=8, phy_config_shape=unsigned(16), - phy_config_init=0, symbol_shape=unsigned(8)): + def __init__(self, *, addr_width, data_width=8, phy_config_shape: _T_ShapeLike = unsigned(16), + phy_config_init: _T_ValueOrValueCastable = Value.cast(0), symbol_shape: _T_Symbol_ShapeLike = unsigned(8)): if not isinstance(addr_width, int) or addr_width <= 0: raise TypeError(f"Address width must be a positive integer, not {addr_width!r}") diff --git a/chipflow_digital_ip/io/_spi.py b/chipflow_digital_ip/io/_spi.py index 18f51bd..fec8b26 100644 --- a/chipflow_digital_ip/io/_spi.py +++ b/chipflow_digital_ip/io/_spi.py @@ -1,4 +1,4 @@ -from amaranth import Module, Signal, Cat, C, unsigned +from amaranth import * from amaranth.lib import wiring from amaranth.lib.wiring import In, Out, connect, flipped diff --git a/chipflow_digital_ip/io/_verilog_wrapper.py b/chipflow_digital_ip/io/_verilog_wrapper.py new file mode 100644 index 0000000..1c94e3c --- /dev/null +++ b/chipflow_digital_ip/io/_verilog_wrapper.py @@ -0,0 +1,1100 @@ +"""Verilog wrapper for external Verilog/SystemVerilog/SpinalHDL modules. + +This module provides a TOML-based configuration system for wrapping external Verilog +modules as Amaranth wiring.Component classes. It supports: + +- Automatic Signature generation from TOML port definitions +- SpinalHDL code generation +- SystemVerilog to Verilog conversion via sv2v or yosys-slang +- Clock and reset signal mapping +- Port and pin interface mapping to Verilog signals +""" + +import os +import re +import shutil +import subprocess +from enum import StrEnum, auto +from importlib import import_module +from pathlib import Path +from typing import Any, Dict, List, Literal, Optional, Self + +import tomli +from pydantic import BaseModel, JsonValue, ValidationError, model_validator + +from amaranth import ClockSignal, Instance, Module, ResetSignal +from amaranth.lib import wiring +from amaranth.lib.wiring import In, Out + +from chipflow import ChipFlowError + + +__all__ = [ + "VerilogWrapper", + "load_wrapper_from_toml", + "_generate_auto_map", + "_infer_auto_map", + "_parse_verilog_ports", + "_INTERFACE_PATTERNS", + "_INTERFACE_REGISTRY", # Backwards compat alias +] + + +class Files(BaseModel): + """Specifies the source location for Verilog files.""" + + module: Optional[str] = None + path: Optional[Path] = None + + @model_validator(mode="after") + def verify_module_or_path(self) -> Self: + if (self.module and self.path) or (not self.module and not self.path): + raise ValueError("You must set exactly one of `module` or `path`.") + return self + + def get_source_path(self) -> Path: + """Get the resolved source path.""" + if self.path: + return self.path + if self.module: + try: + mod = import_module(self.module) + if hasattr(mod, "data_location"): + return Path(mod.data_location) + elif hasattr(mod, "__path__"): + return Path(mod.__path__[0]) + else: + return Path(mod.__file__).parent + except ImportError as e: + raise ChipFlowError(f"Could not import module '{self.module}': {e}") + raise ChipFlowError("No source path available") + + +class GenerateSpinalHDL(BaseModel): + """Configuration for SpinalHDL code generation.""" + + scala_class: str + options: List[str] = [] + + def generate( + self, source_path: Path, dest_path: Path, name: str, parameters: Dict[str, JsonValue] + ) -> List[str]: + """Generate Verilog from SpinalHDL. + + Args: + source_path: Path to SpinalHDL project + dest_path: Output directory for generated Verilog + name: Output file name (without extension) + parameters: Template parameters for options + + Returns: + List of generated Verilog file names + """ + gen_args = [o.format(**parameters) for o in self.options] + path = source_path / "ext" / "SpinalHDL" + args = " ".join( + gen_args + [f"--netlist-directory={dest_path.absolute()}", f"--netlist-name={name}"] + ) + cmd = ( + f'cd {path} && sbt -J--enable-native-access=ALL-UNNAMED -v ' + f'"lib/runMain {self.scala_class} {args}"' + ) + os.environ["GRADLE_OPTS"] = "--enable-native-access=ALL-UNNAMED" + + if os.system(cmd) != 0: + raise ChipFlowError(f"Failed to run SpinalHDL generation: {cmd}") + + return [f"{name}.v"] + + +class GenerateSV2V(BaseModel): + """Configuration for SystemVerilog to Verilog conversion using sv2v.""" + + include_dirs: List[str] = [] + defines: Dict[str, str] = {} + top_module: Optional[str] = None + + def generate( + self, source_path: Path, dest_path: Path, name: str, parameters: Dict[str, JsonValue] + ) -> List[Path]: + """Convert SystemVerilog files to Verilog using sv2v. + + Args: + source_path: Path containing SystemVerilog files + dest_path: Output directory for converted Verilog + name: Output file name (without extension) + parameters: Template parameters (unused for sv2v) + + Returns: + List of generated Verilog file paths + """ + # Check if sv2v is available + if shutil.which("sv2v") is None: + raise ChipFlowError( + "sv2v is not installed or not in PATH. " + "Install from: https://github.com/zachjs/sv2v" + ) + + # Collect all SystemVerilog files + sv_files = list(source_path.glob("**/*.sv")) + if not sv_files: + raise ChipFlowError(f"No SystemVerilog files found in {source_path}") + + # Build sv2v command + cmd = ["sv2v"] + + # Add include directories + for inc_dir in self.include_dirs: + inc_path = source_path / inc_dir + if inc_path.exists(): + cmd.extend(["-I", str(inc_path)]) + + # Add defines + for define_name, define_value in self.defines.items(): + if define_value: + cmd.append(f"-D{define_name}={define_value}") + else: + cmd.append(f"-D{define_name}") + + # Add top module if specified + if self.top_module: + cmd.extend(["--top", self.top_module]) + + # Add all SV files + cmd.extend(str(f) for f in sv_files) + + # Output file + output_file = dest_path / f"{name}.v" + cmd.extend(["-w", str(output_file)]) + + # Run sv2v + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + raise ChipFlowError( + f"sv2v conversion failed:\nCommand: {' '.join(cmd)}\n" + f"Stderr: {e.stderr}\nStdout: {e.stdout}" + ) + + if not output_file.exists(): + raise ChipFlowError(f"sv2v did not produce output file: {output_file}") + + return [output_file] + + +class GenerateYosysSlang(BaseModel): + """Configuration for SystemVerilog to Verilog conversion using yosys-slang. + + This uses the yosys-slang plugin (https://github.com/povik/yosys-slang) to read + SystemVerilog directly into Yosys, then outputs Verilog. + + For yowasp-yosys, slang is built-in (statically linked), so no plugin loading + is needed. For native yosys, the slang plugin must be loaded with -m slang. + """ + + include_dirs: List[str] = [] + defines: Dict[str, str] = {} + top_module: Optional[str] = None + yosys_command: str = "yosys" # Can be overridden + + def generate( + self, source_path: Path, dest_path: Path, name: str, parameters: Dict[str, JsonValue] + ) -> List[Path]: + """Convert SystemVerilog files to Verilog using yosys-slang. + + Args: + source_path: Path containing SystemVerilog files + dest_path: Output directory for converted Verilog + name: Output file name (without extension) + parameters: Template parameters (unused) + + Returns: + List of generated Verilog file paths + """ + # Find yosys and determine if slang is built-in + yosys_cmd, slang_builtin = self._find_yosys() + + # Collect all SystemVerilog files + sv_files = list(source_path.glob("**/*.sv")) + if not sv_files: + raise ChipFlowError(f"No SystemVerilog files found in {source_path}") + + # Build yosys script + output_file = dest_path / f"{name}.v" + + # Build read_slang arguments + read_slang_args = [] + if self.top_module: + read_slang_args.append(f"--top {self.top_module}") + for inc_dir in self.include_dirs: + inc_path = source_path / inc_dir + if inc_path.exists(): + read_slang_args.append(f"-I{inc_path}") + for define_name, define_value in self.defines.items(): + if define_value: + read_slang_args.append(f"-D{define_name}={define_value}") + else: + read_slang_args.append(f"-D{define_name}") + + # Add source files + read_slang_args.extend(str(f) for f in sv_files) + + yosys_script = f""" +read_slang {' '.join(read_slang_args)} +hierarchy -check {f'-top {self.top_module}' if self.top_module else ''} +proc +write_verilog -noattr {output_file} +""" + + # Build command - yowasp-yosys has slang built-in, native yosys needs plugin + if slang_builtin: + cmd = [yosys_cmd, "-p", yosys_script] + else: + cmd = [yosys_cmd, "-m", "slang", "-p", yosys_script] + + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + raise ChipFlowError( + f"yosys-slang conversion failed:\nCommand: {' '.join(cmd)}\n" + f"Stderr: {e.stderr}\nStdout: {e.stdout}" + ) + except FileNotFoundError: + raise ChipFlowError( + f"yosys not found. Install yowasp-yosys (pip install yowasp-yosys) " + f"or native yosys with slang plugin. Tried: {yosys_cmd}" + ) + + if not output_file.exists(): + raise ChipFlowError(f"yosys-slang did not produce output file: {output_file}") + + return [output_file] + + def _find_yosys(self) -> tuple[str, bool]: + """Find yosys executable and determine if slang is built-in. + + Returns: + Tuple of (command, slang_builtin) where slang_builtin is True for + yowasp-yosys (slang statically linked) and False for native yosys + (slang loaded as plugin). + """ + # Check if custom command is set + if self.yosys_command != "yosys": + # Assume custom command needs plugin unless it's yowasp-yosys + is_yowasp = "yowasp" in self.yosys_command.lower() + return (self.yosys_command, is_yowasp) + + # Try yowasp-yosys first (Python package) - slang is built-in + try: + import yowasp_yosys # noqa: F401 + return ("yowasp-yosys", True) + except ImportError: + pass + + # Try native yosys - slang must be loaded as plugin + if shutil.which("yosys"): + return ("yosys", False) + + raise ChipFlowError( + "Neither yowasp-yosys nor native yosys found. " + "Install yowasp-yosys: pip install yowasp-yosys, " + "or install native yosys with slang plugin." + ) + + +class Generators(StrEnum): + """Supported code generators.""" + + SPINALHDL = auto() + VERILOG = auto() + SYSTEMVERILOG = auto() + YOSYS_SLANG = auto() + + +class Generate(BaseModel): + """Code generation configuration.""" + + parameters: Optional[Dict[str, JsonValue]] = None + generator: Generators + spinalhdl: Optional[GenerateSpinalHDL] = None + sv2v: Optional[GenerateSV2V] = None + yosys_slang: Optional[GenerateYosysSlang] = None + + +class Port(BaseModel): + """Port interface mapping configuration.""" + + interface: str # Interface type (e.g., 'amaranth_soc.wishbone.Signature') + params: Optional[Dict[str, JsonValue]] = None + vars: Optional[Dict[str, Literal["int"]]] = None + map: Optional[str | Dict[str, Dict[str, str] | str]] = None # Auto-generated if not provided + prefix: Optional[str] = None # Prefix for auto-generated signal names + direction: Optional[Literal["in", "out"]] = None # Explicit direction override + + +class DriverConfig(BaseModel): + """Software driver configuration for SoftwareDriverSignature.""" + + regs_struct: Optional[str] = None + c_files: List[str] = [] + h_files: List[str] = [] + + +class ExternalWrapConfig(BaseModel): + """Complete configuration for wrapping an external Verilog module.""" + + name: str + files: Files + generate: Optional[Generate] = None + clocks: Dict[str, str] = {} + resets: Dict[str, str] = {} + ports: Dict[str, Port] = {} + pins: Dict[str, Port] = {} + driver: Optional[DriverConfig] = None + + +def _resolve_interface_type(interface_str: str) -> type | tuple: + """Resolve an interface type string to an actual class. + + Args: + interface_str: Dotted path to interface class (e.g., 'amaranth_soc.wishbone.Interface') + + Returns: + The resolved interface class, or a tuple of (direction, width) for simple signals + """ + # Handle simple Out/In expressions like "amaranth.lib.wiring.Out(1)" + out_match = re.match(r"amaranth\.lib\.wiring\.(Out|In)\((\d+)\)", interface_str) + if out_match: + direction, width = out_match.groups() + return (direction, int(width)) + + # Import the module and get the class + parts = interface_str.rsplit(".", 1) + if len(parts) == 2: + module_path, class_name = parts + try: + mod = import_module(module_path) + return getattr(mod, class_name) + except (ImportError, AttributeError) as e: + raise ChipFlowError(f"Could not resolve interface '{interface_str}': {e}") + + raise ChipFlowError(f"Invalid interface specification: '{interface_str}'") + + +def _parse_signal_direction(signal_name: str) -> str: + """Determine signal direction from Verilog naming convention. + + Args: + signal_name: Verilog signal name (e.g., 'i_clk', 'o_data') + + Returns: + 'i' for input, 'o' for output + """ + if signal_name.startswith("i_"): + return "i" + elif signal_name.startswith("o_"): + return "o" + else: + # Default to input for unknown + return "i" + + +def _flatten_port_map( + port_map: str | Dict[str, Dict[str, str] | str], +) -> Dict[str, str]: + """Flatten a nested port map into a flat dictionary. + + Args: + port_map: Port mapping (simple string or nested dict) + + Returns: + Flat dictionary mapping Amaranth signal paths to Verilog signal names + """ + if isinstance(port_map, str): + return {"": port_map} + + result = {} + for key, value in port_map.items(): + if isinstance(value, str): + result[key] = value + elif isinstance(value, dict): + for subkey, subvalue in value.items(): + result[f"{key}.{subkey}"] = subvalue + + return result + + +def _get_nested_attr(obj: Any, path: str) -> Any: + """Get a nested attribute using dot notation.""" + if not path: + return obj + for part in path.split("."): + obj = getattr(obj, part) + return obj + + +# ============================================================================= +# Interface Auto-Mapping from Verilog Signal Names +# ============================================================================= +# Auto-mapping works by parsing the Verilog module to find its actual port names, +# then matching patterns to identify which signals correspond to interface members. +# This adapts to whatever naming convention the Verilog code uses. + +# Pattern definitions for each interface type. +# Each pattern is a tuple of (regex_pattern, interface_member_path, expected_direction) +# The regex should match common naming conventions for that signal. + +_WISHBONE_PATTERNS: List[tuple[str, str, str]] = [ + # Core Wishbone signals - match various naming styles + (r"(?:^|_)(cyc)(?:_|$)", "cyc", "i"), # wb_cyc, cyc_i, i_wb_cyc + (r"(?:^|_)(stb)(?:_|$)", "stb", "i"), # wb_stb, stb_i, i_wb_stb + (r"(?:^|_)(we)(?:_|$)", "we", "i"), # wb_we, we_i, i_wb_we + (r"(?:^|_)(sel)(?:_|$)", "sel", "i"), # wb_sel, sel_i, i_wb_sel + (r"(?:^|_)(adr|addr)(?:_|$)", "adr", "i"), # wb_adr, addr_i, i_wb_adr + (r"(?:^|_)(ack)(?:_|$)", "ack", "o"), # wb_ack, ack_o, o_wb_ack + # Data signals - need to distinguish read vs write + (r"(?:^|_)dat(?:a)?_?w(?:r(?:ite)?)?(?:_|$)", "dat_w", "i"), # dat_w, data_wr, wdata + (r"(?:^|_)w(?:r(?:ite)?)?_?dat(?:a)?(?:_|$)", "dat_w", "i"), # wdat, write_data + (r"(?:^|_)dat(?:a)?_?r(?:d|ead)?(?:_|$)", "dat_r", "o"), # dat_r, data_rd, rdata + (r"(?:^|_)r(?:d|ead)?_?dat(?:a)?(?:_|$)", "dat_r", "o"), # rdat, read_data + # Fallback for generic dat - use direction to disambiguate + (r"(?:^|_)(dat|data)(?:_|$)", "dat_w", "i"), # Input data = write + (r"(?:^|_)(dat|data)(?:_|$)", "dat_r", "o"), # Output data = read + # Optional Wishbone signals + (r"(?:^|_)(err)(?:_|$)", "err", "o"), + (r"(?:^|_)(rty)(?:_|$)", "rty", "o"), + (r"(?:^|_)(stall)(?:_|$)", "stall", "o"), + (r"(?:^|_)(lock)(?:_|$)", "lock", "i"), + (r"(?:^|_)(cti)(?:_|$)", "cti", "i"), + (r"(?:^|_)(bte)(?:_|$)", "bte", "i"), +] + +_CSR_PATTERNS: List[tuple[str, str, str]] = [ + (r"(?:^|_)(addr|adr)(?:_|$)", "addr", "i"), + (r"(?:^|_)r(?:ead)?_?data(?:_|$)", "r_data", "o"), + (r"(?:^|_)r(?:ead)?_?stb(?:_|$)", "r_stb", "i"), + (r"(?:^|_)w(?:rite)?_?data(?:_|$)", "w_data", "i"), + (r"(?:^|_)w(?:rite)?_?stb(?:_|$)", "w_stb", "i"), +] + +_UART_PATTERNS: List[tuple[str, str, str]] = [ + (r"(?:^|_)(tx|txd)(?:_|$)", "tx.o", "o"), + (r"(?:^|_)(rx|rxd)(?:_|$)", "rx.i", "i"), +] + +_I2C_PATTERNS: List[tuple[str, str, str]] = [ + (r"(?:^|_)sda(?:_i|_in)?(?:_|$)", "sda.i", "i"), + (r"(?:^|_)sda(?:_o|_out|_oe)(?:_|$)", "sda.oe", "o"), + (r"(?:^|_)scl(?:_i|_in)?(?:_|$)", "scl.i", "i"), + (r"(?:^|_)scl(?:_o|_out|_oe)(?:_|$)", "scl.oe", "o"), +] + +_SPI_PATTERNS: List[tuple[str, str, str]] = [ + (r"(?:^|_)(sck|sclk|clk)(?:_|$)", "sck.o", "o"), + (r"(?:^|_)(mosi|copi|sdo)(?:_|$)", "copi.o", "o"), + (r"(?:^|_)(miso|cipo|sdi)(?:_|$)", "cipo.i", "i"), + (r"(?:^|_)(cs|csn|ss|ssn)(?:_|$)", "csn.o", "o"), +] + +_GPIO_PATTERNS: List[tuple[str, str, str]] = [ + (r"(?:^|_)gpio(?:_i|_in)(?:_|$)", "gpio.i", "i"), + (r"(?:^|_)gpio(?:_o|_out)(?:_|$)", "gpio.o", "o"), + (r"(?:^|_)gpio(?:_oe|_en)(?:_|$)", "gpio.oe", "o"), +] + +# Registry mapping interface types to their pattern lists +_INTERFACE_PATTERNS: Dict[str, List[tuple[str, str, str]]] = { + "amaranth_soc.wishbone.Signature": _WISHBONE_PATTERNS, + "amaranth_soc.csr.Signature": _CSR_PATTERNS, + "chipflow.platform.GPIOSignature": _GPIO_PATTERNS, + "chipflow.platform.UARTSignature": _UART_PATTERNS, + "chipflow.platform.I2CSignature": _I2C_PATTERNS, + "chipflow.platform.SPISignature": _SPI_PATTERNS, +} + +# For backwards compatibility +_INTERFACE_REGISTRY = _INTERFACE_PATTERNS + + +def _parse_verilog_ports(verilog_content: str, module_name: str) -> Dict[str, str]: + """Parse Verilog/SystemVerilog to extract module port names and directions. + + Args: + verilog_content: The Verilog source code + module_name: Name of the module to parse + + Returns: + Dictionary mapping port names to directions ('input', 'output', 'inout') + """ + ports: Dict[str, str] = {} + + # Find the module definition + # Match both Verilog and SystemVerilog module syntax + module_pattern = rf"module\s+{re.escape(module_name)}\s*(?:#\s*\([^)]*\))?\s*\(([^;]*)\)\s*;" + module_match = re.search(module_pattern, verilog_content, re.DOTALL | re.IGNORECASE) + + if not module_match: + # Try ANSI-style port declarations + ansi_pattern = rf"module\s+{re.escape(module_name)}\s*(?:#\s*\([^)]*\))?\s*\(" + ansi_match = re.search(ansi_pattern, verilog_content, re.IGNORECASE) + if ansi_match: + # Find matching parenthesis + start = ansi_match.end() + depth = 1 + end = start + while depth > 0 and end < len(verilog_content): + if verilog_content[end] == "(": + depth += 1 + elif verilog_content[end] == ")": + depth -= 1 + end += 1 + port_section = verilog_content[start : end - 1] + else: + return ports + else: + port_section = module_match.group(1) + + # Parse ANSI-style port declarations (input/output in port list) + # Matches: input logic [31:0] signal_name + ansi_port_pattern = r"(input|output|inout)\s+(?:logic|wire|reg)?\s*(?:\[[^\]]*\])?\s*(\w+)" + for match in re.finditer(ansi_port_pattern, port_section, re.IGNORECASE): + direction, name = match.groups() + ports[name] = direction.lower() + + # Also look for non-ANSI declarations after the module header + # Find the module body + module_body_start = verilog_content.find(";", module_match.end() if module_match else 0) + if module_body_start != -1: + # Look for standalone input/output declarations + body_pattern = r"^\s*(input|output|inout)\s+(?:logic|wire|reg)?\s*(?:\[[^\]]*\])?\s*(\w+)" + for match in re.finditer( + body_pattern, verilog_content[module_body_start:], re.MULTILINE | re.IGNORECASE + ): + direction, name = match.groups() + if name not in ports: + ports[name] = direction.lower() + + return ports + + +def _infer_signal_direction(signal_name: str) -> str: + """Infer signal direction from common naming conventions. + + Args: + signal_name: Verilog signal name + + Returns: + 'i' for input, 'o' for output, 'io' for unknown/bidirectional + """ + name_lower = signal_name.lower() + + # Check prefixes + if name_lower.startswith("i_") or name_lower.startswith("in_"): + return "i" + if name_lower.startswith("o_") or name_lower.startswith("out_"): + return "o" + + # Check suffixes + if name_lower.endswith("_i") or name_lower.endswith("_in"): + return "i" + if name_lower.endswith("_o") or name_lower.endswith("_out"): + return "o" + if name_lower.endswith("_oe") or name_lower.endswith("_en"): + return "o" + + return "io" # Unknown + + +def _infer_auto_map( + interface_str: str, + verilog_ports: Dict[str, str], + port_direction: str = "in", +) -> Dict[str, str]: + """Infer port mapping by matching Verilog signals to interface patterns. + + Args: + interface_str: Interface type string (e.g., 'amaranth_soc.wishbone.Signature') + verilog_ports: Dictionary of Verilog port names to their directions + port_direction: Direction of the port ('in' or 'out') + + Returns: + Dictionary mapping interface signal paths to matched Verilog signal names + + Raises: + ChipFlowError: If interface is not in the registry or required signals not found + """ + # Handle simple Out/In expressions + out_match = re.match(r"amaranth\.lib\.wiring\.(Out|In)\((\d+)\)", interface_str) + if out_match: + # For simple signals, we can't auto-infer - need explicit mapping + raise ChipFlowError( + f"Cannot auto-infer mapping for simple signal '{interface_str}'. " + "Please provide an explicit 'map' in the TOML configuration." + ) + + if interface_str not in _INTERFACE_PATTERNS: + raise ChipFlowError( + f"No auto-mapping patterns available for interface '{interface_str}'. " + f"Please provide an explicit 'map' in the TOML configuration. " + f"Known interfaces: {', '.join(_INTERFACE_PATTERNS.keys())}" + ) + + patterns = _INTERFACE_PATTERNS[interface_str] + result: Dict[str, str] = {} + used_ports: set[str] = set() + + for pattern, member_path, expected_dir in patterns: + if member_path in result: + continue # Already matched + + for port_name, port_dir in verilog_ports.items(): + if port_name in used_ports: + continue + + # Check if the port name matches the pattern + if not re.search(pattern, port_name, re.IGNORECASE): + continue + + # Infer direction from port name if not explicitly declared + inferred_dir = _infer_signal_direction(port_name) + actual_dir = "i" if port_dir == "input" else ("o" if port_dir == "output" else inferred_dir) + + # For bus interfaces (Wishbone, CSR), direction determines master/slave + # and we flip signal directions accordingly. For pin interfaces (UART, I2C, etc.), + # direction="out" is the normal case and signals shouldn't be flipped. + is_bus_interface = interface_str in ( + "amaranth_soc.wishbone.Signature", + "amaranth_soc.csr.Signature", + ) + if is_bus_interface and port_direction == "out": + check_dir = "o" if expected_dir == "i" else "i" + else: + check_dir = expected_dir + + # Match if directions align (or if we couldn't determine) + if actual_dir == "io" or actual_dir == check_dir: + result[member_path] = port_name + used_ports.add(port_name) + break + + return result + + +def _generate_auto_map( + interface_str: str, prefix: str, port_direction: str = "in" +) -> Dict[str, str]: + """Generate automatic port mapping for a well-known interface using prefix convention. + + This is a fallback when Verilog ports aren't available for inference. + Generates signal names like i_wb_cyc, o_wb_ack based on the prefix. + + Args: + interface_str: Interface type string + prefix: Prefix for signal names (e.g., 'wb') + port_direction: Direction of the port ('in' or 'out') + + Returns: + Dictionary mapping interface signal paths to Verilog signal names + """ + # Handle simple Out/In expressions + out_match = re.match(r"amaranth\.lib\.wiring\.(Out|In)\((\d+)\)", interface_str) + if out_match: + direction, _width = out_match.groups() + if direction == "Out": + return {"": f"o_{prefix}"} + else: + return {"": f"i_{prefix}"} + + if interface_str not in _INTERFACE_PATTERNS: + raise ChipFlowError( + f"No auto-mapping available for interface '{interface_str}'. " + f"Please provide an explicit 'map' in the TOML configuration." + ) + + # Build map from patterns - use the matched group as suffix + patterns = _INTERFACE_PATTERNS[interface_str] + result: Dict[str, str] = {} + seen_members: set[str] = set() + + for pattern, member_path, expected_dir in patterns: + if member_path in seen_members: + continue + seen_members.add(member_path) + + # Determine actual direction + if port_direction == "out": + actual_dir = "o" if expected_dir == "i" else "i" + else: + actual_dir = expected_dir + + # Extract a reasonable suffix from the member path + suffix = member_path.replace(".", "_") + + result[member_path] = f"{actual_dir}_{prefix}_{suffix}" + + return result + + +class VerilogWrapper(wiring.Component): + """Dynamic Amaranth Component that wraps an external Verilog module. + + This component is generated from TOML configuration and creates the appropriate + Signature and elaborate() implementation to instantiate the Verilog module. + + When a driver configuration is provided, the component uses SoftwareDriverSignature + to enable automatic driver generation and register struct creation. + + Auto-mapping works by parsing the Verilog files to find actual port names, + then matching patterns to identify which signals correspond to interface members. + """ + + def __init__(self, config: ExternalWrapConfig, verilog_files: List[Path] | None = None): + """Initialize the Verilog wrapper. + + Args: + config: Parsed TOML configuration + verilog_files: List of Verilog file paths to include + """ + self._config = config + self._verilog_files = verilog_files or [] + self._port_mappings: Dict[str, Dict[str, str]] = {} + + # Parse Verilog to get port information for auto-mapping + verilog_ports = self._parse_verilog_ports() + + # Build signature from ports and pins + signature_members = {} + + # Process ports (bus interfaces like Wishbone) - typically direction="in" + for port_name, port_config in config.ports.items(): + default_dir = "in" + sig_member = self._create_signature_member(port_config, config, default_direction=default_dir) + signature_members[port_name] = sig_member + self._port_mappings[port_name] = self._get_port_mapping( + port_name, port_config, port_config.direction or default_dir, verilog_ports + ) + + # Process pins (I/O interfaces to pads) - typically direction="out" + for pin_name, pin_config in config.pins.items(): + default_dir = "out" + sig_member = self._create_signature_member(pin_config, config, default_direction=default_dir) + signature_members[pin_name] = sig_member + self._port_mappings[pin_name] = self._get_port_mapping( + pin_name, pin_config, pin_config.direction or default_dir, verilog_ports + ) + + # Use SoftwareDriverSignature if driver config is provided + if config.driver: + try: + from chipflow.platform import SoftwareDriverSignature + + super().__init__( + SoftwareDriverSignature( + members=signature_members, + component=self, + regs_struct=config.driver.regs_struct, + c_files=config.driver.c_files, + h_files=config.driver.h_files, + ) + ) + except ImportError: + # Fallback if chipflow.platform not available + super().__init__(signature_members) + else: + super().__init__(signature_members) + + def _parse_verilog_ports(self) -> Dict[str, str]: + """Parse all Verilog files to extract port information. + + Returns: + Dictionary mapping port names to their directions + """ + all_ports: Dict[str, str] = {} + + for verilog_file in self._verilog_files: + if verilog_file.exists(): + try: + content = verilog_file.read_text() + ports = _parse_verilog_ports(content, self._config.name) + all_ports.update(ports) + except Exception: + # If parsing fails, continue without those ports + pass + + return all_ports + + def _get_port_mapping( + self, port_name: str, port_config: Port, direction: str, verilog_ports: Dict[str, str] + ) -> Dict[str, str]: + """Get port mapping, auto-inferring from Verilog if not explicitly provided. + + Args: + port_name: Name of the port + port_config: Port configuration from TOML + direction: Direction of the port ('in' or 'out') + verilog_ports: Dictionary of Verilog port names to directions + + Returns: + Flattened port mapping dictionary + """ + if port_config.map is not None: + # Explicit mapping provided + return _flatten_port_map(port_config.map) + + # Try to infer mapping from Verilog ports + if verilog_ports: + try: + return _infer_auto_map(port_config.interface, verilog_ports, direction) + except ChipFlowError: + pass # Fall through to prefix-based generation + + # Fallback: generate mapping using prefix convention + prefix = port_config.prefix + if prefix is None: + # Infer prefix from port name + name = port_name.lower() + for suffix in ("_pins", "_bus", "_port", "_interface"): + if name.endswith(suffix): + name = name[: -len(suffix)] + break + if name in ("bus", "port"): + if "wishbone" in port_config.interface.lower(): + prefix = "wb" + elif "csr" in port_config.interface.lower(): + prefix = "csr" + else: + prefix = name + else: + prefix = name + + return _generate_auto_map(port_config.interface, prefix, direction) + + def _create_signature_member( + self, port_config: Port, config: ExternalWrapConfig, default_direction: str = "in" + ): + """Create a signature member from port configuration. + + Args: + port_config: Port configuration from TOML + config: Full wrapper configuration + default_direction: Default direction if not specified ('in' or 'out') + + Returns: + In or Out wrapped signature member + """ + interface_info = _resolve_interface_type(port_config.interface) + + if isinstance(interface_info, tuple): + # Simple Out/In(width) - direction already specified in interface string + direction, width = interface_info + if direction == "Out": + return Out(width) + else: + return In(width) + + # Complex interface class - instantiate with params + params = port_config.params or {} + # Resolve parameter references from generate.parameters + resolved_params = {} + for k, v in params.items(): + if isinstance(v, str) and v.startswith("{") and v.endswith("}"): + param_name = v[1:-1] + if config.generate and config.generate.parameters: + resolved_params[k] = config.generate.parameters.get(param_name, v) + else: + resolved_params[k] = v + else: + resolved_params[k] = v + + try: + # Try to instantiate the interface/signature + if hasattr(interface_info, "Signature"): + sig = interface_info.Signature(**resolved_params) + else: + sig = interface_info(**resolved_params) + + # Determine direction: + # 1. Explicit direction in TOML takes precedence + # 2. Otherwise use default_direction (ports="in", pins="out") + if port_config.direction: + direction = port_config.direction + else: + direction = default_direction + + if direction == "in": + return In(sig) + else: + return Out(sig) + except Exception as e: + raise ChipFlowError( + f"Could not create interface '{port_config.interface}' " + f"with params {resolved_params}: {e}" + ) + + def elaborate(self, platform): + """Generate the Amaranth module with Verilog instance. + + Creates an Instance() of the wrapped Verilog module with all + port mappings configured from the TOML specification. + """ + m = Module() + + # Build Instance port arguments + instance_ports = {} + + # Add clock signals + for clock_name, verilog_signal in self._config.clocks.items(): + if clock_name == "sys": + instance_ports[f"i_{verilog_signal}"] = ClockSignal() + else: + instance_ports[f"i_{verilog_signal}"] = ClockSignal(clock_name) + + # Add reset signals (active-low is common convention) + for reset_name, verilog_signal in self._config.resets.items(): + if reset_name == "sys": + instance_ports[f"i_{verilog_signal}"] = ~ResetSignal() + else: + instance_ports[f"i_{verilog_signal}"] = ~ResetSignal(reset_name) + + # Add port mappings + for port_name, port_map in self._port_mappings.items(): + amaranth_port = getattr(self, port_name) + + for signal_path, verilog_signal in port_map.items(): + # Handle variable substitution in signal names (e.g., {n} for arrays) + if "{" in verilog_signal: + # For now, expand with index 0. Future: support multiple instances + verilog_signal = verilog_signal.format(n=0) + + # Navigate to the signal in the Amaranth interface + amaranth_signal = _get_nested_attr(amaranth_port, signal_path) + + # The Verilog signal name already includes i_/o_ prefix + # Use it directly for the Instance parameter + instance_ports[verilog_signal] = amaranth_signal + + # Create the Verilog instance + m.submodules.wrapped = Instance(self._config.name, **instance_ports) + + # Add Verilog files to the platform + if platform is not None: + for verilog_file in self._verilog_files: + if verilog_file.exists(): + with open(verilog_file, "r") as f: + platform.add_file(verilog_file.name, f.read()) + + return m + + +def load_wrapper_from_toml( + toml_path: Path | str, generate_dest: Path | None = None +) -> VerilogWrapper: + """Load a VerilogWrapper from a TOML configuration file. + + Args: + toml_path: Path to the TOML configuration file + generate_dest: Destination directory for generated Verilog (if using SpinalHDL) + + Returns: + Configured VerilogWrapper component + + Raises: + ChipFlowError: If configuration is invalid or generation fails + """ + toml_path = Path(toml_path) + + with open(toml_path, "rb") as f: + raw_config = tomli.load(f) + + try: + config = ExternalWrapConfig.model_validate(raw_config) + except ValidationError as e: + error_messages = [] + for error in e.errors(): + location = ".".join(str(loc) for loc in error["loc"]) + message = error["msg"] + error_messages.append(f"Error at '{location}': {message}") + error_str = "\n".join(error_messages) + raise ChipFlowError(f"Validation error in {toml_path}:\n{error_str}") + + verilog_files = [] + + # Get source path, resolving relative paths against the TOML file's directory + source_path = config.files.get_source_path() + if not source_path.is_absolute(): + source_path = (toml_path.parent / source_path).resolve() + + # Handle code generation if configured + if config.generate: + if generate_dest is None: + generate_dest = Path("./build/verilog") + generate_dest.mkdir(parents=True, exist_ok=True) + + parameters = config.generate.parameters or {} + + if config.generate.generator == Generators.SPINALHDL: + if config.generate.spinalhdl is None: + raise ChipFlowError( + "SpinalHDL generator selected but no spinalhdl config provided" + ) + + generated = config.generate.spinalhdl.generate( + source_path, generate_dest, config.name, parameters + ) + verilog_files.extend(generate_dest / f for f in generated) + + elif config.generate.generator == Generators.SYSTEMVERILOG: + # Convert SystemVerilog to Verilog using sv2v + sv2v_config = config.generate.sv2v or GenerateSV2V() + generated = sv2v_config.generate( + source_path, generate_dest, config.name, parameters + ) + verilog_files.extend(generated) + + elif config.generate.generator == Generators.YOSYS_SLANG: + # Convert SystemVerilog to Verilog using yosys-slang + yosys_slang_config = config.generate.yosys_slang or GenerateYosysSlang() + generated = yosys_slang_config.generate( + source_path, generate_dest, config.name, parameters + ) + verilog_files.extend(generated) + + elif config.generate.generator == Generators.VERILOG: + # Just use existing Verilog files from source + for v_file in source_path.glob("**/*.v"): + verilog_files.append(v_file) + else: + # No generation - look for Verilog and SystemVerilog files in source + for v_file in source_path.glob("**/*.v"): + verilog_files.append(v_file) + for sv_file in source_path.glob("**/*.sv"): + verilog_files.append(sv_file) + + return VerilogWrapper(config, verilog_files) + + +# CLI entry point for testing +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage: python -m chipflow_digital_ip.io._verilog_wrapper ") + sys.exit(1) + + try: + wrapper = load_wrapper_from_toml(sys.argv[1]) + print(f"Successfully loaded wrapper: {wrapper._config.name}") + print(f"Signature: {wrapper.signature}") + except ChipFlowError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) diff --git a/chipflow_digital_ip/io/sv_timer/drivers/wb_timer.h b/chipflow_digital_ip/io/sv_timer/drivers/wb_timer.h new file mode 100644 index 0000000..a15a1cd --- /dev/null +++ b/chipflow_digital_ip/io/sv_timer/drivers/wb_timer.h @@ -0,0 +1,64 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Wishbone Timer Driver Header + +#ifndef WB_TIMER_H +#define WB_TIMER_H + +#include + +// Register offsets +#define WB_TIMER_CTRL 0x00 +#define WB_TIMER_COMPARE 0x04 +#define WB_TIMER_COUNTER 0x08 +#define WB_TIMER_STATUS 0x0C + +// Control register bits +#define WB_TIMER_CTRL_ENABLE (1 << 0) +#define WB_TIMER_CTRL_IRQ_EN (1 << 1) +#define WB_TIMER_CTRL_PRESCALER_SHIFT 16 + +// Status register bits +#define WB_TIMER_STATUS_IRQ_PENDING (1 << 0) +#define WB_TIMER_STATUS_MATCH (1 << 1) + +// Register structure for SoftwareDriverSignature +typedef struct { + volatile uint32_t ctrl; // Control: [31:16] prescaler, [1] irq_en, [0] enable + volatile uint32_t compare; // Compare value for match interrupt + volatile uint32_t counter; // Current counter (read) / Reload value (write) + volatile uint32_t status; // Status: [1] match, [0] irq_pending (write 1 to clear) +} wb_timer_regs_t; + +static inline void wb_timer_init(wb_timer_regs_t *regs, uint16_t prescaler, uint32_t compare) { + regs->compare = compare; + regs->counter = 0; + regs->ctrl = ((uint32_t)prescaler << WB_TIMER_CTRL_PRESCALER_SHIFT) + | WB_TIMER_CTRL_ENABLE + | WB_TIMER_CTRL_IRQ_EN; +} + +static inline void wb_timer_enable(wb_timer_regs_t *regs) { + regs->ctrl |= WB_TIMER_CTRL_ENABLE; +} + +static inline void wb_timer_disable(wb_timer_regs_t *regs) { + regs->ctrl &= ~WB_TIMER_CTRL_ENABLE; +} + +static inline void wb_timer_set_compare(wb_timer_regs_t *regs, uint32_t value) { + regs->compare = value; +} + +static inline uint32_t wb_timer_get_counter(wb_timer_regs_t *regs) { + return regs->counter; +} + +static inline void wb_timer_clear_irq(wb_timer_regs_t *regs) { + regs->status = WB_TIMER_STATUS_IRQ_PENDING | WB_TIMER_STATUS_MATCH; +} + +static inline int wb_timer_irq_pending(wb_timer_regs_t *regs) { + return (regs->status & WB_TIMER_STATUS_IRQ_PENDING) != 0; +} + +#endif // WB_TIMER_H diff --git a/chipflow_digital_ip/io/sv_timer/wb_timer.sv b/chipflow_digital_ip/io/sv_timer/wb_timer.sv new file mode 100644 index 0000000..1b5cfb8 --- /dev/null +++ b/chipflow_digital_ip/io/sv_timer/wb_timer.sv @@ -0,0 +1,181 @@ +// SPDX-License-Identifier: BSD-2-Clause +// Simple Wishbone Timer/Counter in SystemVerilog +// +// A basic 32-bit programmable timer with Wishbone B4 interface. +// Useful for MCU applications requiring periodic interrupts or timing. +// +// Registers: +// 0x00: CTRL - Control register (enable, mode, interrupt enable) +// 0x04: COMPARE - Compare value for timer match +// 0x08: COUNTER - Current counter value (read) / Reload value (write) +// 0x0C: STATUS - Status register (interrupt pending, match flag) + +module wb_timer #( + parameter int WIDTH = 32, + parameter int PRESCALER_WIDTH = 16 +) ( + // Clock and reset + input logic i_clk, + input logic i_rst_n, + + // Wishbone B4 slave interface + input logic i_wb_cyc, + input logic i_wb_stb, + input logic i_wb_we, + input logic [3:0] i_wb_sel, + input logic [3:0] i_wb_adr, // Word address (4 registers) + input logic [31:0] i_wb_dat, + output logic [31:0] o_wb_dat, + output logic o_wb_ack, + + // Interrupt output + output logic o_irq +); + + // Register addresses (word-aligned) + localparam logic [3:0] ADDR_CTRL = 4'h0; + localparam logic [3:0] ADDR_COMPARE = 4'h1; + localparam logic [3:0] ADDR_COUNTER = 4'h2; + localparam logic [3:0] ADDR_STATUS = 4'h3; + + // Control register bits + typedef struct packed { + logic [15:0] prescaler; // [31:16] Prescaler divider value + logic [13:0] reserved; // [15:2] Reserved + logic irq_en; // [1] Interrupt enable + logic enable; // [0] Timer enable + } ctrl_t; + + // Status register bits + typedef struct packed { + logic [29:0] reserved; // [31:2] Reserved + logic match; // [1] Compare match occurred + logic irq_pending; // [0] Interrupt pending + } status_t; + + // Registers + ctrl_t ctrl_reg; + logic [WIDTH-1:0] compare_reg; + logic [WIDTH-1:0] counter_reg; + logic [WIDTH-1:0] reload_reg; + status_t status_reg; + + // Prescaler counter + logic [PRESCALER_WIDTH-1:0] prescaler_cnt; + logic prescaler_tick; + + // Wishbone acknowledge - single cycle response + logic wb_access; + assign wb_access = i_wb_cyc & i_wb_stb; + + always_ff @(posedge i_clk or negedge i_rst_n) begin + if (!i_rst_n) begin + o_wb_ack <= 1'b0; + end else begin + o_wb_ack <= wb_access & ~o_wb_ack; + end + end + + // Prescaler logic + always_ff @(posedge i_clk or negedge i_rst_n) begin + if (!i_rst_n) begin + prescaler_cnt <= '0; + prescaler_tick <= 1'b0; + end else if (ctrl_reg.enable) begin + if (prescaler_cnt >= ctrl_reg.prescaler) begin + prescaler_cnt <= '0; + prescaler_tick <= 1'b1; + end else begin + prescaler_cnt <= prescaler_cnt + 1'b1; + prescaler_tick <= 1'b0; + end + end else begin + prescaler_cnt <= '0; + prescaler_tick <= 1'b0; + end + end + + // Counter logic + logic counter_match; + assign counter_match = (counter_reg == compare_reg); + + always_ff @(posedge i_clk or negedge i_rst_n) begin + if (!i_rst_n) begin + counter_reg <= '0; + end else if (ctrl_reg.enable && prescaler_tick) begin + if (counter_match) begin + counter_reg <= reload_reg; + end else begin + counter_reg <= counter_reg + 1'b1; + end + end + end + + // Status and interrupt logic + always_ff @(posedge i_clk or negedge i_rst_n) begin + if (!i_rst_n) begin + status_reg <= '0; + end else begin + // Set match flag on compare match + if (ctrl_reg.enable && prescaler_tick && counter_match) begin + status_reg.match <= 1'b1; + if (ctrl_reg.irq_en) begin + status_reg.irq_pending <= 1'b1; + end + end + + // Clear flags on status register write + if (wb_access && i_wb_we && (i_wb_adr == ADDR_STATUS)) begin + if (i_wb_sel[0] && i_wb_dat[0]) status_reg.irq_pending <= 1'b0; + if (i_wb_sel[0] && i_wb_dat[1]) status_reg.match <= 1'b0; + end + end + end + + assign o_irq = status_reg.irq_pending; + + // Register write logic + always_ff @(posedge i_clk or negedge i_rst_n) begin + if (!i_rst_n) begin + ctrl_reg <= '0; + compare_reg <= '1; // Default to max value + reload_reg <= '0; + end else if (wb_access && i_wb_we) begin + case (i_wb_adr) + ADDR_CTRL: begin + if (i_wb_sel[0]) ctrl_reg[7:0] <= i_wb_dat[7:0]; + if (i_wb_sel[1]) ctrl_reg[15:8] <= i_wb_dat[15:8]; + if (i_wb_sel[2]) ctrl_reg[23:16] <= i_wb_dat[23:16]; + if (i_wb_sel[3]) ctrl_reg[31:24] <= i_wb_dat[31:24]; + end + ADDR_COMPARE: begin + if (i_wb_sel[0]) compare_reg[7:0] <= i_wb_dat[7:0]; + if (i_wb_sel[1]) compare_reg[15:8] <= i_wb_dat[15:8]; + if (i_wb_sel[2]) compare_reg[23:16] <= i_wb_dat[23:16]; + if (i_wb_sel[3]) compare_reg[31:24] <= i_wb_dat[31:24]; + end + ADDR_COUNTER: begin + // Writing to counter sets the reload value + if (i_wb_sel[0]) reload_reg[7:0] <= i_wb_dat[7:0]; + if (i_wb_sel[1]) reload_reg[15:8] <= i_wb_dat[15:8]; + if (i_wb_sel[2]) reload_reg[23:16] <= i_wb_dat[23:16]; + if (i_wb_sel[3]) reload_reg[31:24] <= i_wb_dat[31:24]; + end + default: ; + endcase + end + end + + // Register read logic + always_comb begin + o_wb_dat = '0; + case (i_wb_adr) + ADDR_CTRL: o_wb_dat = ctrl_reg; + ADDR_COMPARE: o_wb_dat = compare_reg; + ADDR_COUNTER: o_wb_dat = counter_reg; + ADDR_STATUS: o_wb_dat = status_reg; + default: o_wb_dat = '0; + endcase + end + +endmodule diff --git a/chipflow_digital_ip/io/sv_timer/wb_timer.toml b/chipflow_digital_ip/io/sv_timer/wb_timer.toml new file mode 100644 index 0000000..a94df06 --- /dev/null +++ b/chipflow_digital_ip/io/sv_timer/wb_timer.toml @@ -0,0 +1,46 @@ +# Wishbone Timer/Counter Configuration +# A simple 32-bit programmable timer with Wishbone B4 interface + +name = 'wb_timer' + +[files] +path = '.' + +# Generator options: +# 'yosys_slang' - Uses yosys with slang plugin (preferred, works with yowasp-yosys) +# 'systemverilog' - Uses sv2v for conversion (requires sv2v installed) +[generate] +generator = 'yosys_slang' + +[generate.yosys_slang] +top_module = 'wb_timer' + +[clocks] +sys = 'clk' + +[resets] +sys = 'rst_n' + +# Bus interface using auto-inference +# When no explicit 'map' is provided, the wrapper parses the Verilog file +# and matches signal patterns (cyc, stb, ack, etc.) to interface members. +# This works with any naming convention: i_wb_cyc, wb_cyc_i, cyc, etc. +[ports.bus] +interface = 'amaranth_soc.wishbone.Signature' +direction = 'in' + +[ports.bus.params] +addr_width = 4 +data_width = 32 +granularity = 8 + +# Pin interfaces - simple signals need explicit mapping +# (pattern matching can't reliably infer single-bit signals) +[pins.irq] +interface = 'amaranth.lib.wiring.Out(1)' +map = 'o_irq' + +# Software driver configuration for SoftwareDriverSignature +[driver] +regs_struct = 'wb_timer_regs_t' +h_files = ['drivers/wb_timer.h'] diff --git a/chipflow_digital_ip/io/usb_ohci.toml b/chipflow_digital_ip/io/usb_ohci.toml new file mode 100644 index 0000000..65b9ff3 --- /dev/null +++ b/chipflow_digital_ip/io/usb_ohci.toml @@ -0,0 +1,99 @@ +name = 'UsbOhciPeripheral' + +[files] +module = 'pythondata_misc_usb_ohci' + +[generate] +parameters.nusb = 1 +parameters.dma_data_width = 32 +parameters.usb_clk_freq = 48e6 + +generator = 'spinalhdl' + +[generate.spinalhdl] +options = [ '--port-count {nusb}', + '--phy-frequency {usb_clk_freq:.0f}', + '--dma-width {dma_data_width}', + ] +scala_class = 'spinal.lib.com.usb.ohci.UsbOhciWishbone' + +[clocks] +usb = 'i_phy_clk' +sys = 'i_ctrl_clk' + +[resets] +usb = 'i_phy_reset' +sys = 'i_ctrl_reset' + +[ports.wb_ctl] +interface = 'amaranth_soc.wishbone.Interface' + +[ports.wb_ctl.params] +data_width=32 +address_width=32 +addressing='word' + +[ports.wb_ctl.map] +cyc = 'i_io_ctrl_CYC' +stb = 'i_io_ctrl_STB' +ack = 'o_io_ctrl_ACK' +we = 'i_io_ctrl_WE' +adr = 'i_io_ctrl_ADR' +dat.r = 'o_io_ctrl_DAT_MISO' +dat.w = 'i_io_ctrl_DAT_MOSI' +sel = 'i_io_ctrl_SEL' + +[ports.wb_dma] +interface = 'amaranth_soc.wishbone.Interface' + +[ports.wb_dma.params] +data_width=32 +address_width='{dma_data_width}' +addressing='word' + +[ports.wb_dma.map] +cyc = 'o_io_dma_CYC' +stb = 'o_io_dma_STB' +ack = 'i_io_dma_ACK' +we = 'o_io_dma_WE' +adr = 'o_io_dma_ADR' +dat.r = 'i_io_dma_DAT_MISO' +dat.w = 'o_io_dma_DAT_MOSI' +sel = 'o_io_dma_SEL' +err = 'i_io_dma_ERR' +cti = 'o_io_dma_CTI' +bte = 'o_io_dma_BTE' + +[ports.interrupt] +interface = 'amaranth.lib.wiring.Out(1)' +map = 'o_io_interrupt' + +[pins.usb] +interface = 'chipflow_lib.platforms.I2CSignature' + +[pins.usb.vars] +n = 'int' + +[pins.usb.map] +dp.i = 'i_io_usb_{n}_dp_read' +dp.o = 'o_io_usb_{n}_dp_write' +dp.oe = 'o_io_usb_{n}_dp_writeEnable' +dm.i = 'i_io_usb_{n}_dm_read' +dm.o = 'o_io_usb_{n}_dm_write' +dm.oe = 'o_io_usb_{n}_dm_writeEnable' + +[drivers.uboot] +CONFIG_USB_OHCI_NEW = true +CONFIG_SYS_USB_OHCI_REGS_BASE = "{regs_base}" + +[drivers.zephyr] +CONFIG_UHC_NXP_OHCI = true + +[drivers.dtsi] +# reg, interrupts, enabled automatically added +compatible = "nxp,uhc-ohci" +maximum-speed = "full-speed" + +[drivers.raw] +c_files = ['drivers/ohci_generic.c', 'drivers/ohci_hcd.c'] +h_files = ['ohci.h'] diff --git a/pyproject.toml b/pyproject.toml index dbb1b17..689e439 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,16 +10,19 @@ authors = [ readme = {file = "README.md", content-type = "text/markdown"} license-files = [ "LICENSE*", - "vendor/*/*/LICEN?E*", ] -requires-python = ">=3.11,<3.14" +requires-python = ">=3.12,<3.14" dependencies = [ "amaranth>=0.5,<0.6", - "chipflow-lib @ git+https://github.com/ChipFlow/chipflow-lib.git", + "chipflow @ git+https://github.com/ChipFlow/chipflow-lib.git", "amaranth-soc @ git+https://github.com/amaranth-lang/amaranth-soc", "amaranth-stdio @ git+https://github.com/amaranth-lang/amaranth-stdio", "minerva @ git+https://github.com/minerva-cpu/minerva", + "pythondata-misc-usb_ohci @ git+https://github.com/robtaylor/pythondata-misc-usb_ohci@update-spinalhdl", + "pydantic>=2.0", + "tomli>=2.0", + "yowasp-yosys>=0.50", # For SystemVerilog support via built-in yosys-slang ] # Build system configuration @@ -32,6 +35,8 @@ build-backend = "pdm.backend" includes = [ "**/*.py", "**/*.v", + "**/*.sv", + "**/*.toml", "**/*.yaml" ] source-includes = [ @@ -47,11 +52,12 @@ purelib = [ # Development workflow configuration [tool.pyright] -diagnosticMode=false -typeCheckingMode = "off" reportInvalidTypeForm = false reportMissingImports = false reportUnboundVariable = false +reportAttributeAccessIssue = false +reportWildcardImportFromLibrary = false +ignore = [ "tests", "vendor" ] [tool.ruff.lint] select = ['E4', 'E7', 'E9', 'F', 'W291', 'W293'] @@ -75,4 +81,6 @@ dev = [ "pytest>=7.2.0", "pytest-cov>=0.6", "sphinx>=7.0", + "pyright>=1.1.407", + "amaranth-stubs>=0.1.1", ] diff --git a/tests/test_verilog_wrapper.py b/tests/test_verilog_wrapper.py new file mode 100644 index 0000000..bcc5bd7 --- /dev/null +++ b/tests/test_verilog_wrapper.py @@ -0,0 +1,603 @@ +# amaranth: UnusedElaboratable=no + +# SPDX-License-Identifier: BSD-2-Clause + +import tempfile +import unittest +import warnings +from pathlib import Path + +from amaranth import * +from amaranth.hdl import UnusedElaboratable + +from chipflow import ChipFlowError +from chipflow_digital_ip.io._verilog_wrapper import ( + DriverConfig, + ExternalWrapConfig, + Files, + Generate, + GenerateSV2V, + Generators, + Port, + VerilogWrapper, + _flatten_port_map, + _generate_auto_map, + _infer_auto_map, + _infer_signal_direction, + _INTERFACE_PATTERNS, + _INTERFACE_REGISTRY, + _parse_signal_direction, + _parse_verilog_ports, + _resolve_interface_type, + load_wrapper_from_toml, +) + + +class HelperFunctionsTestCase(unittest.TestCase): + def test_parse_signal_direction_input(self): + self.assertEqual(_parse_signal_direction("i_clk"), "i") + self.assertEqual(_parse_signal_direction("i_data_in"), "i") + + def test_parse_signal_direction_output(self): + self.assertEqual(_parse_signal_direction("o_data_out"), "o") + self.assertEqual(_parse_signal_direction("o_valid"), "o") + + def test_parse_signal_direction_default(self): + self.assertEqual(_parse_signal_direction("clk"), "i") + self.assertEqual(_parse_signal_direction("data"), "i") + + def test_flatten_port_map_string(self): + result = _flatten_port_map("i_signal") + self.assertEqual(result, {"": "i_signal"}) + + def test_flatten_port_map_simple_dict(self): + result = _flatten_port_map({"cyc": "i_cyc", "stb": "i_stb"}) + self.assertEqual(result, {"cyc": "i_cyc", "stb": "i_stb"}) + + def test_flatten_port_map_nested_dict(self): + result = _flatten_port_map({ + "dat": {"r": "o_dat_r", "w": "i_dat_w"}, + "cyc": "i_cyc" + }) + self.assertEqual(result, { + "dat.r": "o_dat_r", + "dat.w": "i_dat_w", + "cyc": "i_cyc" + }) + + def test_resolve_interface_type_simple(self): + result = _resolve_interface_type("amaranth.lib.wiring.Out(1)") + self.assertEqual(result, ("Out", 1)) + + result = _resolve_interface_type("amaranth.lib.wiring.In(8)") + self.assertEqual(result, ("In", 8)) + + def test_resolve_interface_type_invalid(self): + with self.assertRaises(ChipFlowError): + _resolve_interface_type("invalid") + + +class FilesConfigTestCase(unittest.TestCase): + def test_files_module_only(self): + files = Files(module="os.path") + self.assertEqual(files.module, "os.path") + self.assertIsNone(files.path) + + def test_files_path_only(self): + files = Files(path=Path("/tmp")) + self.assertIsNone(files.module) + self.assertEqual(files.path, Path("/tmp")) + + def test_files_neither_raises(self): + with self.assertRaises(ValueError): + Files() + + def test_files_both_raises(self): + with self.assertRaises(ValueError): + Files(module="os.path", path=Path("/tmp")) + + +class ExternalWrapConfigTestCase(unittest.TestCase): + def test_minimal_config(self): + config = ExternalWrapConfig( + name="TestModule", + files=Files(path=Path("/tmp")), + ) + self.assertEqual(config.name, "TestModule") + self.assertEqual(config.clocks, {}) + self.assertEqual(config.resets, {}) + self.assertEqual(config.ports, {}) + self.assertEqual(config.pins, {}) + + def test_config_with_ports(self): + config = ExternalWrapConfig( + name="TestModule", + files=Files(path=Path("/tmp")), + ports={ + "interrupt": Port( + interface="amaranth.lib.wiring.Out(1)", + map="o_interrupt" + ) + }, + clocks={"sys": "clk"}, + resets={"sys": "rst_n"}, + ) + self.assertEqual(config.name, "TestModule") + self.assertIn("interrupt", config.ports) + self.assertEqual(config.clocks["sys"], "clk") + self.assertEqual(config.resets["sys"], "rst_n") + + +class VerilogWrapperTestCase(unittest.TestCase): + def setUp(self): + warnings.simplefilter(action="ignore", category=UnusedElaboratable) + + def test_simple_wrapper(self): + config = ExternalWrapConfig( + name="TestModule", + files=Files(path=Path("/tmp")), + ports={ + "interrupt": Port( + interface="amaranth.lib.wiring.Out(1)", + map="o_interrupt" + ) + }, + clocks={"sys": "clk"}, + resets={"sys": "rst_n"}, + ) + + wrapper = VerilogWrapper(config) + self.assertEqual(wrapper._config.name, "TestModule") + # Check that the signature has the interrupt port + self.assertIn("interrupt", wrapper.signature.members) + + def test_elaborate_creates_instance(self): + config = ExternalWrapConfig( + name="TestModule", + files=Files(path=Path("/tmp")), + ports={ + "interrupt": Port( + interface="amaranth.lib.wiring.Out(1)", + map="o_interrupt" + ) + }, + clocks={"sys": "clk"}, + resets={"sys": "rst_n"}, + ) + + wrapper = VerilogWrapper(config) + # Elaborate with no platform (simulation mode) + m = wrapper.elaborate(platform=None) + self.assertIsInstance(m, Module) + # Check that the wrapped submodule exists + self.assertTrue(hasattr(m.submodules, "wrapped")) + + +class LoadWrapperFromTomlTestCase(unittest.TestCase): + def setUp(self): + warnings.simplefilter(action="ignore", category=UnusedElaboratable) + + def test_load_simple_toml(self): + toml_content = """ +name = 'SimpleTest' + +[files] +path = '/tmp' + +[clocks] +sys = 'clk' + +[resets] +sys = 'rst_n' + +[ports.interrupt] +interface = 'amaranth.lib.wiring.Out(1)' +map = 'o_interrupt' +""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + f.write(toml_content) + toml_path = Path(f.name) + + try: + wrapper = load_wrapper_from_toml(toml_path) + self.assertEqual(wrapper._config.name, "SimpleTest") + self.assertIn("interrupt", wrapper.signature.members) + finally: + toml_path.unlink() + + def test_load_invalid_toml_raises(self): + toml_content = """ +# Missing required 'name' field +[files] +path = '/tmp' +""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + f.write(toml_content) + toml_path = Path(f.name) + + try: + with self.assertRaises(ChipFlowError): + load_wrapper_from_toml(toml_path) + finally: + toml_path.unlink() + + +class SystemVerilogConfigTestCase(unittest.TestCase): + def test_generators_enum(self): + self.assertEqual(Generators.VERILOG, "verilog") + self.assertEqual(Generators.SYSTEMVERILOG, "systemverilog") + self.assertEqual(Generators.SPINALHDL, "spinalhdl") + + def test_generate_config_systemverilog(self): + gen = Generate( + generator=Generators.SYSTEMVERILOG, + sv2v=GenerateSV2V( + top_module="wb_timer", + include_dirs=["inc"], + defines={"SIMULATION": "1"} + ) + ) + self.assertEqual(gen.generator, Generators.SYSTEMVERILOG) + self.assertIsNotNone(gen.sv2v) + self.assertEqual(gen.sv2v.top_module, "wb_timer") + self.assertIn("inc", gen.sv2v.include_dirs) + + def test_sv2v_config_defaults(self): + sv2v = GenerateSV2V() + self.assertEqual(sv2v.include_dirs, []) + self.assertEqual(sv2v.defines, {}) + self.assertIsNone(sv2v.top_module) + + def test_config_with_systemverilog_generator(self): + config = ExternalWrapConfig( + name="SVModule", + files=Files(path=Path("/tmp")), + generate=Generate( + generator=Generators.SYSTEMVERILOG, + sv2v=GenerateSV2V(top_module="test") + ), + clocks={"sys": "clk"}, + resets={"sys": "rst_n"}, + ) + self.assertEqual(config.name, "SVModule") + self.assertEqual(config.generate.generator, Generators.SYSTEMVERILOG) + + def test_load_systemverilog_toml(self): + toml_content = """ +name = 'SVTest' + +[files] +path = '/tmp' + +[generate] +generator = 'systemverilog' + +[generate.sv2v] +top_module = 'test_module' +include_dirs = ['inc', 'src'] +defines = { DEBUG = '1', FEATURE_A = '' } + +[clocks] +sys = 'clk' + +[resets] +sys = 'rst_n' + +[ports.irq] +interface = 'amaranth.lib.wiring.Out(1)' +map = 'o_irq' +""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + f.write(toml_content) + toml_path = Path(f.name) + + try: + # This will fail at sv2v stage since no .sv files exist, but config parsing should work + # So we test the config parsing separately + import tomli + with open(toml_path, 'rb') as f: + raw = tomli.load(f) + config = ExternalWrapConfig.model_validate(raw) + self.assertEqual(config.name, "SVTest") + self.assertEqual(config.generate.generator, Generators.SYSTEMVERILOG) + self.assertEqual(config.generate.sv2v.top_module, "test_module") + finally: + toml_path.unlink() + + +class DriverConfigTestCase(unittest.TestCase): + def test_driver_config_defaults(self): + driver = DriverConfig() + self.assertIsNone(driver.regs_struct) + self.assertEqual(driver.c_files, []) + self.assertEqual(driver.h_files, []) + + def test_driver_config_full(self): + driver = DriverConfig( + regs_struct='timer_regs_t', + c_files=['drivers/timer.c'], + h_files=['drivers/timer.h'], + ) + self.assertEqual(driver.regs_struct, 'timer_regs_t') + self.assertEqual(driver.c_files, ['drivers/timer.c']) + self.assertEqual(driver.h_files, ['drivers/timer.h']) + + +class PortDirectionTestCase(unittest.TestCase): + def test_port_direction_explicit(self): + port = Port( + interface='amaranth.lib.wiring.Out(1)', + map='o_signal', + direction='out' + ) + self.assertEqual(port.direction, 'out') + + def test_port_direction_none(self): + port = Port( + interface='amaranth.lib.wiring.Out(1)', + map='o_signal', + ) + self.assertIsNone(port.direction) + + def test_config_with_ports_and_pins(self): + config = ExternalWrapConfig( + name="TestModule", + files=Files(path=Path("/tmp")), + ports={ + "bus": Port( + interface="amaranth.lib.wiring.Out(1)", + map="i_bus", + direction="in" + ) + }, + pins={ + "irq": Port( + interface="amaranth.lib.wiring.Out(1)", + map="o_irq", + direction="out" + ) + }, + driver=DriverConfig( + regs_struct='test_regs_t', + h_files=['drivers/test.h'] + ) + ) + self.assertIn("bus", config.ports) + self.assertIn("irq", config.pins) + self.assertIsNotNone(config.driver) + self.assertEqual(config.driver.regs_struct, 'test_regs_t') + + def test_load_toml_with_driver(self): + toml_content = """ +name = 'DriverTest' + +[files] +path = '/tmp' + +[clocks] +sys = 'clk' + +[resets] +sys = 'rst_n' + +[ports.bus] +interface = 'amaranth.lib.wiring.Out(1)' +map = 'i_bus' +direction = 'in' + +[pins.irq] +interface = 'amaranth.lib.wiring.Out(1)' +map = 'o_irq' + +[driver] +regs_struct = 'my_regs_t' +c_files = ['drivers/my_driver.c'] +h_files = ['drivers/my_driver.h'] +""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + f.write(toml_content) + toml_path = Path(f.name) + + try: + import tomli + with open(toml_path, 'rb') as f: + raw = tomli.load(f) + config = ExternalWrapConfig.model_validate(raw) + self.assertEqual(config.name, "DriverTest") + self.assertIn("bus", config.ports) + self.assertEqual(config.ports["bus"].direction, "in") + self.assertIn("irq", config.pins) + self.assertIsNotNone(config.driver) + self.assertEqual(config.driver.regs_struct, "my_regs_t") + self.assertEqual(config.driver.c_files, ["drivers/my_driver.c"]) + self.assertEqual(config.driver.h_files, ["drivers/my_driver.h"]) + finally: + toml_path.unlink() + + +class AutoMappingTestCase(unittest.TestCase): + def test_interface_patterns_has_known_interfaces(self): + """Verify the interface patterns registry contains expected entries.""" + self.assertIn("amaranth_soc.wishbone.Signature", _INTERFACE_PATTERNS) + self.assertIn("amaranth_soc.csr.Signature", _INTERFACE_PATTERNS) + self.assertIn("chipflow.platform.GPIOSignature", _INTERFACE_PATTERNS) + self.assertIn("chipflow.platform.UARTSignature", _INTERFACE_PATTERNS) + self.assertIn("chipflow.platform.I2CSignature", _INTERFACE_PATTERNS) + self.assertIn("chipflow.platform.SPISignature", _INTERFACE_PATTERNS) + + def test_parse_verilog_ports_ansi_style(self): + """Test parsing ANSI-style Verilog port declarations.""" + verilog = """ +module test_module ( + input logic i_clk, + input logic i_rst_n, + input logic i_wb_cyc, + input logic i_wb_stb, + output logic [31:0] o_wb_dat, + output logic o_wb_ack +); +endmodule +""" + ports = _parse_verilog_ports(verilog, "test_module") + self.assertEqual(ports.get("i_clk"), "input") + self.assertEqual(ports.get("i_wb_cyc"), "input") + self.assertEqual(ports.get("o_wb_dat"), "output") + self.assertEqual(ports.get("o_wb_ack"), "output") + + def test_infer_signal_direction(self): + """Test inferring signal direction from naming conventions.""" + # Prefixes + self.assertEqual(_infer_signal_direction("i_clk"), "i") + self.assertEqual(_infer_signal_direction("o_data"), "o") + self.assertEqual(_infer_signal_direction("in_signal"), "i") + self.assertEqual(_infer_signal_direction("out_signal"), "o") + # Suffixes + self.assertEqual(_infer_signal_direction("clk_i"), "i") + self.assertEqual(_infer_signal_direction("data_o"), "o") + self.assertEqual(_infer_signal_direction("enable_oe"), "o") + # Unknown + self.assertEqual(_infer_signal_direction("signal"), "io") + + def test_infer_auto_map_wishbone(self): + """Test auto-inferring Wishbone mapping from Verilog ports.""" + verilog_ports = { + "i_wb_cyc": "input", + "i_wb_stb": "input", + "i_wb_we": "input", + "i_wb_sel": "input", + "i_wb_adr": "input", + "i_wb_dat": "input", + "o_wb_dat": "output", + "o_wb_ack": "output", + } + result = _infer_auto_map("amaranth_soc.wishbone.Signature", verilog_ports, "in") + self.assertEqual(result.get("cyc"), "i_wb_cyc") + self.assertEqual(result.get("stb"), "i_wb_stb") + self.assertEqual(result.get("we"), "i_wb_we") + self.assertEqual(result.get("ack"), "o_wb_ack") + + def test_infer_auto_map_uart(self): + """Test auto-inferring UART mapping from Verilog ports.""" + verilog_ports = { + "uart_tx": "output", + "uart_rx": "input", + } + result = _infer_auto_map("chipflow.platform.UARTSignature", verilog_ports, "out") + self.assertEqual(result.get("tx.o"), "uart_tx") + self.assertEqual(result.get("rx.i"), "uart_rx") + + def test_generate_auto_map_fallback(self): + """Test fallback prefix-based auto-mapping.""" + result = _generate_auto_map("amaranth_soc.wishbone.Signature", "wb", "in") + self.assertIn("cyc", result) + self.assertIn("stb", result) + self.assertIn("ack", result) + + def test_generate_auto_map_simple_out(self): + """Test auto-mapping for simple Out(1) interface.""" + result = _generate_auto_map("amaranth.lib.wiring.Out(1)", "irq", "out") + self.assertEqual(result[""], "o_irq") + + def test_generate_auto_map_simple_in(self): + """Test auto-mapping for simple In(1) interface.""" + result = _generate_auto_map("amaranth.lib.wiring.In(8)", "data", "in") + self.assertEqual(result[""], "i_data") + + def test_generate_auto_map_unknown_interface(self): + """Test that unknown interfaces raise an error.""" + with self.assertRaises(ChipFlowError) as ctx: + _generate_auto_map("unknown.interface.Type", "prefix", "in") + self.assertIn("No auto-mapping available", str(ctx.exception)) + + def test_port_with_auto_map(self): + """Test Port configuration without explicit map.""" + port = Port( + interface='amaranth_soc.wishbone.Signature', + prefix='wb', + ) + self.assertIsNone(port.map) + self.assertEqual(port.prefix, 'wb') + + def test_port_with_explicit_map_and_prefix(self): + """Test Port configuration with both map and prefix (map takes precedence).""" + port = Port( + interface='amaranth.lib.wiring.Out(1)', + map='o_custom_signal', + prefix='ignored', + ) + self.assertEqual(port.map, 'o_custom_signal') + self.assertEqual(port.prefix, 'ignored') + + def test_config_with_auto_mapped_ports(self): + """Test ExternalWrapConfig with auto-mapped ports.""" + config = ExternalWrapConfig( + name="AutoMapTest", + files=Files(path=Path("/tmp")), + ports={ + "bus": Port( + interface="amaranth_soc.wishbone.Signature", + prefix="wb", + params={"addr_width": 4, "data_width": 32, "granularity": 8} + ) + }, + pins={ + "irq": Port( + interface="amaranth.lib.wiring.Out(1)", + prefix="irq" + ) + }, + clocks={"sys": "clk"}, + resets={"sys": "rst_n"}, + ) + self.assertEqual(config.name, "AutoMapTest") + self.assertIsNone(config.ports["bus"].map) + self.assertEqual(config.ports["bus"].prefix, "wb") + + def test_load_toml_with_auto_map(self): + """Test loading TOML with auto-mapped port.""" + toml_content = """ +name = 'AutoMapTomlTest' + +[files] +path = '/tmp' + +[clocks] +sys = 'clk' + +[resets] +sys = 'rst_n' + +[ports.bus] +interface = 'amaranth_soc.wishbone.Signature' +prefix = 'wb' +direction = 'in' + +[ports.bus.params] +addr_width = 4 +data_width = 32 +granularity = 8 + +[pins.irq] +interface = 'amaranth.lib.wiring.Out(1)' +prefix = 'irq' +""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.toml', delete=False) as f: + f.write(toml_content) + toml_path = Path(f.name) + + try: + import tomli + with open(toml_path, 'rb') as f: + raw = tomli.load(f) + config = ExternalWrapConfig.model_validate(raw) + self.assertEqual(config.name, "AutoMapTomlTest") + self.assertIsNone(config.ports["bus"].map) + self.assertEqual(config.ports["bus"].prefix, "wb") + self.assertIsNone(config.pins["irq"].map) + self.assertEqual(config.pins["irq"].prefix, "irq") + finally: + toml_path.unlink() + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_wb_timer.py b/tests/test_wb_timer.py new file mode 100644 index 0000000..1693ec8 --- /dev/null +++ b/tests/test_wb_timer.py @@ -0,0 +1,326 @@ +# amaranth: UnusedElaboratable=no + +# SPDX-License-Identifier: BSD-2-Clause + +"""Tests for the wb_timer Wishbone timer peripheral. + +This module tests the wb_timer SystemVerilog IP wrapped as an Amaranth component +via the VerilogWrapper system. It demonstrates how external Verilog/SystemVerilog +modules can be integrated and tested within the ChipFlow ecosystem. + +Note: Full simulation requires yosys with slang plugin (or yowasp-yosys) for +SystemVerilog conversion. The configuration and signature tests work without it. +""" + +import shutil +import tempfile +import unittest +import warnings +from pathlib import Path + +from amaranth import Elaboratable, Module, Signal +from amaranth.hdl import UnusedElaboratable +from amaranth.sim import Simulator + +from chipflow_digital_ip.io import load_wrapper_from_toml + + +# Path to the wb_timer TOML configuration +WB_TIMER_TOML = Path(__file__).parent.parent / "chipflow_digital_ip" / "io" / "sv_timer" / "wb_timer.toml" + + +def _has_yosys_slang() -> bool: + """Check if yosys with slang plugin is available.""" + # Try yowasp-yosys first + try: + import yowasp_yosys + return True + except ImportError: + pass + + # Try native yosys with slang + if shutil.which("yosys"): + import subprocess + try: + result = subprocess.run( + ["yosys", "-m", "slang", "-p", "help read_slang"], + capture_output=True, + timeout=10 + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError): + pass + + return False + + +class WbTimerConfigTestCase(unittest.TestCase): + """Test the wb_timer TOML configuration loading.""" + + def setUp(self): + warnings.simplefilter(action="ignore", category=UnusedElaboratable) + + def test_toml_exists(self): + """Verify the wb_timer TOML configuration file exists.""" + self.assertTrue(WB_TIMER_TOML.exists(), f"TOML not found at {WB_TIMER_TOML}") + + def test_load_wrapper_config(self): + """Test that the wb_timer wrapper can be loaded (config parsing only).""" + # This test verifies TOML parsing works + # It will fail at Verilog file loading if sv2v is not installed + # but the config parsing should succeed + import tomli + from chipflow_digital_ip.io._verilog_wrapper import ExternalWrapConfig + + with open(WB_TIMER_TOML, "rb") as f: + raw_config = tomli.load(f) + + config = ExternalWrapConfig.model_validate(raw_config) + + self.assertEqual(config.name, "wb_timer") + self.assertIn("bus", config.ports) + self.assertIn("irq", config.pins) + self.assertEqual(config.ports["bus"].interface, "amaranth_soc.wishbone.Signature") + self.assertEqual(config.pins["irq"].interface, "amaranth.lib.wiring.Out(1)") + + def test_driver_config(self): + """Test that driver configuration is present.""" + import tomli + from chipflow_digital_ip.io._verilog_wrapper import ExternalWrapConfig + + with open(WB_TIMER_TOML, "rb") as f: + raw_config = tomli.load(f) + + config = ExternalWrapConfig.model_validate(raw_config) + + self.assertIsNotNone(config.driver) + self.assertEqual(config.driver.regs_struct, "wb_timer_regs_t") + self.assertIn("drivers/wb_timer.h", config.driver.h_files) + + +@unittest.skipUnless(_has_yosys_slang(), "yosys with slang plugin not available") +class WbTimerWrapperTestCase(unittest.TestCase): + """Test the wb_timer wrapper instantiation (requires yosys-slang).""" + + def setUp(self): + warnings.simplefilter(action="ignore", category=UnusedElaboratable) + # Use a local directory instead of /tmp - yowasp-yosys (WASM) can't access /tmp + self._generate_dest = Path("build/test_wb_timer").absolute() + self._generate_dest.mkdir(parents=True, exist_ok=True) + + def tearDown(self): + import shutil as sh + sh.rmtree(self._generate_dest, ignore_errors=True) + + def test_load_wrapper(self): + """Test loading the complete wb_timer wrapper.""" + wrapper = load_wrapper_from_toml(WB_TIMER_TOML, generate_dest=Path(self._generate_dest)) + + self.assertEqual(wrapper._config.name, "wb_timer") + # Check signature has the expected members + self.assertIn("bus", wrapper.signature.members) + self.assertIn("irq", wrapper.signature.members) + + def test_wrapper_elaborate(self): + """Test that the wrapper can be elaborated.""" + wrapper = load_wrapper_from_toml(WB_TIMER_TOML, generate_dest=Path(self._generate_dest)) + + # Elaborate with no platform (simulation mode) + m = wrapper.elaborate(platform=None) + self.assertIsInstance(m, Module) + + +class _WbTimerHarness(Elaboratable): + """Test harness for wb_timer simulation. + + This harness wraps the wb_timer and provides clock/reset. + """ + + def __init__(self, toml_path: Path, generate_dest: Path): + self.timer = load_wrapper_from_toml(toml_path, generate_dest=generate_dest) + # Expose the IRQ signal for testing + self.irq = Signal() + + def elaborate(self, platform): + m = Module() + m.submodules.timer = self.timer + + # Connect IRQ output + m.d.comb += self.irq.eq(self.timer.irq) + + return m + + +@unittest.skipUnless(_has_yosys_slang(), "yosys with slang plugin not available") +class WbTimerSimulationTestCase(unittest.TestCase): + """Simulation tests for the wb_timer peripheral. + + These tests verify the timer functionality through Wishbone bus transactions. + Register map (32-bit registers, word-addressed): + 0x0: CTRL - [31:16] prescaler, [1] irq_en, [0] enable + 0x1: COMPARE - Compare value for timer match + 0x2: COUNTER - Current counter (read) / Reload value (write) + 0x3: STATUS - [1] match, [0] irq_pending + """ + + # Register addresses (word-addressed for 32-bit Wishbone) + REG_CTRL = 0x0 + REG_COMPARE = 0x1 + REG_COUNTER = 0x2 + REG_STATUS = 0x3 + + # Control register bits + CTRL_ENABLE = 1 << 0 + CTRL_IRQ_EN = 1 << 1 + + def setUp(self): + warnings.simplefilter(action="ignore", category=UnusedElaboratable) + # Use a local directory instead of /tmp - yowasp-yosys (WASM) can't access /tmp + self._generate_dest = Path("build/test_wb_timer_sim").absolute() + self._generate_dest.mkdir(parents=True, exist_ok=True) + + def tearDown(self): + import shutil as sh + sh.rmtree(self._generate_dest, ignore_errors=True) + + async def _wb_write(self, ctx, bus, addr, data): + """Perform a Wishbone write transaction.""" + ctx.set(bus.cyc, 1) + ctx.set(bus.stb, 1) + ctx.set(bus.we, 1) + ctx.set(bus.adr, addr) + ctx.set(bus.dat_w, data) + ctx.set(bus.sel, 0xF) # All byte lanes + + # Wait for acknowledge + await ctx.tick() + while not ctx.get(bus.ack): + await ctx.tick() + + ctx.set(bus.cyc, 0) + ctx.set(bus.stb, 0) + ctx.set(bus.we, 0) + await ctx.tick() + + async def _wb_read(self, ctx, bus, addr): + """Perform a Wishbone read transaction.""" + ctx.set(bus.cyc, 1) + ctx.set(bus.stb, 1) + ctx.set(bus.we, 0) + ctx.set(bus.adr, addr) + ctx.set(bus.sel, 0xF) + + # Wait for acknowledge + await ctx.tick() + while not ctx.get(bus.ack): + await ctx.tick() + + data = ctx.get(bus.dat_r) + + ctx.set(bus.cyc, 0) + ctx.set(bus.stb, 0) + await ctx.tick() + + return data + + def test_timer_enable_and_count(self): + """Test that the timer counts when enabled.""" + dut = _WbTimerHarness(WB_TIMER_TOML, Path(self._generate_dest)) + + async def testbench(ctx): + bus = dut.timer.bus + + # Set compare value high so we don't trigger a match + await self._wb_write(ctx, bus, self.REG_COMPARE, 0xFFFFFFFF) + + # Enable timer with prescaler=0 (count every cycle) + await self._wb_write(ctx, bus, self.REG_CTRL, self.CTRL_ENABLE) + + # Let it count for a few cycles + for _ in range(10): + await ctx.tick() + + # Read counter value - should be > 0 + count = await self._wb_read(ctx, bus, self.REG_COUNTER) + self.assertGreater(count, 0, "Counter should have incremented") + + sim = Simulator(dut) + sim.add_clock(1e-6) + sim.add_testbench(testbench) + with sim.write_vcd("wb_timer_count_test.vcd", "wb_timer_count_test.gtkw"): + sim.run() + + def test_timer_match_and_irq(self): + """Test that compare match sets status and IRQ.""" + dut = _WbTimerHarness(WB_TIMER_TOML, Path(self._generate_dest)) + + async def testbench(ctx): + bus = dut.timer.bus + + # Set compare value to 5 + await self._wb_write(ctx, bus, self.REG_COMPARE, 5) + + # Enable timer with IRQ enabled + await self._wb_write(ctx, bus, self.REG_CTRL, self.CTRL_ENABLE | self.CTRL_IRQ_EN) + + # Wait for match (counter should reach 5) + for _ in range(20): + await ctx.tick() + if ctx.get(dut.irq): + break + + # Check IRQ is asserted + self.assertEqual(ctx.get(dut.irq), 1, "IRQ should be asserted on match") + + # Check status register + status = await self._wb_read(ctx, bus, self.REG_STATUS) + self.assertTrue(status & 0x1, "IRQ pending flag should be set") + self.assertTrue(status & 0x2, "Match flag should be set") + + # Clear status by writing 1s to pending bits + await self._wb_write(ctx, bus, self.REG_STATUS, 0x3) + + # IRQ should clear + await ctx.tick() + status = await self._wb_read(ctx, bus, self.REG_STATUS) + self.assertEqual(status & 0x1, 0, "IRQ pending should be cleared") + + sim = Simulator(dut) + sim.add_clock(1e-6) + sim.add_testbench(testbench) + with sim.write_vcd("wb_timer_irq_test.vcd", "wb_timer_irq_test.gtkw"): + sim.run() + + def test_timer_prescaler(self): + """Test that prescaler divides the count rate.""" + dut = _WbTimerHarness(WB_TIMER_TOML, Path(self._generate_dest)) + + async def testbench(ctx): + bus = dut.timer.bus + + # Set compare value high + await self._wb_write(ctx, bus, self.REG_COMPARE, 0xFFFFFFFF) + + # Enable timer with prescaler=3 (count every 4 cycles) + # Prescaler is in upper 16 bits of CTRL + prescaler = 3 << 16 + await self._wb_write(ctx, bus, self.REG_CTRL, self.CTRL_ENABLE | prescaler) + + # Run for 20 cycles - should get 20/4 = 5 counts + for _ in range(20): + await ctx.tick() + + count = await self._wb_read(ctx, bus, self.REG_COUNTER) + # Allow some tolerance for setup cycles + self.assertGreater(count, 0, "Counter should have incremented") + self.assertLessEqual(count, 6, "Counter should be limited by prescaler") + + sim = Simulator(dut) + sim.add_clock(1e-6) + sim.add_testbench(testbench) + with sim.write_vcd("wb_timer_presc_test.vcd", "wb_timer_presc_test.gtkw"): + sim.run() + + +if __name__ == "__main__": + unittest.main()