How to retrieve historical data using the DataList endpoint
In this article, we will walk through an example of a Python application that queries the database through the DataList API endpoint.
Differences between DataList database access and direct InfluxDB access
DataList database access can be used for basic querying and for charting low or moderate data volumes by using IXON's post-aggregators.
Direct InfluxDB access allows for more advanced querying and better performance: you can set up different post-aggregators and get data from multiple devices simultaneously. Based on your needs, please refer to either this tutorial or the direct InfluxDB database access tutorial.
Required modules
You will need the API Integration and Data Studio modules. Check your modules at Admin > Licenses. To obtain these modules, contact your IXON account manager or IXON distributor.
This section explains the basics of historical data, including what it is and how it works. But what if we wanted to speed up and/or automate the data extraction process, or adapt a custom feature to the workflow? To tackle these problems, you can use pure Python code to retrieve the data by calling the DataList endpoint, and manipulate and aggregate it with IXON's premade aggregators. This article will explain how to use some API endpoints in combination with Python to retrieve and structure historical data. Complete and working code snippets will be provided along the way.
Before you start
To create your project, this is what you will need:
- Python 3.13 or higher – can be downloaded from the official website. This tutorial uses Python 3.14.3;
- A code editor – it is recommended to use one that understands Python, such as Microsoft Visual Studio Code or JetBrains PyCharm Community. This tutorial uses Visual Studio Code;
- A basic understanding of Python.
Note: dependencies and virtual environment
For this example, creating a virtual environment was not needed, since the required packages were installed on the global Python interpreter, which was later selected in the code editor.
If you wish, you can also create an isolated virtual environment within the project and install the required packages there. In this case, you will need a requirements.txt file in the project folder listing the dependencies and their versions, as shown below.
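As an illustration, a minimal requirements.txt for this project could look like the following (the version pins are examples, not prescriptions):

requests==2.32.3
python-dotenv==1.0.1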
After setting up your environment, follow the upcoming steps to create your project.
Step 1 - Create the .env file
The data you need for the API calls will be safely stored in a .env file and will therefore be available throughout the whole project. Here are the variables and how to get their values:
- API_VERSION (string): the default value is "2";
- API_APPLICATION_ID (string): your personal applicationId. If you do not have one yet, refer to this documentation section;
- AUTHORIZATION (string): your bearer token. To get one, refer to this documentation section. Keep in mind that this string must have the word "Bearer" at the beginning: "Bearer $generated_bearer_token";
- COMPANY_ID (string): your companyId. Refer to this documentation section to get it;
- AGENT_ID (string): the chosen agentId. Refer to this documentation section to get it;
- DATA_SOURCE_ID (string): the chosen dataSourceId. Refer to this documentation section (Data sources subsection) to get it. Note that this variable is optional; check this paragraph for further details;
- TAG_IDENTIFIER (string): the slug of the tag. Refer to this documentation section (Tags subsection) to get it. This example uses a slug value, but alternatively it is possible to use a tagId if you prefer.
Please note: in this project, these values are hard-coded for a simpler demonstration, but you are of course free to apply different kinds of data manipulation to the code and make it dynamic instead.
Your .env file should look roughly like this:
API_VERSION = "2"
API_APPLICATION_ID = "$yourApplicationId"
AUTHORIZATION = "Bearer $yourBearerToken"
COMPANY_ID = "$yourCompanyId"
AGENT_ID = "$yourAgentId"
DATA_SOURCE_ID = "$yourDataSourceId"
TAG_IDENTIFIER = "$yourTagIdOrSlug"
Now, all that is left to do is replace the values preceded by the $ with your own values of choice.
Note about tag identification
Do not confuse tagId with the tag's publicId, as they are two different things: tagId is used for building queries and communicating with the database, whereas publicId is used for other purposes concerning REST APIs.
If you are following this tutorial and do not want to use slug, then you must use tagId.
Important: a tagId is an int type of value!
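Because values read from a .env file are always strings, a tagId needs an explicit cast before it can be used as an int. A minimal sketch (the TAG_ID name is our own, not part of the tutorial's code):

import os
from dotenv import load_dotenv

load_dotenv()

# .env values are always read as strings, so cast the tagId to int before use
TAG_ID = int(os.getenv('TAG_IDENTIFIER'))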
Step 2 - Create the request_data.py file
This file will contain all of the logic we need to build our workflow:
- Import requests, json, os and dotenv and declare the variables
We will start by importing the needed packages. We will need requests to build our API calls, json to properly format a POST request, os to access the .env file and dotenv to load the values inside of it:
import requests
import json
import os
from dotenv import load_dotenv
We will now load the variables by calling load_dotenv(), which reads the .env file so that os.getenv() can retrieve each value and assign it to a constant:
load_dotenv()

API_VERSION = os.getenv('API_VERSION')
API_APPLICATION_ID = os.getenv('API_APPLICATION_ID')
AUTHORIZATION = os.getenv('AUTHORIZATION')
COMPANY_ID = os.getenv('COMPANY_ID')
AGENT_ID = os.getenv('AGENT_ID')
DATA_SOURCE_ID = os.getenv('DATA_SOURCE_ID')
TAG_IDENTIFIER = os.getenv('TAG_IDENTIFIER')
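Optionally, you can fail fast when one of the variables is missing. This check is our own addition and not required by the tutorial:

# Optional sanity check: stop early if a required variable is missing.
# DATA_SOURCE_ID is optional in this example, so it is not checked here.
for name in ("API_VERSION", "API_APPLICATION_ID", "AUTHORIZATION", "COMPANY_ID", "AGENT_ID"):
    if not os.getenv(name):
        raise RuntimeError(f"Missing environment variable: {name}")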
- Create the get_discovery function
Just as mentioned in this documentation section, the Discovery endpoint is used to get an up-to-date list of all endpoints. Therefore, we will create a dictionary from the rel and href fields, which contain the name (key) and the URL (value) of each endpoint respectively:
def get_discovery():
    response = requests.get(
        "https://portal.ixon.cloud/api/",
        headers={
            "Api-Version": API_VERSION,
            "Api-Application": API_APPLICATION_ID,
        },
    )
    if response.status_code != 200:
        raise Exception(
            f"Failed to fetch discovery data. Status code: {response.status_code}"
        )
    return {row["rel"]: row["href"] for row in response.json().get("data", [])}
Discovery: {'AccessRecoverList': 'https://portal.ixon.cloud/api/access-recover', 'AccessTokenList': 'https://portal.ixon.cloud/api/access-tokens', 'AccessToken': 'https://portal.ixon.cloud/api/access-tokens/{publicId}', 'AgentList': 'https://portal.ixon.cloud/api/agents', ...more data...}
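As a quick check, you can call the function and look up the endpoint we will need later. The snippet below is merely illustrative:

# Look up the DataList endpoint URL from the discovery dictionary
discovery = get_discovery()
print("DataList endpoint:", discovery["DataList"])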
- Optional: create the get_data_sources_list function
The get_data_sources_list function uses get_discovery to build the request for the AgentDataSourceList API call.
This function is optional in our example, since we hard-coded the tag's identifier in the .env file, without having to loop through the list of data sources. We will only need to extract the publicId and name fields:
def get_data_sources_list():
    discovery_dict = get_discovery()
    url = discovery_dict["AgentDataSourceList"].format(agentId=AGENT_ID)
    response = requests.get(
        url,
        headers={
            "Api-Version": API_VERSION,
            "Api-Application": API_APPLICATION_ID,
            "Authorization": AUTHORIZATION,
            "Api-Company": COMPANY_ID,
        },
    )
    if response.status_code != 200:
        raise Exception(
            f"Failed to fetch Data Sources. Status code: {response.status_code}"
        )
    data_sources = response.json().get("data", [])
    assert len(data_sources) > 0, "No data sources found."
    return {
        row["publicId"]: row
        for row in data_sources
        if "publicId" in row and "name" in row
    }
Data Sources: [{'publicId': '$dataSourceId', 'name': 'Data source'}, {'publicId': '$dataSourceId', 'name': 'Data source'}, {'publicId': '$dataSourceId', 'name': 'MQTT'} ...more data...]
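To see which data sources are available before hard-coding DATA_SOURCE_ID in your .env file, you could print the returned dictionary. This loop is our own addition:

# List all available data sources with their publicId and name
for public_id, source in get_data_sources_list().items():
    print(public_id, "->", source["name"])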
If you want to select a specific data source or add other conditions, all you need to do at this point is add a filter to the URL and an additional condition in the tag loop, so that you can retrieve a tag for a data source of choice:
def get_data_tags():
    data_sources = get_data_sources_list()
    chosen_data_source = data_sources.get(DATA_SOURCE_ID)
    discovery_dict = get_discovery()
    tags_url = discovery_dict["AgentDataTagList"].format(agentId=AGENT_ID)
    tags_url += f'?filters=eq(source.publicId, "{DATA_SOURCE_ID}")'
    tags_url += "&fields=tagId,slug,retentionPolicy,variable.type"
    response = requests.get(
        tags_url,
        headers={
            "Api-Version": API_VERSION,
            "Api-Application": API_APPLICATION_ID,
            "Authorization": AUTHORIZATION,
            "Api-Company": COMPANY_ID,
        },
    )
    if response.status_code != 200:
        raise Exception(
            f"Failed to fetch data tags. Status code: {response.status_code}"
        )
    data = response.json().get("data", [])
    for tag in data:
        if chosen_data_source["publicId"] == DATA_SOURCE_ID and tag["slug"] == TAG_IDENTIFIER:
            return tag
        print("DEBUG TAG RESULT: ", tag)
    return None
Tags: {'variable': {'publicId': '$variablePublicId', 'type': 'str'}, 'publicId': '$tagsPublicId', 'tagId': $tagsId, 'slug': '$tagsSlug', 'retentionPolicy': '$retentionPolicy'}
- Optional: create the get_data_tags function
Subsequently, this function is also optional in our case, since we already have a TAG_IDENTIFIER. If you want to select specific tags for one or more data sources, or if you want to add other conditions, all you need to do at this point is create the get_data_tags function, fetch your desired data source, add a filter to the AgentDataTagList endpoint URL and add an extra condition to the tag loop, so that you can retrieve a tag for a data source of choice:
def get_data_tags():
    data_sources = get_data_sources_list()
    # The dictionary returned by get_data_sources_list is keyed by publicId
    chosen_data_source = data_sources.get("$dataSourceId")
    discovery_dict = get_discovery()
    tags_url = discovery_dict["AgentDataTagList"].format(agentId=AGENT_ID)
    source_id = chosen_data_source["publicId"]
    tags_url += f'?filters=eq(source.publicId, "{source_id}")'
    tags_url += "&fields=tagId,slug"
    response = requests.get(
        tags_url,
        headers={
            "Api-Version": API_VERSION,
            "Api-Application": API_APPLICATION_ID,
            "Authorization": AUTHORIZATION,
            "Api-Company": COMPANY_ID,
        },
    )
    if response.status_code != 200:
        raise Exception(
            f"Failed to fetch data tags. Status code: {response.status_code}"
        )
    data = response.json().get("data", [])
    for tag in data:
        if tag["slug"] == "$tagSlug":
            return tag
    return None
get_tag_id_and_retention_policy response: ('$variableType_$tagId', 'rp_$retentionPolicyValue')
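The output above refers to a get_tag_id_and_retention_policy helper that is not listed in this article. A plausible sketch of what it might look like, assuming it combines the variable type and tagId into a query field and prefixes the retention policy with rp_, could be:

def get_tag_id_and_retention_policy():
    # Hypothetical helper, reconstructed from the sample output above.
    # It assumes get_data_tags() was called with the earlier field list
    # (tagId, slug, retentionPolicy, variable.type).
    tag = get_data_tags()
    if tag is None:
        raise Exception("Tag not found.")
    query_field = f"{tag['variable']['type']}_{tag['tagId']}"
    retention_policy = f"rp_{tag['retentionPolicy']}"
    return query_field, retention_policy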
- Create the get_data function
Finally, we will now use the functions we have created earlier to call the DataList endpoint. We will create a dictionary using the get_discovery function and build the payload. To convert the payload (which is a Python object) into JSON, we pass it through the json parameter of requests.post: requests will serialize the object into a JSON-formatted string, set the Content-Type: application/json header automatically and send the JSON string as the body of the POST request:
def get_data(start_time, end_time):
    discovery_dict = get_discovery()
    data_url = discovery_dict["DataList"]
    data_url += f'?filters=eq(source.publicId, "{DATA_SOURCE_ID}")'
    payload = [
        {
            "source": {"publicId": DATA_SOURCE_ID},
            "start": start_time,
            "end": end_time,
            "timeZone": "utc",
            "tags": [
                {
                    "slug": TAG_IDENTIFIER,
                    "preAggr": "raw",
                    "queries": [
                        {
                            "ref": TAG_IDENTIFIER,
                            "limit": 10,
                            "offset": 0,
                        }
                    ],
                }
            ],
        }
    ]
    response = requests.post(
        data_url,
        headers={
            "Api-Version": API_VERSION,
            "Api-Application": API_APPLICATION_ID,
            "Authorization": AUTHORIZATION,
            "Api-Company": COMPANY_ID,
        },
        json=payload,
    )
    if response.status_code != 200:
        raise Exception(
            f"Failed to fetch data points. Status code: {response.status_code}"
        )
    points = response.json().get("data", [])
    print("Data points:", json.dumps(points, indent=2))
    return points
Data points: [
  {
    "start": "2025-01-01T10:11:09Z",
    "end": "2025-07-07T10:11:09Z",
    "timeZone": "UTC",
    "source": {
      "publicId": "$sourcePublicId",
      "reference": {
        "name": "source"
      }
    },
    "points": [
      {
        "time": "2025-03-26T11:11:43.560Z",
        "values": {
          "$tagSlug": true
        }
      },
      {
        "time": "2025-03-26T11:11:08.140Z",
        "values": {
          "$tagSlug": false
        }
      },
      {
        "time": "2025-03-06T11:55:39.050Z",
        "values": {
          "$tagSlug": true
        }
      },
      ... more data points ...
    ]
  }
]
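If you want to structure the returned points further, for example as flat (time, tag, value) rows, a small helper like the following could be added. It is our own sketch and assumes the response shape shown above:

def flatten_points(data):
    # Flatten the DataList response into (time, tag, value) tuples
    rows = []
    for result in data:
        for point in result.get("points", []):
            for tag_slug, value in point.get("values", {}).items():
                rows.append((point["time"], tag_slug, value))
    return rows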
Step 3 - Call the function!
Now we can finally set up a start time and an end time, assign them as inputs to our function and get the data points:
if __name__ == "__main__":
# Replace with real time values as needed
start_time = "2025-01-01T10:11:09Z"
end_time = "2025-07-07T10:11:09Z"
get_data(start_time, end_time)
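To make the time window dynamic instead of hard-coded, you could derive it from the current time. A minimal sketch, using a rolling seven-day window:

from datetime import datetime, timedelta, timezone

# Example: query a rolling window covering the last 7 days
end = datetime.now(timezone.utc)
start = end - timedelta(days=7)
start_time = start.strftime("%Y-%m-%dT%H:%M:%SZ")
end_time = end.strftime("%Y-%m-%dT%H:%M:%SZ")
get_data(start_time, end_time)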