Extracting high-volume data from REST APIs

Hello,

Can someone provide me with a template for extracting high-volume data from a REST API using Python?

I need to:

  1. Pull IDs from multiple calls
  2. Use those IDs in different calls (by looping through them)
  3. Merge the obtained data into a dataframe
  4. Write the data to an MSSQL database.

My code so far:

import requests
import pandas as pd
from pandas import json_normalize


def session_token():
    # Obtain a session token by posting a login request to the server

    url = "post endpoint"
    body = {"username":"username",
            "password":"pwd",
            "persistlogin": True
           }
    response = requests.request("POST", url, data = body)
    return response.json()["Token"]

def get_request(url):
    # Retrieve data from the server

    headers = {"Authorization": "Token " + session_token()}
    response = requests.get(url, headers = headers)
    return response

def get_registrations():
    # This call returns all registrations, but I only need the registration ID.
    # How can I return (or yield) a list of unique IDs instead of all the data?
    # The full response is impossible to keep in memory.
    yield get_request("https://endpoint/registrations").json()

def get_products():
    # I need to loop through the unique registration IDs from the get_registrations() function.
    yield from (get_request("https://endpoint/{}/products".format(reg_id)).json() for reg_id in get_registrations())


def main():
    return pd.concat(json_normalize(row) for row in get_products())

It would be great if you could post a template that I can use. Feedback on my code is also welcome.

Finally, I was hoping there are data engineers around who could tutor or pair up with me?

1 Like

I’m not a data engineer, but I’ll offer some ideas.

Use stream=True to prevent all results of the GET request from going into memory,
and try to find a way to extract only the id column, e.g. by chaining .json()['id'].
I’m not sure how to find unique IDs without reading them all into memory, though, short of deduplicating them at the source before requests.get. A rough sketch of one approach is below the snippet.

def get_registrations():
    # note: get_request would need to pass stream=True through to requests.get
    yield get_request("https://endpoint/registrations", stream=True).json()
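
A rough sketch of one way around the memory problem, assuming the /registrations endpoint returns one big top-level JSON array of objects with an "id" field, and that you can install the third-party ijson package (pip install ijson) to parse the stream incrementally:

import ijson
import requests

def get_registration_ids():
    headers = {"Authorization": "Token " + session_token()}
    response = requests.get("https://endpoint/registrations", headers=headers, stream=True)
    response.raw.decode_content = True  # let urllib3 undo gzip so ijson sees plain JSON
    seen = set()                        # the ids alone should fit in memory even if the full records don't
    for reg_id in ijson.items(response.raw, "item.id"):  # parse one id at a time
        if reg_id not in seen:
            seen.add(reg_id)
            yield reg_id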

Rather than looping, which is slow, you can speed up your GET requests with multithreading (10-100x depending on how many requests you make): https://realpython.com/python-concurrency/. get_products is mapped over an iterable of ids, and executor.map returns a generator.

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:  # keep increasing workers until no time reduction
    executor.map(lambda args: get_products(*args), get_registrations())
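
If you also want to keep the responses, executor.map already returns them lazily; a minimal sketch (assuming get_products(reg_id) takes a single registration id rather than no arguments) that collects them into a list:

import concurrent.futures

def fetch_all_products(ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # map preserves the input order; any exception is re-raised when the result is consumed
        return list(executor.map(get_products, ids))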

Extras:

  1. Why get a new token for every get_request? This doubles the number of requests you make per id. Or is there some underlying caching I’m not aware of? You could try getting the token once per batch of ids.

  2. You can make the code easier to read by factoring out BASE_URL = "https://endpoint" and composing endpoints from it later.

2 Likes

Hey @LuukvanVliet

  1. Use requests.Session() to store the session and put the token in a common header. A Session also keeps the connection to the server alive, which will speed up the data download.

  2. Where you do not need all the data from the response, process the results so that you only keep the id. For example, if you have a list of items in json(), then:

[elem['id'] for elem in response.json()]
  3. @hanqi gave good advice about using concurrent.futures for parallel processing. I’m more used to another pattern, from the official documentation, which simplifies error handling:
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    future_to_item = {executor.submit(parse_function, item): item for item in data}
  4. Look at the API documentation; APIs that return a lot of data often have parameters for pagination (a rough sketch of that is after this list).
  5. Since you call .json(), stream may not help you: a single chunk of streamed data will not be valid JSON on its own. Consider streaming to a file instead, and then writing parse logic for the file.
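
For point 4, a rough sketch of what pagination usually looks like. The page / pageSize parameter names are placeholders (check your API documentation for the real ones), and session is assumed to be a requests.Session() with the token already in its headers, as in point 1:

def get_registration_ids_paged(page_size=500):
    page = 1
    while True:
        response = session.get(
            "https://endpoint/registrations",
            params={"page": page, "pageSize": page_size},  # hypothetical parameter names
        )
        batch = response.json()
        if not batch:  # an empty page means we've reached the end
            break
        for elem in batch:
            yield elem["id"]
        page += 1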
2 Likes

Thanks both! Will look into this soon and get back to you after implementing your advice 🙂

This is how I would write it in general; working with your specific data, it can be adapted further.

import concurrent.futures
import requests

session = requests.Session()
workers = 5

def session_token():
    url = "post endpoint"
    body = {"username":"username",
            "password":"pwd",
            "persistlogin": True
           }
    response = session.post(url, data = body)
    session.headers["Authorization"] = "Token " + response.json()["Token"]
    
def get_registration_id(): 
    data = session.get("https://endpoint/registrations").json() # If there is a memory error here, stream the response to a file and process the file instead.
    return list(set([elem['id'] for elem in data['products']])) # If you're sure there are no duplicate ids, comment this line out and use the next one
    #return (elem['id'] for elem in data['products']) # This creates a generator, like yield would

def get_products(reg_id):
    return session.get(f"https://endpoint/{reg_id}/products").json()

def main():
    session_token()
    ids = get_registration_id()
    result_data = []
    counter = 0  
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:      
        future_to_item = {executor.submit(get_products, reg_id): reg_id for reg_id in ids}
        for future in concurrent.futures.as_completed(future_to_item):
            try:
                result_data.append(future.result()) # you might want to add more data processing here.
            except Exception as e:
                print(e, future_to_item[future])
            counter += 1
            if counter % 10 == 0:
                print(f'{counter}/{len(ids)}')
    
if __name__ == "__main__": 
    main()
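
For the last two steps from your original list (merging into a dataframe and writing to MSSQL), a minimal sketch, assuming each item in result_data is a list of flat product records and that you have SQLAlchemy plus an ODBC driver installed; the connection string and table name are placeholders:

import pandas as pd
from sqlalchemy import create_engine

def write_to_mssql(result_data):
    # result_data is the list of parsed JSON responses collected in main()
    df = pd.concat(pd.json_normalize(rows) for rows in result_data)
    # placeholder connection string -- replace user/pwd/server/database with your own
    engine = create_engine(
        "mssql+pyodbc://user:pwd@server/database?driver=ODBC+Driver+17+for+SQL+Server"
    )
    df.to_sql("products", engine, if_exists="append", index=False)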
2 Likes

Very helpful, thanks a lot @moriturus7

1 Like

Hi @moriturus7,

I’ve read up on threading and multiprocessing.

Correct me if I’m wrong:

ThreadPoolExecutor runs each of your workers in a separate thread within the main process.
A thread pool is for I/O-bound tasks, so you can benefit from the I/O wait time.

  • What do they mean by “workers”? CPU cores?

  • How can I determine the number of workers?

  • After modifying the above code it ran successfully, but how can I access the results? I tried printing result_data, but that leads to an error, as somehow the variable does not exist in memory?

1 Like

Hi @LuukvanVliet

  1. In this case, a worker is the function executed in a separate thread; for us that is get_products.

  2. The max_workers parameter sets the maximum number of workers. Do not think of threads as the number of cores in your processor; they are a higher-level abstraction. Think of how tasks are performed on a computer in general: your cores switch between different tasks at very short intervals, but to you it looks like they are all running at the same time. 5-20 workers is enough to work fast without overloading the API server.

ThreadPoolExecutor(max_workers=5)
  3. You must refer to result_data inside the main() function.
    Here’s some simple code to check that everything is working correctly.
def main():
    session_token()
    ids = get_registration_id()
    result_data = []
    counter = 0
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:      
        future_to_item = {executor.submit(get_products, reg_id): reg_id for reg_id in ids[:10]} #! [:10]
        for future in concurrent.futures.as_completed(future_to_item):
            try:
                result_data.append(future.result()) # you might want to add more data processing here.
            except Exception as e:
                print(e, future_to_item[future])
            counter += 1
            if counter % 10 == 0:
                print(f'{counter}/{len(ids)}')
    print(result_data)  #!
  1. You need to make sure you actually get ids. If it is a list, you can check its length; if it is a generator, you can test the function separately or use next() (see the short sketch after this list).
  2. Make a limited selection of data for testing. In the example above, only the first 10 items of the list are taken.
  3. Display result_data.
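
Concretely, those checks could look something like this (assuming ids comes from get_registration_id() above):

ids = get_registration_id()
print(type(ids))
print(len(ids))           # works when ids is a list
# print(next(iter(ids)))  # works for a generator too, but consumes one element
ids = ids[:10]            # small sample for testing (lists only)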

If the code runs but outputs no errors and no results, it is most likely that no ids were obtained.

When testing the code, remember to use the debugger in your IDE.

1 Like

Very clear again, big thanks.

Few more questions that I have difficulties understanding:

  1. “ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.”

I don’t understand what a pool of threads that executes calls asynchronously is. Are you able to clarify this? It seems very abstract. Does it mean that normally you would execute calls synchronously rather than asynchronously?

  2. What is the difference between a ThreadPoolExecutor and a ProcessPoolExecutor?

  3. What is a future in a thread pool?

Thanks!

1 Like

Hi @LuukvanVliet

  1. A thread pool is a group of workers that perform tasks. With the approach I have shown you for multithreading, it works like this: a worker takes a task from the list of tasks to be performed, executes it, and takes the next one, until all the tasks are completed. Sometimes this is compared to a punch bowl at a party: people come up and fill their glasses as needed until it runs out.
    Yes, we can say that usually our code is executed synchronously. Input/output devices (network card, keyboard, ports, etc.) are external to the processor. When we run code normally, the processor turns to the device and waits synchronously for a response from it. When we execute the code multithreaded, the processor creates a subtask that waits for the response from the device, and there can be many such subtasks (threads).
    But in Python there is also a separate set of keywords, async and await, that provide truly asynchronous code execution while the whole program runs in only one thread.

  2. ThreadPoolExecutor - creates threads
    ProcessPoolExecutor - creates processes
    If you work with I/O, you create threads. If you need to parallelize computation, you create processes. When working with processes, a good rule is that the number of workers should not exceed the number of cores in your processor.
    There are 3 kinds of concurrency in Python:

  • Processes - for parallelizing computation tasks that require processor resources
  • Threads - for parallelizing I/O tasks: databases, Internet access, etc.
  • Asynchronous code with async-await - similar to the previous one, but async-await programs are more difficult to write and debug, even though it is the current trend in Python.
  3. A future is another abstraction. To be honest, I don’t have a deep understanding of how they work; I am a practitioner. But essentially they are built around changes in an object’s state. Think of them as a high-level API for working with tasks that are queued for execution. Usually you only need to know whether there are results yet or not.
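
To make the future a bit less abstract, here is a tiny self-contained sketch (it only simulates a slow network call with time.sleep, no real API involved):

import concurrent.futures
import time

def slow_task(n):
    time.sleep(1)  # stand-in for a network call
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    future = executor.submit(slow_task, 4)  # schedule the task and get a Future back immediately
    print(future.done())    # probably False, the task is still running
    print(future.result())  # blocks until the task finishes, then prints 16
    print(future.done())    # True now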

Maybe some of my explanations oversimplify things, but I try to explain these concepts in simple language, the way I understand them, so you can just use the tools.
Besides, Dataquest has a course on multithreading in the Data Engineering path.

2 Likes

Big thanks once again. It took me a couple of hours to understand the concepts of threading, but I got it! You’re great at explaining complex matters.

1 Like

By the way, I found the following on futures:

Futures represent the result of a task that may or may not have been executed

1 Like