C++ API Reference for Intel® Data Analytics Acceleration Library 2020 Update 1

arrow_numeric_table.h
1 /* file: arrow_numeric_table.h */
2 /*******************************************************************************
3 * Copyright 2014-2020 Intel Corporation
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *******************************************************************************/
17 
18 /*
19 //++
20 // Implementation of a numeric table stored as an Apache Arrow table.
21 //--
22 */
23 
24 #ifndef __ARROW_NUMERIC_TABLE_H__
25 #define __ARROW_NUMERIC_TABLE_H__
26 
27 #include "data_management/data/numeric_table.h"
28 #include "data_management/data/internal/conversion.h"
29 #include "data_management/data/internal/base_arrow_numeric_table.h"
30 #include <memory>
31 #include <arrow/table.h>
32 #include <arrow/util/config.h>
33 
34 namespace daal
35 {
36 namespace data_management
37 {
38 
39 namespace interface1
40 {
49 class DAAL_EXPORT ArrowImmutableNumericTable : public BaseArrowImmutableNumericTable
50 {
51 public:
52  DECLARE_SERIALIZABLE_IMPL();
53 
60  static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(const std::shared_ptr<arrow::Table> & table,
61  services::Status * stat = NULL)
62  {
63  if (!table)
64  {
65  if (stat) { stat->add(services::ErrorNullPtr); }
66  return services::SharedPtr<ArrowImmutableNumericTable>();
67  }
68 
69  DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
70  }
71 
78  static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(const std::shared_ptr<const arrow::Table> & table,
79  services::Status * stat = NULL)
80  {
81  if (!table)
82  {
83  if (stat) { stat->add(services::ErrorNullPtr); }
84  return services::SharedPtr<ArrowImmutableNumericTable>();
85  }
86 
87  DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
88  }
89 
90  services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
91  {
92  return getTBlock<double>(vectorIdx, vector_num, rwflag, block);
93  }
94 
95  services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
96  {
97  return getTBlock<float>(vectorIdx, vector_num, rwflag, block);
98  }
99 
100  services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
101  {
102  return getTBlock<int>(vectorIdx, vector_num, rwflag, block);
103  }
104 
105  services::Status releaseBlockOfRows(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
106  {
107  return releaseTBlock<double>(block);
108  }
109 
110  services::Status releaseBlockOfRows(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
111  {
112  return releaseTBlock<float>(block);
113  }
114 
115  services::Status releaseBlockOfRows(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
116  {
117  return releaseTBlock<int>(block);
118  }
119 
120  services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum,
121  ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
122  {
123  return getTFeature<double>(featureIdx, vectorIdx, valueNum, rwflag, block);
124  }
125 
126  services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum,
127  ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
128  {
129  return getTFeature<float>(featureIdx, vectorIdx, valueNum, rwflag, block);
130  }
131 
132  services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum,
133  ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
134  {
135  return getTFeature<int>(featureIdx, vectorIdx, valueNum, rwflag, block);
136  }
137 
138  services::Status releaseBlockOfColumnValues(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
139  {
140  return releaseTFeature<double>(block);
141  }
142  services::Status releaseBlockOfColumnValues(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
143  {
144  return releaseTFeature<float>(block);
145  }
146  services::Status releaseBlockOfColumnValues(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
147  {
148  return releaseTFeature<int>(block);
149  }
150 
151 protected:
152  services::Status setNumberOfColumnsImpl(size_t ncol) DAAL_C11_OVERRIDE
153  {
154  if (ncol == getNumberOfColumns()) return services::Status();
155  return services::Status(services::ErrorMethodNotSupported);
156  }
157 
158  services::Status allocateDataMemoryImpl(daal::MemType type = daal::dram) DAAL_C11_OVERRIDE
159  {
160  return services::Status(services::ErrorMethodNotSupported);
161  }
162 
163  template<typename Archive, bool onDeserialize>
164  services::Status serialImpl(Archive* arch)
165  {
166  if (onDeserialize)
167  {
168  return services::Status(services::ErrorMethodNotSupported);
169  }
170 
171  NumericTable::serialImpl<Archive, onDeserialize>(arch);
172 
173  const size_t ncol = _ddict->getNumberOfFeatures();
174  const size_t nrows = getNumberOfRows();
175 
176  for (size_t i = 0; i < ncol; ++i)
177  {
178  const NumericTableFeature& f = (*_ddict)[i];
179 
180  const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = getColumnChunkedArrayPtr(i);
181  DAAL_ASSERT(columnChunkedArrayPtr);
182  const arrow::ChunkedArray& columnChunkedArray = *columnChunkedArrayPtr;
183  const int chunkCount = columnChunkedArray.num_chunks();
184  DAAL_ASSERT(chunkCount > 0);
185 
186  for (int chunk = 0; chunk < chunkCount; ++chunk)
187  {
188  const std::shared_ptr<const arrow::Array> arrayPtr = columnChunkedArray.chunk(chunk);
189  DAAL_ASSERT(arrayPtr);
190  const int64_t chunkLength = arrayPtr->length();
191  DAAL_ASSERT(chunkLength > 0);
192  arch->set(getPtr(arrayPtr, f), chunkLength * f.typeSize);
193  }
194  }
195 
196  return services::Status();
197  }
198 
199 private:
200  DAAL_FORCEINLINE ArrowImmutableNumericTable(const std::shared_ptr<const arrow::Table> & table, services::Status & st)
201  : BaseArrowImmutableNumericTable(table->num_columns(), table->num_rows(), st), _table(table)
202  {
203  _layout = arrow;
204  _memStatus = userAllocated;
205  if (st) st |= updateFeatures(*table);
206  }
207 
208  std::shared_ptr<const arrow::Table> _table;
209 
210  DAAL_FORCEINLINE services::Status updateFeatures(const arrow::Table & table)
211  {
212  services::Status s;
213  if (_ddict.get() == NULL)
214  {
215  _ddict = NumericTableDictionary::create(&s);
216  }
217  if (!s) return s;
218 
219  const std::shared_ptr<const arrow::Schema> schemaPtr = table.schema();
220  DAAL_ASSERT(schemaPtr);
221  const int ncols = schemaPtr->num_fields();
222  for (int col = 0; col < ncols; ++col)
223  {
224  const arrow::Type::type type = schemaPtr->field(col)->type()->id();
225  switch (type)
226  {
227  case arrow::Type::UINT8: s |= setFeature<unsigned char>(col); break;
228  case arrow::Type::INT8: s |= setFeature<char>(col); break;
229  case arrow::Type::UINT16: s |= setFeature<unsigned short>(col); break;
230  case arrow::Type::INT16: s |= setFeature<short>(col); break;
231  case arrow::Type::UINT32: s |= setFeature<unsigned int>(col); break;
232  case arrow::Type::DATE32:
233  case arrow::Type::TIME32:
234  case arrow::Type::INT32: s |= setFeature<int>(col); break;
235  case arrow::Type::UINT64: s |= setFeature<DAAL_UINT64>(col); break;
236  case arrow::Type::DATE64:
237  case arrow::Type::TIMESTAMP:
238  case arrow::Type::TIME64:
239  case arrow::Type::INT64: s |= setFeature<DAAL_INT64>(col); break;
240  case arrow::Type::FLOAT: s |= setFeature<float>(col); break;
241  case arrow::Type::DOUBLE: s |= setFeature<double>(col); break;
242  default: s.add(services::ErrorDataTypeNotSupported); return s;
243  }
244  }
245  return s;
246  }
247 
248  template <typename T>
249  services::Status setFeature(size_t idx, features::FeatureType featureType = features::DAAL_CONTINUOUS, size_t categoryNumber = 0)
250  {
251  DAAL_ASSERT(_ddict);
252  services::Status s = _ddict->setFeature<T>(idx);
253  if (!s) return s;
254  (*_ddict)[idx].featureType = featureType;
255  (*_ddict)[idx].categoryNumber = categoryNumber;
256  return s;
257  }
258 
259  template <typename T>
260  services::Status getTBlock(size_t idx, size_t nrows, ReadWriteMode rwFlag, BlockDescriptor<T>& block)
261  {
262  if (block.getRWFlag() & (int)writeOnly)
263  {
264  return services::Status(services::ErrorMethodNotSupported);
265  }
266 
267  const size_t ncols = getNumberOfColumns();
268  const size_t nobs = getNumberOfRows();
269  block.setDetails(0, idx, rwFlag);
270 
271  if (idx >= nobs)
272  {
273  block.resizeBuffer(ncols, 0);
274  return services::Status();
275  }
276 
277  nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
278 
279  if (!block.resizeBuffer(ncols, nrows)) { return services::Status(services::ErrorMemoryAllocationFailed); }
280 
281  T lbuf[32];
282  size_t di = 32;
283  T* const buffer = block.getBlockPtr();
284 
285  for (size_t i = 0; i < nrows; i += di)
286  {
287  if (i + di > nrows) { di = nrows - i; }
288 
289  for (size_t j = 0; j < ncols; ++j)
290  {
291  const NumericTableFeature & f = (*_ddict)[j];
292 
293  const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = getColumnChunkedArrayPtr(j);
294  DAAL_ASSERT(columnChunkedArrayPtr);
295  const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx + i, di);
296  DAAL_ASSERT(sliceChunkedArrayPtr);
297  const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
298  const int chunkCount = sliceChunkedArray.num_chunks();
299  DAAL_ASSERT(chunkCount > 0);
300  if (chunkCount == 1)
301  {
302  const char* const ptr = getPtr(sliceChunkedArray.chunk(0), f);
303  DAAL_ASSERT(ptr);
304  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(di, ptr, lbuf);
305  }
306  else
307  {
308  size_t offset = 0;
309  for (int chunk = 0; chunk < chunkCount; ++chunk)
310  {
311  const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
312  DAAL_ASSERT(arrayPtr);
313  const int64_t chunkLength = arrayPtr->length();
314  DAAL_ASSERT(chunkLength > 0);
315  const char* const ptr = getPtr(arrayPtr, f);
316  DAAL_ASSERT(ptr);
317  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, &(lbuf[offset]));
318  offset += chunkLength;
319  }
320  DAAL_ASSERT(offset == di);
321  }
322 
323  for (size_t ii = 0; ii < di; ++ii)
324  {
325  buffer[(i + ii) * ncols + j] = lbuf[ii];
326  }
327  }
328  }
329 
330  return services::Status();
331  }
332 
333  template <typename T>
334  services::Status releaseTBlock(BlockDescriptor<T>& block)
335  {
336  if (block.getRWFlag() & (int)writeOnly)
337  {
338  return services::Status(services::ErrorMethodNotSupported);
339  }
340 
341  block.reset();
342  return services::Status();
343  }
344 
345  template <typename T>
346  services::Status getTFeature(size_t featIdx, size_t idx, size_t nrows, int rwFlag, BlockDescriptor<T>& block)
347  {
348  if (block.getRWFlag() & (int)writeOnly)
349  {
350  return services::Status(services::ErrorMethodNotSupported);
351  }
352 
353  const size_t ncols = getNumberOfColumns();
354  const size_t nobs = getNumberOfRows();
355  block.setDetails(featIdx, idx, rwFlag);
356 
357  if (idx >= nobs)
358  {
359  block.resizeBuffer(1, 0);
360  return services::Status();
361  }
362 
363  nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
364 
365  const NumericTableFeature& f = (*_ddict)[featIdx];
366 
367  const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = getColumnChunkedArrayPtr(featIdx);
368  DAAL_ASSERT(columnChunkedArrayPtr);
369  const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx, nrows);
370  DAAL_ASSERT(sliceChunkedArrayPtr);
371  const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
372  const int chunkCount = sliceChunkedArray.num_chunks();
373  DAAL_ASSERT(chunkCount > 0);
374 
375  if (features::internal::getIndexNumType<T>() == f.indexType && chunkCount == 1)
376  {
377  const T* const ptr = getPtr<T>(sliceChunkedArray.chunk(0), f);
378  DAAL_ASSERT(ptr);
379  block.setPtr(const_cast<T* const>(ptr), 1, nrows);
380  }
381  else
382  {
383  if (!block.resizeBuffer(1, nrows))
384  {
385  return services::Status(services::ErrorMemoryAllocationFailed);
386  }
387 
388  if (!(block.getRWFlag() & (int)readOnly)) return services::Status();
389 
390  if (chunkCount == 1)
391  {
392  const char* const ptr = getPtr(sliceChunkedArray.chunk(0), f);
393  DAAL_ASSERT(ptr);
394  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(nrows, ptr, block.getBlockPtr());
395  }
396  else
397  {
398  size_t offset = 0;
399  T* const destPtr = block.getBlockPtr();
400  for (int chunk = 0; chunk < chunkCount; ++chunk)
401  {
402  const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
403  DAAL_ASSERT(arrayPtr);
404  const int64_t chunkLength = arrayPtr->length();
405  DAAL_ASSERT(chunkLength > 0);
406  const char* const ptr = getPtr(arrayPtr, f);
407  DAAL_ASSERT(ptr);
408  internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, destPtr + offset);
409  offset += chunkLength;
410  }
411  DAAL_ASSERT(offset == di);
412  }
413  }
414  return services::Status();
415  }
416 
417  template <typename T>
418  services::Status releaseTFeature(BlockDescriptor<T>& block)
419  {
420  if (block.getRWFlag() & (int)writeOnly)
421  {
422  return services::Status(services::ErrorMethodNotSupported);
423  }
424 
425  block.reset();
426  return services::Status();
427  }
428 
429  template <typename T = char>
430  const T* getPtr(const arrow::Array& array, const NumericTableFeature& f, int bufferIndex = 1) const
431  {
432  const std::shared_ptr<const arrow::ArrayData> arrayDataPtr = array.data();
433  DAAL_ASSERT(arrayDataPtr);
434  const arrow::ArrayData& arrayData = *arrayDataPtr;
435  return reinterpret_cast<const T*>(arrayData.template GetValues<char>(bufferIndex, arrayData.offset * f.typeSize));
436  }
437 
438  template <typename T = char>
439  const T* getPtr(const std::shared_ptr<const arrow::Array>& array, const NumericTableFeature& f, int bufferIndex = 1) const
440  {
441  return getPtr<T>(*array, f, bufferIndex);
442  }
443 
444  const std::shared_ptr<const arrow::ChunkedArray> getColumnChunkedArrayPtr(size_t idx)
445  {
446 #if ARROW_VERSION >= 15000
447  return _table->column(idx);
448 #else
449  const std::shared_ptr<const arrow::Column> columnPtr = _table->column(idx);
450  DAAL_ASSERT(columnPtr);
451  return columnPtr->data();
452 #endif
453  }
454 };
455 typedef services::SharedPtr<ArrowImmutableNumericTable> ArrowImmutableNumericTablePtr;
457 } // namespace interface1
// Make the interface1 names available directly in daal::data_management
458 using interface1::ArrowImmutableNumericTable;
459 using interface1::ArrowImmutableNumericTablePtr;
460 
461 } // namespace data_management
462 } // namespace daal
463 #endif
daal::services::ErrorMethodNotSupported
Definition: error_indexes.h:71
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:146
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:90
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< const arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:78
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:60
daal::services::ErrorDataTypeNotSupported
Definition: error_indexes.h:144
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:132
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:100
daal::data_management::interface1::BaseArrowImmutableNumericTable
Base class that provides methods to access data stored as an immutable Apache Arrow table...
Definition: base_arrow_numeric_table.h:57
daal::services::ErrorNullPtr
Definition: error_indexes.h:141
daal::MemType
MemType
Definition: daal_defines.h:147
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:126
daal::data_management::interface1::ArrowImmutableNumericTable
Class that provides methods to access data stored as an Apache Arrow table.
Definition: arrow_numeric_table.h:49
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:142
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:105
daal::dram
Definition: daal_defines.h:149
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:188
daal::data_management::interface1::BlockDescriptor
Base class that manages buffer memory for read/write operations required by numeric tables...
Definition: numeric_table.h:57
daal::services::ErrorMemoryAllocationFailed
Definition: error_indexes.h:150
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:115
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:110
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:120
daal::algorithms::implicit_als::training::offset
Definition: implicit_als_training_types.h:150
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:138
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:95

For more complete information about compiler optimizations, see our Optimization Notice.