Abstract

Consumer price surveys can be supported by two different kinds of Big Data sources: scanner data, obtained from retail chains and representing transactions at the level of the single product, and web scraped data, gathered automatically from e-commerce web sites. Both sources can effectively complement traditional methods of price data collection, a path followed by most statistical organizations in recent years.

In particular, scanner data sets can go far beyond the usual sample of products, reaching significant sizes. This is a great opportunity in methodological terms, with positive impacts on quality and efficiency and the possibility of new products. On the other hand, there are challenges to be addressed at the technological level for handling such large data sets, which can reach the order of hundreds of gigabytes.

The main contribution of this task team is an assessment of the performance of Big Data tools applied to synthetic data sets modelling scanner data. Synthetic data was generated using dedicated software developed as part of the team's work. Following this approach, we were able to easily test data sets of various sizes, working with both Big Data tools and "traditional" software such as statistical tools and relational databases, all available in the Sandbox environment.

The results of the performance comparison show that while traditional tools are still more effective when dealing with moderate data sizes, Big Data tools are able to scale to large data sizes, and also simplify the preliminary tasks of loading and preparing the data.

As a side contribution, we also tested a visualization-based approach to Big Data analysis, working on web scraped data from UK e-commerce web sites, through a business analytics platform installed in the Sandbox.

Introduction

Consumer price surveys are based on data about the prices of a selected basket of products in a sample of retail shops. Data is traditionally (and mostly) collected in the field, through a complex procedure that can be costly and error-prone. Much research has been devoted in recent years to improving the efficiency and quality of the survey by introducing two complementary data sources:

  • Scanner data: data about pricing and sales volume generated by point of sales systems in retail stores.
  • Web scraped data: data automatically extracted from e-commerce web sites.

Both sources can be considered Big Data. This team worked on both, with different purposes.

Scanner data

Despite individual prices being publicly available, scanner data are considered highly sensitive information because they provide a global overview of prices and sales over time, which retail chains use to analyze their selling performance and to shape their commercial policies. From the point of view of statistical organizations, scanner data offer great opportunities for replacing the portion of on-field collection concerning large retail stores, increasing efficiency and quality and reducing costs.

Although several countries already use scanner data in their CPI survey (completely or in part), many countries are still facing the technical and methodological challenges associated with their use. Concerning methodology, the main challenge is the high number and volatility of products, which makes it difficult to classify them and to choose replacements for products that are no longer available.

On the technical side, datasets can easily reach large dimensions, as they contain the price and quantity sold of every single product on a weekly basis. They can therefore be difficult to handle with traditional computing tools, making them an interesting testbed for the application of Big Data tools.


This task team worked on testing the performance of Big Data tools applied to scanner data.

The main idea was to understand at what point the data grows to an extent where it becomes necessary to exploit Big Data tools, in contrast to when "traditional" tools, such as statistical software and relational databases, are sufficient (or better).

A synthetic dataset generator was developed, which allowed us to produce datasets that resemble real scanner data, at least in format and size; the generation logic is sketched below. A refinement of the generator is planned, modeling the actual behavior of scanner data in a more realistic fashion (specifically, interrupted series and classification of products).
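The following is a minimal sketch, in Python, of how such a generator can be written. The field layout follows the data dictionary in the "Data Characteristics" section; the file name, parameter values and pricing logic are illustrative assumptions, not the actual tool developed by the team.

# synthetic_prices.py -- illustrative sketch of a scanner data generator
# (hypothetical names and parameters; not the team's actual generator)
import csv
import random

def generate(path, n_pos=10, n_products=1000, n_weeks=52, seed=42):
    """Write one CSV row per (point of sale, EAN code, week):
    IDPV, TIMESTAMP, EANCODE, QUANTITY, SALES, SC."""
    rng = random.Random(seed)
    # one synthetic EAN-13 code per product, each with a base price
    base_price = {str(8000000000000 + i): round(rng.uniform(0.5, 50.0), 2)
                  for i in range(n_products)}
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        for pos in range(1, n_pos + 1):
            for ean, price in base_price.items():
                for week in range(n_weeks):
                    quantity = rng.randrange(0, 200)
                    discount = 1 if rng.random() < 0.1 else 0   # SC flag (1 = discounted)
                    sales = round(price * (0.8 if discount else 1.0) * quantity, 2)
                    writer.writerow([pos, week, ean, quantity, sales, discount])

if __name__ == "__main__":
    generate("priceMockSmall.csv")   # hypothetical output name; size scales with the parameters

The size of the generated file is controlled simply by the number of points of sale, products and weeks.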

We performed experiments with different tools available in the Sandbox (Hive, Pig, MonetDB), comparing the performance of simple queries (counts and filters) on synthetic datasets of different sizes, the largest being 250 GB, in the same order of magnitude as the real scanner data that will be acquired by Istat.

We then wrote simple programs for computing price indexes on scanner data. Again, this was done with different programming languages and tools, in order to compare not only performance but also ease of development.

The research can be extended to methodological aspects such as sampling: different sampling strategies can be applied and compared, with the opportunity of testing the effect of sampling against the complete universe of scanner data.

Web scraped data

A sample of web scraped data from UK supermarkets has been loaded in the Sandbox. Although this data set cannot be considered Big Data in the proper sense, it helps in understanding the form of price data extracted from the web and the opportunities this approach presents for NSIs. The raw form of the data allowed us to test the capabilities of another software package available in the Sandbox, namely Pentaho. Pentaho is a so-called business analytics platform, i.e. a software tool that makes it easy to build interactive visualizations on arbitrary data sets without complex preparation on the IT side. These visualizations can then be used both for analysis and for presentation purposes.


The Team 

The team is composed of:

Istat: Lorenzo di Gaetano, Antonino Virgillito (IT) - developed the synthetic dataset generator and the scripts for computing the index in the different languages, carrying out the performance tests. Also worked on the visualization in Pentaho.

ONS: Stephen Ball, Christopher Webster (IT) - provided the web scraped datasets.

Data Characteristics

Web scraped data

Price data scraped from the web sites of two supermarket chains.
The data covers 29 grocery items collected in the interval from 14th April to 4th June.

The data was collected through a custom Python script written at ONS. The script attaches columns that allow the products to be linked to the ONS classifications, as illustrated in the sketch below.
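The ONS scraper itself is not reproduced here. As an illustration of the classification-linking step only, the following Python sketch shows how columns such as ons_item_no and ons_item_name (see the header below) could be attached to scraped records through a lookup keyed on the search string; the mapping values are hypothetical placeholders and the actual ONS script may work differently.

# Illustrative sketch only (not the actual ONS scraper): attach ONS classification
# columns to scraped price records. Item numbers and names are hypothetical.
import csv

ONS_ITEMS = {
    "semi skimmed milk": ("000001", "MILK: SEMI-SKIMMED"),
    "white sliced bread": ("000002", "BREAD: WHITE, SLICED"),
}

def annotate(scraped_rows, store, out_path):
    """scraped_rows: dicts with at least 'timestamp', 'search_string',
    'product_name' and 'item_price_num' keys."""
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        for row in scraped_rows:
            ons_item_no, ons_item_name = ONS_ITEMS.get(row["search_string"], ("", ""))
            writer.writerow([row["timestamp"], store, ons_item_no, ons_item_name,
                             row["search_string"], row["product_name"],
                             row["item_price_num"]])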


Header: timestamp, store, ons_item_no, ons_item_name, product_type, search_string, search_matches, product_name, item_price_str, item_price_num, volume_price, unit_price, no_units, units, std_price, std_unit, promo, offer
HDFS: /datasets/prices_uk
Hive: DB: prices_uk, TABLE: prices_web
Size: 26 Mb
N. Records: 99,125


Scanner data

Synthetic data sets modeling scanner data from supermarkets. The first version models only the format and size of real scanner data, leaving aside some other characteristic aspects:

  • Classification of products. Product codes are organized according to the COICOP classification, on which different levels of index aggregation are performed. 
  • Interrupted series. Product codes can disappear and appear over time, making it necessary to perform substitutions if the product is part of the basket. Substitutions are normally made by operators, who determine which product is the best fit to replace one that is no longer available, an approach that is evidently not feasible on a large scale.

Data can be generated in different sizes; three sizes were used in the experiments.

Header:
  IDPV: Point of sale identifier
  TIMESTAMP: numeric identifier of the period (e.g. week number)
  EANCODE: Product identifier (EAN)
  QUANTITY: quantity sold in the period
  SALES: sales volume in the period
  SC: indication of discount (1=YES)
HDFS: /datasets/prices_it, /apps/hive/warehouse/prices_it.db
Hive: DB: prices_it
Size: 14 Mb (little), 1.7 Gb (mid), 260 Gb (big)
N. Records:

Data Acquisition

Scanner data

No acquisition was performed, because the data was generated by a tool directly inside the Sandbox. Importing the data into the different tools was part of the experiments and is discussed in the "Project Description" section.


Web scraped data

Data was acquired from the web through a custom tool.

Data Processing

Scanner data

Data was stored in Hive, HDFS and MonetDB.

Index processing procedures were written in Pig, RHadoop and Spark. 

Web scraped data

No processing involved. Visualization in Pentaho. 

Data Output


Project Description

QUERY PERFORMANCE TEST AND COMPARISONS

Some Hive test measurements were made with two different files of synthetic price data: a little file of about 14 Mb and a medium-size file of 1.7 Gb, containing 61,379,865 rows.

Both files were moved from the local file system to HDFS and then loaded into a Hive table from the console, using these commands:


USE prices_it;
LOAD DATA INPATH '/user/lorenzo/<CSV_FILE_NAME>' OVERWRITE INTO TABLE prices_it;


This operation was very fast: it took only 1.029 seconds to load 1.7 Gb of data. This is expected, since LOAD DATA INPATH essentially moves the file within HDFS into the table's directory, rather than copying or parsing its contents.


Here are some simple queries launched in Beeswax against both of the tables:


QUERY                                                              TABLE    ELAPSED TIME
select count (*) from prices_it                                    LITTLE   25 sec
select count (*) from prices_it                                    MID      28 sec
select * from prices_it_mid where sales > 50000                    LITTLE   24 sec
select * from prices_it_mid where sales > 50000                    MID      1 min 16 sec
select * from prices_it_mid where sales > 50000 and sales < 200    LITTLE   22 sec
select * from prices_it_mid where sales > 50000 and sales < 200    MID      53 sec
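For repeatability, the same measurements can also be scripted rather than run interactively in Beeswax. A minimal Python sketch, assuming the hive command-line client is available on the node (as it was in the Sandbox) and measuring only wall-clock elapsed time:

# time_hive_queries.py -- illustrative timing harness (assumes the 'hive' CLI is on the PATH)
import subprocess
import time

QUERIES = [
    "use prices_it; select count (*) from prices_it",
    "use prices_it; select count (*) from prices_it_mid",
    "use prices_it; select * from prices_it_mid where sales > 50000",
]

for query in QUERIES:
    start = time.time()
    # the query output is discarded; only the elapsed time is of interest here
    subprocess.run(["hive", "-e", query], check=True,
                   stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    print(query, "->", round(time.time() - start, 1), "s")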


NOTE: We found that the query against the little table involved only 1 mapper, while the query against the mid-size table used 8 mappers simultaneously. The log also reports the cumulative CPU time, which is naturally larger than the elapsed (real) time. For example, this is the log of the latter query:


Hadoop job information for Stage-1: number of mappers: 8 ; number of reducers: 0 
14/07/21 11:26:10 INFO exec.Task: Hadoop job information for Stage-1: number of mappers: 8; number of reducers: 0 
14/07/21 11:26:10 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 
2014-07-21 11:26:10,519 Stage-1 map = 0%, reduce = 0% 
14/07/21 11:26:10 INFO exec.Task: 2014-07-21 11:26:10,519 Stage-1 map = 0%, reduce = 0% 
2014-07-21 11:26:21,844 Stage-1 map = 13%, reduce = 0%, Cumulative CPU 5.16 sec 
14/07/21 11:26:21 INFO exec.Task: 2014-07-21 11:26:21,844 Stage-1 map = 13%, reduce = 0%, Cumulative CPU 5.16 sec 
2014-07-21 11:26:28,047 Stage-1 map = 31%, reduce = 0%, Cumulative CPU 75.75 sec 
14/07/21 11:26:28 INFO exec.Task: 2014-07-21 11:26:28,047 Stage-1 map = 31%, reduce = 0%, Cumulative CPU 75.75 sec 
2014-07-21 11:26:29,082 Stage-1 map = 63%, reduce = 0%, Cumulative CPU 93.28 sec 
14/07/21 11:26:29 INFO exec.Task: 2014-07-21 11:26:29,082 Stage-1 map = 63%, reduce = 0%, Cumulative CPU 93.28 sec 
2014-07-21 11:26:34,248 Stage-1 map = 69%, reduce = 0%, Cumulative CPU 116.58 sec 
14/07/21 11:26:34 INFO exec.Task: 2014-07-21 11:26:34,248 Stage-1 map = 69%, reduce = 0%, Cumulative CPU 116.58 sec 
2014-07-21 11:26:35,283 Stage-1 map = 75%, reduce = 0%, Cumulative CPU 127.96 sec 
14/07/21 11:26:35 INFO exec.Task: 2014-07-21 11:26:35,283 Stage-1 map = 75%, reduce = 0%, Cumulative CPU 127.96 sec 
2014-07-21 11:26:37,344 Stage-1 map = 81%, reduce = 0%, Cumulative CPU 129.81 sec 
14/07/21 11:26:37 INFO exec.Task: 2014-07-21 11:26:37,344 Stage-1 map = 81%, reduce = 0%, Cumulative CPU 129.81 sec 
2014-07-21 11:26:38,381 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 136.05 sec 
14/07/21 11:26:38 INFO exec.Task: 2014-07-21 11:26:38,381 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 136.05 sec 
MapReduce Total cumulative CPU time: 2 minutes 16 seconds 50 msec 
14/07/21 11:26:39 INFO exec.Task: MapReduce Total cumulative CPU time: 2 minutes 16 seconds 50 msec 



This shows the capability of the system to dynamically assign the appropriate amount of processing power depending on the workload involved.


TEST WITH 277 GB FILE

We repeated the same experiments with a "big" 277 GB CSV file. The most impressive thing was the blazing speed at which the CSV file was loaded into the table: 1.033 seconds, so fast that at first we thought something had gone wrong. As noted above, the load is essentially a file move within HDFS, so its duration does not depend on the file size.

We started with the following queries:

  • SELECT COUNT ( * ) FROM prices_it_big;
    • The query involved 1034 mappers with a total cumulative CPU time of 2 hours 8 minutes 55 seconds 860 msec, but the human elapsed time was approx. 2 minutes.


  •  select * from prices_it_big where sales > 50000
    • 1034 mappers, total cumulative CPU time: 0 days 16 hours 44 minutes 27 seconds 20 msec, human time: approx. 30 minutes

The same tests were repeated with Pig, obtaining approximately the same results. Further analysis should be devoted to understanding why adding a simple filter increases the query execution time by a factor of 15.


TEST AGAINST A COLUMNAR DATABASE

We loaded the first two datasets into the column-oriented database available in the Sandbox (MonetDB). Predictably, the query times were orders of magnitude faster than with Hadoop for the small and medium data sets. However, we could not load the big dataset because the estimated loading time was too high.


Index computation

A Pig script has been developed for computing a Laspeyres index on the whole dataset. This is not meant to be an exact replication of a production method, because indexes are normally computed at different aggregation levels, while the input dataset here is "flat" and unclassified. The purpose was to test the possibility of implementing more complex methods directly in Pig, while at the same time measuring the performance.
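For reference, the index computed by the script corresponds, for each period t, to the fixed-base Laspeyres form

I_t = \frac{\sum_{i} p_{i,t}\, q_{i,0}}{\sum_{i} p_{i,0}\, q_{i,0}} \times 100

where the sum runs over the (point of sale, EAN code) pairs i present both in period t and in the base period 0. In the script, the SALES field plays the role of the price term p and the base-period QUANTITY field plays the role of q_0.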

This is the script:

-- load the synthetic dataset (comma-separated, no header)
prices = LOAD '/apps/hive/warehouse/prices_it.db/prices_it_mid/priceMockMid.csv' USING PigStorage(',') AS (IDPOS: chararray,TIMESTAMP: int,EANCODE: chararray,QUANTITY: int,SALES: double,SC: int);
-- keep the base-period observations (period 0)
base = FILTER prices BY TIMESTAMP==0;
-- pair every observation with the base-period record of the same product in the same point of sale
prices_base = JOIN prices BY (IDPOS, EANCODE), base BY (IDPOS, EANCODE);
-- weight of each observation: current SALES times base-period QUANTITY
indexlist = FOREACH prices_base GENERATE prices::TIMESTAMP as TIMESTAMP, prices::IDPOS as IDPOS, prices::EANCODE as EANCODE, (double)prices::SALES as pt, (double)base::QUANTITY as q0, (double)(prices::SALES * base::QUANTITY) as WEIGHT;
-- total weight per period
indexgroup = GROUP indexlist BY TIMESTAMP;
-- total weight of the base period, used as the denominator
basegroup = FILTER indexgroup BY group==0;
baseweight = FOREACH basegroup GENERATE (double)SUM(indexlist.WEIGHT) AS WEIGHT;
-- index for each period, rebased to 100 in the base period
index = FOREACH indexgroup GENERATE group as TIMESTAMP, SUM(indexlist.WEIGHT)/(double)baseweight.WEIGHT*100;
store index into '/user/toni/cpi_index/index3' using PigStorage(',');



The execution times were 30 minutes for the mid dataset and 4.30 hours for the big one.
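For small extracts, the same computation can be cross-checked outside Hadoop. A minimal Python/pandas sketch of the same logic (the input file name is hypothetical; the CSV is assumed to have no header row):

# laspeyres_check.py -- illustrative cross-check of the Pig logic on a small extract
import pandas as pd

cols = ["IDPOS", "TIMESTAMP", "EANCODE", "QUANTITY", "SALES", "SC"]
prices = pd.read_csv("priceMockSmall.csv", names=cols)

# base-period (TIMESTAMP == 0) quantities per (point of sale, product), as in the Pig JOIN
base = prices.loc[prices["TIMESTAMP"] == 0, ["IDPOS", "EANCODE", "QUANTITY"]]
base = base.rename(columns={"QUANTITY": "Q0"})

# weight of each observation: current SALES times base-period QUANTITY
merged = prices.merge(base, on=["IDPOS", "EANCODE"])
merged["WEIGHT"] = merged["SALES"] * merged["Q0"]

# sum of weights per period, rebased to 100 in the base period
weights = merged.groupby("TIMESTAMP")["WEIGHT"].sum()
print(weights / weights.loc[0] * 100)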

Project Findings

Technology

  • Big Data technologies are effective when applied to really big datasets, in the order of hundreds of gigabytes. On small to medium dataset sizes (up to the order of a few gigabytes) the inherent overhead of starting a MapReduce job makes response times higher by several orders of magnitude with respect to traditional tools: the simplest query in Hive or Pig takes at least 30 seconds even on a small dataset, where a relational database produces a response in a few milliseconds. On the other hand, with big datasets the response times with Hadoop MapReduce scale very well (about 10 times the time when increasing the data size by a factor of 1500), while data sets of this size cannot be handled in memory with statistical tools and imply long loading times with a relational database.
  • Different flavours of Big Data tools can be used effectively to speed up data analysis. Pig and Hive allow processing scripts to be written in a very compact form, although there is a learning curve involved in taking maximum advantage of the technology. RHadoop forces the user to adapt to the MapReduce paradigm, which is not always the most intuitive way of writing analysis scripts and can lead to long and complex programs. On the other hand, it gives more control to the developer, as well as the possibility of using the entire set of R libraries.

Statistics

  • Scanner data and web scraping are already used in production in several statistical organizations. More widespread use can enable new products, more timely and efficient computation of traditional products, and standardized treatments. Moreover, it can help statistical organizations respond to the "threat" of price indices computed by research projects and IT companies.

Privacy

  • Scanner data are highly sensitive and cannot be shared in the Sandbox. However, they can be modeled fairly easily. A more refined synthetic data generator would make it possible to share techniques and results that can then be applied to real data within statistical organizations.

Partnership

  • The work on web scraping in this team and in the job vacancies team showed that it is difficult to gather a representative amount of data scraped from the web. During the work of this team, a global provider of scraped price data was identified (www.pricestats.com). A partnership with this company could be attempted in order to acquire large amounts of scraped data more easily. A large-scale partnership involving several statistical organizations could lead to favourable conditions and shared solutions for data processing.

Skills

  • Trained IT professionals could acquire knowledge of 3-4 different languages during the project lifespan. This "dynamic" approach is required because the tool offering is constantly evolving, and even the same tool can present significant differences from one version to another (e.g., Spark).

Recommendations

Technology

  • Datasets can be considered "big" starting from the order of tens of gigabytes. However, much depends on what you need to do with the data.
  • If the datasets are not "big", it is likely that your usual tools (possibly run on a server) will do the trick.

Methodology

  • Scanner data analyzed with Big Data tools open up many opportunities for new products and for quality assessment of traditional approaches. IT can help, but more methodological effort should be devoted to investigating this approach.
  • Sandbox and synthetic data sets can help sharing methods and findings. 

Statistics

  • Although we could not address all the practical implications of using scanner data in production (above all, the coding of products into COICOP groups), once the technical complexity of dealing with scanner data on a large scale is addressed, the cost and time efficiencies that this approach brings to CPI statistics should be evident.

Partnership

  • There are already significant results on web scraping of e-commerce web sites that should be taken into account and possibly reused. The MIT Billion Prices Project and PriceStats.com should be considered in price scraping projects in statistical organizations.