时序异常检测产线使用教程¶
1. 通用时序异常检测产线介绍¶
时序异常检测是一种识别时间序列数据中异常模式或行为的技术,广泛应用于网络安全、设备监控和金融欺诈检测等领域。它通过分析历史数据中的正常趋势和规律,来发现与预期行为显著不同的事件,例如突然增加的网络流量或异常的交易活动。时序异常检测能够自动识别数据中的异常点,为企业和组织提供实时警报,帮助及时应对潜在风险和问题。这项技术在保障系统稳定性和安全性方面发挥着重要作用。本产线同时提供了灵活的服务化部署方式,支持在多种硬件上使用多种编程语言调用。不仅如此,本产线也提供了二次开发的能力,您可以基于本产线在您自己的数据集上训练调优,训练后的模型也可以无缝集成。
通用时序异常检测产线中包含了时序异常检测模块,如您更考虑模型精度,请选择精度较高的模型,如您更考虑模型推理速度,请选择推理速度较快的模型,如您更考虑模型存储大小,请选择存储大小较小的模型。
模型 | 模型下载链接 | precison | recall | f1_score | 模型存储大小(M) |
---|---|---|---|---|---|
AutoEncoder_ad | 推理模型/训练模型 | 99.36 | 84.36 | 91.25 | 52K |
DLinear_ad | 推理模型/训练模型 | 98.98 | 93.96 | 96.41 | 112K |
Nonstationary_ad | 推理模型/训练模型 | 98.55 | 88.95 | 93.51 | 1.8M |
PatchTST_ad | 推理模型/训练模型 | 98.78 | 90.70 | 94.57 | 320K |
Test Environment Description:
- Performance Test Environment
- Test Dataset: PSM 数据集。
-
Hardware Configuration:
- GPU: NVIDIA Tesla T4
- CPU: Intel Xeon Gold 6271C @ 2.60GHz
- Other Environments: Ubuntu 20.04 / cuDNN 8.6 / TensorRT 8.5.2.2
-
Inference Mode Description
Mode | GPU Configuration | CPU Configuration | Acceleration Technology Combination |
---|---|---|---|
Normal Mode | FP32 Precision / No TRT Acceleration | FP32 Precision / 8 Threads | PaddleInference |
High-Performance Mode | Optimal combination of pre-selected precision types and acceleration strategies | FP32 Precision / 8 Threads | Pre-selected optimal backend (Paddle/OpenVINO/TRT, etc.) |
2. 快速开始¶
PaddleX 所提供的预训练的模型产线均可以快速体验效果,你可以在星河社区体验通用时序异常检测产线的效果,也可以在本地使用命令行或 Python 体验时序异常检测产线的效果。
2.1 在线体验¶
您可以在线体验通用时序异常检测产线的效果,用官方提供的 demo 进行识别,例如:
如果您对产线运行的效果满意,可以直接进行集成部署。您可以选择从云端下载部署包,也可以参考2.2节本地体验中的方法进行本地部署。如果对效果不满意,您可以利用私有数据对产线中的模型进行微调训练。如果您具备本地训练的硬件资源,可以直接在本地开展训练;如果没有,星河零代码平台提供了一键式训练服务,无需编写代码,只需上传数据后,即可一键启动训练任务。
注:由于时序数据和场景紧密相关,时序任务的在线体验官方内置模型仅是在一个特定场景下的模型方案,并非通用方案,不适用其他场景,因此体验方式不支持使用任意的文件来体验官方模型方案效果。但是,在完成自己场景数据下的模型训练之后,可以选择自己训练的模型方案,并使用对应场景的数据进行在线体验。
2.2 本地体验¶
在本地使用通用时序异常检测产线前,请确保您已经按照PaddleX本地安装教程完成了PaddleX的wheel包安装。
2.2.1 命令行方式体验¶
一行命令即可快速体验时序异常检测产线效果,使用 测试文件,并将 --input
替换为本地路径,进行预测
相关的参数说明可以参考2.2.2 Python脚本方式集成中的参数说明。
运行后,会将结果打印到终端上,结果如下:
{'input_path': 'ts_ad.csv', 'anomaly': label
timestamp
220226 0
220227 0
220228 0
220229 0
220230 0
... ...
220317 1
220318 1
220319 1
220320 1
220321 0
[96 rows x 1 columns]}
运行结果参数说明可以参考2.2.2 Python脚本方式集成中的结果解释。
时序文件结果保存在save_path
下。
2.2.2 Python脚本方式集成¶
上述命令行是为了快速体验查看效果,一般来说,在项目中,往往需要通过代码集成,您可以通过几行代码即可完成产线的快速推理,推理代码如下:
from paddlex import create_pipeline
pipeline = create_pipeline(pipeline="ts_anomaly_detection")
output = pipeline.predict("ts_ad.csv")
for res in output:
res.print() ## 打印预测的结构化输出
res.save_to_csv(save_path="./output/") ## 保存csv格式结果
res.save_to_json(save_path="./output/") ## 保存json格式结果
在上述 Python 脚本中,执行了如下几个步骤:
(1)通过 create_pipeline()
实例化产线对象:具体参数说明如下:
参数 | 参数说明 | 参数类型 | 默认值 |
---|---|---|---|
pipeline |
产线名称或是产线配置文件路径。如为产线名称,则必须为 PaddleX 所支持的产线。 | str |
None |
config |
产线具体的配置信息(如果和pipeline 同时设置,优先级高于pipeline ,且要求产线名和pipeline 一致)。 |
dict[str, Any] |
None |
device |
产线推理设备。支持指定GPU具体卡号,如“gpu:0”,其他硬件具体卡号,如“npu:0”,CPU如“cpu”。 | str |
gpu:0 |
use_hpip |
是否启用高性能推理,仅当该产线支持高性能推理时可用。 | bool |
False |
(2)调用 ts_anomaly_detection 产线对象的 predict()
方法进行推理预测。该方法将返回一个 generator
。以下是 predict()
方法的参数及其说明:
参数 | 参数说明 | 参数类型 | 可选项 | 默认值 |
---|---|---|---|---|
input |
待预测数据,支持多种输入类型,必填 | Python Var|str|list |
|
None |
device |
产线推理设备 | str|None |
|
None |
(3)对预测结果进行处理,每个样本的预测结果均为对应的Result对象,且支持打印、保存为csv
文件、保存为json
文件的操作:
方法 | 方法说明 | 参数 | 参数类型 | 参数说明 | 默认值 |
---|---|---|---|---|---|
print() |
打印结果到终端 | format_json |
bool |
是否对输出内容进行使用 JSON 缩进格式化 |
True |
indent |
int |
指定缩进级别,以美化输出的 JSON 数据,使其更具可读性,仅当 format_json 为 True 时有效 |
4 | ||
ensure_ascii |
bool |
控制是否将非 ASCII 字符转义为 Unicode 。设置为 True 时,所有非 ASCII 字符将被转义;False 则保留原始字符,仅当format_json 为True 时有效 |
False |
||
save_to_json() |
将结果保存为json格式的文件 | save_path |
str |
保存的文件路径,当为目录时,保存文件命名与输入文件类型命名一致 | 无 |
indent |
int |
指定缩进级别,以美化输出的 JSON 数据,使其更具可读性,仅当 format_json 为 True 时有效 |
4 | ||
ensure_ascii |
bool |
控制是否将非 ASCII 字符转义为 Unicode 。设置为 True 时,所有非 ASCII 字符将被转义;False 则保留原始字符,仅当format_json 为True 时有效 |
False |
||
save_to_csv() |
将结果保存为时序文件格式的文件 | save_path |
str |
保存的文件路径,支持目录或文件路径 | 无 |
-
调用
print()
方法会将结果打印到终端,打印到终端的内容解释如下:input_path
:(str)
待预测时序文件的输入路径anomaly
:(Pandas.DataFrame)
时序异常检测结果,包含样本id和对应的异常检测类别。1为异常,0为正常。
-
调用
save_to_json()
方法会将上述内容保存到指定的save_path
中,如果指定为目录,则保存的路径为save_path/{your_ts_basename}_res.json
,如果指定为文件,则直接保存到该文件中。由于json文件不支持保存numpy数组,因此会将其中的numpy.array
类型转换为列表形式。 -
调用
save_to_csv()
方法会将可视化结果保存到指定的save_path
中,如果指定为目录,则保存的路径为save_path/{your_ts_basename}_res.csv
,如果指定为文件,则直接保存到该文件中。 -
此外,也支持通过属性获取不同格式的预测结果,具体如下:
属性 | 属性说明 |
---|---|
json |
获取预测的 json 格式的结果 |
csv |
获取格式为 csv 格式的结果 |
json
属性获取的预测结果为dict类型的数据,相关内容与调用save_to_json()
方法保存的内容一致。csv
属性返回的是一个Pandas.DataFrame
类型数据,保存了时序异常检测结果。
此外,您可以获取 ts_anomaly_detection 产线配置文件,并加载配置文件进行预测。可执行如下命令将结果保存在 my_path
中:
若您获取了配置文件,即可对时序异常检测产线各项配置进行自定义,只需要修改 create_pipeline
方法中的 pipeline
参数值为产线配置文件路径即可。
例如,若您的配置文件保存在 ./my_path/ts_anomaly_detection.yaml
,则只需执行:
from paddlex import create_pipeline
pipeline = create_pipeline(pipeline="./my_path/ts_anomaly_detection.yaml")
output = pipeline.predict("ts_ad.csv")
for res in output:
res.print() ## 打印预测的结构化输出
res.save_to_csv("./output/") ## 保存csv格式结果
res.save_to_json("./output/") ## 保存json格式结果
3. 开发集成/部署¶
如果产线可以达到您对产线推理速度和精度的要求,您可以直接进行开发集成/部署。
若您需要将产线直接应用在您的Python项目中,可以参考 2.2.2 Python脚本方式中的示例代码。
此外,PaddleX 也提供了其他三种部署方式,详细说明如下:
🚀 高性能推理:在实际生产环境中,许多应用对部署策略的性能指标(尤其是响应速度)有着较严苛的标准,以确保系统的高效运行与用户体验的流畅性。为此,PaddleX 提供高性能推理插件,旨在对模型推理及前后处理进行深度性能优化,实现端到端流程的显著提速,详细的高性能推理流程请参考PaddleX高性能推理指南。
☁️ 服务化部署:服务化部署是实际生产环境中常见的一种部署形式。通过将推理功能封装为服务,客户端可以通过网络请求来访问这些服务,以获取推理结果。PaddleX 支持多种产线服务化部署方案,详细的产线服务化部署流程请参考PaddleX服务化部署指南。
以下是基础服务化部署的API参考与多语言服务调用示例:
API参考
对于服务提供的主要操作:
- HTTP请求方法为POST。
- 请求体和响应体均为JSON数据(JSON对象)。
- 当请求处理成功时,响应状态码为
200
,响应体的属性如下:
名称 | 类型 | 含义 |
---|---|---|
logId |
string |
请求的UUID。 |
errorCode |
integer |
错误码。固定为0 。 |
errorMsg |
string |
错误说明。固定为"Success" 。 |
result |
object |
操作结果。 |
- 当请求处理未成功时,响应体的属性如下:
名称 | 类型 | 含义 |
---|---|---|
logId |
string |
请求的UUID。 |
errorCode |
integer |
错误码。与响应状态码相同。 |
errorMsg |
string |
错误说明。 |
服务提供的主要操作如下:
infer
进行时序异常检测。
POST /time-series-anomaly-detection
- 请求体的属性如下:
名称 | 类型 | 含义 | 是否必填 |
---|---|---|---|
csv |
string |
服务器可访问的CSV文件的URL或CSV文件内容的Base64编码结果。CSV文件需要使用UTF-8编码。 | 是 |
- 请求处理成功时,响应体的
result
具有如下属性:
名称 | 类型 | 含义 |
---|---|---|
csv |
string |
CSV格式的时序异常检测结果。使用UTF-8+Base64编码。 |
result
示例如下:
{
"csv": "xxxxxx"
}
多语言调用服务示例
Python
import base64
import requests
API_URL = "http://localhost:8080/time-series-anomaly-detection" # 服务URL
csv_path = "./test.csv"
output_csv_path = "./out.csv"
# 对本地图像进行Base64编码
with open(csv_path, "rb") as file:
csv_bytes = file.read()
csv_data = base64.b64encode(csv_bytes).decode("ascii")
payload = {"csv": csv_data}
# 调用API
response = requests.post(API_URL, json=payload)
# 处理接口返回数据
assert response.status_code == 200
result = response.json()["result"]
with open(output_csv_path, "wb") as f:
f.write(base64.b64decode(result["csv"]))
print(f"Output time-series data saved at {output_csv_path}")
C++
#include <iostream>
#include "cpp-httplib/httplib.h" // https://github.com/Huiyicc/cpp-httplib
#include "nlohmann/json.hpp" // https://github.com/nlohmann/json
#include "base64.hpp" // https://github.com/tobiaslocker/base64
int main() {
httplib::Client client("localhost:8080");
const std::string csvPath = "./test.csv";
const std::string outputCsvPath = "./out.csv";
httplib::Headers headers = {
{"Content-Type", "application/json"}
};
// 进行Base64编码
std::ifstream file(csvPath, std::ios::binary | std::ios::ate);
std::streamsize size = file.tellg();
file.seekg(0, std::ios::beg);
std::vector<char> buffer(size);
if (!file.read(buffer.data(), size)) {
std::cerr << "Error reading file." << std::endl;
return 1;
}
std::string bufferStr(reinterpret_cast<const char*>(buffer.data()), buffer.size());
std::string encodedCsv = base64::to_base64(bufferStr);
nlohmann::json jsonObj;
jsonObj["csv"] = encodedCsv;
std::string body = jsonObj.dump();
// 调用API
auto response = client.Post("/time-series-anomaly-detection", headers, body, "application/json");
// 处理接口返回数据
if (response && response->status == 200) {
nlohmann::json jsonResponse = nlohmann::json::parse(response->body);
auto result = jsonResponse["result"];
// 保存数据
encodedCsv = result["csv"];
decodedString = base64::from_base64(encodedCsv);
std::vector<unsigned char> decodedCsv(decodedString.begin(), decodedString.end());
std::ofstream outputCsv(outputCsvPath, std::ios::binary | std::ios::out);
if (outputCsv.is_open()) {
outputCsv.write(reinterpret_cast<char*>(decodedCsv.data()), decodedCsv.size());
outputCsv.close();
std::cout << "Output time-series data saved at " << outputCsvPath << std::endl;
} else {
std::cerr << "Unable to open file for writing: " << outputCsvPath << std::endl;
}
} else {
std::cout << "Failed to send HTTP request." << std::endl;
std::cout << response->body << std::endl;
return 1;
}
return 0;
}
Java
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Base64;
public class Main {
public static void main(String[] args) throws IOException {
String API_URL = "http://localhost:8080/time-series-anomaly-detection";
String csvPath = "./test.csv";
String outputCsvPath = "./out.csv";
// 对本地csv进行Base64编码
File file = new File(csvPath);
byte[] fileContent = java.nio.file.Files.readAllBytes(file.toPath());
String csvData = Base64.getEncoder().encodeToString(fileContent);
ObjectMapper objectMapper = new ObjectMapper();
ObjectNode params = objectMapper.createObjectNode();
params.put("csv", csvData);
// 创建 OkHttpClient 实例
OkHttpClient client = new OkHttpClient();
MediaType JSON = MediaType.Companion.get("application/json; charset=utf-8");
RequestBody body = RequestBody.Companion.create(params.toString(), JSON);
Request request = new Request.Builder()
.url(API_URL)
.post(body)
.build();
// 调用API并处理接口返回数据
try (Response response = client.newCall(request).execute()) {
if (response.isSuccessful()) {
String responseBody = response.body().string();
JsonNode resultNode = objectMapper.readTree(responseBody);
JsonNode result = resultNode.get("result");
// 保存返回的数据
String base64Csv = result.get("csv").asText();
byte[] csvBytes = Base64.getDecoder().decode(base64Csv);
try (FileOutputStream fos = new FileOutputStream(outputCsvPath)) {
fos.write(csvBytes);
}
System.out.println("Output time-series data saved at " + outputCsvPath);
} else {
System.err.println("Request failed with code: " + response.code());
}
}
}
}
Go
package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
func main() {
API_URL := "http://localhost:8080/time-series-anomaly-detection"
csvPath := "./test.csv";
outputCsvPath := "./out.csv";
// 读取csv文件并进行Base64编码
csvBytes, err := ioutil.ReadFile(csvPath)
if err != nil {
fmt.Println("Error reading csv file:", err)
return
}
csvData := base64.StdEncoding.EncodeToString(csvBytes)
payload := map[string]string{"csv": csvData} // Base64编码的文件内容
payloadBytes, err := json.Marshal(payload)
if err != nil {
fmt.Println("Error marshaling payload:", err)
return
}
// 调用API
client := &http.Client{}
req, err := http.NewRequest("POST", API_URL, bytes.NewBuffer(payloadBytes))
if err != nil {
fmt.Println("Error creating request:", err)
return
}
res, err := client.Do(req)
if err != nil {
fmt.Println("Error sending request:", err)
return
}
defer res.Body.Close()
// 处理返回数据
body, err := ioutil.ReadAll(res.Body)
if err != nil {
fmt.Println("Error reading response body:", err)
return
}
type Response struct {
Result struct {
Csv string `json:"csv"`
} `json:"result"`
}
var respData Response
err = json.Unmarshal([]byte(string(body)), &respData)
if err != nil {
fmt.Println("Error unmarshaling response body:", err)
return
}
// 将Base64编码的csv数据解码并保存为文件
outputCsvData, err := base64.StdEncoding.DecodeString(respData.Result.Csv)
if err != nil {
fmt.Println("Error decoding base64 csv data:", err)
return
}
err = ioutil.WriteFile(outputCsvPath, outputCsvData, 0644)
if err != nil {
fmt.Println("Error writing csv to file:", err)
return
}
fmt.Printf("Output time-series data saved at %s.csv", outputCsvPath)
}
C#
using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;
class Program
{
static readonly string API_URL = "http://localhost:8080/time-series-anomaly-detection";
static readonly string csvPath = "./test.csv";
static readonly string outputCsvPath = "./out.csv";
static async Task Main(string[] args)
{
var httpClient = new HttpClient();
// 对本地csv文件进行Base64编码
byte[] csvBytes = File.ReadAllBytes(csvPath);
string csvData = Convert.ToBase64String(csvBytes);
var payload = new JObject{ { "csv", csvData } }; // Base64编码的文件内容
var content = new StringContent(payload.ToString(), Encoding.UTF8, "application/json");
// 调用API
HttpResponseMessage response = await httpClient.PostAsync(API_URL, content);
response.EnsureSuccessStatusCode();
// 处理接口返回数据
string responseBody = await response.Content.ReadAsStringAsync();
JObject jsonResponse = JObject.Parse(responseBody);
// 保存csv文件
string base64Csv = jsonResponse["result"]["csv"].ToString();
byte[] outputCsvBytes = Convert.FromBase64String(base64Csv);
File.WriteAllBytes(outputCsvPath, outputCsvBytes);
Console.WriteLine($"Output time-series data saved at {outputCsvPath}");
}
}
Node.js
const axios = require('axios');
const fs = require('fs');
const API_URL = 'http://localhost:8080/time-series-anomaly-detection'
const csvPath = "./test.csv";
const outputCsvPath = "./out.csv";
let config = {
method: 'POST',
maxBodyLength: Infinity,
url: API_URL,
data: JSON.stringify({
'csv': encodeFileToBase64(csvPath) // Base64编码的文件内容
})
};
// 读取csv文件并转换为Base64
function encodeFileToBase64(filePath) {
const bitmap = fs.readFileSync(filePath);
return Buffer.from(bitmap).toString('base64');
}
axios.request(config)
.then((response) => {
const result = response.data["result"];
// 保存csv文件
const csvBuffer = Buffer.from(result["csv"], 'base64');
fs.writeFile(outputCsvPath, csvBuffer, (err) => {
if (err) throw err;
console.log(`Output time-series data saved at ${outputCsvPath}`);
});
})
.catch((error) => {
console.log(error);
});
PHP
<?php
$API_URL = "http://localhost:8080/time-series-anomaly-detection"; // 服务URL
$csv_path = "./test.csv";
$output_csv_path = "./out.csv";
// 对本地csv文件进行Base64编码
$csv_data = base64_encode(file_get_contents($csv_path));
$payload = array("csv" => $csv_data); // Base64编码的文件内容
// 调用API
$ch = curl_init($API_URL);
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
curl_setopt($ch, CURLOPT_HTTPHEADER, array('Content-Type: application/json'));
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$response = curl_exec($ch);
curl_close($ch);
// 处理接口返回数据
$result = json_decode($response, true)["result"];
file_put_contents($output_csv_path, base64_decode($result["csv"]));
echo "Output time-series data saved at " . $output_csv_path . "\n";
?>
📱 端侧部署:端侧部署是一种将计算和数据处理功能放在用户设备本身上的方式,设备可以直接处理数据,而不需要依赖远程的服务器。PaddleX 支持将模型部署在 Android 等端侧设备上,详细的端侧部署流程请参考PaddleX端侧部署指南。 您可以根据需要选择合适的方式部署模型产线,进而进行后续的 AI 应用集成。
4. 二次开发¶
如果通用时序异常检测产线提供的默认模型权重在您的场景中,精度或速度不满意,您可以尝试利用您自己拥有的特定领域或应用场景的数据对现有模型进行进一步的微调,以提升通用时序异常检测产线的在您的场景中的识别效果。
4.1 模型微调¶
由于通用时序异常检测产线包含时序异常检测模块,如果模型产线的效果不及预期,那么您需要参考时序预测模块开发教程中的二次开发章节,使用您的私有数据集对时序异常检测模型进行微调。
4.2 模型应用¶
当您使用私有数据集完成微调训练后,可获得本地模型权重文件。
若您需要使用微调后的模型权重,只需对产线配置文件做修改,将微调后模型权重的本地路径填写至产线配置文件中的 model_dir
即可:
pipeline_name: ts_anomaly_detection
SubModules:
TSAnomalyDetection:
module_name: ts_anomaly_detection
model_name: DLinear_ad
model_dir: null # 此处替换为您训练后得到的模型权重本地路径
batch_size: 1
随后, 参考本地体验中的命令行方式或 Python 脚本方式,加载修改后的产线配置文件即可。
5. 多硬件支持¶
PaddleX 支持英伟达 GPU、昆仑芯 XPU、昇腾 NPU和寒武纪 MLU 等多种主流硬件设备,仅需修改 --device
参数即可完成不同硬件之间的无缝切换。
例如,您使用昇腾 NPU 进行时序异常检测产线的推理,使用的 CLI 命令为:
当然,您也可以在 Python 脚本中 create_pipeline()
时或者 predict()
时指定硬件设备。
若您想在更多种类的硬件上使用通用时序异常检测产线,请参考PaddleX多硬件使用指南。