C++ API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

csv_data_source.h
1 /* file: csv_data_source.h */
2 /*******************************************************************************
3 * Copyright 2014-2020 Intel Corporation
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *******************************************************************************/
17 
18 /*
19 //++
20 // Implementation of the CSV data source class.
21 //--
22 */
23 
24 #ifndef __CSV_DATA_SOURCE_H__
25 #define __CSV_DATA_SOURCE_H__
26 
27 #include "services/daal_memory.h"
28 
29 #include "data_management/data_source/data_source.h"
30 #include "data_management/data/data_dictionary.h"
31 #include "data_management/data/numeric_table.h"
32 #include "data_management/data/homogen_numeric_table.h"
33 #include "data_management/data_source/internal/data_source_options.h"
34 
35 namespace daal
36 {
37 namespace data_management
38 {
39 namespace interface1
40 {
50 class CsvDataSourceOptions
51 {
52 public:
53  enum Value
54  {
55  byDefault = 0,
56  allocateNumericTable = 1 << 0,
57  createDictionaryFromContext = 1 << 1,
58  parseHeader = 1 << 2
59  };
60 
61  static CsvDataSourceOptions::Value unite(const CsvDataSourceOptions::Value &lhs,
62  const CsvDataSourceOptions::Value &rhs)
63  {
64  return internal::DataSourceOptionsImpl<Value>::unite(lhs, rhs);
65  }
66 
67  CsvDataSourceOptions(Value flags = byDefault) :
68  _impl(flags) { }
69 
70  DataSource::NumericTableAllocationFlag getNumericTableAllocationFlag() const
71  {
72  return (_impl.getFlag(allocateNumericTable))
73  ? DataSource::doAllocateNumericTable
74  : DataSource::notAllocateNumericTable;
75  }
76 
77  DataSource::DictionaryCreationFlag getDictionaryCreationFlag() const
78  {
79  return (_impl.getFlag(createDictionaryFromContext))
80  ? DataSource::doDictionaryFromContext
81  : DataSource::notDictionaryFromContext;
82  }
83 
84  bool getParseHeaderFlag() const
85  {
86  return _impl.getFlag(parseHeader);
87  }
88 
89 private:
90  internal::DataSourceOptionsImpl<Value> _impl;
91 };
92 
99 template< typename FeatureManager, typename SummaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
100 class CsvDataSource : public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, SummaryStatisticsType>
101 {
102 private:
103  typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
104  typedef DataSourceTemplate<DefaultNumericTableType, SummaryStatisticsType> super;
105 
106 protected:
107  using super::_dict;
108  using super::_initialMaxRows;
109 
110 public:
119  CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
120  DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
121  size_t initialMaxRows = 10) :
122  super(doAllocateNumericTable,
123  doCreateDictionaryFromContext)
124  {
125  initialize(initialMaxRows);
126  }
127 
133  CsvDataSource(const CsvDataSourceOptions &options, size_t initialMaxRows = 10) :
134  super(options.getNumericTableAllocationFlag(),
135  options.getDictionaryCreationFlag())
136  {
137  initialize(initialMaxRows);
138  _parseHeader = options.getParseHeaderFlag();
139  }
140 
141  virtual ~CsvDataSource()
142  {
143  daal::services::daal_free(_rawLineBuffer);
144  _rawLineBuffer = NULL;
145  }
146 
150  FeatureManager &getFeatureManager()
151  {
152  return _featureManager;
153  }
154 
155  size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
156  {
157  return _featureManager.getNumericTableNumberOfColumns();
158  }
159 
160  services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
161  {
162  services::Status s = DataSource::setDictionary(dict);
163  _featureManager.setFeatureDetailsFromDictionary(dict);
164 
165  return s;
166  }
167 
168  size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
169  {
170  services::Status s = super::checkDictionary();
171  if(!s)
172  {
173  this->_status.add(services::throwIfPossible(s));
174  return 0;
175  }
176  s = checkInputNumericTable(nt);
177  if(!s)
178  {
179  this->_status.add(services::throwIfPossible(s));
180  return 0;
181  }
182 
183  size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
184  size_t nrows = 0;
185  const size_t ncols = getNumericTableNumberOfColumns();
186  DataCollection tables;
187  for( ;; maxRows *= 2)
188  {
189  NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
190  if (!s)
191  {
192  this->_status.add(services::throwIfPossible(services::Status(services::ErrorNumericTableNotAllocated)));
193  break;
194  }
195  tables.push_back(ntCurrent);
196  const size_t rows = loadDataBlock(maxRows, ntCurrent.get());
197  nrows += rows;
198  if (rows < maxRows)
199  break;
200  }
201 
202  s = resetNumericTable(nt, nrows);
203  if(!s)
204  {
205  this->_status.add(services::throwIfPossible(s));
206  return 0;
207  }
208 
209  BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
210  size_t pos = 0;
211  int result = 0;
212  for (size_t i = 0; i < tables.size(); i++)
213  {
214  NumericTable *ntCurrent = (NumericTable*)(tables[i].get());
215  size_t rows = ntCurrent->getNumberOfRows();
216 
217  if(!rows)
218  continue;
219 
220  ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
221  nt->getBlockOfRows(pos, rows, writeOnly, block);
222 
223  result |= services::internal::daal_memcpy_s(block.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE),
224  blockCurrent.getBlockPtr(), rows * ncols * sizeof(DAAL_DATA_TYPE));
225 
226  ntCurrent->releaseBlockOfRows(blockCurrent);
227  nt->releaseBlockOfRows(block);
228 
229  super::combineStatistics( ntCurrent, nt, pos == 0);
230  pos += rows;
231  }
232  if (result)
233  {
234  this->_status.add(services::throwIfPossible(services::Status(services::ErrorMemoryCopyFailedInternal)));
235  }
236  return nrows;
237  }
238 
239  size_t loadDataBlock(size_t maxRows, NumericTable* nt) DAAL_C11_OVERRIDE
240  {
241  size_t nLines = loadDataBlock(maxRows, 0, maxRows, nt);
242  nt->resize( nLines );
243  return nLines;
244  }
245 
246  size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
247  {
248  services::Status s = super::checkDictionary();
249  if(!s)
250  {
251  this->_status.add(services::throwIfPossible(s));
252  return 0;
253  }
254  s = checkInputNumericTable(nt);
255  if(!s)
256  {
257  this->_status.add(services::throwIfPossible(s));
258  return 0;
259  }
260 
261  if (rowOffset + maxRows > fullRows)
262  {
263  this->_status.add(services::throwIfPossible(services::ErrorIncorrectDataRange));
264  return 0;
265  }
266 
267  s = resetNumericTable(nt, fullRows);
268  if(!s)
269  {
270  this->_status.add(services::throwIfPossible(s));
271  return 0;
272  }
273 
274  if (_parseHeader && !_firstRowRead)
275  {
276  // Skip header
277  s = readLine();
278  if(!s)
279  {
280  this->_status.add(services::throwIfPossible(s));
281  return 0;
282  }
283 
284  _firstRowRead = true;
285  }
286 
287  size_t j = 0;
288  for(; j < maxRows && !iseof() ; j++ )
289  {
290  s = readLine();
291  if(!s)
292  {
293  this->_status.add(services::throwIfPossible(s));
294 
295  return 0;
296  }
297  if(!_rawLineLength)
298  {
299  break;
300  }
301 
302  _featureManager.parseRowIn( _rawLineBuffer, _rawLineLength, this->_dict.get(), nt, rowOffset + j );
303 
304  super::updateStatistics( j, nt, rowOffset );
305  }
306  _featureManager.finalize(this->_dict.get());
307 
308  return rowOffset + j;
309  }
310 
311  size_t loadDataBlock() DAAL_C11_OVERRIDE
312  {
313  return DataSource::loadDataBlock();
314  }
315 
316  size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
317  {
318  return DataSource::loadDataBlock(maxRows);
319  }
320 
321  size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) DAAL_C11_OVERRIDE
322  {
323  return DataSource::loadDataBlock(maxRows, rowOffset, fullRows);
324  }
325 
326 
327  services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
328  {
329  services::Status s;
330 
331  if (_dict)
332  {
333  return services::throwIfPossible(services::Status(services::ErrorDictionaryAlreadyAvailable));
334  }
335 
336  _dict = DataSourceDictionary::create(&s);
337  if (!s) { return s; }
338  _contextDictFlag = true;
339 
340  if (_parseHeader)
341  {
342  s = readLine();
343  if (!s)
344  {
345  return services::throwIfPossible(s);
346  }
347  _featureManager.parseRowAsHeader(_rawLineBuffer, _rawLineLength);
348  }
349 
350  s = readLine();
351  if (!s)
352  {
353  return services::throwIfPossible(s);
354  }
355  _featureManager.parseRowAsDictionary(_rawLineBuffer, _rawLineLength, this->_dict.get());
356 
357  return services::Status();
358  }
359 
360  size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
361  {
362  return 0;
363  }
364 
365 protected:
366  virtual bool iseof() const = 0;
367  virtual services::Status readLine() = 0;
368 
369  virtual services::Status resetNumericTable(NumericTable *nt, const size_t newSize)
370  {
371  services::Status s;
372 
373  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
374  const size_t nFeatures = getNumericTableNumberOfColumns();
375  ntDict->setNumberOfFeatures(nFeatures);
376  for (size_t i = 0; i < nFeatures; i++)
377  ntDict->setFeature((*_dict)[i].ntFeature, i);
378 
379  s = super::resizeNumericTableImpl(newSize, nt);
380  if(!s)
381  {
382  return s;
383  }
384 
385  nt->setNormalizationFlag(NumericTable::nonNormalized);
386  return services::Status();
387  }
388 
389  virtual services::Status checkInputNumericTable(const NumericTable* const nt) const
390  {
391  if(!nt)
392  {
393  return services::Status(services::ErrorNullInputNumericTable);
394  }
395 
396  const NumericTable::StorageLayout layout = nt->getDataLayout();
397  if (layout == NumericTable::csrArray)
398  {
399  return services::Status(services::ErrorIncorrectTypeOfInputNumericTable);
400  }
401 
402  return services::Status();
403  }
404 
405  bool enlargeBuffer()
406  {
407  int newRawLineBufferLen = _rawLineBufferLen * 2;
408  char* newRawLineBuffer = (char *)daal::services::daal_malloc( newRawLineBufferLen );
409  if(newRawLineBuffer == 0)
410  return false;
411  int result = daal::services::internal::daal_memcpy_s(newRawLineBuffer, newRawLineBufferLen, _rawLineBuffer, _rawLineBufferLen);
412  if (result)
413  {
414  this->_status.add(services::throwIfPossible(services::Status(services::ErrorMemoryCopyFailedInternal)));
415  }
416  daal::services::daal_free( _rawLineBuffer );
417  _rawLineBuffer = newRawLineBuffer;
418  _rawLineBufferLen = newRawLineBufferLen;
419  return true;
420  }
421 
422 private:
423  services::Status initialize(size_t initialMaxRows)
424  {
425  _parseHeader = false;
426  _firstRowRead = false;
427  _contextDictFlag = false;
428  _rawLineLength = 0;
429  _initialMaxRows = initialMaxRows;
430 
431  _rawLineBufferLen = (int)INITIAL_LINE_BUFFER_LENGTH;
432  _rawLineBuffer = (char *)daal::services::daal_malloc(_rawLineBufferLen);
433  if (!_rawLineBuffer) { return services::throwIfPossible(services::ErrorMemoryAllocationFailed); }
434 
435  return services::Status();
436  }
437 
438 protected:
439  char *_rawLineBuffer;
440  int _rawLineBufferLen;
441  int _rawLineLength;
442 
443 private:
444  bool _parseHeader;
445  bool _firstRowRead;
446  bool _contextDictFlag;
447  FeatureManager _featureManager;
448 
449  static const size_t INITIAL_LINE_BUFFER_LENGTH = 1024;
450 };
451 
453 } // namespace interface1
454 
455 using interface1::CsvDataSource;
456 using interface1::CsvDataSourceOptions;
457 
458 inline CsvDataSourceOptions::Value operator |(const CsvDataSourceOptions::Value &lhs,
459  const CsvDataSourceOptions::Value &rhs)
460 { return CsvDataSourceOptions::unite(lhs, rhs); }
461 
462 } // namespace data_management
463 } // namespace daal
464 
465 #endif
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:80
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:156
daal::data_management::interface1::CsvDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:360
daal::data_management::interface1::CsvDataSource::CsvDataSource
CsvDataSource(const CsvDataSourceOptions &options, size_t initialMaxRows=10)
Definition: csv_data_source.h:133
daal::data_management::interface1::CsvDataSource::getFeatureManager
FeatureManager & getFeatureManager()
Definition: csv_data_source.h:150
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:289
daal::data_management::interface1::CsvDataSource
Specifies methods to access data stored in files.
Definition: csv_data_source.h:100
daal::data_management::interface1::DataSourceIface::doDictionaryFromContext
Definition: data_source.h:73
daal::data_management::interface1::CsvDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:327
daal::data_management::interface1::Dictionary
Class that represents a dictionary of a data set and provides methods to work with the data dictionar...
Definition: data_dictionary.h:163
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:577
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:72
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::CsvDataSource::getNumericTableNumberOfColumns
size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:155
daal::data_management::interface1::DataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: data_source.h:218
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:95
daal::data_management::interface1::DataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: data_source.h:273
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:246
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:316
daal::data_management::interface1::CsvDataSourceOptions
Options of CSV data source.
Definition: csv_data_source.h:50
daal::data_management::internal::DataSourceOptionsImpl
Class that helps to define data source options.
Definition: data_source_options.h:33
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:168
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:83
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:663
daal::services::daal_memcpy_s
DAAL_DEPRECATED DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:319
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:83
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:70
daal::services::daal_malloc
DAAL_EXPORT void * daal_malloc(size_t size, size_t alignment=DAAL_MALLOC_DEFAULT_ALIGNMENT)
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:160
daal::services::ErrorIncorrectDataRange
Definition: error_indexes.h:79
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:188
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:311
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::services::daal_free
DAAL_EXPORT void daal_free(void *ptr)
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:321
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:635
daal::services::ErrorMemoryAllocationFailed
Definition: error_indexes.h:150
daal::services::ErrorMemoryCopyFailedInternal
Definition: error_indexes.h:152
daal::data_management::interface1::NumericTableIface::StorageLayout
StorageLayout
Storage layouts that may need to be supported.
Definition: numeric_table.h:328
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:239
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::NumericTable::setNormalizationFlag
NormalizationType setNormalizationFlag(NormalizationType flag)
Definition: numeric_table.h:739
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:464
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:360
daal::data_management::interface1::CsvDataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:160
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:47
daal::services::ErrorIncorrectTypeOfInputNumericTable
Definition: error_indexes.h:93
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:71
daal::data_management::interface1::CsvDataSource::CsvDataSource
CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: csv_data_source.h:119

For more complete information about compiler optimizations, see our Optimization Notice.