22963浏览
查看: 22963|回复: 0

[M10项目] 行空板+AI|通过监听工业设备运行声音 预报故障

[复制链接]
本帖最后由 zoey不种土豆 于 2024-11-6 09:53 编辑

项目前言:
Machine Medic(机械医生)是一种基于行空板和AI模型驱动的解决方案,它通过分析机器的声音特征来检测可能存在的运行异常并预测潜在故障,帮助工业界避免昂贵的停机时间。

项目介绍:
这个项目始于我参观一家食品加工厂的经历。当我穿行于设备间,机器的轰鸣和传送带运输一件件产品的景象给我留下了深刻印象。在与操作员交谈时,我得知了一个意外的消息:只要机器停机,他们就会面临着巨大的损失,比如传送带、齿轮箱或泵的一个小故障就能导致整条生产线停摆,从而造成昂贵的生产延误。这让我思考,是否有办法在故障发生前就预测到它们?
这个项目的想法就是这样被启发的。工业机器设备在许多行业中扮演着至关重要的角色。平均而言,成千上万的机器依赖传送系统来高效地传输产品。但如果这些机器中有一个出现故障,可能会导致企业损失数小时乃至数天的生产力。
于是我决定寻求一种解决方案:利用监听设备声音进行预测性维护。就像人的心跳一样,机器也有自己以声音形式的“心跳”,每台机器都有一个独特的声音特征,而这个声音如果出现异常,就表明可能存在某种问题。通过实时分析这些声音,我们可以识别潜在问题,并在它们造成重大停机时间之前进行修复。
就这样,Machine Medic,机械医生,诞生了。这是一个基于云的,由ai驱动的平台,通过监听机器的声音并及早检测故障,帮助行业避免不必要的停机,以避免昂贵的维修费用。让我们一步一步深入了解这是如何工作的。

项目步骤
一、设置 “MEDIC”系统
行空板有全面的wiki文档来指导你设置。我的项目使用了开源的 Web 应用程序 Jupyter Notebook来充分利用ai构建声音模型。
行空板+AI|通过监听工业设备运行声音 预报故障图3

在设置了虚拟环境(venv)之后,现在是时候开始收集数据用于处理和训练模型了。

二、收集数据
在数据收集过程中,我遇到了一个问题——没有现成的、专门关注机器声音的数据集。所以为了构建一个强大的ai模型,我大部分时间都花在了数据收集上,特别是亲自去收集他们运动部件发出的独特音频信号。我记录了处于三种出于不同状态的机器关键运动部件的音频:良好、不良和老化。这些类别代表了设备健康状况的不同状态,为能够为训练模型创建一个多样化且有意义的数据集。
我在分别不同类别的设备的每个部件上录制了一分钟的声音,总共收集了5小时的音频文件。以下音频文件来自运行的设备的不同部件。
行空板+AI|通过监听工业设备运行声音 预报故障图2

这个过程可以按需定制化,适用于任何为特定机器构建项目,只需遵循相同的步骤。以下是对设备链条中某部分的录音。
行空板+AI|通过监听工业设备运行声音 预报故障图4


为了收集数据,需要在使用 Jupyter Notebook 创建的虚拟环境(venv)中运行该代码。

代码
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import time
  4. from unihiker import Audio, GUI
  5. u_gui = GUI()
  6. audio = Audio()
  7. text = None
  8. buttonA = None
  9. num_files = 5
  10. recording_duration = 60  # 60 seconds
  11. def start_recording():
  12.     global text, buttonA
  13.     buttonA.config(state="disabled")
  14.     for i in range(1, num_files + 1):
  15.         if text:
  16.             text.config(text=f"Recording {i}/{num_files} started...\nPlease wait...")
  17.         else:
  18.             text = u_gui.draw_text(text=f"Recording {i}/{num_files} started...\nPlease wait...", x=25, y=150, font_size=15, color='#FFFFFF')
  19.         file_name = f'machine_sound_{i}.wav'
  20.         audio.start_record(file_name)
  21.         time.sleep(recording_duration)
  22.         audio.stop_record()
  23.         text.config(text=f"Recording {i}/{num_files} completed. File saved as {file_name}")
  24.         time.sleep(3)
  25.     buttonA.config(state="normal")
  26. background_image = u_gui.draw_image(image="1.png", x=0, y=0)
  27. buttonA = u_gui.add_button(text="Start Recording", x=25, y=100, w=190, h=40, onclick=start_recording)
  28. title_text = u_gui.draw_text(text='Machine Medic - Data Collection', y=50, font_size=22, color='#FFFFFF')
  29. while True:
  30.     time.sleep(0.1)
复制代码
行空板+AI|通过监听工业设备运行声音 预报故障图5
这个界面将运行并通过将其放置在机器的不同部位来收集声音数据。

三、训练ai模型
将数据集整理到根文件夹中
行空板+AI|通过监听工业设备运行声音 预报故障图6

音频文件排列的一个示例架构:
行空板+AI|通过监听工业设备运行声音 预报故障图7

整理好数据集之后,接下来就是训练模型。
下面的代码实现了一个ai模型,用于分析机器的声音数据,并使用 MFCC(梅尔频率倒谱系数)作为特征。它涵盖了加载数据、预处理、构建神经网络模型、训练模型以及评估性能的步骤。
我已经在里面添加了许多注释,所以下面给出的代码是自解释性的。

代码
  1. import os
  2. import numpy as np
  3. import librosa
  4. import tensorflow as tf
  5. import matplotlib.pyplot as plt
  6. # Step 1: Load the Data
  7. def load_data(base_path):
  8.     X = []
  9.     y = []
  10.     # Loop through each category
  11.     for label in os.listdir(base_path):
  12.         label_path = os.path.join(base_path, label)
  13.         if os.path.isdir(label_path):
  14.             # Loop through each audio file in the category
  15.             for file in os.listdir(label_path):
  16.                 if file.endswith('.wav'):
  17.                     file_path = os.path.join(label_path, file)
  18.                     # Load the audio file
  19.                     signal, sr = librosa.load(file_path, sr=None)
  20.                     # Extract features (MFCC)
  21.                     mfccs = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=40)  # Corrected line
  22.                     mfccs = np.mean(mfccs.T, axis=0)
  23.                     X.append(mfccs)
  24.                     y.append(label)
  25.     return np.array(X), np.array(y)
  26. # Define the base paths where your training and testing data are stored
  27. train_base_path = "/root/dataset/machinesounds/train"  # Update this path
  28. test_base_path = "/root/dataset/machinesounds/test"     # Update this path
  29. # Load the data
  30. X_train, y_train = load_data(train_base_path)
  31. X_test, y_test = load_data(test_base_path)
  32. # Step 2: Preprocess the Data
  33. # Encode the labels manually
  34. unique_labels = np.unique(y_train)  # Use y_train for encoding
  35. label_to_index = {label: idx for idx, label in enumerate(unique_labels)}
  36. y_train_encoded = np.array([label_to_index[label] for label in y_train])
  37. y_test_encoded = np.array([label_to_index[label] for label in y_test])  # Encode test labels
  38. # Reshape for the model
  39. X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], 1)
  40. X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], 1)
  41. # Step 3: Build the Model
  42. model = tf.keras.Sequential([
  43.     tf.keras.layers.Conv1D(16, kernel_size=2, activation='relu', input_shape=(X_train.shape[1], 1)),
  44.     tf.keras.layers.MaxPooling1D(pool_size=2),
  45.     tf.keras.layers.Conv1D(32, kernel_size=2, activation='relu'),
  46.     tf.keras.layers.MaxPooling1D(pool_size=2),
  47.     tf.keras.layers.Flatten(),
  48.     tf.keras.layers.Dense(64, activation='relu'),
  49.     tf.keras.layers.Dense(len(unique_labels), activation='softmax')
  50. ])
  51. # Compile the model
  52. model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
  53. # Step 4: Train the Model
  54. history = model.fit(X_train, y_train_encoded, epochs=50, validation_data=(X_test, y_test_encoded))
  55. # Step 5: Evaluate the Model
  56. # Predictions
  57. y_pred = np.argmax(model.predict(X_test), axis=-1)
  58. # Calculate accuracy
  59. accuracy = np.sum(y_pred == y_test_encoded) / len(y_test_encoded)
  60. print(f'Accuracy: {accuracy * 100:.2f}%')
  61. # Plotting the training history
  62. plt.plot(history.history['accuracy'], label='Train Accuracy')
  63. plt.plot(history.history['val_accuracy'], label='Test Accuracy')
  64. plt.xlabel('Epoch')
  65. plt.ylabel('Accuracy')
  66. plt.legend()
  67. plt.show()
复制代码

四、数据加载

函数 load_data(base_path) 从目录结构中读取音频文件,其中每个文件夹代表一个标签。它处理声音文件并提取 MFCC 特征:
MFCC(梅尔频率倒谱系数):用于表示音频信号的功率谱的特征,能捕捉相关的声音特性。该函数会遍历目录,使用 Librosa 加载 .wav 文件,计算每个文件的 MFCCs,并将它们平均以获得固定大小的特征向量。

五、预处理技术
MFCC 提取:主要的预处理步骤包括将原始音频信号转换为 MFCC,它将声音的关键特征压缩成模型可以处理的格式。标签编码:标签从字符串转换为数值,以便与模型的输出格式兼容。训练-测试分割:确保模型在未见过的数据上进行测试,防止过拟合。
重塑:将数据重塑以符合卷积神经网络的输入要求。
行空板+AI|通过监听工业设备运行声音 预报故障图8

一旦你的模型准备好了,可以使用以下代码片段将模型保存到相同的目录中:
  1. model.save('my_trained_model.h5')
复制代码
这个模型将被保存在同一个目录中。
行空板+AI|通过监听工业设备运行声音 预报故障图9

完成后,项目的其余部分将集中于构建一个图形用户界面(GUI),该界面将触发模型并为输入的声音提供输出。最后的运行是为了设置图形用户界面和管理模型的后端。使用 Jupyter Notebook,在根目录中创建一个 main 文件并运行以下代码。确保模型名称与代码中给出的名称相匹配。模型应该存在于同一目录中。

代码
  1. # -*- coding: utf-8 -*-
  2. import os
  3. import numpy as np
  4. import librosa
  5. import tensorflow as tf
  6. from pinpong.extension.unihiker import *
  7. from unihiker import Audio, GUI
  8. import time
  9. import threading  # Import threading for non-blocking execution
  10. # Load the pre-trained model
  11. model_path = '/root/my_trained_model.h5'  # Update this path to your model
  12. loaded_model = tf.keras.models.load_model(model_path)
  13. # Load unique labels for prediction
  14. unique_labels = ["Good", "Old", "Bad"]  # Update this with your actual labels
  15. # Function to predict the machine state
  16. def predict_audio(file_path):
  17.     signal, sr = librosa.load(file_path, sr=None)
  18.     mfccs = librosa.feature.mfcc(y=signal, sr=sr, n_mfcc=40)
  19.     mfccs = np.mean(mfccs.T, axis=0)
  20.     mfccs = mfccs.reshape(1, -1, 1)  # Reshape for the model
  21.     prediction = np.argmax(loaded_model.predict(mfccs), axis=-1)
  22.     return unique_labels[prediction[0]]
  23. # Instantiate a GUI object
  24. u_gui = GUI()
  25. # Instantiate audio
  26. audio = Audio()  # Create an instance of the audio class
  27. # Variable to hold the message label
  28. text = None
  29. result_text = None  # New variable to hold the result text
  30. # Create a variable to hold the button
  31. buttonA = None
  32. # Callback function for the Start Recording button
  33. def start_recording():
  34.     global text, buttonA  # Reference global variables
  35.    
  36.     # Disable the button to prevent multiple clicks
  37.     buttonA.config(state="disabled")  
  38.     # Start the countdown in a separate thread
  39.     countdown_thread = threading.Thread(target=countdown, args=(3,))
  40.     countdown_thread.start()  # Start the thread
  41. def countdown(t):
  42.     global text
  43.     # Show countdown
  44.     for i in range(t, 0, -1):
  45.         text.config(text=str(i).upper(), font_size=100, x=50, color='#FFFFFF')  # Update countdown text
  46.         time.sleep(1)  # Delay for 1 second
  47.     # After countdown, update the text to indicate recording status
  48.     text.config(text="RECORDING STARTED...\nPLEASE WAIT...", font_size=15,x=10, color='#FFFFFF')
  49.    
  50.     # Start recording
  51.     audio.start_record('machine_sound.wav')  # Start recording
  52.     # Record for 15 seconds
  53.     time.sleep(15)  # Recording duration
  54.     audio.stop_record()  # Stop recording
  55.     text.config(text="RECORDING STOPPED.", font_size=15, x=10, color='#FFFFFF')  # Update message
  56.     # Wait for 3 seconds
  57.     time.sleep(3)  # Wait before processing audio
  58.     text.config(text="PLEASE WAIT.", font_size=15, x=10, color='#FFFFFF')
  59.     process_audio()  # Process the audio
  60. def process_audio():
  61.     predicted_state = predict_audio('machine_sound.wav')  # Predict the machine state
  62.    
  63.     global result_text
  64.     if result_text:  # If result text already exists, update it
  65.         result_text.config(text=f"MACHINE STATE: {predicted_state.upper()}")
  66.     else:  # Create result text if it doesn't exist
  67.         result_text = u_gui.draw_text(text=f"MACHINE STATE: {predicted_state.upper()}", x=10, y=200, font_size=15, color='#FFFFFF')  # Center it
  68.     # Re-enable the button after the processing is complete
  69.     buttonA.config(state="normal")  
  70. # Load the background image
  71. # Ensure '1.png' is present in the same directory as your script
  72. background_image = u_gui.draw_image(image="1.png", x=0, y=0)  # Set to cover the entire screen
  73. # Create the Start Recording button and store it in the global variable
  74. buttonA = u_gui.add_button(text="Start Recording", x=25, y=100, w=190, h=40, onclick=start_recording)
  75. # Display the title of the application on top of the background
  76. title_text = u_gui.draw_text(text='MACHINE MEDIC', y=50, font_size=20, color='#FFFFFF')  # Adjust the y-position as needed
  77. # Initialize message text for countdown and recording status
  78. text = u_gui.draw_text(text="", x=10, y=150, font_size=15, color='#FFFFFF')  # Placeholder for message updates
  79. # Main loop to keep the GUI running
  80. while True:
  81.     # Prevent the program from exiting or getting stuck
  82.     time.sleep(0.1)
复制代码
目前,现有模型利用了内置在行空板上的麦克风来进行小规模的声音分析。该模型旨在监控持续运作的传输带。为了将这个项目扩展至更大的流水传送带,我们计划沿着传送带的周围放置多个麦克风。这些麦克风将连接到行空板上,用于实时监控和分析系统不同部分的声音:
1.点击打开录制
行空板+AI|通过监听工业设备运行声音 预报故障图10
2.等待倒计时,播放文件录音,然后将行空板靠近电脑,以监听设备声音。
行空板+AI|通过监听工业设备运行声音 预报故障图1

3.模型会在15秒的录音结束后开始进行预测。下面视频中模型已经基于录音做出了预测,所监听的机械设备目前处于不良状态!



总结
对于这个项目,未来的目标是将此解决方案整合到生产线的各个部分的全面运营机器中。通过应用边缘计算,我们可以直接在机器现场处理声音数据,减少延迟并提高实时故障检测能力。这不仅将有助于在更大规模上预防机器停机,而且还可以使系统在不同行业中具有可扩展性。


原文链接:https://community.dfrobot.com/makelog-314667.html
项目作者:surrvesh_ks
发表时间:2024.10.16

延伸阅读:

【行空板】用Python玩转开源硬件-第12课:AI门禁安全监控
行空板AI人工智能项目教程合集


您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

为本项目制作心愿单
购买心愿单
心愿单 编辑
[[wsData.name]]

硬件清单

  • [[d.name]]
btnicon
我也要做!
点击进入购买页面
上海智位机器人股份有限公司 沪ICP备09038501号-4

© 2013-2024 Comsenz Inc. Powered by Discuz! X3.4 Licensed

mail