创建矿工
miner/worker.go
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
...
worker.wg.Add(4)
go worker.mainLoop()
go worker.newWorkLoop(recommit)
go worker.resultLoop()
go worker.taskLoop()
接收任务
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
func (w *worker) newWorkLoop(recommit time.Duration) {
...
// commit aborts in-flight transaction execution with given signal and resubmits a new one.
commit := func(noempty bool, s int32) {
if interrupt != nil {
atomic.StoreInt32(interrupt, s)
}
interrupt = new(int32)
select {
case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: // 向 newWorkCh 发送新的工作请求，由 mainLoop 接收
任务传递
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
....
for {
select {
case req := <-w.newWorkCh:
w.commitNewWork(req.interrupt, req.noempty, req.timestamp) // 提交任务
开始提交
// commitNewWork generates several new sealing tasks based on the parent block.
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
...
w.commit(uncles, w.fullTaskHook, true, tstart) // 开始提交
组装区块
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
...
block, receipts, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, txs, uncles, cpyReceipts) // 通过对应的共识引擎组装最终区块
if err != nil {
return err
}
if w.isRunning() {
if interval != nil {
interval()
}
select {
case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}: // 将待签名的任务发送到 taskCh（此处尚未广播区块，广播发生在 resultLoop 中）
func (w *worker) taskLoop() {
...
if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil { // 开始矿工签名
log.Warn("Block sealing failed", "err", err)
w.pendingMu.Lock()
delete(w.pendingTasks, sealHash)
w.pendingMu.Unlock()
}
矿工签名
func (c *Congress) Seal(chain consensus.ChainHeaderReader, block *types.Block, results chan<- *types.Block, stop <-chan struct{}) error {
// Sign all the things!
sighash, err := signFn(accounts.Account{Address: val}, accounts.MimetypeCongress, CongressRLP(header))
if err != nil {
return err
}
copy(header.Extra[len(header.Extra)-extraSeal:], sighash) // 将签名写入 header.Extra 末尾的 extraSeal 个字节
// Wait until sealing is terminated or delay timeout.
log.Trace("Waiting for slot to sign and propagate", "delay", common.PrettyDuration(delay))
go func() {
select {
case <-stop:
return
case <-time.After(delay):
}
select {
case results <- block.WithSeal(header): // 返回给上一步的 w.resultCh
default:
log.Warn("Sealing result is not read by miner", "sealhash", SealHash(header))
}
}()
广播区块
func (w *worker) resultLoop() {
defer w.wg.Done()
for {
select {
case block := <-w.resultCh: // 接收到上步矿工签名的区块
...
// Commit block and state to database.
_, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true) // 更新本地数据
...
// Broadcast the block and announce chain insertion event
w.mux.Post(core.NewMinedBlockEvent{Block: block}) // 广播区块
// Insert the block into the set of pending ones to resultLoop for confirmations
w.unconfirmed.Insert(block.NumberU64(), block.Hash())