Post

Query data from Log Analytics using Python

In this tutorial I will show you how to query data from Log Analytics using Python. For this to work you need to have a Log Analytics workspace and a Service Principal with access to the workspace.

Prerequisites

  • Log Analytics workspace
  • Service Principal with access to the workspace

If you are wondering what permissions the Service Principal needs, the Log Analytics Reader role is enough.

Let’s build

We will be only using the requests and the json module for this tutorial. They are already packaged with Pyhton, so you don’t have to install anything. But first we need to import them:

1
2
import json
import requests

We want to build a class that will handle the authentication and the requests to the Log Analytics API. We will call it LogAnalyticsReader. The class will have three methods: __init__, __get_token and query_data. The __init__ method will initialize the class and call the __get_token method, which will return an access token from Azure AD. The query_data method will query the Log Analytics API and return the results.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class LogAnalyticsReader:
    """Query data from Log Analytics"""

    def __init__(self, tenant_id, client_id, client_secret):
        # Initialize the class
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = self.__get_token()
    
    def __get_token(self):
        # Get an access token from Azure AD

    def query_data(self, workspace_id, query):
        # Query the Log Analytics API

Authenticate with Azure AD

As we already defined the variables in the __init__ method, we can use them to authenticate with Azure AD. We will use the requests module to send a POST request to the Azure AD token endpoint. The request will contain the client ID, client secret, tenant ID and the resource URI. The resource URI is the same for all Log Analytics workspaces, so you can just copy it. The response will contain the access token, which we will return from the method.

1
2
3
4
5
6
7
8
9
def __get_token(self):
    url = "https://login.microsoftonline.com/" + self.tenant_id + "/oauth2/token"
    payload = "grant_type=client_credentials&client_id=" + self.client_id + "&client_secret=" + self.client_secret + "&resource=https%3A%2F%2Fapi.loganalytics.io%2F"
    headers = {
        'Content-Type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache"
        }
    response = requests.request("POST", url, data=payload, headers=headers, timeout=30)
    return response.json()['access_token']

Query the Log Analytics API

Now that we have the access token, we can query the Log Analytics API. We will use the requests module again to send a POST request to the Log Analytics API. The request will contain the access token in the header and the query in the body of the request. The response will then contain the results formatted in JSON, which we will return from the method.

1
2
3
4
5
6
7
8
9
10
11
12
13
def query_data(self, workspace_id, query):
        url = "https://api.loganalytics.azure.com/v1/workspaces/" + workspace_id + "/query" + "?timespan=P1D"
        payload = {
                    "query": query
        }
        headers = {
            'Authorization': "Bearer " + self.access_token,
            'Content-Type': "application/json"
            }
        response = requests.request("POST", url, data=json.dumps(payload), headers=headers, timeout=30)
        response.raise_for_status()
        
        return response.json()

Try it out

If you want to use this class, you can use the following code:

1
2
3
4
5
6
7
8
9
# Initialize the class
log_analytics_reader = LogAnalyticsReader(tenant_id, client_id, client_secret)
# Query the Log Analytics API
workspace_id = "yourWorkspaceId"
query = """IntuneDevices 
            | where DeviceName == 'DeviceName' 
            | project DeviceName, DeviceId
            | limit 10"""
results = log_analytics_reader.query_data(workspace_id, query)

The great thing about it is that you can use it in any Python script without great effort. You can even use it in a Jupyter Notebook, to help you in your data analysis. </br> But the best thing about it is that it works your KQL queries, that you meticulously crafted in the Azure Data Explorer.

The full code

For your convenience and all the copy-pasters out there, here is the full code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import json
import requests

class LogAnalyticsReader:
    """Class to query data from Log Analytics"""

    def __init__(self, tenant_id, client_id, client_secret):
        self.tenant_id = tenant_id
        self.client_id = client_id
        self.client_secret = client_secret
        self.access_token = self.__get_token()
    
    def __get_token(self):
        url = "https://login.microsoftonline.com/" + self.tenant_id + "/oauth2/token"
        payload = "grant_type=client_credentials&client_id=" + self.client_id + "&client_secret=" + self.client_secret + "&resource=https%3A%2F%2Fapi.loganalytics.io%2F"
        headers = {
            'Content-Type': "application/x-www-form-urlencoded",
            'cache-control': "no-cache"
            }
        response = requests.request("POST", url, data=payload, headers=headers, timeout=30)
        return response.json()['access_token']

    def query_data(self, workspace_id, query):
        url = "https://api.loganalytics.azure.com/v1/workspaces/" + workspace_id + "/query" + "?timespan=P1D"
        payload = {
                    "query": query
        }
        headers = {
            'Authorization': "Bearer " + self.access_token,
            'Content-Type': "application/json"
            }
        response = requests.request("POST", url, data=json.dumps(payload), headers=headers, timeout=30)
        response.raise_for_status()
        
        return response.json()

Final words

I hope this tutorial helped you to query data from Log Analytics using Python. If you have any questions or suggestions, feel free to join the discussion below.