Skip to content

Commit

Permalink
obs, roachprod, rpc, spanconfig, storage, testutils: cleanup lock usages
Browse files Browse the repository at this point in the history
This PR updates unsafe/unnecessary manual unlocks to defer unlocks.
The criteria for leaving some unlocks as is:
- will not cause a leak
- releases the resources much earlier
- deferring will cause a deadlock without significant refactor
Part of #107946

Release note: None
  • Loading branch information
Santamaura committed Sep 6, 2023
1 parent 6a31120 commit 8e02a8a
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 156 deletions.
38 changes: 21 additions & 17 deletions pkg/obs/event_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,27 +317,31 @@ func (s *EventsExporter) Start(ctx context.Context, stopper *stop.Stopper) error
{Resource: &s.resource},
},
}
s.buf.mu.Lock()
// Iterate through the different types of events.
req.ResourceLogs[0].ScopeLogs = make([]otel_logs_pb.ScopeLogs, 0, len(s.buf.mu.events))
for _, buf := range s.buf.mu.events {
events, sizeBytes := buf.moveContents()
if len(events) == 0 {
continue
func() {
s.buf.mu.Lock()
defer s.buf.mu.Unlock()
// Iterate through the different types of events.
req.ResourceLogs[0].ScopeLogs = make([]otel_logs_pb.ScopeLogs, 0, len(s.buf.mu.events))
for _, buf := range s.buf.mu.events {
events, sizeBytes := buf.moveContents()
if len(events) == 0 {
continue
}
totalEvents += len(events)
s.buf.mu.sizeBytes -= sizeBytes
msgSize += sizeBytes
req.ResourceLogs[0].ScopeLogs = append(req.ResourceLogs[0].ScopeLogs,
otel_logs_pb.ScopeLogs{Scope: &buf.instrumentationScope, LogRecords: events})
}
totalEvents += len(events)
s.buf.mu.sizeBytes -= sizeBytes
msgSize += sizeBytes
req.ResourceLogs[0].ScopeLogs = append(req.ResourceLogs[0].ScopeLogs,
otel_logs_pb.ScopeLogs{Scope: &buf.instrumentationScope, LogRecords: events})
}
s.buf.mu.Unlock()
}()

if len(req.ResourceLogs[0].ScopeLogs) > 0 {
_, err := s.otelClient.Export(ctx, req, grpc.WaitForReady(true))
s.buf.mu.Lock()
s.buf.mu.memAccount.Shrink(ctx, int64(msgSize))
s.buf.mu.Unlock()
func() {
s.buf.mu.Lock()
defer s.buf.mu.Unlock()
s.buf.mu.memAccount.Shrink(ctx, int64(msgSize))
}()
if err != nil {
log.Warningf(ctx, "failed to export events: %s", err)
} else {
Expand Down
106 changes: 59 additions & 47 deletions pkg/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,8 +1339,8 @@ exit 1
return res, nil
}
mu.Lock()
defer mu.Unlock()
providerKnownHostData[nodeProvider] = []byte(res.Stdout)
mu.Unlock()
return res, nil
}, WithDisplay("scanning hosts")); err != nil {
return err
Expand Down Expand Up @@ -1930,40 +1930,46 @@ func (c *SyncedCluster) Put(
case r, ok := <-results:
done = !ok
if ok {
linesMu.Lock()
if r.err != nil {
setErr(r.err)
lines[r.index] = r.err.Error()
} else {
lines[r.index] = "done"
}
linesMu.Unlock()
func() {
linesMu.Lock()
defer linesMu.Unlock()
if r.err != nil {
setErr(r.err)
lines[r.index] = r.err.Error()
} else {
lines[r.index] = "done"
}
}()
}
}
if !config.Quiet {
linesMu.Lock()
for i := range lines {
fmt.Fprintf(&writer, " %2d: ", nodes[i])
if lines[i] != "" {
fmt.Fprintf(&writer, "%s", lines[i])
} else {
fmt.Fprintf(&writer, "%s", spinner[spinnerIdx%len(spinner)])
func() {
linesMu.Lock()
defer linesMu.Unlock()
for i := range lines {
fmt.Fprintf(&writer, " %2d: ", nodes[i])
if lines[i] != "" {
fmt.Fprintf(&writer, "%s", lines[i])
} else {
fmt.Fprintf(&writer, "%s", spinner[spinnerIdx%len(spinner)])
}
fmt.Fprintf(&writer, "\n")
}
fmt.Fprintf(&writer, "\n")
}
linesMu.Unlock()
}()
_ = writer.Flush(l.Stdout)
spinnerIdx++
}
}

if config.Quiet && l.File != nil {
l.Printf("\n")
linesMu.Lock()
for i := range lines {
l.Printf(" %2d: %s", nodes[i], lines[i])
}
linesMu.Unlock()
func() {
linesMu.Lock()
defer linesMu.Unlock()
for i := range lines {
l.Printf(" %2d: %s", nodes[i], lines[i])
}
}()
}

if finalErr != nil {
Expand Down Expand Up @@ -2301,39 +2307,45 @@ func (c *SyncedCluster) Get(
case r, ok := <-results:
done = !ok
if ok {
linesMu.Lock()
if r.err != nil {
haveErr = true
lines[r.index] = r.err.Error()
} else {
lines[r.index] = "done"
}
linesMu.Unlock()
func() {
linesMu.Lock()
defer linesMu.Unlock()
if r.err != nil {
haveErr = true
lines[r.index] = r.err.Error()
} else {
lines[r.index] = "done"
}
}()
}
}
if !config.Quiet && l.File == nil {
linesMu.Lock()
for i := range lines {
fmt.Fprintf(&writer, " %2d: ", nodes[i])
if lines[i] != "" {
fmt.Fprintf(&writer, "%s", lines[i])
} else {
fmt.Fprintf(&writer, "%s", spinner[spinnerIdx%len(spinner)])
func() {
linesMu.Lock()
defer linesMu.Unlock()
for i := range lines {
fmt.Fprintf(&writer, " %2d: ", nodes[i])
if lines[i] != "" {
fmt.Fprintf(&writer, "%s", lines[i])
} else {
fmt.Fprintf(&writer, "%s", spinner[spinnerIdx%len(spinner)])
}
fmt.Fprintf(&writer, "\n")
}
fmt.Fprintf(&writer, "\n")
}
linesMu.Unlock()
}()
_ = writer.Flush(l.Stdout)
spinnerIdx++
}
}

if config.Quiet && l.File != nil {
linesMu.Lock()
for i := range lines {
l.Printf(" %2d: %s", nodes[i], lines[i])
}
linesMu.Unlock()
func() {
linesMu.Lock()
defer linesMu.Unlock()
for i := range lines {
l.Printf(" %2d: %s", nodes[i], lines[i])
}
}()
}

if haveErr {
Expand Down
86 changes: 54 additions & 32 deletions pkg/roachprod/vm/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,12 @@ func (p *Provider) Create(
return err
}

p.mu.Lock()
subnet, ok := p.mu.subnets[location]
p.mu.Unlock()
subnet, ok := func() (network.Subnet, bool) {
p.mu.Lock()
defer p.mu.Unlock()
s, ok := p.mu.subnets[location]
return s, ok
}()
if !ok {
return errors.Errorf("missing subnet for location %q", location)
}
Expand Down Expand Up @@ -733,9 +736,7 @@ func (p *Provider) createNIC(
return
}

p.mu.Lock()
sg := p.mu.securityGroups[p.getVnetNetworkSecurityGroupName(*group.Location)]
p.mu.Unlock()
_, sg := p.getResourcesAndSecurityGroupByName("", p.getVnetNetworkSecurityGroupName(*group.Location))

future, err := client.CreateOrUpdate(ctx, *group.Name, *ip.Name, network.Interface{
Name: ip.Name,
Expand Down Expand Up @@ -772,9 +773,12 @@ func (p *Provider) createNIC(
func (p *Provider) getOrCreateNetworkSecurityGroup(
ctx context.Context, name string, resourceGroup resources.Group,
) (network.SecurityGroup, error) {
p.mu.Lock()
group, ok := p.mu.securityGroups[name]
p.mu.Unlock()
group, ok := func() (network.SecurityGroup, bool) {
p.mu.Lock()
defer p.mu.Unlock()
g, ok := p.mu.securityGroups[name]
return g, ok
}()
if ok {
return group, nil
}
Expand All @@ -793,8 +797,8 @@ func (p *Provider) getOrCreateNetworkSecurityGroup(

cacheAndReturn := func(group network.SecurityGroup) (network.SecurityGroup, error) {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.securityGroups[name] = group
p.mu.Unlock()
return group, nil
}

Expand Down Expand Up @@ -1016,9 +1020,7 @@ func (p *Provider) createVNets(
newSubnetsCreated := false

for _, location := range providerOpts.Locations {
p.mu.Lock()
group := p.mu.resourceGroups[vnetResourceGroupName(location)]
p.mu.Unlock()
group, _ := p.getResourcesAndSecurityGroupByName(vnetResourceGroupName(location), "")
// Prefix already exists for the resource group.
if prefixString := group.Tags[tagSubnet]; prefixString != nil {
prefix, err := strconv.Atoi(*prefixString)
Expand All @@ -1033,18 +1035,18 @@ func (p *Provider) createVNets(
newSubnetsCreated = true
prefix := nextAvailablePrefix()
prefixesByLocation[location] = prefix
p.mu.Lock()
group := p.mu.resourceGroups[vnetResourceGroupName(location)]
p.mu.Unlock()
group, _ := p.getResourcesAndSecurityGroupByName(vnetResourceGroupName(location), "")
group, err = setVNetSubnetPrefix(group, prefix)
if err != nil {
return nil, errors.Wrapf(err, "for location %q", location)
}
// We just updated the VNet Subnet prefix on the resource group -- update
// the cached entry to reflect that.
p.mu.Lock()
p.mu.resourceGroups[vnetResourceGroupName(location)] = group
p.mu.Unlock()
func() {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.resourceGroups[vnetResourceGroupName(location)] = group
}()
}
}

Expand All @@ -1055,10 +1057,7 @@ func (p *Provider) createVNets(
ret := make(map[string]network.VirtualNetwork)
vnets := make([]network.VirtualNetwork, len(ret))
for location, prefix := range prefixesByLocation {
p.mu.Lock()
resourceGroup := p.mu.resourceGroups[vnetResourceGroupName(location)]
networkSecurityGroup := p.mu.securityGroups[p.getVnetNetworkSecurityGroupName(location)]
p.mu.Unlock()
resourceGroup, networkSecurityGroup := p.getResourcesAndSecurityGroupByName(vnetResourceGroupName(location), p.getVnetNetworkSecurityGroupName(location))
if vnet, _, err := p.createVNet(l, ctx, resourceGroup, networkSecurityGroup, prefix, providerOpts); err == nil {
ret[location] = vnet
vnets = append(vnets, vnet)
Expand Down Expand Up @@ -1129,8 +1128,8 @@ func (p *Provider) createVNet(
}
subnet = (*vnet.Subnets)[0]
p.mu.Lock()
defer p.mu.Unlock()
p.mu.subnets[*resourceGroup.Location] = subnet
p.mu.Unlock()
l.Printf("created Azure VNet %q in %q with prefix %d", vnetName, *resourceGroup.Name, prefix)
return
}
Expand Down Expand Up @@ -1302,17 +1301,20 @@ func (p *Provider) getOrCreateResourceGroup(
) (resources.Group, error) {

// First, check the local provider cache.
p.mu.Lock()
group, ok := p.mu.resourceGroups[name]
p.mu.Unlock()
group, ok := func() (resources.Group, bool) {
p.mu.Lock()
defer p.mu.Unlock()
g, ok := p.mu.resourceGroups[name]
return g, ok
}()
if ok {
return group, nil
}

cacheAndReturn := func(group resources.Group) (resources.Group, error) {
p.mu.Lock()
defer p.mu.Unlock()
p.mu.resourceGroups[name] = group
p.mu.Unlock()
return group, nil
}

Expand Down Expand Up @@ -1401,9 +1403,11 @@ func (p *Provider) createUltraDisk(
func (p *Provider) getSubscription(
ctx context.Context,
) (sub subscriptions.Subscription, err error) {
p.mu.Lock()
sub = p.mu.subscription
p.mu.Unlock()
sub = func() subscriptions.Subscription {
p.mu.Lock()
defer p.mu.Unlock()
return p.mu.subscription
}()

if sub.SubscriptionID != nil {
return
Expand All @@ -1423,8 +1427,26 @@ func (p *Provider) getSubscription(
sub = page.Values()[0]

p.mu.Lock()
defer p.mu.Unlock()
p.mu.subscription = page.Values()[0]
p.mu.Unlock()
}
return
}

// getResourceGroupByName receives a string name and returns
// the resources.Group associated with it.
func (p *Provider) getResourcesAndSecurityGroupByName(
rName string, sName string,
) (resources.Group, network.SecurityGroup) {
var rGroup resources.Group
var sGroup network.SecurityGroup
p.mu.Lock()
defer p.mu.Unlock()
if rName != "" {
rGroup = p.mu.resourceGroups[rName]
}
if sName != "" {
sGroup = p.mu.securityGroups[sName]
}
return rGroup, sGroup
}
Loading

0 comments on commit 8e02a8a

Please sign in to comment.