Use Node to parse data to firestore

April 26 2018

NodeFirebase

I'm working on a mini node server work with firebase to handle the data transaction. It fetches a large amount data files on daily bases, and compare with existing ones to update, and expose APIs to serve data based on requests from my mobile application.

There are some notices:

  1. The data is JSON in plain text, one JSON object for each lines, it's not a pure JSON object.
  2. There are multiple files with index in their names.
  3. Need to parse line by line, file by file, async.
  4. Firestore has limitation on usage, need to use its batch method to reduce the cost.

Data format

As we know that the JSON file requires only one root, so it should be

{
    "a":xxxx,
    "b":{
        "c":xxxx
    }
}

or

[
    {"a":xxxx},
    {"b":xxxx}
]

But in the data file the format is

{"a":xxxx}
{"b":xxxx}

This format is used on purpose because one important thing to consider in data handling is the size of file. Clearly the data format above is not in JSON format, and the reason is the size of data file we have is too large.

Many of us who uses API to fetch data, usually the size of data is not large because we could predefine parameters to limit the amount so that we just request and read the whole JSON object which has one root.

But it becomes an issue on the server where the data could be large, for example you don't want the system to read a 1000mb JSON object at once and save it in RAM to start.

fs.readline

To work with the data I have, there's only one object in each line, so it's split by '\n'. I need to cut the data into pieces, then for each piece, read and put into 'Buffer' one by one.

I could use a module 'readlne' from node to make it easier.

Most of the use cases of 'readline' is command line handling. To use it in this case, we change the input from 'Standard Input' to our file read stream.

const data = "./public/data.json";
const fs = require("fs");
const readline = require("readline");

const rl = readline.createInterface({
  input: fs.createReadStream(data)
});

let i = 0;
rl.on("line", line => {
    console.log (JSON.parse (line));
  i++;
});

Now it will read line by line from 'one' file.

Check file exist

fs.statSync is the sync version of fs.stat.

const validateFileExist = path => {
  try {
    return fs.statSync(path).isFile();
  } catch (e) {
    return false;
  }
};

It check target file's existence and returns a boolean based on the dir path, so that I could use this method to validate in the main function.

let fileIndex = 1;
  while (fileIndex > 0) {
    let fileName =
      "test_" + fileIndex + ".json";
    let dataPath = path.join(__dirname, dataLocation, fileName);
    if (validateFileExist(dataPath)) {
      dataParse(dataPath,db);
      fileIndex++;
    } else {
      fileIndex = -1;
    }
  }

So the method will go through target dir and look for the file that matches the name.

Data parser

To send data to firestore, the easiest way is ref.set({id:JSON.parse(lineFromFile)}). But this also means for every single object, there will be one write I/O with firestore. And there's a commit limitation of firestore's batch update => max size:10mb, max doc amount: 500, max write rate: 500/s

So saving big data directly into firestore is expensive and may eat up all the usage when data size increase.

Now it's the time for firestore's batced writes jump in to solve it and reduce the cost.

With batch, the times of writing is reduced to…well as you could guess, 1/500 (if total amount not execute 10MB).

var batch = db.batch();
var nycRef = db.collection('cities').doc('NYC');
batch.set(nycRef, { name: 'New York City' });
var sfRef = db.collection('cities').doc('SF');
batch.update(sfRef, { population: 1000000 });
var laRef = db.collection('cities').doc('LA');
batch.delete(laRef);
// Commit the batch
return batch.commit().then(function () {
  // ...
});

First attempt

const dataParse = (filePath, fileIndex, db) => {
  let petsArray = []
  // firestore write limit for single batch => size:10MB, doc amount: 500, rate: 1/s
  const docAmountPerBatch = setting.firestore.docAmountPerBatch;
  const docRef = db.collection(setting.firestore.petsCollectionName);
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    terminal: false
  });

  rl.on("line", line => {
    let petObj = JSON.parse(line);
    petsArray.push(petObj);
  });

  rl.on("close", () => {
    console.log("========================================");
    console.log("doc amount for each batch: ", docAmountPerBatch);
    console.log("========================================");
    console.log("petsArray length: ", petsArray.length);
    let lowerLimit = 0;
    let upperLimit = docAmountPerBatch;
    while (lowerLimit < upperLimit && lowerLimit < petsArray.length) {
      // create one batch each cycle and then save data into it 
      let batch = db.batch();
      let temp = petsArray.slice(lowerLimit, upperLimit);
      temp.map(pet => {
        let ref = docRef.doc(pet.animalID);
        batch.set(ref, pet);
      });
      //after saving, commit this batch. One batch can only be committed once
      batch
        .commit()
        .then(result => {
          console.log("result: committed doc amount --", result.length);
        })
        .catch(err => {
          console.log("err", err);
        });
      console.log("commit: file number ", fileIndex);
      console.log(" -- from ", lowerLimit, " -- to ", upperLimit);
      // increase the boundary to start a new batch
      lowerLimit = upperLimit;
      upperLimit += docAmountPerBatch;
    }
  });
};

This works, but you may find something is not quite right. I was saving all the objects into an array at once, and this will cause problem if the file grows large. And what if there are more than one file? In fact I have 10+ files and will add more in the future. Imaging the process start 10+ streams asynchronously, and each of them creates a large array, this will eat up all the memory.

err { Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded
    at new createStatusError (/Users/congli/Projects/rn-projects/rn-pet-server/node_modules/grpc/src/client.js:64:15)
    at /Users/congli/Projects/rn-projects/rn-pet-server/node_modules/grpc/src/client.js:583:15
  code: 4,
  metadata: Metadata { _internal_repr: {} },
  details: 'Deadline Exceeded' }

Be aware, readline.pause does not stop the stream immediately due to the nature of node and design of readline.

The program is async with multiple threads, but it's sync in each stream's thread — the readline will be pause when reaches the batch limit, then commit the batch to firestore, the resume method is passed as callback if succeed.

Second attempt

const dataParse = (filePath, fileIndex, db) => {
  let petsArray = [];
  let total = 0;
  // firestore write limit for single batch => size:10MB, doc amount: 500, rate: 1/s
  const docAmountPerBatch = setting.firestore.docAmountPerBatch;
  const docRef = db.collection(setting.firestore.petsCollectionName);
  const rl = readline.createInterface({
    input: fs.createReadStream(filePath),
    terminal: false
  });
  let linePointer = 1;
  rl.on("line", line => {
    let petObj = JSON.parse(line);
    petsArray.push(petObj);
    if (linePointer % docAmountPerBatch === 0) {
      console.log(
        `Pause: file(${fileIndex})'s ReadStream batch limit(${docAmountPerBatch}) reached. -- Line Pointer: ${linePointer} -- temp array size: ${
          petsArray.length
        }`
      );
      rl.pause();
    }
    linePointer++;
  });
  rl.on("pause", () => {
    let batch = db.batch();
    petsArray.map(pet => {
      let ref = docRef.doc(pet.animalID);
      batch.set(ref, pet);
    });
    batch
      .commit()
      .then(result => {
        console.log(
          `Resume: file(${fileIndex}) committed ${result.length} documents`
        );
        total += petsArray.length;
        // after committed batch, reset the temp array for next use.
        petsArray = [];
        rl.resume();
      })
      .catch(err => {
        console.log("err", err);
      });
  });
  rl.on("close", () => {
    console.log("========================================");
    console.log(`file(index ${fileIndex} on close, total ${total} objects commit succeed`);
    console.log("========================================");
  });
};

Check the log blow

Pause: file(7)'s ReadStream batch limit(480) reached. -- Line Pointer: 480 -- temp array size: 480
Pause: file(4)'s ReadStream batch limit(480) reached. -- Line Pointer: 480 -- temp array size: 480
Pause: file(6)'s ReadStream batch limit(480) reached. -- Line Pointer: 480 -- temp array size: 480
Pause: file(5)'s ReadStream batch limit(480) reached. -- Line Pointer: 480 -- temp array size: 480
Pause: file(2)'s ReadStream batch limit(480) reached. -- Line Pointer: 480 -- temp array size: 480
Pause: file(3)'s ReadStream batch limit(480) reached. -- Line Pointer: 480 -- temp array size: 480
Pause: file(1)'s ReadStream batch limit(480) reached. -- Line Pointer: 480 -- temp array size: 480
Resume: file(6) committed 480 documents
Resume: file(7) committed 480 documents
Resume: file(4) committed 480 documents
Resume: file(5) committed 480 documents
Pause: file(7)'s ReadStream batch limit(480) reached. -- Line Pointer: 960 -- temp array size: 472
Pause: file(6)'s ReadStream batch limit(480) reached. -- Line Pointer: 960 -- temp array size: 476
Pause: file(4)'s ReadStream batch limit(480) reached. -- Line Pointer: 960 -- temp array size: 475
Pause: file(5)'s ReadStream batch limit(480) reached. -- Line Pointer: 960 -- temp array size: 478
Resume: file(3) committed 480 documents
Resume: file(2) committed 480 documents
Resume: file(1) committed 480 documents
Resume: file(7) committed 472 documents
Pause: file(7)'s ReadStream batch limit(480) reached. -- Line Pointer: 1440 -- temp array size: 471
Pause: file(3)'s ReadStream batch limit(480) reached. -- Line Pointer: 960 -- temp array size: 480