24 #ifndef __CSV_DATA_SOURCE_H__
25 #define __CSV_DATA_SOURCE_H__
27 #include "services/daal_memory.h"
29 #include "data_management/data_source/data_source.h"
30 #include "data_management/data/data_dictionary.h"
31 #include "data_management/data/numeric_table.h"
32 #include "data_management/data/homogen_numeric_table.h"
33 #include "data_management/data_source/internal/data_source_options.h"
37 namespace data_management
// Option set for a CSV data source. Options are bit flags of type Value that are stored in
// and queried through the _impl helper (internal::DataSourceOptionsImpl<Value>).
// NOTE(review): this listing omits some source lines (braces, the enum header, and the
// declarations of byDefault and parseHeader, which are referenced below); the comments here
// describe the visible lines only.
50 class CsvDataSourceOptions
// Flag enumerators; each visible value occupies its own bit.
56 allocateNumericTable = 1 << 0,
57 createDictionaryFromContext = 1 << 1,
// Combines two option values by delegating to DataSourceOptionsImpl<Value>::unite.
61 static CsvDataSourceOptions::Value unite(
const CsvDataSourceOptions::Value &lhs,
62 const CsvDataSourceOptions::Value &rhs)
64 return internal::DataSourceOptionsImpl<Value>::unite(lhs, rhs);
// Constructor; flags defaults to byDefault. The member initializer list is not visible here.
67 CsvDataSourceOptions(Value flags = byDefault) :
// Returns DataSource::doAllocateNumericTable when the allocateNumericTable flag is set,
// DataSource::notAllocateNumericTable otherwise.
70 DataSource::NumericTableAllocationFlag getNumericTableAllocationFlag()
const
72 return (_impl.getFlag(allocateNumericTable))
73 ? DataSource::doAllocateNumericTable
74 : DataSource::notAllocateNumericTable;
// Returns DataSource::doDictionaryFromContext when the createDictionaryFromContext flag is set,
// DataSource::notDictionaryFromContext otherwise.
77 DataSource::DictionaryCreationFlag getDictionaryCreationFlag()
const
79 return (_impl.getFlag(createDictionaryFromContext))
80 ? DataSource::doDictionaryFromContext
81 : DataSource::notDictionaryFromContext;
// Returns the state of the parseHeader flag as reported by _impl.getFlag.
84 bool getParseHeaderFlag()
const
86 return _impl.getFlag(parseHeader);
// Storage for the option flags.
90 internal::DataSourceOptionsImpl<Value> _impl;
// Class template for a CSV data source.
// FeatureManager is the type of the _featureManager member that the methods of this class use to
// parse raw lines; SummaryStatisticsType defaults to DAAL_SUMMARY_STATISTICS_TYPE and is forwarded
// to the DataSourceTemplate base.
// NOTE(review): braces and access specifiers are missing from this listing.
99 template<
typename FeatureManager,
typename SummaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
100 class CsvDataSource :
public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, SummaryStatisticsType>
// Numeric table type passed to the base class template.
103 typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
// Shorthand for the base class.
104 typedef DataSourceTemplate<DefaultNumericTableType, SummaryStatisticsType> super;
// Makes the base-class member _initialMaxRows usable without qualification.
108 using super::_initialMaxRows;
// Constructor taking explicit allocation and dictionary-creation flags.
// Defaults: no numeric table allocation, no dictionary from context, initialMaxRows = 10.
// Both flags are forwarded to the base class; the remaining state is set up by initialize().
119 CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
120 DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
121 size_t initialMaxRows = 10) :
122 super(doAllocateNumericTable,
123 doCreateDictionaryFromContext)
// NOTE(review): the services::Status returned by initialize() is not inspected here.
125 initialize(initialMaxRows);
// Constructor taking a CsvDataSourceOptions object; initialMaxRows defaults to 10.
// The allocation and dictionary-creation flags are read from options and forwarded to the base class.
133 CsvDataSource(
const CsvDataSourceOptions &options,
size_t initialMaxRows = 10) :
134 super(options.getNumericTableAllocationFlag(),
135 options.getDictionaryCreationFlag())
// NOTE(review): the services::Status returned by initialize() is not inspected here.
137 initialize(initialMaxRows);
// Assigned after initialize(), because initialize() resets _parseHeader to false.
138 _parseHeader = options.getParseHeaderFlag();
// Destructor: releases the raw line buffer with daal_free and clears the pointer.
141 virtual ~CsvDataSource()
143 daal::services::daal_free(_rawLineBuffer);
144 _rawLineBuffer = NULL;
// Returns a mutable reference to the feature manager owned by this data source.
150 FeatureManager &getFeatureManager()
152 return _featureManager;
// Returns the column count reported by the feature manager's getNumericTableNumberOfColumns().
155 size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
157 return _featureManager.getNumericTableNumberOfColumns();
// Sets the dictionary through DataSource::setDictionary and passes the same dictionary to the
// feature manager. The return statement is not visible in this listing.
160 services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
162 services::Status s = DataSource::setDictionary(dict);
// NOTE(review): the feature manager is updated directly after the base call; the visible lines
// contain no check of s in between.
163 _featureManager.setFeatureDetailsFromDictionary(dict);
// Loads rows from the source into nt. Rows are first read into temporary tables of growing size,
// then copied into nt.
// NOTE(review): lines are missing from this listing (braces, the conditions guarding the error
// reports, the loop exits, the return statements, and the declarations/updates of nrows, pos and
// result). The comments below describe the visible statements only.
168 size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
// Check the dictionary; the status is passed through throwIfPossible and added to _status
// (presumably only on failure - the guard is not visible).
170 services::Status s = super::checkDictionary();
173 this->_status.add(services::throwIfPossible(s));
// Check the destination table, with the same reporting pattern.
176 s = checkInputNumericTable(nt);
179 this->_status.add(services::throwIfPossible(s));
// Starting chunk size: _initialMaxRows when it is greater than zero, otherwise 10.
183 size_t maxRows = (_initialMaxRows > 0 ? _initialMaxRows : 10);
185 const size_t ncols = getNumericTableNumberOfColumns();
// Holds the temporary tables in the order they were filled.
186 DataCollection tables;
// The chunk size doubles on every iteration; the loop's exit condition is not visible here.
187 for( ;; maxRows *= 2)
// Create an allocated temporary table from (ncols, maxRows); creation status goes to s.
189 NumericTablePtr ntCurrent = HomogenNumericTable<DAAL_DATA_TYPE>::create(ncols, maxRows, NumericTableIface::doAllocate, &s);
192 this->_status.add(services::throwIfPossible(services::Status(services::ErrorNumericTableNotAllocated)));
// Keep the temporary table and fill it through the (maxRows, nt) overload.
195 tables.push_back(ntCurrent);
196 const size_t rows = loadDataBlock(maxRows, ntCurrent.get());
// Prepare nt for nrows rows (see resetNumericTable).
202 s = resetNumericTable(nt, nrows);
205 this->_status.add(services::throwIfPossible(s));
209 BlockDescriptor<DAAL_DATA_TYPE> blockCurrent, block;
// Copy every temporary table into nt, writing at row position pos.
212 for (
size_t i = 0; i < tables.size(); i++)
214 NumericTable *ntCurrent = (NumericTable*)(tables[i].
get());
215 size_t rows = ntCurrent->getNumberOfRows();
// Source rows are requested read-only, destination rows write-only.
220 ntCurrent->getBlockOfRows(0, rows, readOnly, blockCurrent);
221 nt->getBlockOfRows(pos, rows, writeOnly, block);
// Both size arguments are rows * ncols * sizeof(DAAL_DATA_TYPE); return codes are OR-ed into result.
223 result |= services::internal::daal_memcpy_s(block.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE),
224 blockCurrent.getBlockPtr(), rows * ncols *
sizeof(DAAL_DATA_TYPE));
226 ntCurrent->releaseBlockOfRows(blockCurrent);
227 nt->releaseBlockOfRows(block);
// Combine the statistics of the temporary table with those of nt; the third argument is true
// while pos == 0.
229 super::combineStatistics( ntCurrent, nt, pos == 0);
// Reports a copy failure (presumably when result is non-zero - the guard is not visible).
234 this->_status.add(services::throwIfPossible(services::Status(services::ErrorMemoryCopyFailedInternal)));
// Loads into nt through the four-argument overload with rowOffset = 0 and fullRows = maxRows,
// then resizes nt to the value that overload returned.
// The return statement is not visible in this listing.
239 size_t loadDataBlock(
size_t maxRows, NumericTable* nt) DAAL_C11_OVERRIDE
241 size_t nLines = loadDataBlock(maxRows, 0, maxRows, nt);
242 nt->resize( nLines );
// Reads rows from the source and parses them into nt starting at row rowOffset; nt is reset to
// fullRows rows beforehand. Returns rowOffset + j, where j is the row loop counter.
// NOTE(review): lines are missing from this listing (braces, error-path guards and returns, the
// declaration of j, the readLine() calls); comments describe the visible statements only.
246 size_t loadDataBlock(
size_t maxRows,
size_t rowOffset,
size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
// Check the dictionary, then the destination table; statuses are passed through throwIfPossible
// and added to _status (presumably only on failure - the guards are not visible).
248 services::Status s = super::checkDictionary();
251 this->_status.add(services::throwIfPossible(s));
254 s = checkInputNumericTable(nt);
257 this->_status.add(services::throwIfPossible(s));
// The requested rows [rowOffset, rowOffset + maxRows) must fit into fullRows.
261 if (rowOffset + maxRows > fullRows)
263 this->_status.add(services::throwIfPossible(services::ErrorIncorrectDataRange));
// Prepare nt for fullRows rows (see resetNumericTable).
267 s = resetNumericTable(nt, fullRows);
270 this->_status.add(services::throwIfPossible(s));
// Header handling when header parsing is enabled and no row has been read yet; most of the
// branch body is not visible.
274 if (_parseHeader && !_firstRowRead)
280 this->_status.add(services::throwIfPossible(s));
284 _firstRowRead =
true;
// Row loop: runs while fewer than maxRows rows were handled and iseof() is false.
288 for(; j < maxRows && !iseof() ; j++ )
293 this->_status.add(services::throwIfPossible(s));
// Parse the raw line into row rowOffset + j of nt using the current dictionary.
302 _featureManager.parseRowIn( _rawLineBuffer, _rawLineLength, this->_dict.get(), nt, rowOffset + j );
304 super::updateStatistics( j, nt, rowOffset );
306 _featureManager.finalize(this->_dict.get());
308 return rowOffset + j;
// Argument-less overload; forwards to DataSource::loadDataBlock().
311 size_t loadDataBlock() DAAL_C11_OVERRIDE
313 return DataSource::loadDataBlock();
// Overload taking only maxRows; forwards to DataSource::loadDataBlock(maxRows).
316 size_t loadDataBlock(
size_t maxRows) DAAL_C11_OVERRIDE
318 return DataSource::loadDataBlock(maxRows);
// Overload without an explicit numeric table; forwards all three arguments to
// DataSource::loadDataBlock(maxRows, rowOffset, fullRows).
321 size_t loadDataBlock(
size_t maxRows,
size_t rowOffset,
size_t fullRows) DAAL_C11_OVERRIDE
323 return DataSource::loadDataBlock(maxRows, rowOffset, fullRows);
// Creates the data source dictionary from the contents of the source itself.
// NOTE(review): lines are missing from this listing (braces, the conditions guarding the early
// returns, the declaration of s, the readLine() calls); comments describe the visible
// statements only.
327 services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
// Error return for an already existing dictionary (the guarding condition is not visible).
333 return services::throwIfPossible(services::Status(services::ErrorDictionaryAlreadyAvailable));
// Create an empty dictionary; stop with the creation status on failure.
336 _dict = DataSourceDictionary::create(&s);
337 if (!s) {
return s; }
// Set once the dictionary object has been created by this method.
338 _contextDictFlag =
true;
345 return services::throwIfPossible(s);
// Hand the current raw line to the feature manager as a header row (presumably only when
// header parsing is enabled - the guard is not visible).
347 _featureManager.parseRowAsHeader(_rawLineBuffer, _rawLineLength);
353 return services::throwIfPossible(s);
// Let the feature manager fill the dictionary from the current raw line.
355 _featureManager.parseRowAsDictionary(_rawLineBuffer, _rawLineLength, this->_dict.get());
357 return services::Status();
// Override of getNumberOfAvailableRows(); the body is not visible in this listing.
360 size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
// Pure virtual hooks, to be provided by a concrete data source.
// iseof() is used as the termination test of the row loop in loadDataBlock.
366 virtual bool iseof()
const = 0;
// readLine() returns a services::Status; the parsing code in this class consumes
// _rawLineBuffer / _rawLineLength (presumably filled by readLine() - TODO confirm in implementations).
367 virtual services::Status readLine() = 0;
// Prepares nt to receive newSize rows: copies the per-feature descriptions from the data source
// dictionary into the table's dictionary, resizes the table and marks it as non-normalized.
// NOTE(review): braces, the declaration of s and the handling of a failed resize are not visible
// in this listing.
369 virtual services::Status resetNumericTable(NumericTable *nt,
const size_t newSize)
373 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
374 const size_t nFeatures = getNumericTableNumberOfColumns();
375 ntDict->setNumberOfFeatures(nFeatures);
// Feature i of the table dictionary takes the ntFeature entry of dictionary element i.
376 for (
size_t i = 0; i < nFeatures; i++)
377 ntDict->setFeature((*_dict)[i].ntFeature, i);
379 s = super::resizeNumericTableImpl(newSize, nt);
385 nt->setNormalizationFlag(NumericTable::nonNormalized);
386 return services::Status();
// Validates a destination numeric table.
// Returns ErrorNullInputNumericTable (presumably when nt is null - the guard is not visible),
// ErrorIncorrectTypeOfInputNumericTable when the table layout is csrArray, and an empty
// Status otherwise.
389 virtual services::Status checkInputNumericTable(
const NumericTable*
const nt)
const
393 return services::Status(services::ErrorNullInputNumericTable);
// Tables with the csrArray layout are rejected.
396 const NumericTable::StorageLayout layout = nt->getDataLayout();
397 if (layout == NumericTable::csrArray)
399 return services::Status(services::ErrorIncorrectTypeOfInputNumericTable);
402 return services::Status();
// Body of the routine that grows the raw line buffer; its signature line, braces and return
// statements are missing from this listing.
// Allocates a buffer of twice the current length, copies the old contents into it, frees the old
// buffer and installs the new buffer and length.
// NOTE(review): the new length is an int computed as _rawLineBufferLen * 2; confirm that this
// cannot overflow for very long lines.
407 int newRawLineBufferLen = _rawLineBufferLen * 2;
408 char* newRawLineBuffer = (
char *)daal::services::daal_malloc( newRawLineBufferLen );
// Allocation failure check; the statement it guards is not visible.
409 if(newRawLineBuffer == 0)
// Copies _rawLineBufferLen bytes from the old buffer into the new one.
411 int result = daal::services::internal::daal_memcpy_s(newRawLineBuffer, newRawLineBufferLen, _rawLineBuffer, _rawLineBufferLen);
// Reports a copy failure (presumably when result is non-zero - the guard is not visible).
414 this->_status.add(services::throwIfPossible(services::Status(services::ErrorMemoryCopyFailedInternal)));
416 daal::services::daal_free( _rawLineBuffer );
417 _rawLineBuffer = newRawLineBuffer;
418 _rawLineBufferLen = newRawLineBufferLen;
// Shared constructor logic: resets the state flags, stores initialMaxRows and allocates the raw
// line buffer with INITIAL_LINE_BUFFER_LENGTH bytes.
// Returns ErrorMemoryAllocationFailed (through throwIfPossible) when the allocation fails,
// an empty Status otherwise.
423 services::Status initialize(
size_t initialMaxRows)
425 _parseHeader =
false;
426 _firstRowRead =
false;
427 _contextDictFlag =
false;
429 _initialMaxRows = initialMaxRows;
// The buffer length is kept as an int, hence the cast from the size_t constant.
431 _rawLineBufferLen = (int)INITIAL_LINE_BUFFER_LENGTH;
432 _rawLineBuffer = (
char *)daal::services::daal_malloc(_rawLineBufferLen);
433 if (!_rawLineBuffer) {
return services::throwIfPossible(services::ErrorMemoryAllocationFailed); }
435 return services::Status();
// Data members (other members used by the class, e.g. _rawLineLength, _parseHeader and
// _firstRowRead, are not visible in this listing).
// Raw line buffer: allocated with daal_malloc in initialize(), released with daal_free in the destructor.
439 char *_rawLineBuffer;
// Length passed to daal_malloc when _rawLineBuffer is allocated.
440 int _rawLineBufferLen;
// Cleared in initialize(), set to true in createDictionaryFromContext().
446 bool _contextDictFlag;
// Parses raw lines into dictionaries and numeric table rows.
447 FeatureManager _featureManager;
// Initial value of _rawLineBufferLen.
449 static const size_t INITIAL_LINE_BUFFER_LENGTH = 1024;
455 using interface1::CsvDataSource;
456 using interface1::CsvDataSourceOptions;
// Lets option values be combined with the | operator; delegates to CsvDataSourceOptions::unite.
458 inline CsvDataSourceOptions::Value operator |(
const CsvDataSourceOptions::Value &lhs,
459 const CsvDataSourceOptions::Value &rhs)
460 {
return CsvDataSourceOptions::unite(lhs, rhs); }
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:80
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:156
daal::data_management::interface1::CsvDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:360
daal::data_management::interface1::CsvDataSource::CsvDataSource
CsvDataSource(const CsvDataSourceOptions &options, size_t initialMaxRows=10)
Definition: csv_data_source.h:133
daal::data_management::interface1::CsvDataSource::getFeatureManager
FeatureManager & getFeatureManager()
Definition: csv_data_source.h:150
daal::data_management::interface1::NumericTableIface::doAllocate
Definition: numeric_table.h:289
daal::data_management::interface1::CsvDataSource
Specifies methods to access data stored in files.
Definition: csv_data_source.h:100
daal::data_management::interface1::DataSourceIface::doDictionaryFromContext
Definition: data_source.h:73
daal::data_management::interface1::CsvDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:327
daal::data_management::interface1::Dictionary
Class that represents a dictionary of a data set and provides methods to work with the data dictionary.
Definition: data_dictionary.h:163
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:577
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:72
daal::data_management::interface1::DataCollection::size
size_t size() const
daal::data_management::interface1::CsvDataSource::getNumericTableNumberOfColumns
size_t getNumericTableNumberOfColumns() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:155
daal::data_management::interface1::DataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: data_source.h:218
daal::data_management::interface1::HomogenNumericTable::create
static services::SharedPtr< HomogenNumericTable< DataType > > create(NumericTableDictionaryPtr ddictForHomogenNumericTable, services::Status *stat=NULL)
Definition: homogen_numeric_table.h:95
daal::data_management::interface1::DataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: data_source.h:273
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:246
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:316
daal::data_management::interface1::CsvDataSourceOptions
Options of CSV data source.
Definition: csv_data_source.h:50
daal::data_management::internal::DataSourceOptionsImpl
Class that helps to define data source options.
Definition: data_source_options.h:33
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:168
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:83
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:663
daal::services::daal_memcpy_s
DAAL_DEPRECATED DAAL_EXPORT void daal_memcpy_s(void *dest, size_t numberOfElements, const void *src, size_t count)
daal::data_management::interface1::NumericTableIface::nonNormalized
Definition: numeric_table.h:319
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:83
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:70
daal::services::daal_malloc
DAAL_EXPORT void * daal_malloc(size_t size, size_t alignment=DAAL_MALLOC_DEFAULT_ALIGNMENT)
daal::services::ErrorNumericTableNotAllocated
Definition: error_indexes.h:160
daal::services::ErrorIncorrectDataRange
Definition: error_indexes.h:79
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:188
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: csv_data_source.h:311
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0
daal::data_management::interface1::BlockDescriptor< DAAL_DATA_TYPE >
daal::services::daal_free
DAAL_EXPORT void daal_free(void *ptr)
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, size_t rowOffset, size_t fullRows) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:321
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:635
daal::services::ErrorMemoryAllocationFailed
Definition: error_indexes.h:150
daal::services::ErrorMemoryCopyFailedInternal
Definition: error_indexes.h:152
daal::data_management::interface1::NumericTableIface::StorageLayout
StorageLayout
Storage layouts that may need to be supported.
Definition: numeric_table.h:328
daal::data_management::interface1::CsvDataSource::loadDataBlock
size_t loadDataBlock(size_t maxRows, NumericTable *nt) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:239
daal::data_management::interface1::DataCollection::push_back
DataCollection & push_back(const SerializationIfacePtr &x)
daal::data_management::interface1::NumericTable::setNormalizationFlag
NormalizationType setNormalizationFlag(NormalizationType flag)
Definition: numeric_table.h:739
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:464
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:360
daal::data_management::interface1::CsvDataSource::setDictionary
services::Status setDictionary(DataSourceDictionary *dict) DAAL_C11_OVERRIDE
Definition: csv_data_source.h:160
daal::data_management::interface1::DataCollection
Class that provides functionality of Collection container for objects derived from SerializationIface...
Definition: data_collection.h:47
daal::services::ErrorIncorrectTypeOfInputNumericTable
Definition: error_indexes.h:93
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:71
daal::data_management::interface1::CsvDataSource::CsvDataSource
CsvDataSource(DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: csv_data_source.h:119