Skip to content

Time Series Anomaly Detection Pipeline Tutorial

1. Introduction to the General Time Series Anomaly Detection Pipeline

Time series anomaly detection is a technique for identifying abnormal patterns or behaviors in time series data. It is widely applied in fields such as network security, equipment monitoring, and financial fraud detection. By analyzing normal trends and patterns in historical data, it discovers events that significantly deviate from expected behaviors, such as sudden spikes in network traffic or unusual transaction activities. Time series anomaly detection enable automatic identification of anomalies in data. This technology provides real-time alerts for enterprises and organizations, helping them promptly address potential risks and issues. It plays a crucial role in ensuring system stability and security.

The General Time Series Anomaly Detection Pipeline includes a time series anomaly detection module. If you prioritize model accuracy, choose a model with higher precision. If you prioritize inference speed, select a model with faster inference. If you prioritize model storage size, choose a model with a smaller storage footprint.

Model NameModel Download Link Precision Recall F1-Score Model Storage Size (M)
AutoEncoder_adInference Model/Trained Model 99.36 84.36 91.25 52K
DLinear_adInference Model/Trained Model 98.98 93.96 96.41 112K
Nonstationary_adInference Model/Trained Model 98.55 88.95 93.51 1.8M
PatchTST_adInference Model/Trained Model 98.78 90.70 94.57 320K
TimesNet_adInference Model/Trained Model 98.37 94.80 96.56 1.3M

Note: The above precision metrics are measured on the PSM dataset. All model GPU inference times are based on an NVIDIA Tesla T4 machine with FP32 precision. CPU inference speeds are based on an Intel(R) Xeon(R) Gold 5117 CPU @ 2.00GHz with 8 threads and FP32 precision.

2. Quick Start

The pre-trained model pipelines provided by PaddleX allow for quick experience of their effects. You can experience the effects of the General Time Series Anomaly Detection Pipeline online or locally using command line or Python.

2.1 Online Experience

You can experience online the effects of the General Time Series Anomaly Detection Pipeline using the official demo for recognition, for example:

If you are satisfied with the pipeline's performance, you can directly integrate and deploy it. If not, you can also use your private data to fine-tune the model within the pipeline online.

Note: Due to the close relationship between time series data and scenarios, the official built-in models for online experience of time series tasks are only model solutions for a specific scenario and are not universal. They are not applicable to other scenarios. Therefore, the experience mode does not support using arbitrary files to experience the effects of the official model solutions. However, after training a model for your own scenario data, you can select your trained model solution and use data from the corresponding scenario for online experience.

2.2 Local Experience

Before using the General Time Series Anomaly Detection Pipeline locally, ensure you have installed the PaddleX wheel package following the PaddleX Local Installation Tutorial.

2.2.1 Command Line Experience

A single command is all you need to quickly experience the effects of the time series anomaly detection pipeline:

Experience the image anomaly detection pipeline with a single command,Use the test file, and replace --input with the local path to perform prediction.

paddlex --pipeline ts_ad --input ts_ad.csv --device gpu:0
Parameter Explanation:

--pipeline: The name of the pipeline, here it's the image anomaly detection pipeline.
--input: The local path or URL of the input image to be processed.
--device: The GPU index to use (e.g., gpu:0 for the first GPU, gpu:1,2 for the second and third GPUs). CPU can also be selected (--device cpu).

When executing the above command, the default image anomaly detection pipeline configuration file is loaded. If you need to customize the configuration file, you can run the following command to obtain it:

👉Click to expand
paddlex --get_pipeline_config ts_ad --save_path ./my_path

After obtaining the pipeline configuration file, you can replace --pipeline with the configuration file save path to make the configuration file take effect. For example, if the configuration file save path is ./ts_ad.yaml, simply execute:

paddlex --pipeline ./ts_ad.yaml --input ts_ad.csv --device gpu:0

Here, parameters such as --model and --device do not need to be specified, as they will use the parameters in the configuration file. If parameters are still specified, the specified parameters will take precedence.

After running, the result obtained is:

{'input_path': 'ts_ad.csv', 'anomaly':            label
timestamp
220226         0
220227         0
220228         0
220229         0
220230         0
...          ...
220317         1
220318         1
220319         1
220320         1
220321         0

[96 rows x 1 columns]}

2.2.2 Python Script Integration

A few lines of code can complete the rapid inference of the pipeline. Taking the general time series anomaly detection pipeline as an example:

from paddlex import create_pipeline

pipeline = create_pipeline(pipeline="ts_ad")

output = pipeline.predict("ts_ad.csv")
for res in output:
    res.print()  # Print the structured output of the prediction
    res.save_to_csv("./output/")  # Save the result in CSV format

The result obtained is the same as that of the command line method.

In the above Python script, the following steps are executed:

(1)Instantiate the production line object using create_pipeline: Specific parameter descriptions are as follows:

Parameter Description Type Default
pipeline The name of the production line or the path to the production line configuration file. If it is the name of the production line, it must be supported by PaddleX. str None
device The device for production line model inference. Supports: "gpu", "cpu". str gpu
use_hpip Whether to enable high-performance inference, only available if the production line supports it. bool False

(2)Invoke the predict method of the production line object for inference prediction: The predict method parameter is x, which is used to input data to be predicted, supporting multiple input methods, as shown in the following examples:

Parameter Type Parameter Description
Python Var Supports directly passing in Python variables, such as numpy.ndarray representing image data.
str Supports passing in the path of the file to be predicted, such as the local path of an image file: /root/data/img.jpg.
str Supports passing in the URL of the file to be predicted, such as the network URL of an image file: Example.
str Supports passing in a local directory, which should contain files to be predicted, such as the local path: /root/data/.
dict Supports passing in a dictionary type, where the key needs to correspond to a specific task, such as "img" for image classification tasks. The value of the dictionary supports the above types of data, for example: {"img": "/root/data1"}.
list Supports passing in a list, where the list elements need to be of the above types of data, such as [numpy.ndarray, numpy.ndarray], ["/root/data/img1.jpg", "/root/data/img2.jpg"], ["/root/data1", "/root/data2"], [{"img": "/root/data1"}, {"img": "/root/data2/img.jpg"}].

(3)Obtain the prediction results by calling the predict method: The predict method is a generator, so prediction results need to be obtained through iteration. The predict method predicts data in batches, so the prediction results are in the form of a list.

(4)Process the prediction results: The prediction result for each sample is of dict type and supports printing or saving to files, with the supported file types depending on the specific pipeline. For example:

Method Description Method Parameters
save_to_csv Saves results as a csv file - save_path: str, the path to save the file, when it's a directory, the saved file name is consistent with the input file type;
- indent: int, json formatting setting, default is 4;
- ensure_ascii: bool, json formatting setting, default is False;
save_to_xlsx Saves results as table file - save_path: str, the path to save the file, when it's a directory, the saved file name is consistent with the input file type;

If you have a configuration file, you can customize the configurations of the image anomaly detection pipeline by simply modifying the pipeline parameter in the create_pipeline method to the path of the pipeline configuration file.

For example, if your configuration file is saved at ./my_path/ts_ad.yaml, you only need to execute:

from paddlex import create_pipeline
pipeline = create_pipeline(pipeline="./my_path/ts_ad.yaml")
output = pipeline.predict("ts_ad.csv")
for res in output:
    res.print()  # Print the structured output of prediction
    res.save_to_csv("./output/")  # Save results in CSV format

3. Development Integration/Deployment

If the pipeline meets your requirements for inference speed and accuracy in production, you can proceed with development integration/deployment.

If you need to directly apply the pipeline in your Python project, refer to the example code in 2.2.2 Python Script Integration.

Additionally, PaddleX provides three other deployment methods, detailed as follows:

🚀 High-Performance Inference: In actual production environments, many applications have stringent standards for the performance metrics of deployment strategies (especially response speed) to ensure efficient system operation and smooth user experience. To this end, PaddleX provides high-performance inference plugins aimed at deeply optimizing model inference and pre/post-processing for significant end-to-end speedups. For detailed high-performance inference procedures, refer to the PaddleX High-Performance Inference Guide.

☁️ Service-Oriented Deployment: Service-oriented deployment is a common deployment form in actual production environments. By encapsulating inference functions as services, clients can access these services through network requests to obtain inference results. PaddleX enables users to achieve low-cost service-oriented deployment of pipelines. For detailed service-oriented deployment procedures, refer to the PaddleX Service-Oriented Deployment Guide.

Below are the API references and multi-language service invocation examples:

API Reference

For main operations provided by the service:

  • The HTTP request method is POST.
  • The request body and the response body are both JSON data (JSON objects).
  • When the request is processed successfully, the response status code is 200, and the properties of the response body are as follows:
Name Type Description
errorCode integer Error code. Fixed as 0.
errorMsg string Error description. Fixed as "Success".

The response body may also have a result property of type object, which stores the operation result information.

  • When the request is not processed successfully, the properties of the response body are as follows:
Name Type Description
errorCode integer Error code. Same as the response status code.
errorMsg string Error description.

Main operations provided by the service:

  • infer

Performs time-series anomaly detection.

POST /time-series-anomaly-detection

  • Attributes of the request body:
Name Type Description Required
csv string The URL of a CSV file accessible by the service or the Base64 encoded result of the CSV file content. The CSV file must be encoded in UTF-8. Yes
  • When the request is processed successfully, the result of the response body has the following properties:
Name Type Description
csv string Time-series anomaly detection results in CSV format. Encoded in UTF-8+Base64.

An example of result is as follows:

{
"csv": "xxxxxx"
}
Multi-Language Service Invocation Examples
Python
import base64
import requests

API_URL = "http://localhost:8080/time-series-anomaly-detection"
csv_path = "./test.csv"
output_csv_path = "./out.csv"

with open(csv_path, "rb") as file:
    csv_bytes = file.read()
    csv_data = base64.b64encode(csv_bytes).decode("ascii")

payload = {"csv": csv_data}

response = requests.post(API_URL, json=payload)

assert response.status_code == 200
result = response.json()["result"]
with open(output_csv_path, "wb") as f:
    f.write(base64.b64decode(result["csv"]))
print(f"Output time-series data saved at  {output_csv_path}")
C++
#include <iostream>
#include "cpp-httplib/httplib.h" // https://github.com/Huiyicc/cpp-httplib
#include "nlohmann/json.hpp" // https://github.com/nlohmann/json
#include "base64.hpp" // https://github.com/tobiaslocker/base64

int main() {
    httplib::Client client("localhost:8080");
    const std::string csvPath = "./test.csv";
    const std::string outputCsvPath = "./out.csv";

    httplib::Headers headers = {
        {"Content-Type", "application/json"}
    };

    std::ifstream file(csvPath, std::ios::binary | std::ios::ate);
    std::streamsize size = file.tellg();
    file.seekg(0, std::ios::beg);

    std::vector<char> buffer(size);
    if (!file.read(buffer.data(), size)) {
        std::cerr << "Error reading file." << std::endl;
        return 1;
    }
    std::string bufferStr(reinterpret_cast<const char*>(buffer.data()), buffer.size());
    std::string encodedCsv = base64::to_base64(bufferStr);

    nlohmann::json jsonObj;
    jsonObj["csv"] = encodedCsv;
    std::string body = jsonObj.dump();

    auto response = client.Post("/time-series-anomaly-detection", headers, body, "application/json");
    if (response && response->status == 200) {
        nlohmann::json jsonResponse = nlohmann::json::parse(response->body);
        auto result = jsonResponse["result"];

        encodedCsv = result["csv"];
        decodedString = base64::from_base64(encodedCsv);
        std::vector<unsigned char> decodedCsv(decodedString.begin(), decodedString.end());
        std::ofstream outputCsv(outputCsvPath, std::ios::binary | std::ios::out);
        if (outputCsv.is_open()) {
            outputCsv.write(reinterpret_cast<char*>(decodedCsv.data()), decodedCsv.size());
            outputCsv.close();
            std::cout << "Output time-series data saved at " << outputCsvPath << std::endl;
        } else {
            std::cerr << "Unable to open file for writing: " << outputCsvPath << std::endl;
        }
    } else {
        std::cout << "Failed to send HTTP request." << std::endl;
        std::cout << response->body << std::endl;
        return 1;
    }

    return 0;
}
Java
import okhttp3.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Base64;

public class Main {
    public static void main(String[] args) throws IOException {
        String API_URL = "http://localhost:8080/time-series-anomaly-detection";
        String csvPath = "./test.csv";
        String outputCsvPath = "./out.csv";

        File file = new File(csvPath);
        byte[] fileContent = java.nio.file.Files.readAllBytes(file.toPath());
        String csvData = Base64.getEncoder().encodeToString(fileContent);

        ObjectMapper objectMapper = new ObjectMapper();
        ObjectNode params = objectMapper.createObjectNode();
        params.put("csv", csvData);

        OkHttpClient client = new OkHttpClient();
        MediaType JSON = MediaType.Companion.get("application/json; charset=utf-8");
        RequestBody body = RequestBody.Companion.create(params.toString(), JSON);
        Request request = new Request.Builder()
                .url(API_URL)
                .post(body)
                .build();

        try (Response response = client.newCall(request).execute()) {
            if (response.isSuccessful()) {
                String responseBody = response.body().string();
                JsonNode resultNode = objectMapper.readTree(responseBody);
                JsonNode result = resultNode.get("result");

                String base64Csv = result.get("csv").asText();
                byte[] csvBytes = Base64.getDecoder().decode(base64Csv);
                try (FileOutputStream fos = new FileOutputStream(outputCsvPath)) {
                    fos.write(csvBytes);
                }
                System.out.println("Output time-series data saved at " + outputCsvPath);
            } else {
                System.err.println("Request failed with code: " + response.code());
            }
        }
    }
}
Go
package main

import (
    "bytes"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    API_URL := "http://localhost:8080/time-series-anomaly-detection"
    csvPath := "./test.csv";
    outputCsvPath := "./out.csv";

    csvBytes, err := ioutil.ReadFile(csvPath)
    if err != nil {
        fmt.Println("Error reading csv file:", err)
        return
    }
    csvData := base64.StdEncoding.EncodeToString(csvBytes)

    payload := map[string]string{"csv": csvData}
    payloadBytes, err := json.Marshal(payload)
    if err != nil {
        fmt.Println("Error marshaling payload:", err)
        return
    }

    client := &http.Client{}
    req, err := http.NewRequest("POST", API_URL, bytes.NewBuffer(payloadBytes))
    if err != nil {
        fmt.Println("Error creating request:", err)
        return
    }

    res, err := client.Do(req)
    if err != nil {
        fmt.Println("Error sending request:", err)
        return
    }
    defer res.Body.Close()

    body, err := ioutil.ReadAll(res.Body)
    if err != nil {
        fmt.Println("Error reading response body:", err)
        return
    }
    type Response struct {
        Result struct {
            Csv string `json:"csv"`
        } `json:"result"`
    }
    var respData Response
    err = json.Unmarshal([]byte(string(body)), &respData)
    if err != nil {
        fmt.Println("Error unmarshaling response body:", err)
        return
    }

    outputCsvData, err := base64.StdEncoding.DecodeString(respData.Result.Csv)
    if err != nil {
        fmt.Println("Error decoding base64 csv data:", err)
        return
    }
    err = ioutil.WriteFile(outputCsvPath, outputCsvData, 0644)
    if err != nil {
        fmt.Println("Error writing csv to file:", err)
        return
    }
    fmt.Printf("Output time-series data saved at %s.csv", outputCsvPath)
}
C#
using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json.Linq;

class Program
{
    static readonly string API_URL = "http://localhost:8080/time-series-anomaly-detection";
    static readonly string csvPath = "./test.csv";
    static readonly string outputCsvPath = "./out.csv";

    static async Task Main(string[] args)
    {
        var httpClient = new HttpClient();

        byte[] csvBytes = File.ReadAllBytes(csvPath);
        string csvData = Convert.ToBase64String(csvBytes);

        var payload = new JObject{ { "csv", csvData } };
        var content = new StringContent(payload.ToString(), Encoding.UTF8, "application/json");

        HttpResponseMessage response = await httpClient.PostAsync(API_URL, content);
        response.EnsureSuccessStatusCode();

        string responseBody = await response.Content.ReadAsStringAsync();
        JObject jsonResponse = JObject.Parse(responseBody);

        string base64Csv = jsonResponse["result"]["csv"].ToString();
        byte[] outputCsvBytes = Convert.FromBase64String(base64Csv);
        File.WriteAllBytes(outputCsvPath, outputCsvBytes);
        Console.WriteLine($"Output time-series data saved at {outputCsvPath}");
    }
}
Node.js
const axios = require('axios');
const fs = require('fs');

const API_URL = 'http://localhost:8080/time-series-anomaly-detection'
const csvPath = "./test.csv";
const outputCsvPath = "./out.csv";

let config = {
   method: 'POST',
   maxBodyLength: Infinity,
   url: API_URL,
   data: JSON.stringify({
    'csv': encodeFileToBase64(csvPath)
  })
};

function encodeFileToBase64(filePath) {
  const bitmap = fs.readFileSync(filePath);
  return Buffer.from(bitmap).toString('base64');
}

axios.request(config)
.then((response) => {
    const result = response.data["result"];

    const csvBuffer = Buffer.from(result["csv"], 'base64');
    fs.writeFile(outputCsvPath, csvBuffer, (err) => {
      if (err) throw err;
      console.log(`Output time-series data saved at ${outputCsvPath}`);
    });
})
.catch((error) => {
  console.log(error);
});
PHP
<?php

$API_URL = "http://localhost:8080/time-series-anomaly-detection";
$csv_path = "./test.csv";
$output_csv_path = "./out.csv";

$csv_data = base64_encode(file_get_contents($csv_path));
$payload = array("csv" => $csv_data);

$ch = curl_init($API_URL);
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($payload));
curl_setopt($ch, CURLOPT_HTTPHEADER, array('Content-Type: application/json'));
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
$response = curl_exec($ch);
curl_close($ch);

$result = json_decode($response, true)["result"];

file_put_contents($output_csv_path, base64_decode($result["csv"]));
echo "Output time-series data saved at " . $output_csv_path . "\n";

?>


📱 Edge Deployment: Edge deployment is a method that places computing and data processing capabilities on user devices themselves, allowing devices to process data directly without relying on remote servers. PaddleX supports deploying models on edge devices such as Android. For detailed edge deployment procedures, refer to the PaddleX Edge Deployment Guide. Choose the appropriate deployment method for your model pipeline based on your needs, and proceed with subsequent AI application integration.

4. Custom Development

If the default model weights provided by the General Time Series Anomaly Detection Pipeline do not meet your requirements for accuracy or speed in your specific scenario, you can try to further fine-tune the existing model using your own domain-specific or application-specific data to improve the recognition performance of the pipeline in your scenario.

4.1 Model Fine-tuning

Since the General Time Series Anomaly Detection Pipeline includes a time series anomaly detection module, if the performance of the pipeline does not meet expectations, you need to refer to the Customization section in the Time Series Modules Development Tutorial to fine-tune the time series anomaly detection model using your private dataset.

4.2 Model Application

After fine-tuning with your private dataset, you will obtain local model weights files.

To use the fine-tuned model weights, simply modify the pipeline configuration file by replacing the local path of the fine-tuned model weights to the corresponding location in the pipeline configuration file:

......
Pipeline:
  model: DLinear_ad  # Can be modified to the local path of the fine-tuned model
  batch_size: 1
  device: "gpu:0"
......

Then, refer to the command line method or Python script method in the local experience section to load the modified pipeline configuration file.

5. Multi-hardware Support

PaddleX supports various mainstream hardware devices such as NVIDIA GPUs, Kunlun XPU, Ascend NPU, and Cambricon MLU. Simply modify the --device parameter to seamlessly switch between different hardware.

For example, if you use an NVIDIA GPU for inference of the time series anomaly detection pipeline, the Python command is:

paddlex --pipeline ts_ad --input ts_ad.csv --device gpu:0
``````
At this point, if you wish to switch the hardware to Ascend NPU, simply modify the `--device` in the Python command to `npu:0`:

```bash
paddlex --pipeline ts_ad --input ts_ad.csv --device npu:0
If you want to use the General Time-Series Anomaly Detection Pipeline on more diverse hardware, please refer to the PaddleX Multi-Device Usage Guide.

Comments