If you’ve ever needed to merge data files from different sources, you know how infuriating the process can be. Not all systems produce the same formats and sometimes fields don’t match properly. Instead of dealing with these issues manually in Excel or another spreadsheet program, let's use Legato.
Friday, September 21. 2018
LDC #103: Bringing Data Together
In this example, we are going to take a CSV file that has partial information about registrants and then try to find additional information in a MySQL database. If the information isn’t available there, we will use the SEC’s company lookup. Finally, we will write the completed CSV file.
For this example, we are going to need a CSV file, a MySQL database, and the MySQL ODBC driver.
To get started, I will provide the CSV file, as shown below:
CIK,Registrant Name 1800,ABBOTT LABORATORIES 3570,CHENIERE ENERGY INC 4447,HESS CORP 5108,AMERICAN GENERAL LIFE INSURANCE CO 9015,BARINGS LLC 9235,BKF CAPITAL GROUP INC 11544,BERKLEY W R CORP 16732,CAMPBELL SOUP CO 19617,JPMORGAN CHASE & CO 108772,XEROX CORP 200406,JOHNSON & JOHNSON 203527,VARIAN MEDICAL SYSTEMS INC 225602,MASSACHUSETTS MUTUAL LIFE INSURANCE CO 310051,KING LUTHER CAPITAL MANAGEMENT CORP 310732,BNP PARIBAS 310764,STRYKER CORP 312069,BARCLAYS PLC 313137,HYATT INTERNATIONAL CORP
Here is also a SQL file you can run on your database to create a basic CIK library table for the example. Setting up MySQL is a little out of the scope of this blog but there are many guides available.
CREATE DATABASE `legatosample` USE `legatosample` CREATE TABLE `ciklibrary` ( `CIK ID` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT, `CIK` CHAR(10) NOT NULL, `Name` VARCHAR(150) NOT NULL, `Revised` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `Street 1` VARCHAR(100) NULL DEFAULT '\'\'', `Street 2` VARCHAR(100) NULL DEFAULT '\'\'', `City` VARCHAR(45) NULL DEFAULT '\'\'', `State` VARCHAR(45) NULL DEFAULT '\'\'', `Zip` VARCHAR(10) NULL DEFAULT '\'\'', PRIMARY KEY (`CIK ID`), UNIQUE INDEX `CIK` (`CIK`) ) COLLATE='utf8_general_ci' ENGINE=InnoDB ;
Now that we have some of the data files, let’s talk about the script. This script is a little different from most scripts we’ve done in the past in that it has no functions. In this case, separate functions aren’t really necessary and add complication. We will define a few variables to start and then we can load the CSV file. To do this we are going to use the CSVReadTable function. This function loads the CSV file into a two dimensional array.
string table[][]; // Table of Data string ciks[][]; // CIK Cache string tmp[]; // Temp Array string cik; // Current CIK string query; // Query handle hDB; // Database Connection handle hQuery; // Query Result int cik_col, // Index of CIK column name_col, // Index of Name column rc, // Return Code rx, cx, rows, cols; // Rows and Columns // Load Table table = CSVReadTable(AddPaths(GetScriptFolder(), "ciks.csv")); rows = ArrayGetAxisDepth(table, AXIS_ROW); if (rows == 0) { AddMessage("Could not load table 0x%08X", GetLastError()); exit; }
We’ve loaded the file into our table variable in one easy step. Since we are reading CIKs from the file, we can either assume the CSV file is correctly formatted in a particular order and count the number of columns or we can look for a column with the name CIK. Let’s do the latter. We can look at the contents of the table for the first row, searching for the string “CIK”. Once we find this name we can stop looking. If the name is not found we can’t read any CIKs, so the script exits. We can also change the logic to default instead to the first column if we want.
// Find Columns cols = ArrayGetAxisDepth(table, AXIS_COL); cik_col = -1; for (cx = 0; cx < cols; cx++) { if (table[0][cx] == "CIK") { cik_col = cx; break; } } if (cik_col < 0) { AddMessage("Could not find CIK Column"); exit; }
So we have the file loaded and we know which column contains CIKs. Now we need to connect to our SQL database. For this we are going to use ODBC. We have had other posts about ODBC in the past which are informative but not required to understand this article. Let’s connect to the database using the ODBCConnect SDK function. You may need to edit this line of the code in order to run the example in your environment, namely the name of the computer with the MySQL database and the username and password for the database. Also, if you are using a different version of the MySQL ODBC connector, you may need to change the driver name.
// Connect to Database hDB = ODBCConnect("Driver={MySQL ODBC 8.0 ANSI Driver};Server=localhost;Database=legatosample;User=root;Password=APassword;Option=3;"); if (IsError(hDB)) { AddMessage("Could not connect to DB 0x%08X - %s", GetLastError(), GetLastErrorMessage()); exit; }
Before we start processing the file we are going to add columns to table to store the information we are retrieving. If we wanted to adapt the script to merge data into a CSV file that already had these columns, we could store the column index of these columns instead of adding new ones.
// Add headings table[0][cols ] = "Street 1"; table[0][cols + 1] = "Street 2"; table[0][cols + 2] = "City"; table[0][cols + 3] = "State"; table[0][cols + 4] = "Zip";
With all of our preparations completed we can start to process the data. We will loop over the rows, read the CIK from the table variable, and then check our own internal cache of ciks (the ciks variable). If the cik is not in the cache we try the database. We query the database using the ODBCQuery function. The function takes a handle to the database as well as the query we want to run. In our case, the query is a straightforward SELECT statement that returns the Name, Street 1, Street 2, City, State and Zip for any CIK in the “ciklibrary” table that matches the cik variable. Ideally, you would ignore columns that are out of date using the Revised column in the WHERE clause, but since this is a simple example we will skip that logic. Using the ODBCGetRowCount function, we can see if the database gave us any results. If it did, we can add the results to our cache. Note that the ODBCFetchArray function uses the SQL column names for array keys. Since, in our example, these keys match our CSV headings, we can just add the resulting array to our ciks variable instead of individually copying each column.
// Process Rows for (rx = 1; rx < rows; rx++) { StatusBarMessage("Processing Row %d of %d", rx, rows); cik = table[rx][cik_col]; AddMessage("Checking row %d (%s)...", rx, cik); // Not in cache - Check DB if (ciks[cik]["Name"] == "") { hQuery = ODBCQuery(hDB, "SELECT `Name`, `Street 1`, `Street 2`, `City`, `State`, `Zip` FROM `ciklibrary` WHERE `CIK` = \"" + ReplaceInString(cik, "\"", "\\\"") + "\""); if (IsNotError(hQuery)) { if (ODBCGetRowCount(hQuery) > 0) { ciks[cik] = ODBCFetchArray(hQuery); CloseHandle(hQuery); AddMessage(" Found %s in database", cik); } } }
If you are reading the code you will have noticed that we used the ReplaceInString function on the cik before adding it to the query string. This is because, as with most database processing, you should treat all data as hostile. If someone added a CIK with the value of “a" OR 1=1 #”, it would match every entry. So it’s best to make sure the string is safe before we add it to our query.
If the CIK was not found in the database, our ciks variable will still not have an entry. If this is the case, we can use the EDGARLookupCIK function to retrieve information about the CIK from the EDGAR Company database. This function returns an array of company data. Since the key names are different from our database, we will want to copy each part of the array individually starting with the name.
// Not found - Check Web if (ciks[cik]["Name"] == "") { tmp = EDGARLookupCIK(cik); if (IsNotError()) { ciks[cik]["Name"] = tmp["CompanyName"];
The address is also an issue since the EDGAR Company database does not have individual fields for the components of the address. We will need to take the address apart. We can start by using the ExplodeString function to split the address into an array with an entry for Street 1, possibly an entry for Street 2, and an entry for the rest of the address. Depending on the size of this array, the address either had a Street 2 or it did not. Either way, we use a regular expression to split the last line into city, state or country and postal code.
// Fix address tmp = ExplodeString(tmp["Address"], "\r"); ciks[cik]["Street 1"] = tmp[0]; if (ArrayGetAxisDepth(tmp) > 2) { ciks[cik]["Street 2"] = tmp[1]; tmp = ExplodeString(ReplaceInStringRegex(tmp[2], "(.+), (.+) (.+)", "$1,$2,$3"), ","); } else { tmp = ExplodeString(ReplaceInStringRegex(tmp[1], "(.+), (.+) (.+)", "$1,$2,$3"), ","); } ciks[cik]["City"] = tmp[0]; ciks[cik]["Zip"] = tmp[2];
We need to have a special case for postal codes with spaces. The regular expression is greedy by default and will therefore count the first part of the postal code as part of the state or country. We can check for this by seeing if the postal code is too small. If it is we can get the last word from the state or country entry and add that to the zip code.
// Zip had space? if (GetStringLength(tmp[2]) <= 3) { tmp = ExplodeString(tmp[1], " "); ciks[cik]["Zip"] = tmp[ArrayGetAxisDepth(tmp) - 1] + " " + ciks[cik]["Zip"]; tmp[ArrayGetAxisDepth(tmp) - 1] = ""; ciks[cik]["State"] = ImplodeArray(tmp, " "); } else { ciks[cik]["State"] = tmp[1]; } AddMessage(" Used web for %s", cik);
Finally, we can put the information back into the database. Since this is just an example we add entries to the database using an INSERT statement. As stated above, if you were keeping track of the last time an entry was updated, we would probably need to use a UPDATE statement for entries that were out of date or even a REPLACE statement. Again, this is a more simplistic approach. First we make sure our data is properly formatted for entry into the database. Then we use the FormatString function to build the query and finally run it with the ODBCQuery function. Since our query does not return any data, we need to use the GetLastError function to check the result.
tmp["CIK"] = ReplaceInString(cik, "'", "\\'"); tmp["Name"] = ReplaceInString(ciks[cik]["Name"], "'", "\\'"); tmp["Street 1"] = ReplaceInString(ciks[cik]["Street 1"], "'", "\\'"); tmp["Street 2"] = ReplaceInString(ciks[cik]["Street 2"], "'", "\\'"); tmp["City"] = ReplaceInString(ciks[cik]["City"], "'", "\\'"); tmp["State"] = ReplaceInString(ciks[cik]["State"], "'", "\\'"); tmp["Zip"] = ReplaceInString(ciks[cik]["Zip"], "'", "\\'"); // Add to our database query = FormatString("INSERT INTO `ciklibrary` (`CIK`, `Name`, `Street 1`, `Street 2`, `City`, `State`, `Zip`) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s')", tmp["CIK"], tmp["Name"], tmp["Street 1"], tmp["Street 2"], tmp["City"], tmp["State"], tmp["Zip"]); hQuery = ODBCQuery(hDB, query); rc = GetLastError(); if (IsError(rc)) { AddMessage("Could not update DB 0x%08X - %s", GetLastError(), GetLastErrorMessage()); } } }
Now that we have retrieved the data from our database, internal cache, or the EDGAR Company database, it is time to update the table. We can then loop back and run the next entry. When the loop is complete we write a CSV file back out using the CSVWriteTable function.
table[rx][cols ] = ciks[cik]["Street 1"]; table[rx][cols + 1] = ciks[cik]["Street 2"]; table[rx][cols + 2] = ciks[cik]["City"]; table[rx][cols + 3] = ciks[cik]["State"]; table[rx][cols + 4] = ciks[cik]["Zip"]; } rx = CSVWriteTable(table, AddPaths(GetScriptFolder(), "result.csv")); if (IsError(rx)) { AddMessage("Could not write file! 0x%08X", rx); } MessageBox("Complete!");
Legato offers many tools to help you combine data from different sources. We could add another input CSV, Database, XML or even a Word or PDF file! This script itself is not overly useful but it is a good starting point to build a procedure to integrate different data sources into your next Legato project.
Here is the entire script:
string table[][]; // Table of Data string ciks[][]; // CIK Cache string tmp[]; // Temp Array string cik; // Current CIK string query; // Query handle hDB; // Database Connection handle hQuery; // Query Result int cik_col, // Index of CIK column name_col, // Index of Name column rc, // Return Code rx, cx, rows, cols; // Rows and Columns // Load Table table = CSVReadTable(AddPaths(GetScriptFolder(), "ciks.csv")); rows = ArrayGetAxisDepth(table, AXIS_ROW); if (rows == 0) { AddMessage("Could not load table 0x%08X", GetLastError()); exit; } // Find Columns cols = ArrayGetAxisDepth(table, AXIS_COL); cik_col = -1; for (cx = 0; cx < cols; cx++) { if (table[0][cx] == "CIK") { cik_col = cx; break; } } if (cik_col < 0) { AddMessage("Could not find CIK Column"); exit; } // Connect to Database hDB = ODBCConnect("Driver={MySQL ODBC 8.0 ANSI Driver};Server=localhost;Database=legatosample;User=root;Password=APassword;Option=3;"); if (IsError(hDB)) { AddMessage("Could not connect to DB 0x%08X - %s", GetLastError(), GetLastErrorMessage()); exit; } // Add headings table[0][cols ] = "Street 1"; table[0][cols + 1] = "Street 2"; table[0][cols + 2] = "City"; table[0][cols + 3] = "State"; table[0][cols + 4] = "Zip"; // Process Rows for (rx = 1; rx < rows; rx++) { StatusBarMessage("Processing Row %d of %d", rx, rows); cik = table[rx][cik_col]; AddMessage("Checking row %d (%s)...", rx, cik); // Not in cache - Check DB if (ciks[cik]["Name"] == "") { hQuery = ODBCQuery(hDB, "SELECT `Name`, `Street 1`, `Street 2`, `City`, `State`, `Zip` FROM `ciklibrary` WHERE `CIK` = \"" + ReplaceInString(cik, "\"", "\\\""); + "\""); if (IsNotError(hQuery)) { if (ODBCGetRowCount(hQuery) > 0) { ciks[cik] = ODBCFetchArray(hQuery); CloseHandle(hQuery); AddMessage(" Found %s in database", cik); } } } // Not found - Check Web if (ciks[cik]["Name"] == "") { tmp = EDGARLookupCIK(cik); if (IsNotError()) { ciks[cik]["Name"] = tmp["CompanyName"]; // Fix address tmp = ExplodeString(tmp["Address"], "\r"); ciks[cik]["Street 1"] = tmp[0]; if (ArrayGetAxisDepth(tmp) > 2) { ciks[cik]["Street 2"] = tmp[1]; tmp = ExplodeString(ReplaceInStringRegex(tmp[2], "(.+), (.+) (.+)", "$1,$2,$3"), ","); } else { tmp = ExplodeString(ReplaceInStringRegex(tmp[1], "(.+), (.+) (.+)", "$1,$2,$3"), ","); } ciks[cik]["City"] = tmp[0]; ciks[cik]["Zip"] = tmp[2]; // Zip had space? if (GetStringLength(tmp[2]) <= 3) { tmp = ExplodeString(tmp[1], " "); ciks[cik]["Zip"] = tmp[ArrayGetAxisDepth(tmp) - 1] + " " + ciks[cik]["Zip"]; tmp[ArrayGetAxisDepth(tmp) - 1] = ""; ciks[cik]["State"] = ImplodeArray(tmp, " "); } else { ciks[cik]["State"] = tmp[1]; } AddMessage(" Used web for %s", cik); tmp["CIK"] = ReplaceInString(cik, "'", "\\'"); tmp["Name"] = ReplaceInString(ciks[cik]["Name"], "'", "\\'"); tmp["Street 1"] = ReplaceInString(ciks[cik]["Street 1"], "'", "\\'"); tmp["Street 2"] = ReplaceInString(ciks[cik]["Street 2"], "'", "\\'"); tmp["City"] = ReplaceInString(ciks[cik]["City"], "'", "\\'"); tmp["State"] = ReplaceInString(ciks[cik]["State"], "'", "\\'"); tmp["Zip"] = ReplaceInString(ciks[cik]["Zip"], "'", "\\'"); // Add to our database query = FormatString("INSERT INTO `ciklibrary` (`CIK`, `Name`, `Street 1`, `Street 2`, `City`, `State`, `Zip`) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s')", tmp["CIK"], tmp["Name"], tmp["Street 1"], tmp["Street 2"], tmp["City"], tmp["State"], tmp["Zip"]); hQuery = ODBCQuery(hDB, query); rc = GetLastError(); if (IsError(rc)) { AddMessage("Could not update DB 0x%08X - %s", GetLastError(), GetLastErrorMessage()); } } } table[rx][cols ] = ciks[cik]["Street 1"]; table[rx][cols + 1] = ciks[cik]["Street 2"]; table[rx][cols + 2] = ciks[cik]["City"]; table[rx][cols + 3] = ciks[cik]["State"]; table[rx][cols + 4] = ciks[cik]["Zip"]; } rx = CSVWriteTable(table, AddPaths(GetScriptFolder(), "result.csv")); if (IsError(rx)) { AddMessage("Could not write file! 0x%08X", rx); } MessageBox("Complete!");
David Theis has been developing software for Windows operating systems for over fifteen years. He has a Bachelor of Sciences in Computer Science from the Rochester Institute of Technology and co-founded Novaworks in 2006. He is the Vice President of Development and is one of the primary developers of GoFiler, a financial reporting software package designed to create and file EDGAR XML, HTML, and XBRL documents to the U.S. Securities and Exchange Commission. |
Additional Resources
Legato Script Developers LinkedIn Group
Primer: An Introduction to Legato