diff --git a/docs/tutorials/pdns.md b/docs/tutorials/pdns.md
index 33b43f3bb0337227a741bdc0d8efa058bfb11dc8..b04ccfee0aa49a65fa47107de89bac06d68329cf 100644
--- a/docs/tutorials/pdns.md
+++ b/docs/tutorials/pdns.md
@@ -15,8 +15,7 @@ anyway.
 
 The PDNS provider currently does not support:
 
-1. Dry running a configuration is not supported.
-2. The `--domain-filter` flag is not supported.
+* Dry running a configuration is not supported
 
 ## Deployment
 
@@ -47,10 +46,18 @@ spec:
         - --pdns-server={{ pdns-api-url }}
         - --pdns-api-key={{ pdns-http-api-key }}
         - --txt-owner-id={{ owner-id-for-this-external-dns }}
+        - --domain-filter=external-dns-test.my-org.com # will make ExternalDNS see only the zones matching provided domain; omit to process all available zones in PowerDNS
         - --log-level=debug
         - --interval=30s
 ```
 
+#### Domain Filter (--domain-filter)
+When the domain-filter argument is specified, external-dns will automatically create DNS records based on host names specified in ingress objects and services with the external-dns annotation that match the domain-filter argument in the external-dns deployment manifest.
+
+eg. ```--domain-filter=example.org``` will allow for zone `example.org` and any zones in PowerDNS that ends in `.example.org`, including `an.example.org`, ie. the subdomains of example.org.
+
+eg. ```--domain-filter=.example.org``` will allow *only* zones that end in `.example.org`, ie. the subdomains of example.org but not the `example.org` zone itself.
+
 ## RBAC
 
 If your cluster is RBAC enabled, you also need to setup the following, before you can run external-dns:
diff --git a/provider/pdns.go b/provider/pdns.go
index 862dab902e04eecb4874991c6710079febba7f53..7571c779a2e64cf354df11cf1c7ad741ab8b21b7 100644
--- a/provider/pdns.go
+++ b/provider/pdns.go
@@ -132,15 +132,17 @@ func stringifyHTTPResponseBody(r *http.Response) (body string) {
 // well as mock APIClients used in testing
 type PDNSAPIProvider interface {
 	ListZones() ([]pgo.Zone, *http.Response, error)
+	PartitionZones(zones []pgo.Zone) ([]pgo.Zone, []pgo.Zone)
 	ListZone(zoneID string) (pgo.Zone, *http.Response, error)
 	PatchZone(zoneID string, zoneStruct pgo.Zone) (*http.Response, error)
 }
 
 // PDNSAPIClient : Struct that encapsulates all the PowerDNS specific implementation details
 type PDNSAPIClient struct {
-	dryRun  bool
-	authCtx context.Context
-	client  *pgo.APIClient
+	dryRun       bool
+	authCtx      context.Context
+	client       *pgo.APIClient
+	domainFilter DomainFilter
 }
 
 // ListZones : Method returns all enabled zones from PowerDNS
@@ -153,7 +155,6 @@ func (c *PDNSAPIClient) ListZones() (zones []pgo.Zone, resp *http.Response, err
 			log.Debugf("Retrying ListZones() ... %d", i)
 			time.Sleep(retryAfterTime * (1 << uint(i)))
 			continue
-
 		}
 		return zones, resp, err
 	}
@@ -163,6 +164,22 @@ func (c *PDNSAPIClient) ListZones() (zones []pgo.Zone, resp *http.Response, err
 
 }
 
+// PartitionZones : Method returns a slice of zones that adhere to the domain filter and a slice of ones that does not adhere to the filter
+func (c *PDNSAPIClient) PartitionZones(zones []pgo.Zone) (filteredZones []pgo.Zone, residualZones []pgo.Zone) {
+	if c.domainFilter.IsConfigured() {
+		for _, zone := range zones {
+			if c.domainFilter.Match(zone.Name) {
+				filteredZones = append(filteredZones, zone)
+			} else {
+				residualZones = append(residualZones, zone)
+			}
+		}
+	} else {
+		residualZones = zones
+	}
+	return filteredZones, residualZones
+}
+
 // ListZone : Method returns the details of a specific zone from PowerDNS
 // ref: https://doc.powerdns.com/authoritative/http-api/zone.html#get--servers-server_id-zones-zone_id
 func (c *PDNSAPIClient) ListZone(zoneID string) (zone pgo.Zone, resp *http.Response, err error) {
@@ -216,10 +233,6 @@ func NewPDNSProvider(config PDNSConfig) (*PDNSProvider, error) {
 		return nil, errors.New("Missing API Key for PDNS. Specify using --pdns-api-key=")
 	}
 
-	// The default for when no --domain-filter is passed is [""], instead of [], so we check accordingly.
-	if len(config.DomainFilter.filters) != 1 && config.DomainFilter.filters[0] != "" {
-		return nil, errors.New("PDNS Provider does not support domain filter")
-	}
 	// We do not support dry running, exit safely instead of surprising the user
 	// TODO: Add Dry Run support
 	if config.DryRun {
@@ -238,9 +251,10 @@ func NewPDNSProvider(config PDNSConfig) (*PDNSProvider, error) {
 
 	provider := &PDNSProvider{
 		client: &PDNSAPIClient{
-			dryRun:  config.DryRun,
-			authCtx: context.WithValue(context.TODO(), pgo.ContextAPIKey, pgo.APIKey{Key: config.APIKey}),
-			client:  pgo.NewAPIClient(pdnsClientConfig),
+			dryRun:       config.DryRun,
+			authCtx:      context.WithValue(context.TODO(), pgo.ContextAPIKey, pgo.APIKey{Key: config.APIKey}),
+			client:       pgo.NewAPIClient(pdnsClientConfig),
+			domainFilter: config.DomainFilter,
 		},
 	}
 
@@ -281,22 +295,23 @@ func (p *PDNSProvider) ConvertEndpointsToZones(eps []*endpoint.Endpoint, changet
 	if err != nil {
 		return nil, err
 	}
+	filteredZones, residualZones := p.client.PartitionZones(zones)
 
 	// Sort the zone by length of the name in descending order, we use this
 	// property later to ensure we add a record to the longest matching zone
 
-	sort.SliceStable(zones, func(i, j int) bool { return len(zones[i].Name) > len(zones[j].Name) })
+	sort.SliceStable(filteredZones, func(i, j int) bool { return len(filteredZones[i].Name) > len(filteredZones[j].Name) })
 
-	// NOTE: Complexity of this loop is O(Zones*Endpoints).
+	// NOTE: Complexity of this loop is O(FilteredZones*Endpoints).
 	// A possibly faster implementation would be a search of the reversed
 	// DNSName in a trie of Zone names, which should be O(Endpoints), but at this point it's not
 	// necessary.
-	for _, zone := range zones {
+	for _, zone := range filteredZones {
 		zone.Rrsets = []pgo.RrSet{}
 		for i := 0; i < len(endpoints); {
 			ep := endpoints[i]
 			dnsname := ensureTrailingDot(ep.DNSName)
-			if strings.HasSuffix(dnsname, zone.Name) {
+			if dnsname == zone.Name || strings.HasSuffix(dnsname, "."+zone.Name) {
 				// The assumption here is that there will only ever be one target
 				// per (ep.DNSName, ep.RecordType) tuple, which holds true for
 				// external-dns v5.0.0-alpha onwards
@@ -345,7 +360,23 @@ func (p *PDNSProvider) ConvertEndpointsToZones(eps []*endpoint.Endpoint, changet
 
 	}
 
-	// If we still have some endpoints left, it means we couldn't find a matching zone for them
+	// residualZones is unsorted by name length like its counterpart
+	// since we only care to remove endpoints that do not match domain filter
+	for _, zone := range residualZones {
+		for i := 0; i < len(endpoints); {
+			ep := endpoints[i]
+			dnsname := ensureTrailingDot(ep.DNSName)
+			if dnsname == zone.Name || strings.HasSuffix(dnsname, "."+zone.Name) {
+				// "pop" endpoint if it's matched to a residual zone... essentially a no-op
+				log.Debugf("Ignoring Endpoint because it was matched to a zone that was not specified within Domain Filter(s): %s", dnsname)
+				endpoints = append(endpoints[0:i], endpoints[i+1:]...)
+			} else {
+				i++
+			}
+		}
+	}
+
+	// If we still have some endpoints left, it means we couldn't find a matching zone (filtered or residual) for them
 	// We warn instead of hard fail here because we don't want a misconfig to cause everything to go down
 	if len(endpoints) > 0 {
 		log.Warnf("No matching zones were found for the following endpoints: %+v", endpoints)
@@ -387,8 +418,9 @@ func (p *PDNSProvider) Records() (endpoints []*endpoint.Endpoint, _ error) {
 	if err != nil {
 		return nil, err
 	}
+	filteredZones, _ := p.client.PartitionZones(zones)
 
-	for _, zone := range zones {
+	for _, zone := range filteredZones {
 		z, _, err := p.client.ListZone(zone.Id)
 		if err != nil {
 			log.Warnf("Unable to fetch Records")
diff --git a/provider/pdns_test.go b/provider/pdns_test.go
index 0351d3e06d75a6c0736bba614958bf1b12030135..c7c5b592cda6c3587abd0ab59942e76b5ebd3a89 100644
--- a/provider/pdns_test.go
+++ b/provider/pdns_test.go
@@ -158,6 +158,18 @@ var (
 		endpoint.NewEndpointWithTTL("abcd.mock.noexist", endpoint.RecordTypeA, endpoint.TTL(300), "9.9.9.9"),
 		endpoint.NewEndpointWithTTL("abcd.mock.noexist", endpoint.RecordTypeTXT, endpoint.TTL(300), "\"heritage=external-dns,external-dns/owner=tower-pdns\""),
 	}
+	endpointsMultipleZonesWithLongRecordNotInDomainFilter = []*endpoint.Endpoint{
+		endpoint.NewEndpointWithTTL("example.com", endpoint.RecordTypeA, endpoint.TTL(300), "8.8.8.8"),
+		endpoint.NewEndpointWithTTL("example.com", endpoint.RecordTypeTXT, endpoint.TTL(300), "\"heritage=external-dns,external-dns/owner=tower-pdns\""),
+		endpoint.NewEndpointWithTTL("a.very.long.domainname.example.com", endpoint.RecordTypeA, endpoint.TTL(300), "9.9.9.9"),
+		endpoint.NewEndpointWithTTL("a.very.long.domainname.example.com", endpoint.RecordTypeTXT, endpoint.TTL(300), "\"heritage=external-dns,external-dns/owner=tower-pdns\""),
+	}
+	endpointsMultipleZonesWithSimilarRecordNotInDomainFilter = []*endpoint.Endpoint{
+		endpoint.NewEndpointWithTTL("example.com", endpoint.RecordTypeA, endpoint.TTL(300), "8.8.8.8"),
+		endpoint.NewEndpointWithTTL("example.com", endpoint.RecordTypeTXT, endpoint.TTL(300), "\"heritage=external-dns,external-dns/owner=tower-pdns\""),
+		endpoint.NewEndpointWithTTL("test.simexample.com", endpoint.RecordTypeA, endpoint.TTL(300), "9.9.9.9"),
+		endpoint.NewEndpointWithTTL("test.simexample.com", endpoint.RecordTypeTXT, endpoint.TTL(300), "\"heritage=external-dns,external-dns/owner=tower-pdns\""),
+	}
 
 	ZoneEmpty = pgo.Zone{
 		// Opaque zone id (string), assigned by the server, should not be interpreted by the application. Guaranteed to be safe for embedding in URLs.
@@ -174,6 +186,15 @@ var (
 		Rrsets: []pgo.RrSet{},
 	}
 
+	ZoneEmptySimilar = pgo.Zone{
+		Id:     "simexample.com.",
+		Name:   "simexample.com.",
+		Type_:  "Zone",
+		Url:    "/api/v1/servers/localhost/zones/simexample.com.",
+		Kind:   "Native",
+		Rrsets: []pgo.RrSet{},
+	}
+
 	ZoneEmptyLong = pgo.Zone{
 		Id:     "long.domainname.example.com.",
 		Name:   "long.domainname.example.com.",
@@ -239,6 +260,72 @@ var (
 		},
 	}
 
+	ZoneEmptyToSimplePatchLongRecordIgnoredInDomainFilter = pgo.Zone{
+		Id:    "example.com.",
+		Name:  "example.com.",
+		Type_: "Zone",
+		Url:   "/api/v1/servers/localhost/zones/example.com.",
+		Kind:  "Native",
+		Rrsets: []pgo.RrSet{
+			{
+				Name:       "a.very.long.domainname.example.com.",
+				Type_:      "A",
+				Ttl:        300,
+				Changetype: "REPLACE",
+				Records: []pgo.Record{
+					{
+						Content:  "9.9.9.9",
+						Disabled: false,
+						SetPtr:   false,
+					},
+				},
+				Comments: []pgo.Comment(nil),
+			},
+			{
+				Name:       "a.very.long.domainname.example.com.",
+				Type_:      "TXT",
+				Ttl:        300,
+				Changetype: "REPLACE",
+				Records: []pgo.Record{
+					{
+						Content:  "\"heritage=external-dns,external-dns/owner=tower-pdns\"",
+						Disabled: false,
+						SetPtr:   false,
+					},
+				},
+				Comments: []pgo.Comment(nil),
+			},
+			{
+				Name:       "example.com.",
+				Type_:      "A",
+				Ttl:        300,
+				Changetype: "REPLACE",
+				Records: []pgo.Record{
+					{
+						Content:  "8.8.8.8",
+						Disabled: false,
+						SetPtr:   false,
+					},
+				},
+				Comments: []pgo.Comment(nil),
+			},
+			{
+				Name:       "example.com.",
+				Type_:      "TXT",
+				Ttl:        300,
+				Changetype: "REPLACE",
+				Records: []pgo.Record{
+					{
+						Content:  "\"heritage=external-dns,external-dns/owner=tower-pdns\"",
+						Disabled: false,
+						SetPtr:   false,
+					},
+				},
+				Comments: []pgo.Comment(nil),
+			},
+		},
+	}
+
 	ZoneEmptyToLongPatch = pgo.Zone{
 		Id:    "long.domainname.example.com.",
 		Name:  "long.domainname.example.com.",
@@ -398,6 +485,9 @@ type PDNSAPIClientStub struct {
 func (c *PDNSAPIClientStub) ListZones() ([]pgo.Zone, *http.Response, error) {
 	return []pgo.Zone{ZoneMixed}, nil, nil
 }
+func (c *PDNSAPIClientStub) PartitionZones(zones []pgo.Zone) ([]pgo.Zone, []pgo.Zone) {
+	return zones, nil
+}
 func (c *PDNSAPIClientStub) ListZone(zoneID string) (pgo.Zone, *http.Response, error) {
 	return ZoneMixed, nil, nil
 }
@@ -415,6 +505,9 @@ type PDNSAPIClientStubEmptyZones struct {
 func (c *PDNSAPIClientStubEmptyZones) ListZones() ([]pgo.Zone, *http.Response, error) {
 	return []pgo.Zone{ZoneEmpty, ZoneEmptyLong, ZoneEmpty2}, nil, nil
 }
+func (c *PDNSAPIClientStubEmptyZones) PartitionZones(zones []pgo.Zone) ([]pgo.Zone, []pgo.Zone) {
+	return zones, nil
+}
 func (c *PDNSAPIClientStubEmptyZones) ListZone(zoneID string) (pgo.Zone, *http.Response, error) {
 
 	if strings.Contains(zoneID, "example.com") {
@@ -422,7 +515,7 @@ func (c *PDNSAPIClientStubEmptyZones) ListZone(zoneID string) (pgo.Zone, *http.R
 	} else if strings.Contains(zoneID, "mock.test") {
 		return ZoneEmpty2, nil, nil
 	} else if strings.Contains(zoneID, "long.domainname.example.com") {
-		return ZoneEmpty2, nil, nil
+		return ZoneEmptyLong, nil, nil
 	}
 	return pgo.Zone{}, nil, nil
 
@@ -469,6 +562,37 @@ func (c *PDNSAPIClientStubListZonesFailure) ListZones() ([]pgo.Zone, *http.Respo
 	return []pgo.Zone{}, nil, errors.New("Generic PDNS Error")
 }
 
+/******************************************************************************/
+// API that returns zone partitions given DomainFilter(s)
+type PDNSAPIClientStubPartitionZones struct {
+	// Anonymous struct for composition
+	PDNSAPIClientStubEmptyZones
+}
+
+func (c *PDNSAPIClientStubPartitionZones) ListZones() ([]pgo.Zone, *http.Response, error) {
+	return []pgo.Zone{ZoneEmpty, ZoneEmptyLong, ZoneEmpty2, ZoneEmptySimilar}, nil, nil
+}
+
+func (c *PDNSAPIClientStubPartitionZones) ListZone(zoneID string) (pgo.Zone, *http.Response, error) {
+
+	if strings.Contains(zoneID, "example.com") {
+		return ZoneEmpty, nil, nil
+	} else if strings.Contains(zoneID, "mock.test") {
+		return ZoneEmpty2, nil, nil
+	} else if strings.Contains(zoneID, "long.domainname.example.com") {
+		return ZoneEmptyLong, nil, nil
+	} else if strings.Contains(zoneID, "simexample.com") {
+		return ZoneEmptySimilar, nil, nil
+	}
+	return pgo.Zone{}, nil, nil
+}
+
+// Just overwrite the ListZones method to introduce a failure
+func (c *PDNSAPIClientStubPartitionZones) PartitionZones(zones []pgo.Zone) ([]pgo.Zone, []pgo.Zone) {
+	return []pgo.Zone{ZoneEmpty}, []pgo.Zone{ZoneEmptyLong, ZoneEmpty2}
+
+}
+
 /******************************************************************************/
 
 type NewPDNSProviderTestSuite struct {
@@ -488,7 +612,7 @@ func (suite *NewPDNSProviderTestSuite) TestPDNSProviderCreate() {
 		APIKey:       "foo",
 		DomainFilter: NewDomainFilter([]string{"example.com", "example.org"}),
 	})
-	assert.Error(suite.T(), err, "--domainfilter should raise an error")
+	assert.Nil(suite.T(), err, "--domain-filter should raise no error")
 
 	_, err = NewPDNSProvider(PDNSConfig{
 		Server:       "http://localhost:8081",
@@ -711,6 +835,51 @@ func (suite *NewPDNSProviderTestSuite) TestPDNSConvertEndpointsToZones() {
 	}
 }
 
+func (suite *NewPDNSProviderTestSuite) TestPDNSConvertEndpointsToZonesPartitionZones() {
+	// Test DomainFilters
+	p := &PDNSProvider{
+		client: &PDNSAPIClientStubPartitionZones{},
+	}
+
+	// Check inserting endpoints from a single zone which is specified in DomainFilter
+	zlist, err := p.ConvertEndpointsToZones(endpointsSimpleRecord, PdnsReplace)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), []pgo.Zone{ZoneEmptyToSimplePatch}, zlist)
+
+	// Check deleting endpoints from a single zone which is specified in DomainFilter
+	zlist, err = p.ConvertEndpointsToZones(endpointsSimpleRecord, PdnsDelete)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), []pgo.Zone{ZoneEmptyToSimpleDelete}, zlist)
+
+	// Check endpoints from multiple zones # which one is specified in DomainFilter and one is not
+	zlist, err = p.ConvertEndpointsToZones(endpointsMultipleZones, PdnsReplace)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), []pgo.Zone{ZoneEmptyToSimplePatch}, zlist)
+
+	// Check endpoints from multiple zones where some endpoints which don't exist and one that does
+	// and is part of DomainFilter
+	zlist, err = p.ConvertEndpointsToZones(endpointsMultipleZonesWithNoExist, PdnsReplace)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), []pgo.Zone{ZoneEmptyToSimplePatch}, zlist)
+
+	// Check endpoints from a zone that does not exist
+	zlist, err = p.ConvertEndpointsToZones(endpointsNonexistantZone, PdnsReplace)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), []pgo.Zone{}, zlist)
+
+	// Check endpoints that match multiple zones (one longer than other), is assigned to the right zone when the longer
+	// zone is not part of the DomainFilter
+	zlist, err = p.ConvertEndpointsToZones(endpointsMultipleZonesWithLongRecordNotInDomainFilter, PdnsReplace)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), []pgo.Zone{ZoneEmptyToSimplePatchLongRecordIgnoredInDomainFilter}, zlist)
+
+	// Check endpoints that match multiple zones (one longer than other and one is very similar)
+	// is assigned to the right zone when the similar zone is not part of the DomainFilter
+	zlist, err = p.ConvertEndpointsToZones(endpointsMultipleZonesWithSimilarRecordNotInDomainFilter, PdnsReplace)
+	assert.Nil(suite.T(), err)
+	assert.Equal(suite.T(), []pgo.Zone{ZoneEmptyToSimplePatch}, zlist)
+}
+
 func (suite *NewPDNSProviderTestSuite) TestPDNSmutateRecords() {
 	// Function definition: mutateRecords(endpoints []*endpoint.Endpoint, changetype pdnsChangeType) error
 
@@ -742,6 +911,7 @@ func (suite *NewPDNSProviderTestSuite) TestPDNSmutateRecords() {
 	assert.NotNil(suite.T(), err)
 
 }
+
 func TestNewPDNSProviderTestSuite(t *testing.T) {
 	suite.Run(t, new(NewPDNSProviderTestSuite))
 }