# Removing Flink Table API legacy
An incomplete plan for removing the legacy stacks from the Table API, including:
* Old `Factory` stack
* Old String expression stack
* Old functions stack
* Old type system
* Legacy rules
## A couple of notes first
* When removing legacy code from tests, make sure the test still makes sense afterwards. Many test utilities are used only by tests that effectively exercise the old stack, and tests for the new stack may already have an equivalent utility.
* When porting test code:
  * Don't rely on planner internals
  * Use JUnit 5/AssertJ
  * Move it to the `flink-table-tests` module whenever possible
## Peeling the [onion’s layers](https://www.youtube.com/watch?v=aJQmVZSAqlc)
### 1. Remove all implementations of `TableSink` and `TableSource`
```
Implementations of TableSink in (25 usages found)
Production (11 usages found)
flink-connector-cassandra_2.12 (1 usage found)
org.apache.flink.streaming.connectors.cassandra (1 usage found)
CassandraAppendTableSink.java (1 usage found)
33 public class CassandraAppendTableSink implements AppendStreamTableSink<Row> {
flink-table-api-java-bridge (6 usages found)
org.apache.flink.table.sinks (6 usages found)
AppendStreamTableSink.java (1 usage found)
39 public interface AppendStreamTableSink<T> extends StreamTableSink<T> {}
CsvTableSink.java (1 usage found)
44 public class CsvTableSink implements AppendStreamTableSink<Row> {
OutputFormatTableSink.java (1 usage found)
38 public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
RetractStreamTableSink.java (1 usage found)
47 public interface RetractStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
StreamTableSink.java (1 usage found)
33 public interface StreamTableSink<T> extends TableSink<T> {
UpsertStreamTableSink.java (1 usage found)
58 public interface UpsertStreamTableSink<T> extends StreamTableSink<Tuple2<Boolean, T>> {
flink-table-common (1 usage found)
org.apache.flink.table.sinks (1 usage found)
TableSinkBase.java (1 usage found)
36 public abstract class TableSinkBase<T> implements TableSink<T> {
flink-table-planner_2.12 (3 usages found)
org.apache.flink.table.planner.sinks (3 usages found)
CollectTableSink.scala (2 usages found)
34 class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeInformation[T]))
80 class CollectRowTableSink extends CollectTableSink[Row](new RowTypeInfo(_: _*))
DataStreamTableSink.scala (1 usage found)
36 class DataStreamTableSink[T](
Test (14 usages found)
flink-python_2.12 (4 usages found)
org.apache.flink.table.legacyutils (4 usages found)
legacyTestingSinks.scala (3 usages found)
39 private[flink] class TestAppendSink extends AppendStreamTableSink[Row] {
69 private[flink] class TestRetractSink extends RetractStreamTableSink[Row] {
95 private[flink] class TestUpsertSink(
TestCollectionTableFactory.scala (1 usage found)
168 class CollectionTableSink(val outputType: RowTypeInfo)
flink-table-common (1 usage found)
org.apache.flink.table.utils (1 usage found)
TypeMappingUtilsTest.java (1 usage found)
419 private static class TestTableSink implements TableSink<Tuple2<Boolean, Row>> {
flink-table-planner_2.12 (9 usages found)
org.apache.flink.table.planner.factories.utils (1 usage found)
TestCollectionTableFactory.scala (1 usage found)
152 class CollectionTableSink(val schema: TableSchema)
org.apache.flink.table.planner.runtime.batch.sql (1 usage found)
PartitionableSinkITCase.scala (1 usage found)
331 private class TestSink(
org.apache.flink.table.planner.runtime.utils (3 usages found)
StreamTestSink.scala (3 usages found)
265 final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
344 final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row] {
499 final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink[Row] {
org.apache.flink.table.planner.utils (4 usages found)
MemoryTableSourceSinkUtil.scala (3 usages found)
80 final class UnsafeMemoryAppendTableSink
146 final class DataTypeOutputFormatTableSink(
181 final class DataTypeAppendStreamTableSink(
testTableSourceSinks.scala (1 usage found)
1317 class OptionsTableSink(
```
```
Implementations of TableSource in (31 usages found)
Production (8 usages found)
flink-batch-sql-test (1 usage found)
org.apache.flink.sql.tests (1 usage found)
BatchSQLTestProgram.java (1 usage found)
79 public static class GeneratorTableSource extends InputFormatTableSource<Row> {
flink-python_2.12 (2 usages found)
org.apache.flink.table.runtime.arrow.sources (1 usage found)
ArrowTableSource.java (1 usage found)
32 public class ArrowTableSource implements StreamTableSource<RowData> {
org.apache.flink.table.utils.python (1 usage found)
PythonInputFormatTableSource.java (1 usage found)
35 public class PythonInputFormatTableSource extends InputFormatTableSource<Row> {
flink-stream-sql-test_2.12 (1 usage found)
org.apache.flink.sql.tests (1 usage found)
StreamSQLTestProgram.java (1 usage found)
192 public static class GeneratorTableSource
flink-table-api-java-bridge (3 usages found)
org.apache.flink.table.sources (3 usages found)
CsvTableSource.java (1 usage found)
62 public class CsvTableSource
InputFormatTableSource.java (1 usage found)
39 public abstract class InputFormatTableSource<T> implements StreamTableSource<T> {
StreamTableSource.java (1 usage found)
33 public interface StreamTableSource<T> extends TableSource<T> {
flink-table-common (1 usage found)
org.apache.flink.table.sources (1 usage found)
LookupableTableSource.java (1 usage found)
38 public interface LookupableTableSource<T> extends TableSource<T> {
Test (23 usages found)
flink-python_2.12 (1 usage found)
org.apache.flink.table.legacyutils (1 usage found)
TestCollectionTableFactory.scala (1 usage found)
127 class CollectionTableSource(
flink-sql-client_2.12 (1 usage found)
org.apache.flink.table.client.gateway.utils (1 usage found)
SimpleCatalogFactory.java (1 usage found)
95 new StreamTableSource<Row>() {
flink-table-api-java (1 usage found)
org.apache.flink.table.utils (1 usage found)
TableSourceMock.java (1 usage found)
27 public class TableSourceMock implements TableSource<Row> {
flink-table-common (1 usage found)
org.apache.flink.table.utils (1 usage found)
TypeMappingUtilsTest.java (1 usage found)
375 private static class TestTableSource
flink-table-planner_2.12 (19 usages found)
org.apache.flink.table.planner.factories.utils (1 usage found)
TestCollectionTableFactory.scala (1 usage found)
115 class CollectionTableSource(
org.apache.flink.table.planner.plan.stream.sql.join (2 usages found)
LookupJoinTest.scala (2 usages found)
601 class TestTemporalTable(bounded: Boolean = false)
667 class TestInvalidTemporalTable private(
org.apache.flink.table.planner.runtime.utils (1 usage found)
InMemoryLookupableTableSource.scala (1 usage found)
50 class InMemoryLookupableTableSource(
org.apache.flink.table.planner.utils (15 usages found)
TableTestBase.scala (1 usage found)
1372 class TestTableSource(override val isBounded: Boolean, schema: TableSchema)
TestLegacyLimitableTableSource.scala (1 usage found)
44 class TestLegacyLimitableTableSource(
testTableSourceSinks.scala (13 usages found)
205 class TestTableSourceWithTime[T](
316 class TestPreserveWMTableSource[T](
344 class TestLegacyProjectableTableSource(
407 class TestNestedProjectableTableSource(
504 class TestLegacyFilterableTableSource(
731 class TestInputFormatTableSource[T](
786 class TestDataTypeTableSource(
847 class TestDataTypeTableSourceWithTime(
926 class TestStreamTableSource(
977 class TestFileInputFormatTableSource(
1038 class TestPartitionableTableSource(
1209 class WithoutTimeAttributesTableSource(bounded: Boolean) extends StreamTableSource[Row] {
1297 class OptionsTableSource(
```
This task can be further split into:
1. Remove all usages of production implementations of `TableSink` and `TableSource`. Replacements for the legacy implementations should already exist, except for:
   1. `CassandraAppendTableSink`. Discuss with the Cassandra maintainers.
   1. `ArrowTableSource` and the related `PythonInputFormatTableSource`. Discuss with the Python maintainers.
1. Identify which test implementations are still required, and port them to `DynamicTableSourceFactory` and `DynamicTableSinkFactory`. Note that some of the test utilities are already provided by the new stack, e.g. `CollectionTableSource` and `GeneratorTableSource` look like overcomplicated mocks of connectors that the new stack already supports natively.
1. Remove all usages of test implementations of `TableSink` and `TableSource`. This will involve removing entire tests, e.g. `LegacyTableSinkITCase`, which merely tests the old stack.
1. Clean up the remaining implementations of `TableSink` and `TableSource`.

I suggest that step 2 ports the old test implementations of `TableSink`/`TableSource` without removing the old ones, which simplifies step 3. Then, in step 4, all the code that is no longer referenced can be removed.
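
As a rough illustration of the point about overcomplicated mocks, the sketch below replaces a collection-style and a generator-style test source with `fromValues` and the built-in `datagen` connector. The class and method names (`NativeTestSources`, `collectionRows`, `generatedRows`) are hypothetical, and the example assumes a Flink 1.13+ classpath with the table planner, the Java bridge and `flink-clients` available.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import static org.apache.flink.table.api.Expressions.row;

/** Hypothetical helper: new-stack stand-ins for legacy test sources. */
public class NativeTestSources {

    /** In-memory rows, standing in for a CollectionTableSource-style mock. */
    public static List<Row> collectionRows() throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Table table = env.fromValues(row(1, "a"), row(2, "b"));
        return collect(table);
    }

    /** Generated rows, standing in for a GeneratorTableSource-style mock. */
    public static List<Row> generatedRows(int count) throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // 'number-of-rows' makes the datagen source bounded, so the job terminates.
        env.executeSql(
                "CREATE TABLE generated (id INT, name STRING) WITH ("
                        + "'connector' = 'datagen', "
                        + "'number-of-rows' = '" + count + "')");
        return collect(env.from("generated"));
    }

    /** Runs the query and gathers all result rows on the client. */
    private static List<Row> collect(Table table) throws Exception {
        List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> it = table.execute().collect()) {
            it.forEachRemaining(rows::add);
        }
        return rows;
    }
}
```

Whether a given legacy test source can be replaced this way depends on what the test actually checks; sources that exercise specific abilities (projection or filter push-down, for example) would still need a dedicated new-stack implementation.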
### 2. Remove `TableSink` and `TableSource`
Remove `TableSink`, `TableSource` and `TableFormatFactory`, and clean up all the code relying on these interfaces until the project compiles.

Along the way, expect to clean up the following:
* Some user-facing APIs, notably:
  * `Table#insertInto`
  * `TableEnvironment#insertInto`
  * `TableEnvironment#fromTableSource`
* Old `Operation` nodes, e.g. `TableSourceQueryOperation`, `UnregisteredSinkModifyOperation`
* Scattered code in the SQL-to-rel conversion
* Old conversion rules, e.g. `BatchPhysicalLegacySinkRule`, `PushPartitionIntoLegacyTableSourceScanRule`.
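
For callers of the user-facing APIs listed above, the migration typically looks like the following sketch. It assumes Flink 1.13+ and the built-in `blackhole` connector; the class name `InsertIntoMigration` and the table name `sink_table` are made up for illustration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

import static org.apache.flink.table.api.Expressions.row;

/** Hypothetical example: moving from Table#insertInto to Table#executeInsert. */
public class InsertIntoMigration {

    /** Returns true if the insert was submitted as a job and ran to completion. */
    public static boolean run() throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // The sink is declared through DDL instead of a registered TableSink instance.
        env.executeSql(
                "CREATE TABLE sink_table (id INT, name STRING) "
                        + "WITH ('connector' = 'blackhole')");

        Table source = env.fromValues(row(1, "a"), row(2, "b")).as("id", "name");

        // Legacy: source.insertInto("sink_table"); env.execute("job");
        TableResult result = source.executeInsert("sink_table");

        // Block until the (bounded) job finishes.
        result.await();
        return result.getJobClient().isPresent();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("submitted: " + run());
    }
}
```

Code that used `TableEnvironment#fromTableSource` would follow the same pattern in the other direction: declare the table with `CREATE TABLE ... WITH (...)` and read it with `TableEnvironment#from`.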
### 3. Remove `TableFactory`
Remove `TableFactory` and clean up all the code relying on it until the project compiles.
Some interfaces implement `TableFactory` for compatibility reasons but should not be removed themselves, e.g. `CatalogFactory`, `ModuleFactory`.
This also involves cleaning up `TableFactoryUtil` and `TableFactoryService`.
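
For code that currently looks factories up through `TableFactoryService`, the new stack offers identifier-based discovery via `FactoryUtil`. A minimal sketch, assuming the built-in `datagen` factory is on the classpath (the class name `FactoryDiscovery` is hypothetical):

```java
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

/** Hypothetical example: discovering a new-stack factory by its identifier. */
public class FactoryDiscovery {

    public static String discoverDatagen() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

        // Legacy lookups matched factories against a property map, roughly:
        //   TableFactoryService.find(TableSourceFactory.class, properties)
        // The new stack matches on the factory class and its identifier instead.
        DynamicTableSourceFactory factory =
                FactoryUtil.discoverFactory(
                        classLoader, DynamicTableSourceFactory.class, "datagen");

        return factory.factoryIdentifier();
    }

    public static void main(String[] args) {
        System.out.println(discoverDatagen());
    }
}
```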
### 4. Old functions stack
* ~~Update Scala table and aggregate functions implicits to new stack.~~
* Update Python table and aggregate functions to the new stack. Discuss with the Python maintainers.
* Update Hive functions to the new stack. Discuss with the Hive maintainers.
* Remove the old stack, i.e. `TableFunctionDefinition`, `TableEnvironment.registerFunction`.
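
From a user's point of view, moving off `TableEnvironment.registerFunction` usually means registering the function class with `createTemporarySystemFunction` and letting the new type inference derive the signature. The sketch below assumes Flink 1.13+; `FunctionMigration` and `Shout` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/** Hypothetical example: registering a scalar function on the new function stack. */
public class FunctionMigration {

    /** Upper-cases its input and appends an exclamation mark. */
    public static class Shout extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase() + "!";
        }
    }

    public static List<String> run() throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Legacy: env.registerFunction("shout", new Shout());
        env.createTemporarySystemFunction("shout", Shout.class);

        // fromValues names the single column f0.
        Table result = env.fromValues("flink", "table").select(call("shout", $("f0")));

        List<String> out = new ArrayList<>();
        try (CloseableIterator<Row> it = result.execute().collect()) {
            while (it.hasNext()) {
                out.add((String) it.next().getField(0));
            }
        }
        return out;
    }
}
```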
### 5. ~~Remove String Scala Expression stack~~
This includes all the Table API methods that take strings, e.g. `Table.select(String)` or `fromDataStream(DataStream<T>, String)`.
This task includes:
* Reimplement the codegen tests with the new expression stack
* Python API methods that take expression strings. Discuss with the Python maintainers.
* Remove the `ExpressionParser` entry point interface, its usages and its implementation.
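
For reference when porting tests, a string-based call and its equivalent in the expression DSL might look like this. It is a sketch assuming Flink 1.13+; `StringExpressionMigration` is a hypothetical class name.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.row;

/** Hypothetical example: replacing string expressions with the expression DSL. */
public class StringExpressionMigration {

    public static List<Row> run() throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Table table = env.fromValues(row(1, "a"), row(2, "b")).as("id", "name");

        // Legacy: table.filter("id > 1").select("id, name")
        Table result = table.filter($("id").isGreater(1)).select($("id"), $("name"));

        List<Row> rows = new ArrayList<>();
        try (CloseableIterator<Row> it = result.execute().collect()) {
            it.forEachRemaining(rows::add);
        }
        return rows;
    }
}
```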
### 6. Remove `PlannerExpression` tree
List of classes:
```
Subclasses of PlannerExpression in (56 usages found)
Production (56 usages found)
Unclassified (56 usages found)
flink-table-planner_2.12 (56 usages found)
org.apache.flink.table.planner.expressions (56 usages found)
aggregations.scala (5 usages found)
32 abstract sealed class Aggregation extends PlannerExpression {
43 case class ApiResolvedAggregateCallExpression(
55 case class DistinctAgg(child: PlannerExpression) extends Aggregation {
79 case class Collect(child: PlannerExpression) extends Aggregation {
92 case class AggFunctionCall(
call.scala (6 usages found)
38 case class ApiResolvedExpression(resolvedDataType: DataType)
51 case class UnresolvedOverCall(agg: PlannerExpression, alias: PlannerExpression)
71 case class OverCall(
166 case class PlannerScalarFunctionCall(
206 case class PlannerTableFunctionCall(
234 case class ThrowException(msg: PlannerExpression, tp: TypeInformation[_]) extends UnaryExpression {
collection.scala (3 usages found)
27 case class ArrayElement(array: PlannerExpression) extends PlannerExpression {
47 case class Cardinality(container: PlannerExpression) extends PlannerExpression {
64 case class ItemAt(container: PlannerExpression, key: PlannerExpression) extends PlannerExpression {
fieldExpression.scala (12 usages found)
32 trait NamedExpression extends PlannerExpression {
37 abstract class Attribute extends LeafExpression with NamedExpression {
46 case class RexPlannerExpression(
54 case class UnresolvedFieldReference(name: String) extends Attribute {
68 case class PlannerResolvedFieldReference(
83 case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute {
99 case class TableReference(name: String, tableOperation: QueryOperation)
112 abstract class TimeAttribute(val expression: PlannerExpression)
119 case class RowtimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
159 case class ProctimeAttribute(expr: PlannerExpression) extends TimeAttribute(expr) {
184 case class StreamRecordTimestamp() extends LeafExpression {
194 case class PlannerLocalReference(
InputTypeSpec.scala (1 usage found)
29 trait InputTypeSpec extends PlannerExpression {
literals.scala (2 usages found)
51 case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpression {
80 case class Null(resultType: TypeInformation[_]) extends LeafExpression {
ordering.scala (3 usages found)
23 abstract class Ordering extends UnaryExpression {
33 case class Asc(child: PlannerExpression) extends Ordering {
39 case class Desc(child: PlannerExpression) extends Ordering {
overOffsets.scala (4 usages found)
24 case class CurrentRow() extends PlannerExpression {
32 case class CurrentRange() extends PlannerExpression {
40 case class UnboundedRow() extends PlannerExpression {
48 case class UnboundedRange() extends PlannerExpression {
PlannerExpression.scala (3 usages found)
74 abstract class BinaryExpression extends PlannerExpression {
80 abstract class UnaryExpression extends PlannerExpression {
85 abstract class LeafExpression extends PlannerExpression {
Reinterpret.scala (1 usage found)
26 case class Reinterpret(child: PlannerExpression, resultType: TypeInformation[_],
subquery.scala (1 usage found)
29 case class In(expression: PlannerExpression, elements: Seq[PlannerExpression])
symbols.scala (1 usage found)
31 case class SymbolPlannerExpression(symbol: PlannerSymbol) extends LeafExpression {
time.scala (11 usages found)
32 case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpression)
80 abstract class CurrentTimePoint(
106 case class CurrentDate() extends CurrentTimePoint(SqlTimeTypeInfo.DATE, local = false)
108 case class CurrentTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = false)
110 case class CurrentTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = false)
112 case class LocalTime() extends CurrentTimePoint(SqlTimeTypeInfo.TIME, local = true)
114 case class LocalTimestamp() extends CurrentTimePoint(SqlTimeTypeInfo.TIMESTAMP, local = true)
119 case class TemporalOverlaps(
214 case class DateFormat(timestamp: PlannerExpression, format: PlannerExpression)
223 case class TimestampDiff(
275 case class ToTimestampLtz(
windowProperties.scala (3 usages found)
33 abstract class AbstractWindowProperty(child: PlannerExpression)
49 case class WindowStart(child: PlannerExpression) extends AbstractWindowProperty(child) {
56 case class WindowEnd(child: PlannerExpression) extends AbstractWindowProperty(child) {
```
This task includes:
* Convert the functions that still create these `PlannerExpression` nodes to the new type inference, like `Extract`, `CurrentTimestamp`, `ArrayElement`, ...
* Remove `PlannerExpressionConverter`.
### 7. Pre-FLIP-136 DataStream conversions
* Remove `toAppendStream`, `toRetractStream` and `fromDataStream(..., Expression)`.
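
Callers of these methods can move to the conversions introduced by FLIP-136. The sketch below shows one possible mapping, assuming Flink 1.13+ on a 1.x release line (it uses `fromElements` for brevity); `DataStreamConversionMigration` is a hypothetical class name.

```java
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/** Hypothetical example: pre-FLIP-136 conversions and their replacements. */
public class DataStreamConversionMigration {

    /** Insert-only conversion, replacing toAppendStream. */
    public static List<Row> appendOnly() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        DataStream<String> words = env.fromElements("a", "b", "a");

        // Legacy: tEnv.fromDataStream(words, $("word"))
        Table table = tEnv.fromDataStream(words).as("word");

        // Legacy: tEnv.toAppendStream(table, Row.class)
        return tEnv.toDataStream(table).executeAndCollect(10);
    }

    /** Updating conversion, replacing toRetractStream. */
    public static List<Row> changelog() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        DataStream<String> words = env.fromElements("a", "b", "a");

        Table counts =
                tEnv.fromDataStream(words)
                        .as("word")
                        .groupBy($("word"))
                        .select($("word"), $("word").count().as("cnt"));

        // Legacy: tEnv.toRetractStream(counts, Row.class) returned Tuple2<Boolean, Row>.
        // The new conversion encodes the change flag in each Row's RowKind.
        return tEnv.toChangelogStream(counts).executeAndCollect(10);
    }
}
```

The main behavioural difference to keep in mind is the result type: `toRetractStream` produced `Tuple2<Boolean, T>`, while `toChangelogStream` produces `Row` values whose `getKind()` carries the insert, update or delete flag.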