C++ API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

mysql_feature_manager.h
1 /* file: mysql_feature_manager.h */
2 /*******************************************************************************
3 * Copyright 2014-2020 Intel Corporation
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *******************************************************************************/
17 
18 /*
19 //++
20 // Implementation of the MYSQL data source class.
21 //--
22 */
23 #ifndef __MYSQL_FEATURE_MANAGER_H__
24 #define __MYSQL_FEATURE_MANAGER_H__
25 
26 #include <sstream>
27 
28 #include "data_management/data/numeric_table.h"
29 #include "data_management/features/shortcuts.h"
30 #include "data_management/data_source/data_source.h"
31 #include "data_management/data_source/internal/sql_feature_utils.h"
32 #include "data_management/data_source/modifiers/sql/shortcuts.h"
33 #include "data_management/data_source/modifiers/sql/internal/engine.h"
34 
35 namespace daal
36 {
37 namespace data_management
38 {
39 namespace interface1
40 {
50 class SQLFeatureManager
51 {
52 public:
53  SQLFeatureManager() : _errors(services::SharedPtr<services::ErrorCollection>(new services::ErrorCollection))
54  {}
55 
63  SQLFeatureManager &addModifier(const features::FeatureIdCollectionIfacePtr &featureIds,
64  const modifiers::sql::FeatureModifierIfacePtr &modifier,
65  services::Status *status = NULL)
66  {
67  services::Status localStatus;
68  if (!_modifiersManager)
69  {
70  _modifiersManager = modifiers::sql::internal::ModifiersManager::create(&localStatus);
71  if (!localStatus)
72  {
73  services::internal::tryAssignStatusAndThrow(status, localStatus);
74  return *this;
75  }
76  }
77 
78  localStatus |= _modifiersManager->addModifier(featureIds, modifier);
79  if (!localStatus)
80  {
81  services::internal::tryAssignStatusAndThrow(status, localStatus);
82  return *this;
83  }
84 
85  return *this;
86  }
87 
94  DataSourceIface::DataSourceStatus statementResultsNumericTable(SQLHSTMT hdlStmt, NumericTable *nt, size_t maxRows)
95  {
96  DAAL_ASSERT( nt );
97  DAAL_ASSERT( hdlStmt );
98 
99  nt->resize(maxRows);
100 
101  nt->getBlockOfRows(0, maxRows, writeOnly, _currentRowBlock);
102  DAAL_DATA_TYPE *ntBuffer = _currentRowBlock.getBlockPtr();
103  const size_t nColumns = _currentRowBlock.getNumberOfColumns();
104 
105  SQLRETURN ret;
106  size_t read = 0;
107  while (SQL_SUCCEEDED(ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT, 1)))
108  {
109  services::BufferView<DAAL_DATA_TYPE> rowBuffer(ntBuffer + read * nColumns, nColumns);
110 
111  if (_modifiersManager)
112  {
113  _modifiersManager->applyModifiers(rowBuffer);
114  }
115  else
116  {
117  _fetchBuffer->copyTo(rowBuffer);
118  }
119 
120  read++;
121  if (read >= maxRows) { break; }
122  }
123 
124  nt->releaseBlockOfRows(_currentRowBlock);
125  nt->resize(read);
126 
127  DataSourceIface::DataSourceStatus status = DataSourceIface::readyForLoad;
128  if (ret != SQL_NO_DATA)
129  {
130  if (!SQL_SUCCEEDED(ret))
131  {
132  status = DataSourceIface::notReady;
133  _errors->add(services::ErrorODBC);
134  }
135  }
136  else
137  {
138  if (read < maxRows)
139  {
140  status = DataSourceIface::endOfData;
141  }
142  }
143  return status;
144  }
145 
151  services::Status createDictionary(SQLHSTMT hdlStmt, DataSourceDictionary *dictionary)
152  {
153  DAAL_ASSERT( dictionary );
154  DAAL_ASSERT( hdlStmt );
155 
156  services::Status status;
157 
158  const internal::SQLFeaturesInfo &featuresInfo = getFeaturesInfo(hdlStmt, &status);
159  DAAL_CHECK_STATUS_VAR( status );
160 
161  DAAL_CHECK_STATUS( status, bindSQLColumns(hdlStmt, featuresInfo) );
162  DAAL_CHECK_STATUS( status, fillDictionary(*dictionary, featuresInfo) );
163 
164  return status;
165  }
166 
174  std::string setLimitQuery(std::string &query, size_t idx_last_read, size_t maxRows)
175  {
176  if (query.find('\0') != std::string::npos)
177  {
178  this->_errors->add(services::ErrorNullByteInjection);
179  return std::string();
180  }
181  std::stringstream ss;
182  ss << query << " LIMIT " << idx_last_read << ", " << maxRows << ";";
183  return ss.str();
184  }
185 
186  services::ErrorCollectionPtr getErrors()
187  {
188  return services::ErrorCollectionPtr(new services::ErrorCollection());
189  }
190 
191 private:
192  internal::SQLFeaturesInfo getFeaturesInfo(SQLHSTMT hdlStmt, services::Status *status = NULL)
193  {
194  SQLSMALLINT nFeatures = 0;
195  SQLRETURN ret = SQLNumResultCols(hdlStmt, &nFeatures);
196  if (!SQL_SUCCEEDED(ret))
197  {
198  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
199  return internal::SQLFeaturesInfo();
200  }
201 
202  SQLLEN sqlType;
203  SQLLEN sqlIsUnsigned;
204  SQLLEN sqlTypeLength;
205  internal::SQLFeaturesInfo featuresInfo;
206 
207  for (int i = 0 ; i < nFeatures; i++)
208  {
209  const int bufferSize = 128;
210  char label[bufferSize];
211 
212  SQLLEN sqlType;
213  SQLLEN sqlOctetLength;
214  SQLSMALLINT labelLenUsed;
215 
216  SQLLEN sqlIsUnsigned;
217  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_UNSIGNED, NULL, 0, NULL, &sqlIsUnsigned);
218  if (!SQL_SUCCEEDED(ret))
219  {
220  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
221  return internal::SQLFeaturesInfo();
222  }
223 
224  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_TYPE, NULL, 0, NULL, &sqlType);
225  if (!SQL_SUCCEEDED(ret))
226  {
227  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
228  return internal::SQLFeaturesInfo();
229  }
230 
231  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_OCTET_LENGTH, NULL, 0, NULL, &sqlOctetLength);
232  if (!SQL_SUCCEEDED(ret))
233  {
234  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
235  return internal::SQLFeaturesInfo();
236  }
237 
238  ret = SQLColAttributes(hdlStmt, (SQLUSMALLINT)(i + 1), SQL_DESC_NAME, (SQLPOINTER)label,
239  (SQLSMALLINT)bufferSize, &labelLenUsed, NULL);
240  if (!SQL_SUCCEEDED(ret))
241  {
242  services::internal::tryAssignStatusAndThrow(status, services::ErrorODBC);
243  return internal::SQLFeaturesInfo();
244  }
245  services::Status internalStatus = services::internal::checkForNullByteInjection(label, label + labelLenUsed);
246  if (!internalStatus)
247  {
248  services::internal::tryAssignStatusAndThrow(status, internalStatus);
249  return internal::SQLFeaturesInfo();
250  }
251  const services::String labelStr(label);
252  const bool isSigned = sqlIsUnsigned == SQL_FALSE;
253 
254  featuresInfo.add( internal::SQLFeatureInfo(labelStr,
255  sqlType,
256  sqlOctetLength,
257  isSigned) );
258  }
259 
260  return featuresInfo;
261  }
262 
263  services::Status bindSQLColumns(SQLHSTMT hdlStmt, const internal::SQLFeaturesInfo &featuresInfo)
264  {
265  DAAL_ASSERT( hdlStmt );
266 
267  services::Status status;
268 
269  const internal::SQLFetchMode::Value fetchMode = _modifiersManager
270  ? internal::SQLFetchMode::useNativeSQLTypes
271  : internal::SQLFetchMode::castToFloatingPointType;
272  _fetchBuffer = internal::SQLFetchBuffer::create(featuresInfo, fetchMode, &status);
273  DAAL_CHECK_STATUS_VAR(status);
274 
275  SQLRETURN ret = SQLFreeStmt(hdlStmt, SQL_UNBIND);
276  if (!SQL_SUCCEEDED(ret)) { return services::throwIfPossible(services::ErrorODBC); }
277 
278  const SQLSMALLINT targetSQLType = internal::SQLFetchMode::getTargetType(fetchMode);
279  for (size_t i = 0; i < featuresInfo.getNumberOfFeatures(); i++)
280  {
281  char * const buffer = _fetchBuffer->getBufferForFeature(i);
282  const SQLLEN bufferSize = _fetchBuffer->getBufferSizeForFeature(i);
283  SQLLEN * const actualSizeBuffer = _fetchBuffer->getActualDataSizeBufferForFeature(i);
284 
285  SQLLEN strLenOrIndPtr;
286  ret = SQLBindCol(hdlStmt, (SQLUSMALLINT)(i + 1), targetSQLType,
287  (SQLPOINTER)buffer, bufferSize, actualSizeBuffer);
288  if (!SQL_SUCCEEDED(ret)) { return services::throwIfPossible(services::ErrorODBC); }
289  }
290 
291  if (_modifiersManager)
292  {
293  DAAL_CHECK_STATUS( status, _modifiersManager->prepare(featuresInfo, *_fetchBuffer) );
294  }
295 
296  return status;
297  }
298 
299  services::Status fillDictionary(DataSourceDictionary &dictionary,
300  const internal::SQLFeaturesInfo &featuresInfo)
301  {
302  if (_modifiersManager)
303  {
304  return _modifiersManager->fillDictionary(dictionary);
305  }
306 
307  const size_t nFeatures = featuresInfo.getNumberOfFeatures();
308  services::Status status = dictionary.setNumberOfFeatures(nFeatures);
309  if (!status) { return services::throwIfPossible(status); }
310 
311  for (size_t i = 0; i < nFeatures; i++)
312  {
313  dictionary[i].ntFeature.setType<DAAL_DATA_TYPE>();
314  dictionary[i].ntFeature.featureType = features::DAAL_CONTINUOUS;
315  }
316 
317  return status;
318  }
319 
320 private:
321  internal::SQLFetchBufferPtr _fetchBuffer;
322  BlockDescriptor<DAAL_DATA_TYPE> _currentRowBlock;
323  services::SharedPtr<services::ErrorCollection> _errors;
324  modifiers::sql::internal::ModifiersManagerPtr _modifiersManager;
325 };
326 
327 typedef SQLFeatureManager MySQLFeatureManager;
328 
330 } // namespace interface1
331 
332 using interface1::SQLFeatureManager;
333 using interface1::MySQLFeatureManager;
334 
335 } // namespace data_management
336 } // namespace daal
337 
338 #endif
daal::services::ErrorODBC
Definition: error_indexes.h:390
daal::data_management::interface1::SQLFeatureManager::addModifier
SQLFeatureManager & addModifier(const features::FeatureIdCollectionIfacePtr &featureIds, const modifiers::sql::FeatureModifierIfacePtr &modifier, services::Status *status=NULL)
Definition: mysql_feature_manager.h:63
daal::data_management::interface1::SQLFeatureManager
Interprets the response of SQL data base and fill provided numeric table and dictionary.
Definition: mysql_feature_manager.h:50
daal::data_management::interface1::Dictionary
Class that represents a dictionary of a data set and provides methods to work with the data dictionar...
Definition: data_dictionary.h:163
daal::data_management::interface1::NumericTable
Class for a data management component responsible for representation of data in the numeric format...
Definition: numeric_table.h:577
daal::data_management::interface1::DataSourceIface::readyForLoad
Definition: data_source.h:60
daal::data_management::interface1::DataSourceIface::notReady
Definition: data_source.h:63
daal::services::ErrorNullByteInjection
Definition: error_indexes.h:394
daal::data_management::interface1::DataSourceIface::DataSourceStatus
DataSourceStatus
Specifies the status of the Data Source.
Definition: data_source.h:58
daal::data_management::interface1::SQLFeatureManager::createDictionary
services::Status createDictionary(SQLHSTMT hdlStmt, DataSourceDictionary *dictionary)
Definition: mysql_feature_manager.h:151
daal::data_management::interface1::DataSourceIface::endOfData
Definition: data_source.h:62
daal::data_management::interface1::NumericTable::resize
virtual services::Status resize(size_t nrows) DAAL_C11_OVERRIDE
Definition: numeric_table.h:639
daal::data_management::interface1::SQLFeatureManager::statementResultsNumericTable
DataSourceIface::DataSourceStatus statementResultsNumericTable(SQLHSTMT hdlStmt, NumericTable *nt, size_t maxRows)
Definition: mysql_feature_manager.h:94
daal::data_management::internal::SQLFeaturesInfo
Class that holds auxiliary information about multiple SQL columns.
Definition: sql_feature_utils.h:72
daal::data_management::interface1::DenseNumericTableIface::getBlockOfRows
virtual services::Status getBlockOfRows(size_t vector_idx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block)=0
daal::data_management::interface1::DenseNumericTableIface::releaseBlockOfRows
virtual services::Status releaseBlockOfRows(BlockDescriptor< double > &block)=0
daal::data_management::interface1::BlockDescriptor::getNumberOfColumns
size_t getNumberOfColumns() const
Definition: numeric_table.h:97
daal::data_management::interface1::BlockDescriptor::getBlockPtr
DataType * getBlockPtr() const
Definition: numeric_table.h:71
daal::data_management::interface1::SQLFeatureManager::setLimitQuery
std::string setLimitQuery(std::string &query, size_t idx_last_read, size_t maxRows)
Definition: mysql_feature_manager.h:174

For more complete information about compiler optimizations, see our Optimization Notice.