24 #ifndef __ARROW_NUMERIC_TABLE_H__
25 #define __ARROW_NUMERIC_TABLE_H__
27 #include "data_management/data/numeric_table.h"
28 #include "data_management/data/internal/conversion.h"
29 #include "data_management/data/internal/base_arrow_numeric_table.h"
31 #include <arrow/table.h>
32 #include <arrow/util/config.h>
36 namespace data_management
// Numeric table that exposes the data of an Apache Arrow table through the
// BaseArrowImmutableNumericTable interface. Write access and (re)allocation
// requests are rejected with ErrorMethodNotSupported by the methods below.
49 class DAAL_EXPORT ArrowImmutableNumericTable :
public BaseArrowImmutableNumericTable
52 DECLARE_SERIALIZABLE_IMPL();
// Factory taking a shared pointer to a (mutable) arrow::Table.
// `stat` is optional: when it is non-null, ErrorNullPtr is added to it on the
// failure path, which returns an empty SharedPtr.
// NOTE(review): the condition guarding the failure path is not visible in this
// listing (source lines 62-64 are missing) - presumably a null check on `table`; confirm.
60 static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(
const std::shared_ptr<arrow::Table> & table,
61 services::Status * stat = NULL)
65 if (stat) { stat->add(services::ErrorNullPtr); }
66 return services::SharedPtr<ArrowImmutableNumericTable>();
// Regular path: object creation is delegated to the DAAL_DEFAULT_CREATE_IMPL_EX macro (defined elsewhere).
69 DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
// Factory overload taking a shared pointer to a const arrow::Table.
// `stat` is optional: when it is non-null, ErrorNullPtr is added to it on the
// failure path, which returns an empty SharedPtr.
// NOTE(review): the condition guarding the failure path is not visible in this
// listing (source lines 80-82 are missing) - presumably a null check on `table`; confirm.
78 static DAAL_FORCEINLINE services::SharedPtr<ArrowImmutableNumericTable> create(
const std::shared_ptr<const arrow::Table> & table,
79 services::Status * stat = NULL)
83 if (stat) { stat->add(services::ErrorNullPtr); }
84 return services::SharedPtr<ArrowImmutableNumericTable>();
// Regular path: object creation is delegated to the DAAL_DEFAULT_CREATE_IMPL_EX macro (defined elsewhere).
87 DAAL_DEFAULT_CREATE_IMPL_EX(ArrowImmutableNumericTable, table);
// Row-block accessor for double: forwards all arguments unchanged to getTBlock<double>
// and returns its status.
90 services::Status getBlockOfRows(
size_t vectorIdx,
size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
92 return getTBlock<double>(vectorIdx, vector_num, rwflag, block);
// Row-block accessor for float: forwards all arguments unchanged to getTBlock<float>
// and returns its status.
95 services::Status getBlockOfRows(
size_t vectorIdx,
size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
97 return getTBlock<float>(vectorIdx, vector_num, rwflag, block);
// Row-block accessor for int: forwards all arguments unchanged to getTBlock<int>
// and returns its status.
100 services::Status getBlockOfRows(
size_t vectorIdx,
size_t vector_num, ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
102 return getTBlock<int>(vectorIdx, vector_num, rwflag, block);
// Releases a double row block by forwarding to releaseTBlock<double>.
105 services::Status releaseBlockOfRows(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
107 return releaseTBlock<double>(block);
// Releases a float row block by forwarding to releaseTBlock<float>.
110 services::Status releaseBlockOfRows(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
112 return releaseTBlock<float>(block);
// Releases an int row block by forwarding to releaseTBlock<int>.
115 services::Status releaseBlockOfRows(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
117 return releaseTBlock<int>(block);
// Column-block accessor for double: forwards all arguments unchanged to
// getTFeature<double> and returns its status.
120 services::Status getBlockOfColumnValues(
size_t featureIdx,
size_t vectorIdx,
size_t valueNum,
121 ReadWriteMode rwflag, BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
123 return getTFeature<double>(featureIdx, vectorIdx, valueNum, rwflag, block);
// Column-block accessor for float: forwards all arguments unchanged to
// getTFeature<float> and returns its status.
126 services::Status getBlockOfColumnValues(
size_t featureIdx,
size_t vectorIdx,
size_t valueNum,
127 ReadWriteMode rwflag, BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
129 return getTFeature<float>(featureIdx, vectorIdx, valueNum, rwflag, block);
// Column-block accessor for int: forwards all arguments unchanged to
// getTFeature<int> and returns its status.
132 services::Status getBlockOfColumnValues(
size_t featureIdx,
size_t vectorIdx,
size_t valueNum,
133 ReadWriteMode rwflag, BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
135 return getTFeature<int>(featureIdx, vectorIdx, valueNum, rwflag, block);
// Releases a double column block by forwarding to releaseTFeature<double>.
138 services::Status releaseBlockOfColumnValues(BlockDescriptor<double>& block) DAAL_C11_OVERRIDE
140 return releaseTFeature<double>(block);
// Releases a float column block by forwarding to releaseTFeature<float>.
142 services::Status releaseBlockOfColumnValues(BlockDescriptor<float>& block) DAAL_C11_OVERRIDE
144 return releaseTFeature<float>(block);
// Releases an int column block by forwarding to releaseTFeature<int>.
146 services::Status releaseBlockOfColumnValues(BlockDescriptor<int>& block) DAAL_C11_OVERRIDE
148 return releaseTFeature<int>(block);
// The column count cannot be changed: a request equal to the current
// getNumberOfColumns() succeeds as a no-op, any other value yields
// ErrorMethodNotSupported.
152 services::Status setNumberOfColumnsImpl(
size_t ncol) DAAL_C11_OVERRIDE
154 if (ncol == getNumberOfColumns())
return services::Status();
155 return services::Status(services::ErrorMethodNotSupported);
// Allocation is not supported for this table: always returns
// ErrorMethodNotSupported; the `type` argument is not used.
158 services::Status allocateDataMemoryImpl(daal::MemType type = daal::dram) DAAL_C11_OVERRIDE
160 return services::Status(services::ErrorMethodNotSupported);
// Serialization hook: after the base-class part, writes the raw values of every
// column, chunk by chunk, into the archive.
//   Archive        - archive type; only arch->set(ptr, sizeInBytes) is used here.
//   onDeserialize  - compile-time direction flag, passed on to NumericTable::serialImpl.
// Returns an empty (OK) Status at the end of the visible code path.
163 template<
typename Archive,
bool onDeserialize>
164 services::Status serialImpl(Archive* arch)
// NOTE(review): the guard for this early return is not visible (source line 166 is
// missing) - presumably it is taken when onDeserialize is true; confirm.
168 return services::Status(services::ErrorMethodNotSupported);
// NOTE(review): the Status returned by the base-class serialImpl is discarded here.
171 NumericTable::serialImpl<Archive, onDeserialize>(arch);
173 const size_t ncol = _ddict->getNumberOfFeatures();
// NOTE(review): nrows is not referenced by any visible line below.
174 const size_t nrows = getNumberOfRows();
// Outer loop: one iteration per feature (column) in the data dictionary.
176 for (
size_t i = 0; i < ncol; ++i)
178 const NumericTableFeature& f = (*_ddict)[i];
180 const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = getColumnChunkedArrayPtr(i);
181 DAAL_ASSERT(columnChunkedArrayPtr);
182 const arrow::ChunkedArray& columnChunkedArray = *columnChunkedArrayPtr;
183 const int chunkCount = columnChunkedArray.num_chunks();
184 DAAL_ASSERT(chunkCount > 0);
// Inner loop: each chunk contributes chunkLength * f.typeSize bytes, read from the
// pointer that getPtr() returns for that chunk.
186 for (
int chunk = 0; chunk < chunkCount; ++chunk)
188 const std::shared_ptr<const arrow::Array> arrayPtr = columnChunkedArray.chunk(chunk);
189 DAAL_ASSERT(arrayPtr);
190 const int64_t chunkLength = arrayPtr->length();
191 DAAL_ASSERT(chunkLength > 0);
192 arch->set(getPtr(arrayPtr, f), chunkLength * f.typeSize);
196 return services::Status();
// Constructor: passes the table's column and row counts plus `st` to the base class
// and stores the shared pointer in _table.
// `table` is dereferenced in the initializer list without a null check in this
// constructor, so a non-null pointer is required.
// `st` is an in/out status: updateFeatures() runs only while it is still OK and its
// result is merged back into `st`.
200 DAAL_FORCEINLINE ArrowImmutableNumericTable(
const std::shared_ptr<const arrow::Table> & table, services::Status & st)
201 : BaseArrowImmutableNumericTable(table->num_columns(), table->num_rows(), st), _table(table)
// NOTE(review): source line 203 is missing from this listing.
204 _memStatus = userAllocated;
205 if (st) st |= updateFeatures(*table);
// Shared pointer to the wrapped Arrow table; set by the constructor and read by
// getColumnChunkedArrayPtr().
208 std::shared_ptr<const arrow::Table> _table;
// Fills the data dictionary from the Arrow schema: every field's type id is mapped
// to a C++ value type and registered through setFeature<T>(col).
// A type id without a case below adds ErrorDataTypeNotSupported and returns immediately.
// NOTE(review): the declaration of `s`, the switch header and the final return are
// on source lines missing from this listing.
210 DAAL_FORCEINLINE services::Status updateFeatures(
const arrow::Table & table)
// Create the dictionary only when none is attached yet.
213 if (_ddict.get() == NULL)
215 _ddict = NumericTableDictionary::create(&s);
219 const std::shared_ptr<const arrow::Schema> schemaPtr = table.schema();
220 DAAL_ASSERT(schemaPtr);
221 const int ncols = schemaPtr->num_fields();
222 for (
int col = 0; col < ncols; ++col)
224 const arrow::Type::type type = schemaPtr->field(col)->type()->id();
227 case arrow::Type::UINT8: s |= setFeature<unsigned char>(col);
break;
228 case arrow::Type::INT8: s |= setFeature<char>(col);
break;
229 case arrow::Type::UINT16: s |= setFeature<unsigned short>(col);
break;
230 case arrow::Type::INT16: s |= setFeature<short>(col);
break;
231 case arrow::Type::UINT32: s |= setFeature<unsigned int>(col);
break;
// DATE32 and TIME32 share the INT32 handling (registered as int).
232 case arrow::Type::DATE32:
233 case arrow::Type::TIME32:
234 case arrow::Type::INT32: s |= setFeature<int>(col);
break;
235 case arrow::Type::UINT64: s |= setFeature<DAAL_UINT64>(col);
break;
// DATE64, TIMESTAMP and TIME64 share the INT64 handling (registered as DAAL_INT64).
236 case arrow::Type::DATE64:
237 case arrow::Type::TIMESTAMP:
238 case arrow::Type::TIME64:
239 case arrow::Type::INT64: s |= setFeature<DAAL_INT64>(col);
break;
240 case arrow::Type::FLOAT: s |= setFeature<float>(col);
break;
241 case arrow::Type::DOUBLE: s |= setFeature<double>(col);
break;
242 default: s.add(services::ErrorDataTypeNotSupported);
return s;
// Registers value type T for dictionary entry `idx`, then stores the given
// featureType (default features::DAAL_CONTINUOUS) and categoryNumber (default 0)
// on that entry.
// NOTE(review): the return statement is on a source line missing from this listing;
// presumably the status `s` is returned.
248 template <
typename T>
249 services::Status setFeature(
size_t idx, features::FeatureType featureType = features::DAAL_CONTINUOUS,
size_t categoryNumber = 0)
252 services::Status s = _ddict->setFeature<T>(idx);
254 (*_ddict)[idx].featureType = featureType;
255 (*_ddict)[idx].categoryNumber = categoryNumber;
// Copies a block of rows into block's buffer, converting every column to T.
//   idx    - index of the first requested row.
//   nrows  - number of requested rows; clamped below so that idx + nrows does not exceed
//            getNumberOfRows().
//   rwFlag - access mode stored in the block via setDetails().
//   block  - descriptor whose buffer is resized to ncols x nrows and filled.
// Returns ErrorMethodNotSupported for a write-only block, ErrorMemoryAllocationFailed
// when the buffer cannot be resized, otherwise an empty (OK) Status.
// NOTE(review): the declarations of `di`, `lbuf` and `offset` and several branch
// conditions are on source lines missing from this listing.
259 template <
typename T>
260 services::Status getTBlock(
size_t idx,
size_t nrows, ReadWriteMode rwFlag, BlockDescriptor<T>& block)
// NOTE(review): this tests the flag already stored in `block`, before setDetails()
// below applies `rwFlag` - confirm that checking the previous flag is intended.
262 if (block.getRWFlag() & (int)writeOnly)
264 return services::Status(services::ErrorMethodNotSupported);
267 const size_t ncols = getNumberOfColumns();
268 const size_t nobs = getNumberOfRows();
269 block.setDetails(0, idx, rwFlag);
// Empty-result path: zero-row buffer and OK status.
// NOTE(review): its guard is not visible (presumably idx >= nobs); confirm.
273 block.resizeBuffer(ncols, 0);
274 return services::Status();
// Clamp the request to the rows that actually exist after idx.
277 nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
279 if (!block.resizeBuffer(ncols, nrows)) {
return services::Status(services::ErrorMemoryAllocationFailed); }
283 T*
const buffer = block.getBlockPtr();
// Rows are processed in steps of `di`; the last step is shortened to the remainder.
285 for (
size_t i = 0; i < nrows; i += di)
287 if (i + di > nrows) { di = nrows - i; }
289 for (
size_t j = 0; j < ncols; ++j)
291 const NumericTableFeature & f = (*_ddict)[j];
293 const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = getColumnChunkedArrayPtr(j);
294 DAAL_ASSERT(columnChunkedArrayPtr);
// Slice of column j covering `di` rows starting at absolute row idx + i.
295 const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx + i, di);
296 DAAL_ASSERT(sliceChunkedArrayPtr);
297 const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
298 const int chunkCount = sliceChunkedArray.num_chunks();
299 DAAL_ASSERT(chunkCount > 0);
// Converts all `di` values straight from chunk 0 into lbuf.
// NOTE(review): the guard is not visible - presumably the single-chunk case; confirm.
302 const char*
const ptr = getPtr(sliceChunkedArray.chunk(0), f);
304 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(di, ptr, lbuf);
// Chunk-by-chunk conversion into lbuf, advancing `offset` by each chunk's length.
309 for (
int chunk = 0; chunk < chunkCount; ++chunk)
311 const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
312 DAAL_ASSERT(arrayPtr);
313 const int64_t chunkLength = arrayPtr->length();
314 DAAL_ASSERT(chunkLength > 0);
315 const char*
const ptr = getPtr(arrayPtr, f);
317 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, &(lbuf[offset]));
318 offset += chunkLength;
320 DAAL_ASSERT(offset == di);
// Scatter the converted column values into the output: element (row i + ii, column j)
// is stored at (i + ii) * ncols + j.
323 for (
size_t ii = 0; ii < di; ++ii)
325 buffer[(i + ii) * ncols + j] = lbuf[ii];
330 return services::Status();
// Releases a row block obtained from getTBlock.
// Returns ErrorMethodNotSupported when the block's stored flag includes writeOnly,
// otherwise an empty (OK) Status.
// NOTE(review): source lines 339-341 are missing from this listing, so any work done
// between the check and the final return is not visible here.
333 template <
typename T>
334 services::Status releaseTBlock(BlockDescriptor<T>& block)
336 if (block.getRWFlag() & (int)writeOnly)
338 return services::Status(services::ErrorMethodNotSupported);
342 return services::Status();
// Provides the values of one column (feature) for a range of rows as type T.
//   featIdx - column index.
//   idx     - index of the first requested row.
//   nrows   - number of requested rows; clamped below so that idx + nrows does not exceed
//             getNumberOfRows().
//   rwFlag  - access mode stored in the block via setDetails().
//   block   - descriptor that receives either a pointer into the Arrow buffer or a
//             converted copy in its own buffer.
// Returns ErrorMethodNotSupported for a write-only block, ErrorMemoryAllocationFailed
// when the buffer cannot be resized, otherwise an empty (OK) Status.
// NOTE(review): the declaration of `offset` and several branch conditions are on
// source lines missing from this listing.
345 template <
typename T>
346 services::Status getTFeature(
size_t featIdx,
size_t idx,
size_t nrows,
int rwFlag, BlockDescriptor<T>& block)
// NOTE(review): this tests the flag already stored in `block`, before setDetails()
// below applies `rwFlag` - confirm that checking the previous flag is intended.
348 if (block.getRWFlag() & (int)writeOnly)
350 return services::Status(services::ErrorMethodNotSupported);
// NOTE(review): ncols is not referenced by any visible line below.
353 const size_t ncols = getNumberOfColumns();
354 const size_t nobs = getNumberOfRows();
355 block.setDetails(featIdx, idx, rwFlag);
// Empty-result path: zero-row buffer and OK status.
// NOTE(review): its guard is not visible (presumably idx >= nobs); confirm.
359 block.resizeBuffer(1, 0);
360 return services::Status();
// Clamp the request to the rows that actually exist after idx.
363 nrows = (idx + nrows < nobs) ? nrows : nobs - idx;
365 const NumericTableFeature& f = (*_ddict)[featIdx];
367 const std::shared_ptr<const arrow::ChunkedArray> columnChunkedArrayPtr = getColumnChunkedArrayPtr(featIdx);
368 DAAL_ASSERT(columnChunkedArrayPtr);
369 const std::shared_ptr<const arrow::ChunkedArray> sliceChunkedArrayPtr = columnChunkedArrayPtr->Slice(idx, nrows);
370 DAAL_ASSERT(sliceChunkedArrayPtr);
371 const arrow::ChunkedArray& sliceChunkedArray = *sliceChunkedArrayPtr;
372 const int chunkCount = sliceChunkedArray.num_chunks();
373 DAAL_ASSERT(chunkCount > 0);
// Direct path: the stored type already matches T and the slice is a single chunk,
// so the block is pointed at the chunk's data instead of receiving a copy.
375 if (features::internal::getIndexNumType<T>() == f.indexType && chunkCount == 1)
377 const T*
const ptr = getPtr<T>(sliceChunkedArray.chunk(0), f);
// NOTE(review): const_cast hands out a non-const pointer to the Arrow data; a write
// through it would modify the Arrow buffer.
379 block.setPtr(const_cast<T* const>(ptr), 1, nrows);
// Conversion path: the block's own buffer is sized for nrows values.
383 if (!block.resizeBuffer(1, nrows))
385 return services::Status(services::ErrorMemoryAllocationFailed);
// Without the readOnly bit in the block's flag the buffer is left unfilled.
388 if (!(block.getRWFlag() & (int)readOnly))
return services::Status();
// Converts all `nrows` values straight from chunk 0 into the block buffer.
// NOTE(review): the guard is not visible - presumably the single-chunk case; confirm.
392 const char*
const ptr = getPtr(sliceChunkedArray.chunk(0), f);
394 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(nrows, ptr, block.getBlockPtr());
// Chunk-by-chunk conversion into the block buffer, advancing `offset` by each chunk's length.
399 T*
const destPtr = block.getBlockPtr();
400 for (
int chunk = 0; chunk < chunkCount; ++chunk)
402 const std::shared_ptr<const arrow::Array> arrayPtr = sliceChunkedArray.chunk(chunk);
403 DAAL_ASSERT(arrayPtr);
404 const int64_t chunkLength = arrayPtr->length();
405 DAAL_ASSERT(chunkLength > 0);
406 const char*
const ptr = getPtr(arrayPtr, f);
408 internal::getVectorUpCast(f.indexType, internal::getConversionDataType<T>())(chunkLength, ptr, destPtr + offset);
409 offset += chunkLength;
// NOTE(review): `di` is not declared on any visible line of this function (it appears
// only in getTBlock); the slice above has `nrows` elements, so this assertion probably
// should compare against nrows - confirm and fix.
411 DAAL_ASSERT(offset == di);
414 return services::Status();
// Releases a column block obtained from getTFeature.
// Returns ErrorMethodNotSupported when the block's stored flag includes writeOnly,
// otherwise an empty (OK) Status.
// NOTE(review): source lines 423-425 are missing from this listing, so any work done
// between the check and the final return is not visible here.
417 template <
typename T>
418 services::Status releaseTFeature(BlockDescriptor<T>& block)
420 if (block.getRWFlag() & (int)writeOnly)
422 return services::Status(services::ErrorMethodNotSupported);
426 return services::Status();
// Returns a typed pointer to the first value of an Arrow array.
//   T           - pointee type of the result (char by default).
//   array       - Arrow array whose ArrayData is read.
//   f           - dictionary feature; f.typeSize is the per-value byte size used below.
//   bufferIndex - index of the ArrayData buffer to read (default 1).
// The values are requested as char, so the array's element offset is converted to a
// byte offset (arrayData.offset * f.typeSize) before being passed to GetValues; the
// result is then reinterpreted as const T*.
429 template <
typename T =
char>
430 const T* getPtr(
const arrow::Array& array,
const NumericTableFeature& f,
int bufferIndex = 1)
const
432 const std::shared_ptr<const arrow::ArrayData> arrayDataPtr = array.data();
433 DAAL_ASSERT(arrayDataPtr);
434 const arrow::ArrayData& arrayData = *arrayDataPtr;
435 return reinterpret_cast<const T*
>(arrayData.template GetValues<char>(bufferIndex, arrayData.offset * f.typeSize));
// Convenience overload for a shared pointer to an Arrow array: dereferences `array`
// and forwards to the reference overload of getPtr<T>.
// `array` is dereferenced without a null check here.
438 template <
typename T =
char>
439 const T* getPtr(
const std::shared_ptr<const arrow::Array>& array,
const NumericTableFeature& f,
int bufferIndex = 1)
const
441 return getPtr<T>(*array, f, bufferIndex);
// Returns the chunked array that backs column `idx` of the wrapped table.
// Two code paths are selected at compile time by ARROW_VERSION.
// NOTE(review): the #else / #endif directives are on source lines missing from this
// listing; presumably 15000 encodes Arrow 0.15.0 - confirm.
444 const std::shared_ptr<const arrow::ChunkedArray> getColumnChunkedArrayPtr(
size_t idx)
// ARROW_VERSION >= 15000: the result of _table->column(idx) is returned directly.
446 #if ARROW_VERSION >= 15000
447 return _table->column(idx);
// Otherwise: _table->column(idx) yields an arrow::Column whose data() is returned.
449 const std::shared_ptr<const arrow::Column> columnPtr = _table->column(idx);
450 DAAL_ASSERT(columnPtr);
451 return columnPtr->data();
// Shared-pointer alias for ArrowImmutableNumericTable.
455 typedef services::SharedPtr<ArrowImmutableNumericTable> ArrowImmutableNumericTablePtr;
// Make the interface1 class and pointer alias available under the enclosing namespace.
458 using interface1::ArrowImmutableNumericTable;
459 using interface1::ArrowImmutableNumericTablePtr;
daal::services::ErrorMethodNotSupported
Definition: error_indexes.h:71
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:146
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:90
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< const arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:78
daal::data_management::interface1::ArrowImmutableNumericTable::create
static DAAL_FORCEINLINE services::SharedPtr< ArrowImmutableNumericTable > create(const std::shared_ptr< arrow::Table > &table, services::Status *stat=NULL)
Definition: arrow_numeric_table.h:60
daal::services::ErrorDataTypeNotSupported
Definition: error_indexes.h:144
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:132
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:100
daal::data_management::interface1::BaseArrowImmutableNumericTable
Base class that provides methods to access data stored as an immutable Apache Arrow table...
Definition: base_arrow_numeric_table.h:57
daal::services::ErrorNullPtr
Definition: error_indexes.h:141
daal::MemType
MemType
Definition: daal_defines.h:147
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:126
daal::data_management::interface1::ArrowImmutableNumericTable
Class that provides methods to access data stored as an Apache Arrow table.
Definition: arrow_numeric_table.h:49
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:142
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:105
daal::dram
Definition: daal_defines.h:149
daal::data_management::interface1::Dictionary::create
static services::SharedPtr< Dictionary > create(size_t nfeat, FeaturesEqual featuresEqual=notEqual, services::Status *stat=NULL)
Definition: data_dictionary.h:188
daal::data_management::interface1::BlockDescriptor
Base class that manages buffer memory for read/write operations required by numeric tables...
Definition: numeric_table.h:57
daal::services::ErrorMemoryAllocationFailed
Definition: error_indexes.h:150
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< int > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:115
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfRows
services::Status releaseBlockOfRows(BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:110
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfColumnValues
services::Status getBlockOfColumnValues(size_t featureIdx, size_t vectorIdx, size_t valueNum, ReadWriteMode rwflag, BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:120
daal::algorithms::implicit_als::training::offset
Definition: implicit_als_training_types.h:150
daal::data_management::interface1::ArrowImmutableNumericTable::releaseBlockOfColumnValues
services::Status releaseBlockOfColumnValues(BlockDescriptor< double > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:138
daal::data_management::interface1::ArrowImmutableNumericTable::getBlockOfRows
services::Status getBlockOfRows(size_t vectorIdx, size_t vector_num, ReadWriteMode rwflag, BlockDescriptor< float > &block) DAAL_C11_OVERRIDE
Definition: arrow_numeric_table.h:95