How to retrieve historical data from InfluxDB using IXON's API Endpoints

In this article, we will show an example of a Python application that can query the database directly with only a few API calls.

📘

Required module

You will need the API Integration and Data Studio modules. Check your modules at Admin > Licenses. To obtain these modules, contact your IXON account manager or IXON distributor.

At some point, you might need to extract historical data using a specific, customized query. In this tutorial, you will find the steps to build a project that allows you to query your own InfluxDB database directly through pure Python code. Complete, working code snippets will be provided along the way.

Before you start

To create your project, this is what you will need:

  • Python 3.13 or higher, which can be downloaded from the official website. This tutorial uses Python 3.14.3;
  • A code editor, ideally one that understands Python, such as Microsoft Visual Studio Code or JetBrains PyCharm Community. This tutorial uses Visual Studio Code;
  • A basic understanding of Python.

📘

Note: dependencies and virtual environment

For this example, a virtual environment was not needed: the required packages were installed on the global Python interpreter, which was later selected in the code editor.
If you prefer, you can also create an isolated virtual environment within the project and install the required packages there. In that case, you will need a requirements.txt file in the project folder listing the dependencies and their versions, as shown below.
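
As a reference, a minimal requirements.txt for this tutorial could look like the following. The version pins are purely illustrative; use whichever versions suit your setup:

    influxdb==5.3.2
    requests==2.32.3
    python-dotenv==1.0.1

You can then install everything in one go with pip install -r requirements.txt.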

Once you are all ready and set, you can start following this tutorial!

Step 1 - Create the client_builder.py file

The first thing to do is to create an instance of the InfluxDBClient object containing the correct data to access the database: we will need it later in this tutorial. To do this with Python, you will need to create a file containing the query_influxdb function, where the client's instance and the correct fields and their values will be stored.

Here are the steps to do so:

  1. Import the InfluxDBClient

    By importing the InfluxDBClient, you will be able to build a custom client and assign your query to it:

    from influxdb import InfluxDBClient
    
  2. Create the query_influxdb function

    The function's input parameter will be the query's string value. In this example, the function also defines the following fields and their values:
    • host (string);
    • port (int);
    • username (string);
    • password (string);
    • database (string);
    • ssl (bool): if True, the client connects using https://; if False, it uses http://. In this specific example, it will be set to True;
    • verify_ssl (bool): if True, the client checks that the server's SSL certificate is valid and trusted; if False, the client will not verify the SSL certificate. In this example, we will set it to False.


      Everything should look similar to this:

    def query_influxdb(query):
        host = "www.example.com"
        port = 12345
        username = "example-user1"
        password = "example-password1"
        database = "example-domain-name"
        ssl = True
        verify_ssl = False
    

🚧

Note about protecting credentials

In a real project, your credentials (secrets) should be stored using sturdier tools for encryption and protection; you can choose whatever suits your needs best. As a lightweight first step, the sketch below loads them from the .env file introduced in Step 2 instead of hard-coding them.
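
As a minimal sketch of that approach, assuming you add hypothetical INFLUX_HOST, INFLUX_PORT, INFLUX_USERNAME, INFLUX_PASSWORD and INFLUX_DATABASE entries to your .env file (these names are illustrative, not part of IXON's API):

    import os
    
    from dotenv import load_dotenv
    
    # Load the variables declared in the project's .env file
    load_dotenv()
    
    # Illustrative variable names; adjust them to your own .env entries
    host = os.getenv("INFLUX_HOST")
    port = int(os.getenv("INFLUX_PORT", "8086"))
    username = os.getenv("INFLUX_USERNAME")
    password = os.getenv("INFLUX_PASSWORD")
    database = os.getenv("INFLUX_DATABASE")

This keeps the secrets out of your source code while leaving the rest of the query_influxdb function unchanged.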

  3. Create the client's constructor

    After the values are set, we will create a new instance of InfluxDBClient and assign the aforementioned fields respectively:

    client = InfluxDBClient(
            host,
            port,
            username,
            password,
            database,
            ssl,
            verify_ssl
        )
    

  4. Return the query's result

    Finally, return your query's result as a string:

        result = client.query(query)
        return result
    

In the end, the complete file should look like this:

from influxdb import InfluxDBClient

def query_influxdb(query):

    host = "www.example.com"
    port = 12345
    username = "example-user1"
    password = "example-password1"
    database = "example-domain-name"
    ssl = True
    verify_ssl = False
   
    client = InfluxDBClient(
        host,
        port,
        username,
        password,
        database,
        ssl,
        verify_ssl
    )
    
    result = client.query(query)
    return result
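
A note on style: the positional call above works because the values happen to be listed in the same order as the parameters of InfluxDBClient. If you prefer the call to be more self-documenting, you can pass the same values as keyword arguments; a functionally equivalent sketch:

    client = InfluxDBClient(
        host=host,
        port=port,
        username=username,
        password=password,
        database=database,
        ssl=ssl,
        verify_ssl=verify_ssl,
    )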

Step 2 - Create the .env file

To make sure that the data you need for the API calls is available throughout the whole project, we will declare some constant variables in a .env file. This way, all the sensitive data is kept in one place, outside your source code. Here are their values and how to get them:

  • API_VERSION (string): the default value is "2";
  • API_APPLICATION_ID (string): your personal applicationId. If you do not have one yet, refer to this documentation section;
  • AUTHORIZATION (string): your bearer token. To get one, refer to this documentation section. Keep in mind that this string must have the word "Bearer" at the beginning: "Bearer $generated_bearer_token1";
  • COMPANY_ID (string): your companyId. Refer to this documentation section to get it;
  • AGENT_ID (string): the chosen agentId. Refer to this documentation section to get it;
  • DATA_SOURCE_ID (string): the chosen dataSourceId. Refer to this documentation section (Data sources subsection) to get it;
  • TAG_IDENTIFIER (string): the slug of the tag. Refer to this documentation section (Tags subsection) to get it. This example uses a slug value, but alternatively it is possible to use a tagId if you prefer.

Please note: in this project, these values are hard-coded for a simpler demonstration, but you are of course free to apply different kinds of data manipulation to the code and make it dynamic instead.

Your .env file should look roughly like this:

API_VERSION = "2"
API_APPLICATION_ID = "$yourApplicationId"
AUTHORIZATION = "Bearer $yourBearerToken"
COMPANY_ID = "$yourCompanyId"
AGENT_ID = "$yourAgentId"
DATA_SOURCE_ID = "$yourDataSourceId"
TAG_IDENTIFIER = "$yourTagIdOrSlug"

Now, all that is left to do is to replace the placeholder values preceded by $ with your own values.

โ—๏ธ

Note about tag identification

Do not confuse tagId with the tag's publicId, as they are two different things: tagId is used for building queries and communicating with the database, whereas publicId is used for other purposes concerning the REST API.
If you are following this tutorial and do not want to use the slug, then you must use the tagId instead.
Important: a tagId is an int value!
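
Before moving on, it can be useful to verify that every variable actually loads. A minimal, optional check you could run once (the list simply mirrors the .env file above):

    import os
    
    from dotenv import load_dotenv
    
    load_dotenv()
    
    # Fail fast if any required variable is missing from the .env file
    REQUIRED = [
        "API_VERSION", "API_APPLICATION_ID", "AUTHORIZATION",
        "COMPANY_ID", "AGENT_ID", "DATA_SOURCE_ID", "TAG_IDENTIFIER",
    ]
    missing = [name for name in REQUIRED if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing .env variables: {', '.join(missing)}")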

Step 3 - Create the query_api_calls.py file

Now it's time to create the query_api_calls.py file. As its name suggests, it will contain the API calls we need; their responses will be used to build the query. Additionally, it contains a function that formats the variable type, tag and retention policy into the strings needed by the query's syntax.

  1. Import requests, os and dotenv and declare the variables

    In this file, you will need to import requests, which we will need to create our API calls, and os alongside dotenv to load our constant variables from the .env file. These packages can be installed either globally in the interpreter or in the virtual environment of the project:

    import requests
    import os
    from dotenv import load_dotenv
    

    After importing what we need, we will load the .env file with load_dotenv(), which allows us to read the stored values and assign them to new variables:

    load_dotenv()
    
    API_VERSION = os.getenv('API_VERSION')
    API_APPLICATION_ID = os.getenv('API_APPLICATION_ID')
    AUTHORIZATION = os.getenv('AUTHORIZATION')
    COMPANY_ID = os.getenv('COMPANY_ID')
    AGENT_ID = os.getenv('AGENT_ID')
    DATA_SOURCE_ID = os.getenv('DATA_SOURCE_ID')
    TAG_IDENTIFIER = os.getenv('TAG_IDENTIFIER')
    
  2. Create the get_discovery function

    As mentioned in this documentation section, the Discovery endpoint is used to get an up-to-date list of all endpoints. We will turn its response into a dictionary that maps each endpoint's name (the rel field, used as key) to its URL (the href field, used as value):

    def get_discovery():
    
        response = requests.get(
            "https://portal.ixon.cloud/api/",
            headers={
                "Api-Version": API_VERSION,
                "Api-Application": API_APPLICATION_ID,
            },
        )
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch discovery data. Status code: {response.status_code}"
            )
    
        return {row["rel"]: row["href"] for row in response.json().get("data", [])}
    
    Discovery: {'AccessRecoverList': 'https://portal.ixon.cloud/api/access-recover', 'AccessTokenList': 'https://portal.ixon.cloud/api/access-tokens', 'AccessToken': 'https://portal.ixon.cloud/api/access-tokens/{publicId}', 'AgentList': 'https://portal.ixon.cloud/api/agents', ...more data...}
    
  3. Create the get_data_sources_list function

    Using our get_discovery function, we will build the request for the AgentDataSourceList API call. We will only need to extract the publicId and name fields:

    def get_data_sources_list():
    
        discovery_dict = get_discovery()
        url = discovery_dict["AgentDataSourceList"].format(agentId=AGENT_ID)
    
        response = requests.get(
            url,
            headers={
                "Api-Version": API_VERSION,
                "Api-Application": API_APPLICATION_ID,
                "Authorization": AUTHORIZATION,
                "Api-Company": COMPANY_ID,
            },
        )
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch Data Sources. Status code: {response.status_code}"
            )
    
        data_sources = response.json().get("data", [])
        assert len(data_sources) > 0, "No data sources found."
        return {
            row["publicId"]: row["name"]
            for row in data_sources if "publicId" in row and "name" in row
        }
    
    Data Sources: [{'publicId': '$dataSourceId', 'name': 'Data source'}, {'publicId': '$dataSourceId', 'name': 'Data source'}, {'publicId': '$dataSourceId', 'name': 'MQTT'} ...more data...]
    
  4. Create the get_data_tag_and_variable_type function

    In this function we will build yet another request, this time to the AgentDataTagList endpoint. To build the URL, we will use the agent's ID and filter on the data source's ID, both loaded from the .env file. The fields we retrieve are slug, tagId, retentionPolicy and variable.type; we will need the last two values later in this tutorial.
    Please note: the publicId of the variable and that of the tag will be returned automatically without being added to the query fields.


    In this example, the return data will only be the tag whose slug value equals the global variable that we set up previously:

    def get_data_tag_and_variable_type():
    
        discovery_dict = get_discovery()
        tags_url = discovery_dict["AgentDataTagList"].format(agentId=AGENT_ID)
        tags_url += f'?filters=eq(source.publicId, "{DATA_SOURCE_ID}")'
        tags_url += "&fields=slug,retentionPolicy,tagId,variable.type"
    
        response = requests.get(
            tags_url,
            headers={
                "Api-Version": API_VERSION,
                "Api-Application": API_APPLICATION_ID,
                "Authorization": AUTHORIZATION,
                "Api-Company": COMPANY_ID,
            },
        )
    
        if response.status_code != 200:
            raise Exception(
                f"Failed to fetch data tags. Status code: {response.status_code}"
            )
        data = response.json().get("data", [])
    
        for tag in data:
            if tag["slug"] == TAG_IDENTIFIER:
                return tag
        return None
    
    Tag Data: {'variable': {'publicId': '$variablePublicId', 'type': '$variableType'}, 'publicId': '$tagPublicId', 'tagId': $tagId, 'slug': '$slugValue', 'retentionPolicy': '$retentionPolicyValue'}
    

👍

Variable Type

The variable type can also be seen in the same section as the identifier and name mentioned in the "Note about slugs" tip in this paragraph.

  5. Create the get_tag_id_and_retention_policy function

    In this function, we will first use the previous function to retrieve the tag's retention policy, its tagId and the type of its variable. Together, these three values are formatted into the two strings needed in our query. In the response below, you can see that the variable's type is concatenated with the tag's ID, and the retention policy value is prefixed so that it matches its actual name in the database.

    def get_tag_id_and_retention_policy():
    
        tag = get_data_tag_and_variable_type()
        if tag is None:
            raise ValueError(
                f"No tag found for data_source={DATA_SOURCE_ID}, "
                f"tag_identifier={TAG_IDENTIFIER}"
            )
        retention_policy = tag["retentionPolicy"]
        variable_type = tag["variable"]["type"]
    
        response = variable_type + "_" + str(tag["tagId"]), "rp_" + retention_policy
        print("DEBUG get_tag_id_and_retention_policy response: ", response)
        return response
    
    get_tag_id_and_retention_policy response:  ('$variableType_$tagId', 'rp_$retentionPolicyValue')
    

Now you finally have all you need to query your database!
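
If you want to check your work at this point, you can append a small, purely illustrative test block at the bottom of query_api_calls.py and run the file directly:

    if __name__ == "__main__":
        # Print the available data sources and the formatted query fragments
        print("Data Sources:", get_data_sources_list())
        print("Tag and retention policy:", get_tag_id_and_retention_policy())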

Step 4 - Create the query_caller.py file

This will be the final file we will create, where the query will be built and called.

  1. Import os, dotenv and the files you've created

    First, we must import os and dotenv, along with the query_influxdb function from client_builder.py and the functions we need from query_api_calls.py:

    import os
    from dotenv import load_dotenv
    from client_builder import query_influxdb
    from query_api_calls import get_data_sources_list, get_tag_id_and_retention_policy
    
  2. Access the variables and create the build_and_call_query function

    Once more, we will need to load and access our variables, then we will build the query based on a given start_time and end_time. Lastly, the query_influxdb function will take the query as input and return the result:

    load_dotenv()
    
    DATA_SOURCE_ID = os.getenv('DATA_SOURCE_ID')
    
    def build_and_call_query(start_time, end_time):
    
        data_sources = get_data_sources_list()
        if DATA_SOURCE_ID not in data_sources:
            raise ValueError(
                f"Data source with publicId {DATA_SOURCE_ID} not found!"
            )
    
        tag_id, retention_policy = get_tag_id_and_retention_policy()
    
        query = """
        SELECT "{}" AS "value", "time"
        FROM "$domain_name"."{}"."data"
        WHERE time >= '{}' AND time <= '{}'
        AND "device"='{}'
        """.format(
            tag_id, retention_policy, start_time, end_time, DATA_SOURCE_ID
        )
        result = query_influxdb(query)
        data_points = list(result.get_points())
        print("Data Points:", data_points)
        return data_points
    
        SELECT "$variableType_$tagId" AS "value", "time"
        FROM "$domain_name"."rp_$retentionPolicyValue"."data"
        WHERE time >= '2024-01-01T00:00:00Z' AND time <= '2024-12-31T23:59:59Z'
        AND "device"='$dataSourceId'
    
  3. Call the function!

    The last thing we need to do now is to choose the start_time and end_time and call our function:

    if __name__ == "__main__":
        start_time = "2024-01-01T00:00:00Z"
        end_time = "2024-12-31T23:59:59Z"
        build_and_call_query(start_time, end_time)
    
    Points: [{'time': '2024-01-15T10:57:41.675000Z', 'value': $value}, {'time': '2024-01-15T10:57:55.370000Z', 'value': $value}, ... more data points ...]
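
Since data_points is a plain list of dictionaries, you can post-process it however you like. As a small, illustrative example, here is a helper that extracts the values and prints some simple statistics:

    def summarize(data_points):
        # Each point is a dict with "time" and "value" keys
        values = [p["value"] for p in data_points if p["value"] is not None]
        if not values:
            print("No values in the selected time range.")
            return
        # min/max assume the tag holds numeric values
        print(f"count={len(values)}, min={min(values)}, max={max(values)}")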
    

👍

Using Chronograf to run the query

It is possible to run Chronograf in your browser to better visualize the query result. In our example, this is what the result looks like when rendered as a table.