// Compile compiles an ast.StmtNode to a physical plan.
func (c *Compiler) Compile(goCtx goctx.Context, stmtNode ast.StmtNode) (*ExecStmt, error) {
	//validation, name binding
	plan.Preprocess(c.Ctx, stmtNode, infoSchema, false)

	infoSchema := GetInfoSchema(c.Ctx)
	// build and optimize query plan
	finalPlan = plan.Optimize(c.Ctx, stmtNode, infoSchema)

	return &ExecStmt{
		InfoSchema: infoSchema,
		Plan:       finalPlan,
		Expensive:  isExpensive,
		Cacheable:  plan.Cacheable(stmtNode),
		Text:       stmtNode.Text(),
		StmtNode:   stmtNode,
		Ctx:        c.Ctx,
	}, nil
}

// Optimize does optimization and creates a Plan.
// The node must be prepared first.
func Optimize(ctx context.Context, node ast.Node, is infoschema.InfoSchema) (Plan, error) {
	builder := &planBuilder{
		ctx:       ctx,
		is:        is,
		colMapper: make(map[*ast.ColumnNameExpr]int),
	}
	p := builder.build(node)

        //Assuming p is a LogicalPlan, which could be optimzied
	return doOptimize(builder.optFlag, p)
}


func (b *planBuilder) build(node ast.Node) Plan {
	b.optFlag = flagPrunColumns
	switch x := node.(type) {
	case *ast.InsertStmt:
		return b.buildInsert(x)
	case *ast.SelectStmt:
		return b.buildSelect(x)
	}
	b.err = ErrUnsupportedType.Gen("Unsupported type %T", node)
	return nil
}

/*

Hierarchy: ResultSet <- GroupBy <- Selection <- Aggregation <- Projection

ResultSetNode interface has a ResultFields property, represents a Node that returns result set. Implementations include SelectStmt, SubqueryExpr, TableSource, TableName and Join.

*/



func (b *planBuilder) buildSelect(sel *ast.SelectStmt) LogicalPlan {
	var (
		p        LogicalPlan
		aggFuncs []*ast.AggregateFuncExpr
		totalMap map[*ast.AggregateFuncExpr]int
		gbyCols  []expression.Expression
	)
        //base plan node
	p = b.buildResultSetNode(sel.From.TableRefs)
	originalFields := sel.Fields.Fields
        //add children
	p, gbyCols = b.resolveGbyExprs(p, sel.GroupBy, sel.Fields.Fields)

        //add children
	p = b.buildSelection(p, sel.Where, nil)

	aggFuncs, totalMap = b.extractAggFuncs(sel.Fields.Fields)
	var aggIndexMap map[int]int
        //add children
	p, aggIndexMap = b.buildAggregation(p, aggFuncs, gbyCols)
	for k, v := range totalMap {
		totalMap[k] = aggIndexMap[v]
	}
	var oldLen int

        //add children
	p, oldLen = b.buildProjection(p, sel.Fields.Fields, totalMap)
		p = b.buildDistinct(p, oldLen)
		p = b.buildSort(p, sel.OrderBy.Items, orderMap)
		p = b.buildLimit(p, sel.Limit)
	sel.Fields.Fields = originalFields
	if oldLen != p.Schema().Len() {
		proj := LogicalProjection{Exprs: expression.Column2Exprs(p.Schema().Columns[:oldLen])}.init(b.ctx)
		proj.SetChildren(p)
		schema := expression.NewSchema(p.Schema().Clone().Columns[:oldLen]...)
		for _, col := range schema.Columns {
			col.FromID = proj.ID()
		}
		proj.SetSchema(schema)
		return proj
	}

	return p
}

func (b *planBuilder) buildResultSetNode(node ast.ResultSetNode) LogicalPlan {
	switch x := node.(type) {
	case *ast.Join:
		return b.buildJoin(x)
	case *ast.TableSource:
		var p LogicalPlan
		switch v := x.Source.(type) {
		case *ast.SelectStmt:
			p = b.buildSelect(v)
		case *ast.TableName:
			p = b.buildDataSource(v)
		if v, ok := p.(*DataSource); ok {
			v.TableAsName = &x.AsName
		}
		for _, col := range p.Schema().Columns {
			col.OrigTblName = col.TblName
			if x.AsName.L != "" {
				col.TblName = x.AsName
				col.DBName = model.NewCIStr("")
			}
		}
		// Duplicate column name in one table is not allowed.
		// "select * from (select 1, 1) as a;" is duplicate
		dupNames := make(map[string]struct{}, len(p.Schema().Columns))
		for _, col := range p.Schema().Columns {
			name := col.ColName.O
			if _, ok := dupNames[name]; ok {
				b.err = ErrDupFieldName.GenByArgs(name)
				return nil
			}
			dupNames[name] = struct{}{}
		}
		return p
	case *ast.SelectStmt:
		return b.buildSelect(x)
	}
}



// buildProjection returns a Projection plan and non-aux columns length.
func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField, mapper map[*ast.AggregateFuncExpr]int) (LogicalPlan, int) {
	b.optFlag |= flagEliminateProjection
	b.curClause = fieldList
	proj := LogicalProjection{Exprs: make([]expression.Expression, 0, len(fields))}.init(b.ctx)
	schema := expression.NewSchema(make([]*expression.Column, 0, len(fields))...)
	oldLen := 0
	for _, field := range fields {
		newExpr, np, err := b.rewrite(field.Expr, p, mapper, true)
		p = np
		proj.Exprs = append(proj.Exprs, newExpr)

		col := b.buildProjectionField(proj.id, schema.Len()+1, field, newExpr)
		schema.Append(col)

		if !field.Auxiliary {
			oldLen++
		}
	}
	proj.SetSchema(schema)
	proj.SetChildren(p)
	return proj, oldLen
}

func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMapper map[*ast.AggregateFuncExpr]int) LogicalPlan {
	b.optFlag = b.optFlag | flagPredicatePushDown
	if b.curClause != havingClause {
		b.curClause = whereClause
	}
	conditions := splitWhere(where)
	expressions := make([]expression.Expression, 0, len(conditions))
	selection := LogicalSelection{}.init(b.ctx)
	for _, cond := range conditions {
		expr, np, err := b.rewrite(cond, p, AggMapper, false)
		p = np
		if expr == nil {
			continue
		}
		cnfItems := expression.SplitCNFItems(expr)
		for _, item := range cnfItems {
			expressions = append(expressions, item)
		}
	}
	selection.Conditions = expressions
	selection.SetChildren(p)
	return selection
}

    // RecordSet is an abstract result set interface to help get data from Plan.
    type RecordSet interface {
        // Fields gets result fields.
        Fields() []*ResultField
        // Next returns the next row, nil row means there is no more to return.
        Next(ctx context.Context) (row types.Row, err error)
        // NextChunk reads records into chunk.
        NextChunk(ctx context.Context, chk *chunk.Chunk) error
        // NewChunk creates a new chunk with initial capacity.
        NewChunk() *chunk.Chunk
        // SupportChunk check if the RecordSet supports Chunk structure.
        SupportChunk() bool
        // Close closes the underlying iterator, call Next after Close will
        // restart the iteration.
        Close() error
    }


// Exec builds an Executor from a plan. If the Executor doesn't return result,
// like the INSERT, UPDATE statements, it executes in this function, if the Executor returns
// result, execution is done after this function returns, in the returned ast.RecordSet Next method.
func (a *ExecStmt) Exec(goCtx goctx.Context) (ast.RecordSet, error) {
	ctx := a.Ctx
	e := a.buildExecutor(ctx)

	e.Open(goCtx)

	var pi processinfoSetter
	if raw, ok := ctx.(processinfoSetter); ok {
		pi = raw
		sql := a.OriginText()
		// Update processinfo, ShowProcess() will use it.
		pi.SetProcessInfo(sql)
	}

	return &recordSet{
		executor:    e,
		stmt:        a,
		processinfo: pi,
		txnStartTS:  ctx.Txn().StartTS(),
	}, nil
}

type SelectStmt struct {
        dmlNode
        resultSetNode
        // SelectStmtOpts wraps around select hints and switches.
        *SelectStmtOpts
        // Distinct represents whether the select has distinct option.
        Distinct bool
        // From is the from clause of the query.
        From *TableRefsClause
        // Where is the where clause in select statement.
        Where ExprNode
        // Fields is the select expression list.
        Fields *FieldList
        // GroupBy is the group by expression list.
        GroupBy *GroupByClause
        // Having is the having condition.
        Having *HavingClause
        // OrderBy is the ordering expression list.
        OrderBy *OrderByClause
        // Limit is the limit clause.
        Limit *Limit
        // LockTp is the lock type
        LockTp SelectLockType
        // TableHints represents the level Optimizer Hint
        TableHints [](#)*TableOptimizerHint
}

// LogicalSelection represents a where or having predicate.
type LogicalSelection struct {
	baseLogicalPlan

	// Originally the WHERE or ON condition is parsed into a single expression,
	// but after we converted to CNF(Conjunctive normal form), it can be
	// split into a list of AND conditions.
	Conditions []expression.Expression
}