@@ -67,6 +67,7 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
6767 private static final String timestampColumnsProp = "Timestamp Columns" ;
6868 private static final String readDelayProp = "Read Delay" ;
6969 private static final String fullLoadIntervalProp = "Full Load Interval" ;
70+ private static final String loadIntervalProp = "Incremental Load Interval" ;
7071 private static final String userNameProp = "User Name" ;
7172 private static final String passwordProp = "Password" ;
7273 private static final String keepSourceTypes = "Keep JDBC source types" ;
@@ -83,10 +84,12 @@ public class JDBCDataSource implements ExternalDataSource<JDBCTaskMetadata, JDBC
8384 new SimplePropertyDescription (timestampColumnsProp , "Comma separated list of timestamp columns to use for loading new rows. The fist non-null value will be used. At least one of the values must not be null for each row" , true ),
8485 new SimplePropertyDescription (readDelayProp , "How long (in seconds) to wait before reading rows based on their timestamp. This allows waiting for all transactions of a certain timestamp to complete to avoid loading partial data. Default value is 0" , true ),
8586 new SimplePropertyDescription (fullLoadIntervalProp , "If set the full table will be read every configured interval (in minutes). When this is configured the update time and incrementing columns are not used." , true ),
86- new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )));
87+ new SimplePropertyDescription (keepSourceTypes , "Keep original data types from source to use string representation" , true , false , null , null , null , true , Optional .of ("true" )),
88+ new SimplePropertyDescription (loadIntervalProp , "Configures how often (in minutes) the data source will poll the database for new changes" , true ));
8789
8890 private long readDelay ;
8991 private long fullLoadIntervalMinutes ;
92+ private long loadIntervalMinutes ;
9093 private TableInfo tableInfo ;
9194 private QueryDialect queryDialect ;
9295 private long dbTimezoneOffset ;
@@ -103,6 +106,10 @@ private boolean isFullLoad() {
103106 return fullLoadIntervalMinutes > 0 ;
104107 }
105108
109+ private boolean hasCustomLoadInterval () {
110+ return loadIntervalMinutes > 1 ;
111+ }
112+
106113 @ Override
107114 public DataSourceDescription getDataSourceDescription () {
108115 return new JDBCDataSourceDescription ();
@@ -146,6 +153,7 @@ public void setProperties(Map<String, String> properties) {
146153 try (Connection con = getConnection ()) {
147154 readDelay = Long .parseLong (properties .getOrDefault (readDelayProp , "0" ));
148155 fullLoadIntervalMinutes = Long .parseLong (properties .getOrDefault (fullLoadIntervalProp , "0" ));
156+ loadIntervalMinutes = Long .parseLong (properties .getOrDefault (loadIntervalProp , "1" ));
149157 DatabaseMetaData metadata = con .getMetaData ();
150158 String userProvidedIncColumn = properties .get (incrementingColumnNameProp );
151159 tableInfo = loadTableInfo (metadata , properties .getOrDefault (schemaPatternProp , null ), properties .get (tableNameProp ));
@@ -404,9 +412,10 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
404412
405413 var taskCount = completedRanges .size () + wantedRanges .size ();
406414 var itemsPerTask = (taskInfo .getMetadata ().itemsPerTask (taskCount ));
407- var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
415+ var skipAll = hasCustomLoadInterval () && wantedRanges .stream ().noneMatch (this ::matchesLoadInterval );
416+ var emptyFullLoad = isFullLoad () && wantedRanges .stream ().noneMatch (this ::matchesFullLoadInterval );
408417 var noDataToLoad = !isFullLoad () && !tableInfo .hasTimeColumns () && itemsPerTask == 0 ;
409- if (emptyFullLoad || noDataToLoad ) {
418+ if (skipAll || emptyFullLoad || noDataToLoad ) {
410419 List <DataLoader <JDBCTaskMetadata >> result =
411420 wantedRanges .stream ().map (t -> new NoDataLoader (t , taskInfo .getMetadata ())).collect (Collectors .toList ());
412421 return CompletableFuture .completedFuture (result .iterator ());
@@ -423,8 +432,16 @@ public CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> getDataLoaders(Ta
423432 }
424433 }
425434
435+ private boolean matchesFullLoadInterval (TaskRange x ) {
436+ return getTimeInMinutes (x .getInclusiveStartTime ()) % fullLoadIntervalMinutes == 0 ;
437+ }
438+
426439 private boolean matchesLoadInterval (TaskRange x ) {
427- return x .getInclusiveStartTime ().getEpochSecond () / 60 % fullLoadIntervalMinutes == 0 ;
440+ return getTimeInMinutes (x .getInclusiveStartTime ()) % loadIntervalMinutes == 0 ;
441+ }
442+
443+ private Long getTimeInMinutes (Instant time ) {
444+ return time .getEpochSecond () / 60L ;
428445 }
429446
430447 private List <JDBCTaskMetadata > getRunMetadatas (TaskInformation <JDBCTaskMetadata > taskInfo ,
@@ -445,10 +462,18 @@ private List<JDBCTaskMetadata> getRunMetadatas(TaskInformation<JDBCTaskMetadata>
445462 // First task does not have lower bound to ensure we don't skip data from the last point we stopped at
446463 var startTime =
447464 firstInBatch ? taskInfo .getMetadata ().getStartTime () : wantedRange .getInclusiveStartTime ().minusSeconds (readDelay );
465+ var lowerBound = taskInfo .getMetadata ().getStartTime ().getEpochSecond ();
466+ var truncatedStartTime = Math .max (lowerBound , hasCustomLoadInterval () ?
467+ (getTimeInMinutes (startTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes )
468+ : startTime .getEpochSecond ());
469+ var endTime = wantedRange .getExclusiveEndTime ().minusSeconds (readDelay );
470+ var truncatedEndTime = Math .max (lowerBound ,hasCustomLoadInterval () ?
471+ getTimeInMinutes (endTime ) / loadIntervalMinutes * 60 * loadIntervalMinutes
472+ : endTime .getEpochSecond ());
448473 var metadata = new JDBCTaskMetadata (taskInfo .getMetadata ().getInclusiveStart (),
449474 taskInfo .getMetadata ().getExclusiveEnd (),
450- startTime ,
451- wantedRange . getExclusiveEndTime (). minusSeconds ( readDelay ));
475+ Instant . ofEpochSecond ( truncatedStartTime ) ,
476+ Instant . ofEpochSecond ( truncatedEndTime ));
452477 result .add (metadata );
453478 }
454479 } else {
@@ -486,43 +511,58 @@ private CompletionStage<Iterator<DataLoader<JDBCTaskMetadata>>> splitData(Result
486511 final var isLast = i == wantedRanges .size () - 1 ;
487512 final var taskRange = wantedRanges .get (i );
488513 final var metadata = runMetadatas .get (i );
489- var loader = new DataLoader <JDBCTaskMetadata >() {
490- @ Override
491- public TaskRange getTaskRange () {
492- return taskRange ;
493- }
494-
495- private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesLoadInterval (taskRange ));
496-
497- @ Override
498- public Iterator <LoadedData > loadData () {
499- ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
500- var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
501- return Collections .singleton (result ).iterator ();
502- }
503-
504- @ Override
505- public JDBCTaskMetadata getCompletedMetadata () {
506- if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
507- if (rowReader .readValues ()) {
508- // If some data was successfully read then that's our next start point
509- lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
510- lastReadIncValue .set (rowReader .getLastIncValue ());
511- }
512- metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
513- metadata .setEndTime (lastReadTime .get ());
514+ DataLoader <JDBCTaskMetadata > loader = null ;
515+ if (matchesLoadInterval (taskRange )){
516+ loader = getLoader (connection ,
517+ lastReadIncValue ,
518+ lastReadTime ,
519+ valueGetter ,
520+ isLast ,
521+ taskRange ,
522+ metadata );
523+ } else {
524+ loader = new NoDataLoader (taskRange , metadata );
514525 }
515-
516- return metadata ;
517- }
518- };
519526 result .add (loader );
520527
521528 }
522529 return CompletableFuture .completedFuture (result .iterator ());
523530
524531 }
525532
533+ private DataLoader <JDBCTaskMetadata > getLoader (Connection connection , AtomicReference <Long > lastReadIncValue , AtomicReference <Instant > lastReadTime , ResultSetValuesGetter valueGetter , boolean isLast , TaskRange taskRange , JDBCTaskMetadata metadata ) {
534+ return new DataLoader <JDBCTaskMetadata >() {
535+ @ Override
536+ public TaskRange getTaskRange () {
537+ return taskRange ;
538+ }
539+
540+ private final RowReader rowReader = new RowReader (tableInfo , valueGetter , metadata , connection , isFullLoad () && matchesFullLoadInterval (taskRange ));
541+
542+ @ Override
543+ public Iterator <LoadedData > loadData () {
544+ ResultSetInputStream inputStream = new ResultSetInputStream (rowConverter , rowReader , isLast );
545+ var result = new LoadedData (inputStream , new HashMap <>(), taskRange .getInclusiveStartTime ());
546+ return Collections .singleton (result ).iterator ();
547+ }
548+
549+ @ Override
550+ public JDBCTaskMetadata getCompletedMetadata () {
551+ if (tableInfo .hasTimeColumns () && rowReader .readValues ()) {
552+ if (rowReader .readValues ()) {
553+ // If some data was successfully read then that's our next start point
554+ lastReadTime .set (toUtc (rowReader .getLastTimestampValue ().toInstant ()));
555+ lastReadIncValue .set (rowReader .getLastIncValue ());
556+ }
557+ metadata .setExclusiveEnd (lastReadIncValue .get () + 1 );
558+ metadata .setEndTime (lastReadTime .get ());
559+ }
560+
561+ return metadata ;
562+ }
563+ };
564+ }
565+
526566
527567 @ Override
528568 public CompletionStage <TaskInformation <JDBCTaskMetadata >> getTaskInfo (JDBCTaskMetadata previousTaskMetadata ,
0 commit comments