# GOZALANDIA RESEARCH
## Topics
`beacon-chain/p2p/topics.go`
````go
...
// Message Types
//
// GossipAttestationMessage is the name for the attestation message type. It is
// specially extracted so as to determine the correct message type from an attestation
// subnet.
GossipAttestationMessage = "beacon_attestation"
// GossipSyncCommitteeMessage is the name for the sync committee message type. It is
// specially extracted so as to determine the correct message type from a sync committee
// subnet.
GossipSyncCommitteeMessage = "sync_committee"
// GossipBlockMessage is the name for the block message type.
GossipBlockMessage = "beacon_block"
// GossipExitMessage is the name for the voluntary exit message type.
GossipExitMessage = "voluntary_exit"
// GossipProposerSlashingMessage is the name for the proposer slashing message type.
GossipProposerSlashingMessage = "proposer_slashing"
// GossipAttesterSlashingMessage is the name for the attester slashing message type.
GossipAttesterSlashingMessage = "attester_slashing"
// GossipAggregateAndProofMessage is the name for the attestation aggregate and proof message type.
GossipAggregateAndProofMessage = "beacon_aggregate_and_proof"
// GossipContributionAndProofMessage is the name for the sync contribution and proof message type.
GossipContributionAndProofMessage = "sync_committee_contribution_and_proof"
...
````
## Бродкаст сообщений субсети
`beacon-chain/p2p/broadcaster.go`
````go
// BroadcastSyncCommitteeMessage broadcasts a sync committee message to the p2p network, the message is assumed to be
// broadcasted to the current fork.
func (s *Service) BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error {
forkDigest, err := s.currentForkDigest()
...
// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.broadcastSyncCommittee(ctx, subnet, sMsg, forkDigest)
return nil
}
// BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be
// broadcasted to the current fork.
func (s *Service) BroadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation) error {
...
forkDigest, err := s.currentForkDigest()
...
// Non-blocking broadcast, with attempts to discover
// a subnet peer if none available.
go s.broadcastAttestation(ctx, subnet, att, forkDigest)
return nil
}
func (s *Service) broadcastAttestation(ctx context.Context, subnet uint64, att *ethpb.Attestation, forkDigest [4]byte) {
...
// Ensure we have peers with this subnet.
hasPeer := s.hasPeerWithSubnet(attestationToTopic(subnet, forkDigest))
span.AddAttributes(
trace.BoolAttribute("hasPeer", hasPeer),
trace.Int64Attribute("slot", int64(att.Data.Slot)), // lint:ignore uintcast -- It's safe to do this for tracing.
trace.Int64Attribute("subnet", int64(subnet)), // lint:ignore uintcast -- It's safe to do this for tracing.
)
...
// In the event our attestation is outdated and beyond the
// acceptable threshold, we exit early and do not broadcast it.
currSlot := slots.CurrentSlot(uint64(s.genesisTime.Unix()))
...
if err := s.broadcastObject(ctx, att, attestationToTopic(subnet, forkDigest)); err != nil {...}
}
func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage, forkDigest [4]byte) {
...
// Ensure we have peers with this subnet.
hasPeer := s.hasPeerWithSubnet(syncCommitteeToTopic(subnet, forkDigest))
span.AddAttributes(
trace.BoolAttribute("hasPeer", hasPeer),
trace.Int64Attribute("slot", int64(sMsg.Slot)), // lint:ignore uintcast -- It's safe to do this for tracing.
trace.Int64Attribute("subnet", int64(subnet)), // lint:ignore uintcast -- It's safe to do this for tracing.
)
...
if err := s.broadcastObject(ctx, sMsg, syncCommitteeToTopic(subnet, forkDigest)); err != nil {...}
}
````
### BroadcastAttestation
````
BroadcastAttestation
Usages in All Places
Unclassified
.workspace
/home/mezin/go/src/tesseract/beacon-chain/p2p
/home/mezin/go/src/tesseract/beacon-chain/rpc/eth/beacon
pool.go
SubmitAttestations
if err := bs.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
/home/mezin/go/src/tesseract/beacon-chain/rpc/prysm/v1alpha1/validator
attester.go
ProposeAttestation
if err := vs.P2P.BroadcastAttestation(ctx, subnet, att); err != nil {
/home/mezin/go/src/tesseract/beacon-chain/sync
pending_attestations_queue.go
processPendingAtts
if err := s.cfg.p2p.BroadcastAttestation(ctx, helpers.ComputeSubnetForAttestation(valCount, signedAtt.Message.Aggregate), signedAtt.Message.Aggregate); err != nil {
````
### BroadcastSyncCommitteeMessage
```
BroadcastSyncCommitteeMessage
Usages in All Places
Unclassified
.workspace
/home/mezin/go/src/tesseract/beacon-chain/p2p
/home/mezin/go/src/tesseract/beacon-chain/rpc/prysm/v1alpha1/validator
sync_committee.go
SubmitSyncMessage
return vs.P2P.BroadcastSyncCommitteeMessage(ctx, subnet, msg)
```
## Получение и обработка сообщений субсети
### Subscription
смотри `beacon-chain/sync/subscriber.go`
// SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not.
SubscribeToAllSubnets = &cli.BoolFlag{
Name: "subscribe-all-subnets",
Usage: "Subscribe to all possible attestation and sync subnets.",
}
используется три типа подписки
1. **subscribe**: subscribe to a given topic with a given validator and subscription handler.The base protobuf message is used to initialize new messages for decoding.
> общая подписка
1. **subscribeStaticWithSubnets**: subscribe to a static subnet with the given topic and index.A given validator and subscription handler is used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
> Позволяет сделать подписку на все субсети, возможно тестовый режим, активируется цмд-флагом `subscribe-all-subnets`
> ```
> // SubscribeToAllSubnets defines a flag to specify whether to subscribe to all possible attestation/sync subnets or not.
>SubscribeToAllSubnets = &cli.BoolFlag{
> Name: "subscribe-all-subnets",
> Usage: "Subscribe to all possible attestation and sync subnets.",
> }
> ```
1. **subscribeDynamicWithSubnets**: subscribe to a dynamically changing list of subnets. This method expects a fmt compatible string for the topic name and the list of subnets for subscribed topics that should be maintained.
> запуск без флага из пукта выше, т.е. дефолтная подписка.
В случае Гозаландии подписываемся на последних две по примеру аттестации и синкКомитета
`beacon-chain/sync/subscriber.go`
````go
// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch types.Epoch, digest [4]byte) {
...
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
digest,
)
} else {
s.subscribeDynamicWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
digest,
)
}
...
}
````
<hr>
># Beacon: voting
## updateHead
получает балансы от
````go
balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(justified.Root))
...
newHeadRoot, err := s.updateHead(s.ctx, balances)
````
```
/home/mezin/go/src/tesseract/beacon-chain/blockchain
head.go
head_test.go
process_block.go
onBlock
headRoot, err := s.updateHead(ctx, balances)
receive_attestation.go
spawnProcessAttestationsRoutine
newHeadRoot, err := s.updateHead(s.ctx, balances)
```
````go
// This routine processes fork choice attestations from the pool to account for validator votes and fork choice.
func (s *Service) spawnProcessAttestationsRoutine(stateFeed *event.Feed) {
go func() {
...
for {
select {
case <-st.C():
if err := s.NewSlot(s.ctx, s.CurrentSlot()); err != nil {...}
...
candidates, err := s.cfg.ExecutionEngineCaller.ExecutionDagSync(s.ctx, syncParams)
...
/*
Обработка локальных агрегированных аттестаций в пуле
*/
// Continue when there's no fork choice attestation, there's nothing to process and update head.
// This covers the condition when the node is still initial syncing to the head of the chain.
if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 {
continue
}
s.processAttestations(s.ctx)
// ^^^^^^^^^^^^^^^^^^^^^^^^^
justified := s.store.JustifiedCheckpt()
...
balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(justified.Root))
...
newHeadRoot, err := s.updateHead(s.ctx, balances)
...
s.notifyEngineIfChangedHead(s.ctx, newHeadRoot)
}
}
}()
}
````
## Attestations pool
`beacon-chain/operations/attestations/kv/unaggregated.go`
`beacon-chain/operations/attestations/kv/forkchoice.go`