Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,7 @@ const [primary, secondary] = await synapse.storage.createContexts({
})
const pieceCid = null as unknown as PieceCID;
// ---cut---
const response = await SP.waitForPullStatus(client, {
const response = await SP.waitForPullPieces(client, {
serviceURL: secondary.provider.pdp.serviceURL,
pieces: [{
pieceCid,
Expand Down
1 change: 1 addition & 0 deletions packages/synapse-core/src/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ export * from './pay.ts'
export * from './pdp.ts'
export * from './piece.ts'
export * from './pull.ts'
export * from './warm-storage.ts'
4 changes: 2 additions & 2 deletions packages/synapse-core/src/errors/pdp.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { AddPiecesRejected } from '../sp/wait-for-add-pieces.ts'
import type { CreateDataSetRejected } from '../sp/wait-for-create-dataset.ts'
import type { AddPiecesRejected } from '../sp/add-pieces.ts'
import type { CreateDataSetRejected } from '../sp/create-dataset.ts'
import { SIZE_CONSTANTS } from '../utils/constants.ts'
import { decodePDPError } from '../utils/decode-pdp-errors.ts'
import { isSynapseError, SynapseError } from './base.ts'
Expand Down
27 changes: 14 additions & 13 deletions packages/synapse-core/src/mocks/pdp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import assert from 'assert'
import { HttpResponse, http } from 'msw'
import { decodeAbiParameters, type Hex } from 'viem'
import type { addPieces, PullPieceInput, PullResponse, PullStatus } from '../sp/sp.ts'
import type { addPiecesApiRequest } from '../sp/add-pieces.ts'
import type { pullPiecesApiRequest } from '../sp/pull-pieces.ts'

export interface PDPMockOptions {
baseUrl?: string
Expand Down Expand Up @@ -332,7 +333,7 @@ export function addPiecesWithMetadataCapture(
) {
const baseUrl = options.baseUrl ?? 'https://pdp.example.com'

return http.post<{ id: string }, addPieces.RequestBody>(
return http.post<{ id: string }, addPiecesApiRequest.RequestBody>(
`${baseUrl}/pdp/data-sets/:id/pieces`,
async ({ params, request }) => {
if (params.id !== dataSetId.toString()) {
Expand Down Expand Up @@ -374,13 +375,13 @@ export interface PullRequestCapture {
extraData: string
recordKeeper: string
dataSetId?: number
pieces: PullPieceInput[]
pieces: pullPiecesApiRequest.PullPieceInput[]
}

/**
* Creates a handler for the pull pieces endpoint that returns a fixed response
*/
export function pullPiecesHandler(response: PullResponse, options: PDPMockOptions = {}) {
export function pullPiecesHandler(response: pullPiecesApiRequest.ReturnType, options: PDPMockOptions = {}) {
const baseUrl = options.baseUrl ?? 'http://pdp.local'

return http.post(`${baseUrl}/pdp/piece/pull`, async () => {
Expand All @@ -395,7 +396,7 @@ export function pullPiecesHandler(response: PullResponse, options: PDPMockOption
* Creates a handler that captures the request body and returns a response
*/
export function pullPiecesWithCaptureHandler(
response: PullResponse,
response: pullPiecesApiRequest.ReturnType,
captureCallback: (request: PullRequestCapture) => void,
options: PDPMockOptions = {}
) {
Expand Down Expand Up @@ -434,7 +435,7 @@ export function pullPiecesErrorHandler(errorMessage: string, statusCode = 500, o
*/
export function pullPiecesPollingHandler(
pendingCount: number,
finalResponse: PullResponse,
finalResponse: pullPiecesApiRequest.ReturnType,
options: PDPMockOptions = {}
) {
const baseUrl = options.baseUrl ?? 'http://pdp.local'
Expand All @@ -448,11 +449,11 @@ export function pullPiecesPollingHandler(
}

if (callCount <= pendingCount) {
const pendingResponse: PullResponse = {
const pendingResponse: pullPiecesApiRequest.ReturnType = {
status: 'pending',
pieces: finalResponse.pieces.map((p) => ({
pieceCid: p.pieceCid,
status: 'pending' as PullStatus,
status: 'pending' as pullPiecesApiRequest.PullStatus,
})),
}
return HttpResponse.json(pendingResponse, { status: 200 })
Expand All @@ -466,7 +467,7 @@ export function pullPiecesPollingHandler(
* Creates a handler that simulates a progression through statuses
*/
export function pullPiecesProgressionHandler(
statusProgression: PullStatus[],
statusProgression: pullPiecesApiRequest.PullStatus[],
pieces: Array<{ pieceCid: string }>,
options: PDPMockOptions = {}
) {
Expand All @@ -482,7 +483,7 @@ export function pullPiecesProgressionHandler(
console.debug(`SP Pull Mock: returning status ${currentStatus} (call ${callCount})`)
}

const response: PullResponse = {
const response: pullPiecesApiRequest.ReturnType = {
status: currentStatus,
pieces: pieces.map((p) => ({
pieceCid: p.pieceCid,
Expand All @@ -498,9 +499,9 @@ export function pullPiecesProgressionHandler(
* Helper to create a complete PullResponse
*/
export function createPullResponse(
status: PullStatus,
pieces: Array<{ pieceCid: string; status?: PullStatus }>
): PullResponse {
status: pullPiecesApiRequest.PullStatus,
pieces: Array<{ pieceCid: string; status?: pullPiecesApiRequest.PullStatus }>
): pullPiecesApiRequest.ReturnType {
return {
status,
pieces: pieces.map((p) => ({
Expand Down
2 changes: 1 addition & 1 deletion packages/synapse-core/src/piece/resolve-piece-url.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import pLocate from 'p-locate'
import pSome from 'p-some'
import type { Address, Chain, Client, Transport } from 'viem'
import { asChain } from '../chains.ts'
import { findPiece } from '../sp/sp.ts'
import { findPiece } from '../sp/find-piece.ts'
import type { PDPProvider } from '../sp-registry/types.ts'
import { createPieceUrlPDP } from '../utils/piece-url.ts'
import { getPdpDataSets } from '../warm-storage/get-pdp-data-sets.ts'
Expand Down
2 changes: 1 addition & 1 deletion packages/synapse-core/src/session-key/secp256k1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ interface Secp256k1SessionKeyOptions {
/**
* Secp256k1SessionKey - A session key for a secp256k1 private key.
*/
class Secp256k1SessionKey extends TypedEventTarget<SessionKeyEvents> implements SessionKey<'Secp256k1'> {
export class Secp256k1SessionKey extends TypedEventTarget<SessionKeyEvents> implements SessionKey<'Secp256k1'> {
#client: Client<Transport, SynapseChain, SessionKeyAccount<'Secp256k1'>>
#type: 'Secp256k1'
#expirations: Expirations
Expand Down
182 changes: 177 additions & 5 deletions packages/synapse-core/src/sp/add-pieces.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,87 @@
import type { Account, Chain, Client, Hex, Transport } from 'viem'
import { type AbortError, HttpError, type NetworkError, request, type TimeoutError } from 'iso-web/http'
import type { ToString } from 'multiformats'
import { type Account, type Chain, type Client, type Hex, isHex, type Transport } from 'viem'
import * as z from 'zod'
import { AddPiecesError, LocationHeaderError } from '../errors/index.ts'
import { WaitForAddPiecesError, WaitForAddPiecesRejectedError } from '../errors/pdp.ts'
import { AtLeastOnePieceRequiredError } from '../errors/warm-storage.ts'
import type { PieceCID } from '../piece/piece.ts'
import { signAddPieces } from '../typed-data/sign-add-pieces.ts'
import { RETRY_CONSTANTS } from '../utils/constants.ts'
import { type MetadataObject, pieceMetadataObjectToEntry } from '../utils/metadata.ts'
import * as PDP from './sp.ts'
import { zHex, zNumberToBigInt } from '../utils/schemas.ts'

export namespace addPiecesApiRequest {
export type OptionsType = {
/** The service URL of the PDP API. */
serviceURL: string
/** The ID of the data set. */
dataSetId: bigint
/** The pieces to add. */
pieces: PieceCID[]
/** The extra data for the add pieces. {@link TypedData.signAddPieces} */
extraData: Hex
}
export type OutputType = {
/** The transaction hash. */
txHash: Hex
/** The status URL. */
statusUrl: string
}
export type ErrorType = AddPiecesError | LocationHeaderError | TimeoutError | NetworkError | AbortError
export type RequestBody = {
pieces: {
pieceCid: ToString<PieceCID>
subPieces: { subPieceCid: ToString<PieceCID> }[]
}[]
extraData: Hex
}
}

/**
* Add pieces to a data set on the PDP API.
*
* POST /pdp/data-sets/{dataSetId}/pieces
*
* @param options - {@link addPiecesApiRequest.OptionsType}
* @returns Hash and status URL {@link addPiecesApiRequest.OutputType}
* @throws Errors {@link addPiecesApiRequest.ErrorType}
*/
export async function addPiecesApiRequest(
options: addPiecesApiRequest.OptionsType
): Promise<addPiecesApiRequest.OutputType> {
const { serviceURL, dataSetId, pieces, extraData } = options
const response = await request.post(new URL(`pdp/data-sets/${dataSetId}/pieces`, serviceURL), {
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
pieces: pieces.map((piece) => ({
pieceCid: piece.toString(),
subPieces: [{ subPieceCid: piece.toString() }],
})),
extraData: extraData,
}),
timeout: RETRY_CONSTANTS.MAX_RETRY_TIME,
})

if (response.error) {
if (HttpError.is(response.error)) {
throw new AddPiecesError(await response.error.response.text())
}
throw response.error
}
const location = response.result.headers.get('Location')
const txHash = location?.split('/').pop()
if (!location || !txHash || !isHex(txHash)) {
throw new LocationHeaderError(location)
}

return {
txHash: txHash as Hex,
statusUrl: new URL(location, serviceURL).toString(),
}
}

export namespace addPieces {
export type PieceType = {
Expand All @@ -25,8 +103,8 @@ export namespace addPieces {
extraData?: Hex
}

export type OutputType = PDP.addPieces.OutputType
export type ErrorType = PDP.addPieces.ErrorType
export type OutputType = addPiecesApiRequest.OutputType
export type ErrorType = addPiecesApiRequest.ErrorType | signAddPieces.ErrorType
}

/**
Expand Down Expand Up @@ -56,10 +134,104 @@ export async function addPieces(
metadata: pieceMetadataObjectToEntry(piece.metadata),
})),
}))
return PDP.addPieces({
return addPiecesApiRequest({
serviceURL: options.serviceURL,
dataSetId: options.dataSetId,
pieces: options.pieces.map((piece) => piece.pieceCid),
extraData,
})
}

export const AddPiecesPendingSchema = z.object({
txHash: zHex,
txStatus: z.literal('pending'),
dataSetId: zNumberToBigInt,
pieceCount: z.number(),
addMessageOk: z.null(),
piecesAdded: z.literal(false),
})

export const AddPiecesRejectedSchema = z.object({
txHash: zHex,
txStatus: z.literal('rejected'),
dataSetId: zNumberToBigInt,
pieceCount: z.number(),
addMessageOk: z.null(),
piecesAdded: z.literal(false),
})

export const AddPiecesSuccessSchema = z.object({
txHash: zHex,
txStatus: z.literal('confirmed'),
dataSetId: zNumberToBigInt,
pieceCount: z.number(),
addMessageOk: z.literal(true),
piecesAdded: z.literal(true),
confirmedPieceIds: z.array(zNumberToBigInt),
})

export type AddPiecesPending = z.infer<typeof AddPiecesPendingSchema>
export type AddPiecesRejected = z.infer<typeof AddPiecesRejectedSchema>
export type AddPiecesSuccess = z.infer<typeof AddPiecesSuccessSchema>
export type AddPiecesResponse = AddPiecesRejected | AddPiecesSuccess | AddPiecesPending
export type AddPiecesOutput = AddPiecesSuccess

const schema = z.discriminatedUnion('txStatus', [AddPiecesRejectedSchema, AddPiecesSuccessSchema])

export namespace waitForAddPieces {
export type OptionsType = {
/** The status URL to poll. */
statusUrl: string
/** The timeout in milliseconds. Defaults to 5 minutes. */
timeout?: number
/** The polling interval in milliseconds. Defaults to 4 seconds. */
pollInterval?: number
}
export type OutputType = AddPiecesOutput
export type ErrorType =
| WaitForAddPiecesError
| WaitForAddPiecesRejectedError
| TimeoutError
| NetworkError
| AbortError
}

/**
* Wait for the add pieces status.
*
* GET /pdp/data-sets/{dataSetId}/pieces/added/{txHash}
*
* @param options - {@link waitForAddPieces.OptionsType}
* @returns Status {@link waitForAddPieces.OutputType}
* @throws Errors {@link waitForAddPieces.ErrorType}
*/
export async function waitForAddPieces(options: waitForAddPieces.OptionsType): Promise<waitForAddPieces.OutputType> {
const response = await request.json.get<AddPiecesResponse>(options.statusUrl, {
async onResponse(response) {
if (response.ok) {
const data = (await response.clone().json()) as AddPiecesResponse
if (data.piecesAdded === false) {
throw new Error('Still pending')
}
}
},
retry: {
shouldRetry: (ctx) => ctx.error.message === 'Still pending',
retries: RETRY_CONSTANTS.RETRIES,
factor: RETRY_CONSTANTS.FACTOR,
minTimeout: options.pollInterval ?? RETRY_CONSTANTS.DELAY_TIME,
},
timeout: options.timeout ?? RETRY_CONSTANTS.MAX_RETRY_TIME,
})
if (response.error) {
if (HttpError.is(response.error)) {
throw new WaitForAddPiecesError(await response.error.response.text())
}
throw response.error
}
const data = schema.parse(response.result)
if (data.txStatus === 'rejected') {
throw new WaitForAddPiecesRejectedError(data)
}
return data
}
Loading
Loading