Lab 4 查询处理
基础功能
Limit 算子
/* executors/limit_executor.h */
class LimitExecutor : public Executor {
private:
uint32_t limit_;
uint32_t offset_;
};
/* executors/limit_executor.cpp */
std::shared_ptr<Record> LimitExecutor::Next() {
// 通过 plan_ 获取 limit 语句中的 offset 和 limit 值
auto offset = plan_->limit_offset_.value_or(0);
auto limit = plan_->limit_count_.value_or(UINT32_MAX);
while (auto record = children_[0]->Next()) {
// 设置开始输出的记录位置
if (offset_++ < offset) {
continue;
}
// 限制输出的记录数
if (limit_++ < limit) {
return record;
}
break;
}
return nullptr;
}
排序算子
/* executors/orderby_executor.h */
struct order_pair {
std::vector<std::pair<Value, OrderByType>> values_;
std::shared_ptr<Record> record_;
};
bool compare(const order_pair &a, const order_pair &b);
class OrderByExecutor : public Executor {
private:
std::vector<order_pair> records_;
bool sorted_ = false;
};
/* executors/orderby_executor.cpp */
bool compare(const order_pair &a, const order_pair &b) {
// 根据 OrderByType 进行排序,传入 std::sort 函数
for (size_t i = 0; i < a.values_.size(); i++) {
// 通过 Value 的 Less, Equal, Greater 函数比较 Value 的值
if (a.values_[i].first.Equal(b.values_[i].first)) {
continue;
}
if (a.values_[i].second == OrderByType::DESC) {
return a.values_[i].first.Less(b.values_[i].first);
}
else {
return a.values_[i].first.Greater(b.values_[i].first);
}
}
return false;
}
std::shared_ptr<Record> OrderByExecutor::Next() {
// 排序阶段
if (!sorted_) {
auto records = std::vector<order_pair>();
while (auto record = children_[0]->Next()) {
std::vector<std::pair<Value, OrderByType>> values;
for (auto &order_by: plan_->order_bys_) {
// 通过 OperatorExpression 的 Evaluate 函数获取 Value 的值
auto value = order_by.second->Evaluate(record);
auto type = order_by.first;
values.push_back(std::make_pair(value, type));
}
records.push_back(order_pair{values, record});
}
// 使用 STL 的 sort 函数
std::sort(records.begin(), records.end(), compare);
records_ = std::move(records);
sorted_ = true;
}
// 输出阶段
if (records_.size() > 0) {
auto record = records_.back().record_;
records_.pop_back();
return record;
}
return nullptr;
}
嵌套循环连接算子
/* executors/nested_loop_join_executor.h */
class NestedLoopJoinExecutor : public Executor {
private:
std::vector<std::shared_ptr<Record>> first_records_;
std::vector<std::shared_ptr<Record>> second_records_;
std::vector<std::shared_ptr<Record>> records_;
bool joined_ = false;
};
/* executors/nested_loop_join_executor.cpp */
std::shared_ptr<Record> NestedLoopJoinExecutor::Next() {
// 连接阶段
if (!joined_) {
while (auto record = children_[0]->Next()) {
first_records_.push_back(record);
}
while (auto record = children_[1]->Next()) {
second_records_.push_back(record);
}
// 二重嵌套循环连接
for (auto &first_record : first_records_) {
for (auto &second_record : second_records_) {
// 从 NestedLoopJoinOperator 中获取连接条件
if (plan_->join_type_ == JoinType::INNER) {
// 使用 EvaluateJoin 函数判断是否满足 join 条件
auto value = plan_->join_condition_->EvaluateJoin(first_record, second_record).GetValue<bool>();
if (value) {
// 使用 Record 的 Append 函数进行记录的连接
auto record = std::make_shared<Record>();
record->Append(*first_record);
record->Append(*second_record);
// 将连接后的记录加入到 records_ 中
records_.push_back(record);
}
}
}
}
joined_ = true;
}
// 输出阶段
if (records_.size() > 0) {
auto record = records_.back();
records_.pop_back();
return record;
}
return nullptr;
}
归并连接算子
/* executors/merge_join_executor.h */
class MergeJoinExecutor : public Executor {
private:
std::list<std::shared_ptr<Record>> first_records_;
std::list<std::shared_ptr<Record>> second_records_;
std::list<std::shared_ptr<Record>> records_;
bool joined_ = false;
};
/* executors/merge_join_executor.cpp */
std::shared_ptr<Record> MergeJoinExecutor::Next() {
// 连接阶段
if (!joined_) {
while (auto record = children_[0]->Next()) {
first_records_.push_back(record);
}
while (auto record = children_[1]->Next()) {
second_records_.push_back(record);
}
if (plan_->join_type_ == JoinType::INNER) {
auto first_record = first_records_.front();
auto second_record = second_records_.front();
while (first_records_.size() > 0 && second_records_.size() > 0) {
// 不满足 join 条件,跳过相应记录
if (plan_->left_key_->Evaluate(first_record).Less(plan_->right_key_->Evaluate(second_record))) {
first_records_.pop_front();
first_record = first_records_.front();
continue;
}
// 不满足 join 条件,跳过相应记录
if (plan_->left_key_->Evaluate(first_record).Greater(plan_->right_key_->Evaluate(second_record))) {
second_records_.pop_front();
second_record = second_records_.front();
continue;
}
// 满足 join 条件
while (plan_->left_key_->Evaluate(first_record).Equal(plan_->right_key_->Evaluate(second_record))) {
auto second_record_iter = second_records_.begin();
// 查询重复元组
while (plan_->left_key_->Evaluate(first_record).Equal(plan_->right_key_->Evaluate(*second_record_iter))) {
// 使用 Record 的 Append 函数进行记录的连接
auto record = std::make_shared<Record>();
record->Append(*first_record);
record->Append(**second_record_iter);
// 将连接后的记录加入到 records_ 中
records_.push_back(record);
// 遍历 second_records_ 直到为空
if (++second_record_iter == second_records_.end()) {
break;
}
}
// 遍历 first_records_ 直到为空
first_records_.pop_front();
if (first_records_.size() == 0) {
break;
}
first_record = first_records_.front();
}
}
}
joined_ = true;
}
// 输出阶段
if (records_.size() > 0) {
auto record = records_.back();
records_.pop_back();
return record;
}
return nullptr;
}
高级功能
外连接
设计思路
使用 bool
数组标记循环中外表与内表中的每条记录的连接情况. 循环完毕后, 从 NestedLoopJoinOperator
中获取连接条件, 对于外连接情形分别使用 null
补全未被连接的记录.
新增代码
文件 src/executors/nested_loop_join_executor.cpp
.
测例展示
测试用例位于 test/lab4/50-nested-loop-outer-join.test
, 设置上完全复用了 test/lab4/30-nested-loop-join.test
, 只修改连接条件为 left join
/ right join
/ full join
.
> make debug && make lab4/50
内存哈希连接
设计思路
在 src/executors/hash_join_executor.h
中添加成员变量:
class HashJoinExecutor : public Executor {
private:
std::shared_ptr<const HashJoinOperator> plan_;
std::hash<Value> hasher;
std::unordered_map<size_t, std::vector<std::shared_ptr<Record>>> first_records_;
std::unordered_map<size_t, std::vector<std::shared_ptr<Record>>> second_records_;
std::vector<std::shared_ptr<Record>> records_;
bool joined_ = false;
};
划分阶段: 按 hasher
的哈希值对两表的记录进行分组, 哈希值相同的记录被划分到 unordered_map
的一个桶里进行连接.
探查阶段: 对记录进行嵌套循环连接, 并输出到 records_
中.
新增代码
文件 src/executors/hash_join_executor.cpp
.
测例展示
测试用例位于 test/lab4/60-hash-loop-join.test
, 完全复用了 test/lab4/30-nested-loop-join.test
, 只修改连接设置为 hash
.
> make debug && make lab4/60
分组聚合算子
设计思路
首先对记录进行哈希分组:
/* src/executors/aggregate_executor.cpp */
while (auto record = children_[0]->Next()) {
uint64_t hash = 0;
for (auto &group_by : plan_->group_bys_) {
hash *= 31;
hash += hasher(group_by->Evaluate(record));
auto col_value = dynamic_cast<ColumnValue *>(group_by.get());
}
hash %= 100000;
hash_records_[hash].push_back(record);
}
在分组后遍历组内的每条记录以实现 min
, max
, sum
, avg
聚合函数, 依据记录字段是否为空实现 count
聚合函数, count(*)
聚合函数直接返回记录总数.
新增代码
文件 src/executors/aggregate_executor.cpp
.
测例展示
测试用例位于 test/lab4/70-aggregate.test
.
> make debug && make lab4/70
分别进行了如下测试:
select id, room, max(score) from test_group_by group by id, room
select id, room, sum(score) from test_group_by group by id, room
select id, room, avg(score) from test_group_by group by id, room
select id, room, count(score) from test_group_by group by id, room
------------------------------------------------------------------
select count(id) from test_count
select count(distinct id) from test_count
select count(*) from test_count
实验耗时
- 基础功能
- Limit 算子: 0.5h.
- 排序算子: 0.5h.
- 嵌套循环连接算子: 0.5h.
- 归并连接算子: 1h.
- 高级功能
- 外连接: 2.5h.
- 内存哈希连接与分组聚合算子: 4h.