【Python】FANUC机器人OPC UA通信并记录数据

2024-04-10 1092阅读

目录

    • 引言
    • 机器人仿真
    • 环境准备
    • 代码实现
      • 1. 导入库
      • 2. 设置参数
      • 3. 日志配置
      • 4. OPC UA通信
      • 5. 备份旧CSV文件
      • 6. 主函数
      • 总结

        引言

         OPC UA(Open Platform Communications Unified Architecture)是一种跨平台的、开放的数据交换标准,常用于工业自动化领域。Python因其易用性和丰富的库支持,成为实现OPC UA通信的不错选择。本文将介绍如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。

        机器人仿真

         FANUC机器人可以使用官方软件RoboGuide进行机器人仿真,启动后默认OPC UA地址为127.0.0.1:4880/FANUC/NanoUaServer。

        【Python】FANUC机器人OPC UA通信并记录数据

        环境准备

        • Python 3.5+
        • opcua库:用于实现OPC UA通信
        • logging库:用于记录日志

          安装opcua库:

          pip install opcua
          

          代码实现

          1. 导入库

          import csv
          from datetime import datetime
          import logging
          import os
          import shutil
          import time
          from typing import List
          from opcua.common.node import Node
          from opcua import Client, ua
          

          2. 设置参数

          SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
          CSV_FILENAME = 'fanuc_opcua_data.csv'
          FAUNC_LOG = 'fanuc.log'
          LOG_DIR = 'log'
          BACKUP_DIR = 'backup'
          

          3. 日志配置

          def getLogger(filename: str):
              if not os.path.exists(LOG_DIR):
                  os.makedirs(LOG_DIR)
              logger = logging.Logger(filename[:-4].upper(), logging.INFO)
              formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
              fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
              fh.setFormatter(formatter)
              ch = logging.StreamHandler()
              ch.setFormatter(formatter)
              logger.addHandler(fh)
              logger.addHandler(ch)
              return logger
              
          LOGGER = getLogger(FAUNC_LOG)
          

          4. OPC UA通信

          • 连接到服务器
            def connect_to_server(url):
                client = Client(url)
                client.connect()
                return client
            
            • 获取根节点和对象节点
              def get_root_node(client: Client):
                  return client.get_root_node()
              def get_objects_node(client: Client):
                  return client.get_objects_node()
              
              • 遍历所有子节点并返回变量节点的路径和数值
                def get_variables(node: Node, path=""):
                    variables = {}
                    children: List[Node] = node.get_children()
                    for child in children:
                        try:
                            name: ua.QualifiedName = child.get_browse_name()
                            new_path = f"{path}/{name.Name}"
                            if child.get_node_class() == ua.NodeClass.Variable:
                                value = child.get_value()
                                if isinstance(value, list):
                                    value = ','.join(str(x) for x in value)
                                if isinstance(value, str):
                                    value = value.replace('\n', '\\n').replace(',', ' ')
                                variables[new_path] = value
                            else:
                                variables.update(get_variables(child, new_path))
                        except Exception as e:
                            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
                    return variables
                

                5. 备份旧CSV文件

                def backup_csv_file(filename):
                    if not os.path.exists(BACKUP_DIR):
                        os.makedirs(BACKUP_DIR)
                    if os.path.exists(filename):
                        modification_time = os.path.getmtime(filename)
                        modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
                        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
                        try:
                            shutil.move(filename, new_filename)
                            LOGGER.info(f"文件已移动到 {new_filename}")
                        except Exception as e:
                            LOGGER.error(f"移动文件出错: {new_filename}, Error: {e})
                

                6. 主函数

                if __name__ == "__main__":
                    try:
                        client = connect_to_server(SERVER_URL)
                        root_node = get_root_node(client)
                        objects_node = get_objects_node(client)
                        backup_csv_file(CSV_FILENAME)
                        with open(CSV_FILENAME, mode='w', newline='') as csvfile:
                            num = 0
                            while True:
                                variables = get_variables(objects_node)
                                if num == 1:
                                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                                    writer.writeheader()
                                writer.writerow(variables)
                                csvfile.flush()
                                num += 1
                                LOGGER.info("数据记录:" + str(num))
                                time.sleep(1)
                    except KeyboardInterrupt:
                        print("程序被用户中断")
                    finally:
                        client.disconnect()
                

                记录数据预览:

                【Python】FANUC机器人OPC UA通信并记录数据

                总结

                 本文介绍了如何使用Python进行OPC UA通信,并实时记录从FANUC机器人获取的数据。通过使用opcua库,我们可以轻松地连接到OPC UA


                完整代码:

                import csv
                from datetime import datetime
                import logging
                import os
                import shutil
                import time
                from typing import List
                from opcua.common.node import Node
                from opcua import Client, ua
                # OPC UA服务器的URL
                SERVER_URL = "opc.tcp://127.0.0.1:4880/FANUC/NanoUaServer"
                # CSV文件名
                CSV_FILENAME = 'fanuc_opcua_data.csv'
                # 日志文件
                FAUNC_LOG = 'fanuc.log'
                # 文件夹 
                LOG_DIR = 'log'
                BACKUP_DIR = 'backup'
                def getLogger(filename: str):
                    if not os.path.exists(LOG_DIR):
                        os.makedirs(LOG_DIR)
                    logger = logging.Logger(filename[:-4].upper(), logging.INFO)
                    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s %(message)s")
                    fh = logging.FileHandler(LOG_DIR + "/" + filename, encoding="utf-8", mode="a")
                    fh.setFormatter(formatter)
                    ch = logging.StreamHandler()
                    ch.setFormatter(formatter)
                    logger.addHandler(fh)
                    logger.addHandler(ch)
                    return logger
                    
                LOGGER = getLogger(FAUNC_LOG)
                def connect_to_server(url):
                    """创建客户端实例并连接到服务端"""
                    client = Client(url)
                    client.connect()
                    return client
                def get_root_node(client: Client):
                    """获取服务器命名空间中的根节点"""
                    return client.get_root_node()
                def get_objects_node(client: Client):
                    """获取服务器的对象节点"""
                    return client.get_objects_node()
                def get_variables(node: Node, path=""):
                    """遍历所有子节点并返回变量节点的路径和数值"""
                    variables = {}
                    children: List[Node] = node.get_children()
                    for child in children:
                        try:
                            name: ua.QualifiedName = child.get_browse_name()
                            new_path = f"{path}/{name.Name}"
                            if child.get_node_class() == ua.NodeClass.Variable:
                                value = child.get_value()
                                if isinstance(value, list):
                                    value = ','.join(str(x) for x in value)
                                if isinstance(value, str):
                                    value = value.replace('\n', '\\n').replace(',', ' ')
                                variables[new_path] = value
                            else:
                                variables.update(get_variables(child, new_path))
                        except Exception as e:
                            LOGGER.error(f"Error fetching variable: {new_path}, Error: {e}")
                    return variables
                def backup_csv_file(filename):
                    """如果CSV文件已存在则备份"""
                    if not os.path.exists(BACKUP_DIR):
                        os.makedirs(BACKUP_DIR)
                    if os.path.exists(filename):
                        modification_time = os.path.getmtime(filename)
                        modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S')
                        new_filename = f"{BACKUP_DIR}/{filename}_{modification_time_str}"
                        try:
                            shutil.move(filename, new_filename)
                            LOGGER.info(f"文件已移动到 {new_filename}")
                        except Exception as e:
                            LOGGER.error(f"移动文件出错: {new_filename}, Error: {e}")
                        
                if __name__ == "__main__":
                    try:
                        client = connect_to_server(SERVER_URL)
                        root_node = get_root_node(client)
                        objects_node = get_objects_node(client)
                        backup_csv_file(CSV_FILENAME)
                        with open(CSV_FILENAME, mode='w', newline='') as csvfile:
                            num = 0
                            while True:
                                variables = get_variables(objects_node)
                                if num == 1:
                                    writer = csv.DictWriter(csvfile, fieldnames=variables.keys())
                                    writer.writeheader()
                                writer.writerow(variables)
                                csvfile.flush()
                                num += 1
                                LOGGER.info("数据记录:" + str(num))
                                time.sleep(1)
                    except KeyboardInterrupt:
                        print("程序被用户中断")
                    finally:
                        client.disconnect()
                
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]