C++ API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

kdb_data_source.h
1 /* file: kdb_data_source.h */
2 /*******************************************************************************
3 * Copyright 2014-2020 Intel Corporation
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *******************************************************************************/
17 
18 /*
19 //++
20 // Implementation of the KDB data source class
21 //--
22 */
23 #ifndef __KDB_DATA_SOURCE_H__
24 #define __KDB_DATA_SOURCE_H__
25 
26 #include <sstream>
27 #include <fstream>
28 #include "services/daal_memory.h"
29 #include "data_management/data_source/data_source.h"
30 #include "data_management/data/data_dictionary.h"
31 #include "data_management/data/numeric_table.h"
32 #include "data_management/data/homogen_numeric_table.h"
33 
34 #include <k.h>
35 
36 #include "kdb_feature_manager.h"
37 
38 namespace daal
39 {
40 namespace data_management
41 {
42 
46 namespace interface1
47 {
55 template<typename _featureManager, typename summaryStatisticsType = DAAL_SUMMARY_STATISTICS_TYPE>
56 class KDBDataSource : public DataSourceTemplate<data_management::HomogenNumericTable<DAAL_DATA_TYPE>, summaryStatisticsType>
57 {
58 public:
59  typedef _featureManager FeatureManager;
60 
61  using DataSourceIface::NumericTableAllocationFlag;
62  using DataSourceIface::DictionaryCreationFlag;
63  using DataSourceIface::DataSourceStatus;
64 
65  using DataSource::checkDictionary;
66  using DataSource::checkNumericTable;
67  using DataSource::freeNumericTable;
68  using DataSource::_dict;
69  using DataSource::_initialMaxRows;
70 
71 protected:
72  typedef data_management::HomogenNumericTable<DAAL_DATA_TYPE> DefaultNumericTableType;
73 
74  FeatureManager featureManager;
75 
76 public:
93  KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username = "",
94  const std::string &password = "",
95  DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable = DataSource::notAllocateNumericTable,
96  DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext = DataSource::notDictionaryFromContext,
97  size_t initialMaxRows = 10) :
98  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>(doAllocateNumericTable, doCreateDictionaryFromContext),
99  _port(port), _idx_last_read(0)
100  {
101  if (dbname.find('\0') != std::string::npos || tablename.find('\0') != std::string::npos ||
102  username.find('\0') != std::string::npos || password.find('\0') != std::string::npos)
103  {
104  this->_errors->add(services::ErrorNullByteInjection);
105  return;
106  }
107  _dbname = dbname;
108  _username = username;
109  _password = password;
110  _tablename = tablename;
111  _query = _tablename;
112  _initialMaxRows = initialMaxRows;
113  }
114 
116  ~KDBDataSource() {}
117 
118  size_t loadDataBlock() DAAL_C11_OVERRIDE
119  {
120  checkDictionary();
121  if( this->_errors->size() != 0 ) { return 0; }
122 
123  checkNumericTable();
124  if( this->_errors->size() != 0 ) { return 0; }
125 
126  return loadDataBlock(0, this->DataSource::_spnt.get());
127  }
128 
129  size_t loadDataBlock(NumericTable* nt) DAAL_C11_OVERRIDE
130  {
131  checkDictionary();
132  if( this->_errors->size() != 0 ) { return 0; }
133 
134  return loadDataBlock(0, nt);
135  }
136 
137  virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
138  {
139  checkDictionary();
140  if( !this->_errors->isEmpty() ) { return 0; }
141 
142  checkNumericTable();
143  if( !this->_errors->isEmpty() ) { return 0; }
144 
145  return loadDataBlock(maxRows, this->DataSource::_spnt.get());
146  }
147 
154  virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
155  {
156  checkDictionary();
157 
158  if( this->_errors->size() != 0 ) { return 0; }
159 
160  if( nt == NULL ) { this->_errors->add(services::ErrorNullInputNumericTable); return 0; }
161 
162  I handle = _kdbConnect();
163 
164  if (handle <= 0) { return 0; }
165 
166  size_t nRows = getNumberOfAvailableRows();
167 
168  if (nRows == 0) {
169  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( 0, nt );
170  _kdbClose(handle);
171  return 0;
172  }
173 
174  if (maxRows != 0 && nRows > maxRows)
175  {
176  nRows = maxRows;
177  }
178 
179  std::ostringstream query;
180  query << "(" << _query << ")[(til " << nRows << ") + " << _idx_last_read << + "]";
181  std::string query_exec = query.str();
182 
183  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
184 
185  _kdbClose(handle);
186 
187  _idx_last_read += nRows;
188 
189  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::resizeNumericTableImpl( nRows, nt );
190 
191  if(nt->getDataMemoryStatus() == NumericTableIface::userAllocated)
192  {
193  if(nt->getNumberOfRows() < nRows)
194  {
195  r0(result);
196  this->_errors->add(services::ErrorIncorrectNumberOfObservations);
197  return 0;
198  }
199  if(nt->getNumberOfColumns() != _dict->getNumberOfFeatures())
200  {
201  r0(result);
202  this->_errors->add(services::ErrorIncorrectNumberOfFeatures);
203  return 0;
204  }
205  }
206 
207  if (result->t == XT)
208  {
209  K columnData = kK(result->k)[1];
210  featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
211  }
212  else if (result->t == XD)
213  {
214  K columnData = kK(result)[1];
215  featureManager.statementResultsNumericTableFromColumnData(columnData, nt, nRows);
216  }
217  else
218  {
219  featureManager.statementResultsNumericTableFromList(result, nt, nRows);
220  }
221  r0(result);
222 
223  if(nt->basicStatistics.get(NumericTableIface::minimum ).get() != NULL &&
224  nt->basicStatistics.get(NumericTableIface::maximum ).get() != NULL &&
225  nt->basicStatistics.get(NumericTableIface::sum ).get() != NULL &&
226  nt->basicStatistics.get(NumericTableIface::sumSquares).get() != NULL)
227  {
228  for(size_t i = 0; i < nRows; i++)
229  {
230  DataSourceTemplate<DefaultNumericTableType, summaryStatisticsType>::updateStatistics( i, nt );
231  }
232  }
233 
234  NumericTableDictionaryPtr ntDict = nt->getDictionarySharedPtr();
235  size_t nFeatures = _dict->getNumberOfFeatures();
236  ntDict->setNumberOfFeatures(nFeatures);
237  for (size_t i = 0; i < nFeatures; i++)
238  {
239  ntDict->setFeature((*_dict)[i].ntFeature, i);
240  }
241 
242  return nRows;
243  }
244 
245  services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
246  {
247  if(_dict)
248  return services::Status(services::ErrorDictionaryAlreadyAvailable);
249 
250  I handle = _kdbConnect();
251 
252  std::string query_exec = "(" + _query + ")[til 1]";
253 
254  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
255 
256  if (!result)
257  {
258  _kdbClose(handle);
259  return services::Status(services::ErrorKDBNetworkError);
260  }
261 
262  if (result->t == -128)
263  {
264  r0(result);
265  _kdbClose(handle);
266  return services::Status(services::ErrorKDBServerError);
267  }
268 
269  services::Status status;
270  _dict = DataSourceDictionary::create(&status);
271  if (!status) return status;
272 
273  if (result->t == XT)
274  {
275  featureManager.createDictionaryFromTable(result->k, this->_dict.get());
276  }
277  else if (result->t == XD)
278  {
279  featureManager.createDictionaryFromTable(result, this->_dict.get());
280  }
281  else
282  {
283  featureManager.createDictionaryFromList(kK(result)[0], this->_dict.get());
284  }
285  r0(result);
286 
287  _kdbClose(handle);
288  return status;
289  }
290 
291  DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
292  {
293  return DataSourceIface::readyForLoad;
294  }
295 
296  size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
297  {
298  I handle = _kdbConnect();
299 
300  if (handle <= 0) return 0;
301 
302  std::string query_exec = "count " + _query;
303 
304  K result = k(handle, const_cast<char*>(query_exec.c_str ()), (K)0);
305 
306  if (result->t != -KJ)
307  {
308  this->_errors->add(services::ErrorKDBWrongTypeOfOutput);
309  r0(result);
310  _kdbClose(handle);
311  return 0;
312  }
313 
314  size_t nRows = result->j;
315 
316  r0(result);
317 
318  _kdbClose(handle);
319 
320  return nRows - _idx_last_read;
321  }
322 
323  FeatureManager &getFeatureManager()
324  {
325  return featureManager;
326  }
327 
328 private:
329  std::string _dbname;
330  size_t _port;
331  std::string _username;
332  std::string _password;
333  std::string _tablename;
334  std::string _query;
335  size_t _idx_last_read;
336 
337  I _kdbConnect()
338  {
339  I handle = khpu(const_cast<char*>(_dbname.c_str ()), _port, const_cast<char*>((_username + ":" + _password).c_str ()));
340 
341  if (handle < 0)
342  {
343  this->_errors->add(services::ErrorKDBNoConnection);
344  return handle;
345  }
346 
347  if (handle == 0)
348  {
349  this->_errors->add(services::ErrorKDBWrongCredentials);
350  return handle;
351  }
352 
353  return handle;
354  }
355 
356  void _kdbClose(I handle)
357  {
358  kclose(handle);
359  }
360 };
361 } // namespace interface1
362 using interface1::KDBDataSource;
363 
364 }
365 }
366 #endif
daal::data_management::interface1::KDBDataSource::getStatus
DataSourceIface::DataSourceStatus getStatus() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:291
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock(NumericTable *nt) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:129
daal::data_management::interface1::DataSourceIface::NumericTableAllocationFlag
NumericTableAllocationFlag
Specifies whether a Numeric Table is allocated inside of the Data Source object.
Definition: data_source.h:80
daal::data_management::interface1::NumericTableIface::sumSquares
Definition: numeric_table.h:301
daal::data_management::interface1::NumericTableIface::maximum
Definition: numeric_table.h:299
daal::data_management::interface1::DataSource::checkNumericTable
services::Status checkNumericTable()
Definition: data_source.h:346
daal::services::ErrorDictionaryAlreadyAvailable
Definition: error_indexes.h:156
daal::data_management::interface1::NumericTable::getDataMemoryStatus
virtual MemoryStatus getDataMemoryStatus() const
Definition: numeric_table.h:722
daal::data_management::interface1::DataSource::status
services::Status status() const
Definition: data_source.h:310
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows) DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:137
daal::data_management::interface1::KDBDataSource
Connects to data sources with the KDB API.
Definition: kdb_data_source.h:56
daal::services::ErrorIncorrectNumberOfObservations
Definition: error_indexes.h:73
daal::data_management::interface1::DataSourceIface::freeNumericTable
virtual void freeNumericTable()=0
daal::data_management::interface1::DataSourceIface::notAllocateNumericTable
Definition: data_source.h:82
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:577
daal::data_management::interface1::KDBDataSource::loadDataBlock
size_t loadDataBlock() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:118
daal::data_management::interface1::DataSourceIface::notDictionaryFromContext
Definition: data_source.h:72
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:60
daal::data_management::interface1::KDBDataSource::createDictionaryFromContext
services::Status createDictionaryFromContext() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:245
daal::data_management::interface1::KDBDataSource::getNumberOfAvailableRows
size_t getNumberOfAvailableRows() DAAL_C11_OVERRIDE
Definition: kdb_data_source.h:296
daal::data_management::interface1::KDBDataSource::loadDataBlock
virtual size_t loadDataBlock(size_t maxRows, NumericTable *nt)
Definition: kdb_data_source.h:154
daal::services::ErrorKDBNoConnection
Definition: error_indexes.h:396
daal::data_management::interface1::NumericTableIface::minimum
Definition: numeric_table.h:298
daal::data_management::interface1::KDBDataSource::KDBDataSource
KDBDataSource(const std::string &dbname, size_t port, const std::string &tablename, const std::string &username="", const std::string &password="", DataSourceIface::NumericTableAllocationFlag doAllocateNumericTable=DataSource::notAllocateNumericTable, DataSourceIface::DictionaryCreationFlag doCreateDictionaryFromContext=DataSource::notDictionaryFromContext, size_t initialMaxRows=10)
Definition: kdb_data_source.h:93
daal::data_management::interface1::NumericTableIface::userAllocated
Definition: numeric_table.h:277
daal::services::ErrorNullByteInjection
Definition: error_indexes.h:394
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:58
daal::data_management::interface1::NumericTable::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:654
daal::data_management::interface1::DataSourceIface::doAllocateNumericTable
Definition: data_source.h:83
daal::data_management::interface1::NumericTable::getNumberOfRows
size_t getNumberOfRows() const
Definition: numeric_table.h:663
daal::services::ErrorNullInputNumericTable
Definition: error_indexes.h:83
daal::data_management::interface1::DataSourceIface::DictionaryCreationFlag
DictionaryCreationFlag
Specifies whether a Data Dictionary is created from the context of a Data Source. ...
Definition: data_source.h:70
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:188
daal::data_management::interface1::NumericTable::getDictionarySharedPtr
virtual NumericTableDictionaryPtr getDictionarySharedPtr() const DAAL_C11_OVERRIDE
Definition: numeric_table.h:635
daal::services::ErrorIncorrectNumberOfFeatures
Definition: error_indexes.h:72
daal::data_management::interface1::NumericTableIface::sum
Definition: numeric_table.h:300
daal::data_management::interface1::DataSourceTemplate
Implements the abstract DataSourceIface interface.
Definition: data_source.h:464
daal::data_management::interface1::DataSource::checkDictionary
services::Status checkDictionary()
Definition: data_source.h:360
daal::services::ErrorKDBWrongCredentials
Definition: error_indexes.h:397
daal::services::ErrorKDBServerError
Definition: error_indexes.h:399
daal::services::ErrorKDBNetworkError
Definition: error_indexes.h:398
daal::services::ErrorKDBWrongTypeOfOutput
Definition: error_indexes.h:401

For more complete information about compiler optimizations, see our Optimization Notice.