Source code for brickschema.bacnet

from .graph import Graph
from .namespaces import BRICK, A, REF, BACNET
from rdflib import Namespace, Literal
import BAC0
import logging
from typing import Optional

logging.basicConfig(level=logging.INFO)
logging.getLogger("BAC0_Root.BAC0").propagate = False
logging.getLogger("BAC0_Root.BAC0").setLevel(logging.WARNING)

NET = Namespace("urn:bacnet-scan/")


[docs]def clean_name(name): return name.replace(" ", "_").replace("-", "_").replace(".", "_").replace("/", "_")
# TODO: provide namespace for graph
[docs]def scan(ns: Namespace = NET, ip: Optional[str] = None) -> Graph: """ Scan for BACnet devices on the provided network. If no network is provided, use the default scan logic which scans all available interfaces. Provide a network by indicating the IP address that BAC0 should bind to The Optional 'ns' parameter provides a namespace for the graph containing the scanned results. """ graph = Graph() # ping=False avoids spamming messages on the network client = BAC0.connect(ip=ip, ping=False) client.discover() for dev in client.devices: print(dev) name, _, address, deviceid = dev name = clean_name(name) graph.add((ns[name], A, BACNET.BACnetDevice)) graph.add((ns[name], BACNET["device-instance"], Literal(deviceid))) graph.add((ns[name], BACNET["hasAddress"], Literal(address))) logging.info(f"Scanning BACnet device {dev}") device = BAC0.device( dev[2], dev[3], client, history_size=0, segmentation_supported=False ) for point in device.points: print(point) objectIdent = point.properties.address objectIRI = ns[name + "/" + str(objectIdent)] graph.add((objectIRI, A, BRICK.Point)) props = [ (A, REF.BACnetReference), (BACNET["object-identifier"], Literal(int(objectIdent))), (BACNET["objectOf"], ns[name]), ] props.append( (BACNET["object-name"], Literal(point.properties.name.strip())) ) props.append( ( BACNET["object-description"], Literal(point.properties.description.strip()), ) ) props.append((BACNET["object-type"], Literal(point.properties.type))) props.append((BACNET["units"], Literal(point.properties.units_state))) graph.add((objectIRI, REF.hasExternalReference, props)) return graph