If the original data is spread across a number of files, we can, ideally, read them in parallel on a multi-core machine to save time. However, this turns out to be a bit tricky with Python.

Let’s assume we have a bunch of HDF5 files containing transactional data of cryptocurrencies on Bitmex. Presumably the data is acquired via the API provided by Bitmex. For technical reasons, e.g., network interruptions, the data file names begin with different keys, each followed by an index. I put all the HDF5 data files in the folder /data/Shared/order_book/hdf5/:

BITMEX<key>.1.h5quote
BITMEX<key>.2.h5quote
BITMEX<key>.4.h5quote
BITMEX<key>.5.h5quote

Given this layout, it makes sense to define a few strings:

data_home = '/data/Shared/order_book/hdf5/'
prefix = 'BITMEX'
postfix = 'h5quote'
product = 'XBTUSD'

All the necessary packages are imported as follows:

import os, time, math
from sys import getsizeof
from datetime import datetime, timedelta
import h5py as hdf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates

Data file integrity check

First, list all the data files in the folder and collect them in file_list:

file_list = [f for f in os.listdir(data_home) if (prefix in f) and (postfix in f)]

From file_list we get all the distinct keys in key_list.

files = dict.fromkeys([f.split('.')[0] for f in file_list])
key_list = list(files.keys())
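
To make the naming convention concrete, here is a minimal sketch of how a file name splits into its key and index. The helper name parse_name and the sample keys BITMEXabc and BITMEXxyz are made up for illustration; they are not part of the original code.

```python
def parse_name(filename):
    """Split 'KEY.INDEX.POSTFIX' into (key, index)."""
    key, index, _postfix = filename.split('.')
    return key, int(index)

# Hypothetical file names following the BITMEX<key>.<index>.h5quote pattern.
sample_files = ['BITMEXabc.2.h5quote', 'BITMEXabc.1.h5quote', 'BITMEXxyz.1.h5quote']

# dict.fromkeys keeps first-seen order while dropping duplicate keys.
sample_keys = list(dict.fromkeys(parse_name(f)[0] for f in sample_files))
print(sample_keys)  # ['BITMEXabc', 'BITMEXxyz']
```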

The files are uploaded manually. For example, if only the files BITMEX<key>.1.h5quote, BITMEX<key>.2.h5quote, BITMEX<key>.4.h5quote and BITMEX<key>.5.h5quote are uploaded, we should report that index 3 is missing for this key.

for key in list(files):  # iterate over a copy so keys can be deleted safely
    # Match the key exactly; a substring test would also catch longer keys.
    indices = sorted(int(f.split('.')[1]) for f in file_list if f.split('.')[0] == key)
    # Indices are assumed to start at 0, so a complete set has max index == count - 1.
    if indices[-1] != len(indices) - 1:
        # The files are uploaded manually, so some may be missing.
        print(key, "misses",
              [i for i in range(indices[-1] + 1) if i not in indices],
              "out of", indices[-1] + 1, "files.")
        # Choose not to process a key with an incomplete file list.
        del files[key]
    else:
        files[key] = indices
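
The completeness check can also be isolated into a small pure function, which makes it easy to try on toy inputs. This is only a sketch, under the assumption (taken from the check above) that indices start at 0; missing_indices is a hypothetical helper name.

```python
def missing_indices(indices):
    """Return the indices absent from 0..max(indices), in ascending order."""
    present = set(indices)
    return [i for i in range(max(indices) + 1) if i not in present]

print(missing_indices([0, 1, 2, 4, 5]))  # [3]
print(missing_indices([0, 1, 2]))        # []
```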

Read the data sequentially

This is straightforward; the only trouble comes from broken data files. First, we define a function to read the data.

def read_hdf5(key):
    start = time.time()
    try:
        # some file may be broken
        df[key] = pd.concat([pd.read_hdf(data_home + key + '.' + str(i) + '.' + postfix,
                                    key=product)
                        for i in files[key]],
                       ignore_index=True)
        print(key, "has",
              len(df[key]), "lines of records, taking %d seconds" %
              (time.time()-start))
        print("Start at:", datetime.fromtimestamp(start),
              "end at:", datetime.fromtimestamp(time.time()))
    except OSError as e:
        print(e)

Now we read the files sequentially and see how long it takes:

print("Start at:", datetime.fromtimestamp(time.time()))

df = {}
for key in files:
    read_hdf5(key)

print("End at:", datetime.fromtimestamp(time.time()))

OK, that was easy. But it takes too long to run: for about three weeks’ data, it takes 6 minutes on a server with 88 cores. Indeed, this code does not take advantage of the multiple cores.

Start at: 2020-02-1 17:05:26.604005
BITMEX<key1> has 78786430 lines of records, taking 50 seconds
BITMEX<key2> has 7526730 lines of records, taking 46 seconds
BITMEX<key3> has 1906630 lines of records, taking 1 seconds
BITMEX<key4> has 72682736 lines of records, taking 44 seconds
BITMEX<key5> has 114284897 lines of records, taking 71 seconds
...
End at: 2020-02-1 17:11:30.964797
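
As a quick check of the six-minute figure, the two timestamps in the log can be subtracted directly. The values below are copied from the output above.

```python
from datetime import datetime

start = datetime(2020, 2, 1, 17, 5, 26, 604005)
end = datetime(2020, 2, 1, 17, 11, 30, 964797)

elapsed = end - start
print(elapsed)  # 0:06:04.360792
```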

Parallel Processing

We will try a few different ways to do this. First, we need to import the following:

from multiprocessing import Pool, Process, Manager

With huge memory consumption

Now, define a function to read a data file.

def read_hdf5(file):
    start = time.time()
    try:
        # some file may be broken
        df = pd.read_hdf(data_home + file, key=product)
        print(file, "has",
              len(df), "lines of records, taking %d seconds" % 
              (time.time()-start))
        print(datetime.fromtimestamp(time.time()))
        return file, df
    except OSError as e:
        print(e)
        # Return a placeholder so callers can still unpack (file, df) pairs.
        return file, None

Probably the simplest way to read in parallel is to use Pool:

print("Start at:", datetime.fromtimestamp(time.time()))
start = time.time()
pool = Pool(8)
df_collection = pool.map(read_hdf5, file_list)
pool.close()
pool.join()
print("End at:", datetime.fromtimestamp(time.time()))
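
The same pattern can be written with Pool as a context manager, so that the worker processes are cleaned up even if a read fails. The sketch below uses a stand-in worker, fake_read, which is hypothetical and only returns the length of the file name; in the real workflow it would be replaced by read_hdf5. The __main__ guard is needed on platforms that start worker processes by spawning a fresh interpreter.

```python
from multiprocessing import Pool

def fake_read(file):
    """Hypothetical stand-in for read_hdf5: returns (file, payload)."""
    return file, len(file)

def read_all(file_names, processes=2):
    # Pool.map blocks until every task is done and preserves input order.
    with Pool(processes) as pool:
        return pool.map(fake_read, file_names)

if __name__ == '__main__':
    # Made-up file names following the BITMEX<key>.<index>.h5quote pattern.
    names = ['BITMEXabc.1.h5quote', 'BITMEXabc.2.h5quote']
    print(read_all(names))
```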

So each data file BITMEX<key>.<index>.h5quote is read into a dataframe stored in the list df_collection. We then need to concatenate the dataframes for each key in file index order.

print("Start at:", datetime.fromtimestamp(time.time()))

df = {}
for key in files:
    df_list = []
    for i in files[key]:
        df_list.extend([dfc[1] for dfc in df_collection if key+'.'+str(i)+'.' in dfc[0]])
    df[key] = pd.concat(df_list, ignore_index=True)
    print(key, "has", len(df[key]), "records.")    

print("End at:", datetime.fromtimestamp(time.time()))
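
The inner list comprehension scans the whole of df_collection for every file, which is fine for a few hundred files but grows quadratically. A possible alternative, sketched here with plain strings standing in for dataframes, is to index the results by file name once. The helper name group_by_key and the sample data are illustrative assumptions.

```python
def group_by_key(collection, files, postfix='h5quote'):
    """Return {key: [payloads in index order]} from (file_name, payload) pairs."""
    by_name = dict(collection)  # one pass: file name -> payload
    return {
        key: [by_name[f'{key}.{i}.{postfix}'] for i in indices]
        for key, indices in files.items()
    }

# Toy stand-ins: strings instead of dataframes, arriving out of order.
collection = [('BITMEXabc.1.h5quote', 'second'), ('BITMEXabc.0.h5quote', 'first')]
files = {'BITMEXabc': [0, 1]}
print(group_by_key(collection, files))  # {'BITMEXabc': ['first', 'second']}
```

Each resulting list can then be passed to pd.concat(..., ignore_index=True) as before.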

The above two steps take 2 minutes in total, a threefold speedup. Sounds good? Yes, but it comes at the cost of huge memory consumption. As shown in htop, memory usage went from 380G to 407G just from running the parallel read and storing all the dataframes in the list df_collection. Note that this server has 1T of memory, more than the hard drive of a mainstream desktop machine. The 27G consumed by this simple step is more than the total memory of most consumer-level PCs or Macs.

After concatenating all the dataframes in df_collection according to each key and storing the results in df[key], an additional 35G of memory is consumed. In Python, do not expect that the so-called garbage collection

import gc 
gc.collect()

can help even a little bit. Neither can you expect to use del df_collection to achieve anything. Well, if you do

for i in range(len(df_collection), 0, -1):
    del(df_collection[i-1])

you will see some memory freed, roughly 15G. But remember that 27G was consumed by reading the files into the list of dataframes df_collection.

More on the parallel processing: you cannot naively expect a further speedup from assigning more processes, e.g., pool = Pool(40) instead of Pool(8), if only because starting and closing processes takes time. Based on experiments reading 300 data files of drastically different sizes, 8 is the optimal number for this machine with 88 cores.

Try to have a more memory-efficient solution

In Python, you can use Manager() as an agent to return values from multiprocessing. For that, you need to redefine the data-reading function.

def read_hdf5_return(key, df):
    start = time.time()
    try:
        # some file may be broken
        df[key] = pd.concat([pd.read_hdf(data_home + key + '.' + str(i) + '.' + postfix, 
                                    key=product)
                        for i in files[key]], 
                       ignore_index=True)
        print(key, "has",
              len(df[key]), "lines of records, taking %d seconds" % 
              (time.time()-start))
        print("Start at:", datetime.fromtimestamp(start), 
              "end at:", datetime.fromtimestamp(time.time()))
    except OSError as e:
        print(e)

Ok, now you can call the read data function with return value.

print("Start at:", datetime.fromtimestamp(time.time()))

manager = Manager()
df = manager.dict()
jobs = []

for key in files:
    p = Process(target=read_hdf5_return, args=(key,df))
    jobs.append(p)
    p.start()

for proc in jobs:
    proc.join()

print("End at:", datetime.fromtimestamp(time.time()))

It all works fine

BITMEX<key5> has 1906630 lines of records, taking 2 seconds
BITMEX<key2> has 1385198 lines of records, taking 1 seconds
BITMEX<key3> has 1322821 lines of records, taking 1 seconds
BITMEX<key4> has 2178846 lines of records, taking 2 seconds
BITMEX<key1> has 2377136 lines of records, taking 2 seconds
BITMEX<key7> has 1282649 lines of records, taking 1 seconds
BITMEX<key9> has 2322173 lines of records, taking 3 seconds
BITMEX<key6> has 3151901 lines of records, taking 3 seconds
BITMEX<key8> has 5223906 lines of records, taking 5 seconds
BITMEX<key11> has 14672360 lines of records, taking 14 seconds
BITMEX<key18> has 13753659 lines of records, taking 13 seconds
BITMEX<key20> has 17142423 lines of records, taking 17 seconds
BITMEX<key13> has 24016635 lines of records, taking 23 seconds
BITMEX<key12> has 34486869 lines of records, taking 32 seconds

until I encounter a key with a huge amount of data, and Python throws an error

Process Process-15:
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-15-8b50676581a3>", line 8, in read_hdf5
    ignore_index=True)
  File "<string>", line 2, in __setitem__
  File "/usr/lib/python3.7/multiprocessing/managers.py", line 795, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.7/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
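
The last two lines of the traceback point at the cause: in Python 3.7, multiprocessing packs the length of each pickled message into a signed 32-bit integer, so a single message cannot exceed 2^31 - 1 bytes (about 2 GiB). The sketch below reproduces that limit with struct alone; fits_in_header is a hypothetical helper, not part of multiprocessing.

```python
import struct

def fits_in_header(n_bytes):
    """Return True if n_bytes can be packed as a signed 32-bit length."""
    try:
        struct.pack("!i", n_bytes)
        return True
    except struct.error:
        return False

print(fits_in_header(2**31 - 1))  # True
print(fits_in_header(2**31))      # False
```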

OK. So it remains an open problem:

How can we read multiple dataframes in parallel without wasting memory?