23 #ifndef __KDB_DATA_SOURCE_H__
24 #define __KDB_DATA_SOURCE_H__
28 #include "services/daal_memory.h"
29 #include "data_management/data_source/data_source.h"
30 #include "data_management/data/data_dictionary.h"
31 #include "data_management/data/numeric_table.h"
32 #include "data_management/data/homogen_numeric_table.h"
36 #include "kdb_feature_manager.h"
40 namespace data_management
/**
 * KDBDataSource: data source class template that talks to a KDB server.
 *
 * \tparam _featureManager        Type of the feature manager object stored in the featureManager member
 * \tparam summaryStatisticsType  Type forwarded to DataSourceTemplate; defaults to DAAL_SUMMARY_STATISTICS_TYPE
 *
 * The class derives from DataSourceTemplate specialised on HomogenNumericTable<DAAL_DATA_TYPE>.
 *
 * NOTE(review): this listing carries the original file's line numbers at the start of lines and
 * lacks brace-only lines, access specifiers and some statements — confirm details against the real header.
 */
55 template<
typename _featureManager,
typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
56 class KDBDataSource :
public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
// Public alias for the feature manager template argument.
59 typedef _featureManager FeatureManager;
// Make the flag and status types declared in DataSourceIface usable without qualification.
61 using DataSourceIface::NumericTableAllocationFlag;
62 using DataSourceIface::DictionaryCreationFlag;
63 using DataSourceIface::DataSourceStatus;
// Make selected DataSource members usable without qualification
// (presumably required because the base class depends on a template parameter — confirm).
65 using DataSource::checkDictionary;
66 using DataSource::checkNumericTable;
67 using DataSource::freeNumericTable;
68 using DataSource::_dict;
69 using DataSource::_initialMaxRows;
// Numeric table type used when instantiating the DataSourceTemplate base.
72 typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
// Feature manager instance used by the load and dictionary-creation methods.
74 FeatureManager featureManager;
/**
 * Constructs the data source and stores the connection parameters.
 *
 * \param[in] dbname                         Database/host string (checked for embedded null bytes)
 * \param[in] port                           Port number, stored in _port
 * \param[in] tablename                      Table name, stored in _tablename
 * \param[in] username                       User name, stored in _username (default: empty)
 * \param[in] password                       Password, stored in _password (default: empty)
 * \param[in] doAllocateNumericTable         Flag forwarded to the DataSourceTemplate base constructor
 * \param[in] doCreateDictionaryFromContext  Flag forwarded to the DataSourceTemplate base constructor
 * \param[in] initialMaxRows                 Stored in _initialMaxRows (default: 10)
 *
 * If any of the four string arguments contains a '\0' character,
 * services::ErrorNullByteInjection is added to the error collection.
 *
 * NOTE(review): no assignment to _dbname or _query is visible in this listing although other
 * methods read both; those statements are presumably among the missing lines — confirm.
 */
93 KDBDataSource(
const std::string &dbname,
size_t port,
const std::string &tablename,
const std::string &username =
"",
94 const std::string &password =
"",
95 DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
96 DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
97 size_t initialMaxRows = 10) :
98 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
// The read cursor starts at row 0.
99 _port(port), _idx_last_read(0)
// Reject strings with embedded null bytes: they are later handed to C APIs through c_str().
101 if (dbname.find(
'\0') != std::string::npos || tablename.find(
'\0') != std::string::npos ||
102 username.find(
'\0') != std::string::npos || password.find(
'\0') != std::string::npos)
104 this->_errors->add(services::ErrorNullByteInjection);
108 _username = username;
109 _password = password;
110 _tablename = tablename;
112 _initialMaxRows = initialMaxRows;
/**
 * Loads data into the numeric table held by DataSource::_spnt.
 * Delegates to loadDataBlock(maxRows, nt) with maxRows = 0.
 * \return 0 if any error is already recorded; otherwise the value returned by the delegate
 */
118 size_t loadDataBlock() DAAL_C11_OVERRIDE
// NOTE(review): the two identical checks below presumably each followed a validation call
// (original lines 120 and 123 are absent from this listing) — confirm.
121 if( this->_errors->size() != 0 ) {
return 0; }
124 if( this->_errors->size() != 0 ) {
return 0; }
126 return loadDataBlock(0, this->DataSource::_spnt.
get());
/**
 * Loads data into the caller-supplied numeric table.
 * Delegates to loadDataBlock(maxRows, nt) with maxRows = 0.
 * \param[in] nt  Destination numeric table
 * \return 0 if any error is already recorded; otherwise the value returned by the delegate
 */
129 size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
// NOTE(review): original line 131 is absent from this listing; a validation call may precede this check — confirm.
132 if( this->_errors->size() != 0 ) {
return 0; }
134 return loadDataBlock(0, nt);
/**
 * Loads data into the numeric table held by DataSource::_spnt, passing maxRows through
 * to loadDataBlock(maxRows, nt).
 * \param[in] maxRows  Row limit forwarded to the two-argument overload
 * \return 0 if the error collection is not empty; otherwise the value returned by the delegate
 */
137 virtual size_t loadDataBlock(
size_t maxRows) DAAL_C11_OVERRIDE
// NOTE(review): the two identical checks below presumably each followed a validation call
// (original lines 139 and 142 are absent from this listing) — confirm.
140 if( !this->_errors->isEmpty() ) {
return 0; }
143 if( !this->_errors->isEmpty() ) {
return 0; }
145 return loadDataBlock(maxRows, this->DataSource::_spnt.
get());
/**
 * Runs the stored query against the KDB server and copies the next rows into nt.
 *
 * \param[in] maxRows  Row limit; the check below only applies when maxRows is non-zero
 * \param[in] nt       Destination numeric table; NULL records services::ErrorNullInputNumericTable
 * \return 0 on the visible early-exit paths (error already recorded, NULL table, non-positive handle).
 *         NOTE(review): the final return statement is absent from this listing — presumably the
 *         number of rows loaded; confirm.
 *
 * NOTE(review): several brace-only lines, branch conditions and statements are missing from this
 * listing, so the exact nesting of the statements below cannot be determined from here.
 */
154 virtual size_t loadDataBlock(
size_t maxRows, NumericTable *nt)
// Do nothing if an error has already been recorded.
158 if( this->_errors->size() != 0 ) {
return 0; }
160 if( nt == NULL ) { this->_errors->add(services::ErrorNullInputNumericTable);
return 0; }
// Obtain a connection handle; a non-positive value is treated as failure.
162 I handle = _kdbConnect();
164 if (handle <= 0) {
return 0; }
// Rows remaining past the read cursor, as reported by getNumberOfAvailableRows().
166 size_t nRows = getNumberOfAvailableRows();
// Resize the table to zero rows before deciding how many rows to load.
169 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( 0, nt );
// NOTE(review): the body of this condition is missing from the listing; presumably nRows is capped to maxRows — confirm.
174 if (maxRows != 0 && nRows > maxRows)
// Build the expression "(<_query>)[(til <nRows>) + <_idx_last_read>]".
// In q, "til n" is 0..n-1, so this indexes nRows consecutive rows starting at the read cursor.
// The unary '+' before the last literal does not change what is streamed.
179 std::ostringstream query;
180 query <<
"(" << _query <<
")[(til " << nRows <<
") + " << _idx_last_read << +
"]";
181 std::string query_exec = query.str();
// Execute the expression on the server; const_cast is needed because k() takes a non-const char*.
183 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// Advance the read cursor by the number of rows requested.
187 _idx_last_read += nRows;
189 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nRows, nt );
// For user-allocated tables, record an error when the table has fewer rows than requested
// or a column count different from the dictionary's feature count.
191 if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
193 if(nt->getNumberOfRows() < nRows)
196 this->_errors->add(services::ErrorIncorrectNumberOfObservations);
199 if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
202 this->_errors->add(services::ErrorIncorrectNumberOfFeatures);
// Copy the result into nt according to the result's type.
// NOTE(review): the condition guarding this first branch is missing from the listing; it reads the
// column data through result->k, presumably the table case — confirm.
209 K columnData = kK(result->k)[1];
210 featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
// Dictionary-typed result (XD): the column data is element 1 of the result itself.
212 else if (result->t == XD)
214 K columnData = kK(result)[1];
215 featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
// Remaining case: the result is handed to the list-based conversion.
219 featureManager.statementResultsNumericTableFromList(result, nt, nRows);
// Update summary statistics row by row, but only if all four statistics tables exist on nt.
223 if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
224 nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
225 nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
226 nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
228 for(
size_t i = 0; i < nRows; i++)
230 DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
// Copy the per-feature descriptions from the data source dictionary into the table's dictionary.
234 NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
235 size_t nFeatures = _dict->getNumberOfFeatures();
236 ntDict->setNumberOfFeatures(nFeatures);
237 for (
size_t i = 0; i < nFeatures; i++)
239 ntDict->setFeature((*_dict)[i].ntFeature, i);
/**
 * Builds the data source dictionary from the first row of the query result.
 *
 * \return A services::Status. Visible error returns are ErrorDictionaryAlreadyAvailable,
 *         ErrorKDBNetworkError and ErrorKDBServerError, plus the status produced by
 *         DataSourceDictionary::create when it fails.
 *
 * NOTE(review): the conditions guarding the first two error returns, the first dictionary branch
 * and the final return are missing from this listing — confirm against the real header.
 */
245 services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
// NOTE(review): guard condition missing; presumably taken when a dictionary already exists — confirm.
248 return services::Status(services::ErrorDictionaryAlreadyAvailable);
250 I handle = _kdbConnect();
// Request only the first row: "(<_query>)[til 1]".
252 std::string query_exec =
"(" + _query +
")[til 1]";
254 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// NOTE(review): guard condition missing; presumably taken when k() returns no result — confirm.
259 return services::Status(services::ErrorKDBNetworkError);
// Result type -128 is reported as a server error
// (presumably the kdb+ C API's error type code — confirm against k.h).
262 if (result->t == -128)
266 return services::Status(services::ErrorKDBServerError);
// Create a fresh dictionary; propagate the failure status if creation fails.
269 services::Status status;
270 _dict = DataSourceDictionary::create(&status);
271 if (!status)
return status;
// Fill the dictionary according to the result's type.
// NOTE(review): the condition for this first branch is missing; it passes result->k, presumably the table case — confirm.
275 featureManager.createDictionaryFromTable(result->k, this->_dict.get());
// Dictionary-typed result (XD) is passed as-is.
277 else if (result->t == XD)
279 featureManager.createDictionaryFromTable(result, this->_dict.get());
// Remaining case: use the first element of the result list.
283 featureManager.createDictionaryFromList(kK(result)[0], this->_dict.get());
/**
 * Reports the data source status.
 * \return Always DataSourceIface::readyForLoad
 */
291 DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
293 return DataSourceIface::readyForLoad;
/**
 * Asks the server for the row count of the query and subtracts the rows already read.
 * \return 0 when the connection handle is non-positive; otherwise (count - _idx_last_read).
 *
 * NOTE(review): both operands of the final subtraction are size_t, so the result wraps around if
 * _idx_last_read exceeds the reported count — confirm this cannot happen.
 * NOTE(review): the statements following the type-mismatch error (and any clean-up of result and
 * handle) are absent from this listing — confirm.
 */
296 size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
298 I handle = _kdbConnect();
300 if (handle <= 0)
return 0;
// Query of the form "count <_query>".
302 std::string query_exec =
"count " + _query;
304 K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
// The result must have type -KJ (presumably a 64-bit integer atom in the kdb+ C API — confirm).
306 if (result->t != -KJ)
308 this->_errors->add(services::ErrorKDBWrongTypeOfOutput);
// Read the count from the result's j field.
314 size_t nRows = result->j;
320 return nRows - _idx_last_read;
/**
 * Gives access to the feature manager owned by this data source.
 * \return Mutable reference to the featureManager member
 */
323 FeatureManager &getFeatureManager()
325 return featureManager;
// Connection parameters copied from the constructor arguments.
// NOTE(review): declarations of _dbname, _port and _query are not visible in this listing
// although they are used elsewhere in the class — confirm.
331 std::string _username;
332 std::string _password;
333 std::string _tablename;
// Read cursor: initialised to 0 in the constructor and advanced by loadDataBlock(maxRows, nt).
335 size_t _idx_last_read;
// NOTE(review): the signature line of this helper is absent from the listing; given the
// "I handle = _kdbConnect();" call sites it is presumably the body of _kdbConnect() — confirm.
// Opens a connection with khpu(), passing _dbname, _port and the credentials as "<username>:<password>".
339 I handle = khpu(const_cast<char*>(_dbname.c_str ()), _port, const_cast<char*>((_username +
":" + _password).c_str ()));
// NOTE(review): the conditions selecting between the two errors below are missing from the listing — confirm.
343 this->_errors->add(services::ErrorKDBNoConnection);
349 this->_errors->add(services::ErrorKDBWrongCredentials);
// Helper taking a connection handle.
// NOTE(review): the body is absent from this listing; presumably it closes the connection — confirm.
356 void _kdbClose(I handle)
362 using interface1::KDBDataSource;
daal::data_management::interface1::KDBDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:291
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:129
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:80
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:301
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:299
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:346
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:156
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:722
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:310
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:137
daal::data_management::interface1::KDBDataSource
Connects to data sources with the KDB API.
Definition: kdb_data_source.h:56
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:73
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format.
Definition: numeric_table.h:577
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:118
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:72
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:60
daal::data_management::interface1::KDBDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:245
daal::data_management::interface1::KDBDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:296
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: kdb_data_source.h:154
daal::services::ErrorKDBNoConnection
Definition: error_indexes.h:396
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:298
daal::data_management::interface1::KDBDataSource::KDBDataSource
KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: kdb_data_source.h:93
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:277
daal::services::ErrorNullByteInjection
Definition: error_indexes.h:394
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:58
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:654
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:83
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:663
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:83
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source.
Definition: data_source.h:70
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:188
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:635
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:72
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:300
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:464
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:360
daal::services::ErrorKDBWrongCredentials
Definition: error_indexes.h:397
daal::services::ErrorKDBServerError
Definition: error_indexes.h:399
daal::services::ErrorKDBNetworkError
Definition: error_indexes.h:398
daal::services::ErrorKDBWrongTypeOfOutput
Definition: error_indexes.h:401