# VALIDATORS RESEARCH

## Main flow

For every active local validator, check the roles assigned for the slot and run the corresponding role handlers.

````go
// Run the main validator routine. This routine exits if the context is
// canceled.
//
// Order of operations:
// 1 - Initialize validator data
// 2 - Wait for validator activation
// 3 - Wait for the next slot start
// 4 - Update assignments
// 5 - Determine role at current slot
// 6 - Perform assigned role, if any
func run(ctx context.Context, v iface.Validator) {
    ...
    for {
        ...
        select {
        ...
        case slot := <-v.NextSlot():
            ...
            // Keep trying to update assignments if they are nil or if we are past an
            // epoch transition in the beacon node's state.
            if err := v.UpdateDuties(ctx, slot); err != nil {
                ...
            }

            // Start fetching domain data for the next epoch.
            if slots.IsEpochEnd(slot) {
                go v.UpdateDomainDataCaches(ctx, slot+1)
            }
            ...
            allRoles, err := v.RolesAt(ctx, slot)
            ...
            for pubKey, roles := range allRoles {
                wg.Add(len(roles))
                for _, role := range roles {
                    go func(role iface.ValidatorRole, pubKey [fieldparams.BLSPubkeyLength]byte) {
                        defer wg.Done()
                        switch role {
                        case iface.RoleAttester:
                            v.SubmitAttestation(slotCtx, slot, pubKey)
                        case iface.RoleProposer:
                            v.ProposeBlock(slotCtx, slot, pubKey)
                        case iface.RoleAggregator:
                            v.SubmitAggregateAndProof(slotCtx, slot, pubKey)
                        case iface.RoleSyncCommittee:
                            v.SubmitSyncCommitteeMessage(slotCtx, slot, pubKey)
                        case iface.RoleSyncCommitteeAggregator:
                            v.SubmitSignedContributionAndProof(slotCtx, slot, pubKey)
                        case iface.RoleUnknown:
                            log.WithField("pubKey", fmt.Sprintf("%#x", bytesutil.Trunc(pubKey[:]))).Trace("No active roles, doing nothing")
                        default:
                            log.Warnf("Unhandled role %v", role)
                        }
                    }(role, pubKey)
                }
            }
            ...
        }
    }
}
````

#### Role determination

`validator/client/validator.go:676`

````go
// RolesAt slot returns the validator roles at the given slot. Returns nil if the
// validator is known to not have a role at the slot. Returns UNKNOWN if the
// validator assignments are unknown. Otherwise returns a valid ValidatorRole map.
func (v *validator) RolesAt(ctx context.Context, slot types.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) {
    rolesAt := make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)
    for validator, duty := range v.duties.Duties {
        var roles []iface.ValidatorRole

        if duty == nil {
            continue
        }
        if len(duty.ProposerSlots) > 0 {
            for _, proposerSlot := range duty.ProposerSlots {
                if proposerSlot != 0 && proposerSlot == slot {
                    roles = append(roles, iface.RoleProposer)
                    break
                }
            }
        }
        if duty.AttesterSlot == slot {
            roles = append(roles, iface.RoleAttester)

            aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey))
            if err != nil {
                return nil, errors.Wrap(err, "could not check if a validator is an aggregator")
            }
            if aggregator {
                roles = append(roles, iface.RoleAggregator)
            }
        }

        // Being assigned to a sync committee for a given slot means that the validator produces and
        // broadcasts signatures for `slot - 1` for inclusion in `slot`. At the last slot of the epoch,
        // the validator checks whether it's in the sync committee of the following epoch.
        inSyncCommittee := false
        if slots.IsEpochEnd(slot) {
            if v.duties.NextEpochDuties[validator].IsSyncCommittee {
                roles = append(roles, iface.RoleSyncCommittee)
                inSyncCommittee = true
            }
        } else {
            if duty.IsSyncCommittee {
                roles = append(roles, iface.RoleSyncCommittee)
                inSyncCommittee = true
            }
        }
        if inSyncCommittee {
            aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey))
            if err != nil {
                return nil, errors.Wrap(err, "could not check if a validator is a sync committee aggregator")
            }
            if aggregator {
                roles = append(roles, iface.RoleSyncCommitteeAggregator)
            }
        }

        if len(roles) == 0 {
            roles = append(roles, iface.RoleUnknown)
        }

        var pubKey [fieldparams.BLSPubkeyLength]byte
        copy(pubKey[:], duty.PublicKey)
        rolesAt[pubKey] = roles
    }
    return rolesAt, nil
}
````
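Both the domain-cache prefetch in `run` and the `NextEpochDuties` branch in `RolesAt` hinge on `slots.IsEpochEnd`. A minimal sketch of that slot arithmetic, assuming mainnet `SLOTS_PER_EPOCH = 32` (the helper below is a stand-in, not the Prysm implementation):

````go
package main

import "fmt"

const slotsPerEpoch = 32 // mainnet SLOTS_PER_EPOCH

// isEpochEnd reports whether slot is the last slot of its epoch, i.e. the
// next slot starts a new epoch. This mirrors what slots.IsEpochEnd checks.
func isEpochEnd(slot uint64) bool {
    return (slot+1)%slotsPerEpoch == 0
}

func main() {
    // Slot 31 closes epoch 0: RolesAt consults NextEpochDuties there, and
    // run() prefetches domain data for slot+1 (the first slot of epoch 1).
    fmt.Println(isEpochEnd(31), isEpochEnd(32)) // true false
}
````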
### RoleAttester

#### intro

For the current slot, duty.Committee: [163 167 102 156 191 124 245 28]

For the current validator in the current slot, duty.AttesterSlot: types.Slot

> Waits for the current slot's block from the proposer, or for 1/3 of the slot time to elapse
>
> Broadcast the new attestation to the subnet.

#### AttestationData

````go
type AttestationData struct {
    Slot            types.Slot            `protobuf:"varint,1,opt,name=slot,proto3" json:"slot,omitempty" cast-type:"github.com/prysmaticlabs/eth2-types.Slot"`
    CommitteeIndex  types.CommitteeIndex  `protobuf:"varint,2,opt,name=committee_index,json=committeeIndex,proto3" json:"committee_index,omitempty" cast-type:"github.com/prysmaticlabs/eth2-types.CommitteeIndex"`
    BeaconBlockRoot []byte                `protobuf:"bytes,3,opt,name=beacon_block_root,json=beaconBlockRoot,proto3" json:"beacon_block_root,omitempty" ssz-size:"32"`
    Source          *Checkpoint           `protobuf:"bytes,4,opt,name=source,proto3" json:"source,omitempty"`
    Target          *Checkpoint           `protobuf:"bytes,5,opt,name=target,proto3" json:"target,omitempty"`
}
````

#### Attestation

````go
type Attestation struct {
    AggregationBits bitfield.Bitlist `protobuf:"bytes,1,opt,name=aggregation_bits,json=aggregationBits,proto3" json:"aggregation_bits,omitempty" cast-type:"github.com/prysmaticlabs/go-bitfield.Bitlist" ssz-max:"2048"`
    Data            *AttestationData `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
    Signature       []byte           `protobuf:"bytes,3,opt,name=signature,proto3" json:"signature,omitempty" ssz-size:"96"`
}
````

#### role handler

````go
// SubmitAttestation completes the validator client's attester responsibility at a given slot.
// It fetches the latest beacon block head along with the latest canonical beacon state
// information in order to sign the block and include information about the validator's
// participation in voting on the block.
func (v *validator) SubmitAttestation(ctx context.Context, slot types.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) {
    ...
    /* Wait for the proposer's block for the current slot, or for 1/3 of the slot time to elapse */
    v.waitOneThirdOrValidBlock(ctx, slot)
    ...
    /* Request attestation data from the beacon node */
    req := &ethpb.AttestationDataRequest{
        Slot:           slot,
        CommitteeIndex: duty.CommitteeIndex,
    }
    data, err := v.validatorClient.GetAttestationData(ctx, req)
    ...
    /* Sign the data */
    indexedAtt := &ethpb.IndexedAttestation{
        AttestingIndices: []uint64{uint64(duty.ValidatorIndex)},
        Data:             data,
    }
    _, signingRoot, err := v.getDomainAndSigningRoot(ctx, indexedAtt.Data)
    ...
    sig, _, err := v.signAtt(ctx, pubKey, data, slot)

    /* Check that the attester is in the committee */
    ...
    /* Bitfield: observed on blocks:
       - for blocks with block.slot % 32 == 0 : aggregation_bits="0xff03"
       - for all others                       : aggregation_bits="0xff01"
       NB: this may not be the full picture (a single node, and only the first attestation is visible) */
    aggregationBitfield := bitfield.NewBitlist(uint64(len(duty.Committee)))
    aggregationBitfield.SetBitAt(indexInCommittee, true)
    attestation := &ethpb.Attestation{
        Data:            data,
        AggregationBits: aggregationBitfield,
        Signature:       sig,
    }

    // Set the signature of the attestation and send it out to the beacon node.
    indexedAtt.Signature = sig
    if err := v.slashableAttestationCheck(ctx, indexedAtt, pubKey, signingRoot); err != nil {
        ...
    }
    /* Send the attestation to the beacon node */
    attResp, err := v.validatorClient.ProposeAttestation(ctx, attestation)
    ...
    /* ??? */
    if err := v.saveAttesterIndexToData(data, duty.ValidatorIndex); err != nil {
        ...
    }

    span.AddAttributes(
        trace.Int64Attribute("slot", int64(slot)), // lint:ignore uintcast -- This conversion is OK for tracing.
        trace.StringAttribute("attestationHash", fmt.Sprintf("%#x", attResp.AttestationDataRoot)),
        trace.Int64Attribute("committeeIndex", int64(data.CommitteeIndex)),
        trace.StringAttribute("blockRoot", fmt.Sprintf("%#x", data.BeaconBlockRoot)),
        trace.Int64Attribute("justifiedEpoch", int64(data.Source.Epoch)),
        trace.Int64Attribute("targetEpoch", int64(data.Target.Epoch)),
        trace.StringAttribute("bitfield", fmt.Sprintf("%#x", aggregationBitfield)),
    )

    if v.emitAccountMetrics {
        ValidatorAttestSuccessVec.WithLabelValues(fmtKey).Inc()
        ValidatorAttestedSlotsGaugeVec.WithLabelValues(fmtKey).Set(float64(slot))
    }
}
````
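The `0xff01` / `0xff03` values observed above are most likely explained by the SSZ bitlist encoding rather than by the slot number itself: go-bitfield stores a length-marker bit just above the committee bits, so a fully-set 8-member committee serializes as `0xff01` and a fully-set 9-member committee as `0xff03`. A self-contained sketch:

````go
package main

import (
    "fmt"

    "github.com/prysmaticlabs/go-bitfield"
)

func main() {
    // Committee of 8: bits 0..7 are committee bits, bit 8 is the bitlist
    // length marker, so the fully-aggregated value is 0xff01.
    b8 := bitfield.NewBitlist(8)
    for i := uint64(0); i < 8; i++ {
        b8.SetBitAt(i, true)
    }
    fmt.Printf("%#x\n", b8) // 0xff01

    // Committee of 9: the length marker moves up one bit, giving 0xff03.
    b9 := bitfield.NewBitlist(9)
    for i := uint64(0); i < 9; i++ {
        b9.SetBitAt(i, true)
    }
    fmt.Printf("%#x\n", b9) // 0xff03
}
````

If that reading is right, the slots with `block.slot % 32 == 0` simply happened to have 9-member committees on this network.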
#### beacon node

````go
// GetAttestationData requests that the beacon node produce an attestation data object,
// which the validator acting as an attester will then sign.
func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
    /* responses are cached */
    ...
    headState, err := vs.HeadFetcher.HeadState(ctx)
    headRoot, err := vs.HeadFetcher.HeadRoot(ctx)

    // In the case that we receive an attestation request after a newer state/block has been processed.
    if headState.Slot() > req.Slot {
        headRoot, err = helpers.BlockRootAtSlot(headState, req.Slot)
        if err != nil {
            return nil, status.Errorf(codes.Internal, "Could not get historical head root: %v", err)
        }
        headState, err = vs.StateGen.StateByRoot(ctx, bytesutil.ToBytes32(headRoot))
        if err != nil {
            return nil, status.Errorf(codes.Internal, "Could not get historical head state: %v", err)
        }
    }
    if headState == nil || headState.IsNil() {
        return nil, status.Error(codes.Internal, "Could not lookup parent state from head.")
    }

    if time.CurrentEpoch(headState) < slots.ToEpoch(req.Slot) {
        headState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, headState, headRoot, req.Slot)
        if err != nil {
            return nil, status.Errorf(codes.Internal, "Could not process slots up to %d: %v", req.Slot, err)
        }
    }

    targetEpoch := time.CurrentEpoch(headState)
    epochStartSlot, err := slots.EpochStart(targetEpoch)
    if err != nil {
        return nil, err
    }
    var targetRoot []byte
    if epochStartSlot == headState.Slot() {
        targetRoot = headRoot
    } else {
        targetRoot, err = helpers.BlockRootAtSlot(headState, epochStartSlot)
        if err != nil {
            return nil, status.Errorf(codes.Internal, "Could not get target block for slot %d: %v", epochStartSlot, err)
        }
        if bytesutil.ToBytes32(targetRoot) == params.BeaconConfig().ZeroHash {
            targetRoot = headRoot
        }
    }

    res = &ethpb.AttestationData{
        Slot:            req.Slot,
        CommitteeIndex:  req.CommitteeIndex,
        BeaconBlockRoot: headRoot,
        Source:          headState.CurrentJustifiedCheckpoint(),
        Target: &ethpb.Checkpoint{
            Epoch: targetEpoch,
            Root:  targetRoot,
        },
    }
    ...
    return res, nil
}
````
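The target checkpoint always points at the first slot of the attestation's epoch. A small worked example of the arithmetic, using plain integer math as a stand-in for `slots.ToEpoch` / `slots.EpochStart` (mainnet `SLOTS_PER_EPOCH = 32` assumed):

````go
package main

import "fmt"

const slotsPerEpoch = 32 // mainnet SLOTS_PER_EPOCH

func main() {
    reqSlot := uint64(100)
    targetEpoch := reqSlot / slotsPerEpoch        // 3
    epochStartSlot := targetEpoch * slotsPerEpoch // 96

    // GetAttestationData sets Target.Root to the block root at slot 96;
    // if the head itself sits at slot 96, the head root is used directly.
    fmt.Println(targetEpoch, epochStartSlot)
}
````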
````go
// ProposeAttestation is a function called by an attester to vote
// on a block via an attestation object as defined in the Ethereum Serenity specification.
func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation) (*ethpb.AttestResponse, error) {
    if _, err := bls.SignatureFromBytes(att.Signature); err != nil {
        return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature")
    }

    root, err := att.Data.HashTreeRoot()
    if err != nil {
        return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err)
    }

    // Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
    // of a received unaggregated attestation.
    vs.OperationNotifier.OperationFeed().Send(&feed.Event{
        Type: operation.UnaggregatedAttReceived,
        Data: &operation.UnAggregatedAttReceivedData{
            Attestation: att,
        },
    })

    // Determine subnet to broadcast attestation to
    wantedEpoch := slots.ToEpoch(att.Data.Slot)
    vals, err := vs.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
    if err != nil {
        return nil, err
    }
    subnet := helpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot)

    // Broadcast the new attestation to the network.
    if err := vs.P2P.BroadcastAttestation(ctx, subnet, att); err != nil {
        return nil, status.Errorf(codes.Internal, "Could not broadcast attestation: %v", err)
    }

    go func() {
        ctx = trace.NewContext(context.Background(), trace.FromContext(ctx))
        attCopy := ethpb.CopyAttestation(att)
        if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
            log.WithError(err).Error("Could not handle attestation in operations service")
            return
        }
    }()

    return &ethpb.AttestResponse{
        AttestationDataRoot: root[:],
    }, nil
}
````
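`helpers.ComputeSubnetFromCommitteeAndSlot` first derives the number of committees per slot from the active validator count and then applies the consensus-spec formula `compute_subnet_for_attestation`. A sketch of that final step, with mainnet constants assumed and the committees-per-slot derivation omitted:

````go
package main

import "fmt"

const (
    slotsPerEpoch          = 32 // mainnet SLOTS_PER_EPOCH
    attestationSubnetCount = 64 // ATTESTATION_SUBNET_COUNT
)

// computeSubnet follows compute_subnet_for_attestation from the spec:
// committees are laid out consecutively within an epoch and wrapped
// around the fixed number of attestation subnets.
func computeSubnet(committeesPerSlot, committeeIndex, slot uint64) uint64 {
    slotsSinceEpochStart := slot % slotsPerEpoch
    committeesSinceEpochStart := committeesPerSlot * slotsSinceEpochStart
    return (committeesSinceEpochStart + committeeIndex) % attestationSubnetCount
}

func main() {
    fmt.Println(computeSubnet(4, 2, 100)) // (4*4 + 2) % 64 = 18
}
````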
### RoleProposer

#### intro

- one proposer per slot

#### GenericBeaconBlock <- BeaconBlock

````go
type GenericBeaconBlock struct {
    // Types that are assignable to Block:
    //	*GenericBeaconBlock_Phase0
    //	*GenericBeaconBlock_Altair
    //	*GenericBeaconBlock_Bellatrix
    //	*GenericBeaconBlock_BlindedBellatrix
    Block isGenericBeaconBlock_Block `protobuf_oneof:"block"`
}

type BeaconBlock struct {
    Slot          github_com_prysmaticlabs_eth2_types.Slot
    ProposerIndex github_com_prysmaticlabs_eth2_types.ValidatorIndex
    ParentRoot    []byte
    StateRoot     []byte
    Body          *BeaconBlockBody
}
````

#### role handler

````go
// ProposeBlock proposes a new beacon block for a given slot. This method collects the
// previous beacon block, any pending deposits, and ETH1 data from the beacon
// chain node to construct the new block. The new block is then processed with
// the state root computation, and finally signed by the validator before being
// sent back to the beacon node for broadcasting.
func (v *validator) ProposeBlock(ctx context.Context, slot types.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) {
    if slot == 0 {
        log.Debug("Assigned to genesis slot, skipping proposal")
        return
    }
    ...
    // Sign randao reveal, it's used to request a block from the beacon node
    epoch := types.Epoch(slot / params.BeaconConfig().SlotsPerEpoch)
    randaoReveal, err := v.signRandaoReveal(ctx, pubKey, epoch, slot)

    g, err := v.getGraffiti(ctx, pubKey)
    ...
    // Request block from beacon node
    b, err := v.validatorClient.GetBeaconBlock(ctx, &ethpb.BlockRequest{
        Slot:         slot,
        RandaoReveal: randaoReveal,
        Graffiti:     g,
    })

    // Sign returned block from beacon node
    wb, err := wrapper.WrappedBeaconBlock(b.Block)
    sig, signingRoot, err := v.signBlock(ctx, pubKey, epoch, slot, wb)
    blk, err := wrapper.BuildSignedBeaconBlock(wb, sig)

    if err := v.slashableProposalCheck(ctx, pubKey, blk, signingRoot); err != nil {
        ...
    }

    // Propose and broadcast block via beacon node
    proposal, err := blk.PbGenericBlock()
    blkResp, err := v.validatorClient.ProposeBeaconBlock(ctx, proposal)
    ...
}
````

#### beacon node

````go
// GetBeaconBlock is called by a proposer during its assigned slot to request a block to sign
// by passing in the slot and the signed randao reveal of the slot. Returns phase0 beacon blocks
// before the Altair fork epoch and Altair blocks post-fork epoch.
func (vs *Server) GetBeaconBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
    ...
    if slots.ToEpoch(req.Slot) < params.BeaconConfig().AltairForkEpoch {
        blk, err := vs.getPhase0BeaconBlock(ctx, req)
        if err != nil {
            return nil, status.Errorf(codes.Internal, "Could not fetch phase0 beacon block: %v", err)
        }
        return &ethpb.GenericBeaconBlock{Block: &ethpb.GenericBeaconBlock_Phase0{Phase0: blk}}, nil
    } else if slots.ToEpoch(req.Slot) < params.BeaconConfig().BellatrixForkEpoch {
        blk, err := vs.getAltairBeaconBlock(ctx, req)
        if err != nil {
            return nil, status.Errorf(codes.Internal, "Could not fetch Altair beacon block: %v", err)
        }
        return &ethpb.GenericBeaconBlock{Block: &ethpb.GenericBeaconBlock_Altair{Altair: blk}}, nil
    }
    ...
}
````
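The dispatch above is purely epoch-based. A reduced sketch of the same decision, with the fork epochs passed in as parameters (values in `main` are illustrative, not mainnet config):

````go
package main

import "fmt"

const slotsPerEpoch = 32

// blockVersionForSlot mirrors GetBeaconBlock's switch: compare the
// request's epoch against the configured fork epochs, in order.
func blockVersionForSlot(slot, altairForkEpoch, bellatrixForkEpoch uint64) string {
    epoch := slot / slotsPerEpoch
    switch {
    case epoch < altairForkEpoch:
        return "phase0"
    case epoch < bellatrixForkEpoch:
        return "altair"
    default:
        return "bellatrix"
    }
}

func main() {
    // With Altair at epoch 2 and Bellatrix at epoch 5:
    fmt.Println(blockVersionForSlot(10, 2, 5))  // phase0 (epoch 0)
    fmt.Println(blockVersionForSlot(100, 2, 5)) // altair (epoch 3)
    fmt.Println(blockVersionForSlot(200, 2, 5)) // bellatrix (epoch 6)
}
````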
````go
// Build data required for creating a new beacon block, so this method can be shared across forks.
func (vs *Server) buildPhase0BlockData(ctx context.Context, req *ethpb.BlockRequest) (*blockData, error) {
    ctx, span := trace.StartSpan(ctx, "ProposerServer.buildPhase0BlockData")
    defer span.End()

    if vs.SyncChecker.Syncing() {
        return nil, fmt.Errorf("syncing to latest head, not ready to respond")
    }

    // Retrieve the parent block as the current head of the canonical chain.
    parentRoot, err := vs.HeadFetcher.HeadRoot(ctx)
    if err != nil {
        return nil, fmt.Errorf("could not retrieve head root: %v", err)
    }

    head, err := vs.HeadFetcher.HeadState(ctx)
    if err != nil {
        return nil, fmt.Errorf("could not get head state %v", err)
    }

    head, err = transition.ProcessSlotsUsingNextSlotCache(ctx, head, parentRoot, req.Slot)
    if err != nil {
        return nil, fmt.Errorf("could not advance slots to calculate proposer index: %v", err)
    }

    eth1Data, err := vs.eth1DataMajorityVote(ctx, head)
    if err != nil {
        return nil, fmt.Errorf("could not get ETH1 data: %v", err)
    }

    deposits, atts, err := vs.packDepositsAndAttestations(ctx, head, eth1Data)
    if err != nil {
        return nil, err
    }

    graffiti := bytesutil.ToBytes32(req.Graffiti)

    // Calculate new proposer index.
    idx, err := helpers.BeaconProposerIndex(ctx, head)
    if err != nil {
        return nil, fmt.Errorf("could not calculate proposer index %v", err)
    }

    proposerSlashings := vs.SlashingsPool.PendingProposerSlashings(ctx, head, false /*noLimit*/)
    validProposerSlashings := make([]*ethpb.ProposerSlashing, 0, len(proposerSlashings))
    for _, slashing := range proposerSlashings {
        _, err := blocks.ProcessProposerSlashing(ctx, head, slashing, v.SlashValidator)
        if err != nil {
            log.WithError(err).Warn("Proposer: invalid proposer slashing")
            continue
        }
        validProposerSlashings = append(validProposerSlashings, slashing)
    }

    attSlashings := vs.SlashingsPool.PendingAttesterSlashings(ctx, head, false /*noLimit*/)
    validAttSlashings := make([]*ethpb.AttesterSlashing, 0, len(attSlashings))
    for _, slashing := range attSlashings {
        _, err := blocks.ProcessAttesterSlashing(ctx, head, slashing, v.SlashValidator)
        if err != nil {
            log.WithError(err).Warn("Proposer: invalid attester slashing")
            continue
        }
        validAttSlashings = append(validAttSlashings, slashing)
    }

    exits := vs.ExitPool.PendingExits(head, req.Slot, false /*noLimit*/)
    validExits := make([]*ethpb.SignedVoluntaryExit, 0, len(exits))
    for _, exit := range exits {
        val, err := head.ValidatorAtIndexReadOnly(exit.Exit.ValidatorIndex)
        if err != nil {
            log.WithError(err).Warn("Proposer: invalid exit")
            continue
        }
        if err := blocks.VerifyExitAndSignature(val, head.Slot(), head.Fork(), exit, head.GenesisValidatorsRoot()); err != nil {
            log.WithError(err).Warn("Proposer: invalid exit")
            continue
        }
        validExits = append(validExits, exit)
    }

    return &blockData{
        ParentRoot:        parentRoot,
        Graffiti:          graffiti,
        ProposerIdx:       idx,
        Eth1Data:          eth1Data,
        Deposits:          deposits,
        Attestations:      atts,
        ProposerSlashings: validProposerSlashings,
        AttesterSlashings: validAttSlashings,
        VoluntaryExits:    validExits,
    }, nil
}
````
````go
func (vs *Server) packDepositsAndAttestations(ctx context.Context, head state.BeaconState, eth1Data *ethpb.Eth1Data) ([]*ethpb.Deposit, []*ethpb.Attestation, error) {
    if features.Get().EnableGetBlockOptimizations {
        deposits, atts, err := vs.optimizedPackDepositsAndAttestations(ctx, head, eth1Data)
        ...
        return deposits, atts, nil
    }

    // Pack ETH1 deposits which have not been included in the beacon chain.
    deposits, err := vs.deposits(ctx, head, eth1Data)
    ...
    // Pack aggregated attestations which have not been included in the beacon chain.
    atts, err := vs.packAttestations(ctx, head)
    ...
    return deposits, atts, nil
}
````
````go
func (vs *Server) packAttestations(ctx context.Context, latestState state.BeaconState) ([]*ethpb.Attestation, error) {
    atts := vs.AttPool.AggregatedAttestations()
    atts, err := vs.validateAndDeleteAttsInPool(ctx, latestState, atts)

    uAtts, err := vs.AttPool.UnaggregatedAttestations()
    uAtts, err = vs.validateAndDeleteAttsInPool(ctx, latestState, uAtts)
    atts = append(atts, uAtts...)

    // Remove duplicates from both aggregated/unaggregated attestations. This
    // prevents inefficient aggregates being created.
    atts, err = proposerAtts(atts).dedup()

    attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation, len(atts))
    for _, att := range atts {
        attDataRoot, err := att.Data.HashTreeRoot()
        ...
        attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
    }

    attsForInclusion := proposerAtts(make([]*ethpb.Attestation, 0))
    for _, as := range attsByDataRoot {
        as, err := attaggregation.Aggregate(as)
        ...
        attsForInclusion = append(attsForInclusion, as...)
    }

    deduped, err := attsForInclusion.dedup()
    ...
    sorted, err := deduped.sortByProfitability()
    ...
    atts = sorted.limitToMaxAttestations()
    return atts, nil
}
````

````go
// ProposeBeaconBlock is called by a proposer during its assigned slot to create a block in an attempt
// to get it processed by the beacon node as the canonical head.
func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
    ...
    blk, err := wrapper.WrappedSignedBeaconBlock(req.Block)
    ...
    return vs.proposeGenericBeaconBlock(ctx, blk)
}
````

````go
func (vs *Server) proposeGenericBeaconBlock(ctx context.Context, blk block.SignedBeaconBlock) (*ethpb.ProposeResponse, error) {
    ...
    root, err := blk.Block().HashTreeRoot()
    ...
    // Do not block proposal critical path with debug logging or block feed updates.
    defer func() {
        log.WithField("blockRoot", fmt.Sprintf("%#x", bytesutil.Trunc(root[:]))).Debugf(
            "Block proposal received via RPC")
        vs.BlockNotifier.BlockFeed().Send(&feed.Event{
            Type: blockfeed.ReceivedBlock,
            Data: &blockfeed.ReceivedBlockData{SignedBlock: blk},
        })
    }()

    // Broadcast the new block to the network.
    if err := vs.P2P.Broadcast(ctx, blk.Proto()); err != nil {
        return nil, fmt.Errorf("could not broadcast block: %v", err)
    }
    log.WithFields(logrus.Fields{
        "blockRoot": hex.EncodeToString(root[:]),
    }).Debug("Broadcasting block")

    if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil {
        return nil, fmt.Errorf("could not process beacon block: %v", err)
    }

    return &ethpb.ProposeResponse{
        BlockRoot: root[:],
    }, nil
}
````

````go
// ReceiveBlock is a function that defines the operations (minus pubsub)
// that are performed on blocks received from the regular sync service. The operations consist of:
//   1. Validate block, apply state transition and update check points
//   2. Apply fork choice to the processed block
//   3. Save latest head info
func (s *Service) ReceiveBlock(ctx context.Context, block block.SignedBeaconBlock, blockRoot [32]byte) error {
    ...
    blockCopy := block.Copy()

    // Apply state transition on the new block.
    if err := s.onBlock(ctx, blockCopy, blockRoot); err != nil {
        err := errors.Wrap(err, "could not process block")
        tracing.AnnotateError(span, err)
        return err
    }

    // Handle post block operations such as attestations and exits.
    if err := s.handlePostBlockOperations(blockCopy.Block()); err != nil {
        return err
    }

    // Have we been finalizing? Should we start saving hot states to db?
    if err := s.checkSaveHotStateDB(ctx); err != nil {
        return err
    }
    ...
}
````
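The core invariant behind `attaggregation.Aggregate` and the `dedup` step: only attestations over identical `AttestationData` (same hash tree root) can be merged, and two of them can be merged only if their aggregation bits don't overlap. A library-free sketch with single-byte bitfields (not Prysm code):

````go
package main

import "fmt"

// canMerge reports whether two committee bitfields are disjoint; only
// disjoint attestations can be combined into one aggregate.
func canMerge(a, b byte) bool { return a&b == 0 }

func main() {
    a := byte(0b00000001) // validator 0 signed
    b := byte(0b00001000) // validator 3 signed

    if canMerge(a, b) {
        // The merged bitfield is the bitwise OR; the BLS signatures are
        // aggregated alongside it (omitted here).
        fmt.Printf("merged bits: %#b\n", a|b) // 0b1001
    }
}
````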
### RoleAggregator

#### intro

- Matches the attester set: judging by the logs, every attester in the slot triggers this handler, though that is probably not the full picture.

(Beacon node will verify the slot signature and determine if the validator is also an aggregator. If yes, then beacon node will broadcast aggregated signature and proof on the validator's behalf.)

`validator/client/validator.go:676`

````go
func (v *validator) RolesAt(ctx context.Context, slot types.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) {
    ...
    aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey))
    if aggregator {
        roles = append(roles, iface.RoleAggregator)
    }
    ...
}
````

````go
// isAggregator checks if a validator is an aggregator of a given slot and committee,
// it uses a modulo calculated by validator count in committee and samples randomness around it.
func (v *validator) isAggregator(ctx context.Context, committee []types.ValidatorIndex, slot types.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) (bool, error) {
    modulo := uint64(1)
    /* If the committee has at most TargetAggregatorsPerCommittee attesters, every attester is an aggregator */
    if len(committee)/int(params.BeaconConfig().TargetAggregatorsPerCommittee) > 1 {
        modulo = uint64(len(committee)) / params.BeaconConfig().TargetAggregatorsPerCommittee
    }

    slotSig, err := v.signSlotWithSelectionProof(ctx, pubKey, slot)
    if err != nil {
        return false, err
    }

    b := hash.Hash(slotSig)
    return binary.LittleEndian.Uint64(b[:8])%modulo == 0, nil
}
````

- On the beacon node, `SubmitAggregateSelectionProof` and `SubmitSignedAggregateSelectionProof` fire for only one attester per slot.

Aggregation of attestations by slot:

````go
aggregator, err := v.isAggregator(ctx, duty.Committee, slot, bytesutil.ToBytes48(duty.PublicKey))
````

An aggregator aggregates only the attestations of its own committee: unaggregated attestations are filtered by slot and committee index. If attestations with different data are present, each data variant is aggregated separately. The resulting aggregates are added to the aggregated pool, and the processed attestations are removed from the unaggregated pool.

````go
func (c *AttCaches) AggregateUnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) error {
    unaggregatedAtts := c.UnaggregatedAttestationsBySlotIndex(ctx, slot, committeeIndex)
    return c.aggregateUnaggregatedAttestations(ctx, unaggregatedAtts)
}
````

Aggregated attestations are then filtered by slot and committee index:

````go
aggregatedAtts := vs.AttPool.AggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
````

and the best one is selected:

````go
// Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
if len(aggregatedAtts) == 0 {
    aggregatedAtts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
    if len(aggregatedAtts) == 0 {
        return nil, status.Errorf(codes.NotFound, "Could not find attestation for slot and committee in pool")
    }
}

var indexInCommittee uint64
for i, idx := range committee {
    if idx == validatorIndex {
        indexInCommittee = uint64(i)
    }
}

best := aggregatedAtts[0]
for _, aggregatedAtt := range aggregatedAtts[1:] {
    // The aggregator should prefer an attestation that they have signed. We check this by
    // looking at the attestation's committee index against the validator's committee index
    // and check the aggregate bits to ensure the validator's index is set.
    if aggregatedAtt.Data.CommitteeIndex == req.CommitteeIndex &&
        aggregatedAtt.AggregationBits.BitAt(indexInCommittee) &&
        (!best.AggregationBits.BitAt(indexInCommittee) ||
            aggregatedAtt.AggregationBits.Count() > best.AggregationBits.Count()) {
        best = aggregatedAtt
    }

    // If the "best" still doesn't contain the validator's index, check the aggregation bits to
    // choose the attestation with the most bits set.
    if !best.AggregationBits.BitAt(indexInCommittee) &&
        aggregatedAtt.AggregationBits.Count() > best.AggregationBits.Count() {
        best = aggregatedAtt
    }
}

a := &ethpb.AggregateAttestationAndProof{
    Aggregate:       best,
    SelectionProof:  req.SlotSignature,
    AggregatorIndex: validatorIndex,
}
````
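The selection in `isAggregator` matches the spec's `is_aggregator`: hash the slot's selection-proof signature and test the first eight bytes against a committee-size-dependent modulo. A standalone sketch, assuming `hash.Hash` is SHA-256 and mainnet `TargetAggregatorsPerCommittee = 16`:

````go
package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

const targetAggregatorsPerCommittee = 16 // mainnet TARGET_AGGREGATORS_PER_COMMITTEE

// isAggregator keeps roughly targetAggregatorsPerCommittee aggregators
// per committee: the larger the committee, the larger the modulo.
func isAggregator(committeeSize uint64, slotSig []byte) bool {
    modulo := uint64(1)
    if committeeSize/targetAggregatorsPerCommittee > 1 {
        modulo = committeeSize / targetAggregatorsPerCommittee
    }
    h := sha256.Sum256(slotSig)
    return binary.LittleEndian.Uint64(h[:8])%modulo == 0
}

func main() {
    // With an 8-member committee the modulo stays 1, so every attester is
    // an aggregator -- consistent with the log observation above.
    fmt.Println(isAggregator(8, []byte("slot signature")))
}
````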
#### AggregateSelectionRequest

````go
type AggregateSelectionRequest struct {
    Slot           github_com_prysmaticlabs_eth2_types.Slot
    CommitteeIndex github_com_prysmaticlabs_eth2_types.CommitteeIndex
    PublicKey      []byte
    SlotSignature  []byte
}
````

#### SignedAggregateSubmitRequest

````go
type SignedAggregateSubmitRequest struct {
    SignedAggregateAndProof *SignedAggregateAttestationAndProof
}

type SignedAggregateAttestationAndProof struct {
    Message   *AggregateAttestationAndProof
    Signature []byte
}

type AggregateAttestationAndProof struct {
    AggregatorIndex github_com_prysmaticlabs_eth2_types.ValidatorIndex
    Aggregate       *Attestation
    SelectionProof  []byte
}

type Attestation struct {
    AggregationBits github_com_prysmaticlabs_go_bitfield.Bitlist
    Data            *AttestationData
    Signature       []byte
}

type AttestationData struct {
    Slot            github_com_prysmaticlabs_eth2_types.Slot
    Index           github_com_prysmaticlabs_eth2_types.CommitteeIndex
    BeaconBlockRoot []byte
    Source          *Checkpoint
    Target          *Checkpoint
}
````

#### role handler

````go
// SubmitAggregateAndProof submits the validator's signed slot signature to the beacon node
// via gRPC. Beacon node will verify the slot signature and determine if the validator is also
// an aggregator. If yes, then beacon node will broadcast aggregated signature and
// proof on the validator's behalf.
func (v *validator) SubmitAggregateAndProof(ctx context.Context, slot types.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) {
    ...
    duty, err := v.duty(pubKey)
    ...
    // Avoid sending beacon node duplicated aggregation requests.
    /* for some reason the logs always show duty.CommitteeIndex=0 */
    k := validatorSubscribeKey(slot, duty.CommitteeIndex)
    if v.aggregatedSlotCommitteeIDCache.Contains(k) {
        return
    }
    v.aggregatedSlotCommitteeIDCache.Add(k, true)

    slotSig, err := v.signSlotWithSelectionProof(ctx, pubKey, slot)
    ...
    // As specified in spec, an aggregator should wait until two thirds of the way through slot
    // to broadcast the best aggregate to the global aggregate channel.
    // https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#broadcast-aggregate
    v.waitToSlotTwoThirds(ctx, slot)

    res, err := v.validatorClient.SubmitAggregateSelectionProof(ctx, &ethpb.AggregateSelectionRequest{
        Slot:           slot,
        CommitteeIndex: duty.CommitteeIndex,
        PublicKey:      pubKey[:],
        SlotSignature:  slotSig,
    })
    ...
    sig, err := v.aggregateAndProofSig(ctx, pubKey, res.AggregateAndProof, slot)
    ...
    _, err = v.validatorClient.SubmitSignedAggregateSelectionProof(ctx, &ethpb.SignedAggregateSubmitRequest{
        SignedAggregateAndProof: &ethpb.SignedAggregateAttestationAndProof{
            Message:   res.AggregateAndProof,
            Signature: sig,
        },
    })
    ...
}
````
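`waitToSlotTwoThirds` sleeps until two thirds of the slot has elapsed (8s into a 12s mainnet slot) before requesting the aggregate. A sketch of the target-time computation, assuming mainnet constants:

````go
package main

import (
    "fmt"
    "time"
)

const secondsPerSlot = 12 // mainnet SECONDS_PER_SLOT

// slotTwoThirds computes the wall-clock time two thirds into a slot,
// which is when the aggregator broadcasts its best aggregate.
func slotTwoThirds(genesis time.Time, slot uint64) time.Time {
    slotStart := genesis.Add(time.Duration(slot) * secondsPerSlot * time.Second)
    return slotStart.Add(secondsPerSlot * time.Second * 2 / 3)
}

func main() {
    genesis := time.Date(2020, 12, 1, 12, 0, 23, 0, time.UTC) // mainnet genesis
    fmt.Println(slotTwoThirds(genesis, 1))                    // slot 1 start + 8s
}
````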
#### beacon node

````go
// SubmitAggregateSelectionProof is called by a validator when it's assigned to be an aggregator.
// The aggregator submits the selection proof to obtain the aggregated attestation
// object to sign over.
func (vs *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.AggregateSelectionRequest) (*ethpb.AggregateSelectionResponse, error) {
    // An optimistic validator MUST NOT participate in attestation. (i.e., sign across the DOMAIN_BEACON_ATTESTER, DOMAIN_SELECTION_PROOF or DOMAIN_AGGREGATE_AND_PROOF domains).
    if err := vs.optimisticStatus(ctx); err != nil {
        return nil, err
    }

    st, err := vs.HeadFetcher.HeadState(ctx)
    ...
    validatorIndex, exists := st.ValidatorIndexByPubkey(bytesutil.ToBytes48(req.PublicKey))
    if !exists {
        return nil, status.Error(codes.Internal, "Could not locate validator index in DB")
    }

    epoch := slots.ToEpoch(req.Slot)
    activeValidatorIndices, err := helpers.ActiveValidatorIndices(ctx, st, epoch)
    seed, err := helpers.Seed(st, epoch, params.BeaconConfig().DomainBeaconAttester)
    committee, err := helpers.BeaconCommittee(ctx, activeValidatorIndices, seed, req.Slot, req.CommitteeIndex)

    // Check if the validator is an aggregator
    isAggregator, err := helpers.IsAggregator(uint64(len(committee)), req.SlotSignature)
    if !isAggregator {
        return nil, status.Errorf(codes.InvalidArgument, "Validator is not an aggregator")
    }

    if err := vs.AttPool.AggregateUnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex); err != nil {
        return nil, status.Errorf(codes.Internal, "Could not aggregate unaggregated attestations")
    }
    aggregatedAtts := vs.AttPool.AggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)

    // Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
    if len(aggregatedAtts) == 0 {
        aggregatedAtts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
        if len(aggregatedAtts) == 0 {
            return nil, status.Errorf(codes.NotFound, "Could not find attestation for slot and committee in pool")
        }
    }

    var indexInCommittee uint64
    for i, idx := range committee {
        if idx == validatorIndex {
            indexInCommittee = uint64(i)
        }
    }

    best := aggregatedAtts[0]
    for _, aggregatedAtt := range aggregatedAtts[1:] {
        // The aggregator should prefer an attestation that they have signed. We check this by
        // looking at the attestation's committee index against the validator's committee index
        // and check the aggregate bits to ensure the validator's index is set.
        if aggregatedAtt.Data.CommitteeIndex == req.CommitteeIndex &&
            aggregatedAtt.AggregationBits.BitAt(indexInCommittee) &&
            (!best.AggregationBits.BitAt(indexInCommittee) ||
                aggregatedAtt.AggregationBits.Count() > best.AggregationBits.Count()) {
            best = aggregatedAtt
        }

        // If the "best" still doesn't contain the validator's index, check the aggregation bits to
        // choose the attestation with the most bits set.
        if !best.AggregationBits.BitAt(indexInCommittee) &&
            aggregatedAtt.AggregationBits.Count() > best.AggregationBits.Count() {
            best = aggregatedAtt
        }
    }

    a := &ethpb.AggregateAttestationAndProof{
        Aggregate:       best,
        SelectionProof:  req.SlotSignature,
        AggregatorIndex: validatorIndex,
    }

    return &ethpb.AggregateSelectionResponse{AggregateAndProof: a}, nil
}
````
````go
// SubmitSignedAggregateSelectionProof is called by a validator to broadcast a signed
// aggregated and proof object.
func (vs *Server) SubmitSignedAggregateSelectionProof(
    ctx context.Context, req *ethpb.SignedAggregateSubmitRequest,
) (*ethpb.SignedAggregateSubmitResponse, error) {
    if req.SignedAggregateAndProof == nil || req.SignedAggregateAndProof.Message == nil ||
        req.SignedAggregateAndProof.Message.Aggregate == nil || req.SignedAggregateAndProof.Message.Aggregate.Data == nil {
        return nil, status.Error(codes.InvalidArgument, "Signed aggregate request can't be nil")
    }

    emptySig := make([]byte, fieldparams.BLSSignatureLength)
    if bytes.Equal(req.SignedAggregateAndProof.Signature, emptySig) || bytes.Equal(req.SignedAggregateAndProof.Message.SelectionProof, emptySig) {
        return nil, status.Error(codes.InvalidArgument, "Signed signatures can't be zero hashes")
    }

    // As a preventive measure, a beacon node shouldn't broadcast an attestation whose slot is out of range.
    if err := helpers.ValidateAttestationTime(req.SignedAggregateAndProof.Message.Aggregate.Data.Slot,
        vs.TimeFetcher.GenesisTime(), params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
        return nil, status.Error(codes.InvalidArgument, "Attestation slot is no longer valid from current time")
    }

    if err := vs.P2P.Broadcast(ctx, req.SignedAggregateAndProof); err != nil {
        return nil, status.Errorf(codes.Internal, "Could not broadcast signed aggregated attestation: %v", err)
    }

    log.WithFields(logrus.Fields{
        "slot":            req.SignedAggregateAndProof.Message.Aggregate.Data.Slot,
        "committeeIndex":  req.SignedAggregateAndProof.Message.Aggregate.Data.CommitteeIndex,
        "validatorIndex":  req.SignedAggregateAndProof.Message.AggregatorIndex,
        "aggregatedCount": req.SignedAggregateAndProof.Message.Aggregate.AggregationBits.Count(),
    }).Debug("Broadcasting aggregated attestation and proof")

    return &ethpb.SignedAggregateSubmitResponse{}, nil
}
````

### RoleSyncCommittee

#### intro

- Fires at every slot, starting from the Altair fork epoch (?)
- Fires for all active (?) local validators
- Assigned for an entire epoch at a time (the duties track the current and the next epoch), with up to 512 committee members

Exists to support the Light Client protocol.

Signs the previous slot's (slot - 1) block root, thereby confirming its validity.

Sends SyncCommittee messages, which are added to the SyncCommittee pool and are later aggregated and included in a block by the proposer.

Overall the flow is analogous to a regular attester's.
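The sync message signs the head block root under `DOMAIN_SYNC_COMMITTEE` (see `SubmitSyncCommitteeMessage` below). Because `SigningData` is an SSZ container of two 32-byte fields, `signing.ComputeSigningRoot` for an already-hashed 32-byte object reduces to a single SHA-256. A sketch under that assumption:

````go
package main

import (
    "crypto/sha256"
    "fmt"
)

// computeSigningRoot sketches signing.ComputeSigningRoot for a 32-byte
// object root: hash_tree_root(SigningData{objectRoot, domain}) is the
// SHA-256 of the two leaf chunks concatenated.
func computeSigningRoot(objectRoot, domain [32]byte) [32]byte {
    return sha256.Sum256(append(objectRoot[:], domain[:]...))
}

func main() {
    var blockRoot, domain [32]byte // zero values, for illustration only
    fmt.Printf("%#x\n", computeSigningRoot(blockRoot, domain))
}
````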
#### role handler

````go
// SubmitSyncCommitteeMessage submits the sync committee message to the beacon chain.
func (v *validator) SubmitSyncCommitteeMessage(ctx context.Context, slot types.Slot, pubKey [fieldparams.BLSPubkeyLength]byte) {
    ctx, span := trace.StartSpan(ctx, "validator.SubmitSyncCommitteeMessage")
    defer span.End()
    span.AddAttributes(trace.StringAttribute("validator", fmt.Sprintf("%#x", pubKey)))

    /* wait for 1/3 of the slot, or for a new block */
    v.waitOneThirdOrValidBlock(ctx, slot)

    /* Root is the current head block root:
       res = &ethpb.SyncMessageBlockRootResponse{Root: r} */
    res, err := v.validatorClient.GetSyncMessageBlockRoot(ctx, &emptypb.Empty{})

    duty, err := v.duty(pubKey)

    d, err := v.domainData(ctx, slots.ToEpoch(slot), params.BeaconConfig().DomainSyncCommittee[:])

    sszRoot := types.SSZBytes(res.Root)
    r, err := signing.ComputeSigningRoot(&sszRoot, d.SignatureDomain)

    sig, err := v.keyManager.Sign(ctx, &validatorpb.SignRequest{
        PublicKey:       pubKey[:],
        SigningRoot:     r[:],
        SignatureDomain: d.SignatureDomain,
        Object: &validatorpb.SignRequest_SyncMessageBlockRoot{
            SyncMessageBlockRoot: res.Root,
        },
        SigningSlot: slot,
    })

    msg := &ethpb.SyncCommitteeMessage{
        Slot:           slot,
        BlockRoot:      res.Root,
        ValidatorIndex: duty.ValidatorIndex,
        Signature:      sig.Marshal(),
    }
    if _, err := v.validatorClient.SubmitSyncMessage(ctx, msg); err != nil {
        log.WithError(err).Error("Could not submit sync committee message")
        return
    }

    log.WithFields(logrus.Fields{
        "slot":           msg.Slot,
        "blockRoot":      fmt.Sprintf("%#x", bytesutil.Trunc(msg.BlockRoot)),
        "validatorIndex": msg.ValidatorIndex,
    }).Info("Submitted new sync message")
}
````

#### beacon node

````go
// GetSyncMessageBlockRoot retrieves the sync committee block root of the beacon chain.
func (vs *Server) GetSyncMessageBlockRoot(
    ctx context.Context, _ *emptypb.Empty,
) (*ethpb.SyncMessageBlockRootResponse, error) {
    // An optimistic validator MUST NOT participate in sync committees
    // (i.e., sign across the DOMAIN_SYNC_COMMITTEE, DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF or DOMAIN_CONTRIBUTION_AND_PROOF domains).
    if err := vs.optimisticStatus(ctx); err != nil {
        return nil, err
    }

    r, err := vs.HeadFetcher.HeadRoot(ctx)

    return &ethpb.SyncMessageBlockRootResponse{
        Root: r,
    }, nil
}
````

````go
// SubmitSyncMessage submits the sync committee message to the network.
// It also saves the sync committee message into the pending pool for block inclusion.
func (vs *Server) SubmitSyncMessage(ctx context.Context, msg *ethpb.SyncCommitteeMessage) (*emptypb.Empty, error) {
    errs, ctx := errgroup.WithContext(ctx)

    headSyncCommitteeIndices, err := vs.HeadFetcher.HeadSyncCommitteeIndices(ctx, msg.ValidatorIndex, msg.Slot)

    // Broadcasting and saving message into the pool in parallel. As one fail should not affect another.
    // This broadcasts for all subnets.
    for _, index := range headSyncCommitteeIndices {
        subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
        subnet := uint64(index) / subCommitteeSize
        errs.Go(func() error {
            return vs.P2P.BroadcastSyncCommitteeMessage(ctx, subnet, msg)
        })
    }

    if err := vs.SyncCommitteePool.SaveSyncCommitteeMessage(msg); err != nil {
        return &emptypb.Empty{}, err
    }

    // Wait for p2p broadcast to complete and return the first error (if any)
    err = errs.Wait()
    return &emptypb.Empty{}, err
}
````
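The subnet math in `SubmitSyncMessage`: the 512-member committee is split into 4 sub-committees of 128, and a member's position picks the gossip subnet. A sketch with spec constants:

````go
package main

import "fmt"

const (
    syncCommitteeSize        = 512 // SYNC_COMMITTEE_SIZE
    syncCommitteeSubnetCount = 4   // SYNC_COMMITTEE_SUBNET_COUNT
)

// subnetForIndex mirrors the loop body in SubmitSyncMessage: integer
// division by the sub-committee size (128) selects the subnet.
func subnetForIndex(committeeIndex uint64) uint64 {
    const subCommitteeSize = syncCommitteeSize / syncCommitteeSubnetCount // 128
    return committeeIndex / subCommitteeSize
}

func main() {
    fmt.Println(subnetForIndex(0), subnetForIndex(200), subnetForIndex(511)) // 0 1 3
}
````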
### RoleSyncCommitteeAggregator

Aggregates SyncCommittee messages from the SyncCommittee pool; the resulting contributions are included in a block by the proposer.

Overall the flow is analogous to a regular aggregator's.

#### intro

- one per slot
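For completeness, the selection behind `isSyncCommitteeAggregator` (used by `RolesAt` above) follows the spec's `is_sync_committee_aggregator`: hash the selection-proof signature and test it against a fixed modulo, which with spec constants is `512 / 4 / 16 = 8`. A sketch, assuming SHA-256 as the hash:

````go
package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

const (
    syncCommitteeSize          = 512 // SYNC_COMMITTEE_SIZE
    syncCommitteeSubnetCount   = 4   // SYNC_COMMITTEE_SUBNET_COUNT
    targetAggregatorsPerSubnet = 16  // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE
)

// isSyncCommitteeAggregator sketches the spec rule: on average 16
// aggregators survive per 128-member sub-committee each slot.
func isSyncCommitteeAggregator(selectionProofSig []byte) bool {
    modulo := uint64(syncCommitteeSize / syncCommitteeSubnetCount / targetAggregatorsPerSubnet)
    if modulo < 1 { // spec uses max(1, ...); unreachable with these constants
        modulo = 1
    }
    h := sha256.Sum256(selectionProofSig)
    return binary.LittleEndian.Uint64(h[:8])%modulo == 0
}

func main() {
    fmt.Println(isSyncCommitteeAggregator([]byte("selection proof")))
}
````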