How to retrieve historical data using the DataList endpoint

In this article, we will walk through an example of a Python application that queries the database through the DataList API endpoint.

🚧

Differences between DataList database access and direct InfluxDB access

DataList database access can be used for basic querying and for charting low to moderate data volumes using IXON's post-aggregators.
Direct InfluxDB access allows for more advanced querying and performance: you can set up different post-aggregators and get data from multiple devices simultaneously.

Based on your needs, please refer to either this tutorial or the direct InfluxDB database access tutorial.

📘

Required modules

You will need the API Integration and Data Studio modules. Check your modules at Admin > Licenses. To obtain these modules, contact your IXON account manager or IXON distributor.

This section explains the basics of historical data: what it is and how it works. But what if you want to speed up or automate the data extraction process, or adapt it to a custom workflow? You can do this with plain Python code that calls the DataList endpoint and uses IXON's premade post-aggregators to manipulate and aggregate the data. This article explains how to combine several API endpoints with Python to retrieve and structure historical data. Complete, working code snippets are provided along the way.

Before you start

To create your project, this is what you will need:

  • Python 3.13 or higher – can be downloaded from the official website. This tutorial uses Python 3.14.3.
  • A code editor – it is recommended to use one that understands Python, such as Microsoft Visual Studio Code or JetBrains PyCharm Community. This tutorial uses Visual Studio Code.
  • A basic understanding of Python.
  • InfluxDB version 1.11.7. Documentation for this version can be found here.
📘

Note: dependencies and virtual environment

For this example, a virtual environment was not needed: the required packages were installed on the global Python interpreter, which was later selected in the code editor.
If you prefer, you can also create an isolated virtual environment within the project and install the required packages there. In that case, add a requirements.txt file to the project folder listing the dependencies and their versions.
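If you choose the virtual environment route, a minimal requirements.txt could look like the sketch below. The pinned versions are only an illustration; pin whichever versions you actually install:

```text
requests==2.32.3
python-dotenv==1.0.1
```

Install them inside the activated environment with pip install -r requirements.txt.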

After setting up your environment, follow the upcoming steps to create your project.

Step 1 - Create the .env file

The data you need for the API calls will be stored safely in a .env file and will therefore be available throughout the whole project.

Please note that any hard-coded values in this tutorial are there to keep the demonstration simple. You are encouraged to apply whatever solution you need to protect your secrets, and you are free to manipulate the data in the code to make it dynamic.

Your .env file should look roughly like this:

API_VERSION = "2"
API_APPLICATION_ID = "$yourApplicationId"
AUTHORIZATION = "Bearer $yourBearerToken"
COMPANY_ID = "$yourCompanyId"
AGENT_ID = "$yourAgentId"
DATA_SOURCE_ID = "$yourDataSourceId"
TAG_IDENTIFIER = "$yourSlug"

Now, all that is left to do is replace the placeholder values preceded by $ with your own values.
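Once the file is filled in, a quick sanity check can confirm nothing is missing. The helper below is a sketch (the function name is our own, not part of any API); in the real project you would call load_dotenv() first, as shown in Step 2, so the values end up in the process environment:

```python
import os

# The seven keys the .env file from this tutorial should define
REQUIRED = [
    "API_VERSION", "API_APPLICATION_ID", "AUTHORIZATION",
    "COMPANY_ID", "AGENT_ID", "DATA_SOURCE_ID", "TAG_IDENTIFIER",
]

def missing_env_vars(env=os.environ, required=REQUIRED):
    # Return the names that are absent or empty
    return [name for name in required if not env.get(name)]
```

Running it right after load_dotenv() lets you fail fast with a clear message instead of a confusing 401 later on.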

❗️

Pay attention to the tagId field!

Keep in mind that a tagId field does not correspond to the publicId of a data tag; instead, it is a database identifier used in InfluxDB together with the variable type. Be careful not to confuse it with the publicId of a variable.

It is important to note that a tagId (as well as a slug) is unique per data source, not per agent. This means that if an agent has data tags configured in multiple, different data sources, duplicate slugs can appear in a JSON response. A solution to this problem is to use the AgentDataTagList endpoint and filter on a given data source. Here is an example request and JSON response for the AgentDataTagList endpoint:

curl --request GET \
     --url 'https://portal.ixon.cloud:443/api/agents/$agentPublicId/data-tags?fields=slug' \
     --header 'Api-Application: $applicationId' \
     --header 'Api-Company: $companyId' \
     --header 'Api-Version: 2' \
     --header 'accept: application/json' \
     --header 'authorization: Bearer $bearerToken'
{
    "type": "AgentDataTag",
    "data": [
        {
            "publicId": "$tagPublicId",
            "slug": "batch-trigger"
        },
        {
            "publicId": "$tagPublicId",
            "slug": "batch-cos-nr"
        },
        {
            "publicId": "$tagPublicId",
            "slug": "batch-trigger-slow"
        }
    ],
    "moreAfter": null,
    "status": "success"
}
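As a sketch of that filtering approach, the data-source filter can be appended straight to the AgentDataTagList URL. The function below only builds the URL (requesting it uses the same headers as the curl example above); the eq(source.publicId, ...) filter syntax is the same one used later in this tutorial:

```python
def build_tags_url(agent_id: str, data_source_id: str) -> str:
    # Restrict the result to one data source, so every returned slug is unique
    return (
        f"https://portal.ixon.cloud/api/agents/{agent_id}/data-tags"
        f'?fields=slug&filters=eq(source.publicId,"{data_source_id}")'
    )
```

You would then GET this URL with requests, exactly as the functions in Step 2 do.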

Step 2 - Create the request_data.py file

This file will contain all of the logic we need to build our workflow:

  1. Import requests, json, os and dotenv and declare the variables

    We will start by importing the needed packages: requests to build our API calls, json to pretty-print the response data, os to read the environment variables and dotenv to load the values from the .env file:

    import requests
    import json
    import os
    from dotenv import load_dotenv

    We will now load the values from the .env file using load_dotenv(), which makes them available through os.getenv() so we can assign each one to its own variable:

    load_dotenv()
    
    API_VERSION = os.getenv('API_VERSION')
    API_APPLICATION_ID = os.getenv('API_APPLICATION_ID')
    AUTHORIZATION = os.getenv('AUTHORIZATION')
    COMPANY_ID = os.getenv('COMPANY_ID')
    AGENT_ID = os.getenv('AGENT_ID')
    DATA_SOURCE_ID = os.getenv('DATA_SOURCE_ID') 
    TAG_IDENTIFIER = os.getenv('TAG_IDENTIFIER')
  2. Create the get_discovery function

    Just as mentioned in this documentation section, the Discovery endpoint is used to get an up-to-date list of all endpoints. Therefore, we will create a dictionary from the rel and href fields, which contain the name (key) and URL (value) of each endpoint, respectively:

    def get_discovery():
    
        response = requests.get(
            "https://portal.ixon.cloud/api/",
            headers={
                "Api-Version": API_VERSION,
                "Api-Application": API_APPLICATION_ID,
            },
        )
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch discovery data. Status code: {response.status_code}"
            )
    
        return {row["rel"]: row["href"] for row in response.json().get("data", [])}
    An example Discovery response looks like this:

    {
      "type": "Discovery",
      "data": [
        {
          "rel": "AccessRecoverList",
          "href": "https://portal.ixon.cloud/api/access-recover",
          "methods": [
            "GET",
            "POST"
          ]
        },
        {
          "rel": "AccessTokenList",
          "href": "https://portal.ixon.cloud/api/access-tokens",
          "methods": [
            "GET",
            "POST",
            "PATCH",
            "DELETE"
          ]
        }
      ],
      "moreAfter": $moreAfterValue,
      "status": "success"
    }
  3. Create the get_data_sources_list function

    We will use the get_discovery function to build the request URL for the AgentDataSourceList API call.
    This function retrieves the list of data sources; in the next step we will select the specific one we need. We only need to extract the publicId and name fields:

    def get_data_sources_list():
    
        discovery_dict = get_discovery()
        url = discovery_dict["AgentDataSourceList"].format(agentId=AGENT_ID)
    
        response = requests.get(
            url,
            headers={
                "Api-Version": API_VERSION,
                "Api-Application": API_APPLICATION_ID,
                "Authorization": AUTHORIZATION,
                "Api-Company": COMPANY_ID,
            },
        )
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch Data Sources. Status code: {response.status_code}"
            )
    
        data_sources = response.json().get("data", [])
        assert len(data_sources) > 0, "No data sources found."
        return {
            row["publicId"]: row
            for row in data_sources if "publicId" in row and "name" in row
        }
    An example AgentDataSource response looks like this:

    {
        "type": "AgentDataSource",
        "data": [
            {
                "publicId": "$dataSourcePublicId",
                "name": "PLC"
            },
            {
                "publicId": "$dataSourcePublicId",
                "name": "OPCUA"
            }
        ],
        "moreAfter": null,
        "status": "success"
    }
  4. Create the get_data_tag function

    In this example, the function returns only the tag whose slug equals the TAG_IDENTIFIER we set up previously. To target a specific data source, we fetch the desired data source, add a filter to the AgentDataTagList endpoint URL, and add an extra condition in the loop at the end of the function, so that the tag is retrieved for the data source of choice. You can follow the same structure if you want to loop through all tags of a specific data source instead of returning a single one; in that case you only need the first part of that condition:

    def get_data_tag():
    
        data_sources = get_data_sources_list()
        chosen_data_source = data_sources.get(DATA_SOURCE_ID)
        discovery_dict = get_discovery()
        tags_url = discovery_dict["AgentDataTagList"].format(agentId=AGENT_ID)
        tags_url += f'?filters=eq(source.publicId, "{chosen_data_source["publicId"]}")'
        tags_url += "&fields=slug"
    
        response = requests.get(
            tags_url,
            headers={
                "Api-Version": API_VERSION,
                "Api-Application": API_APPLICATION_ID,
                "Authorization": AUTHORIZATION,
                "Api-Company": COMPANY_ID,
            }
        )
    
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch data tags. Status code: {response.status_code}"
            )
        data = response.json().get("data", [])
    
        for tag in data:
            if chosen_data_source["publicId"] == DATA_SOURCE_ID and tag["slug"] == TAG_IDENTIFIER:
                return tag
        return None
    An example AgentDataTag response looks like this:

    {
        "type": "AgentDataTag",
        "data": [
            {
                "publicId": "$tagPublicId",
                "slug": "batch-trigger"
            },
            {
                "publicId": "$tagPublicId",
                "slug": "batch-cos-nr"
            },
            {
                "publicId": "$tagPublicId",
                "slug": "batch-trigger-slow"
            }
        ],
        "moreAfter": null,
        "status": "success"
    }
  5. Create the get_data function

    Finally, we will use the functions created earlier to call the DataList endpoint. We will build the endpoint dictionary with the get_discovery function, select the data source and tag, and construct the payload. To send the payload (a Python object) as JSON, we pass it to requests.post via the json parameter: requests then serializes the object into a JSON-formatted string, sets the Content-Type: application/json header automatically, and sends the string as the body of the POST request:

    def get_data(start_time, end_time):
    
        discovery_dict = get_discovery()
        data_sources = get_data_sources_list()
        chosen_data_source = data_sources.get(DATA_SOURCE_ID)
        chosen_tag = get_data_tag()
    
        data_url = discovery_dict["DataList"]
        data_url += f'?filters=eq(source.publicId, "{chosen_data_source["publicId"]}")'
    
        payload = [
            {
                "source": {
                    "publicId": chosen_data_source
                },
                "start": start_time,
                "end": end_time,
                "timeZone": "utc",
                "tags": [
                    {
                        "slug": chosen_tag,
                        "preAggr": "raw",
                        "queries": [
                            {
                                "ref": chosen_tag["slug"],
                                'limit': 10,
                                'offset': 0
                            }
                        ]
                    }
                ]
            }
        ]
        response = requests.post(
            data_url,
            headers={
                "Api-Version": API_VERSION,
                "Api-Application": API_APPLICATION_ID,
                "Authorization": AUTHORIZATION,
                "Api-Company": COMPANY_ID,
            },
            json=payload,
        )
    
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch data points. Status code: {response.status_code}"
            )
        points = response.json().get("data", [])
        print("Data points:", json.dumps(points, indent=2))
        return points
    The printed output will look something like this:

    Data points: [
      {
        "start": "2025-01-01T10:11:09Z",
        "end": "2025-07-07T10:11:09Z",
        "timeZone": "UTC",
        "source": {
          "publicId": "$sourcePublicId",
          "reference": {
            "name": "source"
          }
        },
        "points": [
          {
            "time": "2025-03-26T11:11:43.560Z",
            "values": {
              "$tagSlug": true
            }
          },
          {
            "time": "2025-03-26T11:11:08.140Z",
            "values": {
              "$tagSlug": false
            }
          },
          {
            "time": "2025-03-06T11:55:39.050Z",
            "values": {
              "$tagSlug": true
            }
          },
          ... more data points ...
        ]
      }
    ]
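The nested response shown above can be awkward to work with directly. The self-contained sketch below (the function name is our own) flattens the points array from a DataList response into one row per value, using only the fields visible in the example output:

```python
def flatten_points(data):
    """Turn the nested DataList response into flat rows of time, slug and value."""
    rows = []
    for block in data:
        for point in block.get("points", []):
            # Each point can carry several values, keyed by tag slug
            for slug, value in point.get("values", {}).items():
                rows.append({"time": point["time"], "slug": slug, "value": value})
    return rows

# A trimmed-down version of the response printed above
sample = [{
    "points": [
        {"time": "2025-03-26T11:11:43.560Z", "values": {"batch-trigger": True}},
        {"time": "2025-03-26T11:11:08.140Z", "values": {"batch-trigger": False}},
    ]
}]
```

Calling flatten_points(get_data(start_time, end_time)) would give you a list that is straightforward to export to CSV or load into a dataframe.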

Step 3 - Call the function!

Now we can finally set up a start time and an end time, pass them to our function and get the data points:

if __name__ == "__main__":
    # Replace with real time values as needed
    start_time = "2025-01-01T10:11:09Z"
    end_time = "2025-07-07T10:11:09Z"
    get_data(start_time, end_time)
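Instead of hard-coding the window, you can also derive start_time and end_time at run time. A minimal sketch using the standard library, producing timestamps in the same "...Z" shape as the examples above (the 7-day default is just an illustration):

```python
from datetime import datetime, timedelta, timezone

def time_window(days: int = 7) -> tuple[str, str]:
    # Build ISO-8601 UTC timestamps ending in "Z", matching the API examples
    now = datetime.now(timezone.utc)
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    return (now - timedelta(days=days)).strftime(fmt), now.strftime(fmt)

start_time, end_time = time_window()
```

You could then call get_data(start_time, end_time) to always fetch the most recent week of data.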