【Python】OPC UA模拟服务器实现

04-19 1279阅读

目录

      • 服务器模拟
        • 1. 环境准备
        • 2. 服务器设置
        • 3. 服务器初始化
        • 4. 节点操作
        • 5. 读取CSV文件
        • 6. 运行服务器
        • 查看服务器
        • 客户端
        • 总结

           在工业自动化和物联网(IoT)领域,OPC UA(开放平台通信统一架构)已经成为一种广泛采用的数据交换标准。它提供了一种安全、可靠且独立于平台的方式来访问实时数据。在本文中,我们将探讨如何使用Python和OPC UA库来创建一个高效的数据服务器,该服务器能够从CSV文件读取数据,并允许OPC UA客户端访问这些数据。

          服务器模拟

          1. 环境准备

          确保您的Python环境中已安装opcua库。如果未安装,请使用以下命令进行安装:

          pip install opcua
          

          导入python库

          import argparse
          import csv
          import time
          from typing import List
          from opcua import Server, Node, ua
          
          2. 服务器设置

          首先,我们定义命令行参数来配置服务器地址、端口和数据源路径。

          parser = argparse.ArgumentParser(description="OPCUA模拟服务器")
          # 添加参数
          parser.add_argument("--ip", "-i", default="0.0.0.0", help="服务器地址")
          parser.add_argument("--port", "-p", default=4840, type=int, help="服务器端口")
          parser.add_argument("--source", "-s", default="opcua_data.csv", help="数据源路径")
          parser.add_argument("--type", "-t", default="fanuc", help="模拟类型")
          # 解析命令行参数
          args = parser.parse_args()
          CSV_FILE = args.source
          SERVER_URL = "opc.tcp://" + str(args.ip) + ":" + str(args.port)
          SERVER_TYPE: str = args.type
          

          参数:

          【Python】OPC UA模拟服务器实现

          3. 服务器初始化

          接着,我们初始化OPC UA服务器,并设置命名空间。

          # 创建服务器实例
          server = Server()
          # 设置服务器端口
          server.set_endpoint(SERVER_URL)
          # 创建一个命名空间
          uri = "http://opcua.simulator.com"
          idx = server.register_namespace(uri)
          
          4. 节点操作

          定义函数来处理节点信息,并添加或更新节点。

          # 获取对象节点,它通常是根节点的第一个孩子
          objects = server.get_objects_node()
          node_cache_dict = {}
          def get_node_dict(node_info: str):
              pairs = node_info.split(";")
              if len(pairs) "ns": idx, "s": node_info}
              data = {}
              for pair in pairs:
                  key, value = pair.split("=")
                  data[key] = value
              return data
          def add_node(parent: Node, node_name, value, node_dict: dict, node_type="obj"):
              node_path = node_dict.get("s")
              node_ns = node_dict.get("ns")
              node_id = node_dict.get("i")
              # 检查节点是否已经存在
              children: List[Node] = parent.get_children()
              for child in children:
                  browse_name: ua.QualifiedName = child.get_browse_name()
                  if browse_name.Name == node_name:
                      return child  # 返回已存在的节点
              # 如果节点不存在,创建它
              if node_type == "var":
                  if SERVER_TYPE.upper() == "FANUC":
                      node_idx = f"ns={node_ns};i={node_id}" if node_id else idx
                  else:
                      node_idx = f"ns={node_ns};s={node_path}"
                  new_node = parent.add_variable(node_idx, node_name, str(value))
              else:
                  new_node: Node = parent.add_object(idx, node_name)
                  # new_node.set_writable()  # 设置变量为可写
              return new_node
          def set_node_value(node_info: str, value):
              node_dict = get_node_dict(node_info)
              node_path: str = node_dict.get("s")
              if node_path.startswith("/Server/"):
                  return
              try:
                  last_node: Node = node_cache_dict.get(node_path)
                  if last_node is None:
                      # 递归查找或创建节点
                      parent = objects
                      parts = node_path.split("/")
                      for part in parts[:-1]:  # 遍历除了最后一个节点的所有节点
                          if part == "":
                              continue
                          parent = add_node(
                              parent, part, value, node_dict
                          )  # 创建节点(如果不存在)
                      # 设置或更新最后一个节点的值
                      last_node_name = parts[-1]
                      last_node = add_node(parent, last_node_name, value, node_dict, "var")
                      node_cache_dict[node_path] = last_node
                  last_node.set_value(str(value))
              except Exception as e:
                  print(f"Error setting node value: {e}")
          
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]