Skip to content
2 changes: 1 addition & 1 deletion src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec
* server potentially for a write on a secondary.
*/
export function secondaryWritableServerSelector(
wireVersion?: number,
wireVersion: number,
readPreference?: ReadPreference
): ServerSelector {
// If server version < 5.0, read preference always primary.
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return result;
}

get commonWireVersion(): number | undefined {
get commonWireVersion(): number {
return this.description.commonWireVersion;
}

Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ export class TopologyDescription {

// update common wire version
if (serverDescription.maxWireVersion !== 0) {
if (commonWireVersion == null) {
if (commonWireVersion === 0) {
commonWireVersion = serverDescription.maxWireVersion;
} else {
commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion);
Expand Down
69 changes: 69 additions & 0 deletions test/integration/crud/aggregation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -870,4 +870,73 @@ describe('Aggregation', function () {
.finally(() => client.close());
}
});

it(
'should perform aggregations with a write stage on secondary when readPreference is secondary',
{
metadata: { requires: { topology: 'replicaset', mongodb: '>=5.0' } },
async test() {
const databaseName = this.configuration.db;
const client = this.configuration.newClient(this.configuration.writeConcernMax(), {
maxPoolSize: 1,
monitorCommands: true
});

const events = [];
client.on('commandStarted', filterForCommands(['hello', 'aggregate'], events));

// Discover primary to be able to check the actual server address
await client.db('admin').command({ hello: 1 });
const [helloEvent] = events;
const primaryAddress = helloEvent.address;

// Clear events
events.length = 0;

const src = client.db(databaseName).collection('read_pref_src');
const outMerge = client.db(databaseName).collection('read_pref_merge_out');
const outOut = client.db(databaseName).collection('read_pref_out_out');

await Promise.all([src.deleteMany({}), outMerge.deleteMany({}), outOut.deleteMany({})]);
await src.insertMany([{ a: 1 }, { a: 2 }]);
await Promise.all([
src
.aggregate(
[
{
$merge: {
into: 'read_pref_merge_out',
whenMatched: 'replace',
whenNotMatched: 'insert'
}
}
],
{ readPreference: 'secondary' }
)
.toArray(),
src
.aggregate(
[
{
$out: 'read_pref_out_out'
}
],
{ readPreference: 'secondary' }
)
.toArray()
]);

expect(events).to.have.length(2);
events.forEach(event => {
expect(event).to.have.property('commandName', 'aggregate');
expect(event.address).to.not.equal(primaryAddress);
expect(event).to.have.deep.nested.property('command.$readPreference', {
mode: 'secondary'
});
});

await client.close();
}
}
);
});
10 changes: 5 additions & 5 deletions test/unit/sdam/server_selection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ describe('server selection', function () {
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0 is essentially same as "no value", since we initially set commonWireVersion into 0.

const servers = selector(
topologyDescription,
Array.from(serverDescriptions.values()),
Expand Down Expand Up @@ -401,7 +401,7 @@ describe('server selection', function () {
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(0);
const servers = selector(
topologyDescription,
Array.from(serverDescriptions.values()),
Expand Down Expand Up @@ -507,7 +507,7 @@ describe('server selection', function () {
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(0);
const servers = selector(
topologyDescription,
Array.from(serverDescriptions.values()),
Expand Down Expand Up @@ -564,7 +564,7 @@ describe('server selection', function () {
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(0);
const servers = selector(
topologyDescription,
Array.from(serverDescriptions.values()),
Expand All @@ -587,7 +587,7 @@ describe('server selection', function () {
MIN_SECONDARY_WRITE_WIRE_VERSION,
{ localThresholdMS: 5 }
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(0);
const servers = selector(
topologyDescription,
Array.from(serverDescriptions.values()),
Expand Down
63 changes: 63 additions & 0 deletions test/unit/sdam/topology_description.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { expect } from 'chai';

import { TopologyType } from '../../../src/sdam/common';
import { ServerDescription } from '../../../src/sdam/server_description';
import { TopologyDescription } from '../../../src/sdam/topology_description';

describe('TopologyDescription', function () {
describe('#constructor', function () {
it('sets commonWireVersion to 0', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

expect(initial.commonWireVersion).to.equal(0);
});
});

describe('update()', function () {
it('initializes commonWireVersion from first non-zero maxWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const updated = initial.update(sd1);

expect(updated.commonWireVersion).to.equal(25);
});

it('tracks the minimum non-zero maxWireVersion across updates in commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sd2 = new ServerDescription('b:27017', {
maxWireVersion: 21
});

let updated = initial.update(sd1);
updated = updated.update(sd2);

expect(updated.commonWireVersion).to.equal(21);
});

it('ignores servers with maxWireVersion === 0 when computing commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sdUnknown = new ServerDescription('b:27017', {
maxWireVersion: 0
});

let updated = initial.update(sd1);
updated = updated.update(sdUnknown);

expect(updated.commonWireVersion).to.equal(25);
});
});
});