# ==================================================================================================================== #
# _____ ____ _ _ ___ _ _ _____ _ _ _ #
# _ __ _ _| ____| _ \ / \ / \ / _ \ _ _| |_ _ __ _ _| |_| ___(_) | |_ ___ _ __ #
# | '_ \| | | | _| | | | |/ _ \ / _ \ | | | | | | | __| '_ \| | | | __| |_ | | | __/ _ \ '__| #
# | |_) | |_| | |___| |_| / ___ \ / ___ \ | |_| | |_| | |_| |_) | |_| | |_| _| | | | || __/ | #
# | .__/ \__, |_____|____/_/ \_\/_/ \_(_)___/ \__,_|\__| .__/ \__,_|\__|_| |_|_|\__\___|_| #
# |_| |___/ |_| #
# ==================================================================================================================== #
# Authors: #
# Patrick Lehmann #
# #
# License: #
# ==================================================================================================================== #
# Copyright 2025-2026 Electronic Design Automation Abstraction (EDA²) #
# #
# Licensed under the Apache License, Version 2.0 (the "License"); #
# you may not use this file except in compliance with the License. #
# You may obtain a copy of the License at #
# #
# http://www.apache.org/licenses/LICENSE-2.0 #
# #
# Unless required by applicable law or agreed to in writing, software #
# distributed under the License is distributed on an "AS IS" BASIS, #
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
# See the License for the specific language governing permissions and #
# limitations under the License. #
# #
# SPDX-License-Identifier: Apache-2.0 #
# ==================================================================================================================== #
#
from argparse import Namespace
from datetime import datetime
from json import dumps
from pathlib import Path
from sys import stdin as sys_stdin, stdout as sys_stdout
from typing import Optional as Nullable, Dict, List, TextIO, Iterator, Tuple
from pyTooling.Common import getFullyQualifiedName
from pyTooling.Decorators import export
from pyTooling.MetaClasses import ExtendedType, abstractmethod, mustoverride
from pyTooling.Attributes.ArgParse import CommandHandler
from pyTooling.Attributes.ArgParse.Flag import LongFlag
from pyTooling.Attributes.ArgParse.ValuedFlag import LongValuedFlag
from pyTooling.TerminalUI import TerminalApplication
from pyTooling.Warning import WarningCollector
from pyEDAA.OutputFilter import OutputFilterException
from pyEDAA.OutputFilter.CLI.Configuration import Configuration, Vivado, ProcessingPipeline, OutputFormat, Rule, StdOutOutput, FileOutput, TimestampFormat
from pyEDAA.OutputFilter.CLI.Filter import preprocessing, mirror, postprocessing
from pyEDAA.OutputFilter.Xilinx import Processor, LineKind, VivadoLine, Command, LineAction, VivadoMessage
[docs]
@export
class VivadoHandlers(metaclass=ExtendedType, mixin=True):
[docs]
@CommandHandler("vivado", help="Parse AMD/Xilinx Vivado log files.", description="Parse AMD/Xilinx Vivado log files.")
@LongFlag("--stdin", dest="stdin", help="Read log from STDIN.")
@LongValuedFlag("--file", dest="logfile", metaName='Log file', optional=True, help="Read from log file (*.vds|*.vdi).")
@LongValuedFlag("--config", dest="configfile", metaName='Config file', optional=True, help="Configuration file (*.yaml).")
@LongFlag("--colored", dest="colored", help="Render logfile with colored lines.")
# @LongFlag("--summary", dest="summary", help="Print a summary.")
# @LongFlag("--info", dest="info", help="Print info messages.")
# @LongFlag("--warning", dest="warning", help="Print warning messages.")
# @LongFlag("--critical", dest="critical", help="Print critical warning messages.")
# @LongFlag("--error", dest="error", help="Print error messages.")
# @LongFlag("--influxdb", dest="influxdb", help="Write statistics as InfluxDB line protocol file (*.line).")
# @LongValuedFlag("--file", dest="logfile", metaName='Synthesis Log', help="Synthesis log file (*.vds).")
def HandleVivado(self, args: Namespace) -> None:
"""Handle program calls with command ``vivado``."""
if not args.quiet:
self._PrintHeadline()
config = Configuration()
if args.configfile is not None:
configFile = Path(args.configfile)
if not configFile.exists():
self.WriteError(f"Configuration file '{configFile}' doesn't exist.")
self.Exit(3)
else:
with WarningCollector() as warnings:
config.Load(configFile)
for warning in warnings:
self.WriteWarning(warning)
for note in warning.Notes:
self.WriteWarningNote(note)
if args.stdin is True:
if args.logfile is not None:
self.WriteError(f"If option '--stdin' is set, then option '--file' can't be set, too.")
self.Exit(2)
self.WriteVerbose("Reading lines from STDIN ...")
inputSource = StdInSource(self)
elif args.logfile is None:
self.WriteError(f"No input file (<logfile> or '-' for STDIN) specified via option '--file=<logfile>'.")
self.Exit(2)
elif args.logfile == "-":
self.WriteVerbose("Reading lines from STDIN ...")
inputSource = StdInSource(self)
else:
logFile = Path(args.logfile)
if not logFile.exists():
self.WriteError(f"Vivado log file '{logFile}' doesn't exist.")
self.Exit(3)
inputSource = FileSource(logFile, self)
self.ExitOnPreviousErrors()
vivadoConfig: Vivado = config._tools["vivado"]
pipeline: ProcessingPipeline = vivadoConfig._processingPipeline
inputSource.Open()
lineIterator = iter(inputSource)
targets: List[Target] = []
for output in pipeline._outputs.values():
if isinstance(output, StdOutOutput):
targets.append(target := StdOutTarget(
inputSource._startTime,
output._coloring or args.colored,
vivadoConfig._colors,
output._format,
output._lineNumbers,
output._timestampFormat,
output._commands,
output._rules
))
elif isinstance(output, FileOutput):
targets.append(target := FileTarget(
output._path,
output._format,
output._commands,
output._rules
))
else:
ex = OutputFilterException(f"Unknown Output kind.")
ex.add_note(f"Got '{getFullyQualifiedName(output)}'.")
raise ex
target.Open()
processor = Processor()
generator = processor.LineClassification(lineIterator)
preprocessed = preprocessing(generator, pipeline._preprocessing)
mirrored = mirror(preprocessed, len(targets))
postProcessed = tuple(postprocessing(mirror, target._rules) for mirror, target in zip(mirrored, targets))
with WarningCollector() as warnings:
for lines in zip(*postProcessed): # zip_longest(*postProcessed, fillvalue=None):
for target, line in zip(targets, lines):
target.Write(line)
for warning in warnings:
self.WriteWarning(warning)
for target in targets:
target.Close()
#
# if args.influxdb:
# synthesizeDesign = processor[SynthesizeDesign]
# influxString = "vivado_synthesis_overview"
# influxString += f",version={processor.Preamble.ToolVersion}"
# influxString += f",branch=main"
# influxString += f",design=Stopwatch"
# influxString += " "
# influxString += f"processing_duration={processor.ProcessingDuration:.3f}"
# influxString += f",duration={processor.Duration:.3f}"
# influxString += f",synthesis_duration={synthesizeDesign[WritingSynthesisReport].Duration:.1f}"
# influxString += f",info_count={len(processor.InfoMessages)}u"
# influxString += f",warning_count={len(processor.WarningMessages)}u"
# influxString += f",critical_count={len(processor.CriticalWarningMessages)}u"
# influxString += f",error_count={len(processor.ErrorMessages)}u"
# influxString += f",blackbox_count={len(synthesizeDesign[WritingSynthesisReport].Blackboxes)}u"
# influxString += "\n"
# influxString += "vivado_synthesis_cells"
# influxString += f",version={processor.Preamble.ToolVersion}"
# influxString += f",branch=main"
# influxString += f",design=Stopwatch"
# influxString += " "
# influxString += ",".join(f"{cellName}={cellCount}" for cellName, cellCount in synthesizeDesign[WritingSynthesisReport].Cells.items() if not cellName.endswith("_bbox"))
#
# self.WriteNormal(influxString)
#
# if args.summary:
# synthesizeDesign : SynthesizeDesign = processor[SynthesizeDesign]
# self.WriteNormal("Summary:")
# self.WriteNormal(f" Tool version: {processor.Preamble.ToolVersion}")
# self.WriteNormal(f" Started at: {processor.Preamble.StartDatetime}")
# self.WriteNormal(f" Duration: {processor.Duration:.3f} s")
# self.WriteNormal(f" Processing duration: {processor.ProcessingDuration:.3f} s")
# self.WriteNormal(f" Info: {len(processor.InfoMessages)}")
# self.WriteNormal(f" Warning: {len(processor.WarningMessages)}")
# self.WriteNormal(f" Critical Warning: {len(processor.CriticalWarningMessages)}")
# self.WriteNormal(f" Error: {len(processor.ErrorMessages)}")
# self.WriteNormal(f" Part: {synthesizeDesign[LoadingPart].Part}")
#
# self.WriteNormal("Policies:")
# self.WriteNormal(f" Latches: {'found' if synthesizeDesign.HasLatches else '----'}")
# if synthesizeDesign.HasLatches:
# for cellName in ("LD", ):
# try:
# self.WriteNormal(f" {cellName}: {synthesizeDesign.Cells[cellName]}")
# except KeyError:
# pass
# for latch in synthesizeDesign.Latches:
# self.WriteNormal(f" {latch}")
# self.WriteNormal(f" Blackboxes: {'found' if synthesizeDesign.HasBlackboxes else '----'}")
# if synthesizeDesign.HasBlackboxes:
# for bbox in synthesizeDesign.Blackboxes:
# self.WriteNormal(f" {bbox}")
#
# self.WriteNormal(f"VHDL report statements ({len(synthesizeDesign.VHDLReportMessages)}):")
# for message in synthesizeDesign.VHDLReportMessages:
# self.WriteNormal(f" {message}")
# self.WriteNormal(f"VHDL assert statements ({len(synthesizeDesign.VHDLAssertMessages)}):")
# for message in synthesizeDesign.VHDLAssertMessages:
# self.WriteNormal(f" {message}")
#
# self.WriteNormal(f"Cells: {len(synthesizeDesign.Cells)}")
# for cell, count in synthesizeDesign.Cells.items():
# self.WriteNormal(f" {cell}: {count}")
self.ExitOnPreviousErrors()
[docs]
@export
class Source(metaclass=ExtendedType, slots=True):
_parent: VivadoHandlers
_file: TextIO
_startTime: datetime
[docs]
@mustoverride
def __init__(self, parent: VivadoHandlers) -> None:
self._parent = parent
@abstractmethod
def __iter__(self) -> Iterator[Tuple[datetime, str]]:
pass
@abstractmethod
def Open(self) -> TextIO:
pass
[docs]
@export
class StdInSource(Source):
[docs]
def __init__(self, parent: VivadoHandlers) -> None:
super().__init__(parent)
self._startTime = datetime.now()
def __iter__(self) -> Iterator[Tuple[datetime, str]]:
for line in self._file:
yield datetime.now(), line
def Open(self) -> TextIO:
self._file = sys_stdin
return self._file
[docs]
@export
class FileSource(Source):
_path: Path
[docs]
def __init__(self, path: Path, parent: VivadoHandlers) -> None:
super().__init__(parent)
self._path = path
self._startTime = datetime.fromtimestamp(self._path.stat().st_mtime)
def __iter__(self) -> Iterator[Tuple[datetime, str]]:
for line in self._file:
yield self._startTime, line
def Open(self) -> TextIO:
try:
self._file = open(self._path, "r", encoding="utf-8")
except OSError as ex:
raise OutputFilterException(f"Vivado log file '{self._path}' cannot be opened.") from ex
return self._file
[docs]
@export
class Target(metaclass=ExtendedType, slots=True):
_file: TextIO
_format: OutputFormat
_commands: Nullable[List[Command]]
_rules: Nullable[List[Rule]]
[docs]
def __init__(
self,
format: OutputFormat,
commands: Nullable[List[Command]],
rules: Nullable[List[Rule]]
) -> None:
self._format = format
self._commands = commands
self._rules = rules
@abstractmethod
def Open(self) -> TextIO:
pass
def Write(self, line: VivadoLine) -> None:
if line is None:
return
elif line._action is LineAction.Remove:
return
self._file.write(f"{line}\n")
@abstractmethod
def Close(self) -> None:
pass
[docs]
@export
class StdOutTarget(Target):
_coloring: bool
_colors: Dict[str, str]
_lineNumbers: bool
_timestampFormat: TimestampFormat
_startTime: datetime
[docs]
def __init__(
self,
startTime: datetime,
coloring: bool,
colors: Dict[str, str],
format: OutputFormat,
lineNumbers: bool,
timestampFormat: TimestampFormat,
commands: Nullable[List[Command]],
rules: Nullable[List[Rule]]
) -> None:
super().__init__(format, commands, rules)
self._startTime = startTime
self._coloring = coloring
self._colors = colors
self._lineNumbers = lineNumbers
self._timestampFormat = timestampFormat
def Open(self) -> TextIO:
self._file = sys_stdout
return self._file
def Write(self, line: VivadoLine) -> None:
if line is None:
return
elif line._action is LineAction.Remove:
return
if self._format is OutputFormat.Plain:
self._WritePlain(line)
elif self._format is OutputFormat.JSONLine:
self._WriteJSONLine(line)
else:
raise OutputFilterException(f"Unknown format '{self._format}'.")
self._file.flush()
def _WritePlain(self, line: VivadoLine) -> None:
if self._timestampFormat == TimestampFormat.DateTime:
timestamp = f"{line._timestamp:%d.%m.%Y %H:%M:%S} - "
elif self._timestampFormat == TimestampFormat.TimeOnly:
timestamp = f"{line._timestamp:%H:%M:%S} - "
elif self._timestampFormat == TimestampFormat.Runtime:
delta = line._timestamp - self._startTime
seconds = int(delta.total_seconds())
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secondss = seconds % 60
milliseconds = delta.microseconds // 1000
timestamp = f"{hours:02d}:{minutes:02d}:{secondss:02d}.{milliseconds:03d} - "
elif self._timestampFormat == TimestampFormat.Undefined:
timestamp = ""
else:
raise OutputFilterException(f"Unknown timestamp format '{self._timestampFormat}'.")
lineNumber = f"{line.LineNumber:4}: " if self._lineNumbers else ""
if self._coloring:
color = self._GetColorOfLine(line)
message = str(line).replace("{", "{{").replace("}", "}}")
self._file.write(f"{timestamp}{lineNumber}{{{color}}}{message}{{NOCOLOR}}\n".format(**TerminalApplication.Foreground))
else:
self._file.write(f"{timestamp}{lineNumber}{line}\n")
def _WriteJSONLine(self, line: VivadoLine) -> None:
if isinstance(line, VivadoMessage):
jsonLine = {
"line": line._lineNumber,
"timestamp": line._timestamp.isoformat(),
"kind": line._kind.name,
"tool": line._toolName,
"toolID": line._toolID,
"messageID": line._messageKindID,
"message": line._message,
}
else:
jsonLine = {
"line": line._lineNumber,
"timestamp": line._timestamp.isoformat(),
"kind": line._kind.name,
"message": line._message,
}
self._file.write(dumps(jsonLine, indent=None) + "\n")
def Close(self) -> None:
self._file.flush()
def _GetColorOfLine(self, line: VivadoLine) -> str:
if line._kind is LineKind.Normal:
return self._colors["normal"]
elif LineKind.Message in line.Kind:
if line.Kind is LineKind.InfoMessage:
return self._colors["info"]
elif line.Kind is LineKind.WarningMessage:
return self._colors["warning"]
elif line.Kind is LineKind.CriticalWarningMessage:
return self._colors["critical"]
elif line.Kind is LineKind.ErrorMessage:
return self._colors["error"]
else:
raise OutputFilterException(f"Unknown LineKind '{line._kind}' for line {line._lineNumber}.")
elif LineKind.TclCommand in line.Kind:
return self._colors["tcl"]
elif LineKind.Success in line.Kind:
return self._colors["success"]
elif LineKind.Failed in line.Kind:
return self._colors["failed"]
elif LineKind.Verbose in line.Kind:
return self._colors["verbose"]
elif line.Kind is LineKind.Unprocessed:
return self._colors["unprocessed"]
elif line.Kind is LineKind.Empty:
return self._colors["empty"]
elif LineKind.Start in line.Kind:
if LineKind.Task in line.Kind:
return self._colors["taskStart"]
elif LineKind.Phase in line.Kind:
return self._colors["phaseStart"]
elif LineKind.SubPhase in line.Kind:
return self._colors["subphaseStart"]
elif LineKind.SubSubPhase in line.Kind:
return self._colors["subsubphaseStart"]
elif LineKind.SubSubSubPhase in line.Kind:
return self._colors["subsubsubphaseStart"]
elif LineKind.Section in line.Kind:
return self._colors["sectionStart"]
elif LineKind.SubSection in line.Kind:
return self._colors["subsectionStart"]
elif LineKind.NestedTask in line.Kind:
return self._colors["nestedTaskStart"]
elif LineKind.NestedPhase in line.Kind:
return self._colors["nestedPhaseStart"]
else:
raise OutputFilterException(f"Unknown LineKind.****Start '{line._kind}' for line {line._lineNumber}.")
elif LineKind.End in line.Kind:
if LineKind.Task in line.Kind:
return self._colors["taskEnd"]
elif LineKind.Phase in line.Kind:
return self._colors["phaseEnd"]
elif LineKind.SubPhase in line.Kind:
return self._colors["subphaseEnd"]
elif LineKind.SubSubPhase in line.Kind:
return self._colors["subsubphaseEnd"]
elif LineKind.SubSubSubPhase in line.Kind:
return self._colors["subsubsubphaseEnd"]
elif LineKind.Section in line.Kind:
return self._colors["sectionEnd"]
elif LineKind.SubSection in line.Kind:
return self._colors["subsectionEnd"]
elif LineKind.NestedTask in line.Kind:
return self._colors["nestedTaskEnd"]
elif LineKind.NestedPhase in line.Kind:
return self._colors["nestedPhaseEnd"]
else:
raise OutputFilterException(f"Unknown LineKind.****End '{line._kind}' for line {line._lineNumber}.")
elif LineKind.Time in line.Kind:
if LineKind.Task in line.Kind:
return self._colors["taskTime"]
elif LineKind.Phase in line.Kind:
return self._colors["phaseTime"]
elif LineKind.SubPhase in line.Kind:
return self._colors["subphaseTime"]
elif LineKind.SubSubPhase in line.Kind:
return self._colors["subsubphaseTime"]
elif LineKind.SubSubSubPhase in line.Kind:
return self._colors["subsubsubphaseTime"]
elif LineKind.Section in line.Kind:
return self._colors["sectionTime"]
elif LineKind.SubSection in line.Kind:
return self._colors["subsectionTime"]
else:
raise OutputFilterException(f"Unknown LineKind.****Time '{line._kind}' for line {line._lineNumber}.")
elif LineKind.Table in line.Kind:
return self._colors["table"]
elif LineKind.Delimiter in line.Kind:
if LineKind.Section in line.Kind:
return self._colors["sectionDelimiter"]
else:
raise OutputFilterException(f"Unknown LineKind.****Delimiter '{line._kind}' for line {line._lineNumber}.")
elif line.Kind is LineKind.PhaseFinal:
return self._colors["verbose"]
elif line.Kind is LineKind.ParagraphHeadline:
return self._colors["paragraphHeadline"]
elif line.Kind is LineKind.ProcessorError:
raise OutputFilterException(f"Erroneous line {line._lineNumber} '{line._kind}' should have been wrapped in an exception.")
elif LineKind.Table in line.Kind:
raise OutputFilterException()
elif LineKind.Delimiter in line.Kind:
raise OutputFilterException()
else:
raise OutputFilterException(f"Unknown LineKind '{line._kind}' for line {line._lineNumber}.")
[docs]
@export
class FileTarget(Target):
_path: Path
[docs]
def __init__(
self,
file: Path,
format: OutputFormat,
commands: List[Command],
rules: List[Rule]
) -> None:
super().__init__(format, commands, rules)
self._path = file
def Open(self) -> TextIO:
self._file = self._path.open("w", encoding="utf-8")
return self._file
def Close(self) -> None:
self._file.flush()
self._file.close()