MiniOB 1
MiniOB is a mini database that helps developers learn how databases work.
载入中...
搜索中...
未找到
aggregate_hash_table.h
/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved.
miniob is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details. */
10
11#pragma once
12
13#include "common/lang/vector.h"
14#include "common/lang/unordered_map.h"
15#include "common/math/simd_util.h"
16#include "common/sys/rc.h"
17#include "sql/expr/expression.h"
18
23{
24public:
25 class Scanner
26 {
27 public:
28 explicit Scanner(AggregateHashTable *hash_table) : hash_table_(hash_table) {}
29 virtual ~Scanner() = default;
30
31 virtual void open_scan() = 0;
32
36 virtual RC next(Chunk &chunk) = 0;
37
38 virtual void close_scan() {}
39
40 protected:
41 AggregateHashTable *hash_table_;
42 };
43
47 virtual RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) = 0;
48
49 virtual ~AggregateHashTable() = default;
50 vector<AggregateExpr::Type> aggr_types_;
51 vector<AttrType> aggr_child_types_;
52};
53
55{
56private:
58 {
59 size_t operator()(const vector<Value> &vec) const;
60 };
61
63 {
64 bool operator()(const vector<Value> &lhs, const vector<Value> &rhs) const;
65 };
66
67public:
68 using StandardHashTable = unordered_map<vector<Value>, vector<void *>, VectorHash, VectorEqual>;
70 {
71 public:
72 explicit Scanner(AggregateHashTable *hash_table) : AggregateHashTable::Scanner(hash_table) {}
73 ~Scanner() = default;
74
75 void open_scan() override;
76
77 RC next(Chunk &chunk) override;
78
79 private:
80 StandardHashTable::iterator end_;
81 StandardHashTable::iterator it_;
82 };
83 StandardAggregateHashTable(const vector<Expression *> aggregations)
84 {
85 for (auto &expr : aggregations) {
86 ASSERT(expr->type() == ExprType::AGGREGATION, "expect aggregate expression");
87 auto *aggregation_expr = static_cast<AggregateExpr *>(expr);
88 aggr_types_.push_back(aggregation_expr->aggregate_type());
89 aggr_child_types_.push_back(aggregation_expr->value_type());
90 }
91 }
93 {
94 for (auto &aggr : aggr_values_) {
95 for (auto &state : aggr.second) {
96 free(state);
97 }
98 }
99 }
100
101 RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) override;
102
103 StandardHashTable::iterator begin() { return aggr_values_.begin(); }
104 StandardHashTable::iterator end() { return aggr_values_.end(); }
105
107 StandardHashTable aggr_values_;
108};
109
#ifdef USE_SIMD
/**
 * @brief AggregateHashTable variant with int keys and values of type V, stored
 * in two parallel arrays (keys_ / values_). Only compiled when USE_SIMD is defined.
 * @tparam V type of the aggregated value
 * NOTE(review): the probing and resize policy live in the implementation file;
 * the class name suggests open addressing with linear probing -- confirm there.
 */
template <typename V>
class LinearProbingAggregateHashTable : public AggregateHashTable
{
public:
  /**
   * @brief Scanner for a LinearProbingAggregateHashTable.
   */
  class Scanner : public AggregateHashTable::Scanner
  {
  public:
    explicit Scanner(AggregateHashTable *hash_table) : AggregateHashTable::Scanner(hash_table) {}
    ~Scanner() = default;

    void open_scan() override;

    RC next(Chunk &chunk) override;

    void close_scan() override;

  private:
    // All four start at sentinel/zero values; presumably open_scan() fills them
    // from the table being scanned -- confirm in the implementation file.
    int capacity_   = -1;
    int size_       = -1;
    int scan_pos_   = -1;
    int scan_count_ = 0;
  };

  /**
   * @param aggregate_type aggregate kind applied to every value in this table
   * @param capacity       initial slot count; every key slot starts as EMPTY_KEY
   *                       and every value slot as 0
   */
  LinearProbingAggregateHashTable(AggregateExpr::Type aggregate_type, int capacity = DEFAULT_CAPACITY)
      : keys_(capacity, EMPTY_KEY), values_(capacity, 0), capacity_(capacity), aggregate_type_(aggregate_type)
  {}
  virtual ~LinearProbingAggregateHashTable() {}

  /// Look up `key` and write its aggregate into `value` (status reported through RC).
  RC get(int key, V &value);

  /// Read the slot at index `pos`, writing its key and value to the out-parameters.
  /// NOTE(review): behaviour for empty slots / out-of-range pos is defined in the .cpp.
  RC iter_get(int pos, int &key, V &value);

  RC add_chunk(Chunk &group_chunk, Chunk &aggr_chunk) override;

  /// Current slot count. const so it can be called on a const table.
  int capacity() const { return capacity_; }
  /// Value of the size_ counter (initially 0). const so it can be called on a const table.
  int size() const { return size_; }

private:
  /// Insert or aggregate `len` key/value pairs taken from the two input arrays.
  void add_batch(int *input_keys, V *input_values, int len);

  /// Fold `value_to_aggregate` into `*value`; presumably dispatches on aggregate_type_ -- confirm in the .cpp.
  void aggregate(V *value, V value_to_aggregate);

  void resize();

  void resize_if_need();

private:
  static const int EMPTY_KEY;         ///< marker stored in unused key slots (defined out of line)
  static const int DEFAULT_CAPACITY;  ///< default constructor capacity (defined out of line)

  vector<int>         keys_;
  vector<V>           values_;
  int                 size_     = 0;
  int                 capacity_ = 0;
  AggregateExpr::Type aggregate_type_;
};
#endif  // USE_SIMD
Definition: expression.h:469
Definition: aggregate_hash_table.h:26
virtual RC next(Chunk &chunk)=0
用于hash group by 的哈希表实现,不支持并发访问。
Definition: aggregate_hash_table.h:23
virtual RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk)=0
将 groups_chunk 和 aggrs_chunk 写入到哈希表中。哈希表中记录了聚合结果。
A Chunk represents a set of columns.
Definition: chunk.h:23
Definition: aggregate_hash_table.h:70
RC next(Chunk &chunk) override
Definition: aggregate_hash_table.cpp:60
Definition: aggregate_hash_table.h:55
StandardHashTable aggr_values_
group by values -> aggregate values
Definition: aggregate_hash_table.h:107
RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) override
将 groups_chunk 和 aggrs_chunk 写入到哈希表中。哈希表中记录了聚合结果。
Definition: aggregate_hash_table.cpp:16
Definition: aggregate_hash_table.h:63
Definition: aggregate_hash_table.h:58